diff --git a/src/async.go b/src/async.go new file mode 100644 index 0000000..8fa017d --- /dev/null +++ b/src/async.go @@ -0,0 +1,141 @@ +package main + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +const ( + defaultWorkerCount = 4 + defaultQueueSize = 256 +) + +type eventHandler func(context.Context, []byte) + +type webhookEvent struct { + ctx context.Context + eventType string + body []byte +} + +type asyncProcessorConfig struct { + WorkerCount int + QueueSize int +} + +type asyncEventProcessor struct { + queue chan webhookEvent + workers int + processFn map[string]eventHandler + logger *zap.Logger + wg sync.WaitGroup +} + +func newAsyncProcessorConfigFromEnv() (asyncProcessorConfig, error) { + workers, err := parseEnvInt("PROMGITHUB_EVENT_WORKERS", defaultWorkerCount) + if err != nil { + return asyncProcessorConfig{}, err + } + if workers <= 0 { + return asyncProcessorConfig{}, errors.New("PROMGITHUB_EVENT_WORKERS must be greater than 0") + } + + queueSize, err := parseEnvInt("PROMGITHUB_EVENT_QUEUE_SIZE", defaultQueueSize) + if err != nil { + return asyncProcessorConfig{}, err + } + if queueSize <= 0 { + return asyncProcessorConfig{}, errors.New("PROMGITHUB_EVENT_QUEUE_SIZE must be greater than 0") + } + + return asyncProcessorConfig{WorkerCount: workers, QueueSize: queueSize}, nil +} + +func newAsyncEventProcessor(cfg asyncProcessorConfig, logger *zap.Logger) *asyncEventProcessor { + processor := &asyncEventProcessor{ + queue: make(chan webhookEvent, cfg.QueueSize), + workers: cfg.WorkerCount, + processFn: map[string]eventHandler{ + "workflow_run": updateWorkflowMetrics, + "workflow_job": updateJobMetrics, + "push": func(_ context.Context, body []byte) { updateCommitMetrics(body) }, + "pull_request": func(_ context.Context, body []byte) { updatePullRequestMetrics(body) }, + }, + logger: logger, + } + + asyncWorkerCountGauge.Set(float64(cfg.WorkerCount)) + 
asyncQueueCapacityGauge.Set(float64(cfg.QueueSize)) + return processor +} + +func (p *asyncEventProcessor) Start() { + for workerID := 0; workerID < p.workers; workerID++ { + p.wg.Add(1) + go p.runWorker(workerID) + } +} + +func (p *asyncEventProcessor) Stop() { + if p == nil { + return + } + close(p.queue) + p.wg.Wait() +} + +func (p *asyncEventProcessor) Enqueue(ctx context.Context, eventType string, body []byte) error { + event := webhookEvent{ + ctx: context.WithoutCancel(ctx), + eventType: eventType, + body: append([]byte(nil), body...), + } + + select { + case p.queue <- event: + asyncQueueDepthGauge.Set(float64(len(p.queue))) + return nil + default: + asyncEventsDroppedCounter.WithLabelValues(eventType, "queue_full").Inc() + asyncQueueDepthGauge.Set(float64(len(p.queue))) + return fmt.Errorf("event queue is full") + } +} + +func (p *asyncEventProcessor) runWorker(workerID int) { + defer p.wg.Done() + + for event := range p.queue { + asyncQueueDepthGauge.Set(float64(len(p.queue))) + start := time.Now() + + processor, ok := p.processFn[event.eventType] + if !ok { + asyncEventsDroppedCounter.WithLabelValues(event.eventType, "unsupported_event").Inc() + continue + } + + func() { + defer func() { + if recovered := recover(); recovered != nil { + asyncProcessingFailuresCounter.WithLabelValues(event.eventType).Inc() + p.logger.Error("Recovered from async event processor panic", + zap.Int("workerID", workerID), + zap.String("eventType", event.eventType), + zap.Any("panic", recovered), + ) + } + }() + + processor(event.ctx, event.body) + asyncProcessedEventsCounter.WithLabelValues(event.eventType).Inc() + asyncProcessingDurationHistogram.With(prometheus.Labels{"event_type": event.eventType}).Observe(time.Since(start).Seconds()) + }() + } +} diff --git a/src/async_test.go b/src/async_test.go new file mode 100644 index 0000000..bfb77b2 --- /dev/null +++ b/src/async_test.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "testing" + "time" + 
 "github.com/prometheus/client_golang/prometheus/testutil" + "go.uber.org/zap" +) + +func TestAsyncProcessorEnqueueAndProcess(t *testing.T) { + asyncProcessedEventsCounter.Reset() + asyncEventsDroppedCounter.Reset() + asyncProcessingFailuresCounter.Reset() + asyncQueueDepthGauge.Set(0) + asyncQueueCapacityGauge.Set(0) + asyncWorkerCountGauge.Set(0) + + processed := make(chan struct{}, 1) + processor := newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 2}, zap.NewNop()) + processor.processFn["workflow_run"] = func(_ context.Context, _ []byte) { + processed <- struct{}{} + } + processor.Start() + defer processor.Stop() + + if err := processor.Enqueue(context.Background(), "workflow_run", []byte(`{}`)); err != nil { + t.Fatalf("unexpected enqueue error: %v", err) + } + + select { + case <-processed: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for async processing") + } + + if got := testutil.ToFloat64(asyncProcessedEventsCounter.WithLabelValues("workflow_run")); got != 1 { + t.Fatalf("expected processed counter to be 1, got %v", got) + } +} + +func TestAsyncProcessorDropsWhenQueueFull(t *testing.T) { + asyncEventsDroppedCounter.Reset() + asyncQueueDepthGauge.Set(0) + + blocker := make(chan struct{}) + processor := newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop()) + processor.processFn["workflow_run"] = func(_ context.Context, _ []byte) { + <-blocker + } + + defer func() { + close(blocker) + processor.Stop() + }() + + if err := processor.Enqueue(context.Background(), "workflow_run", []byte(`{"id":1}`)); err != nil { + t.Fatalf("unexpected enqueue error: %v", err) + } + if err := processor.Enqueue(context.Background(), "workflow_run", []byte(`{"id":2}`)); err == nil { + t.Fatal("expected queue full error") + } + + if got := testutil.ToFloat64(asyncEventsDroppedCounter.WithLabelValues("workflow_run", "queue_full")); got != 1 { + t.Fatalf("expected dropped counter to be 1, 
got %v", got) + } +} diff --git a/src/github.go b/src/github.go index 45f3e06..8f62ced 100644 --- a/src/github.go +++ b/src/github.go @@ -112,7 +112,10 @@ const ( statusCompleted = "completed" ) -var stateStore StateStore +var ( + stateStore StateStore + eventProcessor *asyncEventProcessor +) func validateHMAC(body []byte, signature string, secret []byte) bool { h := hmac.New(sha256.New, secret) @@ -153,6 +156,16 @@ func githubEventsHandler(w http.ResponseWriter, r *http.Request) { } eventType := r.Header.Get("X-GitHub-Event") + if eventProcessor != nil { + if err := eventProcessor.Enqueue(ctx, eventType, body); err != nil { + http.Error(w, "Webhook queue is full", http.StatusServiceUnavailable) + logger.Warn("Dropping webhook event because queue is full", zap.String("eventType", eventType), zap.Error(err)) + return + } + w.WriteHeader(http.StatusAccepted) + return + } + switch eventType { case "workflow_run": updateWorkflowMetrics(ctx, body) diff --git a/src/github_test.go b/src/github_test.go index 76c8659..dd4ce95 100644 --- a/src/github_test.go +++ b/src/github_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "context" "crypto/hmac" "crypto/sha256" "encoding/hex" @@ -10,8 +11,10 @@ import ( "os" "strings" "testing" + "time" "github.com/stretchr/testify/assert" + "go.uber.org/zap" ) func computeHMAC(message, secret []byte) string { @@ -103,7 +106,11 @@ func TestUnknownEvent(t *testing.T) { func TestDuplicateDeliveryIsIgnored(t *testing.T) { stateStore = newInMemoryStateStore() - defer func() { stateStore = nil }() + eventProcessor = nil + defer func() { + stateStore = nil + eventProcessor = nil + }() body, err := os.ReadFile("../test_data/workflow_run.json") if err != nil { @@ -116,3 +123,55 @@ func TestDuplicateDeliveryIsIgnored(t *testing.T) { recorder = sendTestRequest(body, "workflow_run") assert.Equal(t, http.StatusOK, recorder.Code) } + +func TestWebhookIsAcceptedWhenAsyncProcessorEnabled(t *testing.T) { + body, err := 
os.ReadFile("../test_data/workflow_run.json") + if err != nil { + t.Fatalf("Failed to read test data file: %v", err) + } + + processed := make(chan struct{}, 1) + eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop()) + eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) { + processed <- struct{}{} + } + eventProcessor.Start() + defer func() { + eventProcessor.Stop() + eventProcessor = nil + }() + + recorder := sendTestRequest(body, "workflow_run") + assert.Equal(t, http.StatusAccepted, recorder.Code) + + select { + case <-processed: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for async processing") + } +} + +func TestWebhookReturnsUnavailableWhenAsyncQueueIsFull(t *testing.T) { + body, err := os.ReadFile("../test_data/workflow_run.json") + if err != nil { + t.Fatalf("Failed to read test data file: %v", err) + } + + blocker := make(chan struct{}) + eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop()) + eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) { + <-blocker + } + defer func() { + close(blocker) + eventProcessor.Stop() + eventProcessor = nil + }() + + if err := eventProcessor.Enqueue(context.Background(), "workflow_run", []byte(`{"id":1}`)); err != nil { + t.Fatalf("unexpected enqueue error: %v", err) + } + + recorder := sendTestRequest(body, "workflow_run") + assert.Equal(t, http.StatusServiceUnavailable, recorder.Code) +} diff --git a/src/main.go b/src/main.go index 43fe516..772a1b9 100644 --- a/src/main.go +++ b/src/main.go @@ -170,6 +170,18 @@ func main() { } logRedisMode(logger, redisEnabled, redisConfig.Addr) + asyncConfig, err := newAsyncProcessorConfigFromEnv() + if err != nil { + logger.Fatal("Invalid async event processor configuration", zap.Error(err)) + } + eventProcessor = newAsyncEventProcessor(asyncConfig, logger) + eventProcessor.Start() + defer eventProcessor.Stop() 
+ logger.Info("Async webhook processing enabled", + zap.Int("workerCount", asyncConfig.WorkerCount), + zap.Int("queueSize", asyncConfig.QueueSize), + ) + r := setupRouter(logger) server := &http.Server{ diff --git a/src/metrics.go b/src/metrics.go index 04bf733..e0d3444 100644 --- a/src/metrics.go +++ b/src/metrics.go @@ -105,4 +105,58 @@ var ( }, []string{"repository", "base_branch", "pull_request_status"}, ) + + asyncQueueDepthGauge = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "promgithub_event_queue_depth", + Help: "Current number of queued webhook events awaiting processing", + }, + ) + + asyncQueueCapacityGauge = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "promgithub_event_queue_capacity", + Help: "Configured capacity of the webhook event queue", + }, + ) + + asyncWorkerCountGauge = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "promgithub_event_worker_count", + Help: "Configured number of async webhook event workers", + }, + ) + + asyncProcessedEventsCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "promgithub_event_processed_total", + Help: "Total number of webhook events processed asynchronously", + }, + []string{"event_type"}, + ) + + asyncEventsDroppedCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "promgithub_event_dropped_total", + Help: "Total number of webhook events dropped before processing", + }, + []string{"event_type", "reason"}, + ) + + asyncProcessingFailuresCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "promgithub_event_processing_failures_total", + Help: "Total number of async webhook processing failures", + }, + []string{"event_type"}, + ) + + asyncProcessingDurationHistogram = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "promgithub_event_processing_duration_seconds", + Help: "Duration of async webhook event processing", + Buckets: prometheus.DefBuckets, + }, + []string{"event_type"}, + ) )