Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions cmd/haul/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,18 @@ func main() {
"utp", cfg.Torrent.EnableUTP,
)

// ── Pulse integration (non-blocking) ──────────────────────────────────
go func() {
pi, piErr := pulse.New(cfg.Pulse, cfg.Server.Host, cfg.Server.Port, logger)
if piErr != nil {
logger.Warn("pulse integration failed", "error", piErr)
}
if pi != nil {
// Keep reference for cleanup — but since this is in a goroutine,
// we rely on process exit to clean up the heartbeat.
_ = pi
}
}()
// ── Pulse integration ─────────────────────────────────────────────────
// Registers with Pulse using retry/backoff (~2 minutes). If Pulse is
// unreachable, Haul continues in standalone mode. The service API key
// is sent during registration so Pilot/Prism can discover and
// authenticate with this Haul instance automatically.
pulseIntegration, pulseErr := pulse.New(cfg.Pulse, cfg.Server.Host, cfg.Server.Port, cfg.Auth.APIKey.Value(), logger)
if pulseErr != nil {
logger.Warn("pulse unavailable — running standalone", "error", pulseErr)
}
if pulseIntegration != nil {
defer pulseIntegration.Close()
}

// ── Services ──────────────────────────────────────────────────────────
categorySvc := category.NewService(database.SQL)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/anacrolix/dht/v2 v2.23.0
github.com/anacrolix/publicip v0.3.1
github.com/anacrolix/torrent v1.61.0
github.com/beacon-stack/pulse v0.1.0
github.com/beacon-stack/pulse v0.2.0
github.com/coder/websocket v1.8.14
github.com/danielgtaylor/huma/v2 v2.37.3
github.com/go-chi/chi/v5 v5.2.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmg
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/beacon-stack/pulse v0.1.0 h1:PUUkyef7H07xpip/KzhVoq+uzAeh+dvGd+L1uMH6A+Q=
github.com/beacon-stack/pulse v0.1.0/go.mod h1:LyTlxljRmkoi7JOhLvtm6XnvYThcQn8eTk5l4a3Y4ko=
github.com/beacon-stack/pulse v0.2.0 h1:S0ncxtQvP2MJF5Swz81TSCDFw6lez7wXKJXuLbK1bps=
github.com/beacon-stack/pulse v0.2.0/go.mod h1:LyTlxljRmkoi7JOhLvtm6XnvYThcQn8eTk5l4a3Y4ko=
github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
github.com/benbjohnson/immutable v0.4.1-0.20221220213129-8932b999621d h1:2qVb9bsAMtmAfnxXltm+6eBzrrS7SZ52c3SedsulaMI=
github.com/benbjohnson/immutable v0.4.1-0.20221220213129-8932b999621d/go.mod h1:iAr8OjJGLnLmVUr9MZ/rz4PWUy6Ouc2JLYuMArmvAJM=
Expand Down
34 changes: 22 additions & 12 deletions internal/pulse/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ type Integration struct {
logger *slog.Logger
}

// New creates and registers with Pulse. Returns nil (not an error)
// if Pulse is not configured.
func New(cfg config.PulseConfig, serverHost string, serverPort int, logger *slog.Logger) (*Integration, error) {
// New creates and registers with Pulse using retry/backoff. Returns nil
// (not an error) if Pulse is not configured (empty URL). Returns nil + error
// if Pulse is configured but unreachable after retries — the caller should
// continue in standalone mode.
//
// serviceAPIKey is this Haul instance's own API key. It's sent during
// registration so Pilot/Prism can discover and authenticate with Haul
// via Pulse's service registry.
func New(cfg config.PulseConfig, serverHost string, serverPort int, serviceAPIKey string, logger *slog.Logger) (*Integration, error) {
if cfg.URL == "" {
logger.Info("pulse integration disabled (no URL configured)")
return nil, nil
Expand Down Expand Up @@ -49,14 +55,15 @@ func New(cfg config.PulseConfig, serverHost string, serverPort int, logger *slog
healthURL = apiURL + "/health"
}

client, err := sdk.New(sdk.Config{
PulseURL: cfg.URL,
APIKey: apiKey,
ServiceName: version.AppName,
ServiceType: "download-client",
APIURL: apiURL,
HealthURL: healthURL,
Version: version.Version,
client, err := sdk.NewWithRetry(sdk.Config{
PulseURL: cfg.URL,
APIKey: apiKey,
ServiceName: version.AppName,
ServiceType: "download-client",
APIURL: apiURL,
HealthURL: healthURL,
Version: version.Version,
ServiceAPIKey: serviceAPIKey,
Capabilities: []string{
"supports_torrent",
"supports_categories",
Expand All @@ -65,7 +72,10 @@ func New(cfg config.PulseConfig, serverHost string, serverPort int, logger *slog
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("pulse registration failed: %w", err)
return nil, err
}
if client == nil {
return nil, nil
}

return &Integration{Client: client, logger: logger}, nil
Expand Down
Loading