Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .go-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.25.10
1.25.11
35 changes: 14 additions & 21 deletions src/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ type asyncProcessorConfig struct {
}

type asyncEventProcessor struct {
queue chan webhookEvent
workers int
processFn map[string]eventHandler
logger *zap.Logger
wg sync.WaitGroup
queue chan webhookEvent
workers int
dispatcher webhookEventDispatcher
logger *zap.Logger
wg sync.WaitGroup
}

func newAsyncProcessorConfigFromEnv() (asyncProcessorConfig, error) {
Expand All @@ -58,15 +58,10 @@ func newAsyncProcessorConfigFromEnv() (asyncProcessorConfig, error) {

func newAsyncEventProcessor(cfg asyncProcessorConfig, logger *zap.Logger) *asyncEventProcessor {
processor := &asyncEventProcessor{
queue: make(chan webhookEvent, cfg.QueueSize),
workers: cfg.WorkerCount,
processFn: map[string]eventHandler{
"workflow_run": updateWorkflowMetrics,
"workflow_job": updateJobMetrics,
"push": func(_ context.Context, body []byte) { updateCommitMetrics(body) },
"pull_request": func(_ context.Context, body []byte) { updatePullRequestMetrics(body) },
},
logger: logger,
queue: make(chan webhookEvent, cfg.QueueSize),
workers: cfg.WorkerCount,
dispatcher: newDefaultGitHubEventDispatcher(),
logger: logger,
}

defaultMetricRecorder.SetAsyncWorkerCount(cfg.WorkerCount)
Expand Down Expand Up @@ -114,12 +109,6 @@ func (p *asyncEventProcessor) runWorker(workerID int) {
defaultMetricRecorder.SetAsyncQueueDepth(len(p.queue))
start := time.Now()

processor, ok := p.processFn[event.eventType]
if !ok {
defaultMetricRecorder.RecordAsyncUnsupportedEvent(event.eventType)
continue
}

func() {
defer func() {
if recovered := recover(); recovered != nil {
Expand All @@ -132,7 +121,11 @@ func (p *asyncEventProcessor) runWorker(workerID int) {
}
}()

processor(event.ctx, event.body)
if p.dispatcher == nil || !p.dispatcher.Dispatch(event.ctx, event.eventType, event.body) {
defaultMetricRecorder.RecordAsyncUnsupportedEvent(event.eventType)
return
}

defaultMetricRecorder.RecordAsyncProcessedEvent(event.eventType)
defaultMetricRecorder.ObserveAsyncProcessingDuration(event.eventType, time.Since(start).Seconds())
}()
Expand Down
50 changes: 46 additions & 4 deletions src/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestAsyncProcessorEnqueueAndProcess(t *testing.T) {

processed := make(chan struct{}, 1)
processor := newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 2}, zap.NewNop())
processor.processFn["workflow_run"] = func(_ context.Context, _ []byte) {
useWorkflowRunAsyncEventHandler(processor, func(_ context.Context, _ []byte) {
processed <- struct{}{}
}
})
processor.Start()
defer processor.Stop()

Expand All @@ -51,15 +51,57 @@ func TestAsyncProcessorEnqueueAndProcess(t *testing.T) {
}
}

func TestAsyncProcessorUsesSharedGitHubEventDispatcher(t *testing.T) {
asyncProcessedEventsCounter.Reset()
asyncUnsupportedEventsCounter.Reset()

processed := make(chan struct{}, 1)
processor := newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 2}, zap.NewNop())
processor.dispatcher = githubEventDispatcher{
handlers: map[string]eventHandler{
githubEventWorkflowRun: func(_ context.Context, _ []byte) {
processed <- struct{}{}
},
},
}
processor.Start()
defer processor.Stop()

if err := processor.Enqueue(context.Background(), githubEventWorkflowRun, []byte(`{}`)); err != nil {
t.Fatalf("unexpected enqueue error: %v", err)
}
if err := processor.Enqueue(context.Background(), "unknown_event", []byte(`{}`)); err != nil {
t.Fatalf("unexpected enqueue error: %v", err)
}

select {
case <-processed:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for async processing")
}

deadline := time.Now().Add(2 * time.Second)
for {
if got := testutil.ToFloat64(asyncUnsupportedEventsCounter.WithLabelValues("unknown_event")); got == 1 {
break
}
if time.Now().After(deadline) {
got := testutil.ToFloat64(asyncUnsupportedEventsCounter.WithLabelValues("unknown_event"))
t.Fatalf("expected unsupported counter to be 1, got %v", got)
}
time.Sleep(10 * time.Millisecond)
}
}

func TestAsyncProcessorDropsWhenQueueFull(t *testing.T) {
asyncQueueDroppedCounter.Reset()
asyncQueueDepthGauge.Set(0)

blocker := make(chan struct{})
processor := newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop())
processor.processFn["workflow_run"] = func(_ context.Context, _ []byte) {
useWorkflowRunAsyncEventHandler(processor, func(_ context.Context, _ []byte) {
<-blocker
}
})
processor.Start()
defer func() {
close(blocker)
Expand Down
33 changes: 33 additions & 0 deletions src/event_dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import "context"

type githubEventDispatcher struct {
handlers map[string]eventHandler
}

func newDefaultGitHubEventDispatcher() githubEventDispatcher {
return githubEventDispatcher{
handlers: map[string]eventHandler{
githubEventWorkflowRun: updateWorkflowMetrics,
githubEventWorkflowJob: updateJobMetrics,
githubEventPush: func(_ context.Context, body []byte) { updateCommitMetrics(body) },
githubEventPullRequest: func(_ context.Context, body []byte) { updatePullRequestMetrics(body) },
},
}
}

func (d githubEventDispatcher) Supports(eventType string) bool {
_, ok := d.handlers[eventType]
return ok
}

func (d githubEventDispatcher) Dispatch(ctx context.Context, eventType string, body []byte) bool {
handler, ok := d.handlers[eventType]
if !ok {
return false
}

handler(ctx, body)
return true
}
53 changes: 53 additions & 0 deletions src/event_dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//go:build !integration

package main

import (
"bytes"
"context"
"testing"
)

func TestGitHubEventDispatcherRoutesSupportedEventsAndRejectsUnknown(t *testing.T) {
body := []byte(`{"ok":true}`)
var routedEvent string
var routedBody []byte

dispatcher := githubEventDispatcher{
handlers: map[string]eventHandler{
githubEventWorkflowRun: func(_ context.Context, eventBody []byte) {
routedEvent = githubEventWorkflowRun
routedBody = eventBody
},
},
}

if ok := dispatcher.Dispatch(context.Background(), githubEventWorkflowRun, body); !ok {
t.Fatal("expected workflow_run to be dispatched")
}
if routedEvent != githubEventWorkflowRun || !bytes.Equal(routedBody, body) {
t.Fatalf("unexpected routed event %q with body %q", routedEvent, string(routedBody))
}
if ok := dispatcher.Dispatch(context.Background(), "unknown_event", body); ok {
t.Fatal("expected unknown event to be rejected")
}
}

func TestDefaultGitHubEventDispatcherOwnsSupportedEventPolicy(t *testing.T) {
dispatcher := newDefaultGitHubEventDispatcher()

for _, eventType := range []string{
githubEventWorkflowRun,
githubEventWorkflowJob,
githubEventPush,
githubEventPullRequest,
} {
if !dispatcher.Supports(eventType) {
t.Fatalf("expected %s to be supported", eventType)
}
}

if dispatcher.Supports("unknown_event") {
t.Fatal("expected unknown_event to be unsupported")
}
}
8 changes: 4 additions & 4 deletions src/github_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,9 @@ func TestWebhookIsAcceptedWhenAsyncProcessorEnabled(t *testing.T) {

processed := make(chan struct{}, 1)
eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop())
eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) {
useWorkflowRunAsyncEventHandler(eventProcessor, func(_ context.Context, _ []byte) {
processed <- struct{}{}
}
})
eventProcessor.Start()
defer func() {
eventProcessor.Stop()
Expand All @@ -288,9 +288,9 @@ func TestWebhookReturnsUnavailableWhenAsyncQueueIsFull(t *testing.T) {

blocker := make(chan struct{})
eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop())
eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) {
useWorkflowRunAsyncEventHandler(eventProcessor, func(_ context.Context, _ []byte) {
<-blocker
}
})
defer func() {
close(blocker)
eventProcessor.Stop()
Expand Down
12 changes: 6 additions & 6 deletions src/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,14 @@ func TestIntegrationAsyncQueueFullReturnsUnavailableAndExposesQueueDropMetrics(t

started := make(chan struct{})
unblock := make(chan struct{})
eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) {
useWorkflowRunAsyncEventHandler(eventProcessor, func(_ context.Context, _ []byte) {
select {
case <-started:
default:
close(started)
}
<-unblock
}
})
t.Cleanup(func() {
select {
case <-unblock:
Expand Down Expand Up @@ -301,13 +301,13 @@ func TestIntegrationAsyncProcessingFailureIsVisibleAndWorkerContinues(t *testing
defer server.Close()

var attempts atomic.Int32
eventProcessor.processFn["workflow_run"] = func(ctx context.Context, body []byte) {
useWorkflowRunAsyncEventHandler(eventProcessor, func(ctx context.Context, body []byte) {
if attempts.Add(1) == 1 {
panic("synthetic async processor failure")
}

updateWorkflowMetrics(ctx, body)
}
})

body := mustReadFixture(t, "workflow_run.json")
first := sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-failure-1")
Expand All @@ -333,15 +333,15 @@ func TestIntegrationAsyncShutdownDrainsQueuedEvents(t *testing.T) {

started := make(chan struct{})
unblock := make(chan struct{})
eventProcessor.processFn["workflow_run"] = func(ctx context.Context, body []byte) {
useWorkflowRunAsyncEventHandler(eventProcessor, func(ctx context.Context, body []byte) {
select {
case <-started:
default:
close(started)
}
<-unblock
updateWorkflowMetrics(ctx, body)
}
})
t.Cleanup(func() {
select {
case <-unblock:
Expand Down
8 changes: 8 additions & 0 deletions src/test_support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func useStateBackends(
})
}

func useWorkflowRunAsyncEventHandler(processor *asyncEventProcessor, handler eventHandler) {
processor.dispatcher = githubEventDispatcher{
handlers: map[string]eventHandler{
githubEventWorkflowRun: handler,
},
}
}

func (s *inMemoryStateStore) MarkDeliveryProcessed(_ context.Context, deliveryID string) (bool, error) {
if _, ok := s.deliveries[deliveryID]; ok {
return false, nil
Expand Down
21 changes: 1 addition & 20 deletions src/webhook_ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newDefaultWebhookIngestion() *webhookIngestion {
logger: logger,
deliveryStore: deliveryStateStore,
localDeduper: deliveryDeduperCache,
dispatcher: defaultWebhookEventDispatcher{},
dispatcher: newDefaultGitHubEventDispatcher(),
metrics: defaultMetricRecorder,
now: time.Now,
}
Expand Down Expand Up @@ -171,25 +171,6 @@ func (i *webhookIngestion) logError(message string, fields ...zap.Field) {
}
}

type defaultWebhookEventDispatcher struct{}

func (defaultWebhookEventDispatcher) Dispatch(ctx context.Context, eventType string, body []byte) bool {
switch eventType {
case githubEventWorkflowRun:
updateWorkflowMetrics(ctx, body)
case githubEventWorkflowJob:
updateJobMetrics(ctx, body)
case githubEventPush:
updateCommitMetrics(body)
case githubEventPullRequest:
updatePullRequestMetrics(body)
default:
return false
}

return true
}

func webhookHTTPHandler(acceptor webhookAcceptor, logger *zap.Logger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
Expand Down
Loading