Skip to content

Commit be352ee

Browse files
amir20 and claude
authored
fix: back off cloud notification dispatcher on invalid API key (#4747)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6a014d1 commit be352ee

7 files changed

Lines changed: 120 additions & 0 deletions

File tree

internal/notification/dispatcher/cloud.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ func NewCloudDispatcher(name string, apiKey string, prefix string, expiresAt *ti
5353

5454
// defaultRetryAfter is a 60-second back-off.
// NOTE(review): presumably the fallback used by Send's rate-limit path when cloud
// gives no usable retry hint — its use is outside this view, confirm.
const defaultRetryAfter = 60 * time.Second
5555

56+
// unauthorizedRetryAfter is how long to back off after an auth failure (invalid/expired
57+
// API key). Retrying won't help until the user fixes their key, which recreates the
58+
// dispatcher and resets the breaker.
59+
// ResetBreaker can also clear the back-off before the six hours have elapsed.
const unauthorizedRetryAfter = 6 * time.Hour
60+
61+
// ResetBreaker clears the circuit breaker so the next Send dials cloud again.
62+
// Called when a cloud status check succeeds, proving the API key is valid.
63+
// NOTE(review): Send's rate-limit path appears to use the same blockedUntil value,
// so this presumably also cancels a pending rate-limit back-off — confirm that is
// intended.
func (c *CloudDispatcher) ResetBreaker() {
64+
// Send only short-circuits while blockedUntil is greater than zero, so storing
// zero lets the next Send through.
c.blockedUntil.Store(0)
65+
}
66+
5667
// Send sends a notification to Dozzle Cloud
5768
func (c *CloudDispatcher) Send(ctx context.Context, notification types.Notification) error {
5869
if blockedUntil := c.blockedUntil.Load(); blockedUntil > 0 && time.Now().UnixNano() < blockedUntil {
@@ -99,6 +110,18 @@ func (c *CloudDispatcher) Send(ctx context.Context, notification types.Notificat
99110
return fmt.Errorf("cloud rate limited, backing off for %s", retryAfter)
100111
}
101112

113+
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
114+
limitedReader := io.LimitReader(resp.Body, 1024*1024)
115+
responseBody, _ := io.ReadAll(limitedReader)
116+
c.blockedUntil.Store(time.Now().Add(unauthorizedRetryAfter).UnixNano())
117+
log.Warn().
118+
Str("cloud", c.Name).
119+
Int("status_code", resp.StatusCode).
120+
Dur("retry_after", unauthorizedRetryAfter).
121+
Msg("cloud rejected API key, circuit breaker tripped")
122+
return fmt.Errorf("cloud returned status code %d: %s", resp.StatusCode, string(responseBody))
123+
}
124+
102125
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
103126
limitedReader := io.LimitReader(resp.Body, 1024*1024)
104127
responseBody, _ := io.ReadAll(limitedReader)
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package dispatcher
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"net/http/httptest"
7+
"sync/atomic"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func newTestCloudDispatcher(url string) *CloudDispatcher {
16+
return &CloudDispatcher{
17+
Name: "Dozzle Cloud",
18+
URL: url,
19+
APIKey: "test-key",
20+
client: &http.Client{Timeout: 5 * time.Second},
21+
}
22+
}
23+
24+
// On a 401/403 the breaker trips and subsequent sends short-circuit without
25+
// hitting cloud until the breaker is reset.
26+
func TestCloudDispatcher_AuthFailureTripsBreaker(t *testing.T) {
27+
for _, status := range []int{http.StatusUnauthorized, http.StatusForbidden} {
28+
var hits atomic.Int32
29+
srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
30+
hits.Add(1)
31+
rw.WriteHeader(status)
32+
rw.Write([]byte("Invalid API key\n"))
33+
}))
34+
35+
d := newTestCloudDispatcher(srv.URL)
36+
37+
err := d.Send(context.Background(), newTestNotification("first"))
38+
require.Error(t, err)
39+
assert.EqualValues(t, 1, hits.Load(), "first send should reach cloud")
40+
41+
err = d.Send(context.Background(), newTestNotification("second"))
42+
require.Error(t, err)
43+
assert.Contains(t, err.Error(), "rate limited")
44+
assert.EqualValues(t, 1, hits.Load(), "breaker should block second send (status %d)", status)
45+
46+
srv.Close()
47+
}
48+
}
49+
50+
// ResetBreaker clears the circuit so the next send dials cloud again.
51+
func TestCloudDispatcher_ResetBreaker(t *testing.T) {
52+
var hits atomic.Int32
53+
srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
54+
hits.Add(1)
55+
rw.WriteHeader(http.StatusUnauthorized)
56+
}))
57+
defer srv.Close()
58+
59+
d := newTestCloudDispatcher(srv.URL)
60+
61+
require.Error(t, d.Send(context.Background(), newTestNotification("first")))
62+
require.EqualValues(t, 1, hits.Load())
63+
64+
// Blocked while breaker is open.
65+
require.Error(t, d.Send(context.Background(), newTestNotification("blocked")))
66+
require.EqualValues(t, 1, hits.Load())
67+
68+
d.ResetBreaker()
69+
70+
require.Error(t, d.Send(context.Background(), newTestNotification("after-reset")))
71+
assert.EqualValues(t, 2, hits.Load(), "send after reset should reach cloud again")
72+
}

