Skip to content

Commit

Permalink
fixed issue #726 , 单链接复用ByteBuffer数组
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Jul 19, 2018
1 parent 5582a38 commit b19eaeb
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.CompositeChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.Channels;
Expand All @@ -22,6 +26,20 @@ public class NettyUtils {
public static int HEADER_LENGTH = 4;
public static Timer hashedWheelTimer = new HashedWheelTimer();

public static void write(Channel channel, ByteBuffer body, ChannelFutureListener channelFutureListner) {
byte[] header = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.BIG_ENDIAN).putInt(body.limit()).array();
List<ChannelBuffer> components = new ArrayList<ChannelBuffer>(2);
components.add(ChannelBuffers.wrappedBuffer(ByteOrder.BIG_ENDIAN, header));
components.add(ChannelBuffers.wrappedBuffer(body));

if (channelFutureListner == null) {
Channels.write(channel, new CompositeChannelBuffer(ByteOrder.BIG_ENDIAN, components));
} else {
Channels.write(channel, new CompositeChannelBuffer(ByteOrder.BIG_ENDIAN, components))
.addListener(channelFutureListner);
}
}

public static void write(Channel channel, byte[] body, ChannelFutureListener channelFutureListner) {
byte[] header = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.BIG_ENDIAN).putInt(body.length).array();
if (channelFutureListner == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
}

embeddedServer.subscribe(clientIdentity);
ctx.setAttachment(clientIdentity);// 设置状态数据
// ctx.setAttachment(clientIdentity);// 设置状态数据
NettyUtils.ack(ctx.getChannel(), null);
} else {
NettyUtils.error(401,
Expand Down Expand Up @@ -146,7 +146,19 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
size += com.google.protobuf.CodedOutputStream.computeTagSize(5)
+ com.google.protobuf.CodedOutputStream.computeRawVarint32Size(messageSize)
+ messageSize;
// TODO recyle bytes[]
// recyle bytes
// ByteBuffer byteBuffer = (ByteBuffer)
// ctx.getAttachment();
// if (byteBuffer != null && size <=
// byteBuffer.capacity()) {
// byteBuffer.clear();
// } else {
// byteBuffer =
// ByteBuffer.allocate(size).order(ByteOrder.BIG_ENDIAN);
// ctx.setAttachment(byteBuffer);
// }
// CodedOutputStream output =
// CodedOutputStream.newInstance(byteBuffer);
byte[] body = new byte[size];
CodedOutputStream output = CodedOutputStream.newInstance(body);
output.writeEnum(3, PacketType.MESSAGES.getNumber());
Expand All @@ -160,6 +172,11 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
}
output.checkNoSpaceLeft();
NettyUtils.write(ctx.getChannel(), body, null);

// output.flush();
// byteBuffer.flip();
// NettyUtils.write(ctx.getChannel(), byteBuffer,
// null);
} else {
Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
packetBuilder.setType(PacketType.MESSAGES);
Expand Down

0 comments on commit b19eaeb

Please sign in to comment.