diff --git a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java index 6d0d9794c1..d72b756bf7 100644 --- a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java +++ b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java @@ -10,56 +10,55 @@ */ public class CanalConstants { - public static final String MDC_DESTINATION = "destination"; - public static final String ROOT = "canal"; - public static final String CANAL_ID = ROOT + "." + "id"; - public static final String CANAL_IP = ROOT + "." + "ip"; - public static final String CANAL_PORT = ROOT + "." + "port"; - public static final String CANAL_METRICS_PULL_PORT = ROOT + "." + "metrics.pull.port"; - public static final String CANAL_ZKSERVERS = ROOT + "." + "zkServers"; - public static final String CANAL_WITHOUT_NETTY = ROOT + "." + "withoutNetty"; + public static final String MDC_DESTINATION = "destination"; + public static final String ROOT = "canal"; + public static final String CANAL_ID = ROOT + "." + "id"; + public static final String CANAL_IP = ROOT + "." + "ip"; + public static final String CANAL_PORT = ROOT + "." + "port"; + public static final String CANAL_METRICS_PULL_PORT = ROOT + "." + "metrics.pull.port"; + public static final String CANAL_ZKSERVERS = ROOT + "." + "zkServers"; + public static final String CANAL_WITHOUT_NETTY = ROOT + "." + "withoutNetty"; - public static final String CANAL_DESTINATIONS = ROOT + "." + "destinations"; - public static final String CANAL_AUTO_SCAN = ROOT + "." + "auto.scan"; - public static final String CANAL_AUTO_SCAN_INTERVAL = ROOT + "." + "auto.scan.interval"; - public static final String CANAL_CONF_DIR = ROOT + "." + "conf.dir"; - public static final String CANAL_SERVER_MODE = ROOT + "." + "serverMode"; + public static final String CANAL_DESTINATIONS = ROOT + "." + "destinations"; + public static final String CANAL_AUTO_SCAN = ROOT + "." + "auto.scan"; + public static final String CANAL_AUTO_SCAN_INTERVAL = ROOT + "." + "auto.scan.interval"; + public static final String CANAL_CONF_DIR = ROOT + "." + "conf.dir"; + public static final String CANAL_SERVER_MODE = ROOT + "." + "serverMode"; - public static final String CANAL_DESTINATION_SPLIT = ","; - public static final String GLOBAL_NAME = "global"; + public static final String CANAL_DESTINATION_SPLIT = ","; + public static final String GLOBAL_NAME = "global"; - public static final String INSTANCE_MODE_TEMPLATE = ROOT + "." + "instance.{0}.mode"; - public static final String INSTANCE_LAZY_TEMPLATE = ROOT + "." + "instance.{0}.lazy"; - public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE = ROOT + "." + "instance.{0}.manager.address"; - public static final String INSTANCE_SPRING_XML_TEMPLATE = ROOT + "." + "instance.{0}.spring.xml"; + public static final String INSTANCE_MODE_TEMPLATE = ROOT + "." + "instance.{0}.mode"; + public static final String INSTANCE_LAZY_TEMPLATE = ROOT + "." + "instance.{0}.lazy"; + public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE = ROOT + "." + "instance.{0}.manager.address"; + public static final String INSTANCE_SPRING_XML_TEMPLATE = ROOT + "." + "instance.{0}.spring.xml"; - public static final String CANAL_DESTINATION_PROPERTY = ROOT + ".instance.destination"; + public static final String CANAL_DESTINATION_PROPERTY = ROOT + ".instance.destination"; - public static final String CANAL_SOCKETCHANNEL = ROOT + "." + "socketChannel"; + public static final String CANAL_SOCKETCHANNEL = ROOT + "." + "socketChannel"; - public static final String CANAL_MQ_SERVERS = ROOT + "." + "mq.servers"; - public static final String CANAL_MQ_RETRIES = ROOT + "." + "mq.retries"; - public static final String CANAL_MQ_BATCHSIZE = ROOT + "." + "mq.batchSize"; - public static final String CANAL_MQ_LINGERMS = ROOT + "." + "mq.lingerMs"; - public static final String CANAL_MQ_MAXREQUESTSIZE = ROOT + "." + "mq.maxRequestSize"; - public static final String CANAL_MQ_BUFFERMEMORY = ROOT + "." + "mq.bufferMemory"; - public static final String CANAL_MQ_CANALBATCHSIZE = ROOT + "." + "mq.canalBatchSize"; - public static final String CANAL_MQ_CANALGETTIMEOUT = ROOT + "." + "mq.canalGetTimeout"; - public static final String CANAL_MQ_FLATMESSAGE = ROOT + "." + "mq.flatMessage"; - public static final String CANAL_MQ_COMPRESSION_TYPE = ROOT + "." + "mq.compressionType"; - public static final String CANAL_MQ_ACKS = ROOT + "." + "mq.acks"; - public static final String CANAL_MQ_TRANSACTION = ROOT + "." + "mq.transaction"; - public static final String CANAL_MQ_PRODUCERGROUP = ROOT + "." + "mq.producerGroup"; - public static final String CANAL_ALIYUN_ACCESSKEY = ROOT + "." + "aliyun.accessKey"; - public static final String CANAL_ALIYUN_SECRETKEY = ROOT + "." + "aliyun.secretKey"; - public static final String CANAL_MQ_PROPERTIES = ROOT + "." + "mq.properties"; - public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE = ROOT + "." + "mq.enableMessageTrace"; - public static final String CANAL_MQ_ACCESS_CHANNEL = ROOT + "." + "mq.accessChannel"; - public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC = ROOT + "." + "mq.customizedTraceTopic"; - public static final String CANAL_MQ_NAMESPACE = ROOT + "." + "mq.namespace"; - public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE = ROOT + "." + "mq.kafka.kerberos.enable"; - public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath"; - public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath"; + public static final String CANAL_MQ_SERVERS = ROOT + "." + "mq.servers"; + public static final String CANAL_MQ_RETRIES = ROOT + "." + "mq.retries"; + public static final String CANAL_MQ_BATCHSIZE = ROOT + "." + "mq.batchSize"; + public static final String CANAL_MQ_LINGERMS = ROOT + "." + "mq.lingerMs"; + public static final String CANAL_MQ_MAXREQUESTSIZE = ROOT + "." + "mq.maxRequestSize"; + public static final String CANAL_MQ_BUFFERMEMORY = ROOT + "." + "mq.bufferMemory"; + public static final String CANAL_MQ_CANALBATCHSIZE = ROOT + "." + "mq.canalBatchSize"; + public static final String CANAL_MQ_CANALGETTIMEOUT = ROOT + "." + "mq.canalGetTimeout"; + public static final String CANAL_MQ_FLATMESSAGE = ROOT + "." + "mq.flatMessage"; + public static final String CANAL_MQ_COMPRESSION_TYPE = ROOT + "." + "mq.compressionType"; + public static final String CANAL_MQ_ACKS = ROOT + "." + "mq.acks"; + public static final String CANAL_MQ_PRODUCERGROUP = ROOT + "." + "mq.producerGroup"; + public static final String CANAL_ALIYUN_ACCESSKEY = ROOT + "." + "aliyun.accessKey"; + public static final String CANAL_ALIYUN_SECRETKEY = ROOT + "." + "aliyun.secretKey"; + public static final String CANAL_MQ_PROPERTIES = ROOT + "." + "mq.properties"; + public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE = ROOT + "." + "mq.enableMessageTrace"; + public static final String CANAL_MQ_ACCESS_CHANNEL = ROOT + "." + "mq.accessChannel"; + public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC = ROOT + "." + "mq.customizedTraceTopic"; + public static final String CANAL_MQ_NAMESPACE = ROOT + "." + "mq.namespace"; + public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE = ROOT + "." + "mq.kafka.kerberos.enable"; + public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath"; + public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath"; public static String getInstanceModeKey(String destination) { return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination); diff --git a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java index 09a227be51..1528a47c6a 100644 --- a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java +++ b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalStater.java @@ -1,5 +1,15 @@ package com.alibaba.otter.canal.deployer; +import java.io.File; +import java.io.FileFilter; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.alibaba.otter.canal.common.MQProperties; import com.alibaba.otter.canal.kafka.CanalKafkaProducer; import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer; @@ -8,15 +18,6 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.Lists; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileFilter; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; /** * Canal server 启动类 @@ -64,8 +65,8 @@ synchronized void start(Properties properties) throws Throwable { public boolean accept(File pathname) { String filename = pathname.getName(); - return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename) && - !"metrics".equalsIgnoreCase(filename); + return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename) + && !"metrics".equalsIgnoreCase(filename); } }); if (instanceDirs != null && instanceDirs.length > 0) { @@ -196,17 +197,14 @@ private static MQProperties buildMQProperties(Properties properties) { if (!StringUtils.isEmpty(aliyunSecretKey)) { mqProperties.setAliyunSecretKey(aliyunSecretKey); } - String transaction = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_TRANSACTION); - if (!StringUtils.isEmpty(transaction)) { - mqProperties.setTransaction(Boolean.valueOf(transaction)); - } String producerGroup = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_PRODUCERGROUP); if (!StringUtils.isEmpty(producerGroup)) { mqProperties.setProducerGroup(producerGroup); } - String enableMessageTrace = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ENABLE_MESSAGE_TRACE); + String enableMessageTrace = CanalController.getProperty(properties, + CanalConstants.CANAL_MQ_ENABLE_MESSAGE_TRACE); if (!StringUtils.isEmpty(enableMessageTrace)) { mqProperties.setEnableMessageTrace(Boolean.valueOf(enableMessageTrace)); } @@ -216,7 +214,8 @@ private static MQProperties buildMQProperties(Properties properties) { mqProperties.setAccessChannel(accessChannel); } - String customizedTraceTopic = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CUSTOMIZED_TRACE_TOPIC); + String customizedTraceTopic = CanalController.getProperty(properties, + CanalConstants.CANAL_MQ_CUSTOMIZED_TRACE_TOPIC); if (!StringUtils.isEmpty(customizedTraceTopic)) { mqProperties.setCustomizedTraceTopic(customizedTraceTopic); } @@ -226,17 +225,20 @@ private static MQProperties buildMQProperties(Properties properties) { mqProperties.setNamespace(namespace); } - String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE); + String kafkaKerberosEnable = CanalController.getProperty(properties, + CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE); if (!StringUtils.isEmpty(kafkaKerberosEnable)) { mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable)); } - String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH); + String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, + CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH); if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) { mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath); } - String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH); + String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, + CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH); if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) { mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath); } diff --git a/deployer/src/main/resources/canal.properties b/deployer/src/main/resources/canal.properties index 106005f1f4..777388a907 100644 --- a/deployer/src/main/resources/canal.properties +++ b/deployer/src/main/resources/canal.properties @@ -115,8 +115,6 @@ canal.mq.canalGetTimeout = 100 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all -# use transaction for kafka flatMessage batch produce -canal.mq.transaction = true #canal.mq.properties. = canal.mq.producerGroup = test # Set this value to "cloud", if you want open message trace feature in aliyun. diff --git a/server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java b/server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java index 41e4168470..c147c4c5aa 100644 --- a/server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java +++ b/server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java @@ -25,15 +25,14 @@ public class MQProperties { private String acks = "all"; private String aliyunAccessKey = ""; private String aliyunSecretKey = ""; - private boolean transaction = false; // 是否开启事务 private Properties properties = new Properties(); private boolean enableMessageTrace = false; private String accessChannel = null; private String customizedTraceTopic = null; private String namespace = ""; - private boolean kerberosEnable = false; //kafka集群是否启动Kerberos认证 - private String kerberosKrb5FilePath = ""; //启动Kerberos认证时配置为krb5.conf文件的路径 - private String kerberosJaasFilePath = ""; //启动Kerberos认证时配置为jaas.conf文件的路径 + private boolean kerberosEnable = false; // kafka集群是否启动Kerberos认证 + private String kerberosKrb5FilePath = ""; // 启动Kerberos认证时配置为krb5.conf文件的路径 + private String kerberosJaasFilePath = ""; // 启动Kerberos认证时配置为jaas.conf文件的路径 public static class CanalDestination { @@ -213,14 +212,6 @@ public void setMaxRequestSize(int maxRequestSize) { this.maxRequestSize = maxRequestSize; } - public boolean getTransaction() { - return transaction; - } - - public void setTransaction(boolean transaction) { - this.transaction = transaction; - } - public Properties getProperties() { return properties; } @@ -285,32 +276,17 @@ public void setKerberosJaasFilePath(String kerberosJaasFilePath) { this.kerberosJaasFilePath = kerberosJaasFilePath; } - @Override public String toString() { - return "MQProperties{" + - "servers='" + servers + '\'' + - ", retries=" + retries + - ", batchSize=" + batchSize + - ", lingerMs=" + lingerMs + - ", maxRequestSize=" + maxRequestSize + - ", bufferMemory=" + bufferMemory + - ", filterTransactionEntry=" + filterTransactionEntry + - ", producerGroup='" + producerGroup + '\'' + - ", canalBatchSize=" + canalBatchSize + - ", canalGetTimeout=" + canalGetTimeout + - ", flatMessage=" + flatMessage + - ", compressionType='" + compressionType + '\'' + - ", acks='" + acks + '\'' + - ", aliyunAccessKey='" + aliyunAccessKey + '\'' + - ", aliyunSecretKey='" + aliyunSecretKey + '\'' + - ", transaction=" + transaction + - ", properties=" + properties + - ", enableMessageTrace=" + enableMessageTrace + - ", accessChannel='" + accessChannel + '\'' + - ", customizedTraceTopic='" + customizedTraceTopic + '\'' + - ", namespace='" + namespace + '\'' + - ", kerberosEnable='" + kerberosEnable + '\'' + - ", kerberosKrb5FilePath='" + kerberosKrb5FilePath + '\'' + - ", kerberosJaasFilePath='" + kerberosJaasFilePath + '\'' + - '}'; + @Override + public String toString() { + return "MQProperties{" + "servers='" + servers + '\'' + ", retries=" + retries + ", batchSize=" + batchSize + + ", lingerMs=" + lingerMs + ", maxRequestSize=" + maxRequestSize + ", bufferMemory=" + bufferMemory + + ", filterTransactionEntry=" + filterTransactionEntry + ", producerGroup='" + producerGroup + '\'' + + ", canalBatchSize=" + canalBatchSize + ", canalGetTimeout=" + canalGetTimeout + ", flatMessage=" + + flatMessage + ", compressionType='" + compressionType + '\'' + ", acks='" + acks + '\'' + + ", aliyunAccessKey='" + aliyunAccessKey + '\'' + ", aliyunSecretKey='" + aliyunSecretKey + '\'' + + ", properties=" + properties + ", enableMessageTrace=" + enableMessageTrace + ", accessChannel='" + + accessChannel + '\'' + ", customizedTraceTopic='" + customizedTraceTopic + '\'' + ", namespace='" + + namespace + '\'' + ", kerberosEnable='" + kerberosEnable + '\'' + ", kerberosKrb5FilePath='" + + kerberosKrb5FilePath + '\'' + ", kerberosJaasFilePath='" + kerberosJaasFilePath + '\'' + '}'; } } diff --git a/server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java b/server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java index 99f2010aa0..b947c00d84 100644 --- a/server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java +++ b/server/src/main/java/com/alibaba/otter/canal/kafka/CanalKafkaProducer.java @@ -1,12 +1,13 @@ package com.alibaba.otter.canal.kafka; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; -import com.alibaba.otter.canal.common.MQMessageUtils; -import com.alibaba.otter.canal.common.MQProperties; -import com.alibaba.otter.canal.protocol.FlatMessage; -import com.alibaba.otter.canal.protocol.Message; -import com.alibaba.otter.canal.spi.CanalMQProducer; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + import org.apache.commons.lang.StringUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -15,12 +16,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ExecutionException; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.alibaba.otter.canal.common.MQMessageUtils; +import com.alibaba.otter.canal.common.MQProperties; +import com.alibaba.otter.canal.protocol.FlatMessage; +import com.alibaba.otter.canal.protocol.Message; +import com.alibaba.otter.canal.spi.CanalMQProducer; /** * kafka producer 主操作类 @@ -53,26 +55,18 @@ public void init(MQProperties kafkaProperties) { if (!kafkaProperties.getProperties().isEmpty()) { properties.putAll(kafkaProperties.getProperties()); } - - if (kafkaProperties.getTransaction()) { - properties.put("transactional.id", "canal-transactional-id"); - } else { - properties.put("retries", kafkaProperties.getRetries()); - } - - if (kafkaProperties.isKerberosEnable()){ + properties.put("retries", kafkaProperties.getRetries()); + if (kafkaProperties.isKerberosEnable()) { File krb5File = new File(kafkaProperties.getKerberosKrb5FilePath()); File jaasFile = new File(kafkaProperties.getKerberosJaasFilePath()); - if(krb5File.exists() && jaasFile.exists()){ - //配置kerberos认证,需要使用绝对路径 - System.setProperty("java.security.krb5.conf", - krb5File.getAbsolutePath()); - System.setProperty("java.security.auth.login.config", - jaasFile.getAbsolutePath()); + if (krb5File.exists() && jaasFile.exists()) { + // 配置kerberos认证,需要使用绝对路径 + System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath()); + System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath()); System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); properties.put("security.protocol", "SASL_PLAINTEXT"); properties.put("sasl.kerberos.service.name", "kafka"); - }else{ + } else { String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! please check it"; logger.error(errorMsg); throw new RuntimeException(errorMsg); @@ -86,13 +80,6 @@ public void init(MQProperties kafkaProperties) { properties.put("value.serializer", StringSerializer.class.getName()); producer2 = new KafkaProducer(properties); } - if (kafkaProperties.getTransaction()) { - if (!kafkaProperties.getFlatMessage()) { - producer.initTransactions(); - } else { - producer2.initTransactions(); - } - } } @Override @@ -114,18 +101,7 @@ public void stop() { @Override public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) { - // 开启事务,需要kafka版本支持 - Producer producerTmp; - if (!kafkaProperties.getFlatMessage()) { - producerTmp = producer; - } else { - producerTmp = producer2; - } - try { - if (kafkaProperties.getTransaction()) { - producerTmp.beginTransaction(); - } if (!StringUtils.isEmpty(canalDestination.getDynamicTopic())) { // 动态topic Map messageMap = MQMessageUtils.messageTopics(message, @@ -143,19 +119,9 @@ public void send(MQProperties.CanalDestination canalDestination, Message message } else { send(canalDestination, canalDestination.getTopic(), message); } - if (kafkaProperties.getTransaction()) { - producerTmp.commitTransaction(); - } callback.commit(); } catch (Throwable e) { logger.error(e.getMessage(), e); - if (kafkaProperties.getTransaction()) { - try { - producerTmp.abortTransaction(); - } catch (Exception e1) { - logger.error(e1.getMessage(), e1); - } - } callback.rollback(); } } @@ -163,7 +129,7 @@ public void send(MQProperties.CanalDestination canalDestination, Message message private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message) throws Exception { if (!kafkaProperties.getFlatMessage()) { - List> records = new ArrayList>(); + List records = new ArrayList(); if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) { Message[] messages = MQMessageUtils.messagePartition(message, canalDestination.getPartitionsNum(), @@ -180,18 +146,11 @@ private void send(MQProperties.CanalDestination canalDestination, String topicNa records.add(new ProducerRecord(topicName, partition, null, message)); } - if (!records.isEmpty()) { - for (ProducerRecord record : records) { - producer.send(record).get(); - } - - if (logger.isDebugEnabled()) { - logger.debug("Send message to kafka topic: [{}], packet: {}", topicName, message.toString()); - } - } + produce(topicName, records, false); } else { // 发送扁平数据json List flatMessages = MQMessageUtils.messageConverter(message); + List records = new ArrayList(); if (flatMessages != null) { for (FlatMessage flatMessage : flatMessages) { if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) { @@ -202,34 +161,59 @@ private void send(MQProperties.CanalDestination canalDestination, String topicNa for (int i = 0; i < length; i++) { FlatMessage flatMessagePart = partitionFlatMessage[i]; if (flatMessagePart != null) { - produce(topicName, i, flatMessagePart); + records.add(new ProducerRecord(topicName, + i, + null, + JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue))); } } } else { final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0; - produce(topicName, partition, flatMessage); + records.add(new ProducerRecord(topicName, + partition, + null, + JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue))); } - if (logger.isDebugEnabled()) { - logger.debug("Send flat message to kafka topic: [{}], packet: {}", - topicName, - JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)); - } + // 每条记录需要flush + produce(topicName, records, true); } } } } - private void produce(String topicName, int partition, FlatMessage flatMessage) throws ExecutionException, - InterruptedException { - ProducerRecord record = new ProducerRecord(topicName, - partition, - null, - JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)); - if (kafkaProperties.getTransaction()) { - producer2.send(record); + private void produce(String topicName, List records, boolean flatMessage) { + + Producer producerTmp = null; + if (flatMessage) { + producerTmp = producer2; } else { - producer2.send(record).get(); + producerTmp = producer; + } + + List futures = new ArrayList(); + try { + // 异步发送,因为在partition hash的时候已经按照每个分区合并了消息,走到这一步不需要考虑单个分区内的顺序问题 + for (ProducerRecord record : records) { + futures.add(producerTmp.send(record)); + } + } finally { + if (logger.isDebugEnabled()) { + for (ProducerRecord record : records) { + logger.debug("Send message to kafka topic: [{}], packet: {}", topicName, record.toString()); + } + } + // 批量刷出 + producerTmp.flush(); + + // flush操作也有可能是发送失败,这里需要异步关注一下发送结果,针对有异常的直接出发rollback + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } } } diff --git a/server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java b/server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java index e16ab3d3a1..287edb2f11 100644 --- a/server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java +++ b/server/src/main/java/com/alibaba/otter/canal/rocketmq/CanalRocketMQProducer.java @@ -18,6 +18,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.alibaba.otter.canal.common.CanalMessageSerializer; @@ -29,7 +30,7 @@ public class CanalRocketMQProducer implements CanalMQProducer { - private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class); + private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class); private DefaultMQProducer defaultMQProducer; private MQProperties mqProperties; private static final String CLOUD_ACCESS_CHANNEL = "cloud"; @@ -46,11 +47,14 @@ public void init(MQProperties rocketMQProperties) { rpcHook = new AclClientRPCHook(sessionCredentials); } - defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(), rpcHook, mqProperties.isEnableMessageTrace(), mqProperties.getCustomizedTraceTopic()); - if (CLOUD_ACCESS_CHANNEL.equals(rocketMQProperties.getAccessChannel())){ + defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(), + rpcHook, + mqProperties.isEnableMessageTrace(), + mqProperties.getCustomizedTraceTopic()); + if (CLOUD_ACCESS_CHANNEL.equals(rocketMQProperties.getAccessChannel())) { defaultMQProducer.setAccessChannel(AccessChannel.CLOUD); } - if (!StringUtils.isEmpty(mqProperties.getNamespace())){ + if (!StringUtils.isEmpty(mqProperties.getNamespace())) { defaultMQProducer.setNamespace(mqProperties.getNamespace()); } defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers()); @@ -88,22 +92,6 @@ public void send(final MQProperties.CanalDestination destination, com.alibaba.ot } } - private void sendMessage(Message message, int partition) throws Exception{ - SendResult sendResult = this.defaultMQProducer.send(message, new MessageQueueSelector() { - @Override - public MessageQueue select(List mqs, Message msg, Object arg) { - if (partition > mqs.size()) { - return mqs.get(partition % mqs.size()); - } else { - return mqs.get(partition); - } - } - }, null); - if (logger.isDebugEnabled()) { - logger.debug("Send Message Result: {}", sendResult); - } - } - public void send(final MQProperties.CanalDestination destination, String topicName, com.alibaba.otter.canal.protocol.Message data) throws Exception { if (!mqProperties.getFlatMessage()) { @@ -203,6 +191,23 @@ public void send(final MQProperties.CanalDestination destination, String topicNa } } + private void sendMessage(Message message, int partition) throws Exception { + SendResult sendResult = this.defaultMQProducer.send(message, new MessageQueueSelector() { + + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + if (partition > mqs.size()) { + return mqs.get(partition % mqs.size()); + } else { + return mqs.get(partition); + } + } + }, null); + if (logger.isDebugEnabled()) { + logger.debug("Send Message Result: {}", sendResult); + } + } + @Override public void stop() { logger.info("## Stop RocketMQ producer##"); diff --git a/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java b/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java index 796a60b53e..a1be10a3ce 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java @@ -10,6 +10,7 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import com.alibaba.otter.canal.common.MQProperties; import com.alibaba.otter.canal.instance.core.CanalInstance; @@ -137,7 +138,7 @@ private void worker(String destination, AtomicBoolean destinationRunning) { } logger.info("## start the MQ producer: {}.", destination); - + MDC.put("destination", destination); final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, ""); while (running && destinationRunning.get()) { try {