Skip to content

Commit

Permalink
Merge pull request #2 from alibaba/master
Browse files Browse the repository at this point in the history
merge
  • Loading branch information
Wu-Jianqiang committed Feb 23, 2018
2 parents 0b82ee8 + 0af789b commit 863a61d
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 76 deletions.
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<links>
<link>https://github.com/alibaba/canal</link>
</links>
<outputDirectory>${project.build.directory}/apidocs/apidocs/${pom.version}</outputDirectory>
<outputDirectory>${project.build.directory}/apidocs/apidocs/${project.version}</outputDirectory>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.alibaba.otter.canal.common.CanalException;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
Expand Down Expand Up @@ -91,18 +92,25 @@ public void init() {
processStart();
}

public void start() {
public synchronized void start() {
super.start();
processStart();
if (zkClient != null) {
// 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);
try {
processStart();
if (zkClient != null) {
// 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);

initRunning();
} else {
processActiveEnter();// 没有zk,直接启动
initRunning();
} else {
processActiveEnter();// 没有zk,直接启动
}
} catch (Exception e) {
logger.error("start failed", e);
// 没有正常启动,重置一下状态,避免干扰下一次start
stop();
}

}

public void release() {
Expand All @@ -113,7 +121,7 @@ public void release() {
}
}

public void stop() {
public synchronized void stop() {
super.stop();

if (zkClient != null) {
Expand Down Expand Up @@ -234,11 +242,7 @@ private void processStop() {

private void processActiveEnter() {
if (listener != null) {
try {
listener.processActiveEnter();
} catch (Exception e) {
logger.error("processActiveEnter failed", e);
}
listener.processActiveEnter();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class MysqlConnector {
// mysql connectionId
private long connectionId = -1;
private AtomicBoolean connected = new AtomicBoolean(false);
public static final int timeout = 3000; // 3s

public static final int timeout = 5 * 1000; // 5s

public MysqlConnector(){
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
public class SocketChannel {

private static final int period = 10;
private Channel channel = null;
private Object lock = new Object();
private ByteBuf cache = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024); // 缓存大小
Expand All @@ -31,16 +32,16 @@ public void writeCache(ByteBuf buf) throws InterruptedException {
synchronized (lock) {
while (true) {
cache.discardReadBytes();// 回收内存
//source buffer is empty.
// source buffer is empty.
if (!buf.isReadable()) {
break;
}

if (cache.isWritable()) {
cache.writeBytes(buf, Math.min(cache.writableBytes(), buf.readableBytes()));
} else {
//dest buffer is full.
lock.wait(100);
// dest buffer is full.
lock.wait(period);
}
}
}
Expand All @@ -62,7 +63,7 @@ public byte[] read(int readSize) throws IOException {
}
synchronized (this) {
try {
wait(100);
wait(period);
} catch (InterruptedException e) {
throw new java.nio.channels.ClosedByInterruptException();
}
Expand All @@ -76,7 +77,7 @@ public byte[] read(int readSize) throws IOException {
}
} while (true);
}

public byte[] read(int readSize, int timeout) throws IOException {
int accumulatedWaitTime = 0;
do {
Expand All @@ -85,14 +86,14 @@ public byte[] read(int readSize, int timeout) throws IOException {
throw new IOException("socket has Interrupted !");
}

accumulatedWaitTime += 100;
accumulatedWaitTime += period;
if (accumulatedWaitTime > timeout) {
throw new IOException("socket read timeout occured !");
}

synchronized (this) {
try {
wait(100);
wait(period);
} catch (InterruptedException e) {
throw new IOException("socket has Interrupted !");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;

import java.io.IOException;
import java.net.SocketAddress;
Expand Down Expand Up @@ -103,7 +102,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//need output error for troubeshooting.
// need output error for troubleshooting.
logger.error("business error.", cause);
ctx.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public void uncaughtException(Thread t, Throwable e) {
context_format += "****************************************************" + SEP;

row_format = SEP
+ "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"
+ "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , delay : {}ms"
+ SEP;

transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms" + SEP;
transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , delay : {}ms" + SEP;

}

Expand Down Expand Up @@ -165,6 +165,8 @@ protected void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
long executeTime = entry.getHeader().getExecuteTime();
long delayTime = new Date().getTime() - executeTime;
Date date = new Date(entry.getHeader().getExecuteTime());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
Expand All @@ -178,7 +180,8 @@ protected void printEntry(List<Entry> entrys) {
logger.info(transaction_format,
new Object[] { entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()),
String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
String.valueOf(delayTime) });
logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
} else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
TransactionEnd end = null;
Expand All @@ -193,7 +196,8 @@ protected void printEntry(List<Entry> entrys) {
logger.info(transaction_format,
new Object[] { entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()),
String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
String.valueOf(delayTime) });
}

continue;
Expand All @@ -213,7 +217,8 @@ protected void printEntry(List<Entry> entrys) {
new Object[] { entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType,
String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
String.valueOf(delayTime) });

if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
logger.info(" sql ----> " + rowChage.getSql() + SEP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import java.util.ArrayList;
import java.util.List;

import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
import org.apache.commons.lang.StringUtils;

import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;

/**
* 描述数据meta对象,mysql binlog中对应的{@linkplain TableMapLogEvent}包含的信息不全
*
Expand Down Expand Up @@ -127,6 +128,7 @@ public FieldMeta(String columnName, String columnType, boolean nullable, boolean
private boolean key;
private String defaultValue;
private String extra;
private boolean unique;

public String getColumnName() {
return columnName;
Expand Down Expand Up @@ -180,10 +182,21 @@ public void setExtra(String extra) {
this.extra = extra;
}

public boolean isUnique() {
return unique;
}

public void setUnique(boolean unique) {
this.unique = unique;
}

@Override
public String toString() {
return "FieldMeta [columnName=" + columnName + ", columnType=" + columnType + ", defaultValue="
+ defaultValue + ", nullable=" + nullable + ", key=" + key + "]";
return "FieldMeta [columnName=" + columnName + ", columnType=" + columnType + ", nullable=" + nullable
+ ", key=" + key + ", defaultValue=" + defaultValue + ", extra=" + extra + ", unique=" + unique
+ "]";
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
Expand All @@ -28,6 +29,8 @@
import com.taobao.tddl.dbsync.binlog.LogDecoder;
import com.taobao.tddl.dbsync.binlog.LogEvent;

import static com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.MASTER_HEARTBEAT_PERIOD_SECONDS;

public class MysqlConnection implements ErosaConnection {

private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
Expand Down Expand Up @@ -293,6 +296,13 @@ private void updateSettings() throws IOException {
} catch (Exception e) {
logger.warn("update mariadb_slave_capability failed", e);
}

long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);
try {
update("SET @master_heartbeat_period=" + periodNano);
} catch (Exception e) {
logger.warn("update master_heartbeat_period failed", e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ public class DirectLogFetcher extends LogFetcher {

protected static final Logger logger = LoggerFactory.getLogger(DirectLogFetcher.class);

// Master heartbeat interval
public static final int MASTER_HEARTBEAT_PERIOD_SECONDS = 15;
// +1s 确保 timeout > heartbeat interval
private static final int READ_TIMEOUT_MILLISECONDS = (MASTER_HEARTBEAT_PERIOD_SECONDS + 1) * 1000;

/** Command to dump binlog */
public static final byte COM_BINLOG_DUMP = 18;

Expand Down Expand Up @@ -166,7 +171,7 @@ public boolean fetch() throws IOException {
private final boolean fetch0(final int off, final int len) throws IOException {
ensureCapacity(off + len);

byte[] read = channel.read(len);
byte[] read = channel.read(len, READ_TIMEOUT_MILLISECONDS);
System.arraycopy(read, 0, this.buffer, off, len);

if (limit < off + len) limit = off + len;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public static List<FieldMeta> parserTableMeta(ResultSetPacket packet) {
* size),
"YES"));
meta.setKey("PRI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
meta.setUnique("UNI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size)));
// 特殊处理引号
meta.setDefaultValue(DruidDdlParser.unescapeQuotaName(packet.getFieldValues()
.get(nameMaps.get(COLUMN_DEFAULT) + i * size)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,10 @@ private boolean compareTableMeta(TableMeta source, TableMeta target) {
return false;
}

if (sourceField.isKey() != targetField.isKey()) {
// mysql会有一种处理,针对show create只有uk没有pk时,会在desc默认将uk当做pk
boolean isSourcePkOrUk = sourceField.isKey() || sourceField.isUnique();
boolean isTargetPkOrUk = targetField.isKey() || targetField.isUnique();
if (isSourcePkOrUk != isTargetPkOrUk) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import com.alibaba.druid.sql.ast.statement.SQLColumnConstraint;
import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
import com.alibaba.druid.sql.ast.statement.SQLColumnPrimaryKey;
import com.alibaba.druid.sql.ast.statement.SQLColumnUniqueKey;
import com.alibaba.druid.sql.ast.statement.SQLCreateTableStatement;
import com.alibaba.druid.sql.ast.statement.SQLNotNullConstraint;
import com.alibaba.druid.sql.ast.statement.SQLNullConstraint;
import com.alibaba.druid.sql.ast.statement.SQLSelectOrderByItem;
import com.alibaba.druid.sql.ast.statement.SQLTableElement;
import com.alibaba.druid.sql.dialect.mysql.ast.MySqlPrimaryKey;
import com.alibaba.druid.sql.dialect.mysql.ast.MySqlUnique;
import com.alibaba.druid.sql.repository.Schema;
import com.alibaba.druid.sql.repository.SchemaObject;
import com.alibaba.druid.sql.repository.SchemaRepository;
Expand Down Expand Up @@ -204,6 +206,8 @@ private void processTableElement(SQLTableElement element, TableMeta tableMeta) {
fieldMeta.setNullable(true);
} else if (constraint instanceof SQLColumnPrimaryKey) {
fieldMeta.setKey(true);
} else if (constraint instanceof SQLColumnUniqueKey) {
fieldMeta.setUnique(true);
}
}
tableMeta.addFieldMeta(fieldMeta);
Expand All @@ -215,6 +219,14 @@ private void processTableElement(SQLTableElement element, TableMeta tableMeta) {
FieldMeta field = tableMeta.getFieldMetaByName(name);
field.setKey(true);
}
} else if (element instanceof MySqlUnique) {
MySqlUnique column = (MySqlUnique) element;
List<SQLSelectOrderByItem> uks = column.getColumns();
for (SQLSelectOrderByItem uk : uks) {
String name = getSqlName(uk.getExpr());
FieldMeta field = tableMeta.getFieldMetaByName(name);
field.setUnique(true);
}
}
}

Expand Down
Loading

0 comments on commit 863a61d

Please sign in to comment.