Feat/realtime voice stt p0 p1 #9
- Update module list: character, conversation, ai, voice are implemented (were listed as planned)
- Add AI streaming architecture docs (WebSocket, LlmProvider, STT/TTS abstraction)
- Add critical coding rules: no Lombok, ID String convention
- Add all 5 CI/CD workflows, validation scripts
- Reference .ai-rules/ for detailed standards instead of duplicating
- Reduce from ~450 lines to ~170 lines by removing duplication

- LLM providers: replace collectList()/reduce() with true per-chunk streaming in all 4 providers (Qiniu, OpenAI, Gemini, SiliconFlow)
- OpenAI provider: replace fragile indexOf JSON parsing with Jackson
- Gemini provider: fix O(n²) string concat with StringBuilder, add per-chunk parseGeminiChunk() for real streaming
- AiStreamingService: chain message saves into pipeline with thenMany()/flatMap() instead of fire-and-forget subscribe()

- VolcanTTS: fix Bearer token format ("Bearer; " → "Bearer ")
- XunfeiSTT: replace Thread.sleep with CompletableFuture.delayedExecutor, remove System.out.println debug statements
- XunfeiTTS: add subscribeOn(boundedElastic) to prevent blocking reactor
- MessageMapper: fix @Select → @Update on softDeleteByConversationId
- WebSocketConfig: setAllowedOrigins → setAllowedOriginPatterns
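
The XunfeiSTT item above swaps a blocking Thread.sleep for CompletableFuture.delayedExecutor. A minimal sketch of that pattern follows, using only the JDK. The `Connection` class and the `closeLater` helper are hypothetical stand-ins, not the actual XunfeiWebSocketSttClient code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class DelayedClose {
    /** Hypothetical stand-in for the STT WebSocket connection. */
    static final class Connection {
        private final AtomicBoolean open = new AtomicBoolean(true);
        void close() { open.set(false); }
        boolean isOpen() { return open.get(); }
    }

    /**
     * Schedules connection.close() to run after delayMillis.
     * The calling thread returns immediately instead of sleeping.
     */
    static CompletableFuture<Void> closeLater(Connection connection, long delayMillis) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.runAsync(connection::close, delayed);
    }
}
```

The returned future lets callers chain follow-up work or observe failures from the close, which a bare Thread.sleep on a reactor or WebSocket thread does not offer.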
Implement full-duplex async streaming architecture:
- PipelineState: state machine (IDLE→LISTENING→PROCESSING→SPEAKING)
- PipelineEvent: type-safe event hierarchy replacing Map<String,Object> (SttResult, LlmTextChunk, SentenceAudio, BargeInAck, etc.)
- SentenceChunker: splits LLM token stream into sentence-level chunks for parallel TTS synthesis (punctuation-aware, force-cut at 80 chars)
- StreamingPipelineOrchestrator: per-session orchestrator managing STT→LLM→TTS lifecycle, barge-in with conversation history truncation, sentence-level TTS with flatMapSequential(concurrency=2)
- AiChatWebSocketHandler: rewrite to delegate to orchestrator, add barge_in message type, type-safe event dispatching

New WebSocket protocol messages:
- Client→Server: barge_in
- Server→Client: barge_in_ack, pipeline_state, sentence_audio
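
To make the SentenceChunker description concrete, here is a rough, non-reactive sketch of punctuation-aware chunking with a force-cut at 80 characters. It is not the PR's implementation: the class name, the terminator set, and the `accept`/`flush` methods are assumptions chosen for illustration, and only the 80-character limit comes from the commit message.

```java
import java.util.ArrayList;
import java.util.List;

class SentenceChunkerSketch {
    // Assumed terminator set (CJK and ASCII); the real chunker may differ.
    private static final String TERMINATORS = "。！？；.!?;\n";
    // Force-cut length taken from the commit message.
    private static final int MAX_CHARS = 80;

    private final StringBuilder buffer = new StringBuilder();

    /** Feeds one LLM token and returns any sentences it completed. */
    List<String> accept(String token) {
        List<String> out = new ArrayList<>();
        if (token == null || token.isEmpty()) {
            return out;
        }
        for (int i = 0; i < token.length(); i++) {
            char c = token.charAt(i);
            buffer.append(c);
            // Cut on sentence punctuation, or when the buffer grows too long.
            if (TERMINATORS.indexOf(c) >= 0 || buffer.length() >= MAX_CHARS) {
                emit(out);
            }
        }
        return out;
    }

    /** Returns whatever text is left once the token stream ends. */
    List<String> flush() {
        List<String> out = new ArrayList<>();
        emit(out);
        return out;
    }

    private void emit(List<String> out) {
        String sentence = buffer.toString().trim();
        buffer.setLength(0);
        if (!sentence.isEmpty()) {
            out.add(sentence);
        }
    }
}
```

A naive terminator check like this also splits on the period in a decimal such as 3.14, so a production chunker would likely need extra rules for numbers and abbreviations.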
Pull request overview
This PR upgrades the server-side real-time voice chat pipeline by moving WebSocket handling to a typed, orchestrated STT → LLM → TTS flow (with sentence-level TTS output and barge-in support), while also improving LLM provider streaming behavior and fixing a few protocol/HTTP header issues.
Changes:
- Refactor WebSocket voice/text chat to a StreamingPipelineOrchestrator + typed PipelineEvent model, including sentence chunking and barge-in.
- Make LLM providers emit “true streaming” chunks with incremental accumulatedContent; improve OpenAI parsing via Jackson.
- Fix/adjust supporting integrations (MyBatis annotation for UPDATE, WebSocket origin patterns, TTS/STT client improvements, Volcan auth header).
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| vocata-server/src/main/java/com/vocata/conversation/mapper/MessageMapper.java | Fix MyBatis annotation for soft-delete UPDATE. |
| vocata-server/src/main/java/com/vocata/config/WebSocketConfig.java | Switch to setAllowedOriginPatterns("*") for handler registration. |
| vocata-server/src/main/java/com/vocata/ai/websocket/AiChatWebSocketHandler.java | Major refactor to orchestrator-driven STT/LLM/TTS, new event dispatching, barge-in support. |
| vocata-server/src/main/java/com/vocata/ai/tts/impl/XunfeiStreamTtsClient.java | Offload synthesis work to boundedElastic scheduler. |
| vocata-server/src/main/java/com/vocata/ai/tts/impl/VolcanTtsClient.java | Fix Bearer Authorization header format. |
| vocata-server/src/main/java/com/vocata/ai/stt/impl/XunfeiWebSocketSttClient.java | Clean up logging; replace blocking sleep with delayed close. |
| vocata-server/src/main/java/com/vocata/ai/service/AiStreamingService.java | Ensure message persistence ordering around LLM/TTS, adjust chaining. |
| vocata-server/src/main/java/com/vocata/ai/pipeline/StreamingPipelineOrchestrator.java | New orchestrator implementing full streaming pipeline and barge-in. |
| vocata-server/src/main/java/com/vocata/ai/pipeline/SentenceChunker.java | New utility to segment token stream into sentence-sized chunks for TTS. |
| vocata-server/src/main/java/com/vocata/ai/pipeline/PipelineState.java | New pipeline state enum. |
| vocata-server/src/main/java/com/vocata/ai/pipeline/PipelineEvent.java | New typed event model replacing Map<String,Object> responses. |
| vocata-server/src/main/java/com/vocata/ai/llm/impl/SiliconFlowLlmProvider.java | True streaming SSE parsing + incremental accumulated content. |
| vocata-server/src/main/java/com/vocata/ai/llm/impl/QiniuLlmProvider.java | True streaming SSE parsing + incremental accumulated content. |
| vocata-server/src/main/java/com/vocata/ai/llm/impl/OpenAiLlmProvider.java | Replace string-parsing with Jackson-based parsing. |
| vocata-server/src/main/java/com/vocata/ai/llm/impl/GeminiLlmProvider.java | Switch to per-chunk parsing approach for streaming. |
| CLAUDE.md | Update project/dev documentation to reflect current architecture and commands. |

    private final AtomicReference<PipelineState> state = new AtomicReference<>(PipelineState.IDLE);
    private volatile Disposable currentPipeline;
    private volatile Sinks.Many<byte[]> currentAudioSink;

bargeIn()/dispose() attempt to cancel currentPipeline and complete currentAudioSink, but these fields are never assigned anywhere in this class. As a result, barge-in likely won't actually stop the in-flight LLM/TTS work (and currentAudioSink completion is a no-op), leading to continued downstream processing after a barge-in request. Capture and store the active pipeline Disposable (e.g., via doOnSubscribe/doFinally or Flux.usingWhen) and (if needed) the active audio sink, or remove these fields and rely solely on the caller-managed subscription/sink cancellation to keep the behavior correct and obvious.
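
One way to read the suggestion above is sketched below with JDK types only. `Cancellable` is a hypothetical stand-in for Reactor's Disposable, and the method names `onPipelineStarted` and `onPipelineFinished` are assumptions about where the handle would be recorded (for example from doOnSubscribe and doFinally). The state names come from the PipelineState description in this PR.

```java
import java.util.concurrent.atomic.AtomicReference;

class BargeInSketch {
    enum State { IDLE, LISTENING, PROCESSING, SPEAKING }

    /** Hypothetical stand-in for reactor.core.Disposable. */
    interface Cancellable {
        void cancel();
    }

    private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
    private final AtomicReference<Cancellable> currentPipeline = new AtomicReference<>();

    /** Records the handle of the pipeline that just started, so bargeIn() has something to cancel. */
    void onPipelineStarted(Cancellable handle) {
        currentPipeline.set(handle);
        state.set(State.PROCESSING);
    }

    /** Clears the handle when that same pipeline ends on its own. */
    void onPipelineFinished(Cancellable handle) {
        if (currentPipeline.compareAndSet(handle, null)) {
            state.set(State.IDLE);
        }
    }

    /** Cancels in-flight work; returns false when nothing was running. */
    boolean bargeIn() {
        Cancellable active = currentPipeline.getAndSet(null);
        if (active == null) {
            return false;
        }
        active.cancel();
        state.set(State.LISTENING);
        return true;
    }

    State state() {
        return state.get();
    }
}
```

The point of the sketch is only that the field is written when work starts; without that write, the cancel path has nothing to act on.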

    public static Flux<String> chunkToSentences(Flux<UnifiedAiStreamChunk> tokenStream) {
        return Flux.create(sink -> {
            StringBuilder buffer = new StringBuilder();

            tokenStream.subscribe(
                chunk -> {
                    String content = chunk.getContent();
                    if (content == null || content.isEmpty()) {

SentenceChunker.chunkToSentences directly calls tokenStream.subscribe(...) inside Flux.create but never disposes that subscription when the downstream cancels (e.g., WebSocket disconnect or barge-in). This can leak the upstream subscription and keep LLM streaming work running after the client has cancelled. Keep the Disposable returned by subscribe and register sink.onCancel(...)/sink.onDispose(...) to dispose it (and consider using Flux.push/handle to avoid manual subscribe).

    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {
        SessionState state = sessions.get(session.getId());
        if (state != null && state.audioSink != null) {
            byte[] audioData = message.getPayload().array();

message.getPayload().array() is unsafe for BinaryMessage: the underlying ByteBuffer may be direct or not array-backed, and array() can throw UnsupportedOperationException or include bytes outside the readable range. Extract only the remaining bytes safely (e.g., duplicate buffer, read remaining() into a new byte[]), then emit that into the sink.

    - byte[] audioData = message.getPayload().array();
    + java.nio.ByteBuffer payload = message.getPayload().duplicate();
    + byte[] audioData = new byte[payload.remaining()];
    + payload.get(audioData);

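
The suggested change can be exercised in isolation. The sketch below wraps it in a hypothetical helper, `copyRemaining`, and is meant only to show why it behaves correctly for direct buffers and for array-backed buffers whose readable range is a sub-range of the backing array.

```java
import java.nio.ByteBuffer;

class PayloadBytes {
    /**
     * Copies only the readable bytes (position up to limit) into a new array.
     * Reading through duplicate() leaves the caller's position untouched.
     */
    static byte[] copyRemaining(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }
}
```

For a buffer created with `ByteBuffer.wrap(data, 1, 2)`, `array()` would hand back all of `data`, while the helper returns just the two readable bytes.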

        state.pipelineSubscription = state.orchestrator
                .processTextMessage(conversationUuid, userId, text)
                .doFinally(sig -> state.pipelineSubscription = null)
                .subscribe(
                        event -> dispatchEvent(session, event),
                        error -> {
                            logger.error("文字管线失败", error);
                            sendJsonSafe(session, Map.of("type", "error",
                                    "error", error.getMessage(),
                                    "timestamp", System.currentTimeMillis()));
                        }
                );
    }

handleTextInput overwrites state.pipelineSubscription without disposing any existing subscription. If a client sends multiple text_message requests quickly (or sends text while voice pipeline is active), multiple pipelines can run concurrently and interleave events on the same WebSocket session. Dispose/guard the previous subscription (or reject with a clear error) before starting a new one, similar to the audio session guard.
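
The two options named in this comment (dispose the previous subscription, or reject the new request) could look roughly like the following. This is a JDK-only sketch: `Cancellable` stands in for Reactor's Disposable, and `PipelineSlot` with its `replace`, `tryStart`, and `release` methods is a hypothetical helper, not something in the PR.

```java
import java.util.concurrent.atomic.AtomicReference;

class PipelineSlot {
    /** Hypothetical stand-in for reactor.core.Disposable. */
    interface Cancellable {
        void cancel();
    }

    private final AtomicReference<Cancellable> active = new AtomicReference<>();

    /** Replace policy: install the new handle and cancel whatever was running. */
    void replace(Cancellable next) {
        Cancellable previous = active.getAndSet(next);
        if (previous != null) {
            previous.cancel();
        }
    }

    /** Reject policy: install the handle only if no pipeline is active. */
    boolean tryStart(Cancellable next) {
        return active.compareAndSet(null, next);
    }

    /** Completion hook: clears the slot only if it still holds this handle. */
    void release(Cancellable finished) {
        active.compareAndSet(finished, null);
    }
}
```

The conditional `release` matters because an unconditional "set to null" in a completion callback, as in the quoted doFinally, can wipe out the handle of a newer pipeline that started in the meantime.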

    private final Map<String, SessionState> sessions = new ConcurrentHashMap<>();

    // ═══════════════════════════════════════════════════════
    // Connection lifecycle
    // ═══════════════════════════════════════════════════════

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        logger.info("AI语音WebSocket连接建立: {}", session.getId());
        logger.info("WebSocket连接建立: {}", session.getId());

        // Verify the user's identity
        String authenticatedUserId = authenticateUser(session);
        if (authenticatedUserId == null) {
            logger.error("WebSocket连接验证失败,关闭连接: {}", session.getId());
            session.close(CloseStatus.NOT_ACCEPTABLE.withReason("身份验证失败"));
            return;
        }

        // Store the authenticated user ID in the session
        session.getAttributes().put("authenticatedUserId", authenticatedUserId);
        logger.info("WebSocket用户认证成功: {} - 用户ID: {}", session.getId(), authenticatedUserId);

        session.sendMessage(new TextMessage(objectMapper.writeValueAsString(Map.of(
                "type", "status",
                "message", "WebSocket连接已建立",
                "timestamp", System.currentTimeMillis()
        ))));
        // Create a dedicated orchestrator for this session
        StreamingPipelineOrchestrator orchestrator = new StreamingPipelineOrchestrator(
                llmProvider, sttClient, ttsClient, conversationService,
                conversationMapper, messageMapper, characterMapper,
                characterChatCountService, aiPromptEnhanceService, defaultLlmModel);

        sessions.put(session.getId(), new SessionState(orchestrator));

This refactor replaces the old voiceSessions + aiStreamingService.processVoiceMessage(...) flow with per-session StreamingPipelineOrchestrator. There is an existing unit test (AiChatWebSocketHandlerTest) that still reflects the old API/fields (e.g., it uses reflection to read voiceSessions and mocks aiStreamingService.processVoiceMessage), which will now fail to compile/run. Update or replace the tests to cover the new orchestrator-based behavior (sessions lifecycle, barge-in, sentence audio framing).

        llmProvider.streamChat(llmRequest)
                .reduce("", (accumulated, chunk) ->
                        accumulated + (chunk.getContent() != null ? chunk.getContent() : ""))
                .flatMap(fullResponse ->
                        saveMessage(conversation.getId(), fullResponse, SenderType.CHARACTER, userIdLong)
                                .doOnSuccess(msg -> logger.debug("已保存AI消息: {}", msg.getId()))
                                .thenReturn(new LlmResponse(fullResponse, character.getName(), true))
                )

The reduce("", (accumulated, chunk) -> accumulated + ...) pattern repeatedly concatenates immutable Strings, which is O(n²) in the number of streamed chunks and can become expensive for long responses. Prefer accumulating with a StringBuilder (e.g., collect(StringBuilder::new, StringBuilder::append).map(StringBuilder::toString)) before persisting the full response.
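
The accumulation pattern can be illustrated with java.util.stream, whose three-argument collect plays the same role as the Reactor collect(Supplier, BiConsumer) named above. The `ResponseAccumulator` class is hypothetical. One assumption worth flagging: in the Reactor pipeline the chunks would first need to be mapped to their content strings, since StringBuilder::append applied to a chunk object would append its toString() rather than its text.

```java
import java.util.Objects;
import java.util.stream.Stream;

class ResponseAccumulator {
    /**
     * Joins chunk contents into one string using a single mutable buffer.
     * Null contents are skipped, matching the null check in the reduce(...) version.
     */
    static String join(Stream<String> chunkContents) {
        return chunkContents
                .filter(Objects::nonNull)
                .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
                .toString();
    }
}
```

Each append is amortized constant time per character, so total work grows linearly with the response length instead of copying the whole accumulated string on every chunk.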

    private Mono<UnifiedAiStreamChunk> parseOpenAiResponse(String jsonData) {
        try {
            // A JSON parsing library should be used here; this is simplified handling
            // In a real project, Jackson or Gson should be used
            com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
            com.fasterxml.jackson.databind.JsonNode root = mapper.readTree(jsonData);

parseOpenAiResponse creates a new ObjectMapper for every SSE line, which adds avoidable allocations and GC pressure under streaming load. Reuse a single shared ObjectMapper instance (e.g., a private static final ObjectMapper or an injected bean) since it is thread-safe for reads.
📌 Changes
✅ Test verification
PR submission guidelines reminder: