-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
CIMClientHandle.java
136 lines (103 loc) · 4.37 KB
/
CIMClientHandle.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package com.crossoverjie.cim.client.handle;
import com.crossoverjie.cim.client.service.ShutDownMsg;
import com.crossoverjie.cim.client.thread.ReConnectJob;
import com.crossoverjie.cim.client.util.SpringBeanFactory;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
import com.crossoverjie.cim.common.protocol.CIMResponseProto;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Client-side inbound handler for {@link CIMResponseProto.CIMResProtocol} messages.
 *
 * <p>Responsibilities visible in this class:
 * <ul>
 *   <li>on a writer-idle event, write the {@code heartBeat} bean to the server and close the
 *       channel if that write fails;</li>
 *   <li>on channel inactivity, schedule a {@link ReConnectJob} unless {@link ShutDownMsg}
 *       reports that the user shut the client down;</li>
 *   <li>on an inbound message, either refresh the reader time (PING) or hand the message text
 *       to the registered message listener on the {@code callBackThreadPool} executor.</li>
 * </ul>
 *
 * <p>The handler is marked {@link ChannelHandler.Sharable}, so per-message state is kept in
 * local variables rather than instance fields.
 *
 * @author crossoverJie
 * Date: 16/02/2018 18:09
 * @since JDK 1.8
 */
@ChannelHandler.Sharable
public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProto.CIMResProtocol> {

    private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientHandle.class);

    /** Looked up from {@link SpringBeanFactory} on first use in {@link #channelInactive}. */
    private ScheduledExecutorService scheduledExecutorService;

    /** Looked up from {@link SpringBeanFactory} on first use in {@link #channelInactive}. */
    private ShutDownMsg shutDownMsg;

    /**
     * Sends a heartbeat when the channel has been writer-idle, then forwards the event.
     *
     * @param ctx the handler context the event was fired on
     * @param evt the user event; only {@link IdleStateEvent} with
     *            {@link IdleState#WRITER_IDLE} triggers a heartbeat
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;

            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
                        CIMRequestProto.CIMReqProtocol.class);
                ctx.writeAndFlush(heartBeat).addListener((ChannelFutureListener) future -> {
                    if (!future.isSuccess()) {
                        // Keep the failure cause in the log so the reason for the close is visible.
                        LOGGER.error("IO error,close Channel", future.cause());
                        future.channel().close();
                    }
                });
            }
        }

        super.userEventTriggered(ctx, evt);
    }

    /**
     * Logs that the connection to the server has been established.
     *
     * @param ctx the handler context of the now-active channel
     * @throws Exception declared by the overridden method; not thrown here
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Called when the client has established its connection to the server.
        LOGGER.info("cim server connect success!");
    }

    /**
     * Schedules reconnection when the channel goes inactive, unless the user shut down.
     *
     * @param ctx the handler context of the inactive channel; passed to {@link ReConnectJob}
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (shutDownMsg == null) {
            shutDownMsg = SpringBeanFactory.getBean(ShutDownMsg.class);
        }

        // The user exited on purpose: do not run the reconnect logic.
        if (shutDownMsg.checkStatus()) {
            return;
        }

        if (scheduledExecutorService == null) {
            scheduledExecutorService =
                    SpringBeanFactory.getBean("scheduledTask", ScheduledExecutorService.class);
        }

        LOGGER.info("客户端断开了,重新连接!");
        // TODO: 2019-01-22 this could later drop the fixed-rate task and stop it once the
        //  connection is re-established, to save resources.
        // NOTE(review): every call schedules another repeating job (first run immediately, then
        // every 10 seconds); nothing in this class cancels it - confirm ReConnectJob handles that.
        scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx), 0, 10, TimeUnit.SECONDS);
    }

    /**
     * Handles one decoded server message.
     *
     * @param ctx the handler context the message arrived on
     * @param msg the decoded response; PING refreshes the reader time, anything else is
     *            dispatched to the message listener and logged
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CIMResponseProto.CIMResProtocol msg) throws Exception {
        if (msg.getType() == Constants.CommandType.PING) {
            // Heartbeat from the server: record the current time for this channel.
            NettyAttrUtil.updateReaderTime(ctx.channel(), System.currentTimeMillis());
        } else {
            // Regular message: invoke the callback, then log the text.
            callBackMsg(msg.getResMsg());
            LOGGER.info(msg.getResMsg());
        }
    }

    /**
     * Dispatches a message to the registered listener on the {@code callBackThreadPool} executor.
     *
     * <p>Beans are held in locals only: this handler is sharable, and the task runs on a pool
     * thread, so writing them to instance fields would be an unsynchronised shared write.
     *
     * @param msg the message text to hand to the listener
     */
    private void callBackMsg(String msg) {
        ThreadPoolExecutor callBackPool =
                SpringBeanFactory.getBean("callBackThreadPool", ThreadPoolExecutor.class);
        callBackPool.execute(() -> {
            MsgHandleCaller msgCaller = SpringBeanFactory.getBean(MsgHandleCaller.class);
            msgCaller.getMsgHandleListener().handle(msg);
        });
    }

    /**
     * Logs the failure and closes the connection.
     *
     * @param ctx   the handler context the exception was raised on
     * @param cause the exception that reached this handler
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Disconnect on error; log through SLF4J so the stack trace lands in the configured log.
        LOGGER.error("Unexpected exception, closing channel", cause);
        ctx.close();
    }
}