Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions internal/github/headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2025 Canonical Ltd.
* See LICENSE file for licensing details.
*/

// Package github provides shared constants for GitHub webhook integration.
package github

const (
SignatureHeader = "X-Hub-Signature-256"
EventHeader = "X-GitHub-Event"
HookIDHeader = "X-GitHub-Hook-ID"
DeliveryHeader = "X-GitHub-Delivery"
HookInstallationTargetTypeHeader = "X-GitHub-Hook-Installation-Target-Type"
HookInstallationTargetIDHeader = "X-GitHub-Hook-Installation-Target-ID"
SignaturePrefix = "sha256="
)
37 changes: 30 additions & 7 deletions internal/planner/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"time"

"github.com/canonical/github-runner-operators/internal/database"
gh "github.com/canonical/github-runner-operators/internal/github"
"github.com/canonical/github-runner-operators/internal/queue"
"github.com/google/go-github/v82/github"
amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
oteltrace "go.opentelemetry.io/otel/trace"
)
Expand All @@ -27,9 +29,8 @@ type JobDatabase interface {
}

const (
platform = "github"
githubEventHeaderKey = "X-GitHub-Event"
maxBackoff = 5 * time.Minute
platform = "github"
maxBackoff = 5 * time.Minute
)

var supportedActions = []string{"queued", "in_progress", "completed"}
Expand Down Expand Up @@ -94,14 +95,14 @@ func getWorkflowJob(ctx context.Context, headers map[string]interface{}, body []
// Returns (nil, nil) for non-workflow_job events (should be ignored).
// Returns error only for malformed webhooks.
func parseWorkflowJobEvent(ctx context.Context, headers map[string]interface{}, body []byte) (*github.WorkflowJobEvent, error) {
eventTypeHeader, ok := headers[githubEventHeaderKey]
eventTypeHeader, ok := headers[gh.EventHeader]
if !ok {
return nil, fmt.Errorf("missing X-GitHub-Event header")
return nil, fmt.Errorf("missing %s header", gh.EventHeader)
}

eventType, ok := eventTypeHeader.(string)
if !ok {
return nil, fmt.Errorf("X-GitHub-Event must be string")
return nil, fmt.Errorf("%s must be string", gh.EventHeader)
}

event, err := github.ParseWebHook(eventType, body)
Expand All @@ -112,7 +113,7 @@ func parseWorkflowJobEvent(ctx context.Context, headers map[string]interface{},
jobEvent, ok := event.(*github.WorkflowJobEvent)
if !ok {
if eventType == "workflow_job" {
logger.WarnContext(ctx, "received workflow_job in \"X-GitHub-Event\" header but payload did not parse to expected type; possible GitHub API change or library issue")
logger.WarnContext(ctx, "received workflow_job event but payload did not parse to expected type; possible GitHub API change or library issue", "header_name", gh.EventHeader)
} else {
logger.DebugContext(ctx, "ignoring non-workflow_job event", "event_type", eventType)
}
Expand Down Expand Up @@ -270,21 +271,43 @@ func (c *JobConsumer) pullMessage(ctx context.Context) error {
ctx, span := trace.Start(ctx, "consume webhook")
defer span.End()

deliveryID, _ := msg.Headers[gh.DeliveryHeader].(string)
eventType, _ := msg.Headers[gh.EventHeader].(string)
span.SetAttributes(
attribute.String("github.delivery_id", deliveryID),
attribute.String("github.event", eventType),
)

job, err := getWorkflowJob(ctx, msg.Headers, msg.Body)
if err != nil {
logger.ErrorContext(ctx, "cannot parse webhook payload, discarding to DLQ", "error", err)
logger.DebugContext(ctx, "discarded webhook", "delivery_id", deliveryID)
span.RecordError(err)
c.metrics.ObserveWebhookError(ctx, platform)
c.discardMessage(ctx, &msg)
return err
}

if job == nil {
logger.DebugContext(ctx, "ignored webhook", "delivery_id", deliveryID)
c.metrics.ObserveDiscardedWebhook(ctx, platform)
c.ignoreMessage(ctx, &msg)
return nil
}

span.SetAttributes(
attribute.String("github.job_id", job.id),
attribute.String("github.repo", job.repo),
attribute.String("github.action", job.action),
)
logger.DebugContext(ctx, "consuming webhook",
"delivery_id", deliveryID,
"job_id", job.id,
"repo", job.repo,
"action", job.action,
"labels", strings.Join(job.labels, ","),
)

err = c.handleMessage(ctx, job)
if err == nil {
c.consumedMessage(ctx, &msg)
Expand Down
55 changes: 32 additions & 23 deletions internal/webhook/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,15 @@ import (
"net/http"
"time"

gh "github.com/canonical/github-runner-operators/internal/github"
"github.com/canonical/github-runner-operators/internal/queue"
"github.com/canonical/github-runner-operators/internal/server"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
)

const (
WebhookSignatureHeader = "X-Hub-Signature-256"
WebhookEventHeader = "X-GitHub-Event"
WebhookHookIDHeader = "X-GitHub-Hook-ID"
WebhookDeliveryHeader = "X-GitHub-Delivery"
WebhookHookInstallationTargetTypeHeader = "X-GitHub-Hook-Installation-Target-Type"
WebhookHookInstallationTargetIDHeader = "X-GitHub-Hook-Installation-Target-ID"
bodyLimit = 1048576
WebhookSignaturePrefix = "sha256="
)
const bodyLimit = 1048576

type httpError struct {
code int
Expand All @@ -57,7 +50,7 @@ type Handler struct {

func (h *Handler) receiveWebhook(ctx context.Context, r *http.Request) ([]byte, error) {
reader := io.LimitReader(r.Body, bodyLimit+1)
signature := r.Header.Get(WebhookSignatureHeader)
signature := r.Header.Get(gh.SignatureHeader)
if signature == "" {
logger.DebugContext(ctx, "missing signature header", "header", r.Header)
return nil, &httpError{code: http.StatusForbidden, message: "missing signature header"}
Expand All @@ -75,6 +68,9 @@ func (h *Handler) receiveWebhook(ctx context.Context, r *http.Request) ([]byte,
logger.DebugContext(ctx, "invalid signature", "signature", signature)
return nil, &httpError{code: http.StatusForbidden, message: "webhook contains invalid signature"}
}
deliveryID := r.Header.Get(gh.DeliveryHeader)
eventType := r.Header.Get(gh.EventHeader)
logger.DebugContext(ctx, "received webhook", "delivery_id", deliveryID, "event", eventType)
return body, nil
}

Expand All @@ -91,8 +87,12 @@ func (h *Handler) sendWebhook(ctx context.Context, githubHeaders map[string]stri
}
err := h.Producer.Push(ctx, rabbitHeaders, body)
if err != nil {
return fmt.Errorf("failed to send webhook: %v", err)
return fmt.Errorf("failed to send webhook: %w", err)
}
logger.DebugContext(ctx, "sent webhook to queue",
"delivery_id", githubHeaders[gh.DeliveryHeader],
"event", githubHeaders[gh.EventHeader],
)
return nil
}

Expand All @@ -109,21 +109,30 @@ func (h *Handler) serveHTTP(ctx context.Context, r *http.Request) error {
span.RecordError(err)
span.End()
return err
} else {
inboundWebhook.Add(ctx, 1)
span.End()
}
deliveryID := r.Header.Get(gh.DeliveryHeader)
eventType := r.Header.Get(gh.EventHeader)
span.SetAttributes(
attribute.String("github.delivery_id", deliveryID),
attribute.String("github.event", eventType),
)
inboundWebhook.Add(ctx, 1)
span.End()

// Extract GitHub headers from the request
githubHeaders := map[string]string{
WebhookEventHeader: r.Header.Get(WebhookEventHeader),
WebhookHookIDHeader: r.Header.Get(WebhookHookIDHeader),
WebhookDeliveryHeader: r.Header.Get(WebhookDeliveryHeader),
WebhookHookInstallationTargetTypeHeader: r.Header.Get(WebhookHookInstallationTargetTypeHeader),
WebhookHookInstallationTargetIDHeader: r.Header.Get(WebhookHookInstallationTargetIDHeader),
gh.EventHeader: r.Header.Get(gh.EventHeader),
gh.HookIDHeader: r.Header.Get(gh.HookIDHeader),
gh.DeliveryHeader: r.Header.Get(gh.DeliveryHeader),
gh.HookInstallationTargetTypeHeader: r.Header.Get(gh.HookInstallationTargetTypeHeader),
gh.HookInstallationTargetIDHeader: r.Header.Get(gh.HookInstallationTargetIDHeader),
}

ctx, span = trace.Start(ctx, "send webhook")
span.SetAttributes(
attribute.String("github.delivery_id", deliveryID),
attribute.String("github.event", eventType),
)
err = h.sendWebhook(ctx, githubHeaders, webhook)
if err != nil {
outboundWebhookErrors.Add(ctx, 1)
Expand Down Expand Up @@ -162,13 +171,13 @@ func (h *Handler) Webhook(w http.ResponseWriter, r *http.Request) {
}

func validateSignature(message []byte, secret string, signature string) bool {
if len(signature) < len(WebhookSignaturePrefix) {
if len(signature) < len(gh.SignaturePrefix) {
return false
}
if signature[:len(WebhookSignaturePrefix)] != WebhookSignaturePrefix {
if signature[:len(gh.SignaturePrefix)] != gh.SignaturePrefix {
return false
}
signatureWithoutPrefix := signature[len(WebhookSignaturePrefix):]
signatureWithoutPrefix := signature[len(gh.SignaturePrefix):]
h := hmac.New(sha256.New, []byte(secret))
h.Write(message)
sig, err := hex.DecodeString(signatureWithoutPrefix)
Expand Down
13 changes: 7 additions & 6 deletions internal/webhook/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"testing"

gh "github.com/canonical/github-runner-operators/internal/github"
"github.com/canonical/github-runner-operators/internal/telemetry"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -64,7 +65,7 @@ func TestWebhookForwarded(t *testing.T) {
assert.Equal(t, 1, len(fakeProducer.Messages), "expected 1 message in queue")
assert.Equal(t, payload, string(fakeProducer.Messages[0]), "expected message body to match")
assert.Equal(t, 1, len(fakeProducer.Headers), "expected 1 header set")
assert.Equal(t, "workflow_job", fakeProducer.Headers[0]["X-GitHub-Event"], "expected X-GitHub-Event header to match")
assert.Equal(t, "workflow_job", fakeProducer.Headers[0][gh.EventHeader], "expected X-GitHub-Event header to match")
m := mr.Collect(t)
assert.Equal(t, 1.0, m.Counter(t, "github-runner.webhook.gateway.inbound"))
assert.Equal(t, 0.0, m.Counter(t, "github-runner.webhook.gateway.inbound.errors"))
Expand All @@ -75,8 +76,8 @@ func TestWebhookForwarded(t *testing.T) {
func setupRequest() *http.Request {
req := httptest.NewRequest(http.MethodPost, webhookPath, strings.NewReader(payload))
req.Header.Set("Content-Type", "application/json")
req.Header.Set(WebhookSignatureHeader, valid_signature_header)
req.Header.Set(WebhookEventHeader, "workflow_job")
req.Header.Set(gh.SignatureHeader, valid_signature_header)
req.Header.Set(gh.EventHeader, "workflow_job")

return req
}
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestWebhookInvalidSignature(t *testing.T) {
},
{
name: "Wrong Prefix",
signature: "mac256=" + valid_signature_header[len(WebhookSignaturePrefix):],
signature: "mac256=" + valid_signature_header[len(gh.SignaturePrefix):],
expectedResponseMessage: "invalid signature",
},
{
Expand Down Expand Up @@ -157,9 +158,9 @@ func TestWebhookInvalidSignature(t *testing.T) {
defer telemetry.ReleaseTestMetricReader(t)
req := setupRequest()
if tt.name == "Missing Signature Header" {
req.Header.Del(WebhookSignatureHeader)
req.Header.Del(gh.SignatureHeader)
} else {
req.Header.Set(WebhookSignatureHeader, tt.signature)
req.Header.Set(gh.SignatureHeader, tt.signature)
}
w := httptest.NewRecorder()

Expand Down
Loading