-
Notifications
You must be signed in to change notification settings - Fork 407
/
CocoaMQTTDeliver.swift
298 lines (235 loc) · 9.05 KB
/
CocoaMQTTDeliver.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
//
// CocoaMQTTDeliver.swift
// CocoaMQTT
//
// Created by HJianBo on 2019/5/2.
// Copyright © 2019 emqx.io. All rights reserved.
//
import Foundation
import Dispatch
/// The contract through which a `CocoaMQTTDeliver` hands frames over for sending.
///
/// The protocol is class-bound (`AnyObject`) so that a conforming object can be
/// stored in a `weak` reference.
protocol CocoaMQTTDeliverProtocol: AnyObject {

    /// The dispatch queue on which `deliver(_:wantToSend:)` is invoked.
    var delegateQueue: DispatchQueue { get set }

    /// Called when `deliver` has a frame that should be sent.
    ///
    /// - Parameters:
    ///   - deliver: The deliver instance requesting the send.
    ///   - frame: The frame to send.
    func deliver(_ deliver: CocoaMQTTDeliver, wantToSend frame: Frame)
}
/// A frame that has been sent, paired with the time it was (last) sent.
private struct InflightFrame {

    /// The in-flight frame; may be a `FramePublish` or a `FramePubRel`.
    var frame: Frame

    /// Seconds since 1970 at which `frame` was stamped.
    var timestamp: TimeInterval

    /// Creates an entry with an explicit timestamp.
    ///
    /// - Parameters:
    ///   - frame: The frame to track.
    ///   - timestamp: Seconds since 1970 to record for the frame.
    init(frame: Frame, timestamp: TimeInterval) {
        self.frame = frame
        self.timestamp = timestamp
    }

    /// Creates an entry stamped with the current time.
    ///
    /// - Parameter frame: The frame to track.
    init(frame: Frame) {
        let now = Date().timeIntervalSince1970
        self.init(frame: frame, timestamp: now)
    }
}
extension Array where Element == InflightFrame {

    /// Filters and transforms the array in a single pass.
    ///
    /// `isIncluded` is called exactly once per element, in order. It returns a
    /// tuple whose first member says whether to keep an entry and whose second
    /// member is the (possibly replaced) entry to keep.
    ///
    /// - Parameter isIncluded: Decides, per element, whether it is kept and
    ///   which value takes its place.
    /// - Returns: The kept replacement values, in their original order.
    func filterMap(isIncluded: (Element) -> (Bool, Element)) -> [Element] {
        return compactMap { element -> Element? in
            let (keep, replacement) = isIncluded(element)
            return keep ? replacement : nil
        }
    }
}
// CocoaMQTTDeliver
/// Buffers outgoing frames in a FIFO message queue, moves them through a
/// bounded in-flight window and hands them to the delegate for sending.
///
/// Within this file, `mqueue` and `inflight` are only mutated from blocks
/// submitted to `deliverQueue`.
class CocoaMQTTDeliver: NSObject {

    /// The dispatch queue used to deliver frames serially
    private let deliverQueue = DispatchQueue(label: "deliver.cocoamqtt.emqx", qos: .default)

    /// Receiver of the frames that are ready to be sent; held weakly.
    weak var delegate: CocoaMQTTDeliverProtocol?

    /// Frames that were sent and are still awaiting an acknowledgement.
    fileprivate var inflight = [InflightFrame]()

    /// Frames waiting to be transported, oldest first.
    fileprivate var mqueue = [Frame]()

    /// Maximum number of frames `mqueue` may hold before `add(_:)` rejects new ones.
    var mqueueSize: UInt = 1000

    /// Maximum number of frames held in `inflight` at the same time.
    var inflightWindowSize: UInt = 10

    /// Retry time interval, in milliseconds
    var retryTimeInterval: Double = 5000

    /// Timer that triggers re-delivery of in-flight frames; `nil` while not running.
    private var awaitingTimer: CocoaMQTTTimer?

    /// `true` when no frame is waiting in the message queue.
    var isQueueEmpty: Bool { return mqueue.isEmpty }

    /// `true` when the message queue has reached `mqueueSize`.
    var isQueueFull: Bool { return mqueue.count >= mqueueSize }

    /// `true` when the in-flight window has reached `inflightWindowSize`.
    var isInflightFull: Bool { return inflight.count >= inflightWindowSize }

    /// `true` when no frame is awaiting an acknowledgement.
    var isInflightEmpty: Bool { return inflight.isEmpty }

