From 15a838ca0385218957eb7e20bde77fa6a8b9e959 Mon Sep 17 00:00:00 2001 From: Abhishek Rai Date: Thu, 4 Jun 2026 18:52:48 -0700 Subject: [PATCH] Consolidate event dispatch policy --- .go-version | 2 +- src/async.go | 35 ++++++++++-------------- src/async_test.go | 50 +++++++++++++++++++++++++++++++--- src/event_dispatcher.go | 33 ++++++++++++++++++++++ src/event_dispatcher_test.go | 53 ++++++++++++++++++++++++++++++++++++ src/github_test.go | 8 +++--- src/integration_test.go | 12 ++++---- src/test_support_test.go | 8 ++++++ src/webhook_ingestion.go | 21 +------------- 9 files changed, 166 insertions(+), 56 deletions(-) create mode 100644 src/event_dispatcher.go create mode 100644 src/event_dispatcher_test.go diff --git a/.go-version b/.go-version index ba7b129..4fd1625 100644 --- a/.go-version +++ b/.go-version @@ -1 +1 @@ -1.25.10 +1.25.11 diff --git a/src/async.go b/src/async.go index 73ece7b..0e2b7b5 100644 --- a/src/async.go +++ b/src/async.go @@ -29,11 +29,11 @@ type asyncProcessorConfig struct { } type asyncEventProcessor struct { - queue chan webhookEvent - workers int - processFn map[string]eventHandler - logger *zap.Logger - wg sync.WaitGroup + queue chan webhookEvent + workers int + dispatcher webhookEventDispatcher + logger *zap.Logger + wg sync.WaitGroup } func newAsyncProcessorConfigFromEnv() (asyncProcessorConfig, error) { @@ -58,15 +58,10 @@ func newAsyncProcessorConfigFromEnv() (asyncProcessorConfig, error) { func newAsyncEventProcessor(cfg asyncProcessorConfig, logger *zap.Logger) *asyncEventProcessor { processor := &asyncEventProcessor{ - queue: make(chan webhookEvent, cfg.QueueSize), - workers: cfg.WorkerCount, - processFn: map[string]eventHandler{ - "workflow_run": updateWorkflowMetrics, - "workflow_job": updateJobMetrics, - "push": func(_ context.Context, body []byte) { updateCommitMetrics(body) }, - "pull_request": func(_ context.Context, body []byte) { updatePullRequestMetrics(body) }, - }, - logger: logger, + queue: make(chan webhookEvent, cfg.QueueSize), + workers: cfg.WorkerCount, + dispatcher: newDefaultGitHubEventDispatcher(), + logger: logger, } defaultMetricRecorder.SetAsyncWorkerCount(cfg.WorkerCount) @@ -114,12 +109,6 @@ func (p *asyncEventProcessor) runWorker(workerID int) { defaultMetricRecorder.SetAsyncQueueDepth(len(p.queue)) start := time.Now() - processor, ok := p.processFn[event.eventType] - if !ok { - defaultMetricRecorder.RecordAsyncUnsupportedEvent(event.eventType) - continue - } - func() { defer func() { if recovered := recover(); recovered != nil { @@ -132,7 +121,11 @@ func (p *asyncEventProcessor) runWorker(workerID int) { } }() - processor(event.ctx, event.body) + if p.dispatcher == nil || !p.dispatcher.Dispatch(event.ctx, event.eventType, event.body) { + defaultMetricRecorder.RecordAsyncUnsupportedEvent(event.eventType) + return + } + defaultMetricRecorder.RecordAsyncProcessedEvent(event.eventType) defaultMetricRecorder.ObserveAsyncProcessingDuration(event.eventType, time.Since(start).Seconds()) }() diff --git a/src/async_test.go b/src/async_test.go index 17162ea..2a96481 100644 --- a/src/async_test.go +++ b/src/async_test.go @@ -22,9 +22,9 @@ func TestAsyncProcessorEnqueueAndProcess(t *testing.T) { processed := make(chan struct{}, 1) processor := newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 2}, zap.NewNop()) - processor.processFn["workflow_run"] = func(_ context.Context, _ []byte) { + useWorkflowRunAsyncEventHandler(processor, func(_ context.Context, _ []byte) { processed <- struct{}{} - } + }) processor.Start() defer processor.Stop() @@ -51,15 +51,57 @@ func TestAsyncProcessorEnqueueAndProcess(t *testing.T) { } } +func TestAsyncProcessorUsesSharedGitHubEventDispatcher(t *testing.T) { + asyncProcessedEventsCounter.Reset() + asyncUnsupportedEventsCounter.Reset() + + processed := make(chan struct{}, 1) + processor := newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 2}, zap.NewNop()) + processor.dispatcher = githubEventDispatcher{ + handlers: map[string]eventHandler{ + githubEventWorkflowRun: func(_ context.Context, _ []byte) { + processed <- struct{}{} + }, + }, + } + processor.Start() + defer processor.Stop() + + if err := processor.Enqueue(context.Background(), githubEventWorkflowRun, []byte(`{}`)); err != nil { + t.Fatalf("unexpected enqueue error: %v", err) + } + if err := processor.Enqueue(context.Background(), "unknown_event", []byte(`{}`)); err != nil { + t.Fatalf("unexpected enqueue error: %v", err) + } + + select { + case <-processed: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for async processing") + } + + deadline := time.Now().Add(2 * time.Second) + for { + if got := testutil.ToFloat64(asyncUnsupportedEventsCounter.WithLabelValues("unknown_event")); got == 1 { + break + } + if time.Now().After(deadline) { + got := testutil.ToFloat64(asyncUnsupportedEventsCounter.WithLabelValues("unknown_event")) + t.Fatalf("expected unsupported counter to be 1, got %v", got) + } + time.Sleep(10 * time.Millisecond) + } +} + func TestAsyncProcessorDropsWhenQueueFull(t *testing.T) { asyncQueueDroppedCounter.Reset() asyncQueueDepthGauge.Set(0) blocker := make(chan struct{}) processor := newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop()) - processor.processFn["workflow_run"] = func(_ context.Context, _ []byte) { + useWorkflowRunAsyncEventHandler(processor, func(_ context.Context, _ []byte) { <-blocker - } + }) processor.Start() defer func() { close(blocker) diff --git a/src/event_dispatcher.go b/src/event_dispatcher.go new file mode 100644 index 0000000..ee7fa9f --- /dev/null +++ b/src/event_dispatcher.go @@ -0,0 +1,33 @@ +package main + +import "context" + +type githubEventDispatcher struct { + handlers map[string]eventHandler +} + +func newDefaultGitHubEventDispatcher() githubEventDispatcher { + return githubEventDispatcher{ + handlers: map[string]eventHandler{ + githubEventWorkflowRun: updateWorkflowMetrics, + githubEventWorkflowJob: updateJobMetrics, + githubEventPush: func(_ context.Context, body []byte) { updateCommitMetrics(body) }, + githubEventPullRequest: func(_ context.Context, body []byte) { updatePullRequestMetrics(body) }, + }, + } +} + +func (d githubEventDispatcher) Supports(eventType string) bool { + _, ok := d.handlers[eventType] + return ok +} + +func (d githubEventDispatcher) Dispatch(ctx context.Context, eventType string, body []byte) bool { + handler, ok := d.handlers[eventType] + if !ok { + return false + } + + handler(ctx, body) + return true +} diff --git a/src/event_dispatcher_test.go b/src/event_dispatcher_test.go new file mode 100644 index 0000000..349263e --- /dev/null +++ b/src/event_dispatcher_test.go @@ -0,0 +1,53 @@ +//go:build !integration + +package main + +import ( + "bytes" + "context" + "testing" +) + +func TestGitHubEventDispatcherRoutesSupportedEventsAndRejectsUnknown(t *testing.T) { + body := []byte(`{"ok":true}`) + var routedEvent string + var routedBody []byte + + dispatcher := githubEventDispatcher{ + handlers: map[string]eventHandler{ + githubEventWorkflowRun: func(_ context.Context, eventBody []byte) { + routedEvent = githubEventWorkflowRun + routedBody = eventBody + }, + }, + } + + if ok := dispatcher.Dispatch(context.Background(), githubEventWorkflowRun, body); !ok { + t.Fatal("expected workflow_run to be dispatched") + } + if routedEvent != githubEventWorkflowRun || !bytes.Equal(routedBody, body) { + t.Fatalf("unexpected routed event %q with body %q", routedEvent, string(routedBody)) + } + if ok := dispatcher.Dispatch(context.Background(), "unknown_event", body); ok { + t.Fatal("expected unknown event to be rejected") + } +} + +func TestDefaultGitHubEventDispatcherOwnsSupportedEventPolicy(t *testing.T) { + dispatcher := newDefaultGitHubEventDispatcher() + + for _, eventType := range []string{ + githubEventWorkflowRun, + githubEventWorkflowJob, + githubEventPush, + githubEventPullRequest, + } { + if !dispatcher.Supports(eventType) { + t.Fatalf("expected %s to be supported", eventType) + } + } + + if dispatcher.Supports("unknown_event") { + t.Fatal("expected unknown_event to be unsupported") + } +} diff --git a/src/github_test.go b/src/github_test.go index b48e90d..ee9b2b9 100644 --- a/src/github_test.go +++ b/src/github_test.go @@ -259,9 +259,9 @@ func TestWebhookIsAcceptedWhenAsyncProcessorEnabled(t *testing.T) { processed := make(chan struct{}, 1) eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop()) - eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) { + useWorkflowRunAsyncEventHandler(eventProcessor, func(_ context.Context, _ []byte) { processed <- struct{}{} - } + }) eventProcessor.Start() defer func() { eventProcessor.Stop() @@ -288,9 +288,9 @@ func TestWebhookReturnsUnavailableWhenAsyncQueueIsFull(t *testing.T) { blocker := make(chan struct{}) eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop()) - eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) { + useWorkflowRunAsyncEventHandler(eventProcessor, func(_ context.Context, _ []byte) { <-blocker - } + }) defer func() { close(blocker) eventProcessor.Stop() diff --git a/src/integration_test.go b/src/integration_test.go index fc1394e..65d2731 100644 --- a/src/integration_test.go +++ b/src/integration_test.go @@ -258,14 +258,14 @@ func TestIntegrationAsyncQueueFullReturnsUnavailableAndExposesQueueDropMetrics(t started := make(chan struct{}) unblock := make(chan struct{}) - eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) { + useWorkflowRunAsyncEventHandler(eventProcessor, func(_ context.Context, _ []byte) { select { case <-started: default: close(started) } <-unblock - } + }) t.Cleanup(func() { select { case <-unblock: @@ -301,13 +301,13 @@ func TestIntegrationAsyncProcessingFailureIsVisibleAndWorkerContinues(t *testing defer server.Close() var attempts atomic.Int32 - eventProcessor.processFn["workflow_run"] = func(ctx context.Context, body []byte) { + useWorkflowRunAsyncEventHandler(eventProcessor, func(ctx context.Context, body []byte) { if attempts.Add(1) == 1 { panic("synthetic async processor failure") } updateWorkflowMetrics(ctx, body) - } + }) body := mustReadFixture(t, "workflow_run.json") first := sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-failure-1") @@ -333,7 +333,7 @@ func TestIntegrationAsyncShutdownDrainsQueuedEvents(t *testing.T) { started := make(chan struct{}) unblock := make(chan struct{}) - eventProcessor.processFn["workflow_run"] = func(ctx context.Context, body []byte) { + useWorkflowRunAsyncEventHandler(eventProcessor, func(ctx context.Context, body []byte) { select { case <-started: default: @@ -341,7 +341,7 @@ func TestIntegrationAsyncShutdownDrainsQueuedEvents(t *testing.T) { } <-unblock updateWorkflowMetrics(ctx, body) - } + }) t.Cleanup(func() { select { case <-unblock: diff --git a/src/test_support_test.go b/src/test_support_test.go index 7e0231a..efb316a 100644 --- a/src/test_support_test.go +++ b/src/test_support_test.go @@ -49,6 +49,14 @@ func useStateBackends( }) } +func useWorkflowRunAsyncEventHandler(processor *asyncEventProcessor, handler eventHandler) { + processor.dispatcher = githubEventDispatcher{ + handlers: map[string]eventHandler{ + githubEventWorkflowRun: handler, + }, + } +} + func (s *inMemoryStateStore) MarkDeliveryProcessed(_ context.Context, deliveryID string) (bool, error) { if _, ok := s.deliveries[deliveryID]; ok { return false, nil diff --git a/src/webhook_ingestion.go b/src/webhook_ingestion.go index 6267b10..1ddfca7 100644 --- a/src/webhook_ingestion.go +++ b/src/webhook_ingestion.go @@ -66,7 +66,7 @@ func newDefaultWebhookIngestion() *webhookIngestion { logger: logger, deliveryStore: deliveryStateStore, localDeduper: deliveryDeduperCache, - dispatcher: defaultWebhookEventDispatcher{}, + dispatcher: newDefaultGitHubEventDispatcher(), metrics: defaultMetricRecorder, now: time.Now, } @@ -171,25 +171,6 @@ func (i *webhookIngestion) logError(message string, fields ...zap.Field) { } } -type defaultWebhookEventDispatcher struct{} - -func (defaultWebhookEventDispatcher) Dispatch(ctx context.Context, eventType string, body []byte) bool { - switch eventType { - case githubEventWorkflowRun: - updateWorkflowMetrics(ctx, body) - case githubEventWorkflowJob: - updateJobMetrics(ctx, body) - case githubEventPush: - updateCommitMetrics(body) - case githubEventPullRequest: - updatePullRequestMetrics(body) - default: - return false - } - - return true -} - func webhookHTTPHandler(acceptor webhookAcceptor, logger *zap.Logger) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body)