-
Notifications
You must be signed in to change notification settings - Fork 11
/
websocket.dart
301 lines (270 loc) · 9.53 KB
/
websocket.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
import 'dart:async';
import 'dart:convert';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:get/get.dart';
import 'package:imboy/component/helper/func.dart';
import 'package:imboy/component/helper/jwt.dart';
import 'package:imboy/store/provider/user_provider.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:imboy/config/const.dart';
import 'package:imboy/component/helper/datetime.dart';
import 'package:imboy/component/http/http_client.dart';
import 'package:imboy/config/init.dart';
import 'package:imboy/page/passport/passport_view.dart';
import 'package:imboy/service/storage.dart';
import 'package:imboy/store/repository/user_repo_local.dart';
/// WebSocket connection status.
enum SocketStatus {
SocketStatusConnected, // connected
SocketStatusFailed, // failed
SocketStatusClosed, // connection closed
}
/// Singleton that owns the application's WebSocket connection.
///
/// Access it through [WebSocketService.to]. Incoming text frames are decoded
/// as JSON objects and fired on `eventBus`; outgoing frames are sent with
/// [sendMessage]. When the connection errors or is closed by the peer, a
/// reconnect is scheduled one second later.
class WebSocketService {
  WebSocketService._();

  /// Returns the shared instance, creating it on first access.
  ///
  /// Every access also runs [_init], which (re)starts the connection whenever
  /// the recorded status is not [SocketStatus.SocketStatusConnected].
  static WebSocketService get to {
    final instance = _instance ??= WebSocketService._();
    instance._init();
    return instance;
  }

  /// Lock that prevents several WebSocket connections from being created at
  /// once (for example while the access token has expired).
  bool wsConnectLock = false;

  static WebSocketService? _instance;

  /// Sub-protocols passed to [IOWebSocketChannel.connect].
  Iterable<String> protocols = ['text', 'sip'];

  /// The live channel, or `null` when no connection is open.
  IOWebSocketChannel? _webSocketChannel;

  /// Last recorded connection status; `null` until the first attempt.
  SocketStatus? _socketStatus;

  /// Ping interval in milliseconds, handed to the channel's `pingInterval`.
  ///
  /// Must stay a little below the server's idle timeout: the server is
  /// configured with 128 seconds, so this uses 120 seconds. Do not exceed
  /// 128 seconds. The same value is used as the period of [_reconnectTimer].
  final int _heartTimes = 120000;

  /// Maximum number of reconnect attempts (default 10).
  final int _reconnectMax = 10;

  /// Reconnect attempt counter.
  int _reconnectTimes = 0;

  /// Periodic timer that retries [openSocket] while reconnecting.
  Timer? _reconnectTimer;

  /// Callback invoked after a channel has been created.
  late Function onOpen;

  /// Callback invoked with every frame received from the server.
  late Function onMessage;

  /// Callback invoked with the error message when the connection fails.
  late Function onError;

  /// Whether a channel currently exists.
  ///
  /// This only reflects that a channel object is held; it does not prove that
  /// the handshake has completed.
  bool get isConnected => _webSocketChannel != null;

  /// Value of `DateTimeHelper.utc()` captured when the last channel was set up.
  int lastConnectedAt = 0;

  /// Installs the default callbacks and opens the socket, unless the status is
  /// already [SocketStatus.SocketStatusConnected].
  void _init() {
    if (_socketStatus == SocketStatus.SocketStatusConnected) {
      return;
    }
    _initWebSocket(
      onOpen: () {
        // Nothing to do: heartbeats are handled by the channel's pingInterval.
      },
      onMessage: _dispatchMessage,
      onError: _scheduleReconnectAfterError,
    );
  }

  /// Default [onMessage] handler: ignores heartbeat replies and publishes
  /// decoded JSON objects on `eventBus`.
  void _dispatchMessage(dynamic event) {
    iPrint("initWebSocket_onMessage $event");
    if (event == "pong" || event == "pong2") {
      return;
    }
    final data = _decodeEvent(event);
    if (data != null) {
      eventBus.fire(data);
    }
  }

  /// Converts an incoming frame into a [Map], or returns `null` (after
  /// logging) when the frame is not a JSON object.
  ///
  /// Guarding here keeps a malformed or binary frame from raising an uncaught
  /// error inside the stream listener.
  Map? _decodeEvent(dynamic event) {
    if (event is Map) {
      return event;
    }
    if (event is! String) {
      iPrint('> ws onMessage ignored non-text frame ${event.runtimeType}');
      return null;
    }
    try {
      final decoded = json.decode(event);
      if (decoded is Map) {
        return decoded;
      }
      iPrint('> ws onMessage ignored non-object JSON ${decoded.runtimeType}');
    } on FormatException catch (e) {
      iPrint('> ws onMessage invalid JSON: ${e.message}');
    }
    return null;
  }

