From 10bb800f85ac047071a942a3323fccebed3dac44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=83=E9=94=8B?= Date: Wed, 18 Jul 2018 18:21:45 +0800 Subject: [PATCH] =?UTF-8?q?fixed=20issue=20#726=20,=20=E4=BC=98=E5=8C=96Se?= =?UTF-8?q?ssionHandler=E7=9A=84=E4=BC=A0=E8=BE=93=E5=A4=84=E7=90=86,?= =?UTF-8?q?=E6=8F=90=E5=89=8D=E5=BA=8F=E5=88=97=E5=8C=96Entry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/impl/SimpleCanalConnector.java | 19 +++- .../mysql/MysqlMultiStageCoprocessor.java | 37 ++++--- .../inbound/mysql/tsdb/DatabaseTableMeta.java | 4 +- .../inbound/mysql/tsdb/MemoryTableMeta.java | 2 +- .../parse/inbound/group/DummyEventStore.java | 42 +++----- pom.xml | 2 +- .../alibaba/otter/canal/protocol/Message.java | 36 +++++++ .../embedded/CanalServerWithEmbedded.java | 22 ++--- .../server/netty/handler/SessionHandler.java | 15 ++- .../canal/sink/entry/EntryEventSink.java | 16 +-- .../entry/HeartBeatEntryEventHandler.java | 4 +- .../sink/entry/group/TimelineBarrier.java | 2 +- .../group/TimelineTransactionBarrier.java | 4 +- .../canal/sink/stub/DummyEventStore.java | 12 +-- .../canal/store/helper/CanalEventUtils.java | 27 +++--- .../memory/MemoryEventStoreWithBuffer.java | 11 +-- .../otter/canal/store/model/Event.java | 97 +++++++++++++++++-- .../MemoryEventStoreMultiThreadTest.java | 7 +- 18 files changed, 252 insertions(+), 107 deletions(-) diff --git a/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java b/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java index 0feb3a27c6..508c01dc3a 100644 --- a/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java +++ b/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java @@ -71,7 +71,7 @@ public class SimpleCanalConnector implements CanalConnector { private volatile boolean connected = false; // 代表connected是否已正常执行,因为有HA,不代表在工作中 private boolean rollbackOnConnect = true; // 是否在connect链接成功后,自动执行rollback操作 private boolean rollbackOnDisConnect = false; // 是否在connect链接成功后,自动执行rollback操作 - + private boolean lazyParseEntry = false; // 是否自动化解析Entry对象,如果考虑最大化性能可以延后解析 // 读写数据分别使用不同的锁进行控制,减小锁粒度,读也需要排他锁,并发度容易造成数据包混乱,反序列化失败 private Object readDataLock = new Object(); private Object writeDataLock = new Object(); @@ -328,8 +328,13 @@ private Message receiveMessages() throws IOException { Messages messages = Messages.parseFrom(p.getBody()); Message result = new Message(messages.getBatchId()); - for (ByteString byteString : messages.getMessagesList()) { - result.addEntry(Entry.parseFrom(byteString)); + if (lazyParseEntry) { + // byteString + result.setRawEntries(messages.getMessagesList()); + } else { + for (ByteString byteString : messages.getMessagesList()) { + result.addEntry(Entry.parseFrom(byteString)); + } } return result; } @@ -538,6 +543,14 @@ public void setFilter(String filter) { this.filter = filter; } + public boolean isLazyParseEntry() { + return lazyParseEntry; + } + + public void setLazyParseEntry(boolean lazyParseEntry) { + this.lazyParseEntry = lazyParseEntry; + } + public void stopRunning() { if (running) { running = false; // 设置为非running状态 diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java index 88278d84d3..76c70723ca 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java @@ -2,6 +2,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.locks.LockSupport; import org.apache.commons.lang.StringUtils; @@ -19,6 +20,7 @@ import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.ExceptionHandler; +import com.lmax.disruptor.InsufficientCapacityException; import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.Sequence; @@ -141,24 +143,35 @@ public void stop() { * 网络数据投递 */ public void publish(LogBuffer buffer) { - if (!isStart() && exception != null) { - throw exception; - } - long next = disruptorMsgBuffer.next(); - MessageEvent event = disruptorMsgBuffer.get(next); - event.setBuffer(buffer); - disruptorMsgBuffer.publish(next); + publish(buffer, null); } public void publish(LogBuffer buffer, String binlogFileName) { if (!isStart() && exception != null) { throw exception; } - long next = disruptorMsgBuffer.next(); - MessageEvent event = disruptorMsgBuffer.get(next); - event.setBuffer(buffer); - event.setBinlogFileName(binlogFileName); - disruptorMsgBuffer.publish(next); + + boolean interupted = false; + do { + try { + long next = disruptorMsgBuffer.tryNext(); + MessageEvent event = disruptorMsgBuffer.get(next); + event.setBuffer(buffer); + if (binlogFileName != null) { + event.setBinlogFileName(binlogFileName); + } + disruptorMsgBuffer.publish(next); + break; + } catch (InsufficientCapacityException e) { + // park + LockSupport.parkNanos(1L); + interupted = Thread.interrupted(); + } + } while (!interupted && isStart()); + + if (exception != null) { + throw exception; + } } @Override diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java index 12cc46f695..bb2af45f27 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/DatabaseTableMeta.java @@ -70,7 +70,9 @@ public boolean init(final String destination) { @Override public Thread newThread(Runnable r) { - return new Thread(r, "[scheduler-table-meta-snapshot]"); + Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]"); + thread.setDaemon(true); + return thread; } }); diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java index 127df1c442..9a8863362a 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/tsdb/MemoryTableMeta.java @@ -94,7 +94,7 @@ public TableMeta find(String schema, String table) { tableMeta = tableMetas.get(keys); if (tableMeta == null) { Schema schemaRep = repository.findSchema(schema); - if (schema == null) { + if (schemaRep == null) { return null; } SchemaObject data = schemaRep.findTable(table); diff --git a/parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/DummyEventStore.java b/parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/DummyEventStore.java index 24f7f55116..c542e5c089 100644 --- a/parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/DummyEventStore.java +++ b/parse/src/test/java/com/alibaba/otter/canal/parse/inbound/group/DummyEventStore.java @@ -7,7 +7,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.position.Position; import com.alibaba.otter.canal.store.CanalEventStore; @@ -18,7 +17,7 @@ public class DummyEventStore implements CanalEventStore { private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; - private static final String messgae = "{0} [{1}:{2}:{3}] {4} {5}.{6}"; + private static final String messgae = "{0} [{1}:{2}:{3}]"; public void ack(Position position) throws CanalStoreException { @@ -82,46 +81,40 @@ public boolean tryPut(Event data) throws CanalStoreException { public void put(List datas) throws InterruptedException, CanalStoreException { for (Event data : datas) { - CanalEntry.Header header = data.getEntry().getHeader(); - Date date = new Date(header.getExecuteTime()); + Date date = new Date(data.getExecuteTime()); SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT); - if (data.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN - || data.getEntry().getEntryType() == EntryType.TRANSACTIONEND) { + if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) { // System.out.println(MessageFormat.format(messgae, new Object[] // { Thread.currentThread().getName(), // header.getLogfilename(), header.getLogfileoffset(), // format.format(date), // data.getEntry().getEntryType(), "" })); - System.out.println(data.getEntry().getEntryType()); + System.out.println(data.getEntryType()); } else { System.out.println(MessageFormat.format(messgae, - new Object[] { Thread.currentThread().getName(), header.getLogfileName(), - String.valueOf(header.getLogfileOffset()), format.format(date), header.getEventType(), - header.getSchemaName(), header.getTableName() })); + new Object[] { Thread.currentThread().getName(), data.getJournalName(), + String.valueOf(data.getPosition()), format.format(date) })); } } } public boolean put(List datas, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException { for (Event data : datas) { - CanalEntry.Header header = data.getEntry().getHeader(); - Date date = new Date(header.getExecuteTime()); + Date date = new Date(data.getExecuteTime()); SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT); - if (data.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN - || data.getEntry().getEntryType() == EntryType.TRANSACTIONEND) { + if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) { // System.out.println(MessageFormat.format(messgae, new Object[] // { Thread.currentThread().getName(), // header.getLogfilename(), header.getLogfileoffset(), // format.format(date), // data.getEntry().getEntryType(), "" })); - System.out.println(data.getEntry().getEntryType()); + System.out.println(data.getEntryType()); } else { System.out.println(MessageFormat.format(messgae, - new Object[] { Thread.currentThread().getName(), header.getLogfileName(), - String.valueOf(header.getLogfileOffset()), format.format(date), header.getEventType(), - header.getSchemaName(), header.getTableName() })); + new Object[] { Thread.currentThread().getName(), data.getJournalName(), + String.valueOf(data.getPosition()), format.format(date) })); } } return true; @@ -131,23 +124,20 @@ public boolean tryPut(List datas) throws CanalStoreException { System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); for (Event data : datas) { - CanalEntry.Header header = data.getEntry().getHeader(); - Date date = new Date(header.getExecuteTime()); + Date date = new Date(data.getExecuteTime()); SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT); - if (data.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN - || data.getEntry().getEntryType() == EntryType.TRANSACTIONEND) { + if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) { // System.out.println(MessageFormat.format(messgae, new Object[] // { Thread.currentThread().getName(), // header.getLogfilename(), header.getLogfileoffset(), // format.format(date), // data.getEntry().getEntryType(), "" })); - System.out.println(data.getEntry().getEntryType()); + System.out.println(data.getEntryType()); } else { System.out.println(MessageFormat.format(messgae, - new Object[] { Thread.currentThread().getName(), header.getLogfileName(), - String.valueOf(header.getLogfileOffset()), format.format(date), header.getEventType(), - header.getSchemaName(), header.getTableName() })); + new Object[] { Thread.currentThread().getName(), data.getJournalName(), + String.valueOf(data.getPosition()), format.format(date) })); } } diff --git a/pom.xml b/pom.xml index 8ab961ea71..1e8123ef21 100644 --- a/pom.xml +++ b/pom.xml @@ -256,7 +256,7 @@ com.alibaba.fastsql fastsql - 2.0.0_preview_371 + 2.0.0_preview_520 com.alibaba diff --git a/protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java b/protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java index 984766e29b..35adf15d9c 100644 --- a/protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java +++ b/protocol/src/main/java/com/alibaba/otter/canal/protocol/Message.java @@ -8,6 +8,7 @@ import com.alibaba.otter.canal.common.utils.CanalToStringStyle; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; +import com.google.protobuf.ByteString; /** * @author zebin.xuzb @ 2012-6-19 @@ -18,11 +19,26 @@ public class Message implements Serializable { private static final long serialVersionUID = 1234034768477580009L; private long id; + @Deprecated private List entries = new ArrayList(); + // row data for performance, see: + // https://github.com/alibaba/canal/issues/726 + private boolean raw = true; + private List rawEntries = new ArrayList(); public Message(long id, List entries){ this.id = id; this.entries = entries == null ? new ArrayList() : entries; + this.raw = false; + } + + public Message(long id, boolean raw, List entries){ + this.id = id; + if (raw) { + this.rawEntries = entries == null ? new ArrayList() : entries; + } else { + this.entries = entries == null ? new ArrayList() : entries; + } } public Message(long id){ @@ -49,6 +65,26 @@ public void addEntry(CanalEntry.Entry entry) { this.entries.add(entry); } + public void setRawEntries(List rawEntries) { + this.rawEntries = rawEntries; + } + + public void addRawEntry(ByteString rawEntry) { + this.rawEntries.add(rawEntry); + } + + public List getRawEntries() { + return rawEntries; + } + + public boolean isRaw() { + return raw; + } + + public void setRaw(boolean raw) { + this.raw = raw; + } + public String toString() { return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE); } diff --git a/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java b/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java index 8bfa8fe6d5..722bb2bcdb 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java @@ -14,7 +14,6 @@ import com.alibaba.otter.canal.common.AbstractCanalLifeCycle; import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; -import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.ClientIdentity; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.position.LogPosition; @@ -30,6 +29,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.MigrateMap; +import com.google.protobuf.ByteString; /** * 嵌入式版本实现 @@ -223,14 +223,14 @@ public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, T logger.debug("get successfully, clientId:{} batchSize:{} but result is null", clientIdentity.getClientId(), batchSize); - return new Message(-1, new ArrayList()); // 返回空包,避免生成batchId,浪费性能 + return new Message(-1, true, new ArrayList()); // 返回空包,避免生成batchId,浪费性能 } else { // 记录到流式信息 Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange()); - List entrys = Lists.transform(events.getEvents(), new Function() { + List entrys = Lists.transform(events.getEvents(), new Function() { - public Entry apply(Event input) { - return input.getEntry(); + public ByteString apply(Event input) { + return input.getRawEntry(); } }); if (logger.isInfoEnabled()) { @@ -243,7 +243,7 @@ public Entry apply(Event input) { } // 直接提交ack ack(clientIdentity, batchId); - return new Message(batchId, entrys); + return new Message(batchId, true, entrys); } } } @@ -302,14 +302,14 @@ public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null", clientIdentity.getClientId(), batchSize); - return new Message(-1, new ArrayList()); // 返回空包,避免生成batchId,浪费性能 + return new Message(-1, true, new ArrayList()); // 返回空包,避免生成batchId,浪费性能 } else { // 记录到流式信息 Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange()); - List entrys = Lists.transform(events.getEvents(), new Function() { + List entrys = Lists.transform(events.getEvents(), new Function() { - public Entry apply(Event input) { - return input.getEntry(); + public ByteString apply(Event input) { + return input.getRawEntry(); } }); if (logger.isInfoEnabled()) { @@ -320,7 +320,7 @@ public Entry apply(Event input) { batchId, events.getPositionRange()); } - return new Message(batchId, entrys); + return new Message(batchId, true, entrys); } } diff --git a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java index 3eda5a0d7c..8f7b4278f6 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java @@ -135,9 +135,18 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder(); messageBuilder.setBatchId(message.getId()); - if (message.getId() != -1 && !CollectionUtils.isEmpty(message.getEntries())) { - for (Entry entry : message.getEntries()) { - messageBuilder.addMessages(entry.toByteString()); + if (message.getId() != -1) { + if (message.isRaw()) { + // for performance + if (!CollectionUtils.isEmpty(message.getRawEntries())) { + messageBuilder.addAllMessages(message.getRawEntries()); + } + } else { + if (!CollectionUtils.isEmpty(message.getEntries())) { + for (Entry entry : message.getEntries()) { + messageBuilder.addMessages(entry.toByteString()); + } + } } } packetBuilder.setBody(messageBuilder.build().toByteString()); diff --git a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java index bfb4b77190..6511e6ba86 100644 --- a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java +++ b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java @@ -92,11 +92,11 @@ private boolean sinkData(List entrys, InetSocketAddress remote boolean hasHeartBeat = false; List events = new ArrayList(); for (CanalEntry.Entry entry : entrys) { - Event event = new Event(new LogIdentity(remoteAddress, -1L), entry); - if (!doFilter(event)) { + if (!doFilter(entry)) { continue; } + Event event = new Event(new LogIdentity(remoteAddress, -1L), entry); events.add(event); hasRowData |= (entry.getEntryType() == EntryType.ROWDATA); hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT); @@ -111,7 +111,7 @@ private boolean sinkData(List entrys, InetSocketAddress remote } else { // 需要过滤的数据 if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) { - long currentTimestamp = events.get(0).getEntry().getHeader().getExecuteTime(); + long currentTimestamp = events.get(0).getExecuteTime(); // 基于一定的策略控制,放过空的事务头和尾,便于及时更新数据库位点,表明工作正常 if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval || lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) { @@ -126,15 +126,15 @@ private boolean sinkData(List entrys, InetSocketAddress remote } } - protected boolean doFilter(Event event) { - if (filter != null && event.getEntry().getEntryType() == EntryType.ROWDATA) { - String name = getSchemaNameAndTableName(event.getEntry()); + protected boolean doFilter(CanalEntry.Entry entry) { + if (filter != null && entry.getEntryType() == EntryType.ROWDATA) { + String name = getSchemaNameAndTableName(entry); boolean need = filter.filter(name); if (!need) { logger.debug("filter name[{}] entry : {}:{}", name, - event.getEntry().getHeader().getLogfileName(), - event.getEntry().getHeader().getLogfileOffset()); + entry.getHeader().getLogfileName(), + entry.getHeader().getLogfileOffset()); } return need; diff --git a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/HeartBeatEntryEventHandler.java b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/HeartBeatEntryEventHandler.java index 9d7a987152..ec23aa86b5 100644 --- a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/HeartBeatEntryEventHandler.java +++ b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/HeartBeatEntryEventHandler.java @@ -18,7 +18,7 @@ public class HeartBeatEntryEventHandler extends AbstractCanalEventDownStreamHand public List before(List events) { boolean existHeartBeat = false; for (Event event : events) { - if (event.getEntry().getEntryType() == EntryType.HEARTBEAT) { + if (event.getEntryType() == EntryType.HEARTBEAT) { existHeartBeat = true; } } @@ -29,7 +29,7 @@ public List before(List events) { // 目前heartbeat和其他事件是分离的,保险一点还是做一下检查处理 List result = new ArrayList(); for (Event event : events) { - if (event.getEntry().getEntryType() != EntryType.HEARTBEAT) { + if (event.getEntryType() != EntryType.HEARTBEAT) { result.add(event); } } diff --git a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineBarrier.java b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineBarrier.java index be9edfa0ad..24f457b7b8 100644 --- a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineBarrier.java +++ b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineBarrier.java @@ -139,7 +139,7 @@ private void single(long timestamp) throws InterruptedException { } private Long getTimestamp(Event event) { - return event.getEntry().getHeader().getExecuteTime(); + return event.getExecuteTime(); } } diff --git a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java index 774818f2cd..59bf8fd749 100644 --- a/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java +++ b/sink/src/main/java/com/alibaba/otter/canal/sink/entry/group/TimelineTransactionBarrier.java @@ -113,11 +113,11 @@ private void reset() { } private boolean isTransactionBegin(Event event) { - return event.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN; + return event.getEntryType() == EntryType.TRANSACTIONBEGIN; } private boolean isTransactionEnd(Event event) { - return event.getEntry().getEntryType() == EntryType.TRANSACTIONEND; + return event.getEntryType() == EntryType.TRANSACTIONEND; } } diff --git a/sink/src/test/java/com/alibaba/otter/canal/sink/stub/DummyEventStore.java b/sink/src/test/java/com/alibaba/otter/canal/sink/stub/DummyEventStore.java index 8a52bb05eb..7f786ef24a 100644 --- a/sink/src/test/java/com/alibaba/otter/canal/sink/stub/DummyEventStore.java +++ b/sink/src/test/java/com/alibaba/otter/canal/sink/stub/DummyEventStore.java @@ -60,33 +60,33 @@ public void cleanUntil(Position position) throws CanalStoreException { } public void put(Event data) throws InterruptedException, CanalStoreException { - System.out.println("time:" + data.getEntry().getHeader().getExecuteTime()); + System.out.println("time:" + data.getExecuteTime()); } public boolean put(Event data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException { - System.out.println("time:" + data.getEntry().getHeader().getExecuteTime()); + System.out.println("time:" + data.getExecuteTime()); return true; } public boolean tryPut(Event data) throws CanalStoreException { - System.out.println("time:" + data.getEntry().getHeader().getExecuteTime()); + System.out.println("time:" + data.getExecuteTime()); return true; } public void put(List datas) throws InterruptedException, CanalStoreException { Event data = datas.get(0); - System.out.println("time:" + data.getEntry().getHeader().getExecuteTime()); + System.out.println("time:" + data.getExecuteTime()); } public boolean put(List datas, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException { Event data = datas.get(0); - System.out.println("time:" + data.getEntry().getHeader().getExecuteTime()); + System.out.println("time:" + data.getExecuteTime()); return true; } public boolean tryPut(List datas) throws CanalStoreException { Event data = datas.get(0); - System.out.println("time:" + data.getEntry().getHeader().getExecuteTime()); + System.out.println("time:" + data.getExecuteTime()); return true; } diff --git a/store/src/main/java/com/alibaba/otter/canal/store/helper/CanalEventUtils.java b/store/src/main/java/com/alibaba/otter/canal/store/helper/CanalEventUtils.java index ac858446ce..4fdbfcba56 100644 --- a/store/src/main/java/com/alibaba/otter/canal/store/helper/CanalEventUtils.java +++ b/store/src/main/java/com/alibaba/otter/canal/store/helper/CanalEventUtils.java @@ -2,7 +2,6 @@ import org.apache.commons.lang.StringUtils; -import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.position.EntryPosition; import com.alibaba.otter.canal.protocol.position.LogPosition; import com.alibaba.otter.canal.store.model.Event; @@ -48,14 +47,13 @@ public static LogPosition min(LogPosition position1, LogPosition position2) { */ public static LogPosition createPosition(Event event) { EntryPosition position = new EntryPosition(); - position.setJournalName(event.getEntry().getHeader().getLogfileName()); - position.setPosition(event.getEntry().getHeader().getLogfileOffset()); - position.setTimestamp(event.getEntry().getHeader().getExecuteTime()); + position.setJournalName(event.getJournalName()); + position.setPosition(event.getPosition()); + position.setTimestamp(event.getExecuteTime()); // add serverId at 2016-06-28 - position.setServerId(event.getEntry().getHeader().getServerId()); - + position.setServerId(event.getServerId()); // add gtid - position.setGtid(event.getEntry().getHeader().getGtid()); + position.setGtid(event.getGtid()); LogPosition logPosition = new LogPosition(); logPosition.setPostion(position); @@ -68,9 +66,9 @@ public static LogPosition createPosition(Event event) { */ public static LogPosition createPosition(Event event, boolean included) { EntryPosition position = new EntryPosition(); - position.setJournalName(event.getEntry().getHeader().getLogfileName()); - position.setPosition(event.getEntry().getHeader().getLogfileOffset()); - position.setTimestamp(event.getEntry().getHeader().getExecuteTime()); + position.setJournalName(event.getJournalName()); + position.setPosition(event.getPosition()); + position.setTimestamp(event.getExecuteTime()); position.setIncluded(included); LogPosition logPosition = new LogPosition(); @@ -84,13 +82,14 @@ public static LogPosition createPosition(Event event, boolean included) { */ public static boolean checkPosition(Event event, LogPosition logPosition) { EntryPosition position = logPosition.getPostion(); - CanalEntry.Entry entry = event.getEntry(); - boolean result = position.getTimestamp().equals(entry.getHeader().getExecuteTime()); + boolean result = position.getTimestamp().equals(event.getExecuteTime()); boolean exactely = (StringUtils.isBlank(position.getJournalName()) && position.getPosition() == null); if (!exactely) {// 精确匹配 - result &= StringUtils.equals(entry.getHeader().getLogfileName(), position.getJournalName()); - result &= position.getPosition().equals(entry.getHeader().getLogfileOffset()); + result &= position.getPosition().equals(event.getPosition()); + if (result) {// short path + result &= StringUtils.equals(event.getJournalName(), position.getJournalName()); + } } return result; diff --git a/store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java b/store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java index 99995a8f80..075d152431 100644 --- a/store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java +++ b/store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java @@ -278,7 +278,7 @@ private Events doGet(Position start, int batchSize) throws CanalStoreExce // 提取数据并返回 for (; next <= end; next++) { Event event = entries[getIndex(next)]; - if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) { + if (ddlIsolation && isDdl(event.getEventType())) { // 如果是ddl隔离,直接返回 if (entrys.size() == 0) { entrys.add(event);// 如果没有DML事件,加入当前的DDL事件 @@ -297,7 +297,7 @@ private Events doGet(Position start, int batchSize) throws CanalStoreExce for (; memsize <= maxMemSize && next <= maxAbleSequence; next++) { // 永远保证可以取出第一条的记录,避免死锁 Event event = entries[getIndex(next)]; - if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) { + if (ddlIsolation && isDdl(event.getEventType())) { // 如果是ddl隔离,直接返回 if (entrys.size() == 0) { entrys.add(event);// 如果没有DML事件,加入当前的DDL事件 @@ -325,9 +325,8 @@ private Events doGet(Position start, int batchSize) throws CanalStoreExce for (int i = entrys.size() - 1; i >= 0; i--) { Event event = entrys.get(i); - if (CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntry().getEntryType() - || CanalEntry.EntryType.TRANSACTIONEND == event.getEntry().getEntryType() - || isDdl(event.getEntry().getHeader().getEventType())) { + if (CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntryType() + || CanalEntry.EntryType.TRANSACTIONEND == event.getEntryType() || isDdl(event.getEventType())) { // 将事务头/尾设置可被为ack的点 range.setAck(CanalEventUtils.createPosition(event)); break; @@ -532,7 +531,7 @@ private boolean checkUnGetSlotAt(LogPosition startPosition, int batchSize) { private long calculateSize(Event event) { // 直接返回binlog中的事件大小 - return event.getEntry().getHeader().getEventLength(); + return event.getRawLength(); } private int getIndex(long sequcnce) { diff --git a/store/src/main/java/com/alibaba/otter/canal/store/model/Event.java b/store/src/main/java/com/alibaba/otter/canal/store/model/Event.java index c31d37f3e3..07e9fead34 100644 --- a/store/src/main/java/com/alibaba/otter/canal/store/model/Event.java +++ b/store/src/main/java/com/alibaba/otter/canal/store/model/Event.java @@ -6,7 +6,10 @@ import com.alibaba.otter.canal.common.utils.CanalToStringStyle; import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; +import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.position.LogIdentity; +import com.google.protobuf.ByteString; /** * store存储数据对象 @@ -18,14 +21,32 @@ public class Event implements Serializable { private static final long serialVersionUID = 1333330351758762739L; private LogIdentity logIdentity; // 记录数据产生的来源 - private CanalEntry.Entry entry; + private ByteString rawEntry; + + private long executeTime; + private EntryType entryType; + private String journalName; + private long position; + private long serverId; + private EventType eventType; + private String gtid; + private long rawLength; public Event(){ } public Event(LogIdentity logIdentity, CanalEntry.Entry entry){ this.logIdentity = logIdentity; - this.entry = entry; + this.entryType = entry.getEntryType(); + this.executeTime = entry.getHeader().getExecuteTime(); + this.journalName = entry.getHeader().getLogfileName(); + this.position = entry.getHeader().getLogfileOffset(); + this.serverId = entry.getHeader().getServerId(); + this.gtid = entry.getHeader().getGtid(); + this.eventType = entry.getHeader().getEventType(); + // build raw + this.rawEntry = entry.toByteString(); + this.rawLength = rawEntry.size(); } public LogIdentity getLogIdentity() { @@ -36,12 +57,76 @@ public void setLogIdentity(LogIdentity logIdentity) { this.logIdentity = logIdentity; } - public CanalEntry.Entry getEntry() { - return entry; + public ByteString getRawEntry() { + return rawEntry; + } + + public void setRawEntry(ByteString rawEntry) { + this.rawEntry = rawEntry; + } + + public long getExecuteTime() { + return executeTime; + } + + public void setExecuteTime(long executeTime) { + this.executeTime = executeTime; + } + + public EntryType getEntryType() { + return entryType; + } + + public void setEntryType(EntryType entryType) { + this.entryType = entryType; + } + + public String getJournalName() { + return journalName; + } + + public void setJournalName(String journalName) { + this.journalName = journalName; + } + + public long getPosition() { + return position; + } + + public void setPosition(long position) { + this.position = position; + } + + public long getServerId() { + return serverId; + } + + public void setServerId(long serverId) { + this.serverId = serverId; + } + + public String getGtid() { + return gtid; + } + + public void setGtid(String gtid) { + this.gtid = gtid; + } + + public long getRawLength() { + return rawLength; + } + + public void setRawLength(long rawLength) { + this.rawLength = rawLength; + } + + public EventType getEventType() { + return eventType; } - public void setEntry(CanalEntry.Entry entry) { - this.entry = entry; + public void setEventType(EventType eventType) { + this.eventType = eventType; } public String toString() { diff --git a/store/src/test/java/com/alibaba/otter/cancel/store/memory/buffer/MemoryEventStoreMultiThreadTest.java b/store/src/test/java/com/alibaba/otter/cancel/store/memory/buffer/MemoryEventStoreMultiThreadTest.java index 4209e4f137..9aa216f6f9 100644 --- a/store/src/test/java/com/alibaba/otter/cancel/store/memory/buffer/MemoryEventStoreMultiThreadTest.java +++ b/store/src/test/java/com/alibaba/otter/cancel/store/memory/buffer/MemoryEventStoreMultiThreadTest.java @@ -161,13 +161,12 @@ public void run() { first = entrys.getPositionRange().getEnd(); for (Event event : entrys.getEvents()) { - this.result.add(event.getEntry().getHeader().getLogfileOffset()); + this.result.add(event.getPosition()); } emptyCount = 0; - System.out.println("offest : " - + entrys.getEvents().get(0).getEntry().getHeader().getLogfileOffset() - + " , count :" + entrys.getEvents().size()); + System.out.println("offest : " + entrys.getEvents().get(0).getPosition() + " , count :" + + entrys.getEvents().size()); ackCount++; if (ackCount == 1) { eventStore.cleanUntil(entrys.getPositionRange().getEnd());