-
Notifications
You must be signed in to change notification settings - Fork 141
/
Copy pathServerInvocationHandler.swift
181 lines (162 loc) · 6.88 KB
/
ServerInvocationHandler.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
//
// ServerInvocationHandler.swift
// SignalRClient
//
// Created by Pawel Kadluczka on 2/4/18.
// Copyright © 2018 Pawel Kadluczka. All rights reserved.
//
import Foundation
/// Common interface for objects that track one pending invocation of a server-side hub method.
///
/// Conforming types build the outgoing invocation message and react to the
/// stream-item and completion messages received for that invocation.
internal protocol ServerInvocationHandler {
    /// Name of the hub method being invoked.
    var method: String { get }

    /// Arguments passed to the hub method.
    var arguments: [Encodable] { get }

    /// Builds the hub message that starts the invocation.
    /// - Parameter invocationId: Identifier assigned to this invocation.
    /// - Returns: The message to send to the server.
    func createInvocationMessage(invocationId: String) -> HubMessage

    /// Starts any client-to-server streams associated with the invocation.
    func startStreams()

    /// Handles a stream item received for this invocation.
    /// - Parameter streamItemMessage: The received stream item message.
    /// - Returns: An error when the item could not be handled, otherwise `nil`.
    func processStreamItem(streamItemMessage: StreamItemMessage) -> Error?

    /// Handles the completion message received for this invocation.
    /// - Parameter completionMessage: The received completion message.
    func processCompletion(completionMessage: CompletionMessage)

    /// Terminates the invocation with the given error.
    /// - Parameter error: The error to report to the invocation's completion callback.
    func raiseError(error: Error)
}
/// Tracks a single invocation of a non-streaming hub method whose result decodes to `T`.
///
/// The completion callback supplied to `init` is wrapped so that it is always dispatched
/// asynchronously on the provided callback queue, right after every client stream worker
/// has been asked to stop.
internal class InvocationHandler<T: Decodable>: ServerInvocationHandler {
    private let logger: Logger
    /// Name of the hub method; used as the `target` of the invocation message.
    public private(set) var method: String
    /// Arguments sent with the invocation message.
    public private(set) var arguments: [Encodable]
    private let clientStreamWorkers: [ClientStreamWorker]
    /// Queue-hopping wrapper around the caller's completion callback.
    private let invocationDidComplete: (T?, Error?) -> Void

    /// Creates a handler for one invocation.
    /// - Parameters:
    ///   - logger: Receives diagnostic messages.
    ///   - callbackQueue: Queue on which `invocationDidComplete` is invoked.
    ///   - method: Name of the hub method to invoke.
    ///   - arguments: Arguments for the hub method.
    ///   - clientStreamWorkers: Workers for the client-to-server streams of this invocation.
    ///   - invocationDidComplete: Called with the decoded result or an error.
    init(
        logger: Logger, callbackQueue: DispatchQueue, method: String, arguments: [Encodable],
        clientStreamWorkers: [ClientStreamWorker],
        invocationDidComplete: @escaping (T?, Error?) -> Void
    ) {
        self.logger = logger
        self.method = method
        self.arguments = arguments
        self.clientStreamWorkers = clientStreamWorkers
        self.invocationDidComplete = { value, failure in
            callbackQueue.async {
                // Stop the client streams before handing the outcome to the caller.
                for worker in clientStreamWorkers {
                    worker.stop()
                }
                invocationDidComplete(value, failure)
            }
        }
    }

    // MARK: - ServerInvocationHandler

    /// Builds the `ServerInvocationMessage` for this invocation, including the ids of all client streams.
    func createInvocationMessage(invocationId: String) -> HubMessage {
        let streamIds = clientStreamWorkers.map { worker in worker.streamId }
        logger.log(
            logLevel: .debug,
            message:
                "Creating invocation message for method: '\(method)', invocationId: \(invocationId), streamIds: \(streamIds)"
        )
        return ServerInvocationMessage(
            invocationId: invocationId,
            target: method,
            arguments: arguments,
            streamIds: streamIds
        )
    }

    /// Starts every client stream worker.
    func startStreams() {
        for worker in clientStreamWorkers {
            worker.start()
        }
    }

    /// Always fails: a non-streaming invocation must not receive stream items.
    /// - Returns: A protocol violation error wrapping an invalid operation error.
    func processStreamItem(streamItemMessage: StreamItemMessage) -> Error? {
        let description = "Stream item message received for non-streaming server side method"
        logger.log(logLevel: .error, message: description)
        let cause = SignalRError.invalidOperation(message: description)
        return SignalRError.protocolViolation(underlyingError: cause)
    }

