What version of Effect is running?
3.21.2 (vendored at repos/effect)
What steps can reproduce the bug?
import { Effect, Stream } from 'effect'
import { Observable } from 'rxjs'
// Synchronous observable that emits 1 value then completes
const obs = new Observable<number>((subscriber) => {
subscriber.next(42)
subscriber.complete()
})
// Wraps the observable via asyncPush with a bounded dropping buffer
const stream = Stream.asyncPush<number, Error>((emit) =>
Effect.acquireRelease(
Effect.sync(() =>
obs.subscribe({
next: (value) => emit.single(value),
error: (error) => emit.fail(new Error(String(error))),
complete: () => emit.end(),
})
),
(sub) => Effect.sync(() => sub.unsubscribe()),
), { bufferSize: 1, strategy: 'dropping' })
await Effect.runPromise(Stream.runCollect(stream))
// HANGS FOREVER — never resolves
The same happens with { bufferSize: 1, strategy: "sliding" }.
When values are emitted asynchronously (e.g. via setTimeout), or when bufferSize >= number of synchronously emitted values + 1, the stream completes normally.
What is the expected behavior?
Stream.runCollect(stream) should return [42] — the stream should emit the synchronously emitted value and then complete normally when the observable calls complete().
What do you see instead?
The Effect hangs forever. The consumer channel is permanently blocked on Queue.take(queue), waiting for a termination signal (Exit.void) that was silently dropped.
Additional information
Root cause
The bug is in makePush (packages/effect/src/internal/stream/emit.ts:51-123). The end() and done() methods follow the same pattern:
// emit.ts lines 104-108
end() {
if (finished) return
finished = true
flush() // (A) offers accumulated values as 1 queue item
queue.unsafeOffer(Exit.void) // (B) offers the termination signal
}
flush() (line 73-78) offers the accumulated buffer to the queue via queue.unsafeOffer(buffer), consuming one queue slot. Then end() offers Exit.void as a second queue slot.
The queue is created by queueFromBufferOptionsPush (stream.ts:595-610):
const queueFromBufferOptionsPush = <A, E>(options?: ...) => {
// ... unbounded case ...
switch (options?.strategy) {
case "sliding":
return Queue.sliding(options.bufferSize ?? 16) // capacity = bufferSize
default:
return Queue.dropping(options?.bufferSize ?? 16) // capacity = bufferSize
}
}
The bounded queue has capacity equal to bufferSize. When bufferSize === 1:
flush() at step (A) offers an array of values → queue fills the only slot (capacity 1, now full)
queue.unsafeOffer(Exit.void) at step (B) tries to offer the termination signal → the backing MutableQueue.bounded(1) is full, offer returns false → the exit signal is silently dropped
The consumer loop in asyncPush (stream.ts:630-633) then blocks forever:
// stream.ts lines 630-633
const loop = core.flatMap(Queue.take(queue), (item) =>
Exit.isExit(item)
? Exit.isSuccess(item) ? core.void : core.failCause(item.cause)
: channel.zipRight(core.write(Chunk.unsafeFromArray(item)), loop))
It takes the value array from the queue, writes it to the channel, loops, and calls Queue.take(queue) again — but the queue is empty and no Exit.void will ever arrive because it was dropped.
Why bufferSize >= values.length + 1 works
With bufferSize: 2, the queue has capacity 2. flush() takes slot 1, and unsafeOffer(Exit.void) takes slot 2 — both fit. The stream terminates.
Why async emission works
When values are emitted asynchronously (e.g. via setTimeout), the scheduled flush task runs between emissions. The consumer concurrently drains items from the queue, so the queue never fills completely. When end() eventually runs, there's always room for Exit.void.
This also affects done() (error path)
// emit.ts lines 80-88
function done(exit: Exit.Exit<A, E>) {
if (finished) return
finished = true
if (exit._tag === 'Success') {
buffer.push(exit.value)
}
flush()
queue.unsafeOffer(exit._tag === 'Success' ? Exit.void : exit)
}
The same race exists for error/defect terminations on bounded queues with bufferSize: 1.
Impact
Any consumer that wraps a synchronous source (e.g., converting an RxJS Observable or a synchronous push iterator into a Stream) and uses a bounded dropping/sliding buffer will hang if the source emits synchronously and then completes within the register callback.
Proposed fix
queueFromBufferOptionsPush should create bounded queues with bufferSize + 1 capacity to reserve one slot for the termination signal:
switch (options?.strategy) {
case "sliding":
- return Queue.sliding(options.bufferSize ?? 16)
+ return Queue.sliding((options.bufferSize ?? 16) + 1)
default:
- return Queue.dropping(options?.bufferSize ?? 16)
+ return Queue.dropping((options?.bufferSize ?? 16) + 1)
}
The +1 slot is strictly for internal use by the emit machinery (the termination signal) and does not change the user-visible buffering behavior.
What version of Effect is running?
3.21.2(vendored atrepos/effect)What steps can reproduce the bug?
The same happens with
{ bufferSize: 1, strategy: "sliding" }.When values are emitted asynchronously (e.g. via
setTimeout), or whenbufferSize >= number of synchronously emitted values + 1, the stream completes normally.What is the expected behavior?
Stream.runCollect(stream)should return[42]— the stream should emit the synchronously emitted value and then complete normally when the observable callscomplete().What do you see instead?
The Effect hangs forever. The consumer channel is permanently blocked on
Queue.take(queue), waiting for a termination signal (Exit.void) that was silently dropped.Additional information
Root cause
The bug is in
makePush(packages/effect/src/internal/stream/emit.ts:51-123). Theend()anddone()methods follow the same pattern:flush()(line 73-78) offers the accumulated buffer to the queue viaqueue.unsafeOffer(buffer), consuming one queue slot. Thenend()offersExit.voidas a second queue slot.The queue is created by
queueFromBufferOptionsPush(stream.ts:595-610):The bounded queue has capacity equal to
bufferSize. WhenbufferSize === 1:flush()at step (A) offers an array of values → queue fills the only slot (capacity 1, now full)queue.unsafeOffer(Exit.void)at step (B) tries to offer the termination signal → the backingMutableQueue.bounded(1)is full,offerreturnsfalse→ the exit signal is silently droppedThe consumer loop in
asyncPush(stream.ts:630-633) then blocks forever:It takes the value array from the queue, writes it to the channel, loops, and calls
Queue.take(queue)again — but the queue is empty and noExit.voidwill ever arrive because it was dropped.Why
bufferSize >= values.length + 1worksWith
bufferSize: 2, the queue has capacity 2.flush()takes slot 1, andunsafeOffer(Exit.void)takes slot 2 — both fit. The stream terminates.Why async emission works
When values are emitted asynchronously (e.g. via
setTimeout), the scheduledflushtask runs between emissions. The consumer concurrently drains items from the queue, so the queue never fills completely. Whenend()eventually runs, there's always room forExit.void.This also affects
done()(error path)The same race exists for error/defect terminations on bounded queues with
bufferSize: 1.Impact
Any consumer that wraps a synchronous source (e.g., converting an RxJS Observable or a synchronous push iterator into a Stream) and uses a bounded dropping/sliding buffer will hang if the source emits synchronously and then completes within the register callback.
Proposed fix
queueFromBufferOptionsPushshould create bounded queues withbufferSize + 1capacity to reserve one slot for the termination signal:switch (options?.strategy) { case "sliding": - return Queue.sliding(options.bufferSize ?? 16) + return Queue.sliding((options.bufferSize ?? 16) + 1) default: - return Queue.dropping(options?.bufferSize ?? 16) + return Queue.dropping((options?.bufferSize ?? 16) + 1) }The
+1slot is strictly for internal use by the emit machinery (the termination signal) and does not change the user-visible buffering behavior.