    /// Optional persistent store. When set, frames accepted by `add(_:)` are
    /// written to it and removed again once acknowledged.
    var storage: CocoaMQTTStorage?

    /// Re-queues every frame held by `storage` and attaches `storage` to this deliver.
    ///
    /// - Parameter storage: The store to take the pending frames from and to
    ///   use for persisting frames from now on.
    func recoverSessionBy(_ storage: CocoaMQTTStorage) {
        let frames = storage.takeAll()

        // Deliberately no early return for an empty result: the storage still
        // has to be attached so that frames added later are persisted.

        // Sync to push the frame to mqueue for avoiding overcommit
        deliverQueue.sync {
            for f in frames {
                mqueue.append(f)
            }
            self.storage = storage
            printInfo("Deliver recvoer \(frames.count) msgs")
            printDebug("Recover message \(frames)")
        }

        deliverQueue.async { [weak self] in
            guard let wself = self else { return }
            wself.tryTransport()
        }
    }

    /// Add a FramePublish to the message queue to wait for sending
    ///
    /// - Parameter frame: The PUBLISH frame to enqueue.
    /// - Returns: `false` when the frame is rejected because the buffer is full,
    ///   `true` when it was queued.
    func add(_ frame: FramePublish) -> Bool {
        // The capacity check and the append run in the same synchronous block
        // on `deliverQueue`, so concurrent callers cannot both pass the check
        // and overcommit the buffer.
        let accepted = deliverQueue.sync { () -> Bool in
            guard !isQueueFull else { return false }
            mqueue.append(frame)
            _ = storage?.write(frame)
            return true
        }

        guard accepted else {
            printError("Sending buffer is full, frame \(frame) has been rejected to add.")
            return false
        }

        deliverQueue.async { [weak self] in
            guard let wself = self else { return }
            wself.tryTransport()
        }

        return true
    }

    /// Acknowledge a PUBLISH/PUBREL by msgid
    ///
    /// Frames other than PUBACK, PUBREC and PUBCOMP are ignored.
    ///
    /// - Parameter frame: The acknowledgement frame that was received.
    func ack(by frame: Frame) {
        let msgid: UInt16
        if let puback = frame as? FramePubAck {
            msgid = puback.msgid
        } else if let pubrec = frame as? FramePubRec {
            msgid = pubrec.msgid
        } else if let pubcomp = frame as? FramePubComp {
            msgid = pubcomp.msgid
        } else {
            return
        }

        deliverQueue.async { [weak self] in
            guard let wself = self else { return }

            let acked = wself.ackInflightFrame(withMsgid: msgid, type: frame.type)
            guard !acked.isEmpty else {
                printWarning("Acknowledge by \(frame), but not found in inflight window")
                return
            }

            // TODO: ACK DONT DELETE PUBREL
            // Only a PUBACK or PUBCOMP removes frames from storage; a PUBREC
            // leaves them in place.
            if frame is FramePubAck || frame is FramePubComp {
                for f in acked {
                    wself.storage?.remove(f)
                }
            }

            printDebug("Acknowledge frame id \(msgid) success, acked: \(acked)")
            wself.tryTransport()
        }
    }

    /// Clean Inflight content to prevent message blocked, when next connection established
    ///
    /// Empties both the message queue and the in-flight window.
    ///
    /// !!Warning: it's a temporary method for hotfix #221
    func cleanAll() {
        deliverQueue.sync {
            mqueue.removeAll()
            inflight.removeAll()
        }
    }
}
// MARK: Private Funcs
extension CocoaMQTTDeliver {

    /// Transports frames from `mqueue`, oldest first, for as long as the queue
    /// is not empty and the in-flight window is not full.
    private func tryTransport() {
        // A loop rather than self-recursion, so the stack depth does not grow
        // with the number of frames that can be drained in one go.
        while !isQueueEmpty && !isInflightFull {
            // take out the earliest frame
            let frame = mqueue.removeFirst()
            deliver(frame)
        }
    }

    /// Try to deliver a frame
    ///
    /// Every frame is handed to `sendfun`. Frames whose QoS is not 0 are also
    /// recorded in `inflight`, and the retry timer is created if it is not
    /// already set.
    ///
    /// - Parameter frame: The frame to send.
    private func deliver(_ frame: Frame) {
        sendfun(frame)

        // QoS 0 frames are not tracked in the in-flight window.
        // TODO: is a strict deliver mode needed?
        guard frame.qos != .qos0 else { return }

        inflight.append(InflightFrame(frame: frame))

        // Start a retry timer for resending the frame if no PUBACK or PUBREC is received
        guard awaitingTimer == nil else { return }
        awaitingTimer = CocoaMQTTTimer.every(retryTimeInterval / 1000.0, name: "awaitingTimer") { [weak self] in
            guard let wself = self else { return }
            // Hop back onto the deliver queue before touching `inflight`.
            wself.deliverQueue.async {
                wself.redeliver()
            }
        }
    }

