diff --git a/.github/.jira_sync_config.yaml b/.github/.jira_sync_config.yaml new file mode 100644 index 00000000..f897418d --- /dev/null +++ b/.github/.jira_sync_config.yaml @@ -0,0 +1,16 @@ +# See https://github.com/canonical/gh-jira-sync-bot for config +settings: + jira_project_key: "ISD" + + status_mapping: + opened: Untriaged + closed: done + not_planned: rejected + + add_gh_comment: true + + epic_key: ISD-3981 + + label_mapping: + bug: Bug + enhancement: Story diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 00000000..b7bef165 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,13 @@ +### Overview + + + +### Rationale + + + +### Checklist + +- [ ] The PR is tagged with appropriate label (`urgent`, `trivial`, `senior-review-required`, `documentation`). + + \ No newline at end of file diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml new file mode 100644 index 00000000..c2bcd9e7 --- /dev/null +++ b/.github/workflows/tests.yaml @@ -0,0 +1,31 @@ +name: Tests + +on: + pull_request: + workflow_call: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + test: + name: Run Unit and Lint Tests + runs-on: [self-hosted-linux-amd64-noble-edge] + + steps: + - uses: actions/checkout@v5 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version: 1.24 + + - name: Ensure No Formatting Changes + run: | + go fmt ./... + git diff --exit-code + + - name: Build and Test + run: | + go test -v -cover -race ./... diff --git a/.github/workflows/webhook_gateway_tests.yaml b/.github/workflows/webhook_gateway_tests.yaml new file mode 100644 index 00000000..a6cb3670 --- /dev/null +++ b/.github/workflows/webhook_gateway_tests.yaml @@ -0,0 +1,38 @@ +name: Webhook Gateway Integration Tests +on: + pull_request: + paths: + - 'webhook-gateway/**' + - 'internal/**' + workflow_call: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + + +jobs: + integration-test: + runs-on: [self-hosted-linux-amd64-noble-edge] + name: Run Integration Tests + steps: + - uses: actions/checkout@v5 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version: 1.24 + - name: Setup rabbitmq in an OCI container + run: | + docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management + until curl -fs http://localhost:15672; + do echo "waiting for rabbit mq" + sleep 2 + done + - name: Run integration test + env: + RABBITMQ_CONNECT_STRING: amqp://guest:guest@localhost:5672/ + APP_PORT: 8080 + WEBHOOK_SECRET: fake-secret + run: | + go test -cover -v ./webhook-gateway -integration diff --git a/CODEOWNERS b/CODEOWNERS new file mode 100644 index 00000000..a34c3c49 --- /dev/null +++ b/CODEOWNERS @@ -0,0 +1 @@ +* @cbartz @yhaliaw @javierdelapuente @yanksyoon @weiiwang01 @florentianayuwono diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..24276aa4 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,108 @@ +# Contribute + +## Overview + +This document explains the processes and practices recommended for contributing enhancements to the codebase. + +* Generally, before developing enhancements to this code base, you should consider [opening an issue](https://github.com/canonical/github-runner-operator/issues) explaining your use case. +* If you would like to chat with us about your use-cases or proposed implementation, you can reach us at [Canonical Charm Development Matrix public channel](https://matrix.to/#/#charmhub-charmdev:ubuntu.com) or [Discourse](https://discourse.charmhub.io/). +* All enhancements require review before being merged. Code review typically examines + * code quality + * test coverage + +## Code of conduct + +When contributing, you must abide by the +[Ubuntu Code of Conduct](https://ubuntu.com/community/ethos/code-of-conduct). + +## Submissions + +If you want to address an issue or a bug in this project, +notify in advance the people involved to avoid confusion; +also, reference the issue or bug number when you submit the changes. + +- [Fork](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/about-forks) + our [GitHub repository](https://github.com/canonical/github-runner-operators) + and add the changes to your fork, properly structuring your commits, + providing detailed commit messages and signing your commits. +- Make sure the updated project builds and runs without warnings or errors; + this includes linting, documentation, code and tests. +- Submit the changes as a + [pull request (PR)](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork). + +Your changes will be reviewed in due time; if approved, they will be eventually merged. + +### Describing pull requests + +To be properly considered, reviewed and merged, +your pull request must provide the following details: + +- **Title**: Summarize the change in a short, descriptive title. + +- **Overview**: Describe the problem that your pull request solves. + Mention any new features, bug fixes or refactoring. + +- **Rationale**: Explain why the change is needed. + + +- **Checklist**: Complete the following items: + + - The PR is tagged with appropriate label (`urgent`, `trivial`, `senior-review-required`, `documentation`). + +### Signing commits + +To improve contribution tracking, +we use the [Canonical contributor license agreement](https://assets.ubuntu.com/v1/ff2478d1-Canonical-HA-CLA-ANY-I_v1.2.pdf) +(CLA) as a legal sign-off, and we require all commits to have verified signatures. + +### Canonical contributor agreement + +Canonical welcomes contributions to this repository. Please check out our [contributor agreement](https://ubuntu.com/legal/contributors) if you’re interested in contributing to the solution. + +#### Verified signatures on commits + +All commits in a pull request must have cryptographic (verified) signatures. +To add signatures on your commits, follow the +[GitHub documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits). + + +## Develop + +For any problems with this charm, please [report bugs here](https://github.com/canonical/github-runner-operator/issues). + +The code can be downloaded as follows: + +```shell +git clone https://github.com/canonical/github-runner-operators.git +``` + +The code structure is as follows + +- `internal/`: Internal libraries for the applications +- `webhook-gateway`: The webhook gateway application code + + +### Test + +This project uses standard Go testing tools for unit tests and integration tests. +You can have a look at the GitHub actions workflows in `.github/workflows/` to see how the tests are run in CI. + +Run unit tests using: + +```shell +go test -race -v ./... +``` + +Run `webhook-gateway` integration tests using: + +```shell +APP_PORT=8080 WEBHOOK_SECRET=fake RABBITMQ_CONNECT_STRING="amqp://guest:guest@localhost:5672/" go test -cover -v ./webhook-gateway -integration +``` + +It assumes you have access to a RabbitMQ server running reachable at $RABBITMQ_CONNECT_STRING. +You can use `docker` to run a RabbitMQ server locally: + +```shell +docker run -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4-management +``` + diff --git a/README.md b/README.md new file mode 100644 index 00000000..836514ba --- /dev/null +++ b/README.md @@ -0,0 +1,9 @@ +# GitHub runner operators + + +![WIP](https://img.shields.io/badge/status-WIP-yellow) + +A monorepo containing charms to operate Self-Hosted GitHub Action Runners. + +At the moment, it contains initial code for the `webhook-gateway` +application, that receives and forwards GitHub webhooks to an AMQP queue. diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..31f1bc1e --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module github.com/canonical/mayfly + +go 1.24.6 + +require ( + github.com/rabbitmq/amqp091-go v1.10.0 + github.com/stretchr/testify v1.11.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..ff6acd88 --- /dev/null +++ b/go.sum @@ -0,0 +1,14 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/queue/producer.go b/internal/queue/producer.go new file mode 100644 index 00000000..f8f57788 --- /dev/null +++ b/internal/queue/producer.go @@ -0,0 +1,131 @@ +/* + * Copyright 2025 Canonical Ltd. + * See LICENSE file for licensing details. + */ + +package queue + +import ( + "context" + "fmt" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func (p *AmqpProducer) Push(ctx context.Context, headers map[string]interface{}, msg []byte) error { + p.mu.Lock() // Lock to prevent concurrent access to connection/channel object + err := p.resetConnectionOrChannelIfNecessary() + if err != nil { + p.mu.Unlock() + return err + } + + msgConfirmation, err := p.publishMsg(msg, headers) + if err != nil { + p.mu.Unlock() + return err + } + p.mu.Unlock() // Unlock to not unblock other Push calls while waiting for confirmation + err = waitForMsgConfirmation(ctx, msgConfirmation) + + return err +} + +func (p *AmqpProducer) resetConnectionOrChannelIfNecessary() error { + if p.amqpConnection == nil || p.amqpConnection.IsClosed() { + err := p.resetConnection() + if err != nil { + return err + } + } + + if p.amqpChannel == nil || p.amqpChannel.IsClosed() { + err := p.resetChannel() + if err != nil { + return err + } + } + return nil +} + +func waitForMsgConfirmation(ctx context.Context, confirmation confirmation) error { + select { + case <-ctx.Done(): + return ctx.Err() + + case <-confirmation.Done(): + if !confirmation.Acked() { + return fmt.Errorf("confirmation not acknowledged") + } + } + return nil +} + +func (p *AmqpProducer) publishMsg(msg []byte, headers map[string]interface{}) (confirmation, error) { + confirmation, err := p.amqpChannel.PublishWithDeferredConfirm( + "", // exchange + p.queueName, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: msg, + Headers: headers, + }, + ) + + if err != nil { + return nil, fmt.Errorf("failed to publish message: %w", err) + } + return confirmation, nil +} + +func (p *AmqpProducer) resetConnection() error { + conn, err := p.connectFunc(p.uri) + + if err != nil { + return fmt.Errorf("failed to connect to AMQP server: %w", err) + } + p.amqpConnection = conn + return nil +} + +func (p *AmqpProducer) resetChannel() error { + c, err := p.amqpConnection.Channel() + + if err != nil { + return fmt.Errorf("failed to open channel: %w", err) + } + p.amqpChannel = c + + err = c.Confirm(false) + if err != nil { + return fmt.Errorf("failed to put channel in confirm mode: %w", err) + } + + _, err = c.QueueDeclare( + p.queueName, // queueName + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + return fmt.Errorf("failed to declare queue: %w", err) + } + return nil +} + +func NewAmqpProducer(uri string, queueName string) *AmqpProducer { + return &AmqpProducer{ + uri: uri, + queueName: queueName, + connectFunc: amqpConnect, + } +} + +func amqpConnect(uri string) (amqpConnection, error) { + amqpConnection, err := amqp.Dial(uri) + return &amqpConnectionWrapper{Connection: amqpConnection}, err +} diff --git a/internal/queue/producer_test.go b/internal/queue/producer_test.go new file mode 100644 index 00000000..4d25554f --- /dev/null +++ b/internal/queue/producer_test.go @@ -0,0 +1,366 @@ +/* + * Copyright 2025 Canonical Ltd. + * See LICENSE file for licensing details. + */ + +package queue + +import ( + "context" + "errors" + "testing" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/stretchr/testify/assert" +) + +const queueName = "test-queue" +const queueWithPublishNoAck = "queue-with-publish-no-ack" +const queueWithPublishError = "queue-with-publish-error" +const queueWithPublishHangs = "queue-with-publish-hangs" +const queueWithDeclareError = "queue-with-declare-error" + +type MockAmqpChannel struct { + msgs [][]byte + headers []map[string]interface{} + isclosed bool + confirmMode bool + queueName string + queueDurable bool + confirmModeError bool +} + +func (ch *MockAmqpChannel) PublishWithDeferredConfirm(_ string, key string, _, _ bool, msg amqp.Publishing) (confirmation, error) { + + if key == queueWithPublishError { + return nil, errors.New("publish error") + } + ch.msgs = append(ch.msgs, msg.Body) + ch.headers = append(ch.headers, msg.Headers) + + done_ch := make(chan struct{}, 1) + + if key != queueWithPublishHangs { + done_ch <- struct{}{} + } + + ack := key != queueWithPublishNoAck + confirmation := &MockConfirmation{ + done: done_ch, + ack: ack, + } + return confirmation, nil +} + +func (ch *MockAmqpChannel) IsClosed() bool { + return ch.isclosed +} + +func (ch *MockAmqpChannel) Confirm(_ bool) error { + if ch.confirmModeError { + return errors.New("confirm error") + } + ch.confirmMode = true + return nil +} + +func (ch *MockAmqpChannel) QueueDeclare(name string, durable, _, _, _ bool, _ amqp.Table) (amqp.Queue, error) { + if name == queueWithDeclareError { + return amqp.Queue{}, errors.New("queue declare error") + } + ch.queueName = name + ch.queueDurable = durable + return amqp.Queue{}, nil +} + +type MockConfirmation struct { + done <-chan struct{} + ack bool +} + +func (c *MockConfirmation) Done() <-chan struct{} { + return c.done +} + +func (c *MockConfirmation) Acked() bool { + return c.ack +} + +type MockAmqpConnection struct { + channelCalls int + amqpChannel *MockAmqpChannel + isclosed bool + errMode bool + confirmModeError bool +} + +func (m *MockAmqpConnection) Channel() (amqpChannel, error) { + if m.errMode { + return nil, errors.New("failed to open channel") + } + m.channelCalls++ + m.amqpChannel = &MockAmqpChannel{ + confirmModeError: m.confirmModeError, + } + return m.amqpChannel, nil +} + +func (m *MockAmqpConnection) IsClosed() bool { + return m.isclosed +} + +func TestPush(t *testing.T) { + /* + arrange: create a queue with a fake amqp connection + act: push a message to the queue + assert: message was published to the amqp channel + */ + mockAmqpChannel := &MockAmqpChannel{} + amqpProducer := &AmqpProducer{ + amqpChannel: mockAmqpChannel, + amqpConnection: &MockAmqpConnection{ + amqpChannel: mockAmqpChannel, + }, + queueName: queueName, + } + + headers := map[string]interface{}{"header1": "value1"} + amqpProducer.Push(context.Background(), headers, []byte("TestMessage")) + + assert.Contains(t, mockAmqpChannel.headers, headers) + assert.Contains(t, mockAmqpChannel.msgs, []byte("TestMessage"), "expected message to be published") +} + +func TestPushFailure(t *testing.T) { + /* + arrange: create a queue with a fake confirm handler that always fails + act: push a message to the queue that will fail to publish + assert: the push return an error + */ + tests := []struct { + name string + queueName string + context context.Context + errMsg string + }{ + { + name: "message is not acked", + queueName: queueWithPublishNoAck, + errMsg: "confirmation not acknowledged", + context: context.Background(), + }, + { + name: "context is done", + context: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }(), + queueName: queueWithPublishHangs, + errMsg: "context canceled", + }, + { + name: "publish returns error", + queueName: queueWithPublishError, + errMsg: "publish error", + context: context.Background(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockAmqpChannel := &MockAmqpChannel{} + amqpProducer := &AmqpProducer{ + amqpChannel: mockAmqpChannel, + amqpConnection: &MockAmqpConnection{ + amqpChannel: mockAmqpChannel, + }, + queueName: tt.queueName, + } + + err := amqpProducer.Push(tt.context, nil, []byte("TestMessage")) + + assert.Error(t, err, "expected error when message fails to publish") + assert.ErrorContains(t, err, tt.errMsg) + }) + } +} + +func TestPushNoChannel(t *testing.T) { + /* + arrange: create a queue with no amqp channel + act: push a message to the queue + assert: connection got re-established and message was published to the amqp channel + */ + tests := []struct { + name string + channel amqpChannel + }{ + { + name: "channel is nil", + channel: nil, + }, + { + name: "channel is closed", + channel: &MockAmqpChannel{ + isclosed: true, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockAmqpConnection := &MockAmqpConnection{} + amqpProducer := &AmqpProducer{ + amqpChannel: tt.channel, + amqpConnection: mockAmqpConnection, + } + + amqpProducer.Push(context.Background(), nil, []byte("TestMessage")) + assert.Equal(t, 1, mockAmqpConnection.channelCalls, "expected connection to be re-established") + assert.Contains(t, mockAmqpConnection.amqpChannel.msgs, []byte("TestMessage"), "expected message to be published") + }) + } +} + +func TestPushNoChannelFailure(t *testing.T) { + /* + arrange: create a queue with no amqp channel where the channel function fails + act: push a message to the queue + assert: push returns an error + */ + mockAmqpConnection := &MockAmqpConnection{ + amqpChannel: nil, + errMode: true, + } + amqpProducer := &AmqpProducer{ + amqpChannel: nil, + amqpConnection: mockAmqpConnection, + } + err := amqpProducer.Push(context.Background(), nil, []byte("TestMessage")) + assert.Error(t, err, "expected error when channel fails to open") + assert.ErrorContains(t, err, "failed to open channel") +} + +func TestPushNoConnection(t *testing.T) { + /* + arrange: create a queue with no amqp connection + act: push a message to the queue + assert: connection got established and message was published to the amqp channel + */ + tests := []struct { + name string + connection amqpConnection + }{ + { + name: "connection is nil", + connection: nil, + }, + { + name: "connection is closed", + connection: &MockAmqpConnection{ + isclosed: true, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockAmqpConnection := &MockAmqpConnection{ + amqpChannel: &MockAmqpChannel{}, + } + amqpProducer := &AmqpProducer{ + amqpConnection: tt.connection, + uri: "amqp://guest:guest@localhost:5672/", + connectFunc: func(uri string) (amqpConnection, error) { + return mockAmqpConnection, nil + }, + } + + amqpProducer.Push(context.Background(), nil, []byte("TestMessage")) + assert.Equal(t, 1, mockAmqpConnection.channelCalls, "expected connection to be re-established") + assert.Contains(t, mockAmqpConnection.amqpChannel.msgs, []byte("TestMessage"), "expected message to be published") + }) + } +} + +func TestPushNoConnectionFailure(t *testing.T) { + /* + arrange: create a queue with no amqp connection where the connect function fails + act: push a message to the queue + assert: push returns an error + */ + amqpProducer := &AmqpProducer{ + amqpConnection: nil, + uri: "amqp://guest:guest@localhost:5672/", + connectFunc: func(uri string) (amqpConnection, error) { + return nil, errors.New("connection error") + }, + } + + err := amqpProducer.Push(context.Background(), nil, []byte("TestMessage")) + assert.Error(t, err, "expected error when connection fails") + assert.ErrorContains(t, err, "connection error") +} + +func TestPushQueueDeclare(t *testing.T) { + /* + arrange: create a queue with no amqp channel + act: push a message to the queue + assert: channel with confirm mode is established and queue is declared + */ + mockAmqpConnection := &MockAmqpConnection{} + amqpProducer := &AmqpProducer{ + amqpChannel: nil, + amqpConnection: mockAmqpConnection, + queueName: queueName, + } + + amqpProducer.Push(context.Background(), nil, []byte("TestMessage")) + + assert.Equal(t, mockAmqpConnection.amqpChannel.confirmMode, true, "expected channel to be in confirm mode") + assert.Equal(t, mockAmqpConnection.amqpChannel.queueName, queueName, "expected queue name to be "+queueName) + assert.Equal(t, mockAmqpConnection.amqpChannel.queueDurable, true, "expected queue to be durable") +} + +func TestPushQueueDeclareFailure(t *testing.T) { + /* + arrange: create a queue with no amqp channel where the queue declare fails + act: push a message to the queue + assert: push returns an error + */ + tests := []struct { + name string + queueName string + mockAmqpConnection *MockAmqpConnection + errMsg string + }{ + { + name: "queue declare error", + queueName: queueWithDeclareError, + mockAmqpConnection: &MockAmqpConnection{}, + errMsg: "queue declare error", + }, + { + name: "confirm error", + queueName: queueName, + mockAmqpConnection: &MockAmqpConnection{ + confirmModeError: true, + }, + errMsg: "confirm error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + amqpProducer := &AmqpProducer{ + amqpChannel: nil, + amqpConnection: tt.mockAmqpConnection, + queueName: tt.queueName, + } + + err := amqpProducer.Push(context.Background(), nil, []byte("TestMessage")) + + assert.Error(t, err, "expected error when queue declare fails") + assert.ErrorContains(t, err, tt.errMsg) + }) + } +} diff --git a/internal/queue/types.go b/internal/queue/types.go new file mode 100644 index 00000000..33a58278 --- /dev/null +++ b/internal/queue/types.go @@ -0,0 +1,60 @@ +/* + * Copyright 2025 Canonical Ltd. + * See LICENSE file for licensing details. + */ + +package queue + +import ( + "context" + "sync" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type AmqpProducer struct { + amqpChannel amqpChannel + amqpConnection amqpConnection + connectFunc func(uri string) (amqpConnection, error) + uri string + queueName string + mu sync.Mutex +} + +type Producer interface { + Push(ctx context.Context, headers map[string]interface{}, msg []byte) error +} + +type amqpChannel interface { + PublishWithDeferredConfirm(exchange string, key string, mandatory, immediate bool, msg amqp.Publishing) (confirmation, error) + IsClosed() bool + Confirm(noWait bool) error + QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) +} + +type amqpConnection interface { + Channel() (amqpChannel, error) + IsClosed() bool +} + +type confirmation interface { + Done() <-chan struct{} + Acked() bool +} + +type amqpConnectionWrapper struct { + *amqp.Connection +} + +func (q *amqpConnectionWrapper) Channel() (amqpChannel, error) { + ch, err := q.Connection.Channel() + return &amqpChannelWrapper{Channel: ch}, err +} + +type amqpChannelWrapper struct { + *amqp.Channel +} + +func (ch *amqpChannelWrapper) PublishWithDeferredConfirm(exchange string, key string, mandatory, immediate bool, msg amqp.Publishing) (confirmation, error) { + return ch.Channel.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) +} diff --git a/internal/webhook/server.go b/internal/webhook/server.go new file mode 100644 index 00000000..d55c5ea0 --- /dev/null +++ b/internal/webhook/server.go @@ -0,0 +1,67 @@ +/* + * Copyright 2025 Canonical Ltd. + * See LICENSE file for licensing details. + */ + +package webhook + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "io" + "log/slog" + "net/http" + + "github.com/canonical/mayfly/internal/queue" +) + +const WebhookSignatureHeader = "X-Hub-Signature-256" + +type Handler struct { + WebhookSecret string + Producer queue.Producer +} + +func (h *Handler) Webhook(w http.ResponseWriter, r *http.Request) { + r.Body = http.MaxBytesReader(w, r.Body, 1048576) + defer r.Body.Close() + + ctx := r.Context() + signature := r.Header.Get(WebhookSignatureHeader) + if signature == "" { + slog.DebugContext(ctx, "missing signature header", "header", r.Header) + http.Error(w, "Missing signature header", http.StatusForbidden) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + slog.Error("unable to read request body", "error", err) + http.Error(w, "Unable to read request body", http.StatusInternalServerError) + return + } + + if !validateSignature(body, h.WebhookSecret, signature) { + slog.Debug("invalid signature", "signature", signature) + http.Error(w, "Invalid signature", http.StatusForbidden) + return + } + + err = h.Producer.Push(r.Context(), nil, body) + if err != nil { + slog.Error("unable to push message to queue", "error", err) + http.Error(w, "Unable to push to queue", http.StatusInternalServerError) + return + } +} + +func validateSignature(message []byte, secret string, signature string) bool { + h := hmac.New(sha256.New, []byte(secret)) + h.Write(message) + sig, err := hex.DecodeString(signature) + if err != nil { + return false + } + return hmac.Equal(h.Sum(nil), sig) +} diff --git a/internal/webhook/server_test.go b/internal/webhook/server_test.go new file mode 100644 index 00000000..40838fd8 --- /dev/null +++ b/internal/webhook/server_test.go @@ -0,0 +1,152 @@ +/* + * Copyright 2025 Canonical Ltd. + * See LICENSE file for licensing details. + */ + +package webhook + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +const webhookPath = "/webhook" +const payload = `{"message":"Hello, Alice!"}` +const secret = "fake-secret" +const valid_signature_header = "0aca2d7154cddad4f56f246cad61f1485df34b8056e10c4e4799494376fb3413" // HMAC SHA256 of body with secret "fake-secret" + +type FakeProducer struct { + Messages [][]byte +} + +func (q *FakeProducer) Push(_ context.Context, _ map[string]interface{}, msg []byte) error { + q.Messages = append(q.Messages, msg) + return nil +} + +type ErrorProducer struct{} + +func (q *ErrorProducer) Push(_ context.Context, _ map[string]interface{}, _ []byte) error { + return fmt.Errorf("queue error") +} + +func TestWebhookForwarded(t *testing.T) { + /* + arrange: create request with valid signature header + act: call WebhookHandler + assert: status 200, message was forwarded to queue + */ + + req := setupRequest() + fakeProducer := &FakeProducer{} + handler := Handler{ + WebhookSecret: secret, + Producer: fakeProducer, + } + w := httptest.NewRecorder() + handler.Webhook(w, req) + res := w.Result() + defer res.Body.Close() + + assert.Equal(t, http.StatusOK, res.StatusCode, "expected status 200 got %v", res.Status) + assert.NotNil(t, fakeProducer.Messages, "expected messages in queue") + assert.Equal(t, 1, len(fakeProducer.Messages), "expected 1 message in queue") + assert.Equal(t, payload, string(fakeProducer.Messages[0]), "expected message body to match") + +} + +func setupRequest() *http.Request { + req := httptest.NewRequest(http.MethodPost, webhookPath, strings.NewReader(payload)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set(WebhookSignatureHeader, valid_signature_header) + + return req +} + +func TestWebhookQueueError(t *testing.T) { + /* + arrange: create request with valid signature header and a queue that returns an error + act: call WebhookHandler + assert: status 500 + */ + req := setupRequest() + w := httptest.NewRecorder() + errProducer := &ErrorProducer{} + + handler := Handler{ + WebhookSecret: secret, + Producer: errProducer, + } + handler.Webhook(w, req) + res := w.Result() + defer res.Body.Close() + + assert.Equal(t, http.StatusInternalServerError, res.StatusCode, "expected status 500 got %v", res.Status) +} + +func TestWebhookInvalidSignature(t *testing.T) { + /* + arrange: create invalid signature test cases + act: call webhook handler + assert: A 403 response is returned + */ + tests := []struct { + name string + signature string + expectedResponseMessage string + }{ + { + name: "Invalid Signature", + signature: "0aca2d7154cinvalid56f246cad61f1485df34b8056e10c4e4799494376fb3412", + expectedResponseMessage: "Invalid signature", + }, + { + name: "Non ASCII Signature", + signature: "非ASCII签名", + expectedResponseMessage: "Invalid signature", + }, + { + name: "Empty Signature", + signature: "", + expectedResponseMessage: "Missing signature header", + }, + { + name: "Missing Signature Header", + signature: "", + expectedResponseMessage: "Missing signature header", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := setupRequest() + if tt.name == "Missing Signature Header" { + req.Header.Del(WebhookSignatureHeader) + } else { + req.Header.Set(WebhookSignatureHeader, tt.signature) + } + w := httptest.NewRecorder() + + fakeProducer := &FakeProducer{} + handler := Handler{ + WebhookSecret: secret, + Producer: fakeProducer, + } + handler.Webhook(w, req) + res := w.Result() + defer res.Body.Close() + + assert.Equal(t, http.StatusForbidden, res.StatusCode, "expected status %v got %v", http.StatusForbidden, res.Status) + assert.NotNil(t, res.Body, "expected response body") + respBody, _ := io.ReadAll(res.Body) + assert.Contains(t, string(respBody), tt.expectedResponseMessage) + assert.Equal(t, 0, len(fakeProducer.Messages), "expected 0 message in queue") + }) + } +} diff --git a/webhook-gateway/main.go b/webhook-gateway/main.go new file mode 100644 index 00000000..0cc32c76 --- /dev/null +++ b/webhook-gateway/main.go @@ -0,0 +1,45 @@ +/* + * Copyright 2025 Canonical Ltd. + * See LICENSE file for licensing details. + */ + +package main + +import ( + "log" + "net/http" + "os" + + "github.com/canonical/mayfly/internal/queue" + "github.com/canonical/mayfly/internal/webhook" +) + +const queueName = "webhook-queue" +const webhookPath = "/webhook" + +func main() { + + port, found := os.LookupEnv("APP_PORT") + if !found { + log.Fatalln("APP_PORT environment variable not set") + } + uri, found := os.LookupEnv("RABBITMQ_CONNECT_STRING") + if !found { + log.Panicf("RABBITMQ_CONNECT_STRING environment variable not set") + } + webhookSecret, found := os.LookupEnv("WEBHOOK_SECRET") + if !found { + log.Panicf("WEBHOOK_SECRET environment variable not set") + } + + p := queue.NewAmqpProducer(uri, queueName) + handler := &webhook.Handler{ + WebhookSecret: webhookSecret, + Producer: p, + } + + mux := http.NewServeMux() + mux.HandleFunc(webhookPath, handler.Webhook) + log.Fatal(http.ListenAndServe(":"+port, mux)) + log.Fatal(http.ListenAndServe(":"+port, nil)) +} diff --git a/webhook-gateway/webhook_test.go b/webhook-gateway/webhook_test.go new file mode 100644 index 00000000..f011cb33 --- /dev/null +++ b/webhook-gateway/webhook_test.go @@ -0,0 +1,144 @@ +/* + * Copyright 2025 Canonical Ltd. + * See LICENSE file for licensing details. + */ + +// integration test for the webhook-gateway application + +package main + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "flag" + "net/http" + "os" + "strings" + "testing" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/stretchr/testify/assert" +) + +var integration = flag.Bool("integration", false, "Run integration tests") + +func TestHTTPRequestIsForwarded(t *testing.T) { + if !*integration { + t.Skip("skipping integration test") + } + const payload = `{"message":"Hello, Bob!"}` + + go main() + + secret := getSecretFromEnv(t) + sendPayloadToHTTPServer(t, payload, secret) + + amqpUri := getAmqpUriFromEnv(t) + msg := consumeMessage(t, amqpUri) + + assert.Equal(t, payload, msg, "expected message body to match") +} + +func getSecretFromEnv(t *testing.T) string { + secret := os.Getenv("WEBHOOK_SECRET") + if secret == "" { + t.Fatal("WEBHOOK_SECRET environment variable not set") + } + return secret +} + +func getAmqpUriFromEnv(t *testing.T) string { + uri := os.Getenv("RABBITMQ_CONNECT_STRING") + if uri == "" { + t.Fatal("RABBITMQ_CONNECT_STRING environment variable not set") + } + return uri +} + +func sendPayloadToHTTPServer(t *testing.T, payload string, secret string) { + req := createRequest(t, payload, secret) + postRequestUsingRetryBecauseServerMightNotYetBeUp(t, req) +} + +func postRequestUsingRetryBecauseServerMightNotYetBeUp(t *testing.T, req *http.Request) { + client := &http.Client{} + + var resp *http.Response + var err error + for i := 0; i < 5; i++ { + resp, err = client.Do(req) + if err == nil { + break + } + t.Logf("Retrying... (%d/5)", i+1) + time.Sleep(time.Duration((1 << i) * time.Second)) + } + if err != nil { + t.Fatalf("Failed to send request: %v. Server did probably not start up.", err) + } + defer resp.Body.Close() +} + +func createRequest(t *testing.T, payload string, secret string) *http.Request { + headers := map[string]string{ + "X-Hub-Signature-256": createSignature(payload, secret), + "Content-Type": "application/json", + } + req, err := http.NewRequest("POST", "http://localhost:8080/webhook", strings.NewReader(payload)) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + for k, v := range headers { + req.Header.Set(k, v) + } + return req +} + +func createSignature(message string, secret string) string { + h := hmac.New(sha256.New, []byte(secret)) + h.Write([]byte(message)) + return hex.EncodeToString(h.Sum(nil)) +} + +func consumeMessage(t *testing.T, amqpUri string) string { + conn, err := amqp.Dial(amqpUri) + if err != nil { + t.Fatalf("Failed to connect to RabbitMQ: %v", err) + } + defer conn.Close() + ch, err := conn.Channel() + if err != nil { + t.Fatalf("Failed to open a channel: %v", err) + } + defer ch.Close() + + _, err = ch.QueueDeclare( + "webhook-queue", // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + t.Fatalf("Failed to declare a queue: %v", err) + } + + deliveryChan, err := ch.Consume("webhook-queue", "", true, false, false, false, nil) + + if err != nil { + t.Fatalf("Failed to register a consumer: %v", err) + } + + var msg amqp.Delivery + select { + case <-time.After(10 * time.Second): + t.Fatal("Timeout waiting for message") + case msg = <-deliveryChan: + } + + return string(msg.Body) +}