Revert "fix: redis pub/sub and streaming response issue"#6084
Conversation
This reverts commit 34aa82e.
There was a problem hiding this comment.
Code Review
This pull request refactors the Server-Sent Events (SSE) and Redis pub/sub implementation by removing several safety wrappers, cleanup routines, and the heartbeat mechanism. Feedback highlights critical stability and resource management concerns, specifically the removal of unsubscription logic in finally blocks which leads to Redis resource leaks. The reviewer also noted that the deletion of error handling around JSON parsing and response writing introduces risks of server crashes upon encountering malformed data or client disconnections. Furthermore, the removal of the heartbeat mechanism may result in dropped connections through proxies, and the elimination of centralized publishing helpers has caused significant code duplication and inconsistent logging practices.
| } finally { | ||
| if (isQueueMode && chatId) { | ||
| await getRunningExpressApp().redisSubscriber.unsubscribe(chatId) | ||
| } | ||
| sseStreamer.removeClient(chatId) | ||
| } |
There was a problem hiding this comment.
The logic to unsubscribe from the Redis channel in the finally block has been removed. This will lead to stale subscriptions on the Redis server, as the application subscribes to a channel for each streaming request but never unsubscribes. This is a resource leak that can impact server performance and stability over time. Please reintroduce the unsubscribe call.
| } finally { | ||
| if (isQueueMode && chatId) { | ||
| await getRunningExpressApp().redisSubscriber.unsubscribe(chatId) | ||
| } | ||
| sseStreamer.removeClient(chatId) | ||
| } |
There was a problem hiding this comment.
The logic to unsubscribe from the Redis channel in the finally block has been removed. This will lead to stale subscriptions on the Redis server, as the application subscribes to a channel for each streaming request but never unsubscribes. This is a resource leak that can impact server performance and stability over time. Please reintroduce the unsubscribe call.
| this.sseStreamer.streamErrorEvent(chatId, err instanceof Error ? err.message : 'Failed to process stream event') | ||
| } | ||
| // Parse the message from Redis | ||
| const event = JSON.parse(message) |
| @@ -11,87 +11,66 @@ type Client = { | |||
| } | |||
|
|
|||
| export class SSEStreamer implements IServerSideEventStreamer { | |||
There was a problem hiding this comment.
The heartbeat mechanism (startHeartbeat, stopHeartbeat) has been removed. This is crucial for keeping Server-Sent Events (SSE) connections alive, especially when they pass through proxies or load balancers with idle timeouts. Without heartbeats, client connections are likely to be dropped unexpectedly, leading to a poor user experience. This functionality should be restored.
| streamCustomEvent(chatId: string, eventType: string, data: any) { | ||
| const clientResponse = { | ||
| event: eventType, | ||
| data: data | ||
| const client = this.clients[chatId] | ||
| if (client) { | ||
| const clientResponse = { | ||
| event: eventType, | ||
| data: data | ||
| } | ||
| client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') | ||
| } |
There was a problem hiding this comment.
Calling client.response.write() directly without error handling is unsafe. If the client disconnects, this call will throw an unhandled exception and crash the server. The previous safeWrite implementation prevented this by wrapping the write operation in a try-catch block. Please reintroduce a safe writing mechanism to prevent server crashes. This issue applies to all stream... methods in this class.
| streamCustomEvent(chatId: string, eventType: string, data: any) { | ||
| try { | ||
| await this.redisPublisher.publish(channel, message) | ||
| this.redisPublisher.publish( | ||
| chatId, | ||
| JSON.stringify({ | ||
| chatId, | ||
| eventType, | ||
| data | ||
| }) | ||
| ) | ||
| } catch (error) { | ||
| logger.error(`[RedisEventPublisher] Error publishing to channel ${channel}:`, { error }) | ||
| console.error('Error streaming custom event:', error) | ||
| } |
There was a problem hiding this comment.
The removal of the safePublish helper method has led to duplicated try-catch blocks in every stream... method. This increases code duplication and makes maintenance harder. Furthermore, the check for this.redisPublisher.isReady before publishing is now missing, and errors are logged via console.error instead of the application's logger. It's recommended to re-introduce a private helper method to centralize publishing logic, including readiness checks, error handling, and consistent logging to improve readability and reduce future errors.
References
- Prioritize code readability and understandability over conciseness. A series of simple, chained operations or helper methods can be preferable to duplicated complex logic to reduce the potential for future errors.
- Use a default (fallback) implementation (like the application's standard logger) unless the specific implementation has meaningfully different behavior.
Reverts #6008, accidentally merge into main.