diff --git a/dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java b/dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java index 23a10f01da..78d5b58617 100644 --- a/dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java +++ b/dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java @@ -8,15 +8,6 @@ import org.junit.Assert; import org.junit.Test; -import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent; -import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent; -import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent; -import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent; -import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent; -import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent; -import com.taobao.tddl.dbsync.binlog.event.XidLogEvent; -import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent; - public class DirectLogFetcherTest extends BaseLogFetcherTest { @Test @@ -24,56 +15,54 @@ public void testSimple() { DirectLogFetcher fecther = new DirectLogFetcher(); try { Class.forName("com.mysql.jdbc.Driver"); - Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello"); + Connection connection = DriverManager.getConnection("jdbc:mysql://100.81.154.142:3306", "root", "hello"); Statement statement = connection.createStatement(); statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'"); statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'"); - fecther.open(connection, "mysql-bin.000001", 4L, 2); + fecther.open(connection, "mysql-bin.000006", 120L, 2); - LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); + LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.UNKNOWN_EVENT); LogContext context = new LogContext(); while (fecther.fetch()) { - LogEvent event = null; - event = decoder.decode(fecther, context); - - if (event == null) { - continue; - // throw new RuntimeException("parse failed"); - } - - int eventType = event.getHeader().getType(); - switch (eventType) { - case LogEvent.ROTATE_EVENT: - binlogFileName = ((RotateLogEvent) event).getFilename(); - break; - case LogEvent.WRITE_ROWS_EVENT_V1: - case LogEvent.WRITE_ROWS_EVENT: - parseRowsEvent((WriteRowsLogEvent) event); - break; - case LogEvent.UPDATE_ROWS_EVENT_V1: - case LogEvent.UPDATE_ROWS_EVENT: - parseRowsEvent((UpdateRowsLogEvent) event); - break; - case LogEvent.DELETE_ROWS_EVENT_V1: - case LogEvent.DELETE_ROWS_EVENT: - parseRowsEvent((DeleteRowsLogEvent) event); - break; - case LogEvent.QUERY_EVENT: - parseQueryEvent((QueryLogEvent) event); - break; - case LogEvent.ROWS_QUERY_LOG_EVENT: - parseRowsQueryEvent((RowsQueryLogEvent) event); - break; - case LogEvent.ANNOTATE_ROWS_EVENT: - parseAnnotateRowsEvent((AnnotateRowsEvent) event); - break; - case LogEvent.XID_EVENT: - parseXidEvent((XidLogEvent) event); - break; - default: - break; - } + decoder.decode(fecther, context); + continue; + // if (event == null) { + // continue; + // } + // + // int eventType = event.getHeader().getType(); + // switch (eventType) { + // case LogEvent.ROTATE_EVENT: + // binlogFileName = ((RotateLogEvent) event).getFilename(); + // break; + // case LogEvent.WRITE_ROWS_EVENT_V1: + // case LogEvent.WRITE_ROWS_EVENT: + // parseRowsEvent((WriteRowsLogEvent) event); + // break; + // case LogEvent.UPDATE_ROWS_EVENT_V1: + // case LogEvent.UPDATE_ROWS_EVENT: + // parseRowsEvent((UpdateRowsLogEvent) event); + // break; + // case LogEvent.DELETE_ROWS_EVENT_V1: + // case LogEvent.DELETE_ROWS_EVENT: + // parseRowsEvent((DeleteRowsLogEvent) event); + // break; + // case LogEvent.QUERY_EVENT: + // parseQueryEvent((QueryLogEvent) event); + // break; + // case LogEvent.ROWS_QUERY_LOG_EVENT: + // parseRowsQueryEvent((RowsQueryLogEvent) event); + // break; + // case LogEvent.ANNOTATE_ROWS_EVENT: + // parseAnnotateRowsEvent((AnnotateRowsEvent) event); + // break; + // case LogEvent.XID_EVENT: + // parseXidEvent((XidLogEvent) event); + // break; + // default: + // break; + // } } } catch (Exception e) { e.printStackTrace(); diff --git a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java index 2ed318e8cb..f746189e96 100644 --- a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java +++ b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java @@ -1,5 +1,6 @@ package com.alibaba.otter.canal.parse.driver.mysql.socket; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -24,7 +25,7 @@ public class BioSocketChannel implements SocketChannel { BioSocketChannel(Socket socket) throws IOException{ this.socket = socket; - this.input = socket.getInputStream(); + this.input = new BufferedInputStream(socket.getInputStream(), 16384); this.output = socket.getOutputStream(); } @@ -164,5 +165,4 @@ public void close() { this.socket = null; } - } diff --git a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java index dcc17bad63..a268ff2eba 100644 --- a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java +++ b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannelPool.java @@ -16,6 +16,8 @@ public static BioSocketChannel open(SocketAddress address) throws Exception { socket.setKeepAlive(true); socket.setReuseAddress(true); socket.connect(address, BioSocketChannel.DEFAULT_CONNECT_TIMEOUT); + System.out.println(socket.getReceiveBufferSize()); + System.out.println(socket.getSendBufferSize()); return new BioSocketChannel(socket); } diff --git a/parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java b/parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java index 6849c5b309..e4d8182329 100644 --- a/parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java +++ b/parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java @@ -41,6 +41,9 @@ public void testSimple() { controller.setTsdbSpringXml("classpath:tsdb/h2-tsdb.xml"); controller.setEventFilter(new AviaterRegexFilter("test\\..*")); controller.setEventBlackFilter(new AviaterRegexFilter("canal_tsdb\\..*")); + controller.setParallel(true); + controller.setParallelBufferSize(4096); + controller.setParallelThreadSize(16); controller.setIsGTIDMode(false); controller.setEventSink(new AbstractCanalEventSinkTest>() {