Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions src/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package main

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

const (
	// defaultWorkerCount is the worker-goroutine count used when
	// PROMGITHUB_EVENT_WORKERS is not set.
	defaultWorkerCount = 4
	// defaultQueueSize is the buffered event-queue capacity used when
	// PROMGITHUB_EVENT_QUEUE_SIZE is not set.
	defaultQueueSize = 256
)

// eventHandler processes a single webhook payload. Handlers are invoked
// from worker goroutines, so implementations must be safe for concurrent use.
type eventHandler func(context.Context, []byte)

// webhookEvent is one queued webhook delivery awaiting async processing.
type webhookEvent struct {
	// ctx carried along for the handler. NOTE(review): storing a context in a
	// struct is normally discouraged; here it is the hand-off mechanism from
	// Enqueue to the worker — confirm it is detached from request cancellation.
	ctx context.Context
	// eventType is the X-GitHub-Event header value used to pick the handler.
	eventType string
	// body is a private copy of the webhook payload (copied in Enqueue).
	body []byte
}

// asyncProcessorConfig holds the tunables for the async event processor,
// typically populated from the environment via newAsyncProcessorConfigFromEnv.
type asyncProcessorConfig struct {
	// WorkerCount is the number of concurrent worker goroutines (> 0).
	WorkerCount int
	// QueueSize is the buffered channel capacity for pending events (> 0).
	QueueSize int
}

// asyncEventProcessor fans webhook events out to a fixed pool of worker
// goroutines over a bounded queue; when the queue is full, Enqueue rejects
// the event instead of blocking the HTTP handler.
type asyncEventProcessor struct {
	// queue is the bounded channel of pending events; closed by Stop.
	queue chan webhookEvent
	// workers is the number of goroutines Start launches.
	workers int
	// processFn maps an event type to its handler; tests override entries.
	processFn map[string]eventHandler
	logger *zap.Logger
	// wg tracks running workers so Stop can wait for a clean drain.
	wg sync.WaitGroup
}

// newAsyncProcessorConfigFromEnv reads PROMGITHUB_EVENT_WORKERS and
// PROMGITHUB_EVENT_QUEUE_SIZE (falling back to the package defaults) and
// returns the resulting configuration. Both values must parse as integers
// greater than zero; otherwise an error is returned.
func newAsyncProcessorConfigFromEnv() (asyncProcessorConfig, error) {
	// readPositive parses one env var and enforces the > 0 invariant.
	readPositive := func(name string, fallback int) (int, error) {
		value, err := parseEnvInt(name, fallback)
		if err != nil {
			return 0, err
		}
		if value <= 0 {
			return 0, errors.New(name + " must be greater than 0")
		}
		return value, nil
	}

	workers, err := readPositive("PROMGITHUB_EVENT_WORKERS", defaultWorkerCount)
	if err != nil {
		return asyncProcessorConfig{}, err
	}
	queueSize, err := readPositive("PROMGITHUB_EVENT_QUEUE_SIZE", defaultQueueSize)
	if err != nil {
		return asyncProcessorConfig{}, err
	}

	return asyncProcessorConfig{WorkerCount: workers, QueueSize: queueSize}, nil
}

// newAsyncEventProcessor constructs a processor with the default handler
// registrations and publishes the static worker-count and queue-capacity
// gauges. Call Start to begin processing.
func newAsyncEventProcessor(cfg asyncProcessorConfig, logger *zap.Logger) *asyncEventProcessor {
	// Default event-type dispatch table; tests may override entries.
	handlers := map[string]eventHandler{
		"workflow_run": updateWorkflowMetrics,
		"workflow_job": updateJobMetrics,
		"push": func(_ context.Context, payload []byte) {
			updateCommitMetrics(payload)
		},
		"pull_request": func(_ context.Context, payload []byte) {
			updatePullRequestMetrics(payload)
		},
	}

	// These are fixed for the lifetime of the processor, so set them once.
	asyncWorkerCountGauge.Set(float64(cfg.WorkerCount))
	asyncQueueCapacityGauge.Set(float64(cfg.QueueSize))

	return &asyncEventProcessor{
		queue:     make(chan webhookEvent, cfg.QueueSize),
		workers:   cfg.WorkerCount,
		processFn: handlers,
		logger:    logger,
	}
}

// Start launches the configured number of worker goroutines. Each worker
// runs until the queue is closed by Stop.
func (p *asyncEventProcessor) Start() {
	for id := 0; id < p.workers; id++ {
		p.wg.Add(1)
		go p.runWorker(id)
	}
}

// Stop closes the event queue and blocks until every worker has drained its
// in-flight work. It is a no-op on a nil receiver so callers need not guard.
func (p *asyncEventProcessor) Stop() {
	if p != nil {
		close(p.queue)
		p.wg.Wait()
	}
}

