Skip to content

Commit

Permalink
fixed issue #726 , 多线程解析binlog DML事件,提升性能
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Jul 4, 2018
1 parent f0bed4c commit da8bf59
Show file tree
Hide file tree
Showing 24 changed files with 879 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public final void clearAllTables() {

public void reset() {
formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;

mapOfTable.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public LogEvent decode(LogBuffer buffer, LogContext context) throws IOException
/* Decoding binary-log to event */
event = decode(buffer, header, context);
if (event != null) {
// set logFileName
event.getHeader().setLogFileName(context.getLogPosition().getFileName());
event.setSemival(buffer.semival);
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public final class LogHeader {
*/
protected long crc; // ha_checksum

/**
* binlog fileName
*/
protected String logFileName;

/* for Start_event_v3 */
public LogHeader(final int type){
this.type = type;
Expand Down Expand Up @@ -270,6 +275,14 @@ public int getChecksumAlg() {
return checksumAlg;
}

public String getLogFileName() {
return logFileName;
}

public void setLogFileName(String logFileName) {
this.logFileName = logFileName;
}

private void processCheckSum(LogBuffer buffer) {
if (checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_OFF && checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_UNDEF) {
crc = buffer.getUint32(eventLen - LogEvent.BINLOG_CHECKSUM_LEN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public final BitSet getChangeColumns() {
}

public final RowsLogBuffer getRowsBuf(String charsetName) {
return new RowsLogBuffer(rowsBuf.duplicate(), columnLen, charsetName);
return new RowsLogBuffer(rowsBuf, columnLen, charsetName);
}

public final int getFlags(final int flags) {
Expand Down
5 changes: 5 additions & 0 deletions deployer/src/main/resources/canal.properties
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize = 256

#################################################
######### destinations #############
#################################################
Expand Down
5 changes: 5 additions & 0 deletions deployer/src/main/resources/spring/default-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,5 +192,10 @@

<!--是否启用GTID模式-->
<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>

<!-- parallel parser -->
<property name="parallel" value="${canal.instance.parser.parallel:true}" />
<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
</bean>
</beans>
5 changes: 5 additions & 0 deletions deployer/src/main/resources/spring/file-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,10 @@

<!--是否启用GTID模式-->
<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>

<!-- parallel parser -->
<property name="parallel" value="${canal.instance.parser.parallel:true}" />
<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
</bean>
</beans>
10 changes: 10 additions & 0 deletions deployer/src/main/resources/spring/group-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />

<!-- parallel parser -->
<property name="parallel" value="${canal.instance.parser.parallel:true}" />
<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
</bean>

<bean id="eventParser2" class="com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser">
Expand Down Expand Up @@ -260,5 +265,10 @@
<property name="filterTableError" value="${canal.instance.filter.table.error:false}" />
<property name="supportBinlogFormats" value="${canal.instance.binlog.format}" />
<property name="supportBinlogImages" value="${canal.instance.binlog.image}" />

<!-- parallel parser -->
<property name="parallel" value="${canal.instance.parser.parallel:true}" />
<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
</bean>
</beans>
5 changes: 5 additions & 0 deletions deployer/src/main/resources/spring/local-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,5 +137,10 @@
<!--表结构相关-->
<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>

<!-- parallel parser -->
<property name="parallel" value="${canal.instance.parser.parallel:true}" />
<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
</bean>
</beans>
5 changes: 5 additions & 0 deletions deployer/src/main/resources/spring/memory-instance.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,5 +165,10 @@

<!--是否启用GTID模式-->
<property name="isGTIDMode" value="${canal.instance.gtidon:false}"/>

<!-- parallel parser -->
<property name="parallel" value="${canal.instance.parser.parallel:true}" />
<property name="parallelThreadSize" value="${canal.instance.parser.parallelThreadSize:16}" />
<property name="parallelBufferSize" value="${canal.instance.parser.parallelBufferSize:256}" />
</bean>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,37 @@ public byte[] read(int readSize, int timeout) throws IOException {
return data;
}

@Override
public void read(byte[] data, int off, int len, int timeout) throws IOException {
InputStream input = this.input;
int accTimeout = 0;
if (input == null) {
throw new SocketException("Socket already closed.");
}

int n = 0;
while (n < len && accTimeout < timeout) {
try {
int read = input.read(data, off + n, len - n);
if (read > -1) {
n += read;
} else {
throw new IOException("EOF encountered.");
}
} catch (SocketTimeoutException te) {
if (Thread.interrupted()) {
throw new ClosedByInterruptException();
}
accTimeout += SO_TIMEOUT;
}
}

if (n < len && accTimeout >= timeout) {
throw new SocketTimeoutException("Timeout occurred, failed to read " + len + " bytes in " + timeout
+ " milliseconds.");
}
}

public boolean isConnected() {
Socket socket = this.socket;
if (socket != null) {
Expand Down Expand Up @@ -133,4 +164,5 @@ public void close() {
this.socket = null;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.io.IOException;
import java.net.SocketAddress;

import org.apache.commons.lang.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -202,6 +203,11 @@ public byte[] read(int readSize, int timeout) throws IOException {
} while (true);
}

@Override
public void read(byte[] data, int off, int len, int timeout) throws IOException {
throw new NotImplementedException();
}

public boolean isConnected() {
return channel != null ? true : false;
}
Expand All @@ -224,4 +230,5 @@ public void close() {
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public interface SocketChannel {

public byte[] read(int readSize, int timeout) throws IOException;

public void read(byte[] data, int off, int len, int timeout) throws IOException;

public boolean isConnected();

public SocketAddress getRemoteSocketAddress();
Expand Down
4 changes: 4 additions & 0 deletions parse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
<!-- test dependency -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,18 @@ public void uncaughtException(Thread t,
protected Throwable exception = null;

protected boolean isGTIDMode = false; // 是否是GTID模式
protected boolean parallel = true; // 是否开启并行解析模式
protected int parallelThreadSize = Runtime.getRuntime()
.availableProcessors() * 60 / 100; // 60%的能力跑解析,剩余部分处理网络
protected int parallelBufferSize = 16 * parallelThreadSize;
protected MultiStageCoprocessor multiStageCoprocessor;

protected abstract BinlogParser buildParser();

protected abstract ErosaConnection buildErosaConnection();

protected abstract MultiStageCoprocessor buildMultiStageCoprocessor();

protected abstract EntryPosition findStartPosition(ErosaConnection connection) throws IOException;

protected void preDump(ErosaConnection connection) {
Expand Down Expand Up @@ -217,20 +224,39 @@ public boolean sink(EVENT event) {
};

// 4. 开始dump数据
// 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
if (isGTIDMode()) {
erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
if (parallel) {
// build stage processor
multiStageCoprocessor = buildMultiStageCoprocessor();
multiStageCoprocessor.start();

if (isGTIDMode()) {
// 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), multiStageCoprocessor);
} else {
if (StringUtils.isEmpty(startPosition.getJournalName())
&& startPosition.getTimestamp() != null) {
erosaConnection.dump(startPosition.getTimestamp(), multiStageCoprocessor);
} else {
erosaConnection.dump(startPosition.getJournalName(),
startPosition.getPosition(),
multiStageCoprocessor);
}
}
} else {
if (StringUtils.isEmpty(startPosition.getJournalName())
&& startPosition.getTimestamp() != null) {
erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
if (isGTIDMode()) {
// 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
} else {
erosaConnection.dump(startPosition.getJournalName(),
startPosition.getPosition(),
sinkHandler);
if (StringUtils.isEmpty(startPosition.getJournalName())
&& startPosition.getTimestamp() != null) {
erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
} else {
erosaConnection.dump(startPosition.getJournalName(),
startPosition.getPosition(),
sinkHandler);
}
}
}

} catch (TableIdNotFoundException e) {
exception = e;
// 特殊处理TableIdNotFound异常,出现这样的异常,一种可能就是起始的position是一个事务当中,导致tablemap
Expand Down Expand Up @@ -276,6 +302,9 @@ public boolean sink(EVENT event) {
eventSink.interrupt();
transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
binlogParser.reset();// 重新置位
if (multiStageCoprocessor != null) {
multiStageCoprocessor.reset();
}

if (running) {
// sleep一段时间再进行重试
Expand Down Expand Up @@ -314,6 +343,10 @@ public void stop() {
if (transactionBuffer.isStart()) {
transactionBuffer.stop();
}

if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
multiStageCoprocessor.stop();
}
}

protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
Expand Down Expand Up @@ -549,4 +582,29 @@ public boolean isGTIDMode() {
public void setIsGTIDMode(boolean isGTIDMode) {
this.isGTIDMode = isGTIDMode;
}

public boolean isParallel() {
return parallel;
}

public void setParallel(boolean parallel) {
this.parallel = parallel;
}

public int getParallelThreadSize() {
return parallelThreadSize;
}

public void setParallelThreadSize(int parallelThreadSize) {
this.parallelThreadSize = parallelThreadSize;
}

public int getParallelBufferSize() {
return parallelBufferSize;
}

public void setParallelBufferSize(int parallelBufferSize) {
this.parallelBufferSize = parallelBufferSize;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ public interface ErosaConnection {

/**
* 通过GTID同步binlog
*
* @param gtidSet
* @param func
* @throws IOException
*/
public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException;

// -------------

public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException;

public void dump(long timestamp, MultiStageCoprocessor coprocessor) throws IOException;

public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException;

ErosaConnection fork();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.alibaba.otter.canal.parse.inbound;

import com.alibaba.otter.canal.common.CanalLifeCycle;
import com.taobao.tddl.dbsync.binlog.LogBuffer;

/**
* 针对解析器提供一个多阶段协同的处理
*
* <pre>
* 1. 网络接收 (单线程)
* 2. 事件基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息)
* 3. 事件深度解析 (多线程, DML事件数据的完整解析)
* 4. 投递到store (单线程)
* </pre>
*
* @author agapple 2018年7月3日 下午4:54:17
* @since 1.0.26
*/
public interface MultiStageCoprocessor extends CanalLifeCycle {

/**
* 网络数据投递
*/
public void publish(LogBuffer buffer);

public void publish(LogBuffer buffer, String binlogFileName);

public void reset();
}
Loading

0 comments on commit da8bf59

Please sign in to comment.