diff --git a/cmd/haul/main.go b/cmd/haul/main.go index cfd7303..0338aba 100644 --- a/cmd/haul/main.go +++ b/cmd/haul/main.go @@ -114,18 +114,18 @@ func main() { "utp", cfg.Torrent.EnableUTP, ) - // ── Pulse integration (non-blocking) ────────────────────────────────── - go func() { - pi, piErr := pulse.New(cfg.Pulse, cfg.Server.Host, cfg.Server.Port, logger) - if piErr != nil { - logger.Warn("pulse integration failed", "error", piErr) - } - if pi != nil { - // Keep reference for cleanup — but since this is in a goroutine, - // we rely on process exit to clean up the heartbeat. - _ = pi - } - }() + // ── Pulse integration ───────────────────────────────────────────────── + // Registers with Pulse using retry/backoff (~2 minutes). If Pulse is + // unreachable, Haul continues in standalone mode. The service API key + // is sent during registration so Pilot/Prism can discover and + // authenticate with this Haul instance automatically. + pulseIntegration, pulseErr := pulse.New(cfg.Pulse, cfg.Server.Host, cfg.Server.Port, cfg.Auth.APIKey.Value(), logger) + if pulseErr != nil { + logger.Warn("pulse unavailable — running standalone", "error", pulseErr) + } + if pulseIntegration != nil { + defer pulseIntegration.Close() + } // ── Services ────────────────────────────────────────────────────────── categorySvc := category.NewService(database.SQL) diff --git a/go.mod b/go.mod index 57f0b6c..c8c4108 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/anacrolix/dht/v2 v2.23.0 github.com/anacrolix/publicip v0.3.1 github.com/anacrolix/torrent v1.61.0 - github.com/beacon-stack/pulse v0.1.0 + github.com/beacon-stack/pulse v0.2.0 github.com/coder/websocket v1.8.14 github.com/danielgtaylor/huma/v2 v2.37.3 github.com/go-chi/chi/v5 v5.2.5 diff --git a/go.sum b/go.sum index a96fe59..2831d6a 100644 --- a/go.sum +++ b/go.sum @@ -90,8 +90,8 @@ github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmg github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= -github.com/beacon-stack/pulse v0.1.0 h1:PUUkyef7H07xpip/KzhVoq+uzAeh+dvGd+L1uMH6A+Q= -github.com/beacon-stack/pulse v0.1.0/go.mod h1:LyTlxljRmkoi7JOhLvtm6XnvYThcQn8eTk5l4a3Y4ko= +github.com/beacon-stack/pulse v0.2.0 h1:S0ncxtQvP2MJF5Swz81TSCDFw6lez7wXKJXuLbK1bps= +github.com/beacon-stack/pulse v0.2.0/go.mod h1:LyTlxljRmkoi7JOhLvtm6XnvYThcQn8eTk5l4a3Y4ko= github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI= github.com/benbjohnson/immutable v0.4.1-0.20221220213129-8932b999621d h1:2qVb9bsAMtmAfnxXltm+6eBzrrS7SZ52c3SedsulaMI= github.com/benbjohnson/immutable v0.4.1-0.20221220213129-8932b999621d/go.mod h1:iAr8OjJGLnLmVUr9MZ/rz4PWUy6Ouc2JLYuMArmvAJM= diff --git a/internal/pulse/integration.go b/internal/pulse/integration.go index c48a77f..942cec2 100644 --- a/internal/pulse/integration.go +++ b/internal/pulse/integration.go @@ -16,9 +16,15 @@ type Integration struct { logger *slog.Logger } -// New creates and registers with Pulse. Returns nil (not an error) -// if Pulse is not configured. -func New(cfg config.PulseConfig, serverHost string, serverPort int, logger *slog.Logger) (*Integration, error) { +// New creates and registers with Pulse using retry/backoff. Returns nil +// (not an error) if Pulse is not configured (empty URL). Returns nil + error +// if Pulse is configured but unreachable after retries — the caller should +// continue in standalone mode. +// +// serviceAPIKey is this Haul instance's own API key. It's sent during +// registration so Pilot/Prism can discover and authenticate with Haul +// via Pulse's service registry. +func New(cfg config.PulseConfig, serverHost string, serverPort int, serviceAPIKey string, logger *slog.Logger) (*Integration, error) { if cfg.URL == "" { logger.Info("pulse integration disabled (no URL configured)") return nil, nil @@ -49,14 +55,15 @@ func New(cfg config.PulseConfig, serverHost string, serverPort int, logger *slog healthURL = apiURL + "/health" } - client, err := sdk.New(sdk.Config{ - PulseURL: cfg.URL, - APIKey: apiKey, - ServiceName: version.AppName, - ServiceType: "download-client", - APIURL: apiURL, - HealthURL: healthURL, - Version: version.Version, + client, err := sdk.NewWithRetry(sdk.Config{ + PulseURL: cfg.URL, + APIKey: apiKey, + ServiceName: version.AppName, + ServiceType: "download-client", + APIURL: apiURL, + HealthURL: healthURL, + Version: version.Version, + ServiceAPIKey: serviceAPIKey, Capabilities: []string{ "supports_torrent", "supports_categories", @@ -65,7 +72,10 @@ func New(cfg config.PulseConfig, serverHost string, serverPort int, logger *slog Logger: logger, }) if err != nil { - return nil, fmt.Errorf("pulse registration failed: %w", err) + return nil, err + } + if client == nil { + return nil, nil } return &Integration{Client: client, logger: logger}, nil