  /// Default [onError] handler: logs the error and runs [_reconnect] after a
  /// one second delay.
  void _scheduleReconnectAfterError(dynamic e) {
    iPrint("> ws onError ${e.runtimeType} | $e;");
    Future.delayed(const Duration(milliseconds: 1000), () {
      iPrint('> ws onError _reconnectTimes: $_reconnectTimes');
      _reconnect();
    });
  }

  /// Initializes the WebSocket: resets the reconnect state, stores the
  /// callbacks and starts [openSocket].
  ///
  /// Original author's note: this only needs to be called once, from
  /// main.dart.
  void _initWebSocket({
    required Function onOpen,
    required Function onMessage,
    required Function onError,
  }) {
    _reconnectTimes = 0;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    this.onOpen = onOpen;
    this.onMessage = onMessage;
    this.onError = onError;
    openSocket();
  }

  /// Opens the WebSocket connection.
  ///
  /// Returns without doing anything when there is no network, the user is not
  /// logged in, a channel already exists, or another attempt holds
  /// [wsConnectLock]. Set [fromReconnect] to `true` to keep the reconnect
  /// counter instead of resetting it.
  ///
  /// Failures while setting up the channel are caught: the socket is closed
  /// and the status becomes [SocketStatus.SocketStatusFailed].
  Future<void> openSocket({bool fromReconnect = false}) async {
    final connectivityResult = await Connectivity().checkConnectivity();
    if (connectivityResult == ConnectivityResult.none) {
      iPrint('> ws openSocket 网络连接异常ws');
      return;
    }
    if (UserRepoLocal.to.isLogin == false) {
      iPrint('> ws openSocket is not login');
      return;
    }
    // A channel already exists; nothing to do.
    if (isConnected) {
      iPrint('> ws openSocket _socketStatus: $_socketStatus;');
      return;
    }
    if (wsConnectLock) {
      return;
    }
    wsConnectLock = true;
    try {
      Map<String, dynamic> headers = await defaultHeaders();
      String tk = UserRepoLocal.to.accessToken;
      // NOTE(review): the token is refreshed when tokenExpired() returns
      // false, which looks inverted given the helper's name — confirm against
      // its implementation before changing.
      if (tokenExpired(tk) == false) {
        tk = await (UserProvider()).refreshAccessTokenApi(
          UserRepoLocal.to.refreshToken,
          checkNewToken: false,
        );
      }
      headers[Keys.tokenKey] = tk;

      final channel = IOWebSocketChannel.connect(
        WS_URL,
        headers: headers,
        pingInterval: Duration(milliseconds: _heartTimes),
        protocols: protocols,
      );
      _webSocketChannel = channel;
      // NOTE(review): the status is marked connected as soon as the channel
      // object exists, before any handshake result is observed here; later
      // failures arrive through the stream's onError/onDone callbacks.
      _socketStatus = SocketStatus.SocketStatusConnected;

      // A channel was created: stop the reconnect timer and, unless this call
      // is itself a reconnect attempt, reset the counter.
      _reconnectTimer?.cancel();
      _reconnectTimer = null;
      if (fromReconnect == false) {
        _reconnectTimes = 0;
      }
      iPrint('> ws openSocket onOpen');
      onOpen();

      channel.stream.listen(
        // Server message.
        (data) => webSocketOnMessage(data),
        // Connection error.
        onError: _webSocketOnError,
        // Connection closed.
        onDone: _webSocketOnDone,
        // Cancel the subscription on the first error.
        cancelOnError: true,
      );
      lastConnectedAt = DateTimeHelper.utc();
    } catch (e) {
      closeSocket();
      _socketStatus = SocketStatus.SocketStatusFailed;
      iPrint("> openSocket $WS_URL error ${e.toString()}");
    } finally {
      wsConnectLock = false;
    }
  }

  /// Stream callback for a received frame; forwards [data] to [onMessage].
  webSocketOnMessage(data) {
    onMessage(data);
  }

