diff --git a/cmd/pulse/main.go b/cmd/pulse/main.go index 289fa4b..2baf07f 100644 --- a/cmd/pulse/main.go +++ b/cmd/pulse/main.go @@ -164,11 +164,29 @@ func main() { } // Check if a download client with this name already exists. + // If it does, update it (service may have restarted on a new port + // or regenerated its API key). If not, create it. existing, _ := dlClientSvc.List(ctx) for _, e := range existing { if e.Name == svcInfo.Name { - logger.Info("pulse: download-client service already registered", - "name", svcInfo.Name, "service_id", serviceID) + _, updateErr := dlClientSvc.Update(ctx, e.ID, downloadclient.Input{ + Name: svcInfo.Name, + Kind: kind, + Protocol: protocol, + Enabled: e.Enabled, + Priority: int(e.Priority), + Host: host, + Port: port, + Password: svcInfo.ApiKey, + Settings: `{"pulse":true}`, + }) + if updateErr != nil { + logger.Warn("pulse: failed to update download-client on re-register", + "name", svcInfo.Name, "error", updateErr) + } else { + logger.Info("pulse: updated download-client service on re-register", + "name", svcInfo.Name, "host", host, "port", port) + } return } } @@ -181,6 +199,7 @@ func main() { Priority: 1, Host: host, Port: port, + Password: svcInfo.ApiKey, Settings: `{"pulse":true}`, }) if err != nil { diff --git a/pkg/sdk/client.go b/pkg/sdk/client.go index 8bfad8d..d39b5bd 100644 --- a/pkg/sdk/client.go +++ b/pkg/sdk/client.go @@ -59,6 +59,10 @@ type Config struct { // Capabilities declares what this service supports. Optional. Capabilities []string + // ServiceAPIKey is this service's own API key, shared during registration + // so other services that discover it via Pulse can authenticate. Optional. + ServiceAPIKey string + // HeartbeatInterval controls how often heartbeats are sent. Default: 30s. HeartbeatInterval time.Duration @@ -137,6 +141,51 @@ func New(cfg Config) (*Client, error) { return c, nil } +// NewWithRetry creates a client that retries registration with exponential +// backoff. It tries up to 6 times over ~2 minutes (2s, 4s, 8s, 16s, 32s, +// 32s). If Pulse is not configured (empty URL), returns nil immediately. +// If all attempts fail, returns nil and an error — the caller should +// continue in standalone mode. +func NewWithRetry(cfg Config) (*Client, error) { + if cfg.PulseURL == "" { + return nil, nil + } + if cfg.Logger == nil { + cfg.Logger = slog.Default() + } + + const maxAttempts = 6 + wait := 2 * time.Second + maxWait := 32 * time.Second + + var lastErr error + for attempt := 1; attempt <= maxAttempts; attempt++ { + c, err := New(cfg) + if err == nil { + return c, nil + } + lastErr = err + if attempt < maxAttempts { + cfg.Logger.Info("sdk: pulse unavailable, retrying", + "attempt", attempt, + "next_retry", wait, + "error", err, + ) + time.Sleep(wait) + wait *= 2 + if wait > maxWait { + wait = maxWait + } + } + } + + cfg.Logger.Warn("sdk: pulse registration failed after retries, running standalone", + "attempts", maxAttempts, + "error", lastErr, + ) + return nil, lastErr +} + // ServiceID returns the ID assigned by Pulse during registration. func (c *Client) ServiceID() string { return c.serviceID @@ -299,6 +348,9 @@ func (c *Client) register(ctx context.Context) (*Service, error) { "version": c.cfg.Version, "capabilities": c.cfg.Capabilities, } + if c.cfg.ServiceAPIKey != "" { + body["api_key"] = c.cfg.ServiceAPIKey + } var svc Service if err := c.doRequest(ctx, "POST", "/api/v1/services/register", body, &svc); err != nil { return nil, err @@ -308,7 +360,12 @@ func (c *Client) register(ctx context.Context) (*Service, error) { func (c *Client) heartbeatLoop(ctx context.Context) { defer c.wg.Done() - ticker := time.NewTicker(c.cfg.HeartbeatInterval) + + interval := c.cfg.HeartbeatInterval + maxInterval := 5 * time.Minute + consecutiveFailures := 0 + + ticker := time.NewTicker(interval) defer ticker.Stop() for { @@ -317,7 +374,44 @@ func (c *Client) heartbeatLoop(ctx context.Context) { return case <-ticker.C: if err := c.sendHeartbeat(ctx); err != nil { - c.logger.Warn("sdk: heartbeat failed", "error", err) + consecutiveFailures++ + c.logger.Warn("sdk: heartbeat failed", + "error", err, + "consecutive_failures", consecutiveFailures, + "next_interval", interval, + ) + + // After 5 consecutive failures, try re-registering in case + // Pulse restarted and lost our registration. + if consecutiveFailures == 5 { + c.logger.Info("sdk: attempting re-registration after persistent heartbeat failures") + if svc, rerr := c.register(ctx); rerr == nil { + c.serviceID = svc.ID + c.logger.Info("sdk: re-registered with pulse", "service_id", svc.ID) + consecutiveFailures = 0 + interval = c.cfg.HeartbeatInterval + ticker.Reset(interval) + continue + } + } + + // Exponential backoff: 30s → 45s → 67s → ... → 5m cap + interval = time.Duration(float64(interval) * 1.5) + if interval > maxInterval { + interval = maxInterval + } + ticker.Reset(interval) + } else { + if consecutiveFailures > 0 { + c.logger.Info("sdk: heartbeat recovered", + "after_failures", consecutiveFailures, + ) + } + consecutiveFailures = 0 + if interval != c.cfg.HeartbeatInterval { + interval = c.cfg.HeartbeatInterval + ticker.Reset(interval) + } } } }