这一部分不是很好理解,算是比较难的部分,为此呢,我特意画了一个逻辑流程图辅助理解。如下面所示👇
在上一个章节中,我们设计了一个类,WorkerRouter。每一个Netty节点都会对应一个WorkerRouter。
它的功能在上一节中已经提到一部分。
第一,它通过订阅Zookeeper指定路径下的字节点的更新,通过监听器,监听Netty节点的状态,当ZK有新的子节点创建时,意味着Netty集群中加入了新的节点。
同样,当节点删除的时候,意味着Netty节点离开集群,从而,WorkerRouter实现了服务发现的功能。
第二,处理服务发现,WorkerRouter还具备路由器的功能。当客户端消息要发送的Session不再当前节点时,需要转发器(本质是模拟一个Netty客户端)转发到远程节点上面。
这里,key是Netty节点的id, value对应的是,一个转发器。
private ConcurrentHashMap<Long, PeerSender> workerMap = new ConcurrentHashMap<>();
之前提到,当有Netty节点加入集群中的时候,会向workerMap中,存储数据:
private void doAfterAdd(ImNode n, PeerSender relaySender) {
if (null != relaySender) {
//关闭老的连接
relaySender.stopConnecting();
}
//创建一个消息转发器
relaySender = new PeerSender(n);
//建立转发的连接
relaySender.doConnect();
workerMap.put(n.getId(), relaySender);
}
@Data
@Builder
@AllArgsConstructor
public class RemoteSession implements ServerSession, Serializable {
private static final long serialVersionUID = -400010884211394846L;
SessionCache cache;
private boolean valid = true;
public RemoteSession(SessionCache cache) {
this.cache = cache;
}
/**
* 通过远程节点,转发
*/
@Override
public void writeAndFlush(Object pkg) {
ImNode imNode = cache.getImNode();
long nodeId = imNode.getId();
//获取转发的 sender
PeerSender sender = WorkerRouter.getInst().route(nodeId);
if (null != sender) {
sender.writeAndFlush(pkg);
}
}
@Override
public String getSessionId() {
//委托
return cache.getSessionId();
}
@Override
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
@Override
public String getUserId() {
//委托
return cache.getUserId();
}
}
路由如何体现?
PeerSender sender = WorkerRouter.getInst().route(nodeId);
这里,其实PeerSender其实是,一个客户端的程序,链接的远程Netty节点就是RemoteSession中存储的远程节点信息。
@Slf4j
@Data
public class PeerSender {
private int reConnectCount = 0;
private Channel channel;
private ImNode imNode;
/**
* 唯一标记
*/
private boolean connectFlag = false;
private UserDTO user;
GenericFutureListener<ChannelFuture> closeListener = (ChannelFuture f) -> {
log.info("分布式连接已经断开……{}", imNode.toString());
channel = null;
connectFlag = false;
};
private GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) -> {
final EventLoop eventLoop = f.channel().eventLoop();
if (!f.isSuccess() && ++reConnectCount < 3) {
log.info("连接失败! 在10s之后准备尝试第{}次重连!", reConnectCount);
eventLoop.schedule(PeerSender.this::doConnect, 10, TimeUnit.SECONDS);
connectFlag = false;
} else {
connectFlag = true;
log.info(new Date() + "分布式节点连接成功:{}", imNode.toString());
channel = f.channel();
channel.closeFuture().addListener(closeListener);
// 发送链接成功的通知
Notification<ImNode> notification = new Notification<>(ImWorker.getInst().getLocalNodeInfo());
notification.setType(Notification.CONNECT_FINISHED);
String json = JsonUtil.pojoToJson(notification);
ProtoMsg.Message pkg = NotificationMsgBuilder.buildNotification(json);
writeAndFlush(pkg);
}
};
private Bootstrap bootstrap;
private EventLoopGroup g;
public PeerSender(ImNode n) {
this.imNode = n;
// 客户端的是Bootstrap,服务端的则是 ServerBootstrap。都是AbstractBootstrap的子类。
bootstrap = new Bootstrap();
// 通过nio方式来接收连接和处理连接
g = new NioEventLoopGroup();
}
/**
* 重连
*/
public void doConnect() {
// 服务器ip地址
String host = imNode.getHost();
// 服务器端口
int port = imNode.getPort();
try {
if (bootstrap != null && bootstrap.group() == null) {
bootstrap.group(g);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.remoteAddress(host, port);
// 设置通道初始化
bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast("decoder", new ProtobufDecoder());
ch.pipeline().addLast("encoder", new ProtobufEncoder());
ch.pipeline().addLast("imNodeHeartBeatClientHandler", new ImNodeHeartBeatClientHandler());
ch.pipeline().addLast("exceptionHandler", new ImNodeExceptionHandler());
}
}
);
log.info(new Date() + "开始连接分布式节点:{}", imNode.toString());
ChannelFuture f = bootstrap.connect();
f.addListener(connectedListener);
// 阻塞
// f.channel().closeFuture().sync();
} else if (bootstrap.group() != null) {
log.info(new Date() + "再一次开始连接分布式节点 : {}", imNode.toString());
ChannelFuture f = bootstrap.connect();
f.addListener(connectedListener);
}
} catch (Exception e) {
log.info("客户端连接失败!" + e.getMessage());
}
}
public void stopConnecting() {
g.shutdownGracefully();
connectFlag = false;
}
public void writeAndFlush(Object pkg) {
if (!connectFlag) {
log.error("分布式节点未连接: {}", imNode.toString());
return;
}
channel.writeAndFlush(pkg);
}
}
WorkerRouter路由器有一个 workerMap,用于封装和保存所有在线的节点数据。
这里需要注意的是,后加入集群的节点,也需要保存之前已经加入集群的节点数据, 其实就是Zk指定路径下的所有字节点(不算自己)
当有消息进行转发的时候,根据远程Netty节点的id,在workerMap 中找出PeerSender,进行消息的转发。
只有涉及到 远程会话 的场景才会使用到WorkerRouter。 本地会话是不需要路由逻辑的,直接由当前Netty节点进行消息的发送。