// Enqueue copies body and offers the event to the worker queue without
// blocking. It returns an error when the queue is full so the caller can
// surface back-pressure (e.g. HTTP 503) to the sender.
//
// The stored context is detached from the caller's cancellation: the HTTP
// handler responds 202 and returns immediately, which cancels the request
// context, so workers must not inherit that cancellation while they are
// still processing the payload. Context values (trace IDs etc.) are kept.
func (p *asyncEventProcessor) Enqueue(ctx context.Context, eventType string, body []byte) error {
	event := webhookEvent{
		// context.WithoutCancel (Go 1.21+) keeps values, drops cancellation.
		ctx:       context.WithoutCancel(ctx),
		eventType: eventType,
		// Copy the payload: the caller may reuse its buffer after we return.
		body: append([]byte(nil), body...),
	}

	var err error
	select {
	case p.queue <- event:
	default:
		asyncEventsDroppedCounter.WithLabelValues(eventType, "queue_full").Inc()
		err = fmt.Errorf("event queue is full")
	}
	// Publish the depth once for both outcomes instead of per branch.
	asyncQueueDepthGauge.Set(float64(len(p.queue)))
	return err
}

// runWorker consumes events from the queue until it is closed by Stop,
// dispatching each event to its registered handler. Unsupported event types
// are counted as dropped and skipped.
func (p *asyncEventProcessor) runWorker(workerID int) {
	defer p.wg.Done()

	for event := range p.queue {
		asyncQueueDepthGauge.Set(float64(len(p.queue)))

		handler, ok := p.processFn[event.eventType]
		if !ok {
			asyncEventsDroppedCounter.WithLabelValues(event.eventType, "unsupported_event").Inc()
			continue
		}
		p.handleEvent(workerID, event, handler)
	}
}

// handleEvent runs one handler invocation, recording success metrics and
// recovering from panics so a bad payload cannot kill the worker goroutine.
// On panic, only the failure counter is incremented (no duration sample).
func (p *asyncEventProcessor) handleEvent(workerID int, event webhookEvent, handler eventHandler) {
	// Start timing after the handler lookup so the histogram reflects
	// handler work only.
	start := time.Now()

	defer func() {
		if recovered := recover(); recovered != nil {
			asyncProcessingFailuresCounter.WithLabelValues(event.eventType).Inc()
			p.logger.Error("Recovered from async event processor panic",
				zap.Int("workerID", workerID),
				zap.String("eventType", event.eventType),
				zap.Any("panic", recovered),
			)
		}
	}()

	handler(event.ctx, event.body)
	asyncProcessedEventsCounter.WithLabelValues(event.eventType).Inc()
	asyncProcessingDurationHistogram.With(prometheus.Labels{"event_type": event.eventType}).Observe(time.Since(start).Seconds())
}
68 changes: 68 additions & 0 deletions src/async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"context"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/testutil"
"go.uber.org/zap"
)

// TestAsyncProcessorEnqueueAndProcess verifies that an enqueued event reaches
// its handler and increments the processed-events counter.
func TestAsyncProcessorEnqueueAndProcess(t *testing.T) {
	// Reset shared metric state so other tests cannot pollute the assertions.
	asyncProcessedEventsCounter.Reset()
	asyncEventsDroppedCounter.Reset()
	asyncProcessingFailuresCounter.Reset()
	asyncQueueDepthGauge.Set(0)
	asyncQueueCapacityGauge.Set(0)
	asyncWorkerCountGauge.Set(0)

	done := make(chan struct{}, 1)
	p := newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 2}, zap.NewNop())
	p.processFn["workflow_run"] = func(_ context.Context, _ []byte) {
		done <- struct{}{}
	}
	p.Start()
	defer p.Stop()

	if err := p.Enqueue(context.Background(), "workflow_run", []byte(`{}`)); err != nil {
		t.Fatalf("unexpected enqueue error: %v", err)
	}

	select {
	case <-done:
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for async processing")
	}

	if got := testutil.ToFloat64(asyncProcessedEventsCounter.WithLabelValues("workflow_run")); got != 1 {
		t.Fatalf("expected processed counter to be 1, got %v", got)
	}
}

// TestAsyncProcessorDropsWhenQueueFull verifies that Enqueue fails fast and
// increments the dropped-events counter once the bounded queue is full.
func TestAsyncProcessorDropsWhenQueueFull(t *testing.T) {
	asyncEventsDroppedCounter.Reset()
	asyncQueueDepthGauge.Set(0)

	// The handler blocks on release so the single queue slot stays occupied.
	release := make(chan struct{})
	p := newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop())
	p.processFn["workflow_run"] = func(_ context.Context, _ []byte) {
		<-release
	}
	p.Start()
	defer func() {
		close(release)
		p.Stop()
	}()

	if err := p.Enqueue(context.Background(), "workflow_run", []byte(`{"id":1}`)); err != nil {
		t.Fatalf("unexpected enqueue error: %v", err)
	}
	if err := p.Enqueue(context.Background(), "workflow_run", []byte(`{"id":2}`)); err == nil {
		t.Fatal("expected queue full error")
	}

	if got := testutil.ToFloat64(asyncEventsDroppedCounter.WithLabelValues("workflow_run", "queue_full")); got != 1 {
		t.Fatalf("expected dropped counter to be 1, got %v", got)
	}
}
15 changes: 14 additions & 1 deletion src/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ const (
statusCompleted = "completed"
)