    /// Attempt to redeliver in-flight messages
    ///
    /// Every in-flight frame whose timestamp is at least `retryTimeInterval`
    /// milliseconds old is flagged as a duplicate, re-stamped and sent again.
    /// When the in-flight window is empty the timer reference is cleared instead.
    private func redeliver() {
        if isInflightEmpty {
            // Revoke the awaiting timer
            // NOTE(review): presumably releasing the reference stops the timer — confirm in CocoaMQTTTimer.
            awaitingTimer = nil
            return
        }

        let nowTimestamp = Date().timeIntervalSince1970
        let retrySeconds = retryTimeInterval / 1000.0

        for idx in inflight.indices {
            guard (nowTimestamp - inflight[idx].timestamp) >= retrySeconds else { continue }

            var duplicatedFrame = inflight[idx]
            duplicatedFrame.frame.dup = true
            duplicatedFrame.timestamp = nowTimestamp
            inflight[idx] = duplicatedFrame

            printInfo("Re-delivery frame \(duplicatedFrame.frame)")
            sendfun(duplicatedFrame.frame)
        }
    }

    /// Applies an acknowledgement to the in-flight window.
    ///
    /// - A QoS 2 PUBLISH matched by a PUBREC is replaced in place by a PUBREL,
    ///   which is written to storage and sent.
    /// - A QoS 1 PUBLISH matched by a PUBACK is removed.
    /// - A PUBREL matched by a PUBCOMP is removed.
    /// - Every other entry is kept unchanged.
    ///
    /// - Parameters:
    ///   - msgid: The message id carried by the acknowledgement.
    ///   - type: The frame type of the acknowledgement.
    /// - Returns: The in-flight frames that were acknowledged.
    @discardableResult
    private func ackInflightFrame(withMsgid msgid: UInt16, type: FrameType) -> [Frame] {
        var ackedFrames = [Frame]()

        inflight = inflight.filterMap { entry in
            // -- ACK for PUBLISH
            if let publish = entry.frame as? FramePublish, publish.msgid == msgid {
                if publish.qos == .qos2 && type == .pubrec {
                    // -- Replace PUBLISH with PUBREL
                    let pubrel = FramePubRel(msgid: publish.msgid)
                    var replaced = entry
                    replaced.frame = pubrel
                    replaced.timestamp = Date().timeIntervalSince1970

                    _ = storage?.write(pubrel)
                    sendfun(pubrel)

                    ackedFrames.append(publish)
                    return (true, replaced)
                } else if publish.qos == .qos1 && type == .puback {
                    ackedFrames.append(publish)
                    return (false, entry)
                }
            }

            // -- ACK for PUBREL
            if let pubrel = entry.frame as? FramePubRel,
               pubrel.msgid == msgid && type == .pubcomp {
                ackedFrames.append(pubrel)
                return (false, entry)
            }

            // Not affected by this acknowledgement: keep as is.
            return (true, entry)
        }

        return ackedFrames
    }

    /// Hands `frame` to the delegate on the delegate's queue.
    ///
    /// When there is no delegate the frame is dropped and an error is logged.
    /// A QoS 0 PUBLISH is removed from storage before being handed over.
    ///
    /// - Parameter frame: The frame to send.
    private func sendfun(_ frame: Frame) {
        guard let delegate = self.delegate else {
            printError("The deliver delegate is nil!!! the frame will be drop: \(frame)")
            return
        }

        if frame.qos == .qos0, let p = frame as? FramePublish {
            storage?.remove(p)
        }

        delegate.delegateQueue.async {
            delegate.deliver(self, wantToSend: frame)
        }
    }
}
// For tests
extension CocoaMQTTDeliver {

    /// Returns the frames currently held in the in-flight window, in order.
    func t_inflightFrames() -> [Frame] {
        return inflight.map { entry in entry.frame }
    }

    /// Returns the frames currently waiting in the message queue, in order.
    func t_queuedFrames() -> [Frame] {
        let queued: [Frame] = mqueue
        return queued
    }
}