diff --git a/dbsync/src/test/java/com/taobao/tddl/dbsync/FetcherPerformanceTest.java b/dbsync/src/test/java/com/taobao/tddl/dbsync/FetcherPerformanceTest.java new file mode 100644 index 0000000000..03fedccd7e --- /dev/null +++ b/dbsync/src/test/java/com/taobao/tddl/dbsync/FetcherPerformanceTest.java @@ -0,0 +1,57 @@ +package com.taobao.tddl.dbsync; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.concurrent.atomic.AtomicLong; + +import com.taobao.tddl.dbsync.binlog.DirectLogFetcher; +import com.taobao.tddl.dbsync.binlog.LogContext; +import com.taobao.tddl.dbsync.binlog.LogDecoder; +import com.taobao.tddl.dbsync.binlog.LogEvent; + +public class FetcherPerformanceTest { + + public static void main(String args[]) { + DirectLogFetcher fetcher = new DirectLogFetcher(); + try { + Class.forName("com.mysql.jdbc.Driver"); + Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", + "root", + "hello"); + Statement statement = connection.createStatement(); + statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'"); + statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'"); + + fetcher.open(connection, "mysql-bin.000006", 120L, 2); + + AtomicLong sum = new AtomicLong(0); + long start = System.currentTimeMillis(); + long last = 0; + long end = 0; + + LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); + LogContext context = new LogContext(); + while (fetcher.fetch()) { + decoder.decode(fetcher, context); + sum.incrementAndGet(); + long current = sum.get(); + if (current - last >= 100000) { + end = System.currentTimeMillis(); + long tps = ((current - last) * 1000) / (end - start); + System.out.println(" total : " + sum + " , cost : " + (end - start) + " , tps : " + tps); + last = current; + start = end; + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + fetcher.close(); + } catch (IOException e) { + } + } + } +} diff --git a/example/src/main/java/com/alibaba/otter/canal/example/SimpleCanalClientPermanceTest.java b/example/src/main/java/com/alibaba/otter/canal/example/SimpleCanalClientPermanceTest.java new file mode 100644 index 0000000000..c58eb869f4 --- /dev/null +++ b/example/src/main/java/com/alibaba/otter/canal/example/SimpleCanalClientPermanceTest.java @@ -0,0 +1,68 @@ +package com.alibaba.otter.canal.example; +import java.net.InetSocketAddress; +import java.util.concurrent.ArrayBlockingQueue; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.client.impl.SimpleCanalConnector; +import com.alibaba.otter.canal.protocol.Message; + +public class SimpleCanalClientPermanceTest { + + public static void main(String args[]) { + String destination = "example"; + String ip = "127.0.0.1"; + int batchSize = 1024; + int count = 0; + int sum = 0; + int perSum = 0; + long start = System.currentTimeMillis(); + long end = 0; + final ArrayBlockingQueue queue = new ArrayBlockingQueue(100); + try { + final CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111), + destination, + "", + ""); + + Thread ackThread = new Thread(new Runnable() { + + @Override + public void run() { + while (true) { + try { + long batchId = queue.take(); + connector.ack(batchId); + } catch (InterruptedException e) { + } + } + } + }); + ackThread.start(); + + ((SimpleCanalConnector) connector).setLazyParseEntry(true); + connector.connect(); + connector.subscribe(); + while (true) { + Message message = connector.getWithoutAck(batchSize); + long batchId = message.getId(); + int size = message.getRawEntries().size(); + sum += size; + perSum += size; + count++; + queue.add(batchId); + if (count % 10 == 0) { + end = System.currentTimeMillis(); + long tps = (perSum * 1000) / (end - start); + System.out.println(" total : " + sum + " , current : " + perSum + " , cost : " + (end - start) + + " , tps : " + tps); + start = end; + perSum = 0; + } + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + +} diff --git a/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java b/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java new file mode 100644 index 0000000000..8ca9b0447e --- /dev/null +++ b/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java @@ -0,0 +1,89 @@ +package com.alibaba.otter.canal.parse; + +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import com.alibaba.otter.canal.common.AbstractCanalLifeCycle; +import com.alibaba.otter.canal.parse.exception.CanalParseException; +import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser; +import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager; +import com.alibaba.otter.canal.parse.support.AuthenticationInfo; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.position.EntryPosition; +import com.alibaba.otter.canal.protocol.position.LogPosition; +import com.alibaba.otter.canal.sink.CanalEventSink; +import com.alibaba.otter.canal.sink.exception.CanalSinkException; + +public class MysqlBinlogDumpPerformanceTest { + + public static void main(String args[]) { + final MysqlEventParser controller = new MysqlEventParser(); + final EntryPosition startPosition = new EntryPosition("mysql-bin.001699", 120L, 100L); + controller.setConnectionCharset(Charset.forName("UTF-8")); + controller.setSlaveId(3344L); + controller.setDetectingEnable(false); + controller.setFilterQueryDml(true); + controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3328), "root", "hello")); + controller.setMasterPosition(startPosition); + controller.setEnableTsdb(false); + controller.setDestination("example"); + controller.setTsdbSpringXml("classpath:spring/tsdb/h2-tsdb.xml"); + // controller.setEventFilter(new AviaterRegexFilter("test\\..*")); + // controller.setEventBlackFilter(new + // AviaterRegexFilter("canal_tsdb\\..*")); + controller.setParallel(true); + controller.setParallelBufferSize(256); + controller.setParallelThreadSize(16); + controller.setIsGTIDMode(false); + final AtomicLong sum = new AtomicLong(0); + final AtomicLong last = new AtomicLong(0); + final AtomicLong start = new AtomicLong(System.currentTimeMillis()); + final AtomicLong end = new AtomicLong(0); + controller.setEventSink(new AbstractCanalEventSinkTest>() { + + public boolean sink(List entrys, InetSocketAddress remoteAddress, String destination) + throws CanalSinkException, + InterruptedException { + + sum.addAndGet(entrys.size()); + long current = sum.get(); + if (current - last.get() >= 100000) { + end.set(System.currentTimeMillis()); + long tps = ((current - last.get()) * 1000) / (end.get() - start.get()); + System.out.println(" total : " + sum + " , cost : " + (end.get() - start.get()) + " , tps : " + tps); + last.set(current); + start.set(end.get()); + } + return true; + } + + }); + controller.setLogPositionManager(new AbstractLogPositionManager() { + + @Override + public LogPosition getLatestIndexBy(String destination) { + return null; + } + + @Override + public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException { + } + }); + + controller.start(); + + try { + Thread.sleep(100 * 1000 * 1000L); + } catch (InterruptedException e) { + } + controller.stop(); + } + + public static abstract class AbstractCanalEventSinkTest extends AbstractCanalLifeCycle implements CanalEventSink { + + public void interrupt() { + } + } +} diff --git a/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogEventPerformanceTest.java b/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogEventPerformanceTest.java new file mode 100644 index 0000000000..e92fecb525 --- /dev/null +++ b/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogEventPerformanceTest.java @@ -0,0 +1,82 @@ +package com.alibaba.otter.canal.parse; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicLong; + +import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector; +import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor; +import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket; +import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket; +import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager; +import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher; +import com.taobao.tddl.dbsync.binlog.LogContext; +import com.taobao.tddl.dbsync.binlog.LogDecoder; +import com.taobao.tddl.dbsync.binlog.LogEvent; + +public class MysqlBinlogEventPerformanceTest { + + protected static Charset charset = Charset.forName("utf-8"); + + public static void main(String args[]) { + DirectLogFetcher fetcher = new DirectLogFetcher(); + try { + MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello"); + connector.connect(); + updateSettings(connector); + sendBinlogDump(connector, "mysql-bin.000006", 120L, 3); + fetcher.start(connector.getChannel()); + LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); + LogContext context = new LogContext(); + AtomicLong sum = new AtomicLong(0); + long start = System.currentTimeMillis(); + long last = 0; + long end = 0; + while (fetcher.fetch()) { + decoder.decode(fetcher, context); + sum.incrementAndGet(); + long current = sum.get(); + if (current - last >= 100000) { + end = System.currentTimeMillis(); + long tps = ((current - last) * 1000) / (end - start); + System.out.println(" total : " + sum + " , cost : " + (end - start) + " , tps : " + tps); + last = current; + start = end; + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + fetcher.close(); + } catch (IOException e) { + } + } + } + + private static void sendBinlogDump(MysqlConnector connector, String binlogfilename, Long binlogPosition, int slaveId) + throws IOException { + BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket(); + binlogDumpCmd.binlogFileName = binlogfilename; + binlogDumpCmd.binlogPosition = binlogPosition; + binlogDumpCmd.slaveServerId = slaveId; + byte[] cmdBody = binlogDumpCmd.toBytes(); + + HeaderPacket binlogDumpHeader = new HeaderPacket(); + binlogDumpHeader.setPacketBodyLength(cmdBody.length); + binlogDumpHeader.setPacketSequenceNumber((byte) 0x00); + PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody); + } + + private static void updateSettings(MysqlConnector connector) throws IOException { + update("set @master_binlog_checksum= '@@global.binlog_checksum'", connector); + update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'", connector); + } + + public static void update(String cmd, MysqlConnector connector) throws IOException { + MysqlUpdateExecutor exector = new MysqlUpdateExecutor(connector); + exector.update(cmd); + } + +} diff --git a/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java b/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java new file mode 100644 index 0000000000..79b8837538 --- /dev/null +++ b/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogParsePerformanceTest.java @@ -0,0 +1,192 @@ +package com.alibaba.otter.canal.parse; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.BitSet; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector; +import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor; +import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket; +import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket; +import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager; +import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher; +import com.taobao.tddl.dbsync.binlog.LogBuffer; +import com.taobao.tddl.dbsync.binlog.LogContext; +import com.taobao.tddl.dbsync.binlog.LogDecoder; +import com.taobao.tddl.dbsync.binlog.LogEvent; +import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent; +import com.taobao.tddl.dbsync.binlog.event.RowsLogBuffer; +import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent; +import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent; +import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent.ColumnInfo; +import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent; +import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent; + +public class MysqlBinlogParsePerformanceTest { + + protected static Charset charset = Charset.forName("utf-8"); + + public static void main(String args[]) { + DirectLogFetcher fetcher = new DirectLogFetcher(); + try { + MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello"); + connector.connect(); + updateSettings(connector); + sendBinlogDump(connector, "mysql-bin.000006", 120L, 3); + fetcher.start(connector.getChannel()); + final BlockingQueue buffer = new ArrayBlockingQueue(1024); + Thread thread = new Thread(new Runnable() { + + @Override + public void run() { + try { + consumer(buffer); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + thread.start(); + + while (fetcher.fetch()) { + buffer.put(fetcher.duplicate()); + fetcher.consume(fetcher.limit()); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + fetcher.close(); + } catch (IOException e) { + } + } + } + + public static void consumer(BlockingQueue buffer) throws IOException, InterruptedException { + LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); + LogContext context = new LogContext(); + + AtomicLong sum = new AtomicLong(0); + long start = System.currentTimeMillis(); + long last = 0; + long end = 0; + while (true) { + LogEvent event = null; + event = decoder.decode(buffer.take(), context); + int eventType = event.getHeader().getType(); + switch (eventType) { + case LogEvent.ROTATE_EVENT: + break; + case LogEvent.WRITE_ROWS_EVENT_V1: + case LogEvent.WRITE_ROWS_EVENT: + parseRowsEvent((WriteRowsLogEvent) event, sum); + break; + case LogEvent.UPDATE_ROWS_EVENT_V1: + case LogEvent.UPDATE_ROWS_EVENT: + parseRowsEvent((UpdateRowsLogEvent) event, sum); + break; + case LogEvent.DELETE_ROWS_EVENT_V1: + case LogEvent.DELETE_ROWS_EVENT: + parseRowsEvent((DeleteRowsLogEvent) event, sum); + break; + case LogEvent.XID_EVENT: + sum.incrementAndGet(); + break; + case LogEvent.QUERY_EVENT: + sum.incrementAndGet(); + break; + default: + break; + } + + long current = sum.get(); + if (current - last >= 100000) { + end = System.currentTimeMillis(); + long tps = ((current - last) * 1000) / (end - start); + System.out.println(" total : " + sum + " , cost : " + (end - start) + " , tps : " + tps); + last = current; + start = end; + } + } + } + + private static void sendBinlogDump(MysqlConnector connector, String binlogfilename, Long binlogPosition, int slaveId) + throws IOException { + BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket(); + binlogDumpCmd.binlogFileName = binlogfilename; + binlogDumpCmd.binlogPosition = binlogPosition; + binlogDumpCmd.slaveServerId = slaveId; + byte[] cmdBody = binlogDumpCmd.toBytes(); + + HeaderPacket binlogDumpHeader = new HeaderPacket(); + binlogDumpHeader.setPacketBodyLength(cmdBody.length); + binlogDumpHeader.setPacketSequenceNumber((byte) 0x00); + PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody); + } + + private static void updateSettings(MysqlConnector connector) throws IOException { + update("set @master_binlog_checksum= '@@global.binlog_checksum'", connector); + update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'", connector); + } + + public static void update(String cmd, MysqlConnector connector) throws IOException { + MysqlUpdateExecutor exector = new MysqlUpdateExecutor(connector); + exector.update(cmd); + } + + public static void parseRowsEvent(RowsLogEvent event, AtomicLong sum) { + try { + RowsLogBuffer buffer = event.getRowsBuf(charset.name()); + BitSet columns = event.getColumns(); + BitSet changeColumns = event.getChangeColumns(); + while (buffer.nextOneRow(columns)) { + int type = event.getHeader().getType(); + if (LogEvent.WRITE_ROWS_EVENT_V1 == type || LogEvent.WRITE_ROWS_EVENT == type) { + parseOneRow(event, buffer, columns, true); + } else if (LogEvent.DELETE_ROWS_EVENT_V1 == type || LogEvent.DELETE_ROWS_EVENT == type) { + parseOneRow(event, buffer, columns, false); + } else { + parseOneRow(event, buffer, columns, false); + if (!buffer.nextOneRow(changeColumns)) { + break; + } + parseOneRow(event, buffer, changeColumns, true); + } + + sum.incrementAndGet(); + } + } catch (Exception e) { + throw new RuntimeException("parse row data failed.", e); + } + } + + public static void parseOneRow(RowsLogEvent event, RowsLogBuffer buffer, BitSet cols, boolean isAfter) + throws UnsupportedEncodingException { + TableMapLogEvent map = event.getTable(); + if (map == null) { + throw new RuntimeException("not found TableMap with tid=" + event.getTableId()); + } + + final int columnCnt = map.getColumnCnt(); + final ColumnInfo[] columnInfo = map.getColumnInfo(); + for (int i = 0; i < columnCnt; i++) { + if (!cols.get(i)) { + continue; + } + + ColumnInfo info = columnInfo[i]; + buffer.nextValue(info.type, info.meta); + if (buffer.isNull()) { + } else { + buffer.getValue(); + } + } + } +}