    /// Reports the outcome carried by the completion message to the completion callback:
    /// a hub invocation error, a void success, a decoded result, or a decoding error.
    func processCompletion(completionMessage: CompletionMessage) {
        let invocationId = completionMessage.invocationId
        logger.log(logLevel: .debug, message: "Processing completion of method with invocationId: \(invocationId)")

        // The server reported a failure.
        if let hubInvocationError = completionMessage.error {
            logger.log(
                logLevel: .error,
                message: "Server method with invocationId \(invocationId) failed with: \(hubInvocationError)")
            invocationDidComplete(nil, SignalRError.hubInvocationError(message: hubInvocationError))
            return
        }

        // The server method completed without returning a value.
        guard completionMessage.hasResult else {
            logger.log(
                logLevel: .debug, message: "Void server method with invocationId \(invocationId) completed successfully"
            )
            invocationDidComplete(nil, nil)
            return
        }

        // The server method returned a value; decode it as `T`.
        do {
            logger.log(
                logLevel: .debug, message: "Server method with invocationId \(invocationId) completed successfully")
            let decoded = try completionMessage.getResult(T.self)
            invocationDidComplete(decoded, nil)
        } catch {
            logger.log(
                logLevel: .error,
                message: "Error while getting result for server method with invocationId \(invocationId)")
            invocationDidComplete(nil, error)
        }
    }

    /// Completes the invocation with `error` and no result.
    func raiseError(error: Error) {
        invocationDidComplete(nil, error)
    }
}
/// Tracks a single invocation of a streaming hub method whose items decode to `T`.
///
/// Both callbacks supplied to `init` are wrapped so that they are dispatched asynchronously
/// on the provided callback queue. The completion wrapper also asks every client stream
/// worker to stop before calling the caller's completion callback.
internal class StreamInvocationHandler<T: Decodable>: ServerInvocationHandler {
    private let logger: Logger
    /// Name of the hub method; used as the `target` of the invocation message.
    public private(set) var method: String
    /// Arguments sent with the invocation message.
    public private(set) var arguments: [Encodable]
    private let clientStreamWorkers: [ClientStreamWorker]
    /// Queue-hopping wrapper around the caller's per-item callback.
    private let streamItemReceived: (T) -> Void
    /// Queue-hopping wrapper around the caller's completion callback.
    private let invocationDidComplete: (Error?) -> Void

    /// Creates a handler for one streaming invocation.
    /// - Parameters:
    ///   - logger: Receives diagnostic messages.
    ///   - callbackQueue: Queue on which both callbacks are invoked.
    ///   - method: Name of the hub method to invoke.
    ///   - arguments: Arguments for the hub method.
    ///   - clientStreamWorkers: Workers for the client-to-server streams of this invocation.
    ///   - streamItemReceived: Called for every decoded stream item.
    ///   - invocationDidComplete: Called once the stream ends, with an error on failure.
    init(
        logger: Logger, callbackQueue: DispatchQueue, method: String, arguments: [Encodable],
        clientStreamWorkers: [ClientStreamWorker],
        streamItemReceived: @escaping (T) -> Void, invocationDidComplete: @escaping (Error?) -> Void
    ) {
        self.logger = logger
        self.method = method
        self.arguments = arguments
        self.clientStreamWorkers = clientStreamWorkers
        self.streamItemReceived = { element in
            callbackQueue.async {
                streamItemReceived(element)
            }
        }
        self.invocationDidComplete = { failure in
            callbackQueue.async {
                // Stop the client streams before handing the outcome to the caller.
                for worker in clientStreamWorkers {
                    worker.stop()
                }
                invocationDidComplete(failure)
            }
        }
    }

    // MARK: - ServerInvocationHandler

    /// Builds the `StreamInvocationMessage` for this invocation, including the ids of all client streams.
    func createInvocationMessage(invocationId: String) -> HubMessage {
        let streamIds = clientStreamWorkers.map { worker in worker.streamId }
        logger.log(
            logLevel: .debug,
            message:
                "Creating invocation message for streaming method: '\(method)', invocationId: \(invocationId), streamIds: \(streamIds)"
        )
        return StreamInvocationMessage(
            invocationId: invocationId,
            target: method,
            arguments: arguments,
            streamIds: streamIds
        )
    }

    /// Starts every client stream worker.
    func startStreams() {
        for worker in clientStreamWorkers {
            worker.start()
        }
    }

    /// Decodes the received item as `T` and forwards it to the per-item callback.
    /// - Returns: `nil` when the item was decoded, otherwise the decoding error.
    func processStreamItem(streamItemMessage: StreamItemMessage) -> Error? {
        let invocationId = streamItemMessage.invocationId
        logger.log(
            logLevel: .debug,
            message: "Received stream item message for streaming method with invocationId: '\(invocationId)'")
        do {
            let element = try streamItemMessage.getItem(T.self)
            streamItemReceived(element)
            return nil
        } catch {
            logger.log(
                logLevel: .error,
                message: "Error while getting stream item value for method with invocationId: '\(invocationId)'")
            return error
        }
    }

    /// Reports the end of the stream to the completion callback, with a hub invocation
    /// error when the completion message carries one.
    func processCompletion(completionMessage: CompletionMessage) {
        let invocationId = completionMessage.invocationId

        // No error in the completion message: the stream finished successfully.
        guard let invocationError = completionMessage.error else {
            logger.log(
                logLevel: .debug,
                message: "Streaming server method with invocationId \(invocationId) completed successfully")
            invocationDidComplete(nil)
            return
        }

        logger.log(
            logLevel: .error,
            message: "Streaming server method with invocationId \(invocationId) failed with: \(invocationError)")
        invocationDidComplete(SignalRError.hubInvocationError(message: invocationError))
    }

    /// Completes the invocation with `error`.
    func raiseError(error: Error) {
        invocationDidComplete(error)
    }
}