Skip to content

Latest commit

 

History

History
284 lines (176 loc) · 8.86 KB

51.基础实战:protobuf入站报文的实现.md

File metadata and controls

284 lines (176 loc) · 8.86 KB

51 protobuf 入站报文的处理

Protobuf入站报文流水线的设计

首先数据进去到channel的时候,是经过protobuf协议序列化过的二进制数据,我们先经过半包处理,通过ProtobufVarint32FrameDecoder解析二进制数据包得到二进制数据。

然后通过profobuf解码器(ProtobufDecode Netty内置的解码器)把二进制数据转换成我们的POJO, 最后就是我们的业务处理器处理了我们的POJO。

Netty提供了一组解决Protobuf二进制报文半包问题的编码器和解码器

ProtobufVarint32LengthFieldPrepender长度编码器

这个编码器的作用是在ProtobufEncoder生成的字节数组之前前置一个varint32数字,表示序列化的二进制字节数量或者长度。

ProtobufVarint32FrameDecoder长度解码器

ProtobufVarint32FrameDecoder和ProtobufVarint32LengthFieldPrepender相互对应,其作用是根据数据包中长度域(varint32类型)中的长度值解码一个足额的字节数组,然后将字节数组交给下一站的解码器ProtobufDecoder。

什么是varint32类型的长度?Protobuf为什么不用int这种固定类型的长度?

varint32是一种紧凑的表示数字的方法,不是一种固定长度(如32位)的数字类型。varint32用一个或多个字节来表示一个数字,值越小,使用的字节数越少,值越大使用的字节数越多。varint32根据值的大小自动进行收缩,能够减少用于保存长度的字节数。也就是说,varint32用一个或多个字节来表示一个数字,int是固定长度的数字。varint32不是固定长度,所以为了更好地减少通信过程中的传输量,消息头中的长度尽量采用varint格式。

至此,Netty内置的Protobuf编码器和解码器已经初步介绍完,可以通过这两组编码器/解码器完成Head-Content (Length + Protobuf Data)协议的数据传输。但是,在更加复杂的传输应用场景下,Netty的内置编码器和解码器是不够用的。例如,在Head部分需要加上魔数字段进行安全验证或者需要对Protobuf字节内容进行加密和解密,或者在其他复杂的传输应用场景下,需要定制属于自己的Protobuf编码器和解码器。

varint32 和 int 类型最大的区别

varint32用一个或多个子节表示一个数字,而int是固定长度,4字节

varint32不是固定长度,所以为了更好的减少通信过程中的传输量,消息头中的长度尽量采用varint格式。

ProtobufDecoder解码器

ProtobufDecoder和ProtobufEncoder相互对应,只不过在使用的时候ProtobufDecoder解码器需要指定一个Protobuf POJO实例作为解码的参考原型(prototype),解码时会根据原型实例找到对应的Parser解析器,将二进制的字节解码为Protobuf POJO实例。

new ProtobufDecoder(MsgProtos.Msg.getDefaultInstance())

在JavaNIO通信中,仅仅使用以上这组编码器和解码器,传输过程中会存在粘包/半包的问题。Netty也提供了配套的Head-Content类型的Protobuf编码器和解码器,在二进制码流之前加上二进制字节数组的长度。

代码演示

服务端代码:

public class ProtoBufServer {

	private final int serverPort;
	ServerBootstrap b = new ServerBootstrap();

	public ProtoBufServer(int port) {
		this.serverPort = port;
	}

