Skip to content

Commit

Permalink
fixed issue #726, 优化SessionHandler里直接拼byte[],绕过protobuf的多次拷贝
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Jul 19, 2018
1 parent c6eee47 commit 5582a38
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws
}

private Message receiveMessages() throws IOException {
Packet p = Packet.parseFrom(readNextPacket());
byte[] data = readNextPacket();
Packet p = Packet.parseFrom(data);
switch (p.getType()) {
case MESSAGES: {
if (!p.getCompression().equals(Compression.NONE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ public void stop() {
stopHeartBeat(); // 先停止心跳
parseThread.interrupt(); // 尝试中断
eventSink.interrupt();

if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
multiStageCoprocessor.stop();
}

try {
parseThread.join();// 等待其结束
} catch (InterruptedException e) {
Expand All @@ -343,10 +348,6 @@ public void stop() {
if (transactionBuffer.isStart()) {
transactionBuffer.stop();
}

if (multiStageCoprocessor != null && multiStageCoprocessor.isStart()) {
multiStageCoprocessor.stop();
}
}

protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ public interface MultiStageCoprocessor extends CanalLifeCycle {
/**
* 网络数据投递
*/
public void publish(LogBuffer buffer);
public boolean publish(LogBuffer buffer);

public void publish(LogBuffer buffer, String binlogFileName);
public boolean publish(LogBuffer buffer, String binlogFileName);

public void reset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,11 @@ public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocess
while (fetcher.fetch()) {
LogBuffer buffer = fetcher.duplicate();
fetcher.consume(fetcher.limit());
coprocessor.publish(buffer, binlogfilename); // set filename
// set filename
if (!coprocessor.publish(buffer, binlogfilename)) {
needContinue = false;
break;
}
}

if (needContinue) {// 读取下一个
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocess
while (fetcher.fetch()) {
LogBuffer buffer = fetcher.duplicate();
fetcher.consume(fetcher.limit());
coprocessor.publish(buffer);
if (!coprocessor.publish(buffer)) {
break;
}
}
} finally {
fetcher.close();
Expand All @@ -230,7 +232,9 @@ public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOEx
while (fetcher.fetch()) {
LogBuffer buffer = fetcher.duplicate();
fetcher.consume(fetcher.limit());
coprocessor.publish(buffer);
if (!coprocessor.publish(buffer)) {
break;
}
}
} finally {
fetcher.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,22 @@ public void stop() {
} catch (Throwable e) {
// ignore
}
disruptorMsgBuffer = null;
super.stop();
}

/**
* 网络数据投递
*/
public void publish(LogBuffer buffer) {
publish(buffer, null);
public boolean publish(LogBuffer buffer) {
return publish(buffer, null);
}

public void publish(LogBuffer buffer, String binlogFileName) {
if (!isStart() && exception != null) {
throw exception;
public boolean publish(LogBuffer buffer, String binlogFileName) {
if (!isStart()) {
if (exception != null) {
throw exception;
}
return false;
}

boolean interupted = false;
Expand All @@ -172,6 +174,8 @@ public void publish(LogBuffer buffer, String binlogFileName) {
if (exception != null) {
throw exception;
}

return isStart();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public class NettyUtils {

private static final Logger logger = LoggerFactory.getLogger(NettyUtils.class);
private static int HEADER_LENGTH = 4;
public static int HEADER_LENGTH = 4;
public static Timer hashedWheelTimer = new HashedWheelTimer();

public static void write(Channel channel, byte[] body, ChannelFutureListener channelFutureListner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.server.netty.NettyUtils;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;

/**
* 处理具体的客户端请求
*
* @author jianghang 2012-10-24 下午02:21:13
* @version 1.0.0
*/
public class SessionHandler extends SimpleChannelHandler {

private static final Logger logger = LoggerFactory.getLogger(SessionHandler.class);
Expand Down Expand Up @@ -130,27 +127,57 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
}
// }

Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
packetBuilder.setType(PacketType.MESSAGES);
if (message.getId() != -1 && message.isRaw()) {
List<ByteString> rowEntries = message.getRawEntries();
// message size
int messageSize = 0;
messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, message.getId());

Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
messageBuilder.setBatchId(message.getId());
if (message.getId() != -1) {
if (message.isRaw()) {
// for performance
if (!CollectionUtils.isEmpty(message.getRawEntries())) {
int dataSize = 0;
for (int i = 0; i < rowEntries.size(); i++) {
dataSize += com.google.protobuf.CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
}
messageSize += dataSize;
messageSize += 1 * rowEntries.size();
// packet size
int size = 0;
size += com.google.protobuf.CodedOutputStream.computeEnumSize(3,
PacketType.MESSAGES.getNumber());
size += com.google.protobuf.CodedOutputStream.computeTagSize(5)
+ com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize)
+ messageSize;
// TODO recyle bytes[]
byte[] body = new byte[size];
CodedOutputStream output = CodedOutputStream.newInstance(body);
output.writeEnum(3, PacketType.MESSAGES.getNumber());

output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
output.writeRawVarint32(messageSize);
// message
output.writeInt64(1, message.getId());
for (int i = 0; i < rowEntries.size(); i++) {
output.writeBytes(2, rowEntries.get(i));
}
output.checkNoSpaceLeft();
NettyUtils.write(ctx.getChannel(), body, null);
} else {
Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
packetBuilder.setType(PacketType.MESSAGES);

Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
messageBuilder.setBatchId(message.getId());
if (message.getId() != -1) {
if (message.isRaw() && !CollectionUtils.isEmpty(message.getRawEntries())) {
messageBuilder.addAllMessages(message.getRawEntries());
}
} else {
if (!CollectionUtils.isEmpty(message.getEntries())) {
} else if (!CollectionUtils.isEmpty(message.getEntries())) {
for (Entry entry : message.getEntries()) {
messageBuilder.addMessages(entry.toByteString());
}
}
}
packetBuilder.setBody(messageBuilder.build().toByteString());
NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);// 输出数据
}
packetBuilder.setBody(messageBuilder.build().toByteString());
NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);// 输出数据
} else {
NettyUtils.error(401,
MessageFormatter.format("destination or clientId is null", get.toString()).getMessage(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.alibaba.otter.canal.server;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.junit.Test;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalPacket.Compression;
import com.alibaba.otter.canal.protocol.CanalPacket.Messages;
import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
import com.alibaba.otter.canal.protocol.CanalPacket.PacketType;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;

public class ProtocolTest {

@Test
public void testSimple() throws IOException {
Header.Builder headerBuilder = Header.newBuilder();
headerBuilder.setLogfileName("mysql-bin.000001");
headerBuilder.setLogfileOffset(1024);
headerBuilder.setExecuteTime(1024);
Entry.Builder entryBuilder = Entry.newBuilder();
entryBuilder.setHeader(headerBuilder.build());
entryBuilder.setEntryType(EntryType.ROWDATA);
Entry entry = entryBuilder.build();
Message message = new Message(3, true, Arrays.asList(entry.toByteString()));

byte[] body = buildData(message);
Packet packet = Packet.parseFrom(body);
switch (packet.getType()) {
case MESSAGES: {
if (!packet.getCompression().equals(Compression.NONE)) {
throw new CanalClientException("compression is not supported in this connector");
}

Messages messages = Messages.parseFrom(packet.getBody());
Message result = new Message(messages.getBatchId());
for (ByteString byteString : messages.getMessagesList()) {
result.addEntry(Entry.parseFrom(byteString));
}

System.out.println(result);
break;
}
default: {
throw new CanalClientException("unexpected packet type: " + packet.getType());
}
}
}

private byte[] buildData(Message message) throws IOException {
List<ByteString> rowEntries = message.getRawEntries();
// message size
int messageSize = 0;
messageSize += com.google.protobuf.CodedOutputStream.computeInt64Size(1, message.getId());

int dataSize = 0;
for (int i = 0; i < rowEntries.size(); i++) {
dataSize += com.google.protobuf.CodedOutputStream.computeBytesSizeNoTag(rowEntries.get(i));
}
messageSize += dataSize;
messageSize += 1 * rowEntries.size();
// packet size
int size = 0;
size += com.google.protobuf.CodedOutputStream.computeEnumSize(3, PacketType.MESSAGES.getNumber());
size += com.google.protobuf.CodedOutputStream.computeTagSize(5)
+ com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize;
// TODO recyle bytes[]
byte[] body = new byte[size];
CodedOutputStream output = CodedOutputStream.newInstance(body);
output.writeEnum(3, PacketType.MESSAGES.getNumber());

output.writeTag(5, WireFormat.WIRETYPE_LENGTH_DELIMITED);
output.writeRawVarint32(messageSize);
// message
output.writeInt64(1, message.getId());
for (int i = 0; i < rowEntries.size(); i++) {
output.writeBytes(2, rowEntries.get(i));
}
output.checkNoSpaceLeft();

return body;
}
}

0 comments on commit 5582a38

Please sign in to comment.