/
FayeClient+Subscriptions.swift
118 lines (88 loc) · 3.31 KB
/
FayeClient+Subscriptions.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
//
// FayeClient+Subscriptions.swift
// Pods
//
// Created by Shams Ahmed on 19/07/2016.
//
//
import Foundation
import SwiftyJSON
// MARK: Private Internal methods
extension FayeClient {
/// Drains the queue of outstanding subscriptions and tries to subscribe to each one.
///
/// Every entry is first removed from `queuedSubscriptions` and then handed to
/// `subscribeToChannel`. The results of both calls are intentionally ignored.
func subscribeQueuedSubscriptions() {
    // Work from a local copy so that removing entries does not disturb the traversal.
    let queued = queuedSubscriptions
    queued.forEach { model in
        _ = removeChannelFromQueuedSubscriptions(model.subscription)
        _ = subscribeToChannel(model)
    }
}
/// Re-issues a subscribe request for every subscription that is still pending.
///
/// Does nothing, and logs nothing, when `pendingSubscriptions` is empty.
func resubscribeToPendingSubscriptions() {
    guard !pendingSubscriptions.isEmpty else { return }

    print("Faye: Resubscribing to \(pendingSubscriptions.count) pending subscriptions")

    // Iterate over a local copy; each entry is removed from the pending list
    // before being passed to `subscribeToChannel` again.
    let pending = pendingSubscriptions
    for model in pending {
        _ = removeChannelFromPendingSubscriptions(model.subscription)
        _ = subscribeToChannel(model)
    }
}
/// Unsubscribes from every known channel, whether its subscription is
/// queued, open or pending.
func unsubscribeAllSubscriptions() {
    // Collect all three lists up front (queued, then open, then pending)
    // before any unsubscribe call runs.
    var everySubscription = queuedSubscriptions
    everySubscription += openSubscriptions
    everySubscription += pendingSubscriptions

    everySubscription.forEach { model in
        unsubscribeFromChannel(model.subscription)
    }
}
// MARK: - Send/Receive
/// Serialises `message` to a JSON string and writes it to the transport.
///
/// The work is dispatched asynchronously onto `writeOperationQueue`. If the
/// message cannot be rendered as a string, or no transport is set, nothing
/// is written.
///
/// - Parameter message: The dictionary to encode and send.
func send(_ message: NSDictionary) {
    // The block runs later, possibly after the client has been released.
    // Capture `self` weakly so a late block becomes a no-op instead of
    // trapping on a dangling `unowned` reference.
    writeOperationQueue.async { [weak self] in
        guard let strongSelf = self,
            let string = JSON(message).rawString() else {
                return
        }

        strongSelf.transport?.writeString(string)
    }
}
/// Parses an incoming text message as JSON and forwards it to `parseFayeMessage`.
///
/// The work runs synchronously on `readOperationQueue`. A message that cannot
/// be encoded as UTF-8 (without lossy conversion) is ignored.
///
/// - Parameter message: The raw text received from the transport.
func receive(_ message: String) {
    readOperationQueue.sync { [unowned self] in
        guard let payload = message.data(using: .utf8, allowLossyConversion: false) else {
            return
        }

        self.parseFayeMessage(JSON(data: payload))
    }
}
/// Advances the message counter and returns it as an encoded identifier.
///
/// The counter is incremented first; once it reaches `UINT32_MAX` it is
/// reset to zero before being rendered.
///
/// - Returns: The counter's decimal text passed through `encodedString()`.
func nextMessageId() -> String {
    messageNumber += 1

    // Wrap around instead of letting the counter grow past the 32-bit limit.
    if !(messageNumber < UINT32_MAX) {
        messageNumber = 0
    }

    let rawIdentifier = "\(messageNumber)"
    return rawIdentifier.encodedString()
}
// MARK: - Subscriptions
/// Removes the first queued subscription whose `subscription` equals `channel`.
///
/// - Parameter channel: The channel name to look for.
/// - Returns: `true` if a matching entry was found and removed, `false` otherwise.
func removeChannelFromQueuedSubscriptions(_ channel: String) -> Bool {
    // Lock on `self`, a stable object, rather than on the array. Passing the
    // array makes Swift bridge it to an object on every call, and the array
    // is mutated between enter and exit, so the two calls are not guaranteed
    // to see the same object and the lock could be left unbalanced.
    // NOTE(review): any other code guarding this array should use the same lock.
    objc_sync_enter(self)
    defer { objc_sync_exit(self) }

    guard let index = queuedSubscriptions.index(where: { $0.subscription == channel }) else {
        return false
    }

    queuedSubscriptions.remove(at: index)
    return true
}
/// Removes the first pending subscription whose `subscription` equals `channel`.
///
/// - Parameter channel: The channel name to look for.
/// - Returns: `true` if a matching entry was found and removed, `false` otherwise.
func removeChannelFromPendingSubscriptions(_ channel: String) -> Bool {
    // Lock on `self`, a stable object, rather than on the array. Passing the
    // array makes Swift bridge it to an object on every call, and the array
    // is mutated between enter and exit, so the two calls are not guaranteed
    // to see the same object and the lock could be left unbalanced.
    // NOTE(review): any other code guarding this array should use the same lock.
    objc_sync_enter(self)
    defer { objc_sync_exit(self) }

    guard let index = pendingSubscriptions.index(where: { $0.subscription == channel }) else {
        return false
    }

    pendingSubscriptions.remove(at: index)
    return true
}
/// Removes the first open subscription whose `subscription` equals `channel`.
///
/// - Parameter channel: The channel name to look for.
/// - Returns: `true` if a matching entry was found and removed, `false` otherwise.
func removeChannelFromOpenSubscriptions(_ channel: String) -> Bool {
    // This function previously synchronised on `pendingSubscriptions` while
    // mutating `openSubscriptions`, so the open list was not guarded by its
    // own lock. Lock on `self` instead: it is a stable object, whereas an
    // array is bridged to an object on every call and may not give
    // `objc_sync_enter` and `objc_sync_exit` the same object.
    // NOTE(review): any other code guarding this array should use the same lock.
    objc_sync_enter(self)
    defer { objc_sync_exit(self) }

    guard let index = openSubscriptions.index(where: { $0.subscription == channel }) else {
        return false
    }

    openSubscriptions.remove(at: index)
    return true
}
}