diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 232e611..a643ae1 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -90,3 +90,41 @@ jobs: - name: Integration Tests run: make integration-test + + redis-integration-tests: + name: Redis integration tests + runs-on: ubuntu-24.04 + + services: + redis: + image: redis:7-alpine + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup environment + id: environment + run: | + GOVERSION=$(make go-version) + echo "GOVERSION=${GOVERSION}" >> $GITHUB_ENV + echo "PROMGITHUB_REDIS_ADDR=127.0.0.1:6379" >> $GITHUB_ENV + + - name: Set up Golang + uses: actions/setup-go@v5 + id: go + with: + go-version: ${{ env.GOVERSION }} + + - name: Install Tools and Dependencies + run: make deps + + - name: Redis Integration Tests + run: make redis-integration-test diff --git a/Makefile b/Makefile index ebe6cb2..f53c35f 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: build container container-security cross-platform debug release test unit-test integration-test test-all go-version coverage fmt lint deps security clean dev-setup +.PHONY: build container container-security cross-platform debug release test unit-test integration-test redis-integration-test test-all go-version coverage fmt lint deps security clean dev-setup include version @@ -31,7 +31,7 @@ debug: LDFLAGS := $(LDFLAGS_DBG) debug: TARGET := $(TARGET)-debug debug: build -test: unit-test integration-test ## Run the full Go test suite +test: unit-test integration-test redis-integration-test ## Run the full Go test suite unit-test: PROMGITHUB_WEBHOOK_SECRET := test-secret unit-test: ## Run unit tests @@ -43,6 +43,11 @@ integration-test: ## Run integration tests @echo "${COLOR_GREEN}Running Integration Tests..${COLOR_RESET}" @go test -tags=integration -v $(SRC) +redis-integration-test: PROMGITHUB_WEBHOOK_SECRET := test-secret +redis-integration-test: ## Run Redis-backed integration tests against PROMGITHUB_REDIS_ADDR + @echo "${COLOR_GREEN}Running Redis Integration Tests..${COLOR_RESET}" + @go test -tags='integration redis' -run '^TestRedisIntegration' -v $(SRC) + coverage: ## Run unit tests with coverage @echo "${COLOR_GREEN}Running Coverage Checks..${COLOR_RESET}" @go test -race -coverprofile=coverage.out -covermode=atomic $(SRC) diff --git a/src/async.go b/src/async.go index 8fa017d..bf26f0d 100644 --- a/src/async.go +++ b/src/async.go @@ -92,7 +92,7 @@ func (p *asyncEventProcessor) Stop() { func (p *asyncEventProcessor) Enqueue(ctx context.Context, eventType string, body []byte) error { event := webhookEvent{ - ctx: ctx, + ctx: context.WithoutCancel(ctx), eventType: eventType, body: append([]byte(nil), body...), } diff --git a/src/integration_test.go b/src/integration_test.go index 272adda..24e933c 100644 --- a/src/integration_test.go +++ b/src/integration_test.go @@ -218,6 +218,8 @@ func resetIntegrationTestMetrics() { asyncEventsDroppedCounter.Reset() asyncProcessingFailuresCounter.Reset() asyncProcessingDurationHistogram.Reset() + duplicateDeliveriesSeenCounter.Reset() + duplicateDeliveriesDroppedCounter.Reset() defaultServiceMetrics.apiCallsCounter.Reset() defaultServiceMetrics.requestDurationHistogram.Reset() asyncQueueDepthGauge.Set(0) diff --git a/src/redis_integration_test.go b/src/redis_integration_test.go new file mode 100644 index 0000000..403ba0a --- /dev/null +++ b/src/redis_integration_test.go @@ -0,0 +1,218 @@ +//go:build integration && redis + +package main + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +func TestRedisIntegrationDuplicateDeliverySharedAcrossServers(t *testing.T) { + store := newRedisIntegrationStore(t) + serverA := newRedisIntegrationTestServer(t, store) + defer serverA.Close() + serverB := httptestServerSharingGlobals(t) + defer serverB.Close() + + body := mustReadFixture(t, "workflow_run.json") + + resp := sendWebhookRequest(t, serverA.URL, "workflow_run", body, "redis-shared-delivery") + if resp.StatusCode != http.StatusAccepted { + _ = resp.Body.Close() + t.Fatalf("expected first status %d, got %d", http.StatusAccepted, resp.StatusCode) + } + _ = resp.Body.Close() + + resp = sendWebhookRequest(t, serverB.URL, "workflow_run", body, "redis-shared-delivery") + if resp.StatusCode != http.StatusOK { + _ = resp.Body.Close() + t.Fatalf("expected duplicate status %d from second server, got %d", http.StatusOK, resp.StatusCode) + } + _ = resp.Body.Close() + + metrics := waitForMetricsSubstring(t, serverA.URL, `promgithub_duplicate_deliveries_seen_total{event_type="workflow_run"} 1`) + if !strings.Contains(metrics, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) { + t.Fatalf("expected workflow metric to be recorded once, got:\n%s", metrics) + } + if !strings.Contains(metrics, `promgithub_duplicate_deliveries_dropped_total{event_type="workflow_run"} 1`) { + t.Fatalf("expected duplicate delivery to be dropped by shared Redis state, got:\n%s", metrics) + } +} + +func TestRedisIntegrationWorkflowAndJobStatePersistAcrossLookups(t *testing.T) { + store := newRedisIntegrationStore(t) + server := newRedisIntegrationTestServer(t, store) + defer server.Close() + + workflowBody := mustReadFixture(t, "workflow_run.json") + resp := sendWebhookRequest(t, server.URL, "workflow_run", workflowBody, "redis-workflow-state-1") + if resp.StatusCode != http.StatusAccepted { + _ = resp.Body.Close() + t.Fatalf("expected workflow status %d, got %d", http.StatusAccepted, resp.StatusCode) + } + _ = resp.Body.Close() + waitForMetricsSubstring(t, server.URL, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) + + workflowState := waitForRedisWorkflowRun(t, store, 1001) + if workflowState.Repository != "user/repo" || workflowState.Branch != "main" || workflowState.Name != "CI" || workflowState.Status != "completed" || workflowState.Conclusion != "success" { + t.Fatalf("unexpected workflow state persisted in redis: %+v", workflowState) + } + + jobBody := mustReadFixture(t, "workflow_job.json") + resp = sendWebhookRequest(t, server.URL, "workflow_job", jobBody, "redis-job-state-1") + if resp.StatusCode != http.StatusAccepted { + _ = resp.Body.Close() + t.Fatalf("expected job status %d, got %d", http.StatusAccepted, resp.StatusCode) + } + _ = resp.Body.Close() + waitForMetricsSubstring(t, server.URL, `promgithub_job_status{branch="main",job_conclusion="success",job_status="completed",repository="user/repo",workflow_name="CI"} 1`) + + jobState := waitForRedisWorkflowJob(t, store, 1) + if jobState.Repository != "user/repo" || jobState.Branch != "main" || jobState.Name != "CI" || jobState.Status != "completed" || jobState.Conclusion != "success" { + t.Fatalf("unexpected job state persisted in redis: %+v", jobState) + } +} + +func TestRedisIntegrationDuplicateRunTransitionDoesNotDoubleCount(t *testing.T) { + store := newRedisIntegrationStore(t) + serverA := newRedisIntegrationTestServer(t, store) + defer serverA.Close() + serverB := httptestServerSharingGlobals(t) + defer serverB.Close() + + body := mustReadFixture(t, "workflow_run.json") + for _, request := range []struct { + serverURL string + deliveryID string + }{ + {serverURL: serverA.URL, deliveryID: "redis-run-transition-1"}, + {serverURL: serverB.URL, deliveryID: "redis-run-transition-2"}, + } { + resp := sendWebhookRequest(t, request.serverURL, "workflow_run", body, request.deliveryID) + if resp.StatusCode != http.StatusAccepted { + _ = resp.Body.Close() + t.Fatalf("expected status %d for %s, got %d", http.StatusAccepted, request.deliveryID, resp.StatusCode) + } + _ = resp.Body.Close() + } + + metrics := waitForMetricsSubstring(t, serverA.URL, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) + if strings.Contains(metrics, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 2`) { + t.Fatalf("expected Redis-backed run state to suppress duplicate transition, got:\n%s", metrics) + } +} + +func TestRedisIntegrationConnectionFailureIsClear(t *testing.T) { + _, err := NewRedisStateStore(RedisConfig{ + Addr: "127.0.0.1:0", + KeyPrefix: "promgithub-test-failure", + DeliveryTTL: time.Minute, + }) + if err == nil { + t.Fatal("expected redis connection failure") + } + if !strings.Contains(err.Error(), "ping redis") { + t.Fatalf("expected redis ping context in error, got %q", err.Error()) + } +} + +func newRedisIntegrationStore(t *testing.T) *RedisStateStore { + t.Helper() + + addr := strings.TrimSpace(os.Getenv("PROMGITHUB_REDIS_ADDR")) + if addr == "" { + addr = "127.0.0.1:6379" + } + + keyPrefix := fmt.Sprintf("promgithub:test:%s:%d", strings.NewReplacer("/", "_", " ", "_").Replace(t.Name()), time.Now().UnixNano()) + store, err := NewRedisStateStore(RedisConfig{ + Addr: addr, + KeyPrefix: keyPrefix, + DeliveryTTL: time.Minute, + }) + if err != nil { + t.Fatalf("real Redis integration tests require a reachable Redis at %s: %v", addr, err) + } + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + cleanupRedisKeys(ctx, store, keyPrefix) + _ = store.Close() + }) + + return store +} + +func newRedisIntegrationTestServer(t *testing.T, store StateStore) *httptest.Server { + t.Helper() + resetIntegrationTestMetrics() + + githubWebhookSecret = []byte("integration-test-secret") + stateStore = store + eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 8}, zap.NewNop()) + eventProcessor.Start() + t.Cleanup(func() { + eventProcessor.Stop() + eventProcessor = nil + stateStore = nil + }) + + router := setupRouter(zap.NewNop(), defaultServiceMetrics, prometheus.DefaultGatherer) + return httptest.NewServer(router) +} + +func httptestServerSharingGlobals(t *testing.T) *httptest.Server { + t.Helper() + router := setupRouter(zap.NewNop(), defaultServiceMetrics, prometheus.DefaultGatherer) + return httptest.NewServer(router) +} + +func waitForRedisWorkflowRun(t *testing.T, store *RedisStateStore, id int) RunState { + t.Helper() + return waitForRedisState(t, func(ctx context.Context) (RunState, bool, error) { + return store.GetWorkflowRun(ctx, id) + }) +} + +func waitForRedisWorkflowJob(t *testing.T, store *RedisStateStore, id int) RunState { + t.Helper() + return waitForRedisState(t, func(ctx context.Context) (RunState, bool, error) { + return store.GetWorkflowJob(ctx, id) + }) +} + +func waitForRedisState(t *testing.T, get func(context.Context) (RunState, bool, error)) RunState { + t.Helper() + ctx := context.Background() + var last RunState + for i := 0; i < 50; i++ { + state, found, err := get(ctx) + if err != nil { + t.Fatalf("failed to read redis state: %v", err) + } + if found { + return state + } + last = state + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("timed out waiting for redis state, last=%+v", last) + return RunState{} +} + +func cleanupRedisKeys(ctx context.Context, store *RedisStateStore, keyPrefix string) { + iter := store.client.Scan(ctx, 0, keyPrefix+":*", 0).Iterator() + for iter.Next(ctx) { + _ = store.client.Del(ctx, iter.Val()).Err() + } +}