Fix hub crash during Cloud Logging metadata outage#270
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a circuit breaker pattern (ResilientCloudHandler) to protect the hub from crashing or hanging during GCP metadata or Cloud Logging outages, alongside adding bounded buffers and client timeouts. The review feedback highlights several critical issues: a cleanup ordering bug that could cause panics on shutdown, a potential goroutine leak in flushWithTimeout during prolonged outages, and a deadlock risk from holding mutexes while calling external logging functions. Additionally, the request and message loggers currently bypass the circuit breaker, and the core background health-checking and recovery logic remains untested.
| resilientHandler, resilientCleanup := logging.NewResilientCloudHandler( | ||
| ch, logging.ResilientCloudHandlerConfig{}, | ||
| ) | ||
| cloudHandler = resilientHandler | ||
| cleanups = append(cleanups, resilientCleanup) | ||
| cleanups = append(cleanups, cloudLogCleanup) |
There was a problem hiding this comment.
The resilientCleanup (which stops the background health check goroutine) is appended to cleanups before cloudLogCleanup (which closes the underlying Cloud Logging client).
Because cleanups are executed in reverse order of how they are deferred in the main loop (for _, cleanup := range logCleanups { defer cleanup() }), cloudLogCleanup will execute before resilientCleanup. This means the underlying client/logger will be closed while the background health check goroutine is still running and potentially attempting to call Flush(), which can lead to panics or errors on shutdown.
To fix this, append cloudLogCleanup to cleanups before resilientCleanup so that resilientCleanup is executed first.
| resilientHandler, resilientCleanup := logging.NewResilientCloudHandler( | |
| ch, logging.ResilientCloudHandlerConfig{}, | |
| ) | |
| cloudHandler = resilientHandler | |
| cleanups = append(cleanups, resilientCleanup) | |
| cleanups = append(cleanups, cloudLogCleanup) | |
| resilientHandler, resilientCleanup := logging.NewResilientCloudHandler( | |
| ch, logging.ResilientCloudHandlerConfig{}, | |
| ) | |
| cloudHandler = resilientHandler | |
| cleanups = append(cleanups, cloudLogCleanup) | |
| cleanups = append(cleanups, resilientCleanup) |
| if ch, ok := cloudHandler.(*logging.ResilientCloudHandler); ok && ch != nil { | ||
| reqLogCfg.CloudClient = ch.Client() | ||
| reqLogCfg.ProjectID = logging.FormatProjectID() | ||
| } |
There was a problem hiding this comment.
The request logger and message logger are initialized with the raw *gcplog.Client (ch.Client()) instead of being wrapped by the ResilientCloudHandler.
During a Cloud Logging outage, writes to these loggers will bypass the circuit breaker and attempt to write directly to Cloud Logging, which could still lead to resource exhaustion, hangs, or crashes. Consider updating RequestLoggerConfig and MessageLoggerConfig to accept a wrapped handler or integrating the circuit breaker state at the client level so that all log streams benefit from the protection.
| type ResilientCloudHandler struct { | ||
| inner *CloudHandler | ||
| logger *gcplog.Logger | ||
| config ResilientCloudHandlerConfig | ||
| cb *circuitBreaker | ||
|
|
||
| done chan struct{} | ||
| wg sync.WaitGroup | ||
| } |
There was a problem hiding this comment.
Add a flushInFlight atomic boolean to the ResilientCloudHandler struct to prevent concurrent flush operations and avoid goroutine leaks during prolonged outages.
| type ResilientCloudHandler struct { | |
| inner *CloudHandler | |
| logger *gcplog.Logger | |
| config ResilientCloudHandlerConfig | |
| cb *circuitBreaker | |
| done chan struct{} | |
| wg sync.WaitGroup | |
| } | |
| type ResilientCloudHandler struct { | |
| inner *CloudHandler | |
| logger *gcplog.Logger | |
| config ResilientCloudHandlerConfig | |
| cb *circuitBreaker | |
| flushInFlight atomic.Bool | |
| done chan struct{} | |
| wg sync.WaitGroup | |
| } |
| func (h *ResilientCloudHandler) flushWithTimeout() error { | ||
| ctx, cancel := context.WithTimeout(context.Background(), h.config.ProbeTimeout) | ||
| defer cancel() | ||
|
|
||
| // Logger.Flush() doesn't accept a context, so we race it against our | ||
| // timeout. This prevents Flush from hanging indefinitely when the | ||
| // metadata service is unreachable. | ||
| errCh := make(chan error, 1) | ||
| go func() { | ||
| errCh <- h.logger.Flush() | ||
| }() | ||
|
|
||
| select { | ||
| case err := <-errCh: | ||
| return err | ||
| case <-ctx.Done(): | ||
| return fmt.Errorf("cloud logging flush timed out after %v", h.config.ProbeTimeout) | ||
| } | ||
| } |
There was a problem hiding this comment.
In flushWithTimeout, a new goroutine is spawned to call h.logger.Flush(). If the flush operation hangs indefinitely (which is the primary reason for the timeout), the spawned goroutine will be leaked. Since healthCheckLoop periodically calls runHealthCheck (and thus flushWithTimeout), this can lead to an unbounded accumulation of leaked goroutines during a prolonged outage.
To prevent this, use the flushInFlight atomic boolean to ensure that only one flush operation is active at any given time. If a previous flush is still running, subsequent health checks can immediately return an error without spawning a new goroutine.
func (h *ResilientCloudHandler) flushWithTimeout() error {
if !h.flushInFlight.CompareAndSwap(false, true) {
return fmt.Errorf("a previous flush is still in progress")
}
ctx, cancel := context.WithTimeout(context.Background(), h.config.ProbeTimeout)
defer cancel()
errCh := make(chan error, 1)
go func() {
defer h.flushInFlight.Store(false)
errCh <- h.logger.Flush()
}()
select {
case err := <-errCh:
return err
case <-ctx.Done():
return fmt.Errorf("cloud logging flush timed out after %v", h.config.ProbeTimeout)
}
}| func (h *ResilientCloudHandler) recordFailure(err error) { | ||
| h.cb.mu.Lock() | ||
| defer h.cb.mu.Unlock() | ||
|
|
||
| h.cb.failures++ | ||
| if h.cb.failures >= h.config.MaxFailures && circuitState(h.cb.state.Load()) == circuitClosed { | ||
| h.cb.state.Store(int32(circuitOpen)) | ||
| h.cb.lastStateChange = time.Now() | ||
| // Log to stderr since cloud logging is the one failing. | ||
| fmt.Fprintf(os.Stderr, | ||
| "WARNING: Cloud Logging circuit breaker opened after %d consecutive failures (last error: %v). Falling back to local-only logging.\n", | ||
| h.cb.failures, err) | ||
| // Also log via slog so the local handler captures it. | ||
| slog.Warn("Cloud Logging unavailable, falling back to local-only logging", | ||
| "consecutive_failures", h.cb.failures, | ||
| "error", err.Error(), | ||
| ) | ||
| } | ||
| } |
There was a problem hiding this comment.
In recordFailure, the mutex h.cb.mu is held while calling external logging functions like slog.Warn and fmt.Fprintf. Holding locks across external calls is a known anti-pattern that can lead to unexpected deadlocks or performance bottlenecks if the logging handlers block or call back into the circuit breaker.
It is safer to perform the state updates under the lock, release the lock, and then perform the logging.
func (h *ResilientCloudHandler) recordFailure(err error) {
h.cb.mu.Lock()
h.cb.failures++
opened := false
if h.cb.failures >= h.config.MaxFailures && circuitState(h.cb.state.Load()) == circuitClosed {
h.cb.state.Store(int32(circuitOpen))
h.cb.lastStateChange = time.Now()
opened = true
}
failures := h.cb.failures
h.cb.mu.Unlock()
if opened {
// Log to stderr since cloud logging is the one failing.
fmt.Fprintf(os.Stderr,
"WARNING: Cloud Logging circuit breaker opened after %d consecutive failures (last error: %v). Falling back to local-only logging.\n",
failures, err)
// Also log via slog so the local handler captures it.
slog.Warn("Cloud Logging unavailable, falling back to local-only logging",
"consecutive_failures", failures,
"error", err.Error(),
)
}
}| // transitionTo atomically transitions the circuit to the given state and | ||
| // logs the transition. | ||
| func (h *ResilientCloudHandler) transitionTo(newState circuitState) { | ||
| h.cb.mu.Lock() | ||
| defer h.cb.mu.Unlock() | ||
|
|
||
| oldState := circuitState(h.cb.state.Load()) | ||
| if oldState == newState { | ||
| return | ||
| } | ||
|
|
||
| h.cb.state.Store(int32(newState)) | ||
| h.cb.lastStateChange = time.Now() | ||
|
|
||
| switch newState { | ||
| case circuitClosed: | ||
| h.cb.failures = 0 | ||
| slog.Info("Cloud Logging circuit breaker closed: Cloud Logging resumed") | ||
| case circuitOpen: | ||
| slog.Warn("Cloud Logging circuit breaker opened: falling back to local-only logging") | ||
| case circuitHalfOpen: | ||
| slog.Info("Cloud Logging circuit breaker half-open: probing Cloud Logging") | ||
| } |
There was a problem hiding this comment.
In transitionTo, the mutex h.cb.mu is held while calling external logging functions like slog.Info and slog.Warn. Holding locks across external calls is a known anti-pattern that can lead to unexpected deadlocks or performance bottlenecks if the logging handlers block or call back into the circuit breaker.
It is safer to perform the state updates under the lock, release the lock, and then perform the logging.
func (h *ResilientCloudHandler) transitionTo(newState circuitState) {
h.cb.mu.Lock()
oldState := circuitState(h.cb.state.Load())
if oldState == newState {
h.cb.mu.Unlock()
return
}
h.cb.state.Store(int32(newState))
h.cb.lastStateChange = time.Now()
if newState == circuitClosed {
h.cb.failures = 0
}
h.cb.mu.Unlock()
switch newState {
case circuitClosed:
slog.Info("Cloud Logging circuit breaker closed: Cloud Logging resumed")
case circuitOpen:
slog.Warn("Cloud Logging circuit breaker opened: falling back to local-only logging")
case circuitHalfOpen:
slog.Info("Cloud Logging circuit breaker half-open: probing Cloud Logging")
}
}| func newTestResilientHandler(t *testing.T, cfg ResilientCloudHandlerConfig) *ResilientCloudHandler { | ||
| t.Helper() | ||
| // Create a minimal CloudHandler without a real client/logger. | ||
| inner := &CloudHandler{ | ||
| level: slog.LevelInfo, | ||
| component: "test", | ||
| hostname: "test-host", | ||
| } | ||
| cfg.applyDefaults() | ||
| cb := &circuitBreaker{} | ||
| cb.state.Store(int32(circuitClosed)) | ||
| h := &ResilientCloudHandler{ | ||
| inner: inner, | ||
| config: cfg, | ||
| cb: cb, | ||
| done: make(chan struct{}), | ||
| } | ||
| // Don't start the health check loop — tests drive it explicitly. | ||
| return h | ||
| } |
There was a problem hiding this comment.
The test helper newTestResilientHandler explicitly avoids starting the background health check loop, and none of the unit tests invoke runHealthCheck, healthCheckLoop, or flushWithTimeout. As a result, the core background health checking, timeout-guarded flushing, and automatic recovery/probing logic is completely untested.
Consider adding unit tests that mock the gcplog.Logger or use a test double to verify that runHealthCheck correctly transitions the circuit state on failures and successes, and that timeouts are handled properly.
- Fix cleanup ordering: append cloudLogCleanup before resilientCleanup so health check goroutine stops before client closes on shutdown - Add circuitGatedHandler to gate request/message loggers through the circuit breaker, preventing bypass during Cloud Logging outages - Add flushInFlight atomic.Bool to prevent goroutine leaks from concurrent flush operations during prolonged outages - Move slog/fmt logging outside mutex in recordFailure and transitionTo to prevent potential deadlocks - Add tests for runHealthCheck, flushWithTimeout, flushInFlight guard, and circuitGatedHandler - Fix unused _linkedTelegramId in profile-telegram.ts (CI type check)
Add circuit breaker pattern to Cloud Logging integration: - ResilientCloudHandler wraps CloudHandler with three-state circuit breaker (closed/open/half-open) and background health checks - flushInFlight atomic prevents goroutine leaks from concurrent flushes - circuitGatedHandler gates request/message loggers through the circuit breaker so all log streams benefit from protection - CloudHandler gets BufferedByteLimit (8MiB) and ClientTimeout (15s) - Cleanup ordering ensures health check goroutine stops before client - Mutex-protected logging moved outside locks to prevent deadlocks
0c86336 to
fb75b99
Compare
Summary
Fixes ptone#70 — Hub crashes when Cloud Logging retries exhaust resources during metadata outage.
ResilientCloudHandlerthat wrapsCloudHandlerwith a three-state circuit breaker (closed → open → half-open). After 3 consecutive flush failures, the circuit opens and Cloud Logging entries are silently dropped. Local logging continues unaffected via themultiHandler. The circuit automatically probes for recovery and resumes Cloud Logging when the service returns.BufferedByteLimit(8 MiB default) to the Cloud Logging client to prevent unbounded memory growth during transient failures.gcplog.NewClientso the hub doesn't hang at startup when the metadata service is unreachable.Acceptance Criteria
Test plan
go vetclean