Skip to content

Commit

Permalink
fixed issue #1826 , remove kafka transaction send
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Jun 26, 2019
1 parent 4957083 commit a847660
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 208 deletions.
Expand Up @@ -10,56 +10,55 @@
*/
public class CanalConstants {

public static final String MDC_DESTINATION = "destination";
public static final String ROOT = "canal";
public static final String CANAL_ID = ROOT + "." + "id";
public static final String CANAL_IP = ROOT + "." + "ip";
public static final String CANAL_PORT = ROOT + "." + "port";
public static final String CANAL_METRICS_PULL_PORT = ROOT + "." + "metrics.pull.port";
public static final String CANAL_ZKSERVERS = ROOT + "." + "zkServers";
public static final String CANAL_WITHOUT_NETTY = ROOT + "." + "withoutNetty";
public static final String MDC_DESTINATION = "destination";
public static final String ROOT = "canal";
public static final String CANAL_ID = ROOT + "." + "id";
public static final String CANAL_IP = ROOT + "." + "ip";
public static final String CANAL_PORT = ROOT + "." + "port";
public static final String CANAL_METRICS_PULL_PORT = ROOT + "." + "metrics.pull.port";
public static final String CANAL_ZKSERVERS = ROOT + "." + "zkServers";
public static final String CANAL_WITHOUT_NETTY = ROOT + "." + "withoutNetty";

public static final String CANAL_DESTINATIONS = ROOT + "." + "destinations";
public static final String CANAL_AUTO_SCAN = ROOT + "." + "auto.scan";
public static final String CANAL_AUTO_SCAN_INTERVAL = ROOT + "." + "auto.scan.interval";
public static final String CANAL_CONF_DIR = ROOT + "." + "conf.dir";
public static final String CANAL_SERVER_MODE = ROOT + "." + "serverMode";
public static final String CANAL_DESTINATIONS = ROOT + "." + "destinations";
public static final String CANAL_AUTO_SCAN = ROOT + "." + "auto.scan";
public static final String CANAL_AUTO_SCAN_INTERVAL = ROOT + "." + "auto.scan.interval";
public static final String CANAL_CONF_DIR = ROOT + "." + "conf.dir";
public static final String CANAL_SERVER_MODE = ROOT + "." + "serverMode";

public static final String CANAL_DESTINATION_SPLIT = ",";
public static final String GLOBAL_NAME = "global";
public static final String CANAL_DESTINATION_SPLIT = ",";
public static final String GLOBAL_NAME = "global";

public static final String INSTANCE_MODE_TEMPLATE = ROOT + "." + "instance.{0}.mode";
public static final String INSTANCE_LAZY_TEMPLATE = ROOT + "." + "instance.{0}.lazy";
public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE = ROOT + "." + "instance.{0}.manager.address";
public static final String INSTANCE_SPRING_XML_TEMPLATE = ROOT + "." + "instance.{0}.spring.xml";
public static final String INSTANCE_MODE_TEMPLATE = ROOT + "." + "instance.{0}.mode";
public static final String INSTANCE_LAZY_TEMPLATE = ROOT + "." + "instance.{0}.lazy";
public static final String INSTANCE_MANAGER_ADDRESS_TEMPLATE = ROOT + "." + "instance.{0}.manager.address";
public static final String INSTANCE_SPRING_XML_TEMPLATE = ROOT + "." + "instance.{0}.spring.xml";

public static final String CANAL_DESTINATION_PROPERTY = ROOT + ".instance.destination";
public static final String CANAL_DESTINATION_PROPERTY = ROOT + ".instance.destination";

public static final String CANAL_SOCKETCHANNEL = ROOT + "." + "socketChannel";
public static final String CANAL_SOCKETCHANNEL = ROOT + "." + "socketChannel";

public static final String CANAL_MQ_SERVERS = ROOT + "." + "mq.servers";
public static final String CANAL_MQ_RETRIES = ROOT + "." + "mq.retries";
public static final String CANAL_MQ_BATCHSIZE = ROOT + "." + "mq.batchSize";
public static final String CANAL_MQ_LINGERMS = ROOT + "." + "mq.lingerMs";
public static final String CANAL_MQ_MAXREQUESTSIZE = ROOT + "." + "mq.maxRequestSize";
public static final String CANAL_MQ_BUFFERMEMORY = ROOT + "." + "mq.bufferMemory";
public static final String CANAL_MQ_CANALBATCHSIZE = ROOT + "." + "mq.canalBatchSize";
public static final String CANAL_MQ_CANALGETTIMEOUT = ROOT + "." + "mq.canalGetTimeout";
public static final String CANAL_MQ_FLATMESSAGE = ROOT + "." + "mq.flatMessage";
public static final String CANAL_MQ_COMPRESSION_TYPE = ROOT + "." + "mq.compressionType";
public static final String CANAL_MQ_ACKS = ROOT + "." + "mq.acks";
public static final String CANAL_MQ_TRANSACTION = ROOT + "." + "mq.transaction";
public static final String CANAL_MQ_PRODUCERGROUP = ROOT + "." + "mq.producerGroup";
public static final String CANAL_ALIYUN_ACCESSKEY = ROOT + "." + "aliyun.accessKey";
public static final String CANAL_ALIYUN_SECRETKEY = ROOT + "." + "aliyun.secretKey";
public static final String CANAL_MQ_PROPERTIES = ROOT + "." + "mq.properties";
public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE = ROOT + "." + "mq.enableMessageTrace";
public static final String CANAL_MQ_ACCESS_CHANNEL = ROOT + "." + "mq.accessChannel";
public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC = ROOT + "." + "mq.customizedTraceTopic";
public static final String CANAL_MQ_NAMESPACE = ROOT + "." + "mq.namespace";
public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE = ROOT + "." + "mq.kafka.kerberos.enable";
public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
public static final String CANAL_MQ_SERVERS = ROOT + "." + "mq.servers";
public static final String CANAL_MQ_RETRIES = ROOT + "." + "mq.retries";
public static final String CANAL_MQ_BATCHSIZE = ROOT + "." + "mq.batchSize";
public static final String CANAL_MQ_LINGERMS = ROOT + "." + "mq.lingerMs";
public static final String CANAL_MQ_MAXREQUESTSIZE = ROOT + "." + "mq.maxRequestSize";
public static final String CANAL_MQ_BUFFERMEMORY = ROOT + "." + "mq.bufferMemory";
public static final String CANAL_MQ_CANALBATCHSIZE = ROOT + "." + "mq.canalBatchSize";
public static final String CANAL_MQ_CANALGETTIMEOUT = ROOT + "." + "mq.canalGetTimeout";
public static final String CANAL_MQ_FLATMESSAGE = ROOT + "." + "mq.flatMessage";
public static final String CANAL_MQ_COMPRESSION_TYPE = ROOT + "." + "mq.compressionType";
public static final String CANAL_MQ_ACKS = ROOT + "." + "mq.acks";
public static final String CANAL_MQ_PRODUCERGROUP = ROOT + "." + "mq.producerGroup";
public static final String CANAL_ALIYUN_ACCESSKEY = ROOT + "." + "aliyun.accessKey";
public static final String CANAL_ALIYUN_SECRETKEY = ROOT + "." + "aliyun.secretKey";
public static final String CANAL_MQ_PROPERTIES = ROOT + "." + "mq.properties";
public static final String CANAL_MQ_ENABLE_MESSAGE_TRACE = ROOT + "." + "mq.enableMessageTrace";
public static final String CANAL_MQ_ACCESS_CHANNEL = ROOT + "." + "mq.accessChannel";
public static final String CANAL_MQ_CUSTOMIZED_TRACE_TOPIC = ROOT + "." + "mq.customizedTraceTopic";
public static final String CANAL_MQ_NAMESPACE = ROOT + "." + "mq.namespace";
public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE = ROOT + "." + "mq.kafka.kerberos.enable";
public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";

public static String getInstanceModeKey(String destination) {
return MessageFormat.format(INSTANCE_MODE_TEMPLATE, destination);
Expand Down
@@ -1,5 +1,15 @@
package com.alibaba.otter.canal.deployer;

import java.io.File;
import java.io.FileFilter;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.kafka.CanalKafkaProducer;
import com.alibaba.otter.canal.rocketmq.CanalRocketMQProducer;
Expand All @@ -8,15 +18,6 @@
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileFilter;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
* Canal server 启动类
Expand Down Expand Up @@ -64,8 +65,8 @@ synchronized void start(Properties properties) throws Throwable {

public boolean accept(File pathname) {
String filename = pathname.getName();
return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename) &&
!"metrics".equalsIgnoreCase(filename);
return pathname.isDirectory() && !"spring".equalsIgnoreCase(filename)
&& !"metrics".equalsIgnoreCase(filename);
}
});
if (instanceDirs != null && instanceDirs.length > 0) {
Expand Down Expand Up @@ -196,17 +197,14 @@ private static MQProperties buildMQProperties(Properties properties) {
if (!StringUtils.isEmpty(aliyunSecretKey)) {
mqProperties.setAliyunSecretKey(aliyunSecretKey);
}
String transaction = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_TRANSACTION);
if (!StringUtils.isEmpty(transaction)) {
mqProperties.setTransaction(Boolean.valueOf(transaction));
}

String producerGroup = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_PRODUCERGROUP);
if (!StringUtils.isEmpty(producerGroup)) {
mqProperties.setProducerGroup(producerGroup);
}

