首先还是先看一下出站流水线的大致流程:
这个正好和前面入站流水线的流程相反。也比较好理解。
依旧使用EmbeddedChannel测试通道来测试数据。
/**
* 解码实例
*/
@Test
public void testJsonDecoder() {
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加3个handler处理器
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new JsonMsgHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0; j < 100; j++) {
String json = new JsonMsg(j).convertToJson();
ByteBuf inBoundBytes = byteBufferWithLengthHead(json);
channel.writeInbound(inBoundBytes);
}
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//服务器端业务处理器
public class JsonMsgHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String json = (String) msg;
JsonMsg jsonMsg = JsonMsg.parseFromJson(json);
Logger.info("收到一个 Json 数据包 =》" + jsonMsg);
}
}
StringDecoder 和 LengthFieldBasedFrameDecoder
@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
private final Charset charset;
public StringDecoder() {
this(Charset.defaultCharset());
}
public StringDecoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
} else {
this.charset = charset;
}
}
// 解码方法
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.toString(this.charset));
}
}
...
[main|JsonMsgHandler.channelRead] |> 收到一个 Json 数据包 =》JsonMsg(id=96, content=疯狂创客圈-Java高并发社群)
[main|JsonMsgHandler.channelRead] |> 收到一个 Json 数据包 =》JsonMsg(id=97, content=疯狂创客圈-Java高并发社群)
[main|JsonMsgHandler.channelRead] |> 收到一个 Json 数据包 =》JsonMsg(id=98, content=疯狂创客圈-Java高并发社群)
[main|JsonMsgHandler.channelRead] |> 收到一个 Json 数据包 =》JsonMsg(id=99, content=疯狂创客圈-Java高并发社群)