diff --git a/README.md b/README.md index 2d563b03..91ec599b 100644 --- a/README.md +++ b/README.md @@ -173,7 +173,7 @@ Check the services are running: curl $OUTPOST_URL/api/v1/healthz ``` -Wait until you get a `OK%` response. +Wait until you get a 200 response. Create a tenant with the following command, replacing `$TENANT_ID` with a unique identifier such as "your_org_name", and the `$API_KEY` with the value you set in your `.env`: diff --git a/cmd/e2e/api_test.go b/cmd/e2e/api_test.go index c0adc093..b8b6114d 100644 --- a/cmd/e2e/api_test.go +++ b/cmd/e2e/api_test.go @@ -23,6 +23,20 @@ func (suite *basicSuite) TestHealthzAPI() { Match: &httpclient.Response{ StatusCode: http.StatusOK, }, + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "status": map[string]interface{}{ + "type": "string", + }, + "timestamp": map[string]interface{}{ + "type": "string", + }, + "workers": map[string]interface{}{ + "type": "object", + }, + }, + }, }, }, } diff --git a/cmd/e2e/suites_test.go b/cmd/e2e/suites_test.go index d76471fe..9bb1f0bb 100644 --- a/cmd/e2e/suites_test.go +++ b/cmd/e2e/suites_test.go @@ -30,14 +30,17 @@ type e2eSuite struct { mockServerInfra *testinfra.MockServerInfra cleanup func() client httpclient.Client + appDone chan struct{} } func (suite *e2eSuite) SetupSuite() { ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx suite.cancel = cancel + suite.appDone = make(chan struct{}) suite.client = httpclient.New(fmt.Sprintf("http://localhost:%d/api/v1", suite.config.APIPort), suite.config.APIKey) go func() { + defer close(suite.appDone) application := app.New(&suite.config) if err := application.Run(suite.ctx); err != nil { log.Println("Application failed to run", err) @@ -48,6 +51,8 @@ func (suite *e2eSuite) SetupSuite() { func (s *e2eSuite) TearDownSuite() { if s.cancel != nil { s.cancel() + // Wait for application to fully shut down before cleaning up resources + <-s.appDone } s.cleanup() } diff 
--git a/docs/apis/openapi.yaml b/docs/apis/openapi.yaml index bec4e773..dda75909 100644 --- a/docs/apis/openapi.yaml +++ b/docs/apis/openapi.yaml @@ -1686,17 +1686,101 @@ paths: get: tags: [Health] summary: Health Check - description: Simple health check endpoint. + description: | + Health check endpoint that reports the status of all workers. + + Returns HTTP 200 when all workers are healthy, or HTTP 503 if any worker has failed. + + The response includes: + - `status`: Overall health status ("healthy" or "failed") + - `timestamp`: When this health check was performed (ISO 8601 format) + - `workers`: Map of worker names to their individual health status + + Each worker reports: + - `status`: Worker health ("healthy" or "failed") + + Note: Error details are not exposed for security reasons. Check application logs for detailed error information. operationId: healthCheck security: [] responses: "200": - description: Service is healthy. + description: Service is healthy - all workers are operational. content: - text/plain: + application/json: + schema: + type: object + required: + - status + - timestamp + - workers + properties: + status: + type: string + enum: [healthy] + example: healthy + timestamp: + type: string + format: date-time + description: When this health check was performed + example: "2025-11-11T10:30:00Z" + workers: + type: object + additionalProperties: + type: object + required: + - status + properties: + status: + type: string + enum: [healthy] + example: healthy + example: + status: healthy + timestamp: "2025-11-11T10:30:00Z" + workers: + http-server: + status: healthy + retrymq-consumer: + status: healthy + "503": + description: Service is unhealthy - one or more workers have failed. 
+ content: + application/json: schema: - type: string - example: OK + type: object + required: + - status + - timestamp + - workers + properties: + status: + type: string + enum: [failed] + example: failed + timestamp: + type: string + format: date-time + description: When this health check was performed + example: "2025-11-11T10:30:15Z" + workers: + type: object + additionalProperties: + type: object + required: + - status + properties: + status: + type: string + enum: [healthy, failed] + example: failed + example: + status: failed + timestamp: "2025-11-11T10:30:15Z" + workers: + http-server: + status: healthy + retrymq-consumer: + status: failed # Tenants /{tenant_id}: parameters: diff --git a/internal/services/api/auth_middleware.go b/internal/apirouter/auth_middleware.go similarity index 99% rename from internal/services/api/auth_middleware.go rename to internal/apirouter/auth_middleware.go index 3ebc01b6..d28797da 100644 --- a/internal/services/api/auth_middleware.go +++ b/internal/apirouter/auth_middleware.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "errors" diff --git a/internal/services/api/auth_middleware_test.go b/internal/apirouter/auth_middleware_test.go similarity index 87% rename from internal/services/api/auth_middleware_test.go rename to internal/apirouter/auth_middleware_test.go index 4ade05d0..18fa43db 100644 --- a/internal/services/api/auth_middleware_test.go +++ b/internal/apirouter/auth_middleware_test.go @@ -1,4 +1,4 @@ -package api_test +package apirouter_test import ( "net/http" @@ -6,8 +6,9 @@ import ( "testing" "github.com/gin-gonic/gin" - api "github.com/hookdeck/outpost/internal/services/api" "github.com/stretchr/testify/assert" + + "github.com/hookdeck/outpost/internal/apirouter" ) func TestPublicRouter(t *testing.T) { @@ -19,7 +20,7 @@ func TestPublicRouter(t *testing.T) { t.Run("should accept requests without a token", func(t *testing.T) { t.Parallel() w := httptest.NewRecorder() - req, _ := http.NewRequest("GET", 
baseAPIPath+"/healthz", nil) + req, _ := http.NewRequest("GET", baseAPIPath+"/tenant-id/topics", nil) router.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) }) @@ -27,7 +28,7 @@ func TestPublicRouter(t *testing.T) { t.Run("should accept requests with an invalid authorization token", func(t *testing.T) { t.Parallel() w := httptest.NewRecorder() - req, _ := http.NewRequest("GET", baseAPIPath+"/healthz", nil) + req, _ := http.NewRequest("GET", baseAPIPath+"/tenant-id/topics", nil) req.Header.Set("Authorization", "invalid key") router.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) @@ -36,7 +37,7 @@ func TestPublicRouter(t *testing.T) { t.Run("should accept requests with a valid authorization token", func(t *testing.T) { t.Parallel() w := httptest.NewRecorder() - req, _ := http.NewRequest("GET", baseAPIPath+"/healthz", nil) + req, _ := http.NewRequest("GET", baseAPIPath+"/tenant-id/topics", nil) req.Header.Set("Authorization", "Bearer key") router.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) @@ -99,7 +100,7 @@ func TestSetTenantIDMiddleware(t *testing.T) { // Create a middleware chain var tenantID string - handler := api.SetTenantIDMiddleware() + handler := apirouter.SetTenantIDMiddleware() nextHandler := func(c *gin.Context) { val, exists := c.Get("tenantID") if exists { @@ -124,7 +125,7 @@ func TestSetTenantIDMiddleware(t *testing.T) { // Create a middleware chain var tenantIDExists bool - handler := api.SetTenantIDMiddleware() + handler := apirouter.SetTenantIDMiddleware() nextHandler := func(c *gin.Context) { _, tenantIDExists = c.Get("tenantID") } @@ -145,7 +146,7 @@ func TestSetTenantIDMiddleware(t *testing.T) { // Create a middleware chain var tenantIDExists bool - handler := api.SetTenantIDMiddleware() + handler := apirouter.SetTenantIDMiddleware() nextHandler := func(c *gin.Context) { _, tenantIDExists = c.Get("tenantID") } @@ -175,7 +176,7 @@ func TestAPIKeyOrTenantJWTAuthMiddleware(t *testing.T) { c.Params = []gin.Param{{Key: 
"tenantID", Value: "different_tenant"}} // Create JWT token for tenantID - token, err := api.JWT.New(jwtSecret, tenantID) + token, err := apirouter.JWT.New(jwtSecret, tenantID) if err != nil { t.Fatal(err) } @@ -185,7 +186,7 @@ func TestAPIKeyOrTenantJWTAuthMiddleware(t *testing.T) { c.Request.Header.Set("Authorization", "Bearer "+token) // Test - handler := api.APIKeyOrTenantJWTAuthMiddleware(apiKey, jwtSecret) + handler := apirouter.APIKeyOrTenantJWTAuthMiddleware(apiKey, jwtSecret) handler(c) assert.Equal(t, http.StatusUnauthorized, c.Writer.Status()) @@ -200,7 +201,7 @@ func TestAPIKeyOrTenantJWTAuthMiddleware(t *testing.T) { c.Params = []gin.Param{{Key: "tenantID", Value: tenantID}} // Create JWT token for tenantID - token, err := api.JWT.New(jwtSecret, tenantID) + token, err := apirouter.JWT.New(jwtSecret, tenantID) if err != nil { t.Fatal(err) } @@ -211,7 +212,7 @@ func TestAPIKeyOrTenantJWTAuthMiddleware(t *testing.T) { // Create a middleware chain var contextTenantID string - handler := api.APIKeyOrTenantJWTAuthMiddleware(apiKey, jwtSecret) + handler := apirouter.APIKeyOrTenantJWTAuthMiddleware(apiKey, jwtSecret) nextHandler := func(c *gin.Context) { val, exists := c.Get("tenantID") if exists { @@ -242,7 +243,7 @@ func TestAPIKeyOrTenantJWTAuthMiddleware(t *testing.T) { c.Request.Header.Set("Authorization", "Bearer "+apiKey) // Test - handler := api.APIKeyOrTenantJWTAuthMiddleware(apiKey, jwtSecret) + handler := apirouter.APIKeyOrTenantJWTAuthMiddleware(apiKey, jwtSecret) handler(c) assert.NotEqual(t, http.StatusUnauthorized, c.Writer.Status()) @@ -250,7 +251,7 @@ func TestAPIKeyOrTenantJWTAuthMiddleware(t *testing.T) { } func newJWTToken(t *testing.T, secret string, tenantID string) string { - token, err := api.JWT.New(secret, tenantID) + token, err := apirouter.JWT.New(secret, tenantID) if err != nil { t.Fatal(err) } @@ -338,7 +339,7 @@ func TestTenantJWTAuthMiddleware(t *testing.T) { c.Params = []gin.Param{{Key: "tenantID", Value: tt.paramTenantID}} } - 
handler := api.TenantJWTAuthMiddleware(tt.apiKey, tt.jwtSecret) + handler := apirouter.TenantJWTAuthMiddleware(tt.apiKey, tt.jwtSecret) handler(c) t.Logf("Test case: %s, Expected: %d, Got: %d", tt.name, tt.wantStatus, w.Code) @@ -362,7 +363,7 @@ func TestAuthRole(t *testing.T) { c, _ := gin.CreateTestContext(w) c.Request = httptest.NewRequest(http.MethodGet, "/", nil) - handler := api.APIKeyAuthMiddleware("") + handler := apirouter.APIKeyAuthMiddleware("") var role string nextHandler := func(c *gin.Context) { val, exists := c.Get("authRole") @@ -374,7 +375,7 @@ func TestAuthRole(t *testing.T) { handler(c) nextHandler(c) - assert.Equal(t, api.RoleAdmin, role) + assert.Equal(t, apirouter.RoleAdmin, role) }) t.Run("should set RoleAdmin when valid API key", func(t *testing.T) { @@ -383,7 +384,7 @@ func TestAuthRole(t *testing.T) { c.Request = httptest.NewRequest(http.MethodGet, "/", nil) c.Request.Header.Set("Authorization", "Bearer key") - handler := api.APIKeyAuthMiddleware("key") + handler := apirouter.APIKeyAuthMiddleware("key") var role string nextHandler := func(c *gin.Context) { val, exists := c.Get("authRole") @@ -395,7 +396,7 @@ func TestAuthRole(t *testing.T) { handler(c) nextHandler(c) - assert.Equal(t, api.RoleAdmin, role) + assert.Equal(t, apirouter.RoleAdmin, role) }) }) @@ -405,7 +406,7 @@ func TestAuthRole(t *testing.T) { c, _ := gin.CreateTestContext(w) c.Request = httptest.NewRequest(http.MethodGet, "/", nil) - handler := api.APIKeyOrTenantJWTAuthMiddleware("", "jwt_secret") + handler := apirouter.APIKeyOrTenantJWTAuthMiddleware("", "jwt_secret") var role string nextHandler := func(c *gin.Context) { val, exists := c.Get("authRole") @@ -417,7 +418,7 @@ func TestAuthRole(t *testing.T) { handler(c) nextHandler(c) - assert.Equal(t, api.RoleAdmin, role) + assert.Equal(t, apirouter.RoleAdmin, role) }) t.Run("should set RoleAdmin when using API key", func(t *testing.T) { @@ -426,7 +427,7 @@ func TestAuthRole(t *testing.T) { c.Request = 
httptest.NewRequest(http.MethodGet, "/", nil) c.Request.Header.Set("Authorization", "Bearer key") - handler := api.APIKeyOrTenantJWTAuthMiddleware("key", "jwt_secret") + handler := apirouter.APIKeyOrTenantJWTAuthMiddleware("key", "jwt_secret") var role string nextHandler := func(c *gin.Context) { val, exists := c.Get("authRole") @@ -438,7 +439,7 @@ func TestAuthRole(t *testing.T) { handler(c) nextHandler(c) - assert.Equal(t, api.RoleAdmin, role) + assert.Equal(t, apirouter.RoleAdmin, role) }) t.Run("should set RoleTenant when using valid JWT", func(t *testing.T) { @@ -448,7 +449,7 @@ func TestAuthRole(t *testing.T) { token := newJWTToken(t, "jwt_secret", "tenant-id") c.Request.Header.Set("Authorization", "Bearer "+token) - handler := api.APIKeyOrTenantJWTAuthMiddleware("key", "jwt_secret") + handler := apirouter.APIKeyOrTenantJWTAuthMiddleware("key", "jwt_secret") var role string nextHandler := func(c *gin.Context) { val, exists := c.Get("authRole") @@ -460,7 +461,7 @@ func TestAuthRole(t *testing.T) { handler(c) nextHandler(c) - assert.Equal(t, api.RoleTenant, role) + assert.Equal(t, apirouter.RoleTenant, role) }) }) @@ -472,7 +473,7 @@ func TestAuthRole(t *testing.T) { token := newJWTToken(t, "jwt_secret", "tenant-id") c.Request.Header.Set("Authorization", "Bearer "+token) - handler := api.TenantJWTAuthMiddleware("key", "jwt_secret") + handler := apirouter.TenantJWTAuthMiddleware("key", "jwt_secret") var role string nextHandler := func(c *gin.Context) { val, exists := c.Get("authRole") @@ -484,7 +485,7 @@ func TestAuthRole(t *testing.T) { handler(c) nextHandler(c) - assert.Equal(t, api.RoleTenant, role) + assert.Equal(t, apirouter.RoleTenant, role) }) t.Run("should not set role when apiKey is empty", func(t *testing.T) { @@ -494,7 +495,7 @@ func TestAuthRole(t *testing.T) { token := newJWTToken(t, "jwt_secret", "tenant-id") c.Request.Header.Set("Authorization", "Bearer "+token) - handler := api.TenantJWTAuthMiddleware("", "jwt_secret") + handler := 
apirouter.TenantJWTAuthMiddleware("", "jwt_secret") var roleExists bool nextHandler := func(c *gin.Context) { _, roleExists = c.Get("authRole") diff --git a/internal/services/api/destination_handlers.go b/internal/apirouter/destination_handlers.go similarity index 99% rename from internal/services/api/destination_handlers.go rename to internal/apirouter/destination_handlers.go index fdb0eac1..68292215 100644 --- a/internal/services/api/destination_handlers.go +++ b/internal/apirouter/destination_handlers.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "errors" diff --git a/internal/services/api/errorhandler_middleware.go b/internal/apirouter/errorhandler_middleware.go similarity index 99% rename from internal/services/api/errorhandler_middleware.go rename to internal/apirouter/errorhandler_middleware.go index 979295c9..2bf74901 100644 --- a/internal/services/api/errorhandler_middleware.go +++ b/internal/apirouter/errorhandler_middleware.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "encoding/json" diff --git a/internal/services/api/jwt.go b/internal/apirouter/jwt.go similarity index 98% rename from internal/services/api/jwt.go rename to internal/apirouter/jwt.go index 5c1bb79c..d0f5f5c6 100644 --- a/internal/services/api/jwt.go +++ b/internal/apirouter/jwt.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "errors" diff --git a/internal/services/api/jwt_test.go b/internal/apirouter/jwt_test.go similarity index 71% rename from internal/services/api/jwt_test.go rename to internal/apirouter/jwt_test.go index 5d01c2fd..5a90acbd 100644 --- a/internal/services/api/jwt_test.go +++ b/internal/apirouter/jwt_test.go @@ -1,12 +1,13 @@ -package api_test +package apirouter_test import ( "testing" "time" "github.com/golang-jwt/jwt/v5" - api "github.com/hookdeck/outpost/internal/services/api" "github.com/stretchr/testify/assert" + + "github.com/hookdeck/outpost/internal/apirouter" ) func TestJWT(t *testing.T) { @@ -19,37 +20,37 @@ func TestJWT(t 
*testing.T) { t.Run("should generate a new jwt token", func(t *testing.T) { t.Parallel() - token, err := api.JWT.New(jwtKey, tenantID) + token, err := apirouter.JWT.New(jwtKey, tenantID) assert.Nil(t, err) assert.NotEqual(t, "", token) }) t.Run("should verify a valid jwt token", func(t *testing.T) { t.Parallel() - token, err := api.JWT.New(jwtKey, tenantID) + token, err := apirouter.JWT.New(jwtKey, tenantID) if err != nil { t.Fatal(err) } - valid, err := api.JWT.Verify(jwtKey, token, tenantID) + valid, err := apirouter.JWT.Verify(jwtKey, token, tenantID) assert.Nil(t, err) assert.True(t, valid) }) t.Run("should extract tenantID from valid token", func(t *testing.T) { t.Parallel() - token, err := api.JWT.New(jwtKey, tenantID) + token, err := apirouter.JWT.New(jwtKey, tenantID) if err != nil { t.Fatal(err) } - extractedTenantID, err := api.JWT.ExtractTenantID(jwtKey, token) + extractedTenantID, err := apirouter.JWT.ExtractTenantID(jwtKey, token) assert.Nil(t, err) assert.Equal(t, tenantID, extractedTenantID) }) t.Run("should fail to extract tenantID from invalid token", func(t *testing.T) { t.Parallel() - _, err := api.JWT.ExtractTenantID(jwtKey, "invalid_token") - assert.ErrorIs(t, err, api.ErrInvalidToken) + _, err := apirouter.JWT.ExtractTenantID(jwtKey, "invalid_token") + assert.ErrorIs(t, err, apirouter.ErrInvalidToken) }) t.Run("should fail to extract tenantID from token with invalid issuer", func(t *testing.T) { @@ -65,14 +66,14 @@ func TestJWT(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = api.JWT.ExtractTenantID(jwtKey, token) - assert.ErrorIs(t, err, api.ErrInvalidToken) + _, err = apirouter.JWT.ExtractTenantID(jwtKey, token) + assert.ErrorIs(t, err, apirouter.ErrInvalidToken) }) t.Run("should reject an invalid token", func(t *testing.T) { t.Parallel() - valid, err := api.JWT.Verify(jwtKey, "invalid_token", tenantID) - assert.ErrorIs(t, err, api.ErrInvalidToken) + valid, err := apirouter.JWT.Verify(jwtKey, "invalid_token", tenantID) + 
assert.ErrorIs(t, err, apirouter.ErrInvalidToken) assert.False(t, valid) }) @@ -89,8 +90,8 @@ func TestJWT(t *testing.T) { if err != nil { t.Fatal(err) } - valid, err := api.JWT.Verify(jwtKey, token, tenantID) - assert.ErrorIs(t, err, api.ErrInvalidToken) + valid, err := apirouter.JWT.Verify(jwtKey, token, tenantID) + assert.ErrorIs(t, err, apirouter.ErrInvalidToken) assert.False(t, valid) }) @@ -107,8 +108,8 @@ func TestJWT(t *testing.T) { if err != nil { t.Fatal(err) } - valid, err := api.JWT.Verify(jwtKey, token, tenantID) - assert.ErrorIs(t, err, api.ErrInvalidToken) + valid, err := apirouter.JWT.Verify(jwtKey, token, tenantID) + assert.ErrorIs(t, err, apirouter.ErrInvalidToken) assert.False(t, valid) }) @@ -125,8 +126,8 @@ func TestJWT(t *testing.T) { if err != nil { t.Fatal(err) } - valid, err := api.JWT.Verify(jwtKey, token, tenantID) - assert.ErrorIs(t, err, api.ErrInvalidToken) + valid, err := apirouter.JWT.Verify(jwtKey, token, tenantID) + assert.ErrorIs(t, err, apirouter.ErrInvalidToken) assert.False(t, valid) }) } diff --git a/internal/services/api/latency_middleware.go b/internal/apirouter/latency_middleware.go similarity index 96% rename from internal/services/api/latency_middleware.go rename to internal/apirouter/latency_middleware.go index 66efabea..ea1bbeb7 100644 --- a/internal/services/api/latency_middleware.go +++ b/internal/apirouter/latency_middleware.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "time" diff --git a/internal/services/api/latency_middleware_test.go b/internal/apirouter/latency_middleware_test.go similarity index 89% rename from internal/services/api/latency_middleware_test.go rename to internal/apirouter/latency_middleware_test.go index 563c152f..5ec55d28 100644 --- a/internal/services/api/latency_middleware_test.go +++ b/internal/apirouter/latency_middleware_test.go @@ -1,4 +1,4 @@ -package api +package apirouter_test import ( "net/http" @@ -7,6 +7,7 @@ import ( "time" "github.com/gin-gonic/gin" + 
"github.com/hookdeck/outpost/internal/apirouter" "github.com/stretchr/testify/assert" ) @@ -24,7 +25,7 @@ func TestMiddlewareOrder(t *testing.T) { executionOrder = append(executionOrder, "metrics_start") c.Next() executionOrder = append(executionOrder, "metrics_end") - metricsLatency = GetRequestLatency(c) + metricsLatency = apirouter.GetRequestLatency(c) } } @@ -34,7 +35,7 @@ func TestMiddlewareOrder(t *testing.T) { executionOrder = append(executionOrder, "logger_start") c.Next() executionOrder = append(executionOrder, "logger_end") - loggerLatency = GetRequestLatency(c) + loggerLatency = apirouter.GetRequestLatency(c) } } @@ -42,7 +43,7 @@ func TestMiddlewareOrder(t *testing.T) { r := gin.New() r.Use(mockMetrics()) r.Use(mockLogger()) - r.Use(LatencyMiddleware()) + r.Use(apirouter.LatencyMiddleware()) // Add a handler that sleeps to simulate work r.GET("/test", func(c *gin.Context) { diff --git a/internal/services/api/log_handlers.go b/internal/apirouter/log_handlers.go similarity index 99% rename from internal/services/api/log_handlers.go rename to internal/apirouter/log_handlers.go index ad29e931..e827021b 100644 --- a/internal/services/api/log_handlers.go +++ b/internal/apirouter/log_handlers.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "net/http" diff --git a/internal/services/api/logger_middleware.go b/internal/apirouter/logger_middleware.go similarity index 96% rename from internal/services/api/logger_middleware.go rename to internal/apirouter/logger_middleware.go index f249528f..be3f72aa 100644 --- a/internal/services/api/logger_middleware.go +++ b/internal/apirouter/logger_middleware.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "fmt" @@ -60,11 +60,7 @@ func LoggerMiddlewareWithSanitizer(logger *logging.Logger, sanitizer *RequestBod hub.CaptureException(getErrorWithStackTrace(c.Errors.Last().Err)) } } else { - if strings.HasPrefix(c.Request.URL.Path, "/api") && strings.HasSuffix(c.Request.URL.Path, "/healthz") { - 
logger.Debug("healthz request completed", fields...) - } else { - logger.Info("request completed", fields...) - } + logger.Info("request completed", fields...) } } } diff --git a/internal/services/api/logger_middleware_integration_test.go b/internal/apirouter/logger_middleware_integration_test.go similarity index 98% rename from internal/services/api/logger_middleware_integration_test.go rename to internal/apirouter/logger_middleware_integration_test.go index 398e6c90..2d44ea86 100644 --- a/internal/services/api/logger_middleware_integration_test.go +++ b/internal/apirouter/logger_middleware_integration_test.go @@ -1,4 +1,4 @@ -package api_test +package apirouter_test import ( "bytes" @@ -11,11 +11,11 @@ import ( "testing" "github.com/gin-gonic/gin" + "github.com/hookdeck/outpost/internal/apirouter" "github.com/hookdeck/outpost/internal/destregistry" "github.com/hookdeck/outpost/internal/destregistry/metadata" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/models" - "github.com/hookdeck/outpost/internal/services/api" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uptrace/opentelemetry-go-extra/otelzap" @@ -134,9 +134,9 @@ func setupTestEnvironment(t *testing.T) (*gin.Engine, *observer.ObservedLogs, de registry := &mockRegistry{loader: metadataLoader} // Create sanitizer and router - sanitizer := api.NewRequestBodySanitizer(registry) + sanitizer := apirouter.NewRequestBodySanitizer(registry) router := gin.New() - router.Use(api.LoggerMiddlewareWithSanitizer(testLogger, sanitizer)) + router.Use(apirouter.LoggerMiddlewareWithSanitizer(testLogger, sanitizer)) return router, logs, registry } diff --git a/internal/services/api/metrics_middleware.go b/internal/apirouter/metrics_middleware.go similarity index 96% rename from internal/services/api/metrics_middleware.go rename to internal/apirouter/metrics_middleware.go index c557bb98..daa8a314 100644 --- 
a/internal/services/api/metrics_middleware.go +++ b/internal/apirouter/metrics_middleware.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "github.com/gin-gonic/gin" diff --git a/internal/services/api/publish_handlers.go b/internal/apirouter/publish_handlers.go similarity index 99% rename from internal/services/api/publish_handlers.go rename to internal/apirouter/publish_handlers.go index 69400ddf..bab8354f 100644 --- a/internal/services/api/publish_handlers.go +++ b/internal/apirouter/publish_handlers.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "errors" diff --git a/internal/services/api/publish_handlers_test.go b/internal/apirouter/publish_handlers_test.go similarity index 97% rename from internal/services/api/publish_handlers_test.go rename to internal/apirouter/publish_handlers_test.go index 07d3d1b1..fdc026f4 100644 --- a/internal/services/api/publish_handlers_test.go +++ b/internal/apirouter/publish_handlers_test.go @@ -1,4 +1,4 @@ -package api_test +package apirouter_test import ( "encoding/json" diff --git a/internal/services/api/requiretenant_middleware.go b/internal/apirouter/requiretenant_middleware.go similarity index 98% rename from internal/services/api/requiretenant_middleware.go rename to internal/apirouter/requiretenant_middleware.go index 2d51fb50..022ea1a0 100644 --- a/internal/services/api/requiretenant_middleware.go +++ b/internal/apirouter/requiretenant_middleware.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "net/http" diff --git a/internal/services/api/requiretenant_middleware_test.go b/internal/apirouter/requiretenant_middleware_test.go similarity index 98% rename from internal/services/api/requiretenant_middleware_test.go rename to internal/apirouter/requiretenant_middleware_test.go index e5681284..5cddbe0f 100644 --- a/internal/services/api/requiretenant_middleware_test.go +++ b/internal/apirouter/requiretenant_middleware_test.go @@ -1,4 +1,4 @@ -package api_test +package apirouter_test import ( 
"context" diff --git a/internal/services/api/retry_handlers.go b/internal/apirouter/retry_handlers.go similarity index 99% rename from internal/services/api/retry_handlers.go rename to internal/apirouter/retry_handlers.go index 3f5acb9c..d6035e4b 100644 --- a/internal/services/api/retry_handlers.go +++ b/internal/apirouter/retry_handlers.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "errors" diff --git a/internal/services/api/router.go b/internal/apirouter/router.go similarity index 99% rename from internal/services/api/router.go rename to internal/apirouter/router.go index 36fc7d8b..3f96092e 100644 --- a/internal/services/api/router.go +++ b/internal/apirouter/router.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "errors" @@ -148,10 +148,6 @@ func NewRouter( apiRouter := r.Group("/api/v1") apiRouter.Use(SetTenantIDMiddleware()) - apiRouter.GET("/healthz", func(c *gin.Context) { - c.String(http.StatusOK, "OK") - }) - tenantHandlers := NewTenantHandlers(logger, telemetry, cfg.JWTSecret, entityStore) destinationHandlers := NewDestinationHandlers(logger, telemetry, entityStore, cfg.Topics, cfg.Registry) publishHandlers := NewPublishHandlers(logger, publishmqEventHandler) diff --git a/internal/services/api/router_test.go b/internal/apirouter/router_test.go similarity index 93% rename from internal/services/api/router_test.go rename to internal/apirouter/router_test.go index dcdf0e47..fd2511eb 100644 --- a/internal/services/api/router_test.go +++ b/internal/apirouter/router_test.go @@ -1,4 +1,4 @@ -package api_test +package apirouter_test import ( "context" @@ -18,8 +18,9 @@ import ( "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/publishmq" "github.com/hookdeck/outpost/internal/redis" - "github.com/hookdeck/outpost/internal/services/api" "github.com/hookdeck/outpost/internal/telemetry" + + "github.com/hookdeck/outpost/internal/apirouter" "github.com/hookdeck/outpost/internal/util/testutil" 
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -37,8 +38,8 @@ func setupTestRouter(t *testing.T, apiKey, jwtSecret string, funcs ...func(t *te entityStore := setupTestEntityStore(t, redisClient, nil) logStore := setupTestLogStore(t, funcs...) eventHandler := publishmq.NewEventHandler(logger, deliveryMQ, entityStore, eventTracer, testutil.TestTopics, idempotence.New(redisClient, idempotence.WithSuccessfulTTL(24*time.Hour))) - router := api.NewRouter( - api.RouterConfig{ + router := apirouter.NewRouter( + apirouter.RouterConfig{ ServiceName: "", APIKey: apiKey, JWTSecret: jwtSecret, @@ -88,21 +89,11 @@ func TestRouterWithAPIKey(t *testing.T) { router, _, _ := setupTestRouter(t, apiKey, jwtSecret) tenantID := "tenantID" - validToken, err := api.JWT.New(jwtSecret, tenantID) + validToken, err := apirouter.JWT.New(jwtSecret, tenantID) if err != nil { t.Fatal(err) } - t.Run("healthcheck should work", func(t *testing.T) { - t.Parallel() - - w := httptest.NewRecorder() - req, _ := http.NewRequest("GET", baseAPIPath+"/healthz", nil) - router.ServeHTTP(w, req) - - assert.Equal(t, http.StatusOK, w.Code) - }) - t.Run("should block unauthenticated request to admin routes", func(t *testing.T) { t.Parallel() @@ -201,21 +192,11 @@ func TestRouterWithoutAPIKey(t *testing.T) { router, _, _ := setupTestRouter(t, apiKey, jwtSecret) tenantID := "tenantID" - validToken, err := api.JWT.New(jwtSecret, tenantID) + validToken, err := apirouter.JWT.New(jwtSecret, tenantID) if err != nil { t.Fatal(err) } - t.Run("healthcheck should work", func(t *testing.T) { - t.Parallel() - - w := httptest.NewRecorder() - req, _ := http.NewRequest("GET", baseAPIPath+"/healthz", nil) - router.ServeHTTP(w, req) - - assert.Equal(t, http.StatusOK, w.Code) - }) - t.Run("should allow unauthenticated request to admin routes", func(t *testing.T) { t.Parallel() diff --git a/internal/services/api/sanitizer.go b/internal/apirouter/sanitizer.go similarity index 99% rename from 
internal/services/api/sanitizer.go rename to internal/apirouter/sanitizer.go index a4a41b12..97ec8d18 100644 --- a/internal/services/api/sanitizer.go +++ b/internal/apirouter/sanitizer.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "bytes" diff --git a/internal/services/api/sanitizer_test.go b/internal/apirouter/sanitizer_test.go similarity index 99% rename from internal/services/api/sanitizer_test.go rename to internal/apirouter/sanitizer_test.go index 7d878f85..23f1bb33 100644 --- a/internal/services/api/sanitizer_test.go +++ b/internal/apirouter/sanitizer_test.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "strings" diff --git a/internal/services/api/tenant_handlers.go b/internal/apirouter/tenant_handlers.go similarity index 99% rename from internal/services/api/tenant_handlers.go rename to internal/apirouter/tenant_handlers.go index ca41de70..059c2fcb 100644 --- a/internal/services/api/tenant_handlers.go +++ b/internal/apirouter/tenant_handlers.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "net/http" diff --git a/internal/services/api/tenant_handlers_test.go b/internal/apirouter/tenant_handlers_test.go similarity index 99% rename from internal/services/api/tenant_handlers_test.go rename to internal/apirouter/tenant_handlers_test.go index f9a6e065..ef0b1336 100644 --- a/internal/services/api/tenant_handlers_test.go +++ b/internal/apirouter/tenant_handlers_test.go @@ -1,4 +1,4 @@ -package api_test +package apirouter_test import ( "context" diff --git a/internal/services/api/topic_handlers.go b/internal/apirouter/topic_handlers.go similarity index 95% rename from internal/services/api/topic_handlers.go rename to internal/apirouter/topic_handlers.go index 996df67e..8af9e4ea 100644 --- a/internal/services/api/topic_handlers.go +++ b/internal/apirouter/topic_handlers.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "net/http" diff --git a/internal/services/api/validate.go b/internal/apirouter/validate.go similarity 
index 95% rename from internal/services/api/validate.go rename to internal/apirouter/validate.go index ec30a44e..a7d9a569 100644 --- a/internal/services/api/validate.go +++ b/internal/apirouter/validate.go @@ -1,4 +1,4 @@ -package api +package apirouter import ( "net/http" diff --git a/internal/app/app.go b/internal/app/app.go index 5879f20b..9e29ec08 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -6,7 +6,6 @@ import ( "os" "os/signal" "strings" - "sync" "syscall" "time" @@ -17,9 +16,7 @@ import ( "github.com/hookdeck/outpost/internal/migrator" "github.com/hookdeck/outpost/internal/otel" "github.com/hookdeck/outpost/internal/redis" - "github.com/hookdeck/outpost/internal/services/api" - "github.com/hookdeck/outpost/internal/services/delivery" - "github.com/hookdeck/outpost/internal/services/log" + "github.com/hookdeck/outpost/internal/services" "github.com/hookdeck/outpost/internal/telemetry" "go.uber.org/zap" ) @@ -106,14 +103,14 @@ func run(mainContext context.Context, cfg *config.Config) error { telemetry.Init(mainContext) telemetry.ApplicationStarted(mainContext, cfg.ToTelemetryApplicationInfo()) - // Set up cancellation context and waitgroup + // Set up cancellation context ctx, cancel := context.WithCancel(mainContext) + defer cancel() // Set up OpenTelemetry. if cfg.OpenTelemetry.ToConfig() != nil { otelShutdown, err := otel.SetupOTelSDK(ctx, cfg.OpenTelemetry.ToConfig()) if err != nil { - cancel() return err } // Handle shutdown properly so nothing leaks. @@ -122,98 +119,56 @@ func run(mainContext context.Context, cfg *config.Config) error { }() } - // Initialize waitgroup - // Once all services are done, we can exit. - // Each service will wait for the context to be cancelled before shutting down. 
- wg := &sync.WaitGroup{} - - // Construct services based on config - logger.Debug("constructing services") - services, err := constructServices( - ctx, - cfg, - wg, - logger, - telemetry, - ) + // Build services using ServiceBuilder + logger.Debug("building services") + builder := services.NewServiceBuilder(ctx, cfg, logger, telemetry) + + supervisor, err := builder.BuildWorkers() if err != nil { - logger.Error("service construction failed", zap.Error(err)) - cancel() + logger.Error("failed to build workers", zap.Error(err)) return err } - // Start services - logger.Info("starting services", zap.Int("count", len(services))) - for _, service := range services { - go service.Run(ctx) - } - // Handle sigterm and await termChan signal termChan := make(chan os.Signal, 1) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) - // Wait for either context cancellation or termination signal + // Run workers in goroutine + errChan := make(chan error, 1) + go func() { + errChan <- supervisor.Run(ctx) + }() + + // Wait for either termination signal or worker failure + var exitErr error select { case <-termChan: - logger.Ctx(ctx).Info("shutdown signal received") - case <-ctx.Done(): - logger.Ctx(ctx).Info("context cancelled") + logger.Info("shutdown signal received") + cancel() // Cancel context to trigger graceful shutdown + err := <-errChan + // context.Canceled is expected during graceful shutdown + if err != nil && !errors.Is(err, context.Canceled) { + logger.Error("error during graceful shutdown", zap.Error(err)) + exitErr = err + } + case err := <-errChan: + // Workers exited unexpectedly + if err != nil { + logger.Error("workers exited unexpectedly", zap.Error(err)) + exitErr = err + } } telemetry.Flush() - // Handle shutdown - cancel() // Signal cancellation to context.Context - wg.Wait() // Block here until all workers are done - - logger.Ctx(ctx).Info("outpost shutdown complete") - - return nil -} - -type Service interface { - Run(ctx context.Context) error -} + 
// Run cleanup functions + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() + builder.Cleanup(shutdownCtx) -func constructServices( - ctx context.Context, - cfg *config.Config, - wg *sync.WaitGroup, - logger *logging.Logger, - telemetry telemetry.Telemetry, -) ([]Service, error) { - serviceType := cfg.MustGetService() - services := []Service{} - - if serviceType == config.ServiceTypeAPI || serviceType == config.ServiceTypeAll { - logger.Debug("creating API service") - service, err := api.NewService(ctx, wg, cfg, logger, telemetry) - if err != nil { - logger.Error("API service creation failed", zap.Error(err)) - return nil, err - } - services = append(services, service) - } - if serviceType == config.ServiceTypeDelivery || serviceType == config.ServiceTypeAll { - logger.Debug("creating delivery service") - service, err := delivery.NewService(ctx, wg, cfg, logger, nil) - if err != nil { - logger.Error("delivery service creation failed", zap.Error(err)) - return nil, err - } - services = append(services, service) - } - if serviceType == config.ServiceTypeLog || serviceType == config.ServiceTypeAll { - logger.Debug("creating log service") - service, err := log.NewService(ctx, wg, cfg, logger, nil) - if err != nil { - logger.Error("log service creation failed", zap.Error(err)) - return nil, err - } - services = append(services, service) - } + logger.Info("outpost shutdown complete") - return services, nil + return exitErr } // runMigration handles database schema migrations with retry logic for lock conflicts. 
diff --git a/internal/services/api/api.go b/internal/services/api/api.go index ffc764e1..cae7847d 100644 --- a/internal/services/api/api.go +++ b/internal/services/api/api.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/hookdeck/outpost/internal/apirouter" "github.com/hookdeck/outpost/internal/config" "github.com/hookdeck/outpost/internal/consumer" "github.com/hookdeck/outpost/internal/deliverymq" @@ -128,8 +129,8 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log idempotence.WithSuccessfulTTL(time.Duration(cfg.PublishIdempotencyKeyTTL)*time.Second), ) eventHandler := publishmq.NewEventHandler(logger, deliveryMQ, entityStore, eventTracer, cfg.Topics, publishIdempotence) - router := NewRouter( - RouterConfig{ + router := apirouter.NewRouter( + apirouter.RouterConfig{ ServiceName: cfg.OpenTelemetry.GetServiceName(), APIKey: cfg.APIKey, JWTSecret: cfg.APIJWTSecret, diff --git a/internal/services/builder.go b/internal/services/builder.go new file mode 100644 index 00000000..31a98961 --- /dev/null +++ b/internal/services/builder.go @@ -0,0 +1,623 @@ +package services + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/hookdeck/outpost/internal/alert" + apirouter "github.com/hookdeck/outpost/internal/apirouter" + "github.com/hookdeck/outpost/internal/config" + "github.com/hookdeck/outpost/internal/deliverymq" + "github.com/hookdeck/outpost/internal/destregistry" + destregistrydefault "github.com/hookdeck/outpost/internal/destregistry/providers" + "github.com/hookdeck/outpost/internal/eventtracer" + "github.com/hookdeck/outpost/internal/idempotence" + "github.com/hookdeck/outpost/internal/logging" + "github.com/hookdeck/outpost/internal/logmq" + "github.com/hookdeck/outpost/internal/logstore" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/mqs" + "github.com/hookdeck/outpost/internal/publishmq" + "github.com/hookdeck/outpost/internal/redis" + 
"github.com/hookdeck/outpost/internal/scheduler" + "github.com/hookdeck/outpost/internal/telemetry" + "github.com/hookdeck/outpost/internal/worker" + "github.com/mikestefanello/batcher" + "go.uber.org/zap" +) + +// ServiceBuilder constructs workers based on service configuration. +type ServiceBuilder struct { + ctx context.Context + cfg *config.Config + logger *logging.Logger + telemetry telemetry.Telemetry + supervisor *worker.WorkerSupervisor + + // Track service instances for cleanup + services []*serviceInstance +} + +// serviceInstance represents a single service with its cleanup functions and common dependencies +type serviceInstance struct { + name string + cleanupFuncs []func(context.Context, *logging.LoggerWithCtx) + + // Common infrastructure + redisClient redis.Client + logStore logstore.LogStore + entityStore models.EntityStore + destRegistry destregistry.Registry + eventTracer eventtracer.EventTracer + deliveryMQ *deliverymq.DeliveryMQ + logMQ *logmq.LogMQ + retryScheduler scheduler.Scheduler + + // HTTP server and router + router http.Handler +} + +// NewServiceBuilder creates a new ServiceBuilder. +func NewServiceBuilder(ctx context.Context, cfg *config.Config, logger *logging.Logger, telemetry telemetry.Telemetry) *ServiceBuilder { + return &ServiceBuilder{ + ctx: ctx, + cfg: cfg, + logger: logger, + telemetry: telemetry, + supervisor: worker.NewWorkerSupervisor(logger), + services: []*serviceInstance{}, + } +} + +// BuildWorkers builds workers based on the configured service type and returns the supervisor. 
+func (b *ServiceBuilder) BuildWorkers() (*worker.WorkerSupervisor, error) { + serviceType := b.cfg.MustGetService() + b.logger.Debug("building workers for service type", zap.String("service_type", serviceType.String())) + + // Create base router with health check that all services will extend + b.logger.Debug("creating base router with health check") + baseRouter := NewBaseRouter(b.supervisor, b.cfg.GinMode) + + if serviceType == config.ServiceTypeAPI || serviceType == config.ServiceTypeAll { + if err := b.BuildAPIWorkers(baseRouter); err != nil { + b.logger.Error("failed to build API workers", zap.Error(err)) + return nil, err + } + } + if serviceType == config.ServiceTypeDelivery || serviceType == config.ServiceTypeAll { + if err := b.BuildDeliveryWorker(baseRouter); err != nil { + b.logger.Error("failed to build delivery worker", zap.Error(err)) + return nil, err + } + } + if serviceType == config.ServiceTypeLog || serviceType == config.ServiceTypeAll { + if err := b.BuildLogWorker(baseRouter); err != nil { + b.logger.Error("failed to build log worker", zap.Error(err)) + return nil, err + } + } + + // Create HTTP server with the base router + if err := b.createHTTPServer(baseRouter); err != nil { + b.logger.Error("failed to create HTTP server", zap.Error(err)) + return nil, err + } + + return b.supervisor, nil +} + +// createHTTPServer creates and registers the HTTP server worker with the given router +func (b *ServiceBuilder) createHTTPServer(router http.Handler) error { + // Create HTTP server + b.logger.Debug("creating HTTP server") + httpServer := &http.Server{ + Addr: fmt.Sprintf(":%d", b.cfg.APIPort), + Handler: router, + } + + // Register HTTP server worker + // Note: HTTP server shutdown is handled by HTTPServerWorker, not in cleanup functions + httpWorker := NewHTTPServerWorker(httpServer, b.logger) + b.supervisor.Register(httpWorker) + + b.logger.Info("HTTP server created", zap.String("addr", httpServer.Addr)) + return nil +} + +// Cleanup runs all 
registered cleanup functions for all services. +// Cleanup is performed in LIFO (last-in-first-out) order to ensure that +// resources created later (which may depend on earlier resources) are +// cleaned up before their dependencies. +func (b *ServiceBuilder) Cleanup(ctx context.Context) { + logger := b.logger.Ctx(ctx) + // Clean up services in reverse order (LIFO) + for i := len(b.services) - 1; i >= 0; i-- { + svc := b.services[i] + logger.Debug("cleaning up service", zap.String("service", svc.name)) + // Clean up functions within each service in reverse order + for j := len(svc.cleanupFuncs) - 1; j >= 0; j-- { + svc.cleanupFuncs[j](ctx, &logger) + } + } +} + +// BuildAPIWorkers creates the API router and registers workers for the API service. +// This sets up the infrastructure, creates the API router, and registers workers: +// 1. Retry scheduler +// 2. PublishMQ consumer (optional) +// The baseRouter parameter is extended with API routes (apirouter already has health check) +func (b *ServiceBuilder) BuildAPIWorkers(baseRouter *gin.Engine) error { + b.logger.Debug("building API service workers") + + // Create a new service instance for API + svc := &serviceInstance{ + name: "api", + cleanupFuncs: []func(context.Context, *logging.LoggerWithCtx){}, + } + b.services = append(b.services, svc) + + // Initialize common infrastructure + if err := svc.initDestRegistry(b.cfg, b.logger); err != nil { + return err + } + if err := svc.initDeliveryMQ(b.ctx, b.cfg, b.logger); err != nil { + return err + } + if err := svc.initRedis(b.ctx, b.cfg, b.logger); err != nil { + return err + } + if err := svc.initLogStore(b.ctx, b.cfg, b.logger); err != nil { + return err + } + if err := svc.initEventTracer(b.cfg, b.logger); err != nil { + return err + } + if err := svc.initEntityStore(b.cfg, b.logger); err != nil { + return err + } + + // Initialize retry scheduler + if err := svc.initRetryScheduler(b.ctx, b.cfg, b.logger); err != nil { + return err + } + + // Initialize event 
handler and create API router + b.logger.Debug("creating event handler and API router") + publishIdempotence := idempotence.New(svc.redisClient, + idempotence.WithTimeout(5*time.Second), + idempotence.WithSuccessfulTTL(time.Duration(b.cfg.PublishIdempotencyKeyTTL)*time.Second), + ) + eventHandler := publishmq.NewEventHandler(b.logger, svc.deliveryMQ, svc.entityStore, svc.eventTracer, b.cfg.Topics, publishIdempotence) + + // Create API router as separate handler + apiHandler := apirouter.NewRouter( + apirouter.RouterConfig{ + ServiceName: b.cfg.OpenTelemetry.GetServiceName(), + APIKey: b.cfg.APIKey, + JWTSecret: b.cfg.APIJWTSecret, + Topics: b.cfg.Topics, + Registry: svc.destRegistry, + PortalConfig: b.cfg.GetPortalConfig(), + GinMode: b.cfg.GinMode, + }, + b.logger, + svc.redisClient, + svc.deliveryMQ, + svc.entityStore, + svc.logStore, + eventHandler, + b.telemetry, + ) + + // Mount API handler onto base router (everything except /healthz goes to apiHandler) + baseRouter.NoRoute(gin.WrapH(apiHandler)) + + svc.router = baseRouter + + // Worker 1: RetryMQ Consumer + retryWorker := NewRetryMQWorker(svc.retryScheduler, b.logger) + b.supervisor.Register(retryWorker) + + // Worker 2: PublishMQ Consumer (optional) + if b.cfg.PublishMQ.GetQueueConfig() != nil { + publishMQ := publishmq.New(publishmq.WithQueue(b.cfg.PublishMQ.GetQueueConfig())) + messageHandler := publishmq.NewMessageHandler(eventHandler) + publishMQWorker := NewConsumerWorker( + "publishmq-consumer", + publishMQ.Subscribe, + messageHandler, + b.cfg.PublishMaxConcurrency, + b.logger, + ) + b.supervisor.Register(publishMQWorker) + } + + b.logger.Info("API service workers built successfully") + return nil +} + +// BuildDeliveryWorker creates and registers the delivery worker. 
+func (b *ServiceBuilder) BuildDeliveryWorker(baseRouter *gin.Engine) error { + b.logger.Debug("building delivery service worker") + + // Create a new service instance for Delivery + svc := &serviceInstance{ + name: "delivery", + cleanupFuncs: []func(context.Context, *logging.LoggerWithCtx){}, + } + b.services = append(b.services, svc) + + // Initialize common infrastructure + if err := svc.initRedis(b.ctx, b.cfg, b.logger); err != nil { + return err + } + if err := svc.initLogMQ(b.ctx, b.cfg, b.logger); err != nil { + return err + } + if err := svc.initDeliveryMQ(b.ctx, b.cfg, b.logger); err != nil { + return err + } + if err := svc.initDestRegistry(b.cfg, b.logger); err != nil { + return err + } + if err := svc.initEventTracer(b.cfg, b.logger); err != nil { + return err + } + if err := svc.initEntityStore(b.cfg, b.logger); err != nil { + return err + } + if err := svc.initLogStore(b.ctx, b.cfg, b.logger); err != nil { + return err + } + if err := svc.initRetryScheduler(b.ctx, b.cfg, b.logger); err != nil { + return err + } + + // Initialize alert monitor + var alertNotifier alert.AlertNotifier + var destinationDisabler alert.DestinationDisabler + if b.cfg.Alert.CallbackURL != "" { + alertNotifier = alert.NewHTTPAlertNotifier(b.cfg.Alert.CallbackURL, alert.NotifierWithBearerToken(b.cfg.APIKey)) + } + if b.cfg.Alert.AutoDisableDestination { + destinationDisabler = newDestinationDisabler(svc.entityStore) + } + alertMonitor := alert.NewAlertMonitor( + b.logger, + svc.redisClient, + alert.WithNotifier(alertNotifier), + alert.WithDisabler(destinationDisabler), + alert.WithAutoDisableFailureCount(b.cfg.Alert.ConsecutiveFailureCount), + alert.WithDeploymentID(b.cfg.DeploymentID), + ) + + // Initialize delivery idempotence + deliveryIdempotence := idempotence.New(svc.redisClient, + idempotence.WithTimeout(5*time.Second), + idempotence.WithSuccessfulTTL(time.Duration(b.cfg.DeliveryIdempotencyKeyTTL)*time.Second), + ) + + // Get retry configuration + retryBackoff, 
retryMaxLimit := b.cfg.GetRetryBackoff() + + // Create delivery handler + handler := deliverymq.NewMessageHandler( + b.logger, + svc.logMQ, + svc.entityStore, + svc.logStore, + svc.destRegistry, + svc.eventTracer, + svc.retryScheduler, + retryBackoff, + retryMaxLimit, + alertMonitor, + deliveryIdempotence, + ) + + // Store reference to the base router + svc.router = baseRouter + + // Create DeliveryMQ worker + deliveryWorker := NewConsumerWorker( + "deliverymq-consumer", + svc.deliveryMQ.Subscribe, + handler, + b.cfg.DeliveryMaxConcurrency, + b.logger, + ) + b.supervisor.Register(deliveryWorker) + + b.logger.Info("delivery service worker built successfully") + return nil +} + +// BuildLogWorker creates and registers the log worker. +func (b *ServiceBuilder) BuildLogWorker(baseRouter *gin.Engine) error { + b.logger.Debug("building log service worker") + + // Create a new service instance for Log + svc := &serviceInstance{ + name: "log", + cleanupFuncs: []func(context.Context, *logging.LoggerWithCtx){}, + } + b.services = append(b.services, svc) + + // Initialize common infrastructure + if err := svc.initLogStore(b.ctx, b.cfg, b.logger); err != nil { + return err + } + + // Create batcher for batching log writes + batcherCfg := struct { + ItemCountThreshold int + DelayThreshold time.Duration + }{ + ItemCountThreshold: b.cfg.LogBatchSize, + DelayThreshold: time.Duration(b.cfg.LogBatchThresholdSeconds) * time.Second, + } + + b.logger.Debug("creating log batcher") + batcher, err := b.makeBatcher(svc.logStore, batcherCfg.ItemCountThreshold, batcherCfg.DelayThreshold) + if err != nil { + b.logger.Error("failed to create batcher", zap.Error(err)) + return err + } + svc.cleanupFuncs = append(svc.cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { + batcher.Shutdown() + }) + + // Create log handler with batcher + handler := logmq.NewMessageHandler(b.logger, &handlerBatcherImpl{batcher: batcher}) + + // Initialize LogMQ + b.logger.Debug("configuring log 
message queue") + logQueueConfig, err := b.cfg.MQs.ToQueueConfig(b.ctx, "logmq") + if err != nil { + b.logger.Error("log queue configuration failed", zap.Error(err)) + return err + } + + logMQ := logmq.New(logmq.WithQueue(logQueueConfig)) + + // Store reference to the base router + svc.router = baseRouter + + // Create LogMQ worker + logWorker := NewConsumerWorker( + "logmq-consumer", + logMQ.Subscribe, + handler, + b.cfg.DeliveryMaxConcurrency, + b.logger, + ) + b.supervisor.Register(logWorker) + + b.logger.Info("log service worker built successfully") + return nil +} + +// destinationDisabler implements alert.DestinationDisabler +type destinationDisabler struct { + entityStore models.EntityStore +} + +func newDestinationDisabler(entityStore models.EntityStore) alert.DestinationDisabler { + return &destinationDisabler{ + entityStore: entityStore, + } +} + +func (d *destinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error { + destination, err := d.entityStore.RetrieveDestination(ctx, tenantID, destinationID) + if err != nil { + return err + } + if destination == nil { + return nil + } + now := time.Now() + destination.DisabledAt = &now + return d.entityStore.UpsertDestination(ctx, *destination) +} + +// makeBatcher creates a batcher for batching log writes +func (b *ServiceBuilder) makeBatcher(logStore logstore.LogStore, itemCountThreshold int, delayThreshold time.Duration) (*batcher.Batcher[*mqs.Message], error) { + batchr, err := batcher.NewBatcher(batcher.Config[*mqs.Message]{ + GroupCountThreshold: 2, + ItemCountThreshold: itemCountThreshold, + DelayThreshold: delayThreshold, + NumGoroutines: 1, + Processor: func(_ string, msgs []*mqs.Message) { + logger := b.logger.Ctx(b.ctx) + logger.Info("processing batch", zap.Int("message_count", len(msgs))) + + nackAll := func() { + for _, msg := range msgs { + msg.Nack() + } + } + + deliveryEvents := make([]*models.DeliveryEvent, 0, len(msgs)) + for _, msg := range msgs { + 
deliveryEvent := models.DeliveryEvent{} + if err := deliveryEvent.FromMessage(msg); err != nil { + logger.Error("failed to parse delivery event", + zap.Error(err), + zap.String("message_id", msg.LoggableID)) + nackAll() + return + } + deliveryEvents = append(deliveryEvents, &deliveryEvent) + } + + if err := logStore.InsertManyDeliveryEvent(b.ctx, deliveryEvents); err != nil { + logger.Error("failed to insert delivery events", + zap.Error(err), + zap.Int("count", len(deliveryEvents))) + nackAll() + return + } + + logger.Info("batch processed successfully", zap.Int("count", len(msgs))) + + for _, msg := range msgs { + msg.Ack() + } + }, + }) + if err != nil { + b.logger.Ctx(b.ctx).Error("failed to create batcher", zap.Error(err)) + return nil, err + } + return batchr, nil +} + +// handlerBatcherImpl implements the batcher interface expected by logmq.MessageHandler +type handlerBatcherImpl struct { + batcher *batcher.Batcher[*mqs.Message] +} + +func (hb *handlerBatcherImpl) Add(ctx context.Context, msg *mqs.Message) error { + hb.batcher.Add("", msg) + return nil +} + +// Helper methods for serviceInstance to initialize common dependencies + +func (s *serviceInstance) initRedis(ctx context.Context, cfg *config.Config, logger *logging.Logger) error { + logger.Debug("initializing Redis client", zap.String("service", s.name)) + redisClient, err := redis.New(ctx, cfg.Redis.ToConfig()) + if err != nil { + logger.Error("Redis client initialization failed", zap.String("service", s.name), zap.Error(err)) + return err + } + s.redisClient = redisClient + return nil +} + +func (s *serviceInstance) initLogStore(ctx context.Context, cfg *config.Config, logger *logging.Logger) error { + logger.Debug("configuring log store driver", zap.String("service", s.name)) + logStoreDriverOpts, err := logstore.MakeDriverOpts(logstore.Config{ + Postgres: &cfg.PostgresURL, + }) + if err != nil { + logger.Error("log store driver configuration failed", zap.String("service", s.name), zap.Error(err)) 
+ return err + } + s.cleanupFuncs = append(s.cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { + logStoreDriverOpts.Close() + }) + + logger.Debug("creating log store", zap.String("service", s.name)) + logStore, err := logstore.NewLogStore(ctx, logStoreDriverOpts) + if err != nil { + logger.Error("log store creation failed", zap.String("service", s.name), zap.Error(err)) + return err + } + s.logStore = logStore + return nil +} + +func (s *serviceInstance) initEntityStore(cfg *config.Config, logger *logging.Logger) error { + if s.redisClient == nil { + return fmt.Errorf("redis client must be initialized before entity store") + } + logger.Debug("creating entity store", zap.String("service", s.name)) + s.entityStore = models.NewEntityStore(s.redisClient, + models.WithCipher(models.NewAESCipher(cfg.AESEncryptionSecret)), + models.WithAvailableTopics(cfg.Topics), + models.WithMaxDestinationsPerTenant(cfg.MaxDestinationsPerTenant), + models.WithDeploymentID(cfg.DeploymentID), + ) + return nil +} + +func (s *serviceInstance) initDestRegistry(cfg *config.Config, logger *logging.Logger) error { + logger.Debug("initializing destination registry", zap.String("service", s.name)) + registry := destregistry.NewRegistry(&destregistry.Config{ + DestinationMetadataPath: cfg.Destinations.MetadataPath, + DeliveryTimeout: time.Duration(cfg.DeliveryTimeoutSeconds) * time.Second, + }, logger) + if err := destregistrydefault.RegisterDefault(registry, cfg.Destinations.ToConfig(cfg)); err != nil { + logger.Error("destination registry setup failed", zap.String("service", s.name), zap.Error(err)) + return err + } + s.destRegistry = registry + return nil +} + +func (s *serviceInstance) initEventTracer(cfg *config.Config, logger *logging.Logger) error { + logger.Debug("setting up event tracer", zap.String("service", s.name)) + if cfg.OpenTelemetry.ToConfig() == nil { + s.eventTracer = eventtracer.NewNoopEventTracer() + } else { + s.eventTracer = 
eventtracer.NewEventTracer() + } + return nil +} + +func (s *serviceInstance) initDeliveryMQ(ctx context.Context, cfg *config.Config, logger *logging.Logger) error { + logger.Debug("configuring delivery message queue", zap.String("service", s.name)) + deliveryQueueConfig, err := cfg.MQs.ToQueueConfig(ctx, "deliverymq") + if err != nil { + logger.Error("delivery queue configuration failed", zap.String("service", s.name), zap.Error(err)) + return err + } + + logger.Debug("initializing delivery MQ connection", zap.String("service", s.name), zap.String("mq_type", cfg.MQs.GetInfraType())) + deliveryMQ := deliverymq.New(deliverymq.WithQueue(deliveryQueueConfig)) + cleanupDeliveryMQ, err := deliveryMQ.Init(ctx) + if err != nil { + logger.Error("delivery MQ initialization failed", zap.String("service", s.name), zap.Error(err)) + return err + } + s.cleanupFuncs = append(s.cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { cleanupDeliveryMQ() }) + s.deliveryMQ = deliveryMQ + return nil +} + +func (s *serviceInstance) initLogMQ(ctx context.Context, cfg *config.Config, logger *logging.Logger) error { + logger.Debug("configuring log message queue", zap.String("service", s.name)) + logQueueConfig, err := cfg.MQs.ToQueueConfig(ctx, "logmq") + if err != nil { + logger.Error("log queue configuration failed", zap.String("service", s.name), zap.Error(err)) + return err + } + + logger.Debug("initializing log MQ connection", zap.String("service", s.name), zap.String("mq_type", cfg.MQs.GetInfraType())) + logMQ := logmq.New(logmq.WithQueue(logQueueConfig)) + cleanupLogMQ, err := logMQ.Init(ctx) + if err != nil { + logger.Error("log MQ initialization failed", zap.String("service", s.name), zap.Error(err)) + return err + } + s.cleanupFuncs = append(s.cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { cleanupLogMQ() }) + s.logMQ = logMQ + return nil +} + +func (s *serviceInstance) initRetryScheduler(ctx context.Context, cfg *config.Config, logger 
*logging.Logger) error { + if s.deliveryMQ == nil { + return fmt.Errorf("delivery MQ must be initialized before retry scheduler") + } + logger.Debug("creating delivery MQ retry scheduler", zap.String("service", s.name)) + retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger) + if err != nil { + logger.Error("failed to create delivery MQ retry scheduler", zap.String("service", s.name), zap.Error(err)) + return err + } + logger.Debug("initializing delivery MQ retry scheduler", zap.String("service", s.name)) + if err := retryScheduler.Init(ctx); err != nil { + logger.Error("delivery MQ retry scheduler initialization failed", zap.String("service", s.name), zap.Error(err)) + return err + } + s.cleanupFuncs = append(s.cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { + retryScheduler.Shutdown() + }) + s.retryScheduler = retryScheduler + return nil +} diff --git a/internal/services/consumer_worker.go b/internal/services/consumer_worker.go new file mode 100644 index 00000000..6ff3a913 --- /dev/null +++ b/internal/services/consumer_worker.go @@ -0,0 +1,71 @@ +package services + +import ( + "context" + "errors" + + "github.com/hookdeck/outpost/internal/consumer" + "github.com/hookdeck/outpost/internal/logging" + "github.com/hookdeck/outpost/internal/mqs" + "github.com/hookdeck/outpost/internal/worker" + "go.uber.org/zap" +) + +// ConsumerWorker is a generic worker that wraps a message queue consumer. +// It handles subscription at runtime and consistent error handling for graceful shutdowns. +type ConsumerWorker struct { + name string + subscribe func(ctx context.Context) (mqs.Subscription, error) + handler consumer.MessageHandler + concurrency int + logger *logging.Logger +} + +// NewConsumerWorker creates a new generic consumer worker. 
+func NewConsumerWorker( + name string, + subscribe func(ctx context.Context) (mqs.Subscription, error), + handler consumer.MessageHandler, + concurrency int, + logger *logging.Logger, +) worker.Worker { + return &ConsumerWorker{ + name: name, + subscribe: subscribe, + handler: handler, + concurrency: concurrency, + logger: logger, + } +} + +// Name returns the worker name. +func (w *ConsumerWorker) Name() string { + return w.name +} + +// Run starts the consumer and blocks until context is cancelled or it fails. +func (w *ConsumerWorker) Run(ctx context.Context) error { + logger := w.logger.Ctx(ctx) + logger.Info("consumer worker starting", zap.String("name", w.name)) + + subscription, err := w.subscribe(ctx) + if err != nil { + logger.Error("error subscribing", zap.String("name", w.name), zap.Error(err)) + return err + } + + csm := consumer.New(subscription, w.handler, + consumer.WithName(w.name), + consumer.WithConcurrency(w.concurrency), + ) + + if err := csm.Run(ctx); err != nil { + // Only report as failure if it's not a graceful shutdown + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + logger.Error("error running consumer", zap.String("name", w.name), zap.Error(err)) + return err + } + } + + return nil +} diff --git a/internal/services/health.go b/internal/services/health.go new file mode 100644 index 00000000..84daf751 --- /dev/null +++ b/internal/services/health.go @@ -0,0 +1,39 @@ +package services + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/hookdeck/outpost/internal/worker" +) + +// HealthHandler creates a health check handler that reports worker supervisor health +func HealthHandler(supervisor *worker.WorkerSupervisor) gin.HandlerFunc { + return func(c *gin.Context) { + tracker := supervisor.GetHealthTracker() + status := tracker.GetStatus() + if tracker.IsHealthy() { + c.JSON(http.StatusOK, status) + } else { + c.JSON(http.StatusServiceUnavailable, status) + } + } +} + +// NewBaseRouter 
creates a base router with health check endpoint +// This is used by all services to expose /healthz +// +// TODO: Rethink API versioning strategy in the future. +// For now, we expose health check at both /healthz and /api/v1/healthz for backwards compatibility. +// The /api/v1 prefix is hardcoded here but should be part of a broader versioning approach. +func NewBaseRouter(supervisor *worker.WorkerSupervisor, ginMode string) *gin.Engine { + gin.SetMode(ginMode) + r := gin.New() + r.Use(gin.Recovery()) + + healthHandler := HealthHandler(supervisor) + r.GET("/healthz", healthHandler) + r.GET("/api/v1/healthz", healthHandler) + + return r +} diff --git a/internal/services/http_worker.go b/internal/services/http_worker.go new file mode 100644 index 00000000..b34297d6 --- /dev/null +++ b/internal/services/http_worker.go @@ -0,0 +1,64 @@ +package services + +import ( + "context" + "net/http" + "time" + + "github.com/hookdeck/outpost/internal/logging" + "github.com/hookdeck/outpost/internal/worker" + "go.uber.org/zap" +) + +// HTTPServerWorker wraps an HTTP server as a worker. +type HTTPServerWorker struct { + server *http.Server + logger *logging.Logger +} + +// NewHTTPServerWorker creates a new HTTP server worker. +func NewHTTPServerWorker(server *http.Server, logger *logging.Logger) worker.Worker { + return &HTTPServerWorker{ + server: server, + logger: logger, + } +} + +// Name returns the worker name. +func (w *HTTPServerWorker) Name() string { + return "http-server" +} + +// Run starts the HTTP server and blocks until context is cancelled or server fails. 
+func (w *HTTPServerWorker) Run(ctx context.Context) error { + logger := w.logger.Ctx(ctx) + logger.Info("http server listening", zap.String("addr", w.server.Addr)) + + // Start server in goroutine + errChan := make(chan error, 1) + go func() { + if err := w.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + errChan <- err + } + }() + + // Wait for context cancellation or server error + select { + case <-ctx.Done(): + // Graceful shutdown + logger.Info("shutting down http server") + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := w.server.Shutdown(shutdownCtx); err != nil { + logger.Error("error shutting down http server", zap.Error(err)) + return err + } + logger.Info("http server shut down") + return nil + + case err := <-errChan: + logger.Error("http server error", zap.Error(err)) + return err + } +} diff --git a/internal/services/retrymq_worker.go b/internal/services/retrymq_worker.go new file mode 100644 index 00000000..010e61a5 --- /dev/null +++ b/internal/services/retrymq_worker.go @@ -0,0 +1,42 @@ +package services + +import ( + "context" + + "github.com/hookdeck/outpost/internal/logging" + "github.com/hookdeck/outpost/internal/scheduler" + "github.com/hookdeck/outpost/internal/worker" + "go.uber.org/zap" +) + +// RetryMQWorker wraps a retry scheduler as a worker. +type RetryMQWorker struct { + scheduler scheduler.Scheduler + logger *logging.Logger +} + +// NewRetryMQWorker creates a new retry scheduler worker. +func NewRetryMQWorker(scheduler scheduler.Scheduler, logger *logging.Logger) worker.Worker { + return &RetryMQWorker{ + scheduler: scheduler, + logger: logger, + } +} + +// Name returns the worker name. +func (w *RetryMQWorker) Name() string { + return "retrymq-consumer" +} + +// Run starts the retry scheduler monitor and blocks until context is cancelled or it fails. 
+func (w *RetryMQWorker) Run(ctx context.Context) error { + logger := w.logger.Ctx(ctx) + logger.Info("retry scheduler monitor running") + + if err := w.scheduler.Monitor(ctx); err != nil { + logger.Error("retry scheduler monitor error", zap.Error(err)) + return err + } + + return nil +} diff --git a/internal/worker/health.go b/internal/worker/health.go new file mode 100644 index 00000000..24964557 --- /dev/null +++ b/internal/worker/health.go @@ -0,0 +1,97 @@ +package worker + +import ( + "sync" + "time" +) + +const ( + WorkerStatusHealthy = "healthy" + WorkerStatusFailed = "failed" +) + +// WorkerHealth represents the health status of a single worker. +// Error details are NOT exposed for security reasons. +type WorkerHealth struct { + Status string `json:"status"` // "healthy" or "failed" +} + +// HealthTracker tracks the health status of all workers. +// It is safe for concurrent use. +type HealthTracker struct { + mu sync.RWMutex + workers map[string]WorkerHealth +} + +// NewHealthTracker creates a new HealthTracker. +func NewHealthTracker() *HealthTracker { + return &HealthTracker{ + workers: make(map[string]WorkerHealth), + } +} + +// MarkHealthy marks a worker as healthy. +func (h *HealthTracker) MarkHealthy(name string) { + h.mu.Lock() + defer h.mu.Unlock() + + h.workers[name] = WorkerHealth{ + Status: WorkerStatusHealthy, + } +} + +// MarkFailed marks a worker as failed. +// Note: Error details are NOT stored for security reasons. +func (h *HealthTracker) MarkFailed(name string) { + h.mu.Lock() + defer h.mu.Unlock() + + h.workers[name] = WorkerHealth{ + Status: WorkerStatusFailed, + } +} + +// IsHealthy returns true if all workers are healthy. +func (h *HealthTracker) IsHealthy() bool { + h.mu.RLock() + defer h.mu.RUnlock() + + for _, w := range h.workers { + if w.Status != WorkerStatusHealthy { + return false + } + } + return true +} + +// GetStatus returns the overall health status with details of all workers. 
+func (h *HealthTracker) GetStatus() map[string]interface{} { + h.mu.RLock() + defer h.mu.RUnlock() + + workers := make(map[string]WorkerHealth) + for name, w := range h.workers { + workers[name] = w + } + + status := "healthy" + if !h.isHealthyLocked() { + status = "failed" + } + + return map[string]interface{}{ + "status": status, + "timestamp": time.Now(), + "workers": workers, + } +} + +// isHealthyLocked checks health without acquiring lock (caller must hold read lock) +func (h *HealthTracker) isHealthyLocked() bool { + for _, w := range h.workers { + if w.Status != WorkerStatusHealthy { + return false + } + } + return true +} diff --git a/internal/worker/logger_test.go b/internal/worker/logger_test.go new file mode 100644 index 00000000..6dd0f2f4 --- /dev/null +++ b/internal/worker/logger_test.go @@ -0,0 +1,23 @@ +package worker_test + +import ( + "testing" + + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/hookdeck/outpost/internal/worker" +) + +// TestLoggingLoggerImplementsInterface verifies that *logging.Logger +// from internal/logging satisfies the worker.Logger interface. +func TestLoggingLoggerImplementsInterface(t *testing.T) { + logger := testutil.CreateTestLogger(t) + + // This will fail to compile if *logging.Logger doesn't implement worker.Logger + var _ worker.Logger = logger + + // Also verify we can actually use it with WorkerSupervisor + supervisor := worker.NewWorkerSupervisor(logger) + if supervisor == nil { + t.Fatal("expected non-nil supervisor") + } +} diff --git a/internal/worker/supervisor.go b/internal/worker/supervisor.go new file mode 100644 index 00000000..9be4aab1 --- /dev/null +++ b/internal/worker/supervisor.go @@ -0,0 +1,164 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "go.uber.org/zap" +) + +// Logger is a minimal logging interface for structured logging with zap. 
+type Logger interface { + Info(msg string, fields ...zap.Field) + Error(msg string, fields ...zap.Field) + Debug(msg string, fields ...zap.Field) + Warn(msg string, fields ...zap.Field) +} + +// WorkerSupervisor manages and supervises multiple workers. +// It tracks their health and handles graceful shutdown. +type WorkerSupervisor struct { + workers map[string]Worker + health *HealthTracker + logger Logger + shutdownTimeout time.Duration // 0 means no timeout +} + +// SupervisorOption configures a WorkerSupervisor. +type SupervisorOption func(*WorkerSupervisor) + +// WithShutdownTimeout sets the maximum time to wait for workers to shutdown gracefully. +// After this timeout, Run() will return even if workers haven't finished. +// Default is 0 (no timeout - wait indefinitely). +func WithShutdownTimeout(timeout time.Duration) SupervisorOption { + return func(r *WorkerSupervisor) { + r.shutdownTimeout = timeout + } +} + +// NewWorkerSupervisor creates a new WorkerSupervisor. +func NewWorkerSupervisor(logger Logger, opts ...SupervisorOption) *WorkerSupervisor { + r := &WorkerSupervisor{ + workers: make(map[string]Worker), + health: NewHealthTracker(), + logger: logger, + shutdownTimeout: 0, // Default: no timeout + } + + for _, opt := range opts { + opt(r) + } + + return r +} + +// Register adds a worker to the supervisor. +// Panics if a worker with the same name is already registered. +func (r *WorkerSupervisor) Register(w Worker) { + if _, exists := r.workers[w.Name()]; exists { + panic(fmt.Sprintf("worker %s already registered", w.Name())) + } + r.workers[w.Name()] = w + r.logger.Info("worker registered", zap.String("worker", w.Name())) +} + +// GetHealthTracker returns the health tracker for this supervisor. +func (r *WorkerSupervisor) GetHealthTracker() *HealthTracker { + return r.health +} + +// Run starts all registered workers and supervises them. 
+// It blocks until:
+//   - ALL workers have exited (either successfully or with errors), OR
+//   - The context is cancelled (SIGTERM/SIGINT)
+//
+// When a worker fails, it marks the worker as failed but does NOT
+// terminate other workers. This allows:
+//   - Other workers to continue serving (e.g., HTTP server stays up)
+//   - Health checks to report the failed worker status
+//   - Orchestrator to detect failure and restart the pod/container
+//
+// Return value:
+//   - If the context is cancelled and no shutdown timeout is configured,
+//     waits indefinitely for workers to finish and returns ctx.Err()
+//     (context.Canceled).
+//   - If a shutdown timeout is configured, returns nil when workers finish
+//     within the timeout, or an error when the timeout is exceeded.
+//   - Returns an error immediately if no workers are registered, or if all
+//     workers exit on their own before the context is cancelled.
+func (r *WorkerSupervisor) Run(ctx context.Context) error {
+	if len(r.workers) == 0 {
+		r.logger.Warn("no workers registered")
+		return fmt.Errorf("no workers registered")
+	}
+
+	r.logger.Info("starting workers", zap.Int("count", len(r.workers)))
+
+	// WaitGroup to track worker goroutines
+	var wg sync.WaitGroup
+
+	// Start all workers
+	for name, worker := range r.workers {
+		wg.Add(1)
+		go func(name string, w Worker) {
+			defer wg.Done()
+
+			r.logger.Debug("worker starting", zap.String("worker", name))
+			r.health.MarkHealthy(name)
+
+			// Run the worker; a nil or context.Canceled return is a graceful
+			// stop, anything else marks the worker failed in the tracker.
+			if err := w.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
+				r.logger.Error("worker failed",
+					zap.String("worker", name),
+					zap.Error(err))
+				r.health.MarkFailed(name)
+			} else {
+				r.logger.Info("worker stopped gracefully", zap.String("worker", name))
+			}
+		}(name, worker)
+	}
+
+	// Wait for either:
+	//   - All workers to exit (wg.Wait completes)
+	//   - Context cancellation (graceful shutdown requested)
+	select {
+	case <-ctx.Done():
+		r.logger.Info("context cancelled, shutting down workers")
+
+		// Wait for all workers to finish gracefully, with optional timeout
+		if r.shutdownTimeout > 0 {
+			return r.waitWithTimeout(&wg, r.shutdownTimeout)
+		}
+
+		// No timeout - wait indefinitely
+		wg.Wait()
+		return ctx.Err()
+	case <-r.waitForWorkers(&wg):
+		// All workers exited (either successfully or with
errors) + r.logger.Warn("all workers have exited") + return fmt.Errorf("all workers have exited unexpectedly") + } +} + +// waitForWorkers converts WaitGroup.Wait() into a channel that can be used in select. +// Returns a channel that closes when all workers have exited. +func (r *WorkerSupervisor) waitForWorkers(wg *sync.WaitGroup) <-chan struct{} { + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + return done +} + +// waitWithTimeout waits for the WaitGroup with a timeout. +// Returns nil if all workers finish within timeout. +// Returns error if timeout is exceeded. +func (r *WorkerSupervisor) waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error { + select { + case <-r.waitForWorkers(wg): + r.logger.Info("all workers shutdown gracefully") + return nil + case <-time.After(timeout): + r.logger.Warn("shutdown timeout exceeded, some workers may still be running", + zap.Duration("timeout", timeout)) + return fmt.Errorf("shutdown timeout exceeded (%v)", timeout) + } +} diff --git a/internal/worker/worker.go b/internal/worker/worker.go new file mode 100644 index 00000000..0e2a955a --- /dev/null +++ b/internal/worker/worker.go @@ -0,0 +1,20 @@ +package worker + +import "context" + +// Worker represents a long-running background process. +// Each worker runs in its own goroutine and can be monitored for health. +// +// Workers should: +// - Block in Run() until context is cancelled or a fatal error occurs +// - Return nil or context.Canceled for graceful shutdown +// - Return non-nil error only for fatal failures +type Worker interface { + // Name returns a unique identifier for this worker (e.g., "http-server", "retrymq-consumer") + Name() string + + // Run executes the worker and blocks until context is cancelled or error occurs. + // Returns nil or context.Canceled for graceful shutdown. + // Returns error for fatal failures. 
+ Run(ctx context.Context) error +} diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go new file mode 100644 index 00000000..eab0dc71 --- /dev/null +++ b/internal/worker/worker_test.go @@ -0,0 +1,656 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +// Mock worker for testing +type mockWorker struct { + name string + runFunc func(ctx context.Context) error + mu sync.Mutex + started bool +} + +func newMockWorker(name string, runFunc func(ctx context.Context) error) *mockWorker { + return &mockWorker{ + name: name, + runFunc: runFunc, + } +} + +func (m *mockWorker) Name() string { + return m.name +} + +func (m *mockWorker) Run(ctx context.Context) error { + m.mu.Lock() + m.started = true + m.mu.Unlock() + + if m.runFunc != nil { + return m.runFunc(ctx) + } + <-ctx.Done() + return nil +} + +func (m *mockWorker) WasStarted() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.started +} + +// Mock logger for testing +type mockLogger struct { + mu sync.Mutex + messages []string +} + +func newMockLogger() *mockLogger { + return &mockLogger{ + messages: []string{}, + } +} + +func (l *mockLogger) log(level, msg string, fields ...zap.Field) { + l.mu.Lock() + defer l.mu.Unlock() + l.messages = append(l.messages, fmt.Sprintf("[%s] %s", level, msg)) +} + +func (l *mockLogger) Info(msg string, fields ...zap.Field) { + l.log("INFO", msg, fields...) +} + +func (l *mockLogger) Error(msg string, fields ...zap.Field) { + l.log("ERROR", msg, fields...) +} + +func (l *mockLogger) Debug(msg string, fields ...zap.Field) { + l.log("DEBUG", msg, fields...) +} + +func (l *mockLogger) Warn(msg string, fields ...zap.Field) { + l.log("WARN", msg, fields...) 
+}
+
+func (l *mockLogger) Contains(substr string) bool {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	for _, msg := range l.messages {
+		if contains(msg, substr) {
+			return true
+		}
+	}
+	return false
+}
+
+// contains reports whether substr occurs anywhere within s.
+// (The previous version only matched prefix/suffix occurrences: its
+// recursive branch was guarded by an always-false comparison and was
+// therefore dead code.)
+func contains(s, substr string) bool {
+	if len(substr) == 0 {
+		return true
+	}
+	for i := 0; i+len(substr) <= len(s); i++ {
+		if s[i:i+len(substr)] == substr {
+			return true
+		}
+	}
+	return false
+}
+
+// Tests
+
+func TestHealthTracker_MarkHealthy(t *testing.T) {
+	t.Parallel()
+
+	tracker := NewHealthTracker()
+
+	tracker.MarkHealthy("worker-1")
+
+	status := tracker.GetStatus()
+	assert.Equal(t, "healthy", status["status"])
+
+	workers := status["workers"].(map[string]WorkerHealth)
+	assert.Len(t, workers, 1)
+	assert.Equal(t, WorkerStatusHealthy, workers["worker-1"].Status)
+}
+
+func TestHealthTracker_MarkFailed(t *testing.T) {
+	t.Parallel()
+
+	tracker := NewHealthTracker()
+
+	tracker.MarkFailed("worker-1")
+
+	status := tracker.GetStatus()
+	assert.Equal(t, "failed", status["status"])
+
+	workers := status["workers"].(map[string]WorkerHealth)
+	assert.Len(t, workers, 1)
+	assert.Equal(t, WorkerStatusFailed, workers["worker-1"].Status)
+}
+
+func TestHealthTracker_IsHealthy_AllHealthy(t *testing.T) {
+	t.Parallel()
+
+	tracker := NewHealthTracker()
+
+	tracker.MarkHealthy("worker-1")
+	tracker.MarkHealthy("worker-2")
+
+	assert.True(t, tracker.IsHealthy())
+}
+
+func TestHealthTracker_IsHealthy_OneFailed(t *testing.T) {
+	t.Parallel()
+
+	tracker := NewHealthTracker()
+
+	tracker.MarkHealthy("worker-1")
+	tracker.MarkFailed("worker-2")
+
+	assert.False(t, tracker.IsHealthy())
+}
+
+func TestHealthTracker_NoErrorExposed(t *testing.T) {
+	t.Parallel()
+
+	tracker := NewHealthTracker()
+
+	tracker.MarkFailed("worker-1")
+
+	status := tracker.GetStatus()
+	workers := status["workers"].(map[string]WorkerHealth)
+
+	// Verify that error details are NOT exposed
+	health := workers["worker-1"]
+	assert.Equal(t, 
WorkerStatusFailed, health.Status) + // Verify WorkerHealth struct has no Error field (compile-time check via struct) + // If Error field existed, this would have compile error + _ = WorkerHealth{ + Status: "healthy", + } +} + +func TestWorkerSupervisor_RegisterWorker(t *testing.T) { + logger := newMockLogger() + supervisor := NewWorkerSupervisor(logger) + + worker := newMockWorker("test-worker", nil) + supervisor.Register(worker) + + assert.Len(t, supervisor.workers, 1) + assert.True(t, logger.Contains("worker registered")) +} + +func TestWorkerSupervisor_RegisterDuplicateWorker(t *testing.T) { + logger := newMockLogger() + supervisor := NewWorkerSupervisor(logger) + + worker1 := newMockWorker("test-worker", nil) + worker2 := newMockWorker("test-worker", nil) + + supervisor.Register(worker1) + + assert.Panics(t, func() { + supervisor.Register(worker2) + }) +} + +func TestWorkerSupervisor_Run_HealthyWorkers(t *testing.T) { + logger := newMockLogger() + supervisor := NewWorkerSupervisor(logger) + + worker1 := newMockWorker("worker-1", func(ctx context.Context) error { + <-ctx.Done() + return nil + }) + worker2 := newMockWorker("worker-2", func(ctx context.Context) error { + <-ctx.Done() + return nil + }) + + supervisor.Register(worker1) + supervisor.Register(worker2) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := make(chan error, 1) + go func() { + errChan <- supervisor.Run(ctx) + }() + + // Give workers time to start + time.Sleep(50 * time.Millisecond) + + // Verify workers started + assert.True(t, worker1.WasStarted()) + assert.True(t, worker2.WasStarted()) + + // Verify health while workers are running + tracker := supervisor.GetHealthTracker() + assert.True(t, tracker.IsHealthy(), "all workers should be healthy while running") + + status := tracker.GetStatus() + assert.Equal(t, "healthy", status["status"]) + + workers := status["workers"].(map[string]WorkerHealth) + assert.Len(t, workers, 2, "should have 2 workers in 
health status") + assert.Equal(t, WorkerStatusHealthy, workers["worker-1"].Status, "worker-1 should be healthy") + assert.Equal(t, WorkerStatusHealthy, workers["worker-2"].Status, "worker-2 should be healthy") + assert.NotZero(t, status["timestamp"], "should have timestamp field") + + // Cancel context and verify graceful shutdown + cancel() + + err := <-errChan + assert.ErrorIs(t, err, context.Canceled) +} + +func TestWorkerSupervisor_Run_FailedWorker(t *testing.T) { + logger := newMockLogger() + supervisor := NewWorkerSupervisor(logger) + + healthyWorker := newMockWorker("healthy", func(ctx context.Context) error { + <-ctx.Done() + return nil + }) + + failingWorker := newMockWorker("failing", func(ctx context.Context) error { + time.Sleep(50 * time.Millisecond) + return errors.New("worker failed") + }) + + supervisor.Register(healthyWorker) + supervisor.Register(failingWorker) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := make(chan error, 1) + go func() { + errChan <- supervisor.Run(ctx) + }() + + // Wait for failing worker to fail + time.Sleep(100 * time.Millisecond) + + // Verify health reflects failure while supervisor is still running + assert.False(t, supervisor.GetHealthTracker().IsHealthy()) + + status := supervisor.GetHealthTracker().GetStatus() + assert.Equal(t, "failed", status["status"]) + + workers := status["workers"].(map[string]WorkerHealth) + assert.Equal(t, WorkerStatusFailed, workers["failing"].Status) + + // Verify that supervisor is still blocking (hasn't returned yet) + select { + case <-errChan: + t.Fatal("supervisor.Run() returned early - should keep running until context cancelled") + default: + // Good - still blocking + } + + // Now cancel context and verify graceful shutdown + cancel() + err := <-errChan + assert.ErrorIs(t, err, context.Canceled) // Graceful shutdown should return context.Canceled from ctx.Err() +} + +func TestWorkerSupervisor_Run_AllWorkersExit(t *testing.T) { + logger := 
newMockLogger() + supervisor := NewWorkerSupervisor(logger) + + // Both workers exit on their own (not from context cancellation) + worker1 := newMockWorker("worker-1", func(ctx context.Context) error { + time.Sleep(50 * time.Millisecond) + return errors.New("worker 1 failed") + }) + + worker2 := newMockWorker("worker-2", func(ctx context.Context) error { + time.Sleep(100 * time.Millisecond) + return errors.New("worker 2 failed") + }) + + supervisor.Register(worker1) + supervisor.Register(worker2) + + ctx := context.Background() + + errChan := make(chan error, 1) + go func() { + errChan <- supervisor.Run(ctx) + }() + + // Wait for both workers to exit + err := <-errChan + assert.Error(t, err) // Should return error when all workers exit unexpectedly + assert.Contains(t, err.Error(), "all workers have exited unexpectedly") + + // Verify both workers are marked as failed + assert.False(t, supervisor.GetHealthTracker().IsHealthy()) + + status := supervisor.GetHealthTracker().GetStatus() + assert.Equal(t, "failed", status["status"]) + + workers := status["workers"].(map[string]WorkerHealth) + assert.Equal(t, WorkerStatusFailed, workers["worker-1"].Status) + assert.Equal(t, WorkerStatusFailed, workers["worker-2"].Status) + + // Verify log message + assert.True(t, logger.Contains("all workers have exited")) +} + +func TestWorkerSupervisor_Run_ContextCanceled(t *testing.T) { + logger := newMockLogger() + supervisor := NewWorkerSupervisor(logger) + + worker := newMockWorker("worker-1", func(ctx context.Context) error { + <-ctx.Done() + return context.Canceled + }) + + supervisor.Register(worker) + + ctx, cancel := context.WithCancel(context.Background()) + + errChan := make(chan error, 1) + go func() { + errChan <- supervisor.Run(ctx) + }() + + // Give worker time to start + time.Sleep(50 * time.Millisecond) + + // Cancel context + cancel() + + err := <-errChan + assert.ErrorIs(t, err, context.Canceled) // Should return context.Canceled +} + +func 
TestWorkerSupervisor_Run_NoWorkers(t *testing.T) { + logger := newMockLogger() + supervisor := NewWorkerSupervisor(logger) + + ctx := context.Background() + err := supervisor.Run(ctx) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "no workers registered") + assert.True(t, logger.Contains("no workers registered")) +} + +func TestHealthTracker_ConcurrentAccess(t *testing.T) { + t.Parallel() + + tracker := NewHealthTracker() + + var wg sync.WaitGroup + workers := 100 + + // Concurrent writes + for i := 0; i < workers; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + name := fmt.Sprintf("worker-%d", i) + if i%2 == 0 { + tracker.MarkHealthy(name) + } else { + tracker.MarkFailed(name) + } + }(i) + } + + // Concurrent reads + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = tracker.IsHealthy() + _ = tracker.GetStatus() + }() + } + + wg.Wait() + + // Verify final state + status := tracker.GetStatus() + workersMap := status["workers"].(map[string]WorkerHealth) + assert.Len(t, workersMap, workers) +} + +func TestWorkerSupervisor_Run_VariableShutdownTiming(t *testing.T) { + logger := newMockLogger() + supervisor := NewWorkerSupervisor(logger) + + // Worker that shuts down quickly (50ms) + fastWorker := newMockWorker("fast", func(ctx context.Context) error { + <-ctx.Done() + time.Sleep(50 * time.Millisecond) + return nil + }) + + // Worker that shuts down slowly (200ms) + slowWorker := newMockWorker("slow", func(ctx context.Context) error { + <-ctx.Done() + time.Sleep(200 * time.Millisecond) + return nil + }) + + // Worker that shuts down instantly + instantWorker := newMockWorker("instant", func(ctx context.Context) error { + <-ctx.Done() + return nil + }) + + supervisor.Register(fastWorker) + supervisor.Register(slowWorker) + supervisor.Register(instantWorker) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := make(chan error, 1) + start := time.Now() + go func() { + errChan <- 
supervisor.Run(ctx) + }() + + // Give workers time to start + time.Sleep(50 * time.Millisecond) + + // Cancel context and verify graceful shutdown + cancel() + + err := <-errChan + elapsed := time.Since(start) + + assert.ErrorIs(t, err, context.Canceled) + + // Supervisor should wait for the slowest worker (200ms) + // Total time should be at least 200ms (slow worker) + some overhead + assert.True(t, elapsed >= 200*time.Millisecond, + "expected shutdown to take at least 200ms (slowest worker), got %v", elapsed) + + // But not too much longer (should complete within 300ms) + assert.True(t, elapsed < 300*time.Millisecond, + "shutdown took too long: %v", elapsed) +} + +func TestWorkerSupervisor_Run_VerySlowShutdown_NoTimeout(t *testing.T) { + logger := newMockLogger() + supervisor := NewWorkerSupervisor(logger) // No timeout + + // Worker that takes a very long time to shutdown (2 seconds) + verySlowWorker := newMockWorker("very-slow", func(ctx context.Context) error { + <-ctx.Done() + time.Sleep(2 * time.Second) + return nil + }) + + supervisor.Register(verySlowWorker) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := make(chan error, 1) + start := time.Now() + go func() { + errChan <- supervisor.Run(ctx) + }() + + // Give worker time to start + time.Sleep(50 * time.Millisecond) + + // Cancel context + cancel() + + // Wait for shutdown with timeout + select { + case err := <-errChan: + elapsed := time.Since(start) + assert.ErrorIs(t, err, context.Canceled) + + // Should wait the full 2 seconds for worker to finish + assert.True(t, elapsed >= 2*time.Second, + "expected to wait at least 2s for slow worker, got %v", elapsed) + + t.Logf("Supervisor waited %v for slow worker to shutdown gracefully (no timeout)", elapsed) + + case <-time.After(3 * time.Second): + t.Fatal("Supervisor.Run() blocked for more than 3 seconds") + } +} + +func TestWorkerSupervisor_Run_ShutdownTimeout(t *testing.T) { + logger := newMockLogger() + // Set 
shutdown timeout to 500ms + supervisor := NewWorkerSupervisor(logger, WithShutdownTimeout(500*time.Millisecond)) + + // Worker that takes 2 seconds to shutdown (longer than timeout) + slowWorker := newMockWorker("slow", func(ctx context.Context) error { + <-ctx.Done() + time.Sleep(2 * time.Second) + return nil + }) + + supervisor.Register(slowWorker) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := make(chan error, 1) + start := time.Now() + go func() { + errChan <- supervisor.Run(ctx) + }() + + // Give worker time to start + time.Sleep(50 * time.Millisecond) + + // Cancel context + cancel() + + err := <-errChan + elapsed := time.Since(start) + + // Should return timeout error + require.Error(t, err) + assert.Contains(t, err.Error(), "shutdown timeout exceeded") + + // Should return after ~500ms (timeout), not 2s (worker shutdown time) + assert.True(t, elapsed >= 500*time.Millisecond, + "expected to wait at least 500ms (timeout), got %v", elapsed) + assert.True(t, elapsed < 1*time.Second, + "expected to timeout before 1s, got %v", elapsed) + + t.Logf("Supervisor timed out after %v (expected ~500ms)", elapsed) +} + +func TestWorkerSupervisor_Run_ShutdownTimeout_FastWorkers(t *testing.T) { + logger := newMockLogger() + // Set shutdown timeout to 2s + supervisor := NewWorkerSupervisor(logger, WithShutdownTimeout(2*time.Second)) + + // Workers that shutdown quickly (100ms) + fastWorker := newMockWorker("fast", func(ctx context.Context) error { + <-ctx.Done() + time.Sleep(100 * time.Millisecond) + return nil + }) + + supervisor.Register(fastWorker) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := make(chan error, 1) + start := time.Now() + go func() { + errChan <- supervisor.Run(ctx) + }() + + // Give worker time to start + time.Sleep(50 * time.Millisecond) + + // Cancel context + cancel() + + err := <-errChan + elapsed := time.Since(start) + + // Should NOT timeout since worker 
finishes quickly (returns nil when timeout configured but not exceeded) + assert.NoError(t, err) + + // Should return after ~100ms (worker shutdown time), not 2s (timeout) + assert.True(t, elapsed >= 100*time.Millisecond, + "expected to wait at least 100ms, got %v", elapsed) + assert.True(t, elapsed < 500*time.Millisecond, + "shutdown took too long: %v", elapsed) + + t.Logf("Supervisor shutdown in %v (workers finished before timeout)", elapsed) +} + +func TestWorkerSupervisor_Run_StuckWorker(t *testing.T) { + logger := newMockLogger() + supervisor := NewWorkerSupervisor(logger) + + // Worker that never shuts down (ignores context cancellation) + stuckWorker := newMockWorker("stuck", func(ctx context.Context) error { + // Ignores context, blocks forever + select {} + }) + + supervisor.Register(stuckWorker) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := make(chan error, 1) + go func() { + errChan <- supervisor.Run(ctx) + }() + + // Give worker time to start + time.Sleep(50 * time.Millisecond) + + // Cancel context + cancel() + + // Verify that supervisor blocks indefinitely waiting for stuck worker + select { + case <-errChan: + t.Fatal("Supervisor.Run() returned but worker is stuck - should block forever") + case <-time.After(500 * time.Millisecond): + // Expected: supervisor is still waiting for stuck worker + t.Log("Supervisor correctly blocks waiting for stuck worker (this is expected behavior)") + } + + // Note: In production, this is why orchestrators need timeouts for pod termination + // Kubernetes will forcefully kill pods that don't shutdown within terminationGracePeriodSeconds +}