String enableMessageTrace = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_ENABLE_MESSAGE_TRACE);
String enableMessageTrace = CanalController.getProperty(properties,
CanalConstants.CANAL_MQ_ENABLE_MESSAGE_TRACE);
if (!StringUtils.isEmpty(enableMessageTrace)) {
mqProperties.setEnableMessageTrace(Boolean.valueOf(enableMessageTrace));
}
Expand All @@ -216,7 +214,8 @@ private static MQProperties buildMQProperties(Properties properties) {
mqProperties.setAccessChannel(accessChannel);
}

String customizedTraceTopic = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_CUSTOMIZED_TRACE_TOPIC);
String customizedTraceTopic = CanalController.getProperty(properties,
CanalConstants.CANAL_MQ_CUSTOMIZED_TRACE_TOPIC);
if (!StringUtils.isEmpty(customizedTraceTopic)) {
mqProperties.setCustomizedTraceTopic(customizedTraceTopic);
}
Expand All @@ -226,17 +225,20 @@ private static MQProperties buildMQProperties(Properties properties) {
mqProperties.setNamespace(namespace);
}

String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
String kafkaKerberosEnable = CanalController.getProperty(properties,
CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
}

String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties,
CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
}

String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
String kafkaKerberosJaasFilepath = CanalController.getProperty(properties,
CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
}
Expand Down
2 changes: 0 additions & 2 deletions deployer/src/main/resources/canal.properties
Expand Up @@ -115,8 +115,6 @@ canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = true
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
Expand Down
Expand Up @@ -25,15 +25,14 @@ public class MQProperties {
private String acks = "all";
private String aliyunAccessKey = "";
private String aliyunSecretKey = "";
private boolean transaction = false; // 是否开启事务
private Properties properties = new Properties();
private boolean enableMessageTrace = false;
private String accessChannel = null;
private String customizedTraceTopic = null;
private String namespace = "";
private boolean kerberosEnable = false; //kafka集群是否启动Kerberos认证
private String kerberosKrb5FilePath = ""; //启动Kerberos认证时配置为krb5.conf文件的路径
private String kerberosJaasFilePath = ""; //启动Kerberos认证时配置为jaas.conf文件的路径
private boolean kerberosEnable = false; // kafka集群是否启动Kerberos认证
private String kerberosKrb5FilePath = ""; // 启动Kerberos认证时配置为krb5.conf文件的路径
private String kerberosJaasFilePath = ""; // 启动Kerberos认证时配置为jaas.conf文件的路径

public static class CanalDestination {

Expand Down Expand Up @@ -213,14 +212,6 @@ public void setMaxRequestSize(int maxRequestSize) {
this.maxRequestSize = maxRequestSize;
}

public boolean getTransaction() {
return transaction;
}

public void setTransaction(boolean transaction) {
this.transaction = transaction;
}

public Properties getProperties() {
return properties;
}
Expand Down Expand Up @@ -285,32 +276,17 @@ public void setKerberosJaasFilePath(String kerberosJaasFilePath) {
this.kerberosJaasFilePath = kerberosJaasFilePath;
}

@Override public String toString() {
return "MQProperties{" +
"servers='" + servers + '\'' +
", retries=" + retries +
", batchSize=" + batchSize +
", lingerMs=" + lingerMs +
", maxRequestSize=" + maxRequestSize +
", bufferMemory=" + bufferMemory +
", filterTransactionEntry=" + filterTransactionEntry +
", producerGroup='" + producerGroup + '\'' +
", canalBatchSize=" + canalBatchSize +
", canalGetTimeout=" + canalGetTimeout +
", flatMessage=" + flatMessage +
", compressionType='" + compressionType + '\'' +
", acks='" + acks + '\'' +
", aliyunAccessKey='" + aliyunAccessKey + '\'' +
", aliyunSecretKey='" + aliyunSecretKey + '\'' +
", transaction=" + transaction +
", properties=" + properties +
", enableMessageTrace=" + enableMessageTrace +
", accessChannel='" + accessChannel + '\'' +
", customizedTraceTopic='" + customizedTraceTopic + '\'' +
", namespace='" + namespace + '\'' +
", kerberosEnable='" + kerberosEnable + '\'' +
", kerberosKrb5FilePath='" + kerberosKrb5FilePath + '\'' +
", kerberosJaasFilePath='" + kerberosJaasFilePath + '\'' +
'}';
@Override
public String toString() {
return "MQProperties{" + "servers='" + servers + '\'' + ", retries=" + retries + ", batchSize=" + batchSize
+ ", lingerMs=" + lingerMs + ", maxRequestSize=" + maxRequestSize + ", bufferMemory=" + bufferMemory
+ ", filterTransactionEntry=" + filterTransactionEntry + ", producerGroup='" + producerGroup + '\''
+ ", canalBatchSize=" + canalBatchSize + ", canalGetTimeout=" + canalGetTimeout + ", flatMessage="
+ flatMessage + ", compressionType='" + compressionType + '\'' + ", acks='" + acks + '\''
+ ", aliyunAccessKey='" + aliyunAccessKey + '\'' + ", aliyunSecretKey='" + aliyunSecretKey + '\''
+ ", properties=" + properties + ", enableMessageTrace=" + enableMessageTrace + ", accessChannel='"
+ accessChannel + '\'' + ", customizedTraceTopic='" + customizedTraceTopic + '\'' + ", namespace='"
+ namespace + '\'' + ", kerberosEnable='" + kerberosEnable + '\'' + ", kerberosKrb5FilePath='"
+ kerberosKrb5FilePath + '\'' + ", kerberosJaasFilePath='" + kerberosJaasFilePath + '\'' + '}';
}
}

0 comments on commit a847660

Please sign in to comment.