  /// Stream callback for a closed connection.
  ///
  /// Close code reference
  /// (https://developer.mozilla.org/zh-CN/docs/Web/API/CloseEvent):
  /// * 1000 CLOSE_NORMAL: normal closure, the connection fulfilled its purpose.
  /// * 1001 CLOSE_GOING_AWAY: endpoint going away (server error or the page
  ///   navigating away).
  /// * 1002 CLOSE_PROTOCOL_ERROR: closed because of a protocol error.
  /// * 1003 CLOSE_UNSUPPORTED: received a data type that is not accepted.
  /// * 1005 CLOSE_NO_STATUS: reserved; no status code was received.
  /// * 1007 Unsupported Data: received inconsistent data (e.g. non UTF-8 text).
  /// * 1009 CLOSE_TOO_LARGE: received a frame that is too large.
  /// * 4000–4999: available to applications.
  /// * 4006: the "refresh token" notice sent to the client was not
  ///   acknowledged, so the server closed the connection.
  ///
  /// On 4006 the service is torn down, the stored token is removed and the
  /// app navigates to [PassportPage]. Any other code closes the socket and
  /// schedules [_reconnect] one second later.
  Future<void> _webSocketOnDone() async {
    iPrint('> ws _webSocketOnDone');
    final channel = _webSocketChannel;
    final closeCode = channel?.closeCode;
    // NOTE(review): nothing happens when the channel was already cleared by
    // closeSocket() or when no close code is available — confirm that no
    // reconnect is wanted in the latter case.
    if (channel == null || closeCode == null) {
      return;
    }
    iPrint('> ws _webSocketOnDone closeCode: $closeCode ${DateTime.now()}');
    iPrint('> ws _webSocketOnDone closeReason: ${channel.closeReason}');

    if (closeCode == 4006) {
      closeSocket(exit: true);
      await StorageService.to.remove(Keys.tokenKey);
      Get.offAll(() => PassportPage());
      return;
    }

    closeSocket();
    Future.delayed(const Duration(milliseconds: 1000), () {
      iPrint('_webSocketOnDone _reconnectTimes: $_reconnectTimes');
      _reconnect();
    });
  }

  /// Stream callback for a connection error.
  ///
  /// Marks the status as failed, reports the message through [onError] and
  /// closes the socket. Accepts any error object: a
  /// [WebSocketChannelException] contributes its `message`, anything else its
  /// string form, so an unexpected error type cannot make this handler throw.
  void _webSocketOnError(Object e) {
    iPrint('> ws _webSocketOnError ${e.toString()}');
    _socketStatus = SocketStatus.SocketStatusFailed;
    final message = e is WebSocketChannelException ? e.message : e.toString();
    onError(message);
    closeSocket();
  }

  /// Resets the reconnect counter and cancels the reconnect timer.
  void destroyReconnectTimer() {
    _reconnectTimes = 0;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
  }

  /// Closes the WebSocket and resets the reconnect state.
  ///
  /// With [exit] set to `true` the singleton instance is also dropped, so the
  /// next [WebSocketService.to] access creates a fresh service.
  void closeSocket({bool exit = false}) {
    iPrint('> ws closeSocket ${DateTime.now()}');
    destroyReconnectTimer();
    iPrint('> ws WebSocket连接关闭 $WS_URL');
    _webSocketChannel?.sink.close();
    _webSocketChannel = null;
    _socketStatus = SocketStatus.SocketStatusClosed;
    if (exit) {
      _instance = null;
    }
  }

  /// Sends [msg] over the WebSocket.
  ///
  /// Opens the socket first when no channel exists and retries once after a
  /// failed attempt. Returns `true` when the message was handed to the
  /// channel's sink and `false` when no channel could be obtained or both
  /// attempts failed; it does not throw for a missing connection.
  Future<bool> sendMessage(String msg) async {
    if (!isConnected) {
      await WebSocketService.to.openSocket();
    }
    try {
      if (_send(msg)) {
        return true;
      }
    } catch (e) {
      iPrint('> ws sendMessage first attempt failed: $e');
    }

    // Single retry after trying to (re)open the socket.
    await WebSocketService.to.openSocket();
    try {
      return _send(msg);
    } catch (e) {
      iPrint('> ws sendMessage retry failed: $e');
      return false;
    }
  }

  /// Adds [msg] to the channel's sink.
  ///
  /// Returns `false` when there is no channel; errors raised by the sink
  /// itself propagate to the caller.
  bool _send(String msg) {
    final channel = _webSocketChannel;
    if (channel == null) {
      return false;
    }
    channel.sink.add(msg);
    return true;
  }

  /// Reconnect mechanism.
  ///
  /// While the counter is below [_reconnectMax], tries [openSocket] right
  /// away, increments the counter and (re)starts a periodic timer that retries
  /// every [_heartTimes] milliseconds. Once the limit is reached the socket is
  /// closed instead.
  ///
  /// NOTE(review): [closeSocket] and [_initWebSocket] both reset the counter,
  /// and the callers of this method close the socket first, so the limit
  /// looks unreachable in practice — confirm whether a hard cap is intended.
  void _reconnect() {
    iPrint(
        '> ws _reconnect _reconnectTimes $_reconnectTimes < $_reconnectMax ${DateTime.now()}');
    if (_reconnectTimes >= _reconnectMax) {
      closeSocket();
      return;
    }
    WebSocketService.to.openSocket(fromReconnect: true);
    _reconnectTimes += 1;
    _reconnectTimer?.cancel();
    _reconnectTimer = Timer.periodic(
      Duration(milliseconds: _heartTimes),
      (timer) {
        WebSocketService.to.openSocket(fromReconnect: true);
      },
    );
  }
}