/
AsyncSequence+EventStream.swift
122 lines (114 loc) · 3.7 KB
/
AsyncSequence+EventStream.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//
// AsyncSequence+EventStream.swift
// Pioneer
//
// Created by d-exclaimation on 3:38 PM.
// Copyright © 2021 d-exclaimation. All rights reserved.
//
extension AsyncSequence {
    /// Convert any AsyncSequence to an EventStream for GraphQL streaming.
    ///
    /// When the receiver is already a `Nozzle<Element>`, its own `eventStream()` is
    /// returned; any other sequence is wrapped in an `AsyncEventStream`.
    ///
    /// - Returns: EventStream implementation for AsyncSequence.
    public func toEventStream() -> EventSource<Element> {
        if let nozzle = self as? Nozzle<Element> {
            return nozzle.eventStream()
        }
        return AsyncEventStream<Element, Self>(from: self)
    }

    /// Convert any AsyncSequence to an EventStream, forwarding every element through a `Nozzle`.
    ///
    /// - Parameters:
    ///   - onTermination: Callback registered on the underlying `Nozzle` for its termination.
    /// - Returns: An `EventNozzle` backed by the piped elements.
    public func toEventStream(
        onTermination: @escaping @Sendable () -> Void
    ) -> EventSource<Element> {
        makePipedEventStream(leading: [], trailing: nil, onTermination: onTermination)
    }

    /// Convert any AsyncSequence to an EventStream that emits one extra value after the
    /// upstream sequence completes.
    ///
    /// - Parameters:
    ///   - endValue: Produces the value sent after the last upstream element. It is only
    ///     evaluated when the upstream finishes without throwing.
    ///   - onTermination: Callback registered on the underlying `Nozzle` for its termination.
    /// - Returns: An `EventNozzle` backed by the piped elements.
    public func toEventStream(
        endValue: @escaping () -> Element,
        onTermination: @escaping @Sendable () -> Void
    ) -> EventSource<Element> {
        makePipedEventStream(leading: [], trailing: endValue, onTermination: onTermination)
    }

    /// Convert any AsyncSequence to an EventStream that emits an initial value before any
    /// upstream element.
    ///
    /// - Parameters:
    ///   - initialValue: Value sent before the upstream sequence is iterated.
    ///   - onTermination: Callback registered on the underlying `Nozzle` for its termination.
    /// - Returns: An `EventNozzle` backed by the piped elements.
    public func toEventStream(
        initialValue: Element,
        onTermination: @escaping @Sendable () -> Void
    ) -> EventSource<Element> {
        makePipedEventStream(leading: [initialValue], trailing: nil, onTermination: onTermination)
    }

    /// Convert any AsyncSequence to an EventStream that emits an initial value first and an
    /// extra value after the upstream sequence completes.
    ///
    /// - Parameters:
    ///   - initialValue: Value sent before the upstream sequence is iterated.
    ///   - endValue: Produces the value sent after the last upstream element. It is only
    ///     evaluated when the upstream finishes without throwing.
    ///   - onTermination: Callback registered on the underlying `Nozzle` for its termination.
    /// - Returns: An `EventNozzle` backed by the piped elements.
    public func toEventStream(
        initialValue: Element,
        endValue: @escaping () -> Element,
        onTermination: @escaping @Sendable () -> Void
    ) -> EventSource<Element> {
        makePipedEventStream(leading: [initialValue], trailing: endValue, onTermination: onTermination)
    }

    /// Shared implementation of the piping overloads: forwards `leading`, then every
    /// upstream element, then the optional `trailing` value, and finally `.none`.
    ///
    /// - Parameters:
    ///   - leading: Values sent before the upstream sequence is iterated.
    ///   - trailing: Optional producer of a value sent after a successful upstream completion.
    ///   - onTermination: Caller's termination callback; invoked after the producer task
    ///     has been cancelled.
    /// - Returns: An `EventNozzle` wrapping the receiving `Nozzle`.
    private func makePipedEventStream(
        leading: [Element],
        trailing: (() -> Element)?,
        onTermination: @escaping @Sendable () -> Void
    ) -> EventSource<Element> {
        let (nozzle, sink) = Nozzle<Element>.desolate()

        let producer = Task {
            for value in leading {
                await sink.task(with: value)
            }
            do {
                for try await each in self {
                    await sink.task(with: each)
                }
                if let trailing = trailing {
                    await sink.task(with: trailing())
                }
            } catch {
                // Only elements and `.none` are ever sent through the pipe here, so the
                // upstream failure cannot be forwarded. Fall through so that the closing
                // `.none` below is still delivered instead of being skipped by the throw.
            }
            // Sent on every exit path; presumably the pipe's end-of-stream signal.
            await sink.task(with: .none)
        }

        let source = EventNozzle<Element>(from: nozzle)
        nozzle.onTermination {
            // Stop consuming the upstream once the stream has terminated; cancelling a
            // task that has already finished has no effect.
            producer.cancel()
            onTermination()
        }
        return source
    }
}
extension Nozzle {
    /// Wrap this Nozzle in an `EventNozzle` so it can be used for GraphQL streaming.
    ///
    /// - Returns: An `EventNozzle` created from this Nozzle.
    public func eventStream() -> EventNozzle<Element> {
        return EventNozzle<Element>(from: self)
    }
}