Skip to content

Commit

Permalink
fixed issue #726 , 优化完毕
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Jul 20, 2018
1 parent b4c6490 commit fe38bc1
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,72 +8,61 @@
import org.junit.Assert;
import org.junit.Test;

import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;

public class DirectLogFetcherTest extends BaseLogFetcherTest {

@Test
public void testSimple() {
DirectLogFetcher fecther = new DirectLogFetcher();
try {
Class.forName("com.mysql.jdbc.Driver");
Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello");
Connection connection = DriverManager.getConnection("jdbc:mysql://100.81.154.142:3306", "root", "hello");
Statement statement = connection.createStatement();
statement.execute("SET @master_binlog_checksum='@@global.binlog_checksum'");
statement.execute("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");

fecther.open(connection, "mysql-bin.000001", 4L, 2);
fecther.open(connection, "mysql-bin.000006", 120L, 2);

LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.UNKNOWN_EVENT);
LogContext context = new LogContext();
while (fecther.fetch()) {
LogEvent event = null;
event = decoder.decode(fecther, context);

if (event == null) {
continue;
// throw new RuntimeException("parse failed");
}

int eventType = event.getHeader().getType();
switch (eventType) {
case LogEvent.ROTATE_EVENT:
binlogFileName = ((RotateLogEvent) event).getFilename();
break;
case LogEvent.WRITE_ROWS_EVENT_V1:
case LogEvent.WRITE_ROWS_EVENT:
parseRowsEvent((WriteRowsLogEvent) event);
break;
case LogEvent.UPDATE_ROWS_EVENT_V1:
case LogEvent.UPDATE_ROWS_EVENT:
parseRowsEvent((UpdateRowsLogEvent) event);
break;
case LogEvent.DELETE_ROWS_EVENT_V1:
case LogEvent.DELETE_ROWS_EVENT:
parseRowsEvent((DeleteRowsLogEvent) event);
break;
case LogEvent.QUERY_EVENT:
parseQueryEvent((QueryLogEvent) event);
break;
case LogEvent.ROWS_QUERY_LOG_EVENT:
parseRowsQueryEvent((RowsQueryLogEvent) event);
break;
case LogEvent.ANNOTATE_ROWS_EVENT:
parseAnnotateRowsEvent((AnnotateRowsEvent) event);
break;
case LogEvent.XID_EVENT:
parseXidEvent((XidLogEvent) event);
break;
default:
break;
}
decoder.decode(fecther, context);
continue;
// if (event == null) {
// continue;
// }
//
// int eventType = event.getHeader().getType();
// switch (eventType) {
// case LogEvent.ROTATE_EVENT:
// binlogFileName = ((RotateLogEvent) event).getFilename();
// break;
// case LogEvent.WRITE_ROWS_EVENT_V1:
// case LogEvent.WRITE_ROWS_EVENT:
// parseRowsEvent((WriteRowsLogEvent) event);
// break;
// case LogEvent.UPDATE_ROWS_EVENT_V1:
// case LogEvent.UPDATE_ROWS_EVENT:
// parseRowsEvent((UpdateRowsLogEvent) event);
// break;
// case LogEvent.DELETE_ROWS_EVENT_V1:
// case LogEvent.DELETE_ROWS_EVENT:
// parseRowsEvent((DeleteRowsLogEvent) event);
// break;
// case LogEvent.QUERY_EVENT:
// parseQueryEvent((QueryLogEvent) event);
// break;
// case LogEvent.ROWS_QUERY_LOG_EVENT:
// parseRowsQueryEvent((RowsQueryLogEvent) event);
// break;
// case LogEvent.ANNOTATE_ROWS_EVENT:
// parseAnnotateRowsEvent((AnnotateRowsEvent) event);
// break;
// case LogEvent.XID_EVENT:
// parseXidEvent((XidLogEvent) event);
// break;
// default:
// break;
// }
}
} catch (Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.alibaba.otter.canal.parse.driver.mysql.socket;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -24,7 +25,7 @@ public class BioSocketChannel implements SocketChannel {

BioSocketChannel(Socket socket) throws IOException{
this.socket = socket;
this.input = socket.getInputStream();
this.input = new BufferedInputStream(socket.getInputStream(), 16384);
this.output = socket.getOutputStream();
}

Expand Down Expand Up @@ -164,5 +165,4 @@ public void close() {
this.socket = null;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public static BioSocketChannel open(SocketAddress address) throws Exception {
socket.setKeepAlive(true);
socket.setReuseAddress(true);
socket.connect(address, BioSocketChannel.DEFAULT_CONNECT_TIMEOUT);
System.out.println(socket.getReceiveBufferSize());
System.out.println(socket.getSendBufferSize());
return new BioSocketChannel(socket);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public void testSimple() {
controller.setTsdbSpringXml("classpath:tsdb/h2-tsdb.xml");
controller.setEventFilter(new AviaterRegexFilter("test\\..*"));
controller.setEventBlackFilter(new AviaterRegexFilter("canal_tsdb\\..*"));
controller.setParallel(true);
controller.setParallelBufferSize(4096);
controller.setParallelThreadSize(16);
controller.setIsGTIDMode(false);
controller.setEventSink(new AbstractCanalEventSinkTest<List<Entry>>() {

Expand Down

0 comments on commit fe38bc1

Please sign in to comment.