Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][connector-rocketmq] commit a correct offset to broker & reduce ThreadInterruptedException log #6668

Merged
merged 3 commits into from
Apr 16, 2024

Conversation

YalikWang
Copy link
Contributor

Purpose of this pull request

Fix issue 6645 and 6665.
Reduce ThreadInterruptedException log print on the console.
Commit a correct offset to the broker when snapshot.

Does this PR introduce any user-facing change?

No

How was this patch tested?

test locally and test with existed test cases

Check list

@Hisoka-X
Copy link
Member

Hisoka-X commented Apr 9, 2024

Thanks @YalikWang for created this PR! Is it possible to add some test cases for these bugs?

@YalikWang
Copy link
Contributor Author

Thanks @YalikWang for created this PR! Is it possible to add some test cases for these bugs?

Yes, I am willing to add some test cases. e2e or unit test ?Do you have any additional suggestions about it ?

@Hisoka-X Hisoka-X linked an issue Apr 9, 2024 that may be closed by this pull request
3 tasks
@Hisoka-X
Copy link
Member

Hisoka-X commented Apr 9, 2024

Thanks @YalikWang for created this PR! Is it possible to add some test cases for these bugs?

Yes, I am willing to add some test cases. e2e or unit test ?Do you have any additional suggestions about it ?

E2E would be better. Because we should check offset in rocketmq after connector read data, this is only way to make sure same bug not happen again.

@Hisoka-X
Copy link
Member

The CI failed. Could you fix it?
image

@YalikWang
Copy link
Contributor Author

The CI failed. Could you fix it? image

I am trying. The reason for this failure is that I did not set up the correct consumer.group. I fixed it today, but the problem still exists: In E2E testing, when the batch job ends, the consume offset cannot be read through RocketmqAdminApi. I hope it can be resolved tomorrow。

@YalikWang
Copy link
Contributor Author

YalikWang commented Apr 11, 2024

@Hisoka-X
I found that,in e2e testing, rocketmq source connector won't submit consume offset when batch job finished with flink13. I did not test with zeta e2e because of env error.
This is my job conf:

env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 1000

  # You can set spark configuration here
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  Rocketmq {
    name.srv.addr = "rocketmq-e2e:9876"
    topics = "test_topic_text_offset"
    result_table_name = "rocketmq_table"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
    format = text
    # The default field delimiter is ","
    field_delimiter = ","
  }
}

transform {
}

sink {
  Console{
    source_table_name = "rocketmq_table"
  }
}

And this is the consumerOffset.json in rocektmq broker container (4.9.4):

 cat consumerOffset.json
{
        "offsetTable":{
                "test_topic_text_offset@SeaTunnel-Consumer-Group":{0:0,1:0,2:0,3:0
                }
        }
}

This is topic status in rocektmq broker container (4.9.4):
./mqadmin topicStatus -n localhost:9876 -t test_topic_text_offset

#Broker Name #QID #Min Offset #Max Offset #Last Updated
broker-a 0 0 10
broker-a 1 0 0
broker-a 2 0 0
broker-a 3 0 0

If I call the function to submit offset when there is no more element (RocketMqSourceReader#pollNext) or reader close (RocketMqSourceReader#pollNext), it does work. But I'm not sure if this modification is appropriate. Could you give me some advice about it ?

@Hisoka-X
Copy link
Member

Hisoka-X commented Apr 11, 2024

I found that,in e2e testing, rocketmq source connector won't submit consume offset when batch job finished with flink13.

In batch mode, this is a known issue, we can skip it for now. It should work fine in zeta

If I call the function to submit offset when there is no more element (RocketMqSourceReader#pollNext) or reader close (RocketMqSourceReader#pollNext), it does work. But I'm not sure if this modification is appropriate. Could you give me some advice about it ?

Offset only can be submited in notifyCheckpointComplete.

@YalikWang
Copy link
Contributor Author

I have rolled back the submission regarding rocketmq consume offset verification in the e2e test, and now the CI has passed. Perhaps in the future, we can add this check in e2e test.

@Hisoka-X
Copy link
Member

I have rolled back the submission regarding rocketmq consume offset verification in the e2e test, and now the CI has passed. Perhaps in the future, we can add this check in e2e test.

How about only verify it in the zeta?

@YalikWang
Copy link
Contributor Author

I have rolled back the submission regarding rocketmq consume offset verification in the e2e test, and now the CI has passed. Perhaps in the future, we can add this check in e2e test.

How about only verify it in the zeta?

I added an e2e test case to check the consumption offset, which only runs with zeta.

Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Over all LGTM.

@hailin0 hailin0 merged commit b7480e1 into apache:dev Apr 16, 2024
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants