From 5582a38930814d30b147285a4074fe93c72849a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=83=E9=94=8B?= Date: Thu, 19 Jul 2018 13:35:38 +0800 Subject: [PATCH] =?UTF-8?q?fixed=20issue=20#726,=20=E4=BC=98=E5=8C=96Sessi?= =?UTF-8?q?onHandler=E9=87=8C=E7=9B=B4=E6=8E=A5=E6=8B=BCbyte[],=E7=BB=95?= =?UTF-8?q?=E8=BF=87protobuf=E7=9A=84=E5=A4=9A=E6=AC=A1=E6=8B=B7=E8=B4=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/impl/SimpleCanalConnector.java | 3 +- .../parse/inbound/AbstractEventParser.java | 9 +- .../parse/inbound/MultiStageCoprocessor.java | 4 +- .../inbound/mysql/LocalBinLogConnection.java | 6 +- .../parse/inbound/mysql/MysqlConnection.java | 8 +- .../mysql/MysqlMultiStageCoprocessor.java | 16 ++-- .../otter/canal/server/netty/NettyUtils.java | 2 +- .../server/netty/handler/SessionHandler.java | 65 +++++++++---- .../otter/canal/server/ProtocolTest.java | 92 +++++++++++++++++++ 9 files changed, 169 insertions(+), 36 deletions(-) create mode 100644 server/src/test/java/com/alibaba/otter/canal/server/ProtocolTest.java diff --git a/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java b/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java index 508c01dc3a..0f29794d55 100644 --- a/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java +++ b/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java @@ -319,7 +319,8 @@ public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws } private Message receiveMessages() throws IOException { - Packet p = Packet.parseFrom(readNextPacket()); + byte[] data = readNextPacket(); + Packet p = Packet.parseFrom(data); switch (p.getType()) { case MESSAGES: { if (!p.getCompression().equals(Compression.NONE)) { diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java index 54bb14e758..c8cdb1bdbf 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java @@ -331,6 +331,11 @@ public void stop() { stopHeartBeat(); // 先停止心跳 parseThread.interrupt(); // 尝试中断 eventSink.interrupt(); + + if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) { + multiStageCoprocessor.stop(); + } + try { parseThread.join();// 等待其结束 } catch (InterruptedException e) { @@ -343,10 +348,6 @@ public void stop() { if (transactionBuffer.isStart()) { transactionBuffer.stop(); } - - if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) { - multiStageCoprocessor.stop(); - } } protected boolean consumeTheEventAndProfilingIfNecessary(List entrys) throws CanalSinkException, diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java index 1c5c495d58..96e063e927 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/MultiStageCoprocessor.java @@ -21,9 +21,9 @@ public interface MultiStageCoprocessor extends CanalLifeCycle { /** * 网络数据投递 */ - public void publish(LogBuffer buffer); + public boolean publish(LogBuffer buffer); - public void publish(LogBuffer buffer, String binlogFileName); + public boolean publish(LogBuffer buffer, String binlogFileName); public void reset(); } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java index f97c4ccd1f..6c59465b56 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java @@ -221,7 +221,11 @@ public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocess while (fetcher.fetch()) { LogBuffer buffer = fetcher.duplicate(); fetcher.consume(fetcher.limit()); - coprocessor.publish(buffer, binlogfilename); // set filename + // set filename + if (!coprocessor.publish(buffer, binlogfilename)) { + needContinue = false; + break; + } } if (needContinue) {// 读取下一个 diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java index 46f7e300f9..d01a210e88 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java @@ -206,7 +206,9 @@ public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocess while (fetcher.fetch()) { LogBuffer buffer = fetcher.duplicate(); fetcher.consume(fetcher.limit()); - coprocessor.publish(buffer); + if (!coprocessor.publish(buffer)) { + break; + } } } finally { fetcher.close(); @@ -230,7 +232,9 @@ public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOEx while (fetcher.fetch()) { LogBuffer buffer = fetcher.duplicate(); fetcher.consume(fetcher.limit()); - coprocessor.publish(buffer); + if (!coprocessor.publish(buffer)) { + break; + } } } finally { fetcher.close(); diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java index 76c70723ca..65904b3432 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java @@ -135,20 +135,22 @@ public void stop() { } catch (Throwable e) { // ignore } - disruptorMsgBuffer = null; super.stop(); } /** * 网络数据投递 */ - public void publish(LogBuffer buffer) { - publish(buffer, null); + public boolean publish(LogBuffer buffer) { + return publish(buffer, null); } - public void publish(LogBuffer buffer, String binlogFileName) { - if (!isStart() && exception != null) { - throw exception; + public boolean publish(LogBuffer buffer, String binlogFileName) { + if (!isStart()) { + if (exception != null) { + throw exception; + } + return false; } boolean interupted = false; @@ -172,6 +174,8 @@ public void publish(LogBuffer buffer, String binlogFileName) { if (exception != null) { throw exception; } + + return isStart(); } @Override diff --git a/server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java b/server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java index 531c817fa7..22f45617ee 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java @@ -19,7 +19,7 @@ public class NettyUtils { private static final Logger logger = LoggerFactory.getLogger(NettyUtils.class); - private static int HEADER_LENGTH = 4; + public static int HEADER_LENGTH = 4; public static Timer hashedWheelTimer = new HashedWheelTimer(); public static void write(Channel channel, byte[] body, ChannelFutureListener channelFutureListner) { diff --git a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java index 8f7b4278f6..8b94d452de 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java @@ -33,13 +33,10 @@ import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded; import com.alibaba.otter.canal.server.netty.NettyUtils; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.WireFormat; -/** - * 处理具体的客户端请求 - * - * @author jianghang 2012-10-24 下午02:21:13 - * @version 1.0.0 - */ public class SessionHandler extends SimpleChannelHandler { private static final Logger logger = LoggerFactory.getLogger(SessionHandler.class); @@ -130,27 +127,57 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex } // } - Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder(); - packetBuilder.setType(PacketType.MESSAGES); + if (message.getId() != -1 && message.isRaw()) { + List rowEntries = message.getRawEntries(); + // message size + int messageSize = 0; + messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, message.getId()); - Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder(); - messageBuilder.setBatchId(message.getId()); - if (message.getId() != -1) { - if (message.isRaw()) { - // for performance - if (!CollectionUtils.isEmpty(message.getRawEntries())) { + int dataSize = 0; + for (int i = 0; i < rowEntries.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i)); + } + messageSize += dataSize; + messageSize += 1 * rowEntries.size(); + // packet size + int size = 0; + size += com.google.protobuf.CodedOutputStream.computeEnumSize(3, + PacketType.MESSAGES.getNumber()); + size += com.google.protobuf.CodedOutputStream.computeTagSize(5) + + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize) + + messageSize; + // TODO recyle bytes[] + byte[] body = new byte[size]; + CodedOutputStream output = CodedOutputStream.newInstance(body); + output.writeEnum(3, PacketType.MESSAGES.getNumber()); + + output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED); + output.writeRawVarint32(messageSize); + // message + output.writeInt64(1, message.getId()); + for (int i = 0; i < rowEntries.size(); i++) { + output.writeBytes(2, rowEntries.get(i)); + } + output.checkNoSpaceLeft(); + NettyUtils.write(ctx.getChannel(), body, null); + } else { + Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder(); + packetBuilder.setType(PacketType.MESSAGES); + + Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder(); + messageBuilder.setBatchId(message.getId()); + if (message.getId() != -1) { + if (message.isRaw() && !CollectionUtils.isEmpty(message.getRawEntries())) { messageBuilder.addAllMessages(message.getRawEntries()); - } - } else { - if (!CollectionUtils.isEmpty(message.getEntries())) { + } else if (!CollectionUtils.isEmpty(message.getEntries())) { for (Entry entry : message.getEntries()) { messageBuilder.addMessages(entry.toByteString()); } } } + packetBuilder.setBody(messageBuilder.build().toByteString()); + NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);// 输出数据 } - packetBuilder.setBody(messageBuilder.build().toByteString()); - NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);// 输出数据 } else { NettyUtils.error(401, MessageFormatter.format("destination or clientId is null", get.toString()).getMessage(), diff --git a/server/src/test/java/com/alibaba/otter/canal/server/ProtocolTest.java b/server/src/test/java/com/alibaba/otter/canal/server/ProtocolTest.java new file mode 100644 index 0000000000..cb078ddffb --- /dev/null +++ b/server/src/test/java/com/alibaba/otter/canal/server/ProtocolTest.java @@ -0,0 +1,92 @@ +package com.alibaba.otter.canal.server; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.junit.Test; + +import com.alibaba.otter.canal.protocol.CanalEntry.Entry; +import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; +import com.alibaba.otter.canal.protocol.CanalEntry.Header; +import com.alibaba.otter.canal.protocol.CanalPacket.Compression; +import com.alibaba.otter.canal.protocol.CanalPacket.Messages; +import com.alibaba.otter.canal.protocol.CanalPacket.Packet; +import com.alibaba.otter.canal.protocol.CanalPacket.PacketType; +import com.alibaba.otter.canal.protocol.Message; +import com.alibaba.otter.canal.protocol.exception.CanalClientException; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.WireFormat; + +public class ProtocolTest { + + @Test + public void testSimple() throws IOException { + Header.Builder headerBuilder = Header.newBuilder(); + headerBuilder.setLogfileName("mysql-bin.000001"); + headerBuilder.setLogfileOffset(1024); + headerBuilder.setExecuteTime(1024); + Entry.Builder entryBuilder = Entry.newBuilder(); + entryBuilder.setHeader(headerBuilder.build()); + entryBuilder.setEntryType(EntryType.ROWDATA); + Entry entry = entryBuilder.build(); + Message message = new Message(3, true, Arrays.asList(entry.toByteString())); + + byte[] body = buildData(message); + Packet packet = Packet.parseFrom(body); + switch (packet.getType()) { + case MESSAGES: { + if (!packet.getCompression().equals(Compression.NONE)) { + throw new CanalClientException("compression is not supported in this connector"); + } + + Messages messages = Messages.parseFrom(packet.getBody()); + Message result = new Message(messages.getBatchId()); + for (ByteString byteString : messages.getMessagesList()) { + result.addEntry(Entry.parseFrom(byteString)); + } + + System.out.println(result); + break; + } + default: { + throw new CanalClientException("unexpected packet type: " + packet.getType()); + } + } + } + + private byte[] buildData(Message message) throws IOException { + List rowEntries = message.getRawEntries(); + // message size + int messageSize = 0; + messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, message.getId()); + + int dataSize = 0; + for (int i = 0; i < rowEntries.size(); i++) { + dataSize += com.google.protobuf.CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i)); + } + messageSize += dataSize; + messageSize += 1 * rowEntries.size(); + // packet size + int size = 0; + size += com.google.protobuf.CodedOutputStream.computeEnumSize(3, PacketType.MESSAGES.getNumber()); + size += com.google.protobuf.CodedOutputStream.computeTagSize(5) + + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize; + // TODO recyle bytes[] + byte[] body = new byte[size]; + CodedOutputStream output = CodedOutputStream.newInstance(body); + output.writeEnum(3, PacketType.MESSAGES.getNumber()); + + output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED); + output.writeRawVarint32(messageSize); + // message + output.writeInt64(1, message.getId()); + for (int i = 0; i < rowEntries.size(); i++) { + output.writeBytes(2, rowEntries.get(i)); + } + output.checkNoSpaceLeft(); + + return body; + } +}