-
Notifications
You must be signed in to change notification settings - Fork 73
Closed
Description
The new addition ConcurrentEventStream
utilizes the .mapStream
to transform elements of an AsyncThrowingStream
. However, the method .mapStream
produces 3 issues in 3 different scenarios.
Transformed stream never ended
After transforming the base stream using .mapStream
, the transformed stream never ended even if the base stream has ended.
// Base stream emits values and end
let base = AsyncThrowingStream(Int.self) { con in
let task = Task {
for i in 1...5 {
con.yield(i)
}
con.finish()
}
con.onTermination = { _ in
task.cancel()
}
}
// Transform using the `.mapStream`
let transform = base0.mapStream { "Received from 0: \($0)" }
let task = Task {
for try await _ in transform {
}
print("Done!!")
}
- When:
base
ended withcontinuation.finish()
- Expected:
transform
ended and "Done!!" is printed - Output: "Done!!" is not printed and
transform
never ended
Leaks
After transforming the base stream using .mapStream
, the base stream is not closed/cancelled/disposed of when the transformed stream is cancelled. Therefore, the base stream will keep emitting values even after the transformed one is no longer consumed.
// Base stream should emit value every second
let base0 = AsyncThrowingStream(Int.self) { con in
let task = Task {
for i in 1...5 {
print("Sending for 0: \(i)")
con.yield(i)
try await Task.sleep(nanoseconds: 1000_000_000)
}
con.finish()
}
con.onTermination = { _ in
task.cancel()
}
}
// Transform using the `.mapStream`
let transform0 = base0.mapStream { "Received from 0: \($0)" }
let task0 = Task {
for try await msg in transform0 {
print(msg)
}
}
// Cancel the transformed stream after 1 second (before the base stream ended)
Thread.sleep(forTimeInterval: 1)
task0.cancel()
- When:
transform0
is cancelled butbase0
still can emit more values - Expected:
base0
is cancelled and stops emitting any more values (preventing leaks) - Output:
base0
keeps emitting values
Error lost in transformation
After transforming the base stream using .mapStream
and if the base stream finish with an error, that error is not sent to the transformed stream and is lost in the process.
struct CustomError: Error {}
// Base stream will throw an error when finished
let base1 = AsyncThrowingStream(Int.self) { con in
let task = Task {
for i in 1...5 {
print("Sending for 1: \(i)")
con.yield(i)
}
print("Sending for 1: CustomError")
con.finish(throwing: CustomError())
}
con.onTermination = { _ in
task.cancel()
}
}
let transform1 = base1.mapStream { "Received from 1: \($0)" }
// Should catch error thrown by base stream
Task {
do {
for try await msg in transform1 {
print(msg)
}
return
} catch {
}
print("Done!!")
}
- When: When
base1
ended with an error - Expected:,
transform1
should do the same, and an error should be caught ("Done!!" is printed) - Output:
transform1
does not end, the error is not caught
Solution
Metadata
Metadata
Assignees
Labels
No labels