Skip to content

Commit

Permalink
fixed issue #3267 , fixed rockemq send null message
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Apr 17, 2021
1 parent 85df89f commit 64ea4ff
Showing 1 changed file with 37 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.alibaba.otter.canal.common.utils.PropertiesUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
Expand All @@ -26,6 +28,8 @@
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.common.utils.ExecutorTemplate;
import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.alibaba.otter.canal.common.utils.PropertiesUtils;
import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer;
import com.alibaba.otter.canal.connector.core.producer.MQDestination;
import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils;
Expand All @@ -46,10 +50,11 @@
@SPI("rocketmq")
public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQProducer {

private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);

private DefaultMQProducer defaultMQProducer;
private static final String CLOUD_ACCESS_CHANNEL = "cloud";
private DefaultMQProducer defaultMQProducer;
private static final String CLOUD_ACCESS_CHANNEL = "cloud";
protected ThreadPoolExecutor sendPartitionExecutor;

@Override
public void init(Properties properties) {
Expand Down Expand Up @@ -85,6 +90,15 @@ public void init(Properties properties) {
} catch (MQClientException ex) {
throw new CanalException("Start RocketMQ producer error", ex);
}

int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
parallelPartitionSendThreadSize,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(parallelPartitionSendThreadSize * 2),
new NamedThreadFactory("MQ-Parallel-Sender-Partition"),
new ThreadPoolExecutor.CallerRunsPolicy());
}

private void loadRocketMQProperties(Properties properties) {
Expand All @@ -99,11 +113,13 @@ private void loadRocketMQProperties(Properties properties) {
if (!StringUtils.isEmpty(producerGroup)) {
rocketMQProperties.setProducerGroup(producerGroup);
}
String enableMessageTrace = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
String enableMessageTrace = PropertiesUtils.getProperty(properties,
RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
if (!StringUtils.isEmpty(enableMessageTrace)) {
rocketMQProperties.setEnableMessageTrace(Boolean.parseBoolean(enableMessageTrace));
}
String customizedTraceTopic = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
String customizedTraceTopic = PropertiesUtils.getProperty(properties,
RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
if (!StringUtils.isEmpty(customizedTraceTopic)) {
rocketMQProperties.setCustomizedTraceTopic(customizedTraceTopic);
}
Expand All @@ -119,7 +135,8 @@ private void loadRocketMQProperties(Properties properties) {
if (!StringUtils.isEmpty(retry)) {
rocketMQProperties.setRetryTimesWhenSendFailed(Integer.parseInt(retry));
}
String vipChannelEnabled = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
String vipChannelEnabled = PropertiesUtils.getProperty(properties,
RocketMQConstants.ROCKETMQ_VIP_CHANNEL_ENABLED);
if (!StringUtils.isEmpty(vipChannelEnabled)) {
rocketMQProperties.setVipChannelEnabled(Boolean.parseBoolean(vipChannelEnabled));
}
Expand Down Expand Up @@ -163,7 +180,8 @@ public void send(MQDestination destination, com.alibaba.otter.canal.protocol.Mes

public void send(final MQDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message message) {
// 获取当前topic的分区数
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName, destination.getDynamicTopicPartitionNum());
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
destination.getDynamicTopicPartitionNum());
if (partitionNum == null) {
partitionNum = destination.getPartitionsNum();
}
Expand All @@ -179,7 +197,7 @@ public void send(final MQDestination destination, String topicName, com.alibaba.
mqProperties.isDatabaseHash());
int length = messages.length;

ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
for (int i = 0; i < length; i++) {
com.alibaba.otter.canal.protocol.Message dataPartition = messages[i];
if (dataPartition != null) {
Expand Down Expand Up @@ -218,14 +236,17 @@ public void send(final MQDestination destination, String topicName, com.alibaba.
mqProperties.isDatabaseHash());
int length = partitionFlatMessage.length;
for (int i = 0; i < length; i++) {
partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
// 增加null判断,issue #3267
if (partitionFlatMessage[i] != null) {
partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
}
}
}

ExecutorTemplate template = new ExecutorTemplate(sendExecutor);
ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
for (int i = 0; i < partitionFlatMessages.size(); i++) {
final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
if (flatMessagePart != null) {
if (flatMessagePart != null && flatMessagePart.size() > 0) {
final int index = i;
template.submit(() -> {
List<Message> messages = flatMessagePart.stream()
Expand Down Expand Up @@ -316,6 +337,10 @@ private void sendMessage(List<Message> messages, int partition) {
public void stop() {
logger.info("## Stop RocketMQ producer##");
this.defaultMQProducer.shutdown();
if (sendPartitionExecutor != null) {
sendPartitionExecutor.shutdownNow();
}

super.stop();
}
}

0 comments on commit 64ea4ff

Please sign in to comment.