
[Detail Bug] autoConnect(2) in SSE pipeline prevents cancellation, causing zombie LLM streams after client disconnect #34

@detail-app

Description

Summary

  • Context: SseSupport is a utility component used by ChatController and GuidedLearningController to prepare and multicast Server-Sent Event (SSE) streams for LLM responses.
  • Bug: The prepareDataStream method uses publish().autoConnect(2) to handle multiple internal subscribers, which incorrectly traps cancellation signals and prevents them from reaching the upstream source.
  • Actual vs. expected: When a client disconnects (e.g., by closing a browser tab), the upstream LLM streaming request continues to completion in the background. It should instead propagate the cancellation to the LLM provider to stop token consumption and free up server resources.
  • Impact: This leads to a resource leak and unnecessary costs from the LLM provider, as "zombie" requests continue to stream and process data that will never be delivered or saved.
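The failure mode can be reproduced in isolation with a small sketch (assumes reactor-core on the classpath; `Flux.interval` stands in for the LLM token stream, and all names here are invented for illustration):

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

class ZombieStreamDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger tokensProduced = new AtomicInteger();

        // Stand-in for the LLM token stream: emits indefinitely, counting each "token".
        Flux<Long> llmStream = Flux.interval(Duration.ofMillis(10))
                .doOnNext(t -> tokensProduced.incrementAndGet());

        Flux<Long> shared = llmStream.publish().autoConnect(2);

        var dataEvents = shared.subscribe(); // subscriber 1: SSE text events
        var heartbeat = shared.subscribe();  // subscriber 2: heartbeat termination

        Thread.sleep(100);
        dataEvents.dispose(); // simulate the client closing the tab
        heartbeat.dispose();

        int atDisconnect = tokensProduced.get();
        Thread.sleep(300);

        // The count keeps climbing after both subscribers are gone:
        // autoConnect never disconnected the upstream "LLM request".
        System.out.println(tokensProduced.get() > atDisconnect); // prints true
    }
}
```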

Code with bug

public Flux<String> prepareDataStream(Flux<String> source, Consumer<String> chunkConsumer) {
    return source.filter(chunk -> chunk != null && !chunk.isEmpty())
            .bufferTimeout(STREAM_CHUNK_COALESCE_MAX_ITEMS, Duration.ofMillis(STREAM_CHUNK_COALESCE_WINDOW_MS))
            .filter(chunkBatch -> !chunkBatch.isEmpty())
            .map(chunkBatch -> String.join("", chunkBatch))
            .doOnNext(chunk -> chunkConsumer.accept(chunk))
            .onBackpressureBuffer(
                    STREAM_BACKPRESSURE_BUFFER_CAPACITY,
                    this::recordDroppedCoalescedChunk,
                    BufferOverflowStrategy.DROP_OLDEST)
            // Two subscribers consume this stream in controllers:
            // 1) text event emission, 2) heartbeat termination signal.
            // autoConnect(2) prevents a race where one subscriber could miss the first chunks.
            .publish()
            .autoConnect(2); // <-- BUG 🔴 [Cancellation from downstream is ignored by autoConnect]
}

Evidence

1. Verification of autoConnect Cancellation Behavior

A test case confirms that autoConnect(n) does not propagate cancellation to the upstream source after all subscribers dispose their subscriptions, whereas refCount(n) disconnects, and cancels upstream, once the subscriber count drops to zero.

@Test
void testAutoConnectDoesNotCancelUpstream() {
    AtomicBoolean cancelled = new AtomicBoolean(false);
    Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
    
    Flux<Integer> source = sink.asFlux()
            .doOnCancel(() -> cancelled.set(true));

    Flux<Integer> shared = source.publish().autoConnect(2);

    var sub1 = shared.subscribe();
    var sub2 = shared.subscribe();

    sub1.dispose();
    sub2.dispose();

    // This assertion passes: the upstream was NOT cancelled,
    // which demonstrates the leak.
    assertFalse(cancelled.get(), "Upstream should NOT be cancelled with autoConnect");
}
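For contrast, an analogous test (a sketch with the same scaffolding, imports included; assumes reactor-core and JUnit 5) shows that refCount(2) does cancel the upstream once both subscribers dispose:

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class RefCountCancellationTest {

    @Test
    void testRefCountCancelsUpstream() {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();

        Flux<Integer> source = sink.asFlux()
                .doOnCancel(() -> cancelled.set(true));

        // refCount(2) also waits for two subscribers before connecting,
        // so the "first chunks" race protection is preserved.
        Flux<Integer> shared = source.publish().refCount(2);

        var sub1 = shared.subscribe();
        var sub2 = shared.subscribe();

        sub1.dispose();
        sub2.dispose();

        // Unlike autoConnect(2), refCount disconnects when the subscriber
        // count drops back to zero, cancelling the upstream source.
        assertTrue(cancelled.get(), "Upstream SHOULD be cancelled with refCount");
    }
}
```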

2. Analysis of Chat Termination Logic

In ChatController, the fullResponse is accumulated via a chunkConsumer passed to prepareDataStream. This consumer is called inside a doOnNext operator located before the publish() point.

When a client disconnects:

  1. The HTTP response Flux is cancelled.
  2. The merge operator in the controller cancels dataEvents and heartbeats.
  3. These subscribers disconnect from the autoConnect Flux.
  4. Because autoConnect is used, the connection to the source (the LLM stream) remains active.
  5. doOnNext continues to fire, appending chunks to fullResponse and consuming LLM tokens.
  6. The final doOnComplete block in ChatController is never reached, because the main chain was cancelled. The accumulated fullResponse is therefore discarded instead of being saved to memory, even though the full cost of the request was paid.
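The two-subscriber wiring described in steps 2 and 3 can be sketched as follows (a simplified illustration, not the actual ChatController code; everything beyond the dataEvents/heartbeats names is invented, and ServerSentEvent formatting is reduced to raw strings):

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

class SseWiringSketch {

    // shared is the Flux returned by prepareDataStream (publish().autoConnect(2)
    // today, publish().refCount(2) after the fix).
    static Flux<String> wire(Flux<String> shared) {
        // Subscriber 1: text event emission.
        Flux<String> dataEvents = shared
                .map(chunk -> "data: " + chunk + "\n\n");

        // Subscriber 2: heartbeats, terminated when the shared stream ends.
        Flux<String> heartbeats = Flux.interval(Duration.ofSeconds(15))
                .map(tick -> ": keep-alive\n\n")
                .takeUntilOther(shared.ignoreElements());

        // merge() subscribes to both branches, which is what satisfies the
        // two-subscriber requirement. When the HTTP response is cancelled,
        // merge cancels both branches; only refCount(2) then forwards that
        // cancellation to the source.
        return Flux.merge(dataEvents, heartbeats);
    }
}
```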

Why has this bug gone undetected?

This bug is silent and causes no visible errors in the UI. In a development environment, a few extra seconds of background processing and slightly higher LLM token usage are easily overlooked. And when a user cancels a request, they rarely check whether the partial message was saved to their history, or they assume it wasn't saved precisely because they cancelled.

Recommended fix

Replace autoConnect(2) with refCount(2). refCount(2) provides the same protection against the "first chunks" race by waiting for 2 subscribers before connecting, but it correctly disconnects, propagating cancellation upstream, when the subscriber count drops to zero. (refCount(2, Duration.ZERO) is equivalent; the Duration overload exists only to add a grace period before disconnecting.)

            .publish()
            .refCount(2); // <-- FIX 🟢 [Correctly propagates cancellation to upstream source]
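An end-to-end check of the fixed shape (a sketch assuming reactor-core; the coalescing and buffer constants are illustrative stand-ins for the real STREAM_* values) confirms that cancellation now traverses the whole operator chain back to the source:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class FixedPipelineCheck {
    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

        // Same shape as prepareDataStream, ending in the fixed refCount(2).
        Flux<String> shared = sink.asFlux()
                .doOnCancel(() -> cancelled.set(true))
                .filter(chunk -> chunk != null && !chunk.isEmpty())
                .bufferTimeout(16, Duration.ofMillis(50))
                .filter(batch -> !batch.isEmpty())
                .map(batch -> String.join("", batch))
                .onBackpressureBuffer(256, dropped -> { }, BufferOverflowStrategy.DROP_OLDEST)
                .publish()
                .refCount(2);

        var sub1 = shared.subscribe();
        var sub2 = shared.subscribe();
        sub1.dispose();
        sub2.dispose();

        // Cancellation propagated through the buffers and filters to the source.
        System.out.println(cancelled.get()); // prints true
    }
}
```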

Labels: bug (Something isn't working), detail