Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions pkg/srv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/websocket"
Expand All @@ -13,11 +14,34 @@ import (
)

// Client represents a connected WebSocket client with their subscription preferences.
//
// Connection management follows a simple pattern:
// - ONE goroutine (Run) handles ALL writes to avoid concurrent write issues
// - Server sends pings every pingInterval to detect dead connections
// - Client responds with pongs; read loop resets deadline on any message
// - Read loop (in websocket.go) detects disconnects and closes the connection
//
// Cleanup coordination (CRITICAL FOR THREAD SAFETY):
// Multiple goroutines can trigger cleanup concurrently:
// 1. Handle() defer in websocket.go calls Hub.Unregister() (async via channel)
// 2. Handle() defer in websocket.go calls closeWebSocket() (closes WS connection)
// 3. Client.Run() defer calls client.Close() when context is cancelled
// 4. Hub.Run() processes unregister message and calls client.Close()
// 5. Hub.cleanup() during shutdown calls client.Close() for all clients
//
// Thread safety is ensured by:
// - Close() uses sync.Once to ensure channels are closed exactly once
// - closed atomic flag allows checking if client is closing (safe from any goroutine)
// - Hub checks closed flag before sending to avoid race with channel close
// - closeWebSocket() does NOT send to client channels (would race with Close)
//
// Cleanup flow when a client disconnects:
// 1. Handle() read loop exits (EOF, timeout, or error)
// 2. defer cancel() signals Client.Run() via context
// 3. defer Hub.Unregister(clientID) sends message to hub (returns immediately)
// 4. defer closeWebSocket() closes the WebSocket connection only
// 5. Client.Run() sees context cancellation, exits, calls defer client.Close()
// 6. Hub.Run() processes unregister, calls client.Close() (idempotent via sync.Once)
type Client struct {
conn *websocket.Conn
send chan Event
Expand All @@ -28,6 +52,7 @@ type Client struct {
ID string
subscription Subscription
closeOnce sync.Once
closed uint32 // Atomic flag: 1 if closed, 0 if open
}

// NewClient creates a new client.
Expand Down Expand Up @@ -156,8 +181,18 @@ func (c *Client) write(msg any, timeout time.Duration) error {
// Close gracefully closes the client.
func (c *Client) Close() {
c.closeOnce.Do(func() {
// Set closed flag BEFORE closing channels
// This allows other goroutines to check if client is closing
atomic.StoreUint32(&c.closed, 1)

close(c.done)
close(c.send)
close(c.control)
})
}

// IsClosed returns true if the client is closed or closing.
// Safe to call from any goroutine.
func (c *Client) IsClosed() bool {
return atomic.LoadUint32(&c.closed) != 0
}
103 changes: 77 additions & 26 deletions pkg/srv/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ type Event struct {
// Hub manages WebSocket clients and event broadcasting.
// It runs in its own goroutine and handles client registration,
// unregistration, and event distribution.
//
// Thread safety design:
// - Single-goroutine pattern: Only Run() modifies the clients map
// - All external operations (Register, Unregister, Broadcast) send to buffered channels
// - ClientCount() uses RLock for safe concurrent reads
// - Client snapshot pattern in broadcast minimizes lock time
//
// Unregister coordination:
// - Unregister(clientID) sends message to channel and returns immediately (async)
// - Run() processes unregister messages in order
// - Calls client.Close() which is idempotent (sync.Once)
// - Multiple concurrent unregisters for same client are safe
//
// Broadcast safety:
// - Creates client snapshot with RLock, then releases lock
// - Non-blocking send to client.send channel prevents deadlocks
// - If client disconnects during iteration, send fails gracefully (channel full or closed)
// - Client.Close() is safe to call multiple times during this window
type Hub struct {
clients map[string]*Client
register chan *Client
Expand Down Expand Up @@ -142,9 +160,8 @@ func (h *Hub) Run(ctx context.Context) {
dropped := 0
for _, client := range clientSnapshot {
if matches(client.subscription, msg.event, msg.payload, client.userOrgs) {
// Non-blocking send
select {
case client.send <- msg.event:
// Try to send (safe against closed channels)
if h.trySendEvent(client, msg.event) {
matched++
logger.Info("delivered event to client", logger.Fields{
"client_id": client.ID,
Expand All @@ -154,9 +171,9 @@ func (h *Hub) Run(ctx context.Context) {
"pr_url": msg.event.URL,
"delivery_id": msg.event.DeliveryID,
})
default:
} else {
dropped++
logger.Warn("dropped event for client: buffer full", logger.Fields{
logger.Warn("dropped event for client: channel full or closed", logger.Fields{
"client_id": client.ID,
})
}
Expand Down Expand Up @@ -225,38 +242,72 @@ func (h *Hub) ClientCount() int {
return len(h.clients)
}

// trySendEvent attempts to send an event to a client's send channel.
// Returns true if sent successfully, false if channel is full or closed.
//
// CRITICAL: This function checks the client's closed flag before sending.
// This prevents race conditions where Client.Close() is called while Hub is broadcasting.
//
// Race scenario this handles:
// 1. Hub takes client snapshot (client in map, channels open)
// 2. Client.Close() is called (sets closed=1, then closes send channel)
// 3. Hub checks client.IsClosed() before sending
// 4. If closed=1, we don't attempt to send (avoiding panic)
//
// Note: There's still a tiny window between IsClosed() check and send where
// Close() could be called, so we keep recover() as a safety net.
func (h *Hub) trySendEvent(client *Client, event Event) (sent bool) {
// Check if client is closed before attempting send
// This prevents most races with client.Close()
if client.IsClosed() {
return false
}

defer func() {
if r := recover(); r != nil {
// Channel was closed between IsClosed() check and send
// This is a very rare race but possible, so we catch it
sent = false
}
}()

// Non-blocking send with panic protection
select {
case client.send <- event:
return true
default:
return false
}
}

// cleanup closes all client connections during shutdown.
//
// CRITICAL THREADING NOTE:
// This function MUST NOT send to client channels (send/control) because of race conditions:
// - Client.Close() can be called concurrently from multiple places (Handle defer, Run defer, etc.)
// - Once Close() starts, it closes all channels atomically
// - Trying to send to a closed channel panics, even with select/default
// - select/default only protects against FULL channels, not CLOSED channels
//
// Instead, we rely on:
// - WebSocket connection close will signal the client
// - Client.Run() will detect context cancellation and exit gracefully
// - client.Close() is idempotent (sync.Once) so safe to call multiple times
func (h *Hub) cleanup() {
h.mu.Lock()
defer h.mu.Unlock()

logger.Info("Hub cleanup: closing client connections gracefully", logger.Fields{
logger.Info("Hub cleanup: closing client connections", logger.Fields{
"client_count": len(h.clients),
})

// Close all clients. DO NOT try to send shutdown messages - race with client.Close()
// The WebSocket connection close and context cancellation are sufficient signals.
for id, client := range h.clients {
// Try to send shutdown message (non-blocking)
select {
case client.send <- Event{Type: "shutdown"}:
logger.Info("sent shutdown notice to client", logger.Fields{"client_id": id})
default:
logger.Warn("could not send shutdown notice to client: channel full", logger.Fields{"client_id": id})
}
}

// Give clients a moment to process shutdown messages and close gracefully
// This allows time for proper WebSocket close frames to be sent
if len(h.clients) > 0 {
logger.Info("waiting for clients to receive shutdown messages", logger.Fields{
"client_count": len(h.clients),
})
time.Sleep(200 * time.Millisecond)
}

// Now close all clients
for _, client := range h.clients {
client.Close()
logger.Info("closed client during hub cleanup", logger.Fields{"client_id": id})
}

h.clients = nil
logger.Info("Hub cleanup complete", nil)
}
Loading