After configuring canal.mq.partitionsNum, every RocketMQ queue receives a message; it looks like a broadcast #3267

Closed
gujiachun opened this issue Dec 21, 2020 · 4 comments

@gujiachun

The canal server version is 1.1.5-snapshot.

The canal instance configuration file contains:

# mq config

canal.mq.topic=customer_info
canal.mq.partitionsNum=4
canal.mq.partitionHash=scb_customer.customer_info:sid,scb_customer.label_customer:sid

Insert one new record into the customer_info table:

Broker | Queue | Consumer client | Broker offset | Consumer offset | Diff | Last time
broker-a | 0 |   | 2 | 0 | 2 | 2020-12-10 22:34:24
broker-a | 1 |   | 2 | 0 | 2 | 1970-01-01 08:00:00
broker-a | 2 |   | 2 | 0 | 2 | 1970-01-01 08:00:00
broker-a | 3 |   | 2 | 0 | 2 | 1970-01-01 08:00:00

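All 4 queues above received messages: every newly inserted record puts a message into each of the 4 queues.

In other words, 4 messages are produced. Only 1 of them has a correct message body; the other 3 have a MessageBody of null.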

@gujiachun
Author

I think I have found the cause.

// Build concurrently
MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
// Partition serially
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
// Initialize the per-partition merge queues
if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
    List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
    for (int i = 0; i < destination.getPartitionsNum(); i++) {
        partitionFlatMessages.add(new ArrayList<>());
    }

    for (FlatMessage flatMessage : flatMessages) {
        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
            partitionNum,
            destination.getPartitionHash(),
            mqProperties.isDatabaseHash());
        int length = partitionFlatMessage.length;
        for (int i = 0; i < length; i++) {
            partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
        }
    }

    ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
    for (int i = 0; i < partitionFlatMessages.size(); i++) {
        final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
        if (flatMessagePart != null) { // <-- the check in question
            final int index = i;
            template.submit(() -> {
                List<Message> messages = flatMessagePart.stream()
                    .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
                        SerializerFeature.WriteMapNullValue)))
                    .collect(Collectors.toList());
                // Send as a batch
                sendMessage(messages, index);
            });
        }
    }

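In the source, the check if (flatMessagePart != null) is the problem. flatMessagePart can never be null here; in the debugger it shows up as [null], that is, a list that has an element, and that element is null.

This is caused by the two for loops above.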
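
For anyone who wants to see the [null] effect without a canal setup, here is a minimal sketch. It is not canal code: the partition helper is a hypothetical stand-in for MQMessageUtils.messagePartition, assumed (as described in this thread) to return an array with only one filled slot, and plain String values stand in for FlatMessage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullSlotRepro {

    // Hypothetical stand-in for MQMessageUtils.messagePartition: returns an
    // array of length partitionsNum in which only the target slot is filled.
    static String[] partition(String message, int partitionsNum, int target) {
        String[] slots = new String[partitionsNum];
        slots[target] = message;
        return slots;
    }

    // Mirrors the two loops from the excerpt: every slot is copied into the
    // matching per-partition list, whether it is null or not.
    static List<List<String>> mergeWithoutNullCheck(List<String> messages, int partitionsNum) {
        List<List<String>> perPartition = new ArrayList<>();
        for (int i = 0; i < partitionsNum; i++) {
            perPartition.add(new ArrayList<>());
        }
        for (String message : messages) {
            String[] slots = partition(message, partitionsNum, 0);
            for (int i = 0; i < slots.length; i++) {
                perPartition.get(i).add(slots[i]);
            }
        }
        return perPartition;
    }

    public static void main(String[] args) {
        List<List<String>> merged = mergeWithoutNullCheck(Arrays.asList("row-1"), 4);
        for (int i = 0; i < merged.size(); i++) {
            List<String> part = merged.get(i);
            System.out.println("partition " + i + ": notNull=" + (part != null)
                + " size=" + part.size() + " content=" + part);
        }
    }
}
```

Every per-partition list is non-null and has size 1, so a plain null check on the list lets all four through, including the three that only hold a null element.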

@gujiachun
Author

I changed the source and that fixed the bug.
The first change:

for (int i = 0; i < length; i++) {
    if (partitionFlatMessage[i] != null) { // added null check
        partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
    }
}

The second change:

if (flatMessagePart != null && flatMessagePart.size() > 0) { // also require size > 0
    final int index = i;
    template.submit(() -> {
        List<Message> messages = flatMessagePart.stream()
            .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
                SerializerFeature.WriteMapNullValue)))
            .collect(Collectors.toList());
        // Send as a batch
        sendMessage(messages, index);
    });
}
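
The combined effect of the two changes can be checked in isolation with a small sketch. It makes the same assumptions as the thread's description and is not canal code: partition is a hypothetical helper that fills exactly one slot, String stands in for FlatMessage, and instead of sending anything the method just reports which partition indexes would get a send task.

```java
import java.util.ArrayList;
import java.util.List;

public class GuardedMerge {

    // Hypothetical stand-in for the array messagePartition is described as
    // returning: partitionsNum slots, only `target` filled.
    static String[] partition(String message, int partitionsNum, int target) {
        String[] slots = new String[partitionsNum];
        slots[target] = message;
        return slots;
    }

    // Applies both guards and returns the partition indexes that would be sent to.
    // targets[m] is the partition that message m is assumed to hash to.
    static List<Integer> partitionsToSend(int[] targets, int partitionsNum) {
        List<List<String>> perPartition = new ArrayList<>();
        for (int i = 0; i < partitionsNum; i++) {
            perPartition.add(new ArrayList<>());
        }
        for (int m = 0; m < targets.length; m++) {
            String[] slots = partition("row-" + m, partitionsNum, targets[m]);
            for (int i = 0; i < slots.length; i++) {
                if (slots[i] != null) {            // guard 1: do not copy empty slots
                    perPartition.get(i).add(slots[i]);
                }
            }
        }
        List<Integer> toSend = new ArrayList<>();
        for (int i = 0; i < perPartition.size(); i++) {
            List<String> part = perPartition.get(i);
            if (part != null && !part.isEmpty()) { // guard 2: skip empty batches
                toSend.add(i);
            }
        }
        return toSend;
    }

    public static void main(String[] args) {
        // One inserted row that is assumed to hash to partition 2 out of 4.
        System.out.println(partitionsToSend(new int[] {2}, 4));
    }
}
```

With one row targeting partition 2, only index 2 is selected, so the other three queues would receive nothing.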

@Beauxie

Beauxie commented Dec 22, 2020

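I recently ran into the same problem. I am using RocketMQ with flatMessage=true and partitionsNum set to 16, partitioning by a hash of the record's primary key. Each time a single row is modified, the hash strategy puts one message into one queue, but the other 15 queues also each receive a message whose body is "null", 15 such messages in total. The root cause is the following code in the CanalRocketMQProducer class (line 211):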

FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
    partitionNum,
    destination.getPartitionHash(),
    mqProperties.isDatabaseHash());

Internally, the messagePartition method allocates a new FlatMessage[partitionsNum] array on every call, but exactly one element of that array holds data and all the others are null. Lines 227~230 then build the Message objects:

List<Message> messages = flatMessagePart.stream()
    .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
        SerializerFeature.WriteMapNullValue)))
    .collect(Collectors.toList());
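
As an aside, the one-filled-slot array described for messagePartition can be sketched as below. This is only an illustration of the shape of the result: slotFor uses String.hashCode with Math.floorMod, which is an assumption made for this example and not canal's actual hashing, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Objects;

public class PartitionSlotSketch {

    // Illustrative only: NOT canal's real hash. Maps a key value to a slot index.
    static int slotFor(String keyValue, int partitionsNum) {
        return Math.floorMod(keyValue.hashCode(), partitionsNum);
    }

    // Allocates partitionsNum slots and fills only the one chosen by the key,
    // which is the behaviour the comment above attributes to messagePartition.
    static String[] partition(String payload, String keyValue, int partitionsNum) {
        String[] slots = new String[partitionsNum];
        slots[slotFor(keyValue, partitionsNum)] = payload;
        return slots;
    }

    // Counts how many slots actually carry a payload.
    static long nonNullCount(String[] slots) {
        return Arrays.stream(slots).filter(Objects::nonNull).count();
    }

    public static void main(String[] args) {
        String[] slots = partition("{\"sid\":\"1001\"}", "1001", 16);
        System.out.println(Arrays.toString(slots));
        System.out.println("filled slots: " + nonNullCount(slots));
    }
}
```

With 16 partitions, 15 of the 16 slots stay null for each message, which matches the 15 extra messages reported here.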

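When these Message objects are built, flatMessage may be null, but that case is not handled, and JSON.toJSONBytes turns a null value into the string "null".
My current workaround is to add filter(Objects::nonNull) while building the messages, so that null flatMessage entries are filtered out: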

List<Message> messages = flatMessagePart.stream().filter(Objects::nonNull)
    .map(flatMessage -> new Message(topicName, JSON.toJSONBytes(flatMessage,
        SerializerFeature.WriteMapNullValue)))
    .collect(Collectors.toList());

After repackaging and redeploying, the problem was resolved.
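
The effect of the filter can be seen in a dependency-free sketch. To keep it runnable without fastjson, toJsonBytes below is a hypothetical stand-in that only imitates the behaviour reported in this comment (a null input becomes the text "null"); it is not the real JSON.toJSONBytes, and String stands in for FlatMessage.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class NonNullFilterSketch {

    // Hypothetical stand-in for JSON.toJSONBytes: as reported in the thread,
    // a null input is rendered as the literal text "null".
    static byte[] toJsonBytes(String value) {
        String json = (value == null) ? "null" : "\"" + value + "\"";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Original pipeline: null entries are serialized like any other entry.
    static List<String> bodiesWithoutFilter(List<String> part) {
        return part.stream()
            .map(v -> new String(toJsonBytes(v), StandardCharsets.UTF_8))
            .collect(Collectors.toList());
    }

    // Pipeline with the workaround: null entries are dropped before serialization.
    static List<String> bodiesWithFilter(List<String> part) {
        return part.stream()
            .filter(Objects::nonNull)
            .map(v -> new String(toJsonBytes(v), StandardCharsets.UTF_8))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // What an unused partition's list looks like after the merge loops.
        List<String> unusedPartition = Arrays.asList((String) null);
        System.out.println("without filter: " + bodiesWithoutFilter(unusedPartition));
        System.out.println("with filter:    " + bodiesWithFilter(unusedPartition));
    }
}
```

Note that with the filter alone, an unused partition still yields an empty batch rather than no batch at all, so it fits well with the size check from the earlier comment.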

@agapple agapple self-assigned this Apr 17, 2021
@agapple agapple added the bug label Apr 17, 2021
@agapple agapple added this to the v1.1.5 milestone Apr 17, 2021
@agapple agapple closed this as completed Apr 17, 2021
@wdqzdd

wdqzdd commented Sep 14, 2021

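I ran into this too. It seemed to come out of nowhere: everything was working fine, and then it suddenly broke, just because of a restart.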
