Skip to content

Commit

Permalink
fixed issue #726 , 优化SessionHandler的传输处理,提前序列化Entry
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Jul 18, 2018
1 parent 8e856ac commit 10bb800
Show file tree
Hide file tree
Showing 18 changed files with 252 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class SimpleCanalConnector implements CanalConnector {
private volatile boolean connected = false; // 代表connected是否已正常执行,因为有HA,不代表在工作中
private boolean rollbackOnConnect = true; // 是否在connect链接成功后,自动执行rollback操作
private boolean rollbackOnDisConnect = false; // 是否在connect链接成功后,自动执行rollback操作

private boolean lazyParseEntry = false; // 是否自动化解析Entry对象,如果考虑最大化性能可以延后解析
// 读写数据分别使用不同的锁进行控制,减小锁粒度,读也需要排他锁,并发度容易造成数据包混乱,反序列化失败
private Object readDataLock = new Object();
private Object writeDataLock = new Object();
Expand Down Expand Up @@ -328,8 +328,13 @@ private Message receiveMessages() throws IOException {

Messages messages = Messages.parseFrom(p.getBody());
Message result = new Message(messages.getBatchId());
for (ByteString byteString : messages.getMessagesList()) {
result.addEntry(Entry.parseFrom(byteString));
if (lazyParseEntry) {
// byteString
result.setRawEntries(messages.getMessagesList());
} else {
for (ByteString byteString : messages.getMessagesList()) {
result.addEntry(Entry.parseFrom(byteString));
}
}
return result;
}
Expand Down Expand Up @@ -538,6 +543,14 @@ public void setFilter(String filter) {
this.filter = filter;
}

public boolean isLazyParseEntry() {
return lazyParseEntry;
}

public void setLazyParseEntry(boolean lazyParseEntry) {
this.lazyParseEntry = lazyParseEntry;
}

public void stopRunning() {
if (running) {
running = false; // 设置为非running状态
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;

import org.apache.commons.lang.StringUtils;

Expand All @@ -19,6 +20,7 @@
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
Expand Down Expand Up @@ -141,24 +143,35 @@ public void stop() {
* 网络数据投递
*/
public void publish(LogBuffer buffer) {
if (!isStart() && exception != null) {
throw exception;
}
long next = disruptorMsgBuffer.next();
MessageEvent event = disruptorMsgBuffer.get(next);
event.setBuffer(buffer);
disruptorMsgBuffer.publish(next);
publish(buffer, null);
}

public void publish(LogBuffer buffer, String binlogFileName) {
if (!isStart() && exception != null) {
throw exception;
}
long next = disruptorMsgBuffer.next();
MessageEvent event = disruptorMsgBuffer.get(next);
event.setBuffer(buffer);
event.setBinlogFileName(binlogFileName);
disruptorMsgBuffer.publish(next);

boolean interupted = false;
do {
try {
long next = disruptorMsgBuffer.tryNext();
MessageEvent event = disruptorMsgBuffer.get(next);
event.setBuffer(buffer);
if (binlogFileName != null) {
event.setBinlogFileName(binlogFileName);
}
disruptorMsgBuffer.publish(next);
break;
} catch (InsufficientCapacityException e) {
// park
LockSupport.parkNanos(1L);
interupted = Thread.interrupted();
}
} while (!interupted && isStart());

if (exception != null) {
throw exception;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public boolean init(final String destination) {

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "[scheduler-table-meta-snapshot]");
Thread thread = new Thread(r, "[scheduler-table-meta-snapshot]");
thread.setDaemon(true);
return thread;
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public TableMeta find(String schema, String table) {
tableMeta = tableMetas.get(keys);
if (tableMeta == null) {
Schema schemaRep = repository.findSchema(schema);
if (schema == null) {
if (schemaRep == null) {
return null;
}
SchemaObject data = schemaRep.findTable(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.position.Position;
import com.alibaba.otter.canal.store.CanalEventStore;
Expand All @@ -18,7 +17,7 @@
public class DummyEventStore implements CanalEventStore<Event> {

private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final String messgae = "{0} [{1}:{2}:{3}] {4} {5}.{6}";
private static final String messgae = "{0} [{1}:{2}:{3}]";

public void ack(Position position) throws CanalStoreException {

Expand Down Expand Up @@ -82,46 +81,40 @@ public boolean tryPut(Event data) throws CanalStoreException {

public void put(List<Event> datas) throws InterruptedException, CanalStoreException {
for (Event data : datas) {
CanalEntry.Header header = data.getEntry().getHeader();
Date date = new Date(header.getExecuteTime());
Date date = new Date(data.getExecuteTime());
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
if (data.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN
|| data.getEntry().getEntryType() == EntryType.TRANSACTIONEND) {
if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
// System.out.println(MessageFormat.format(messgae, new Object[]
// { Thread.currentThread().getName(),
// header.getLogfilename(), header.getLogfileoffset(),
// format.format(date),
// data.getEntry().getEntryType(), "" }));
System.out.println(data.getEntry().getEntryType());
System.out.println(data.getEntryType());

} else {
System.out.println(MessageFormat.format(messgae,
new Object[] { Thread.currentThread().getName(), header.getLogfileName(),
String.valueOf(header.getLogfileOffset()), format.format(date), header.getEventType(),
header.getSchemaName(), header.getTableName() }));
new Object[] { Thread.currentThread().getName(), data.getJournalName(),
String.valueOf(data.getPosition()), format.format(date) }));
}
}
}

public boolean put(List<Event> datas, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
for (Event data : datas) {
CanalEntry.Header header = data.getEntry().getHeader();
Date date = new Date(header.getExecuteTime());
Date date = new Date(data.getExecuteTime());
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
if (data.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN
|| data.getEntry().getEntryType() == EntryType.TRANSACTIONEND) {
if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
// System.out.println(MessageFormat.format(messgae, new Object[]
// { Thread.currentThread().getName(),
// header.getLogfilename(), header.getLogfileoffset(),
// format.format(date),
// data.getEntry().getEntryType(), "" }));
System.out.println(data.getEntry().getEntryType());
System.out.println(data.getEntryType());

} else {
System.out.println(MessageFormat.format(messgae,
new Object[] { Thread.currentThread().getName(), header.getLogfileName(),
String.valueOf(header.getLogfileOffset()), format.format(date), header.getEventType(),
header.getSchemaName(), header.getTableName() }));
new Object[] { Thread.currentThread().getName(), data.getJournalName(),
String.valueOf(data.getPosition()), format.format(date) }));
}
}
return true;
Expand All @@ -131,23 +124,20 @@ public boolean tryPut(List<Event> datas) throws CanalStoreException {
System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
for (Event data : datas) {

CanalEntry.Header header = data.getEntry().getHeader();
Date date = new Date(header.getExecuteTime());
Date date = new Date(data.getExecuteTime());
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
if (data.getEntry().getEntryType() == EntryType.TRANSACTIONBEGIN
|| data.getEntry().getEntryType() == EntryType.TRANSACTIONEND) {
if (data.getEntryType() == EntryType.TRANSACTIONBEGIN || data.getEntryType() == EntryType.TRANSACTIONEND) {
// System.out.println(MessageFormat.format(messgae, new Object[]
// { Thread.currentThread().getName(),
// header.getLogfilename(), header.getLogfileoffset(),
// format.format(date),
// data.getEntry().getEntryType(), "" }));
System.out.println(data.getEntry().getEntryType());
System.out.println(data.getEntryType());

} else {
System.out.println(MessageFormat.format(messgae,
new Object[] { Thread.currentThread().getName(), header.getLogfileName(),
String.valueOf(header.getLogfileOffset()), format.format(date), header.getEventType(),
header.getSchemaName(), header.getTableName() }));
new Object[] { Thread.currentThread().getName(), data.getJournalName(),
String.valueOf(data.getPosition()), format.format(date) }));
}

}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@
<dependency>
<groupId>com.alibaba.fastsql</groupId>
<artifactId>fastsql</artifactId>
<version>2.0.0_preview_371</version>
<version>2.0.0_preview_520</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.alibaba.otter.canal.common.utils.CanalToStringStyle;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.google.protobuf.ByteString;

/**
* @author zebin.xuzb @ 2012-6-19
Expand All @@ -18,11 +19,26 @@ public class Message implements Serializable {
private static final long serialVersionUID = 1234034768477580009L;

private long id;
@Deprecated
private List<CanalEntry.Entry> entries = new ArrayList<CanalEntry.Entry>();
// row data for performance, see:
// https://github.com/alibaba/canal/issues/726
private boolean raw = true;
private List<ByteString> rawEntries = new ArrayList<ByteString>();

public Message(long id, List<Entry> entries){
this.id = id;
this.entries = entries == null ? new ArrayList<Entry>() : entries;
this.raw = false;
}

public Message(long id, boolean raw, List entries){
this.id = id;
if (raw) {
this.rawEntries = entries == null ? new ArrayList<ByteString>() : entries;
} else {
this.entries = entries == null ? new ArrayList<Entry>() : entries;
}
}

public Message(long id){
Expand All @@ -49,6 +65,26 @@ public void addEntry(CanalEntry.Entry entry) {
this.entries.add(entry);
}

public void setRawEntries(List<ByteString> rawEntries) {
this.rawEntries = rawEntries;
}

public void addRawEntry(ByteString rawEntry) {
this.rawEntries.add(rawEntry);
}

public List<ByteString> getRawEntries() {
return rawEntries;
}

public boolean isRaw() {
return raw;
}

public void setRaw(boolean raw) {
this.raw = raw;
}

public String toString() {
return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.position.LogPosition;
Expand All @@ -30,6 +29,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MigrateMap;
import com.google.protobuf.ByteString;

/**
* 嵌入式版本实现
Expand Down Expand Up @@ -223,14 +223,14 @@ public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, T
logger.debug("get successfully, clientId:{} batchSize:{} but result is null",
clientIdentity.getClientId(),
batchSize);
return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
return new Message(-1, true, new ArrayList()); // 返回空包,避免生成batchId,浪费性能
} else {
// 记录到流式信息
Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {
List<ByteString> entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {

public Entry apply(Event input) {
return input.getEntry();
public ByteString apply(Event input) {
return input.getRawEntry();
}
});
if (logger.isInfoEnabled()) {
Expand All @@ -243,7 +243,7 @@ public Entry apply(Event input) {
}
// 直接提交ack
ack(clientIdentity, batchId);
return new Message(batchId, entrys);
return new Message(batchId, true, entrys);
}
}
}
Expand Down Expand Up @@ -302,14 +302,14 @@ public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long
logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null",
clientIdentity.getClientId(),
batchSize);
return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能
return new Message(-1, true, new ArrayList()); // 返回空包,避免生成batchId,浪费性能
} else {
// 记录到流式信息
Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {
List<ByteString> entrys = Lists.transform(events.getEvents(), new Function<Event, ByteString>() {

public Entry apply(Event input) {
return input.getEntry();
public ByteString apply(Event input) {
return input.getRawEntry();
}
});
if (logger.isInfoEnabled()) {
Expand All @@ -320,7 +320,7 @@ public Entry apply(Event input) {
batchId,
events.getPositionRange());
}
return new Message(batchId, entrys);
return new Message(batchId, true, entrys);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,18 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex

Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
messageBuilder.setBatchId(message.getId());
if (message.getId() != -1 && !CollectionUtils.isEmpty(message.getEntries())) {
for (Entry entry : message.getEntries()) {
messageBuilder.addMessages(entry.toByteString());
if (message.getId() != -1) {
if (message.isRaw()) {
// for performance
if (!CollectionUtils.isEmpty(message.getRawEntries())) {
messageBuilder.addAllMessages(message.getRawEntries());
}
} else {
if (!CollectionUtils.isEmpty(message.getEntries())) {
for (Entry entry : message.getEntries()) {
messageBuilder.addMessages(entry.toByteString());
}
}
}
}
packetBuilder.setBody(messageBuilder.build().toByteString());
Expand Down
Loading

0 comments on commit 10bb800

Please sign in to comment.