[BUG] topic route changed,job can't recover normally #83

Open
deemogsw opened this issue Jan 17, 2023 · 6 comments

Comments

@deemogsw
Contributor

deemogsw commented Jan 17, 2023

1. Adding partitions
Flink checks in snapshotState whether the partitions have changed. If the route has changed, the job switches from RUNNING to FAILED.
The job then recovers from the checkpoint, which has no record of the new message queue. The offset table has no entry for the new message queue either, so a NullPointerException is thrown in RocketMQSourceFunction$run.
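To make the first failure mode concrete, here is a minimal Java sketch of restoring offsets defensively. It assumes queues are identified by hypothetical `topic@broker#queueId` strings and that a queue missing from the restored offset table should start from a caller-chosen default offset. `OffsetRestoreSketch`, `resolveStartOffsets` and `addedQueues` are hypothetical names, not the connector's API, and this is not necessarily what the referenced commit does.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class OffsetRestoreSketch {

    /**
     * Hypothetical helper: builds the offsets to resume from after a restore.
     * Queues found in the checkpointed offset table keep their offset; queues
     * that appeared after the checkpoint (e.g. a newly added partition) fall
     * back to defaultOffset. Queues that no longer exist are dropped.
     */
    static Map<String, Long> resolveStartOffsets(
            Map<String, Long> restoredOffsets,
            Set<String> currentQueues,
            long defaultOffset) {
        Map<String, Long> result = new HashMap<>();
        for (String queue : currentQueues) {
            Long restored = restoredOffsets.get(queue);
            // Without this fallback, a missing entry would yield a null offset.
            result.put(queue, restored != null ? restored : defaultOffset);
        }
        return result;
    }

    /** Hypothetical helper: queues present now but absent from the checkpoint. */
    static Set<String> addedQueues(Set<String> checkpointed, Set<String> current) {
        Set<String> added = new HashSet<>(current);
        added.removeAll(checkpointed);
        return added;
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new HashMap<>();
        restored.put("TopicA@broker-a#0", 120L);
        restored.put("TopicA@broker-a#1", 98L);

        Set<String> current = new HashSet<>(restored.keySet());
        current.add("TopicA@broker-a#2"); // partition added after the checkpoint

        System.out.println(addedQueues(restored.keySet(), current));
        Map<String, Long> start = resolveStartOffsets(restored, current, 0L);
        System.out.println(start.get("TopicA@broker-a#2"));
    }
}
```

Running `main` prints the newly added queue and its fallback start offset of 0 instead of failing on a missing entry.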

2. Decreasing partitions
If partitions are removed, a connection exception is thrown in RocketMQSourceFunction$run. After five retries, run() returns normally, so the job switches from RUNNING to FINISHED rather than FAILED, because the thread pool swallows the connection exception.
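The second failure mode comes from an exception raised on a pool thread never reaching the thread that runs the source. Below is a minimal Java sketch of one way to surface it. It assumes a hypothetical `callWithRetry` helper standing in for the connector's RetryUtil (no backoff between attempts) and one-shot pull tasks, whereas the real source loops continuously; failures are recorded in an `AtomicReference` and rethrown from the caller, so it fails instead of returning normally. `FailurePropagationSketch` and `runAll` are likewise hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class FailurePropagationSketch {

    /** Hypothetical retry helper: tries up to maxAttempts times, then rethrows the last failure. */
    static <T> T callWithRetry(Callable<T> call, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    /**
     * Runs one pull task per queue on a pool. The first failure is recorded
     * and rethrown on the calling thread (standing in for the source's run()),
     * so the caller fails instead of finishing normally.
     */
    static void runAll(int queueCount, Callable<Void> pullTask) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(queueCount);
        AtomicReference<Exception> failure = new AtomicReference<>();
        try {
            for (int i = 0; i < queueCount; i++) {
                pool.execute(() -> {
                    try {
                        callWithRetry(pullTask, 5);
                    } catch (Exception e) {
                        // Keep only the first failure; later ones are ignored.
                        failure.compareAndSet(null, e);
                    }
                });
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        Exception e = failure.get();
        if (e != null) {
            throw new RuntimeException("pull task failed after retries", e);
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            runAll(2, () -> { throw new IllegalStateException("cannot connect to broker"); });
            System.out.println("finished normally");
        } catch (RuntimeException e) {
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

In a long-running source, the main loop would check the recorded failure periodically rather than once at the end. Submitting tasks with `submit()` and calling `Future.get()` is another way to bring the same exception back to the caller.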

@deemogsw
Contributor Author

Refer to the following commit
deemogsw/rocketMQ-flink-connector@b070213

@SOD-DOB

SOD-DOB commented Feb 1, 2023

I also encountered the same problem. After applying the changes from the commit linked above, was the problem fixed? @deemogsw

@deemogsw
Contributor Author

deemogsw commented Feb 6, 2023

@SOD-DOB
Fixed! But it only works for RocketMQSourceFunction.
You can merge the commit above into your own code or use the latest code in my own repository.

@SOD-DOB

SOD-DOB commented Feb 27, 2023

@deemogsw
Have you ever encountered this kind of problem? It looks like pulling messages timed out, but I found nothing unusual on the RocketMQ server side.

rocketmq-client version: 4.5.2

2023-02-27 08:47:14,109 WARN org.apache.rocketmq.flink.legacy.common.util.RetryUtil [] - RuntimeException, retry 5/5
java.lang.RuntimeException: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <xx:10911> failed
    at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:389) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.flink.legacy.common.util.RetryUtil.call(RetryUtil.java:52) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$run$2(RocketMQSourceFunction.java:279) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_302]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_302]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
Caused by: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <xx:10911> failed
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:429) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:375) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:737) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:691) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:199) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:249) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:529) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:364) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    at org.apache.rocketmq.flink.legacy.RocketMQSourceFunction.lambda$null$1(RocketMQSourceFunction.java:288) ~[blob_p-62baae343e6f0a7fca0f63942204ff47885115d5-f336abddcb7f2a64f472c7c5d92f1394:?]
    ... 5 more

@deemogsw
Contributor Author

deemogsw commented Mar 6, 2023

@SOD-DOB This exception looks like an internal error in the RocketMQ broker. You can check the broker log on that machine (the one in "send request to xx:10911").

@humkum
Contributor

humkum commented Mar 12, 2024

See #96

Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants