-
Notifications
You must be signed in to change notification settings - Fork 8
/
Drone.swift
135 lines (119 loc) · 4.92 KB
/
Drone.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//
// Drone.swift
// Pioneer
//
// Created by d-exclaimation on 4:55 PM.
// Copyright © 2021 d-exclaimation. All rights reserved.
//
import Foundation
import Desolate
import Vapor
import GraphQL
import Graphiti
extension Pioneer {
    /// Drone acting as concurrent safe actor for each client managing operations and subscriptions.
    ///
    /// Every outgoing message is written through `process.send`. Running subscriptions are tracked
    /// by operation id in `nozzles`; an id that is absent from that table is treated as no longer
    /// active, so late output or completion events for it are dropped.
    actor Drone: AbstractDesolate {
        typealias GraphQLNozzle = Nozzle<Future<GraphQLResult>>

        /// Client handle: receives every outgoing message and supplies the context and
        /// event loop used when subscribing.
        private let process: Process
        /// Schema the subscription requests are executed against.
        private let schema: Schema<Resolver, Context>
        /// Resolver handed to the schema on every subscribe call.
        private let resolver: Resolver
        /// Sub-protocol in use; provides the `next` and `complete` message types.
        private let proto: SubProtocol.Type

        init(_ process: Process, schema: Schema<Resolver, Context>, resolver: Resolver, proto: SubProtocol.Type) {
            self.schema = schema
            self.resolver = resolver
            self.proto = proto
            self.process = process
        }

        /// Messages understood by the drone.
        enum Act {
            /// Begin the subscription described by `gql` under operation id `oid`.
            case start(oid: String, gql: GraphQLRequest)
            /// Stop the operation `oid` without sending a completion message.
            case stop(oid: String)
            /// The stream behind `oid` finished (or failed); piped in by the drone itself.
            case ended(oid: String)
            /// A result for `oid` is ready to be pushed to the client; piped in by the drone itself.
            case output(oid: String, GraphQLMessage)
            /// Shut down every subscription and stop the actor.
            case acid
        }

        // MARK: - States

        /// Lifecycle flag, initially `.running`. It is not written anywhere in this type —
        /// presumably maintained through `AbstractDesolate` (TODO confirm).
        var status: Signal = .running
        /// Active subscription streams keyed by operation id.
        var nozzles: [String: GraphQLNozzle] = [:]

        /// Handles a single message and reports whether the actor should keep running.
        ///
        /// - Parameter msg: The action to perform.
        /// - Returns: `.stopped` after `.acid`, `.running` for every other message.
        func onMessage(msg: Act) async -> Signal {
            switch msg {
            // Start subscriptions, setup pipe pattern, and callbacks
            case .start(oid: let oid, gql: let gql):
                // Subscribing itself failed (threw or the future failed): report a generic error.
                guard let subscriptionResult = await subscription(gql: gql) else {
                    reportInternalError(oid: oid)
                    break
                }

                // No stream means the request was rejected: forward its errors and complete the operation.
                guard let subscription = subscriptionResult.stream else {
                    let res = GraphQLResult(errors: subscriptionResult.errors)
                    process.send(GraphQLMessage.from(type: proto.next, id: oid, res).jsonString)
                    process.send(GraphQLMessage(id: oid, type: proto.complete).jsonString)
                    break
                }

                // The stream cannot be consumed as a nozzle: tell the client instead of
                // leaving the operation without any response.
                guard let nozzle = subscription.nozzle() else {
                    print("Cannot get nozzle")
                    reportInternalError(oid: oid)
                    break
                }

                let next = proto.next
                nozzles.update(oid, with: nozzle)

                // Transform nozzle into flow; results whose future fails are skipped.
                let flow: AsyncCompactMapSequence<GraphQLNozzle, Act> =
                    nozzle.compactMap { (future: Future<GraphQLResult>) async -> Act? in
                        guard let res = try? await future.get() else { return nil }
                        return .output(oid: oid, .from(type: next, id: oid, res))
                    }

                // Pipe all messages into the Actor itself
                flow.pipe(to: oneself,
                    onComplete: {
                        .ended(oid: oid)
                    },
                    onFailure: { _ in
                        .ended(oid: oid)
                    }
                )

            // Stop subscription, shutdown nozzle and remove it so preventing overflow of any messages
            case .stop(oid: let oid):
                guard let nozzle = nozzles[oid] else { break }
                // Removed before shutdown so the resulting `.ended` finds nothing and stays silent.
                nozzles.delete(oid)
                nozzle.shutdown()

            // Send an ending message
            // but prevent completion message if nozzle doesn't exist
            // e.g: - Shutdown-ed operation
            case .ended(oid: let oid):
                guard nozzles.has(oid) else { break }
                // The stream is finished for good: forget it so the table does not keep growing
                // with completed operations and `deinit` does not announce completion again.
                nozzles.delete(oid)
                let message = GraphQLMessage(id: oid, type: proto.complete)
                process.send(message.jsonString)

            // Push message to websocket connection
            // but prevent completion message if nozzle doesn't exist
            // e.g: - Shutdown-ed operation
            case .output(oid: let oid, let message):
                guard nozzles.has(oid) else { break }
                process.send(message.jsonString)

            // Kill actor
            case .acid:
                nozzles.values.forEach { $0.shutdown() }
                nozzles = [:]
                return .stopped
            }
            return .running
        }

        /// Sends the generic "Internal server error" message for operation `oid`.
        private func reportInternalError(oid: String) {
            let failure = GraphQLMessage.errors(id: oid, type: proto.next, [.init(message: "Internal server error")])
            process.send(failure.jsonString)
        }

        /// Subscribes to the schema for the given request.
        ///
        /// - Parameter gql: The request; missing variables are replaced with an empty dictionary.
        /// - Returns: The subscription result, or `nil` when subscribing throws or its future fails.
        private func subscription(gql: GraphQLRequest) async -> SubscriptionResult? {
            try? await schema
                .subscribe(
                    request: gql.query,
                    resolver: resolver,
                    context: process.ctx,
                    eventLoopGroup: process.req.eventLoop,
                    variables: gql.variables ?? [:],
                    operationName: gql.operationName
                )
                .get()
        }

        /// Completes and shuts down every subscription that is still active when the drone goes away.
        deinit {
            for (oid, nozzle) in nozzles {
                let message = GraphQLMessage(id: oid, type: proto.complete)
                process.send(message.jsonString)
                nozzle.shutdown()
            }
        }
    }
}