From b19eaeb89a90b980e550e98b38e89efdd5ead222 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=83=E9=94=8B?= Date: Thu, 19 Jul 2018 17:55:53 +0800 Subject: [PATCH] =?UTF-8?q?fixed=20issue=20#726=20,=20=E5=8D=95=E9=93=BE?= =?UTF-8?q?=E6=8E=A5=E5=A4=8D=E7=94=A8ByteBuffer=E6=95=B0=E7=BB=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../otter/canal/server/netty/NettyUtils.java | 18 ++++++++++++++++ .../server/netty/handler/SessionHandler.java | 21 +++++++++++++++++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java b/server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java index 22f45617ee..20532400e6 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/netty/NettyUtils.java @@ -2,8 +2,12 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.buffer.CompositeChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.Channels; @@ -22,6 +26,20 @@ public class NettyUtils { public static int HEADER_LENGTH = 4; public static Timer hashedWheelTimer = new HashedWheelTimer(); + public static void write(Channel channel, ByteBuffer body, ChannelFutureListener channelFutureListner) { + byte[] header = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.BIG_ENDIAN).putInt(body.limit()).array(); + List components = new ArrayList(2); + components.add(ChannelBuffers.wrappedBuffer(ByteOrder.BIG_ENDIAN, header)); + components.add(ChannelBuffers.wrappedBuffer(body)); + + if (channelFutureListner == null) { + Channels.write(channel, new CompositeChannelBuffer(ByteOrder.BIG_ENDIAN, components)); + } else { + Channels.write(channel, new CompositeChannelBuffer(ByteOrder.BIG_ENDIAN, components)) + .addListener(channelFutureListner); + } + } + public static void write(Channel channel, byte[] body, ChannelFutureListener channelFutureListner) { byte[] header = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.BIG_ENDIAN).putInt(body.length).array(); if (channelFutureListner == null) { diff --git a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java index 8b94d452de..11bba04317 100644 --- a/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java +++ b/server/src/main/java/com/alibaba/otter/canal/server/netty/handler/SessionHandler.java @@ -73,7 +73,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex } embeddedServer.subscribe(clientIdentity); - ctx.setAttachment(clientIdentity);// 设置状态数据 + // ctx.setAttachment(clientIdentity);// 设置状态数据 NettyUtils.ack(ctx.getChannel(), null); } else { NettyUtils.error(401, @@ -146,7 +146,19 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex size += com.google.protobuf.CodedOutputStream.computeTagSize(5) + com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize) + messageSize; - // TODO recyle bytes[] + // recyle bytes + // ByteBuffer byteBuffer = (ByteBuffer) + // ctx.getAttachment(); + // if (byteBuffer != null && size <= + // byteBuffer.capacity()) { + // byteBuffer.clear(); + // } else { + // byteBuffer = + // ByteBuffer.allocate(size).order(ByteOrder.BIG_ENDIAN); + // ctx.setAttachment(byteBuffer); + // } + // CodedOutputStream output = + // CodedOutputStream.newInstance(byteBuffer); byte[] body = new byte[size]; CodedOutputStream output = CodedOutputStream.newInstance(body); output.writeEnum(3, PacketType.MESSAGES.getNumber()); @@ -160,6 +172,11 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex } output.checkNoSpaceLeft(); NettyUtils.write(ctx.getChannel(), body, null); + + // output.flush(); + // byteBuffer.flip(); + // NettyUtils.write(ctx.getChannel(), byteBuffer, + // null); } else { Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder(); packetBuilder.setType(PacketType.MESSAGES);