fix: replace Flux.concat with Flux.merge to fix unicast sink re-subsc…#15
Conversation
…ription bug Using Flux.concat caused sttFlux.share() to be subscribed twice sequentially — after the STT subscription completed, concat would subscribe llmTtsEvents which triggered a second subscription to the unicast audioSink, causing the "sinks only allow a single Subscriber" error. Fix: wrap sttEvents and llmTtsEvents in Flux.merge so both subscribe simultaneously before any emissions, satisfying the unicast constraint.
There was a problem hiding this comment.
Pull request overview
This PR adjusts the voice-processing reactive pipeline to prevent a second subscription to the shared STT stream (and the underlying unicast audio sink) by subscribing to the STT-result stream and the downstream LLM→TTS stream concurrently.
Changes:
- Replace the sequential
Flux.concat(...)chaining betweensttEventsandllmTtsEventswithFlux.merge(...)so both subscribe at the same time. - Remove the explicit
PipelineState.PROCESSINGstate-change event emission from the voice pipeline flow (now only internalstate.set(PROCESSING)remains).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return Flux.concat( | ||
| Flux.just(new PipelineEvent.StateChange(PipelineState.LISTENING)), | ||
| sttEvents.takeUntil(r -> ((PipelineEvent.SttResult) r).isFinal()), | ||
| Flux.just(new PipelineEvent.StateChange(PipelineState.PROCESSING)), | ||
| llmTtsEvents, | ||
| Flux.merge( | ||
| sttEvents.takeUntil(r -> ((PipelineEvent.SttResult) r).isFinal()), | ||
| llmTtsEvents | ||
| ), | ||
| Flux.just(new PipelineEvent.Complete()) |
There was a problem hiding this comment.
This change removes the emission of PipelineEvent.StateChange(PipelineState.PROCESSING) in the voice pipeline. The class-level state machine comment and processTextMessage both indicate clients should receive a PROCESSING transition, and AiChatWebSocketHandler forwards StateChange events to the client as pipeline_state. Consider emitting a PROCESSING StateChange when the first final STT result is received (e.g., prepend it inside the llmTtsEvents branch so it doesn't fire immediately on subscription), so voice and text flows remain consistent.
…ription bug
Using Flux.concat caused sttFlux.share() to be subscribed twice sequentially — after the STT subscription completed, concat would subscribe llmTtsEvents which triggered a second subscription to the unicast audioSink, causing the "sinks only allow a single Subscriber" error.
Fix: wrap sttEvents and llmTtsEvents in Flux.merge so both subscribe simultaneously before any emissions, satisfying the unicast constraint.
📌 变更内容
✅ 测试验证
PR 提交规范提醒: