From fa059804cedf2ee8d45b648135f04504aefad8c1 Mon Sep 17 00:00:00 2001 From: lulu2panpan Date: Thu, 19 Oct 2017 10:50:32 +0800 Subject: [PATCH 01/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=AD=BB=E9=94=81bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 当通过zk-cursor启动的时候,第一个Event类型是TransactionEnd,那么txState变为2的触发条件不仅仅有ddl和dcl,clear方法中isTransactionEnd的判断应该放到txState.intValue() == 2的后面,否则将导致死锁 --- .../group/TimelineTransactionBarrier.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java index 2ee7f8ce6a..22a734a0d1 100644 --- a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java +++ b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java @@ -59,21 +59,22 @@ public void await(Event event, long timeout, TimeUnit unit) throws InterruptedEx } public void clear(Event event) { - super.clear(event); - - if (isTransactionEnd(event)) { + super.clear(event); + + //应该先判断2,再判断是否是事务尾,因为事务尾也可以导致txState的状态为2 + //如果先判断事务尾,那么2的状态可能永远没机会被修改了,系统出现死锁 + //CanalSinkException被注释的代码是不是可以放开??我们内部使用的时候已经放开了,从代码逻辑的分析上以及实践效果来看,应该抛异常 + if (txState.intValue() == 2) {// 非事务中 + boolean result = txState.compareAndSet(2, 0); + if (result == false) { + throw new CanalSinkException("state is not correct in non-transaction"); + } + } else if (isTransactionEnd(event)) { inTransaction.set(false); // 事务结束并且已经成功写入store,清理标记,进入重新排队判断,允许新的事务进入 - txState.compareAndSet(1, 0); - // if (txState.compareAndSet(1, 0) == false) { - // throw new - // CanalSinkException("state is not correct in transaction"); - // } - } else if (txState.intValue() == 2) {// 非事务中 - txState.compareAndSet(2, 0); - // if (txState.compareAndSet(2, 0) == false) { - // throw new - // CanalSinkException("state is not correct in non-transaction"); - // } + boolean result = txState.compareAndSet(1, 0); + if (result == false) { + throw new CanalSinkException("state is not correct in transaction"); + } } } @@ -90,7 +91,8 @@ protected boolean isPermit(Event event, long state) { return true; // 事务允许通过 } } else if (txState.compareAndSet(0, 2)) { // 非事务保护中 - return true; // DDL/DCL允许通过 + //当基于zk-cursor启动的时候,拿到的第一个Event是TransactionEnd + return true; // DDL/DCL/TransactionEnd允许通过 } } } From 00890f87e5ac9b3e35de14bab6d7c51369db6389 Mon Sep 17 00:00:00 2001 From: Chuanyi Li Date: Thu, 18 Jan 2018 21:11:46 +0800 Subject: [PATCH 02/10] Communication log optimization. --- .../embedded/CanalServerWithEmbedded.java | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java b/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java index 1d97352209..1b6bd448a4 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java @@ -220,8 +220,8 @@ public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, T events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit); if (CollectionUtils.isEmpty(events.getEvents())) { - logger.debug("get successfully, clientId:{} batchSize:{} but result is null", new Object[] { - clientIdentity.getClientId(), batchSize }); + logger.debug("get successfully, clientId:{} batchSize:{} but result is null", + clientIdentity.getClientId(), batchSize); return new Message(-1, new ArrayList()); // 返回空包,避免生成batchId,浪费性能 } else { // 记录到流式信息 @@ -232,13 +232,14 @@ public Entry apply(Event input) { return input.getEntry(); } }); - - logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]", - clientIdentity.getClientId(), - batchSize, - entrys.size(), - batchId, - events.getPositionRange()); + if (logger.isInfoEnabled()) { + logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]", + clientIdentity.getClientId(), + batchSize, + entrys.size(), + batchId, + events.getPositionRange()); + } // 直接提交ack ack(clientIdentity, batchId); return new Message(batchId, entrys); @@ -297,8 +298,8 @@ public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long } if (CollectionUtils.isEmpty(events.getEvents())) { - logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null", new Object[] { - clientIdentity.getClientId(), batchSize }); + logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null", + clientIdentity.getClientId(), batchSize); return new Message(-1, new ArrayList()); // 返回空包,避免生成batchId,浪费性能 } else { // 记录到流式信息 @@ -309,13 +310,14 @@ public Entry apply(Event input) { return input.getEntry(); } }); - - logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]", - clientIdentity.getClientId(), - batchSize, - entrys.size(), - batchId, - events.getPositionRange()); + if (logger.isInfoEnabled()) { + logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]", + clientIdentity.getClientId(), + batchSize, + entrys.size(), + batchId, + events.getPositionRange()); + } return new Message(batchId, entrys); } @@ -377,10 +379,12 @@ public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerE // 更新cursor if (positionRanges.getAck() != null) { canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck()); - logger.info("ack successfully, clientId:{} batchId:{} position:{}", - clientIdentity.getClientId(), - batchId, - positionRanges); + if (logger.isInfoEnabled()) { + logger.info("ack successfully, clientId:{} batchId:{} position:{}", + clientIdentity.getClientId(), + batchId, + positionRanges); + } } // 可定时清理数据 From 6da1507405e8556cba7e5296db88ceb691808020 Mon Sep 17 00:00:00 2001 From: Chuanyi Li Date: Tue, 6 Feb 2018 20:54:33 +0800 Subject: [PATCH 03/10] Rollback running state and znode when start failed. --- .../running/ServerRunningMonitor.java | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java b/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java index e05357bdba..a0407c9555 100644 --- a/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java +++ b/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java @@ -4,6 +4,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import com.alibaba.otter.canal.common.CanalException; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.exception.ZkException; import org.I0Itec.zkclient.exception.ZkInterruptedException; @@ -91,18 +92,25 @@ public void init() { processStart(); } - public void start() { + public synchronized void start() { super.start(); - processStart(); - if (zkClient != null) { - // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start - String path = ZookeeperPathUtils.getDestinationServerRunning(destination); - zkClient.subscribeDataChanges(path, dataListener); + try { + processStart(); + if (zkClient != null) { + // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start + String path = ZookeeperPathUtils.getDestinationServerRunning(destination); + zkClient.subscribeDataChanges(path, dataListener); - initRunning(); - } else { - processActiveEnter();// 没有zk,直接启动 + initRunning(); + } else { + processActiveEnter();// 没有zk,直接启动 + } + } catch (Exception e) { + logger.error("start failed", e); + // 没有正常启动,重置一下状态,避免干扰下一次start + stop(); } + } public void release() { @@ -113,7 +121,7 @@ public void release() { } } - public void stop() { + public synchronized void stop() { super.stop(); if (zkClient != null) { @@ -234,11 +242,7 @@ private void processStop() { private void processActiveEnter() { if (listener != null) { - try { - listener.processActiveEnter(); - } catch (Exception e) { - logger.error("processActiveEnter failed", e); - } + listener.processActiveEnter(); } } From dc0010823f93f83490cf5c776f260521d25a5ae7 Mon Sep 17 00:00:00 2001 From: Chuanyi Li Date: Sat, 10 Feb 2018 23:41:58 +0800 Subject: [PATCH 04/10] [MI] Use mysql master heartbeat to detect phycial tcp connection failure. --- .../canal/parse/inbound/mysql/MysqlConnection.java | 10 ++++++++++ .../parse/inbound/mysql/dbsync/DirectLogFetcher.java | 7 ++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java index 8e9e734b80..e665b4f564 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java @@ -4,6 +4,7 @@ import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -28,6 +29,8 @@ import com.taobao.tddl.dbsync.binlog.LogDecoder; import com.taobao.tddl.dbsync.binlog.LogEvent; +import static com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.MASTER_HEARTBEAT_PERIOD_SECONDS; + public class MysqlConnection implements ErosaConnection { private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class); @@ -293,6 +296,13 @@ private void updateSettings() throws IOException { } catch (Exception e) { logger.warn("update mariadb_slave_capability failed", e); } + + long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS); + try { + update("SET @master_heartbeat_period=" + periodNano); + } catch (Exception e) { + logger.warn("update master_heartbeat_period failed", e); + } } /** diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java index 1e8d70dfad..1974ef1e4b 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java @@ -21,6 +21,11 @@ public class DirectLogFetcher extends LogFetcher { protected static final Logger logger = LoggerFactory.getLogger(DirectLogFetcher.class); + // Master heartbeat interval + public static final int MASTER_HEARTBEAT_PERIOD_SECONDS = 15; + // +1s 确保 timeout > heartbeat interval + static final int READ_TIMEOUT_MILLISECONDS = (MASTER_HEARTBEAT_PERIOD_SECONDS + 1) * 1000; + /** Command to dump binlog */ public static final byte COM_BINLOG_DUMP = 18; @@ -166,7 +171,7 @@ public boolean fetch() throws IOException { private final boolean fetch0(final int off, final int len) throws IOException { ensureCapacity(off + len); - byte[] read = channel.read(len); + byte[] read = channel.read(len, READ_TIMEOUT_MILLISECONDS); System.arraycopy(read, 0, this.buffer, off, len); if (limit < off + len) limit = off + len; From bb0f3b3a7e20a236ff9c6fe264d1e58831710182 Mon Sep 17 00:00:00 2001 From: Chuanyi Li Date: Sat, 10 Feb 2018 23:41:58 +0800 Subject: [PATCH 05/10] [MI] Use mysql master heartbeat to detect phycial tcp connection failure. --- .../canal/parse/inbound/mysql/MysqlConnection.java | 10 ++++++++++ .../parse/inbound/mysql/dbsync/DirectLogFetcher.java | 7 ++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java index 8e9e734b80..e665b4f564 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java @@ -4,6 +4,7 @@ import java.net.InetSocketAddress; import java.nio.charset.Charset; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -28,6 +29,8 @@ import com.taobao.tddl.dbsync.binlog.LogDecoder; import com.taobao.tddl.dbsync.binlog.LogEvent; +import static com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.MASTER_HEARTBEAT_PERIOD_SECONDS; + public class MysqlConnection implements ErosaConnection { private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class); @@ -293,6 +296,13 @@ private void updateSettings() throws IOException { } catch (Exception e) { logger.warn("update mariadb_slave_capability failed", e); } + + long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS); + try { + update("SET @master_heartbeat_period=" + periodNano); + } catch (Exception e) { + logger.warn("update master_heartbeat_period failed", e); + } } /** diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java index 1e8d70dfad..d021868ca4 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/DirectLogFetcher.java @@ -21,6 +21,11 @@ public class DirectLogFetcher extends LogFetcher { protected static final Logger logger = LoggerFactory.getLogger(DirectLogFetcher.class); + // Master heartbeat interval + public static final int MASTER_HEARTBEAT_PERIOD_SECONDS = 15; + // +1s 确保 timeout > heartbeat interval + private static final int READ_TIMEOUT_MILLISECONDS = (MASTER_HEARTBEAT_PERIOD_SECONDS + 1) * 1000; + /** Command to dump binlog */ public static final byte COM_BINLOG_DUMP = 18; @@ -166,7 +171,7 @@ public boolean fetch() throws IOException { private final boolean fetch0(final int off, final int len) throws IOException { ensureCapacity(off + len); - byte[] read = channel.read(len); + byte[] read = channel.read(len, READ_TIMEOUT_MILLISECONDS); System.arraycopy(read, 0, this.buffer, off, len); if (limit < off + len) limit = off + len; From cfa90af33c38435659e7143474a58dad76c5fbe0 Mon Sep 17 00:00:00 2001 From: agapple Date: Mon, 12 Feb 2018 11:01:53 +0800 Subject: [PATCH 06/10] fixed unique and upgrade druid --- .../example/AbstractCanalClientTest.java | 15 ++++++++++----- .../otter/canal/parse/inbound/TableMeta.java | 19 ++++++++++++++++--- .../inbound/mysql/dbsync/TableMetaCache.java | 1 + .../inbound/mysql/tsdb/DatabaseTableMeta.java | 4 ++++ .../inbound/mysql/tsdb/MemoryTableMeta.java | 12 ++++++++++++ pom.xml | 2 +- 6 files changed, 44 insertions(+), 9 deletions(-) diff --git a/example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java b/example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java index 59a378a0b7..60ed874473 100644 --- a/example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java +++ b/example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java @@ -56,10 +56,10 @@ public void uncaughtException(Thread t, Throwable e) { context_format += "****************************************************" + SEP; row_format = SEP - + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms" + + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , delay : {}ms" + SEP; - transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms" + SEP; + transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , delay : {}ms" + SEP; } @@ -165,6 +165,8 @@ protected void printEntry(List entrys) { for (Entry entry : entrys) { long executeTime = entry.getHeader().getExecuteTime(); long delayTime = new Date().getTime() - executeTime; + Date date = new Date(entry.getHeader().getExecuteTime()); + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) { @@ -178,7 +180,8 @@ protected void printEntry(List entrys) { logger.info(transaction_format, new Object[] { entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), - String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) }); + String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date), + String.valueOf(delayTime) }); logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId()); } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) { TransactionEnd end = null; @@ -193,7 +196,8 @@ protected void printEntry(List entrys) { logger.info(transaction_format, new Object[] { entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), - String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) }); + String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date), + String.valueOf(delayTime) }); } continue; @@ -213,7 +217,8 @@ protected void printEntry(List entrys) { new Object[] { entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType, - String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) }); + String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date), + String.valueOf(delayTime) }); if (eventType == EventType.QUERY || rowChage.getIsDdl()) { logger.info(" sql ----> " + rowChage.getSql() + SEP); diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java index 7f56e440f7..68915ae49c 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/TableMeta.java @@ -3,9 +3,10 @@ import java.util.ArrayList; import java.util.List; -import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent; import org.apache.commons.lang.StringUtils; +import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent; + /** * 描述数据meta对象,mysql binlog中对应的{@linkplain TableMapLogEvent}包含的信息不全 * @@ -127,6 +128,7 @@ public FieldMeta(String columnName, String columnType, boolean nullable, boolean private boolean key; private String defaultValue; private String extra; + private boolean unique; public String getColumnName() { return columnName; @@ -180,10 +182,21 @@ public void setExtra(String extra) { this.extra = extra; } + public boolean isUnique() { + return unique; + } + + public void setUnique(boolean unique) { + this.unique = unique; + } + + @Override public String toString() { - return "FieldMeta [columnName=" + columnName + ", columnType=" + columnType + ", defaultValue=" - + defaultValue + ", nullable=" + nullable + ", key=" + key + "]"; + return "FieldMeta [columnName=" + columnName + ", columnType=" + columnType + ", nullable=" + nullable + + ", key=" + key + ", defaultValue=" + defaultValue + ", extra=" + extra + ", unique=" + unique + + "]"; } + } } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java index cc1e57974d..8493cb3615 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java @@ -104,6 +104,7 @@ public static List parserTableMeta(ResultSetPacket packet) { * size), "YES")); meta.setKey("PRI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size))); + meta.setUnique("UNI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size))); // 特殊处理引号 meta.setDefaultValue(DruidDdlParser.unescapeQuotaName(packet.getFieldValues() .get(nameMaps.get(COLUMN_DEFAULT) + i * size))); diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java index 8c57b3ac29..4ddf14bac8 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java @@ -446,6 +446,10 @@ private boolean compareTableMeta(TableMeta source, TableMeta target) { if (sourceField.isKey() != targetField.isKey()) { return false; } + + if (sourceField.isUnique() != targetField.isUnique()) { + return false; + } } return true; diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java index f36aa7263b..2cfa1e2f7a 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java @@ -21,12 +21,14 @@ import com.alibaba.druid.sql.ast.statement.SQLColumnConstraint; import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition; import com.alibaba.druid.sql.ast.statement.SQLColumnPrimaryKey; +import com.alibaba.druid.sql.ast.statement.SQLColumnUniqueKey; import com.alibaba.druid.sql.ast.statement.SQLCreateTableStatement; import com.alibaba.druid.sql.ast.statement.SQLNotNullConstraint; import com.alibaba.druid.sql.ast.statement.SQLNullConstraint; import com.alibaba.druid.sql.ast.statement.SQLSelectOrderByItem; import com.alibaba.druid.sql.ast.statement.SQLTableElement; import com.alibaba.druid.sql.dialect.mysql.ast.MySqlPrimaryKey; +import com.alibaba.druid.sql.dialect.mysql.ast.MySqlUnique; import com.alibaba.druid.sql.repository.Schema; import com.alibaba.druid.sql.repository.SchemaObject; import com.alibaba.druid.sql.repository.SchemaRepository; @@ -204,6 +206,8 @@ private void processTableElement(SQLTableElement element, TableMeta tableMeta) { fieldMeta.setNullable(true); } else if (constraint instanceof SQLColumnPrimaryKey) { fieldMeta.setKey(true); + } else if (constraint instanceof SQLColumnUniqueKey) { + fieldMeta.setUnique(true); } } tableMeta.addFieldMeta(fieldMeta); @@ -215,6 +219,14 @@ private void processTableElement(SQLTableElement element, TableMeta tableMeta) { FieldMeta field = tableMeta.getFieldMetaByName(name); field.setKey(true); } + } else if (element instanceof MySqlUnique) { + MySqlUnique column = (MySqlUnique) element; + List uks = column.getColumns(); + for (SQLSelectOrderByItem uk : uks) { + String name = getSqlName(uk.getExpr()); + FieldMeta field = tableMeta.getFieldMetaByName(name); + field.setUnique(true); + } } } diff --git a/pom.xml b/pom.xml index a951f42cdd..e28e280661 100644 --- a/pom.xml +++ b/pom.xml @@ -254,7 +254,7 @@ com.alibaba druid - 1.1.7-preview_0 + 1.1.8 From edc2fbe1dc359f8083b462bba2c24de30338d976 Mon Sep 17 00:00:00 2001 From: agapple Date: Mon, 12 Feb 2018 11:15:43 +0800 Subject: [PATCH 07/10] fixed issue #507 --- .../parse/inbound/mysql/tsdb/DatabaseTableMeta.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java index 4ddf14bac8..6844f035f6 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java @@ -443,11 +443,10 @@ private boolean compareTableMeta(TableMeta source, TableMeta target) { return false; } - if (sourceField.isKey() != targetField.isKey()) { - return false; - } - - if (sourceField.isUnique() != targetField.isUnique()) { + // mysql会有一种处理,针对show create只有uk没有pk时,会在desc默认将uk当做pk + boolean isSourcePkOrUk = sourceField.isKey() || sourceField.isUnique(); + boolean isTargetPkOrUk = targetField.isKey() || targetField.isUnique(); + if (isSourcePkOrUk != isTargetPkOrUk) { return false; } } From 89994c52f96870767b0b0be965389e4c3dccfb96 Mon Sep 17 00:00:00 2001 From: yakirChen Date: Mon, 12 Feb 2018 11:17:54 +0800 Subject: [PATCH 08/10] =?UTF-8?q?[x]=20Fix=20Compile=20Error=20:=20Timelin?= =?UTF-8?q?eTransactionBarrier.java:[70,27]=20=E6=89=BE=E4=B8=8D=E5=88=B0?= =?UTF-8?q?=E7=AC=A6=E5=8F=B7:=20=20=20=E7=B1=BB=20CanalSinkException=20[x?= =?UTF-8?q?]=20Fix=20Maven=20Wranning:=20The=20expression=20${pom.version}?= =?UTF-8?q?=20is=20deprecated.=20Please=20use=20${project.version}=20inste?= =?UTF-8?q?ad.=20[x]=20Fix=20MavenWranning=20:=20'build.plugins.plugin.ver?= =?UTF-8?q?sion'=20for=20org.apache.maven.plugins:maven-jar-plugin=20is=20?= =?UTF-8?q?missing.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/pom.xml | 2 +- pom.xml | 8 ++++++++ .../sink/entry/group/TimelineTransactionBarrier.java | 1 + 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/client/pom.xml b/client/pom.xml index 2778841c20..c736b141c1 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -69,7 +69,7 @@ https://github.com/alibaba/canal - ${project.build.directory}/apidocs/apidocs/${pom.version} + ${project.build.directory}/apidocs/apidocs/${project.version} diff --git a/pom.xml b/pom.xml index e28e280661..7cb94cfb83 100644 --- a/pom.xml +++ b/pom.xml @@ -445,6 +445,14 @@ + + + + maven-jar-plugin + 3.0.2 + + + diff --git a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java index 22a734a0d1..5f6c42b3f7 100644 --- a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java +++ b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java @@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; +import com.alibaba.otter.canal.sink.exception.CanalSinkException; import com.alibaba.otter.canal.store.model.Event; /** From 514876f60fabc6c24a3119a9ec602cc98f2c6ed6 Mon Sep 17 00:00:00 2001 From: yakirChen Date: Mon, 12 Feb 2018 11:17:54 +0800 Subject: [PATCH 09/10] =?UTF-8?q?[x]=20Fix=20Compile=20Error=20:=20Timelin?= =?UTF-8?q?eTransactionBarrier.java:[70,27]=20=E6=89=BE=E4=B8=8D=E5=88=B0?= =?UTF-8?q?=E7=AC=A6=E5=8F=B7:=20=20=20=E7=B1=BB=20CanalSinkException=20[x?= =?UTF-8?q?]=20Fix=20Maven=20Wranning:=20The=20expression=20${pom.version}?= =?UTF-8?q?=20is=20deprecated.=20Please=20use=20${project.version}=20inste?= =?UTF-8?q?ad.=20[x]=20Fix=20MavenWranning=20:=20'build.plugins.plugin.ver?= =?UTF-8?q?sion'=20for=20org.apache.maven.plugins:maven-jar-plugin=20is=20?= =?UTF-8?q?missing.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/pom.xml | 2 +- pom.xml | 8 ++++++++ .../sink/entry/group/TimelineTransactionBarrier.java | 1 + 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/client/pom.xml b/client/pom.xml index 2778841c20..c736b141c1 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -69,7 +69,7 @@ https://github.com/alibaba/canal - ${project.build.directory}/apidocs/apidocs/${pom.version} + ${project.build.directory}/apidocs/apidocs/${project.version} diff --git a/pom.xml b/pom.xml index e28e280661..7cb94cfb83 100644 --- a/pom.xml +++ b/pom.xml @@ -445,6 +445,14 @@ + + + + maven-jar-plugin + 3.0.2 + + + diff --git a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java index 22a734a0d1..5f6c42b3f7 100644 --- a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java +++ b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java @@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; +import com.alibaba.otter.canal.sink.exception.CanalSinkException; import com.alibaba.otter.canal.store.model.Event; /** From 2d49b709cabe1626cf4c5c5b8c942bf7571a5d8e Mon Sep 17 00:00:00 2001 From: agapple Date: Mon, 12 Feb 2018 11:43:03 +0800 Subject: [PATCH 10/10] fixed socket wait period 10ms --- .../canal/parse/driver/mysql/MysqlConnector.java | 4 ++-- .../parse/driver/mysql/socket/SocketChannel.java | 15 ++++++++------- .../driver/mysql/socket/SocketChannelPool.java | 5 ++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java index db83f2411f..5d407112a9 100644 --- a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java +++ b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/MysqlConnector.java @@ -43,8 +43,8 @@ public class MysqlConnector { // mysql connectinnId private long connectionId = -1; private AtomicBoolean connected = new AtomicBoolean(false); - - public static final int timeout = 3000; // 3s + + public static final int timeout = 5 * 1000; // 5s public MysqlConnector(){ } diff --git a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java index 9eee2ef7c1..b55b672235 100644 --- a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java +++ b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java @@ -15,6 +15,7 @@ */ public class SocketChannel { + private static final int period = 10; private Channel channel = null; private Object lock = new Object(); private ByteBuf cache = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024); // 缓存大小 @@ -31,7 +32,7 @@ public void writeCache(ByteBuf buf) throws InterruptedException { synchronized (lock) { while (true) { cache.discardReadBytes();// 回收内存 - //source buffer is empty. + // source buffer is empty. if (!buf.isReadable()) { break; } @@ -39,8 +40,8 @@ public void writeCache(ByteBuf buf) throws InterruptedException { if (cache.isWritable()) { cache.writeBytes(buf, Math.min(cache.writableBytes(), buf.readableBytes())); } else { - //dest buffer is full. - lock.wait(100); + // dest buffer is full. + lock.wait(period); } } } @@ -62,7 +63,7 @@ public byte[] read(int readSize) throws IOException { } synchronized (this) { try { - wait(100); + wait(period); } catch (InterruptedException e) { throw new java.nio.channels.ClosedByInterruptException(); } @@ -76,7 +77,7 @@ public byte[] read(int readSize) throws IOException { } } while (true); } - + public byte[] read(int readSize, int timeout) throws IOException { int accumulatedWaitTime = 0; do { @@ -85,14 +86,14 @@ public byte[] read(int readSize, int timeout) throws IOException { throw new IOException("socket has Interrupted !"); } - accumulatedWaitTime += 100; + accumulatedWaitTime += period; if (accumulatedWaitTime > timeout) { throw new IOException("socket read timeout occured !"); } synchronized (this) { try { - wait(100); + wait(period); } catch (InterruptedException e) { throw new IOException("socket has Interrupted !"); } diff --git a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java index 1b7db29fa0..fa16a8291d 100644 --- a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java +++ b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannelPool.java @@ -7,13 +7,12 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.net.SocketAddress; @@ -103,7 +102,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - //need output error for troubeshooting. + // need output error for troubeshooting. logger.error("business error.", cause); ctx.close(); }