internal/notification/manager.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,16 @@ func (m *Manager) ClearCloudDispatcher() {
335335
log.Debug().Msg("Cleared cloud dispatcher")
336336
}
337337

338+
// ResetCloudDispatcherBreaker clears the cloud dispatcher's circuit breaker, if set.
339+
// No-op when no cloud dispatcher is registered.
340+
func (m *Manager) ResetCloudDispatcherBreaker() {
341+
if p := m.cloudDispatcher.Load(); p != nil {
342+
if cd, ok := (*p).(*dispatcher.CloudDispatcher); ok {
343+
cd.ResetBreaker()
344+
}
345+
}
346+
}
347+
338348

339349
// getDispatcher resolves a dispatcher by subscription's DispatcherID.
340350
// DispatcherID == 0 means the cloud dispatcher; otherwise lookup in the dispatchers map.

internal/support/docker/multi_host_service.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,12 @@ func (m *MultiHostService) SetCloudStreamLogs(enabled bool) {
263263
m.persister.SetCloudStreamLogs(enabled)
264264
}
265265

266+
// ResetCloudDispatcherBreaker clears the cloud dispatcher's auth circuit breaker
267+
// so notifications resume immediately once the key is known good again.
268+
// It delegates to the notification manager.
func (m *MultiHostService) ResetCloudDispatcherBreaker() {
269+
m.notificationManager.ResetCloudDispatcherBreaker()
270+
}
271+
266272
// RemoveCloudConfig clears the cloud config, removes the cloud dispatcher, deletes the file,
267273
// and broadcasts the change to all agents so they stop sending to cloud.
268274
func (m *MultiHostService) RemoveCloudConfig() {

internal/support/k8s/k8s_cluster_service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,7 @@ func (m *K8sClusterService) SetCloudStreamLogs(enabled bool) {
238238
func (m *K8sClusterService) RemoveCloudConfig() {
239239
m.persister.RemoveCloudConfig()
240240
}
241+
242+
// ResetCloudDispatcherBreaker asks the notification manager to clear the cloud
// dispatcher's circuit breaker.
func (m *K8sClusterService) ResetCloudDispatcherBreaker() {
243+
m.notificationManager.ResetCloudDispatcherBreaker()
244+
}

internal/web/cloud.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ func (h *handler) cloudStatus(w http.ResponseWriter, r *http.Request) {
161161
return
162162
}
163163

164+
// A 200 proves the API key is valid, so clear any auth circuit breaker the
165+
// notification dispatcher tripped on a prior 401/403.
166+
h.hostService.ResetCloudDispatcherBreaker()
167+
164168
body, err := io.ReadAll(resp.Body)
165169
if err != nil {
166170
log.Error().Err(err).Msg("Failed to read cloud status response")

internal/web/routes.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ type HostService interface {
111111
SetCloudConfig(cc *notification.CloudConfig)
112112
SetCloudStreamLogs(enabled bool)
113113
RemoveCloudConfig()
114+
ResetCloudDispatcherBreaker()
114115
}
115116

116117
type handler struct {

0 commit comments

Comments
 (0)