	public void runServer() {
		//创建reactor 线程组
		EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
		EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

		try {
			//1 设置reactor 线程组
			b.group(bossLoopGroup, workerLoopGroup);
			//2 设置nio类型的channel
			b.channel(NioServerSocketChannel.class);
			//3 设置监听端口
			b.localAddress(serverPort);
			//4 设置通道的参数
			b.option(ChannelOption.SO_KEEPALIVE, true);
			b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
			b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

			//5 装配子通道流水线
			b.childHandler(new ChannelInitializer<SocketChannel>() {
				//有连接到达时会创建一个channel
				protected void initChannel(SocketChannel ch) throws Exception {
					// pipeline管理子通道channel中的Handler
					// 向子channel流水线添加3个handler处理器

					// protobufDecoder仅仅负责编码,并不支持读半包,所以在之前,一定要有读半包的处理器。
					// 有三种方式可以选择:
					// 使用netty提供ProtobufVarint32FrameDecoder
					// 继承netty提供的通用半包处理器 LengthFieldBasedFrameDecoder
					// 继承ByteToMessageDecoder类,自己处理半包

					// 半包的处理
					ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
					// 需要解码的目标类
					ch.pipeline().addLast(new ProtobufDecoder(MsgProtos.Msg.getDefaultInstance()));
					ch.pipeline().addLast(new ProtobufBussinessHandler());
				}
			});
			// 6 开始绑定server
			// 通过调用sync同步方法阻塞直到绑定成功
			ChannelFuture channelFuture = b.bind().sync();
			Logger.info(" 服务器启动成功,监听端口: " +
					channelFuture.channel().localAddress());

			// 7 等待通道关闭的异步任务结束
			// 服务监听通道会一直等待通道关闭的异步任务结束
			ChannelFuture closeFuture = channelFuture.channel().closeFuture();
			closeFuture.sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 8 优雅关闭EventLoopGroup,
			// 释放掉所有资源包括创建的线程
			workerLoopGroup.shutdownGracefully();
			bossLoopGroup.shutdownGracefully();
		}

	}

	//服务器端业务处理器
	static class ProtobufBussinessHandler extends ChannelInboundHandlerAdapter {
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
			MsgProtos.Msg protoMsg = (MsgProtos.Msg) msg;
			//经过pipeline的各个decoder,到此Person类型已经可以断定
			Logger.info("收到一个 MsgProtos.Msg 数据包 =》");
			Logger.info("protoMsg.getId():=" + protoMsg.getId());
			Logger.info("protoMsg.getContent():=" + protoMsg.getContent());
		}
	}


	public static void main(String[] args) throws InterruptedException {
		int port = NettyDemoConfig.SOCKET_SERVER_PORT;
		new ProtoBufServer(port).runServer();
	}

}

客户端代码

public class ProtoBufSendClient {
	static String content = "疯狂创客圈:高性能学习社群!";

	private int serverPort;
	private String serverIp;
	Bootstrap b = new Bootstrap();

	public ProtoBufSendClient(String ip, int port) {
		this.serverPort = port;
		this.serverIp = ip;
	}

	public void runClient() {
		//创建reactor 线程组
		EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

		try {
			//1 设置reactor 线程组
			b.group(workerLoopGroup);
			//2 设置nio类型的channel
			b.channel(NioSocketChannel.class);
			//3 设置监听端口
			b.remoteAddress(serverIp, serverPort);
			//4 设置通道的参数
			b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

			//5 装配通道流水线
			b.handler(new ChannelInitializer<SocketChannel>() {
				//初始化客户端channel
				protected void initChannel(SocketChannel ch) throws Exception {
					// 客户端channel流水线添加2个handler处理器
					ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
					ch.pipeline().addLast(new ProtobufEncoder());
				}
			});
			ChannelFuture f = b.connect();
			f.addListener((ChannelFuture futureListener) ->
			{
				if (futureListener.isSuccess()) {
					Logger.info("EchoClient客户端连接成功!");

				} else {
					Logger.info("EchoClient客户端连接失败!");
				}

			});

			// 阻塞,直到连接完成
			f.sync();
			Channel channel = f.channel();

			//发送 Protobuf 对象
			for (int i = 0; i < 1000; i++) {
				MsgProtos.Msg user = build(i, i + "->" + content);
				channel.writeAndFlush(user);
				Logger.info("发送报文数:" + i);
			}
			channel.flush();


			// 7 等待通道关闭的异步任务结束
			// 服务监听通道会一直等待通道关闭的异步任务结束
			ChannelFuture closeFuture = channel.closeFuture();
			closeFuture.sync();

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 优雅关闭EventLoopGroup,
			// 释放掉所有资源包括创建的线程
			workerLoopGroup.shutdownGracefully();
		}

	}

	//构建ProtoBuf对象
	public MsgProtos.Msg build(int id, String content) {
		MsgProtos.Msg.Builder builder = MsgProtos.Msg.newBuilder();
		builder.setId(id);
		builder.setContent(content);
		return builder.build();
	}

	public static void main(String[] args) throws InterruptedException {
		int port = NettyDemoConfig.SOCKET_SERVER_PORT;
		String ip = NettyDemoConfig.SOCKET_SERVER_IP;
		new ProtoBufSendClient(ip, port).runClient();
	}
}