var stateStore StateStore
var (
stateStore StateStore
eventProcessor *asyncEventProcessor
)

func validateHMAC(body []byte, signature string, secret []byte) bool {
h := hmac.New(sha256.New, secret)
Expand Down Expand Up @@ -153,6 +156,16 @@ func githubEventsHandler(w http.ResponseWriter, r *http.Request) {
}

eventType := r.Header.Get("X-GitHub-Event")
if eventProcessor != nil {
if err := eventProcessor.Enqueue(ctx, eventType, body); err != nil {
http.Error(w, "Webhook queue is full", http.StatusServiceUnavailable)
logger.Warn("Dropping webhook event because queue is full", zap.String("eventType", eventType), zap.Error(err))
return
}
w.WriteHeader(http.StatusAccepted)
return
}

switch eventType {
case "workflow_run":
updateWorkflowMetrics(ctx, body)
Expand Down
61 changes: 60 additions & 1 deletion src/github_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
Expand All @@ -10,8 +11,10 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func computeHMAC(message, secret []byte) string {
Expand Down Expand Up @@ -103,7 +106,11 @@ func TestUnknownEvent(t *testing.T) {

func TestDuplicateDeliveryIsIgnored(t *testing.T) {
stateStore = newInMemoryStateStore()
defer func() { stateStore = nil }()
eventProcessor = nil
defer func() {
stateStore = nil
eventProcessor = nil
}()

body, err := os.ReadFile("../test_data/workflow_run.json")
if err != nil {
Expand All @@ -116,3 +123,55 @@ func TestDuplicateDeliveryIsIgnored(t *testing.T) {
recorder = sendTestRequest(body, "workflow_run")
assert.Equal(t, http.StatusOK, recorder.Code)
}

// TestWebhookIsAcceptedWhenAsyncProcessorEnabled verifies that the webhook
// handler responds 202 Accepted and that the event is processed asynchronously
// when the global eventProcessor is configured.
func TestWebhookIsAcceptedWhenAsyncProcessorEnabled(t *testing.T) {
	body, err := os.ReadFile("../test_data/workflow_run.json")
	if err != nil {
		t.Fatalf("Failed to read test data file: %v", err)
	}

	done := make(chan struct{}, 1)
	eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop())
	eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) {
		done <- struct{}{}
	}
	eventProcessor.Start()
	defer func() {
		// Restore the package-global so later tests see the sync path.
		eventProcessor.Stop()
		eventProcessor = nil
	}()

	recorder := sendTestRequest(body, "workflow_run")
	assert.Equal(t, http.StatusAccepted, recorder.Code)

	select {
	case <-done:
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for async processing")
	}
}

// TestWebhookReturnsUnavailableWhenAsyncQueueIsFull verifies that the webhook
// handler responds 503 Service Unavailable once the async queue is saturated.
// Note: the processor is intentionally never Start()ed before the requests,
// so the pre-filled slot keeps the queue full.
func TestWebhookReturnsUnavailableWhenAsyncQueueIsFull(t *testing.T) {
	body, err := os.ReadFile("../test_data/workflow_run.json")
	if err != nil {
		t.Fatalf("Failed to read test data file: %v", err)
	}

	release := make(chan struct{})
	eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}, zap.NewNop())
	eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) {
		<-release
	}
	defer func() {
		// Restore the package-global so later tests see the sync path.
		close(release)
		eventProcessor.Stop()
		eventProcessor = nil
	}()

	// Occupy the single queue slot directly.
	if err := eventProcessor.Enqueue(context.Background(), "workflow_run", []byte(`{"id":1}`)); err != nil {
		t.Fatalf("unexpected enqueue error: %v", err)
	}

	recorder := sendTestRequest(body, "workflow_run")
	assert.Equal(t, http.StatusServiceUnavailable, recorder.Code)
}
12 changes: 12 additions & 0 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,18 @@ func main() {
}
logRedisMode(logger, redisEnabled, redisConfig.Addr)

asyncConfig, err := newAsyncProcessorConfigFromEnv()
if err != nil {
logger.Fatal("Invalid async event processor configuration", zap.Error(err))
}
eventProcessor = newAsyncEventProcessor(asyncConfig, logger)
eventProcessor.Start()
defer eventProcessor.Stop()
logger.Info("Async webhook processing enabled",
zap.Int("workerCount", asyncConfig.WorkerCount),
zap.Int("queueSize", asyncConfig.QueueSize),
)

r := setupRouter(logger)

server := &http.Server{
Expand Down
Loading
Loading