Skip to content

Commit

Permalink
fixed issue #726 , 提交测试用例
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Jul 20, 2018
1 parent fe38bc1 commit 77c1c65
Show file tree
Hide file tree
Showing 5 changed files with 488 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.taobao.tddl.dbsync;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicLong;

import com.taobao.tddl.dbsync.binlog.DirectLogFetcher;
import com.taobao.tddl.dbsync.binlog.LogContext;
import com.taobao.tddl.dbsync.binlog.LogDecoder;
import com.taobao.tddl.dbsync.binlog.LogEvent;

public class FetcherPerformanceTest {

public static void main(String args[]) {
DirectLogFetcher fetcher = new DirectLogFetcher();
try {
Class.forName("com.mysql.jdbc.Driver");
Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306",
"root",
"hello");
Statement statement = connection.createStatement();
statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");

fetcher.open(connection, "mysql-bin.000006", 120L, 2);

AtomicLong sum = new AtomicLong(0);
long start = System.currentTimeMillis();
long last = 0;
long end = 0;

LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
LogContext context = new LogContext();
while (fetcher.fetch()) {
decoder.decode(fetcher, context);
sum.incrementAndGet();
long current = sum.get();
if (current - last >= 100000) {
end = System.currentTimeMillis();
long tps = ((current - last) * 1000) / (end - start);
System.out.println(" total : " + sum + " , cost : " + (end - start) + " , tps : " + tps);
last = current;
start = end;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
fetcher.close();
} catch (IOException e) {
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.alibaba.otter.canal.example;
import java.net.InetSocketAddress;
import java.util.concurrent.ArrayBlockingQueue;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalClientPermanceTest {

public static void main(String args[]) {
String destination = "example";
String ip = "127.0.0.1";
int batchSize = 1024;
int count = 0;
int sum = 0;
int perSum = 0;
long start = System.currentTimeMillis();
long end = 0;
final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(100);
try {
final CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
destination,
"",
"");

Thread ackThread = new Thread(new Runnable() {

@Override
public void run() {
while (true) {
try {
long batchId = queue.take();
connector.ack(batchId);
} catch (InterruptedException e) {
}
}
}
});
ackThread.start();

((SimpleCanalConnector) connector).setLazyParseEntry(true);
connector.connect();
connector.subscribe();
while (true) {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getRawEntries().size();
sum += size;
perSum += size;
count++;
queue.add(batchId);
if (count % 10 == 0) {
end = System.currentTimeMillis();
long tps = (perSum * 1000) / (end - start);
System.out.println(" total : " + sum + " , current : " + perSum + " , cost : " + (end - start)
+ " , tps : " + tps);
start = end;
perSum = 0;
}
}
} catch (Throwable e) {
e.printStackTrace();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.alibaba.otter.canal.parse;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.index.AbstractLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.protocol.position.LogPosition;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.alibaba.otter.canal.sink.exception.CanalSinkException;

public class MysqlBinlogDumpPerformanceTest {

public static void main(String args[]) {
final MysqlEventParser controller = new MysqlEventParser();
final EntryPosition startPosition = new EntryPosition("mysql-bin.001699", 120L, 100L);
controller.setConnectionCharset(Charset.forName("UTF-8"));
controller.setSlaveId(3344L);
controller.setDetectingEnable(false);
controller.setFilterQueryDml(true);
controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3328), "root", "hello"));
controller.setMasterPosition(startPosition);
controller.setEnableTsdb(false);
controller.setDestination("example");
controller.setTsdbSpringXml("classpath:spring/tsdb/h2-tsdb.xml");
// controller.setEventFilter(new AviaterRegexFilter("test\\..*"));
// controller.setEventBlackFilter(new
// AviaterRegexFilter("canal_tsdb\\..*"));
controller.setParallel(true);
controller.setParallelBufferSize(256);
controller.setParallelThreadSize(16);
controller.setIsGTIDMode(false);
final AtomicLong sum = new AtomicLong(0);
final AtomicLong last = new AtomicLong(0);
final AtomicLong start = new AtomicLong(System.currentTimeMillis());
final AtomicLong end = new AtomicLong(0);
controller.setEventSink(new AbstractCanalEventSinkTest<List<CanalEntry.Entry>>() {

public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination)
throws CanalSinkException,
InterruptedException {

sum.addAndGet(entrys.size());
long current = sum.get();
if (current - last.get() >= 100000) {
end.set(System.currentTimeMillis());
long tps = ((current - last.get()) * 1000) / (end.get() - start.get());
System.out.println(" total : " + sum + " , cost : " + (end.get() - start.get()) + " , tps : " + tps);
last.set(current);
start.set(end.get());
}
return true;
}

});
controller.setLogPositionManager(new AbstractLogPositionManager() {

@Override
public LogPosition getLatestIndexBy(String destination) {
return null;
}

@Override
public void persistLogPosition(String destination, LogPosition logPosition) throws CanalParseException {
}
});

controller.start();

try {
Thread.sleep(100 * 1000 * 1000L);
} catch (InterruptedException e) {
}
controller.stop();
}

public static abstract class AbstractCanalEventSinkTest<T> extends AbstractCanalLifeCycle implements CanalEventSink<T> {

public void interrupt() {
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.alibaba.otter.canal.parse;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector;
import com.alibaba.otter.canal.parse.driver.mysql.MysqlUpdateExecutor;
import com.alibaba.otter.canal.parse.driver.mysql.packets.HeaderPacket;
import com.alibaba.otter.canal.parse.driver.mysql.packets.client.BinlogDumpCommandPacket;
import com.alibaba.otter.canal.parse.driver.mysql.utils.PacketManager;
import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher;
import com.taobao.tddl.dbsync.binlog.LogContext;
import com.taobao.tddl.dbsync.binlog.LogDecoder;
import com.taobao.tddl.dbsync.binlog.LogEvent;

public class MysqlBinlogEventPerformanceTest {

protected static Charset charset = Charset.forName("utf-8");

public static void main(String args[]) {
DirectLogFetcher fetcher = new DirectLogFetcher();
try {
MysqlConnector connector = new MysqlConnector(new InetSocketAddress("127.0.0.1", 3306), "root", "hello");
connector.connect();
updateSettings(connector);
sendBinlogDump(connector, "mysql-bin.000006", 120L, 3);
fetcher.start(connector.getChannel());
LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
LogContext context = new LogContext();
AtomicLong sum = new AtomicLong(0);
long start = System.currentTimeMillis();
long last = 0;
long end = 0;
while (fetcher.fetch()) {
decoder.decode(fetcher, context);
sum.incrementAndGet();
long current = sum.get();
if (current - last >= 100000) {
end = System.currentTimeMillis();
long tps = ((current - last) * 1000) / (end - start);
System.out.println(" total : " + sum + " , cost : " + (end - start) + " , tps : " + tps);
last = current;
start = end;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
fetcher.close();
} catch (IOException e) {
}
}
}

private static void sendBinlogDump(MysqlConnector connector, String binlogfilename, Long binlogPosition, int slaveId)
throws IOException {
BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();
binlogDumpCmd.binlogFileName = binlogfilename;
binlogDumpCmd.binlogPosition = binlogPosition;
binlogDumpCmd.slaveServerId = slaveId;
byte[] cmdBody = binlogDumpCmd.toBytes();

HeaderPacket binlogDumpHeader = new HeaderPacket();
binlogDumpHeader.setPacketBodyLength(cmdBody.length);
binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
}

private static void updateSettings(MysqlConnector connector) throws IOException {
update("set @master_binlog_checksum= '@@global.binlog_checksum'", connector);
update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'", connector);
}

public static void update(String cmd, MysqlConnector connector) throws IOException {
MysqlUpdateExecutor exector = new MysqlUpdateExecutor(connector);
exector.update(cmd);
}

}
Loading

0 comments on commit 77c1c65

Please sign in to comment.