RocketMQ cannot consume a topic when running on the cluster, but works in IDEA #108

Open
luckydarnell opened this issue Jan 5, 2024 · 4 comments

Comments

@luckydarnell

luckydarnell commented Jan 5, 2024

When running on a standalone cluster:

  • environment:

java version "1.8.0_271" + flink-1.14.6 + rocketmq 4.9.2

  • code
   // use
    dataStream = env.addSource(createMqsource(cfg, dataSource));
   // create RocketMQSource
    public static RocketMQSource<TransData> createMqsource(SourceCfg cfg, DataSourceCfg dataSource) {
        // Build the MQ name server address
        String[] hostNameSplit = dataSource.getHostname().split(SymbolConstant.SEPARATOR_SEMI_COLON);
        String addr = null;
        for (String s : hostNameSplit) {
            String joinStr = s + SymbolConstant.SEPARATOR_COLON + dataSource.getPort();
            addr = StringUtils.isEmpty(addr) ? joinStr : addr + SymbolConstant.SEPARATOR_SEMI_COLON + joinStr;
        }
        DeserializationSchema<List<MessageExt>, TransData> deserializationSchema = MqDeserializationSchemaFactory.create(cfg);
        // Determine the startup mode
        OffsetResetStrategy startUpMode = cfg.getStartupMode() == null || cfg.getStartupMode().equals(StartupModeEnum.LATEST) ?
                OffsetResetStrategy.LATEST : OffsetResetStrategy.EARLIEST;

        RocketMQSourceBuilder<TransData> builder = new RocketMQSourceBuilder<TransData>()
                .setNameServerAddress(addr)
                .setConsumerGroup(cfg.getConsumerGroup())
                .setTopic(cfg.getTopic())
                .setTag(cfg.getTag())
                .setStartFromGroupOffsets(startUpMode)
                .setDeserializer(new RocketMQValueOnlyDeserializationSchemaWrapper<>(deserializationSchema));
        log.info("addr:{}, group:{}, tag:{}", addr, cfg.getConsumerGroup(), cfg.getTag());
        return builder.build();
    }
  • exception info:

2024-01-05 13:48:01,579 INFO org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator [] - Starting the RocketMQSourceEnumerator for consumer group X without periodic partition discovery.

2024-01-05 13:48:01,589 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: T_AC_RE_WZ_JJJZ(MQ)- registering reader for parallel task 0 @ 127.0.0.1
2024-01-05 13:48:01,886 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,887 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,889 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,889 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,890 INFO RocketmqRemoting [] - closeChannel: close the connection to remote address[10.200.38.118:9876] result: true
2024-01-05 13:48:01,887 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] - Exception while handling result from async call in SourceCoordinator-Source: S(MQ)-. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
at org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator.handlePartitionSplitChanges(RocketMQSourceEnumerator.java:279) ~[flink-rocketmq-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) ~[flink-runtime-1.14.6.jar:1.14.6]
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-core-1.14.6.jar:1.14.6]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_271]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_271]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_271]
Caused by: org.apache.rocketmq.client.exception.MQClientException: Can not find Message Queue for this topic, FPC_SYNC_FI_AC_REPTILE_1
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.MQAdminImpl.fetchSubscribeMessageQueues(MQAdminImpl.java:177) ~[rocketmq-client-4.9.2.jar:4.9.2]
at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.fetchSubscribeMessageQueues(DefaultMQPullConsumerImpl.java:147) ~[rocketmq-client-4.9.2.jar:4.9.2]
at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.fetchSubscribeMessageQueues(DefaultMQPullConsumer.java:290) ~[rocketmq-client-4.9.2.jar:4.9.2]
at org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator.discoverAndInitializePartitionSplit(RocketMQSourceEnumerator.java:248) ~[flink-rocketmq-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-runtime-1.14.6.jar:1.14.6]
... 7 more
Caused by: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <10.200.38.118:9876> failed
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:440) ~[rocketmq-remoting-4.9.2.jar:4.9.2]
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:377) ~[rocketmq-remoting-4.9.2.jar:4.9.2]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1367) ~[rocketmq-client-4.9.2.jar:4.9.2]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1357) ~[rocketmq-client-4.9.2.jar:4.9.2]
at org.apache.rocketmq.client.impl.MQAdminImpl.fetchSubscribeMessageQueues(MQAdminImpl.java:166) ~[rocketmq-client-4.9.2.jar:4.9.2]
at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.fetchSubscribeMessageQueues(DefaultMQPullConsumerImpl.java:147) ~[rocketmq-client-4.9.2.jar:4.9.2]
at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.fetchSubscribeMessageQueues(DefaultMQPullConsumer.java:290) ~[rocketmq-client-4.9.2.jar:4.9.2]
at org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator.discoverAndInitializePartitionSplit(RocketMQSourceEnumerator.java:248) ~[flink-rocketmq-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-runtime-1.14.6.jar:1.14.6]
... 7 more
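The innermost cause in the trace above is a RemotingSendRequestException for 10.200.38.118:9876, which suggests the request to the name server never got through. One possible explanation (an assumption, not confirmed anywhere in this thread) is that the cluster host cannot reach that address even though the development machine can. A minimal sketch for checking plain TCP reachability from the cluster host follows; the class name `NameServerProbe` and the 3-second timeout are hypothetical choices for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class NameServerProbe {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMillis. */
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            System.err.println("Cannot reach " + host + ":" + port + " -> " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the name server address taken from the stack trace above.
        String host = args.length > 0 ? args[0] : "10.200.38.118";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9876;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

Running this on the machine that hosts the Flink JobManager, and again on the development machine, would show whether the two environments see the name server differently. A successful connection only rules out basic network problems; it says nothing about the RocketMQ protocol level.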

@luckydarnell

luckydarnell commented Jan 5, 2024

In IntelliJ, everything works. Here is the log:

24/01/05 10:07:16 INFO org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator: Starting the RocketMQSourceEnumerator for consumer group X without periodic partition discovery.
24/01/05 10:07:16 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator: Source Source: s(MQ)- registering reader for parallel task 0 @
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=0] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-0 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=2] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-2 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=1] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-1 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer thread:MessageQueue [topic=topicx, brokerName=broker-a, queueId=3] has no committed offset,use Strategy:LATEST instead
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils: current consumer queue:broker-a-3 start from offset of: 0
24/01/05 10:07:17 INFO org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator: Assigning splits to readers {0=[[Topic: topicx, Broker: broker-a, Partition: 1, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 0, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 3, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 2, StartingOffset: 0, StoppingTimestamp: 9223372036854775807]]}
24/01/05 10:07:17 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase: Adding split(s) to reader: [[Topic: topicx, Broker: broker-a, Partition: 1, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 0, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 3, StartingOffset: 0, StoppingTimestamp: 9223372036854775807], [Topic: topicx, Broker: broker-a, Partition: 2, StartingOffset: 0, StoppingTimestamp: 9223372036854775807]]
24/01/05 10:07:17 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase: Reader received NoMoreSplits event.
24/01/05 10:07:17 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher: Starting split fetcher 0

Please help.

@wsczm

wsczm commented Jan 5, 2024

Change the perm value of the dead-letter queue to 6 so that its messages become readable.

@luckydarnell

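Re: "Change the perm value of the dead-letter queue to 6 so that its messages become readable."
1. No dead-letter queue has been created.
2. The perm value of the queue itself is already 6.
The actual problem is that the job runs fine in IDEA, but fails when I start the cluster and run it there.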
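For readers unfamiliar with the perm value discussed above: in RocketMQ it is a bit mask in which 4 means readable and 2 means writable, so 6 means both. The sketch below decodes such a value. It redefines the bit constants locally so that it has no dependencies; they are intended to mirror the constants in RocketMQ's `PermName` class, and the class name `PermBits` is hypothetical.

```java
public class PermBits {
    // Bit values intended to mirror RocketMQ's PermName constants (redefined here, not imported).
    static final int PERM_WRITE = 2;
    static final int PERM_READ = 4;

    /** True if the read bit (4) is set. */
    public static boolean isReadable(int perm) {
        return (perm & PERM_READ) != 0;
    }

    /** True if the write bit (2) is set. */
    public static boolean isWritable(int perm) {
        return (perm & PERM_WRITE) != 0;
    }

    /** Renders the read/write bits as a two-character string such as "RW" or "-W". */
    public static String describe(int perm) {
        return (isReadable(perm) ? "R" : "-") + (isWritable(perm) ? "W" : "-");
    }

    public static void main(String[] args) {
        for (int perm : new int[] {2, 4, 6}) {
            System.out.println("perm=" + perm + " -> " + describe(perm));
        }
    }
}
```

A topic whose perm is already 6 is readable, which is consistent with the reply above that the perm setting is not the cause here.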

@luckydarnell

I have tried several times and found that the RocketMQSourceFunction interface runs normally, so I suspect there is a bug in the RocketMQSource interface.

  • code
    public static RocketMQSourceFunction<TransData> createMqsource2(SourceCfg cfg, DataSourceCfg dataSource) {
        // Build the MQ name server address
        String[] hostNameSplit = dataSource.getHostname().split(SymbolConstant.SEPARATOR_SEMI_COLON);
        String addr = null;
        for (String s : hostNameSplit) {
            String joinStr = s + SymbolConstant.SEPARATOR_COLON + dataSource.getPort();
            addr = StringUtils.isEmpty(addr) ? joinStr : addr + SymbolConstant.SEPARATOR_SEMI_COLON + joinStr;
        }
        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, addr);
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, cfg.getTag());
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, cfg.getTopic());
        KeyValueDeserializationSchema<TransData> deserializationSchema = MqDeserializationSchemaFactory.crateMqDeserializationSchema(cfg);
        // Determine the startup mode
        OffsetResetStrategy startUpMode = cfg.getStartupMode() == null || cfg.getStartupMode().equals(StartupModeEnum.LATEST) ?
                OffsetResetStrategy.LATEST : OffsetResetStrategy.EARLIEST;

        RocketMQSourceFunction<TransData> source = new RocketMQSourceFunction<>(deserializationSchema, consumerProps);
        // Use group offsets.
        // If there is no committed offset, the consumer starts from the offset selected by startUpMode.
        source.setStartFromGroupOffsets(startUpMode);
        log.info("addr:{}, group:{}, tag:{}", addr, cfg.getConsumerGroup(), cfg.getTag());
        return source;
    }
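Both snippets in this thread build the name server address with the same loop. A compact, dependency-free sketch of that step is shown here, which may make it easier to log and compare the final address between IDEA and the cluster. It assumes that `SymbolConstant.SEPARATOR_SEMI_COLON` is ";" and `SymbolConstant.SEPARATOR_COLON` is ":", and that the port is an integer; none of this is confirmed by the thread, and the class name `NameServerAddress` is hypothetical. Unlike the original loop, it also trims whitespace and skips empty host entries.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class NameServerAddress {

    /**
     * Turns "host1;host2" plus a port into "host1:port;host2:port".
     * Blank entries (for example from a trailing ';') are dropped.
     */
    public static String build(String hostnames, int port) {
        return Arrays.stream(hostnames.split(";"))
                .map(String::trim)
                .filter(host -> !host.isEmpty())
                .map(host -> host + ":" + port)
                .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        // Example host list; the second host is made up for illustration.
        System.out.println(build("10.200.38.118; 10.200.38.119", 9876));
    }
}
```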
