Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions cmd/pulse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,29 @@ func main() {
}

// Check if a download client with this name already exists.
// If it does, update it (service may have restarted on a new port
// or regenerated its API key). If not, create it.
existing, _ := dlClientSvc.List(ctx)
for _, e := range existing {
if e.Name == svcInfo.Name {
logger.Info("pulse: download-client service already registered",
"name", svcInfo.Name, "service_id", serviceID)
_, updateErr := dlClientSvc.Update(ctx, e.ID, downloadclient.Input{
Name: svcInfo.Name,
Kind: kind,
Protocol: protocol,
Enabled: e.Enabled,
Priority: int(e.Priority),
Host: host,
Port: port,
Password: svcInfo.ApiKey,
Settings: `{"pulse":true}`,
})
if updateErr != nil {
logger.Warn("pulse: failed to update download-client on re-register",
"name", svcInfo.Name, "error", updateErr)
} else {
logger.Info("pulse: updated download-client service on re-register",
"name", svcInfo.Name, "host", host, "port", port)
}
return
}
}
Expand All @@ -181,6 +199,7 @@ func main() {
Priority: 1,
Host: host,
Port: port,
Password: svcInfo.ApiKey,
Settings: `{"pulse":true}`,
})
if err != nil {
Expand Down
98 changes: 96 additions & 2 deletions pkg/sdk/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type Config struct {
// Capabilities declares what this service supports. Optional.
Capabilities []string

// ServiceAPIKey is this service's own API key, shared during registration
// so other services that discover it via Pulse can authenticate. Optional.
ServiceAPIKey string

// HeartbeatInterval controls how often heartbeats are sent. Default: 30s.
HeartbeatInterval time.Duration

Expand Down Expand Up @@ -137,6 +141,51 @@ func New(cfg Config) (*Client, error) {
return c, nil
}

// NewWithRetry creates a client that retries registration with exponential
// backoff. It tries up to 6 times over ~2 minutes (2s, 4s, 8s, 16s, 32s,
// 32s). If Pulse is not configured (empty URL), returns nil immediately.
// If all attempts fail, returns nil and an error — the caller should
// continue in standalone mode.
func NewWithRetry(cfg Config) (*Client, error) {
if cfg.PulseURL == "" {
return nil, nil
}
if cfg.Logger == nil {
cfg.Logger = slog.Default()
}

const maxAttempts = 6
wait := 2 * time.Second
maxWait := 32 * time.Second

var lastErr error
for attempt := 1; attempt <= maxAttempts; attempt++ {
c, err := New(cfg)
if err == nil {
return c, nil
}
lastErr = err
if attempt < maxAttempts {
cfg.Logger.Info("sdk: pulse unavailable, retrying",
"attempt", attempt,
"next_retry", wait,
"error", err,
)
time.Sleep(wait)
wait *= 2
if wait > maxWait {
wait = maxWait
}
}
}

cfg.Logger.Warn("sdk: pulse registration failed after retries, running standalone",
"attempts", maxAttempts,
"error", lastErr,
)
return nil, lastErr
}

// ServiceID returns the ID assigned by Pulse during registration.
func (c *Client) ServiceID() string {
return c.serviceID
Expand Down Expand Up @@ -299,6 +348,9 @@ func (c *Client) register(ctx context.Context) (*Service, error) {
"version": c.cfg.Version,
"capabilities": c.cfg.Capabilities,
}
if c.cfg.ServiceAPIKey != "" {
body["api_key"] = c.cfg.ServiceAPIKey
}
var svc Service
if err := c.doRequest(ctx, "POST", "/api/v1/services/register", body, &svc); err != nil {
return nil, err
Expand All @@ -308,7 +360,12 @@ func (c *Client) register(ctx context.Context) (*Service, error) {

func (c *Client) heartbeatLoop(ctx context.Context) {
defer c.wg.Done()
ticker := time.NewTicker(c.cfg.HeartbeatInterval)

interval := c.cfg.HeartbeatInterval
maxInterval := 5 * time.Minute
consecutiveFailures := 0

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
Expand All @@ -317,7 +374,44 @@ func (c *Client) heartbeatLoop(ctx context.Context) {
return
case <-ticker.C:
if err := c.sendHeartbeat(ctx); err != nil {
c.logger.Warn("sdk: heartbeat failed", "error", err)
consecutiveFailures++
c.logger.Warn("sdk: heartbeat failed",
"error", err,
"consecutive_failures", consecutiveFailures,
"next_interval", interval,
)

// After 5 consecutive failures, try re-registering in case
// Pulse restarted and lost our registration.
if consecutiveFailures == 5 {
c.logger.Info("sdk: attempting re-registration after persistent heartbeat failures")
if svc, rerr := c.register(ctx); rerr == nil {
c.serviceID = svc.ID
c.logger.Info("sdk: re-registered with pulse", "service_id", svc.ID)
consecutiveFailures = 0
interval = c.cfg.HeartbeatInterval
ticker.Reset(interval)
continue
}
}

// Exponential backoff: 30s → 45s → 67s → ... → 5m cap
interval = time.Duration(float64(interval) * 1.5)
if interval > maxInterval {
interval = maxInterval
}
ticker.Reset(interval)
} else {
if consecutiveFailures > 0 {
c.logger.Info("sdk: heartbeat recovered",
"after_failures", consecutiveFailures,
)
}
consecutiveFailures = 0
if interval != c.cfg.HeartbeatInterval {
interval = c.cfg.HeartbeatInterval
ticker.Reset(interval)
}
}
}
}
Expand Down
Loading