From 56b8b3db25f904f3f65bf69c0a7a2deabefaed4d Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 24 Nov 2024 18:43:30 +0700 Subject: [PATCH 01/11] test: Test infra with shared testcontainers & persistent test containers --- .env.test | 3 + Makefile | 6 ++ build/test/compose.yml | 19 ++++++ cmd/e2e/configs/basic.go | 31 +++------ internal/util/awsutil/awsutil.go | 7 +++ internal/util/testinfra/aws.go | 91 +++++++++++++++++++++++++++ internal/util/testinfra/clickhouse.go | 82 ++++++++++++++++++++++++ internal/util/testinfra/testinfra.go | 72 +++++++++++++++++++++ 8 files changed, 288 insertions(+), 23 deletions(-) create mode 100644 .env.test create mode 100644 build/test/compose.yml create mode 100644 internal/util/testinfra/aws.go create mode 100644 internal/util/testinfra/clickhouse.go create mode 100644 internal/util/testinfra/testinfra.go diff --git a/.env.test b/.env.test new file mode 100644 index 00000000..fcab76a8 --- /dev/null +++ b/.env.test @@ -0,0 +1,3 @@ +TEST_CLICKHOUSE_URL="localhost:39000" +TEST_LOCALSTACK_URL="localhost:34566" +TEST_RABBITMQ_URL="localhost:35672" diff --git a/Makefile b/Makefile index 2aa0933d..bd387c54 100644 --- a/Makefile +++ b/Makefile @@ -29,6 +29,12 @@ down/uptrace: up/portal: cd internal/portal && npm install && npm run dev +up/test: + docker-compose -f build/test/compose.yml up -d + +down/test: + docker-compose -f build/test/compose.yml down + test: go test $(TEST) $(TESTARGS) diff --git a/build/test/compose.yml b/build/test/compose.yml new file mode 100644 index 00000000..80533076 --- /dev/null +++ b/build/test/compose.yml @@ -0,0 +1,19 @@ +name: "outpost-test" + +services: + clickhouse: + image: clickhouse/clickhouse-server:24-alpine + ports: + - 39000:9000 + rabbitmq: + image: rabbitmq:3-management + ports: + - 35672:5672 + - 45672:15672 + aws: + image: localstack/localstack:latest + environment: + - SERVICES=sns,sts,sqs + ports: + - 34566:4566 + - 34571:4571 diff --git a/cmd/e2e/configs/basic.go b/cmd/e2e/configs/basic.go index 08d8eb9b..d494baef 100644 --- a/cmd/e2e/configs/basic.go +++ b/cmd/e2e/configs/basic.go @@ -3,9 +3,8 @@ package configs import ( "testing" - "github.com/hookdeck/outpost/internal/clickhouse" "github.com/hookdeck/outpost/internal/config" - "github.com/hookdeck/outpost/internal/mqs" + "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" ) @@ -17,27 +16,13 @@ func Basic(t *testing.T) (*config.Config, func(), error) { } } - // Testcontainer - chEndpoint, cleanupCH, err := testutil.StartTestContainerClickHouse() - if err != nil { - return nil, cleanup, err - } - cleanupFns = append(cleanupFns, cleanupCH) - - awsEndpoint, cleanupAWS, err := testutil.StartTestcontainerLocalstack() - if err != nil { - return nil, cleanup, err - } - cleanupFns = append(cleanupFns, cleanupAWS) + t.Cleanup(testinfra.Start()) // Config redisConfig := testutil.CreateTestRedisConfig(t) - clickHouseConfig := &clickhouse.ClickHouseConfig{ - Addr: chEndpoint, - Username: "default", - Password: "", - Database: "default", - } + clickHouseConfig := testinfra.NewClickHouseConfig(t) + deliveryMQConfig := testinfra.NewMQAWSConfig(t, nil) + logMQConfig := testinfra.NewMQAWSConfig(t, nil) return &config.Config{ Hostname: "outpost", @@ -49,11 +34,11 @@ func Basic(t *testing.T) (*config.Config, func(), error) { PortalProxyURL: "", Topics: testutil.TestTopics, Redis: redisConfig, - ClickHouse: clickHouseConfig, + ClickHouse: &clickHouseConfig, OpenTelemetry: nil, PublishQueueConfig: nil, - DeliveryQueueConfig: &mqs.QueueConfig{AWSSQS: &mqs.AWSSQSConfig{Endpoint: awsEndpoint, Region: "us-east-1", ServiceAccountCredentials: "test:test:", Topic: "delivery"}}, - LogQueueConfig: &mqs.QueueConfig{AWSSQS: &mqs.AWSSQSConfig{Endpoint: awsEndpoint, Region: "us-east-1", ServiceAccountCredentials: "test:test:", Topic: "log"}}, + DeliveryQueueConfig: &deliveryMQConfig, + LogQueueConfig: &logMQConfig, PublishMaxConcurrency: 3, DeliveryMaxConcurrency: 3, LogMaxConcurrency: 3, diff --git a/internal/util/awsutil/awsutil.go b/internal/util/awsutil/awsutil.go index 5fc56cac..28e05af3 100644 --- a/internal/util/awsutil/awsutil.go +++ b/internal/util/awsutil/awsutil.go @@ -82,3 +82,10 @@ func GetQueueARN(ctx context.Context, sqsClient *sqs.Client, queueURL string) (s queueARN := attributesOutput.Attributes[string(types.QueueAttributeNameQueueArn)] return queueARN, nil } + +func DeleteQueue(ctx context.Context, sqsClient *sqs.Client, queueURL string) error { + _, err := sqsClient.DeleteQueue(ctx, &sqs.DeleteQueueInput{ + QueueUrl: aws.String(queueURL), + }) + return err +} diff --git a/internal/util/testinfra/aws.go b/internal/util/testinfra/aws.go new file mode 100644 index 00000000..1cca8656 --- /dev/null +++ b/internal/util/testinfra/aws.go @@ -0,0 +1,91 @@ +package testinfra + +import ( + "context" + "log" + "strings" + "testing" + + "github.com/google/uuid" + "github.com/hookdeck/outpost/internal/mqs" + "github.com/hookdeck/outpost/internal/util/awsutil" + "github.com/testcontainers/testcontainers-go/modules/localstack" +) + +func NewMQAWSConfig(t *testing.T, attributes map[string]string) mqs.QueueConfig { + queueConfig := mqs.QueueConfig{ + AWSSQS: &mqs.AWSSQSConfig{ + Endpoint: ensureLocalStack(), + Region: "us-east-1", + ServiceAccountCredentials: "test:test:", + Topic: uuid.New().String(), + }, + } + ctx := context.Background() + if _, err := DeclareTestAWSInfrastructure(ctx, queueConfig.AWSSQS, attributes); err != nil { + panic(err) + } + t.Cleanup(func() { + if err := TeardownTestAWSInfrastructure(ctx, queueConfig.AWSSQS); err != nil { + log.Println("Failed to teardown AWS infrastructure", err, *queueConfig.AWSSQS) + } + }) + return queueConfig +} + +func ensureLocalStack() string { + cfg := ReadConfig() + if cfg.LocalStackURL != "" { + return cfg.LocalStackURL + } + + ctx := context.Background() + + localstackContainer, err := localstack.Run(ctx, + "localstack/localstack:latest", + ) + + if err != nil { + panic(err) + } + + endpoint, err := localstackContainer.PortEndpoint(ctx, "4566/tcp", "") + if err != nil { + panic(err) + } + if !strings.Contains(endpoint, "http://") { + endpoint = "http://" + endpoint + } + log.Printf("Localstack running at %s", endpoint) + cfg.LocalStackURL = endpoint + cfg.cleanupFns = append(cfg.cleanupFns, func() { + if err := localstackContainer.Terminate(ctx); err != nil { + log.Println("Failed to terminate localstack container", err) + } + }) + return endpoint +} + +func DeclareTestAWSInfrastructure(ctx context.Context, cfg *mqs.AWSSQSConfig, attributes map[string]string) (string, error) { + sqsClient, err := awsutil.SQSClientFromConfig(ctx, cfg) + if err != nil { + return "", err + } + queueURL, err := awsutil.EnsureQueue(ctx, sqsClient, cfg.Topic, awsutil.MakeCreateQueue(attributes)) + if err != nil { + return "", err + } + return queueURL, nil +} + +func TeardownTestAWSInfrastructure(ctx context.Context, cfg *mqs.AWSSQSConfig) error { + sqsClient, err := awsutil.SQSClientFromConfig(ctx, cfg) + if err != nil { + return err + } + queueURL, err := awsutil.EnsureQueue(ctx, sqsClient, cfg.Topic, nil) + if err != nil { + return err + } + return awsutil.DeleteQueue(ctx, sqsClient, queueURL) +} diff --git a/internal/util/testinfra/clickhouse.go b/internal/util/testinfra/clickhouse.go new file mode 100644 index 00000000..112ad0c7 --- /dev/null +++ b/internal/util/testinfra/clickhouse.go @@ -0,0 +1,82 @@ +package testinfra + +import ( + "context" + "log" + "testing" + + "github.com/hookdeck/outpost/internal/clickhouse" + "github.com/hookdeck/outpost/internal/util/testutil" + chTestcontainer "github.com/testcontainers/testcontainers-go/modules/clickhouse" +) + +func NewClickHouseConfig(t *testing.T) clickhouse.ClickHouseConfig { + chConfig := clickhouse.ClickHouseConfig{ + Addr: ensureClickHouse(), + Username: "default", + Password: "", + Database: "default", + } + database := "test_" + testutil.RandomString(10) + initDB(&chConfig, database) + t.Cleanup(func() { + clearDB(chConfig, database) + }) + return chConfig +} + +func initDB(chConfig *clickhouse.ClickHouseConfig, database string) { + chDB, err := clickhouse.New(chConfig) + if err != nil { + panic(err) + } + if err := chDB.Exec(context.Background(), "CREATE DATABASE IF NOT EXISTS "+database); err != nil { + log.Println("cmd", "CREATE DATABASE IF NOT EXISTS "+database) + panic(err) + } + chConfig.Database = database +} + +func clearDB(chConfig clickhouse.ClickHouseConfig, database string) { + chConfig.Database = "default" // ensure connecting to default DB + chDB, err := clickhouse.New(&chConfig) + if err != nil { + panic(err) + } + if err := chDB.Exec(context.Background(), "DROP DATABASE "+database); err != nil { + panic(err) + } +} + +func ensureClickHouse() string { + cfg := ReadConfig() + if cfg.ClickHouseURL != "" { + return cfg.ClickHouseURL + } + + // Start testcontainer + ctx := context.Background() + + clickHouseContainer, err := chTestcontainer.Run(ctx, + "clickhouse/clickhouse-server:latest", + chTestcontainer.WithUsername("default"), + chTestcontainer.WithPassword(""), + chTestcontainer.WithDatabase("default"), + ) + if err != nil { + panic(err) + } + + endpoint, err := clickHouseContainer.PortEndpoint(ctx, "9000/tcp", "") + if err != nil { + panic(err) + } + log.Printf("ClickHouse running at %s", endpoint) + cfg.ClickHouseURL = endpoint + cfg.cleanupFns = append(cfg.cleanupFns, func() { + if err := clickHouseContainer.Terminate(ctx); err != nil { + log.Printf("failed to terminate container: %s", err) + } + }) + return endpoint +} diff --git a/internal/util/testinfra/testinfra.go b/internal/util/testinfra/testinfra.go new file mode 100644 index 00000000..9a920982 --- /dev/null +++ b/internal/util/testinfra/testinfra.go @@ -0,0 +1,72 @@ +package testinfra + +import ( + "log" + "strings" + "sync" + + "github.com/spf13/viper" +) + +var ( + suiteCounter = 0 + cfgSync sync.Once + cfg *Config +) + +type Config struct { + TestInfra bool + ClickHouseURL string + LocalStackURL string + RabbitMQURL string + cleanupFns []func() +} + +func initConfig() { + v := viper.New() + v.AutomaticEnv() + v.SetConfigFile("../../.env.test") + v.SetConfigType("env") + if err := v.ReadInConfig(); err != nil { + panic(err) + } + + if v.GetBool("TESTINFRA") { + localstackURL := v.GetString("TEST_LOCALSTACK_URL") + if !strings.Contains(localstackURL, "http://") { + localstackURL = "http://" + localstackURL + } + cfg = &Config{ + TestInfra: v.GetBool("TESTINFRA"), + ClickHouseURL: v.GetString("TEST_CLICKHOUSE_URL"), + LocalStackURL: localstackURL, + RabbitMQURL: v.GetString("TEST_RABBITMQ_URL"), + } + return + } + + cfg = &Config{ + TestInfra: v.GetBool("TESTINFRA"), + ClickHouseURL: "", + LocalStackURL: "", + RabbitMQURL: "", + } +} + +func ReadConfig() *Config { + cfgSync.Do(initConfig) + return cfg +} + +func Start() func() { + suiteCounter += 1 + return func() { + suiteCounter -= 1 + if suiteCounter == 0 { + log.Println("cleaning up", len(cfg.cleanupFns)) + for _, fn := range cfg.cleanupFns { + fn() + } + } + } +} From 2e01a442f542fb5eb69019d69e7f9153fcd3a41d Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 24 Nov 2024 18:56:54 +0700 Subject: [PATCH 02/11] test: Refactor infra to support parallel tests --- internal/models/log_test.go | 35 +++++++++------------------ internal/util/testinfra/aws.go | 13 +++++++--- internal/util/testinfra/clickhouse.go | 14 ++++++++--- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/internal/models/log_test.go b/internal/models/log_test.go index 2cbe81c9..7f4269bb 100644 --- a/internal/models/log_test.go +++ b/internal/models/log_test.go @@ -7,34 +7,25 @@ import ( "testing" "time" - "github.com/ClickHouse/clickhouse-go/v2" "github.com/google/uuid" + "github.com/hookdeck/outpost/internal/clickhouse" "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func setupClickHouseConnection(t *testing.T) (clickhouse.Conn, func()) { - endpoint, cleanup, err := testutil.StartTestContainerClickHouse() - require.NoError(t, err) +func setupClickHouseConnection(t *testing.T) clickhouse.DB { + t.Cleanup(testinfra.Start()) - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{endpoint}, - Auth: clickhouse.Auth{ - Database: "default", - Username: "default", - Password: "", - }, - // Debug: true, - // Debugf: func(format string, v ...any) { - // fmt.Printf(format+"\n", v...) - // }, - }) + chConfig := testinfra.NewClickHouseConfig(t) + + chDB, err := clickhouse.New(&chConfig) require.NoError(t, err) ctx := context.Background() - require.NoError(t, conn.Exec(ctx, ` + require.NoError(t, chDB.Exec(ctx, ` CREATE TABLE IF NOT EXISTS events ( id String, tenant_id String, @@ -48,7 +39,7 @@ func setupClickHouseConnection(t *testing.T) (clickhouse.Conn, func()) { ENGINE = MergeTree ORDER BY (id, time); `)) - require.NoError(t, conn.Exec(ctx, ` + require.NoError(t, chDB.Exec(ctx, ` CREATE TABLE IF NOT EXISTS deliveries ( id String, delivery_event_id String, @@ -61,7 +52,7 @@ func setupClickHouseConnection(t *testing.T) (clickhouse.Conn, func()) { ORDER BY (id, time); `)) - return conn, cleanup + return chDB } func TestIntegrationLogStore_EventCRUD(t *testing.T) { @@ -71,8 +62,7 @@ func TestIntegrationLogStore_EventCRUD(t *testing.T) { t.Parallel() - conn, cleanup := setupClickHouseConnection(t) - defer cleanup() + conn := setupClickHouseConnection(t) ctx := context.Background() logStore := models.NewLogStore(conn) @@ -142,8 +132,7 @@ func TestIntegrationLogStore_DeliveryCRUD(t *testing.T) { t.Parallel() - conn, cleanup := setupClickHouseConnection(t) - defer cleanup() + conn := setupClickHouseConnection(t) ctx := context.Background() logStore := models.NewLogStore(conn) diff --git a/internal/util/testinfra/aws.go b/internal/util/testinfra/aws.go index 1cca8656..7e5f4817 100644 --- a/internal/util/testinfra/aws.go +++ b/internal/util/testinfra/aws.go @@ -4,6 +4,7 @@ import ( "context" "log" "strings" + "sync" "testing" "github.com/google/uuid" @@ -33,12 +34,19 @@ func NewMQAWSConfig(t *testing.T, attributes map[string]string) mqs.QueueConfig return queueConfig } +var localstackOnce sync.Once + func ensureLocalStack() string { cfg := ReadConfig() - if cfg.LocalStackURL != "" { - return cfg.LocalStackURL + if cfg.LocalStackURL == "" { + localstackOnce.Do(func() { + startLocalStackTestContainer(cfg) + }) } + return cfg.LocalStackURL +} +func startLocalStackTestContainer(cfg *Config) { ctx := context.Background() localstackContainer, err := localstack.Run(ctx, @@ -63,7 +71,6 @@ func ensureLocalStack() string { log.Println("Failed to terminate localstack container", err) } }) - return endpoint } func DeclareTestAWSInfrastructure(ctx context.Context, cfg *mqs.AWSSQSConfig, attributes map[string]string) (string, error) { diff --git a/internal/util/testinfra/clickhouse.go b/internal/util/testinfra/clickhouse.go index 112ad0c7..f76467eb 100644 --- a/internal/util/testinfra/clickhouse.go +++ b/internal/util/testinfra/clickhouse.go @@ -3,6 +3,7 @@ package testinfra import ( "context" "log" + "sync" "testing" "github.com/hookdeck/outpost/internal/clickhouse" @@ -48,13 +49,19 @@ func clearDB(chConfig clickhouse.ClickHouseConfig, database string) { } } +var chOnce sync.Once + func ensureClickHouse() string { cfg := ReadConfig() - if cfg.ClickHouseURL != "" { - return cfg.ClickHouseURL + if cfg.ClickHouseURL == "" { + chOnce.Do(func() { + startCHTestcontainer(cfg) + }) } + return cfg.ClickHouseURL +} - // Start testcontainer +func startCHTestcontainer(cfg *Config) { ctx := context.Background() clickHouseContainer, err := chTestcontainer.Run(ctx, @@ -78,5 +85,4 @@ func ensureClickHouse() string { log.Printf("failed to terminate container: %s", err) } }) - return endpoint } From 2d72a0434651f599001e9b27ec7c01ef6db10187 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 24 Nov 2024 19:17:38 +0700 Subject: [PATCH 03/11] test: Fix issue finding .env.test at project root dir --- internal/util/testinfra/testinfra.go | 31 +++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/internal/util/testinfra/testinfra.go b/internal/util/testinfra/testinfra.go index 9a920982..f64a8696 100644 --- a/internal/util/testinfra/testinfra.go +++ b/internal/util/testinfra/testinfra.go @@ -2,6 +2,8 @@ package testinfra import ( "log" + "os" + "path/filepath" "strings" "sync" @@ -23,9 +25,14 @@ type Config struct { } func initConfig() { + projectRoot, err := findProjectRoot() + if err != nil { + panic(err) + } + v := viper.New() v.AutomaticEnv() - v.SetConfigFile("../../.env.test") + v.SetConfigFile(filepath.Join(projectRoot, ".env.test")) v.SetConfigType("env") if err := v.ReadInConfig(); err != nil { panic(err) @@ -70,3 +77,25 @@ func Start() func() { } } } + +func findProjectRoot() (string, error) { + // Start from the current working directory + dir, err := os.Getwd() + if err != nil { + return "", err + } + + // Traverse up the directory tree until the project root is found + for { + if _, err := os.Stat(filepath.Join(dir, ".env.test")); err == nil { + return dir, nil + } + parentDir := filepath.Dir(dir) + if parentDir == dir { + break + } + dir = parentDir + } + + return "", os.ErrNotExist +} From 2c06d852bf752850ff776f56e475fdeb296f2353 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 24 Nov 2024 19:21:49 +0700 Subject: [PATCH 04/11] test: RabbitMQ infra --- .../adapters/rabbitmq/rabbitmq_test.go | 69 +++++-------------- internal/util/testinfra/rabbitmq.go | 68 ++++++++++++++++++ internal/util/testinfra/testinfra.go | 6 +- internal/util/testutil/rabbitmq.go | 21 ++++++ 4 files changed, 111 insertions(+), 53 deletions(-) create mode 100644 internal/util/testinfra/rabbitmq.go diff --git a/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go b/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go index 27a8896a..d2672756 100644 --- a/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go +++ b/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/hookdeck/outpost/internal/destinationadapter/adapters" "github.com/hookdeck/outpost/internal/destinationadapter/adapters/rabbitmq" + "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/assert" @@ -129,15 +130,8 @@ func TestIntegrationRabbitMQDestination_Publish(t *testing.T) { t.Parallel() - rabbitmqURL, terminate, err := testutil.StartTestcontainerRabbitMQ() - require.Nil(t, err) - defer terminate() - - RABBIT_SERVER_URL := rabbitmqURL - const ( - RABBIT_EXCHANGE = "destination_exchange" - RABBIT_QUEUE = "destination_queue_test" - ) + t.Cleanup(testinfra.Start()) + mq := testinfra.NewMQRabbitMQConfig(t) rabbitmqDestination := rabbitmq.New() @@ -145,12 +139,12 @@ func TestIntegrationRabbitMQDestination_Publish(t *testing.T) { ID: uuid.New().String(), Type: "rabbitmq", Config: map[string]string{ - "server_url": testutil.ExtractRabbitURL(RABBIT_SERVER_URL), - "exchange": RABBIT_EXCHANGE, + "server_url": testutil.ExtractRabbitURL(mq.RabbitMQ.ServerURL), + "exchange": mq.RabbitMQ.Exchange, }, Credentials: map[string]string{ - "username": testutil.ExtractRabbitUsername(RABBIT_SERVER_URL), - "password": testutil.ExtractRabbitPassword(RABBIT_SERVER_URL), + "username": testutil.ExtractRabbitUsername(mq.RabbitMQ.ServerURL), + "password": testutil.ExtractRabbitPassword(mq.RabbitMQ.ServerURL), }, } @@ -174,44 +168,19 @@ func TestIntegrationRabbitMQDestination_Publish(t *testing.T) { cancelChan := make(chan bool) msgChan := make(chan *amqp091.Delivery) go func() { - conn, _ := amqp091.Dial(RABBIT_SERVER_URL) + conn, _ := amqp091.Dial(mq.RabbitMQ.ServerURL) defer conn.Close() ch, _ := conn.Channel() defer ch.Close() - ch.ExchangeDeclare( - RABBIT_EXCHANGE, // name - "topic", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments - ) - q, _ := ch.QueueDeclare( - RABBIT_QUEUE, // name - false, // durable - false, // delete when unused - true, // exclusive - false, // no-wait - nil, // arguments - ) - ch.QueueBind( - q.Name, // queue name - "", // routing key - RABBIT_EXCHANGE, // exchange - false, - nil, - ) - msgs, _ := ch.Consume( - RABBIT_QUEUE, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args + mq.RabbitMQ.Queue, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args ) log.Println("ready to receive messages") @@ -229,8 +198,7 @@ func TestIntegrationRabbitMQDestination_Publish(t *testing.T) { <-readyChan log.Println("publishing message") - err = rabbitmqDestination.Publish(context.Background(), destination, event) - assert.Nil(t, err) + assert.NoError(t, rabbitmqDestination.Publish(context.Background(), destination, event)) func() { time.Sleep(time.Second / 2) @@ -244,10 +212,7 @@ func TestIntegrationRabbitMQDestination_Publish(t *testing.T) { } log.Println("message received", msg) body := make(map[string]interface{}) - err = json.Unmarshal(msg.Body, &body) - if err != nil { - t.Fatal(err) - } + require.NoError(t, json.Unmarshal(msg.Body, &body)) assert.Equal(t, event.Data, body) // metadata assert.Equal(t, "metadatavalue", msg.Headers["my_metadata"]) diff --git a/internal/util/testinfra/rabbitmq.go b/internal/util/testinfra/rabbitmq.go new file mode 100644 index 00000000..0f92a7d8 --- /dev/null +++ b/internal/util/testinfra/rabbitmq.go @@ -0,0 +1,68 @@ +package testinfra + +import ( + "context" + "log" + "sync" + "testing" + + "github.com/google/uuid" + "github.com/hookdeck/outpost/internal/mqs" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/testcontainers/testcontainers-go/modules/rabbitmq" +) + +func NewMQRabbitMQConfig(t *testing.T) mqs.QueueConfig { + queueConfig := mqs.QueueConfig{ + RabbitMQ: &mqs.RabbitMQConfig{ + ServerURL: ensureRabbitMQ(), + Exchange: uuid.New().String(), + Queue: uuid.New().String(), + }, + } + ctx := context.Background() + if err := testutil.DeclareTestRabbitMQInfrastructure(ctx, queueConfig.RabbitMQ); err != nil { + panic(err) + } + t.Cleanup(func() { + if err := testutil.TeardownTestRabbitMQInfrastructure(ctx, queueConfig.RabbitMQ); err != nil { + log.Println("Failed to teardown RabbitMQ infrastructure", err, *queueConfig.RabbitMQ) + } + }) + return queueConfig +} + +var rabbitmqOnce sync.Once + +func ensureRabbitMQ() string { + cfg := ReadConfig() + if cfg.RabbitMQURL == "" { + rabbitmqOnce.Do(func() { + startRabbitMQTestContainer(cfg) + }) + } + return cfg.RabbitMQURL +} + +func startRabbitMQTestContainer(cfg *Config) { + ctx := context.Background() + + rabbitmqContainer, err := rabbitmq.Run(ctx, + "rabbitmq:3-management-alpine", + ) + if err != nil { + panic(err) + } + + endpoint, err := rabbitmqContainer.PortEndpoint(ctx, "5672/tcp", "") + if err != nil { + panic(err) + } + log.Printf("RabbitMQ running at %s", endpoint) + cfg.RabbitMQURL = "amqp://guest:guest@" + endpoint + cfg.cleanupFns = append(cfg.cleanupFns, func() { + if err := rabbitmqContainer.Terminate(ctx); err != nil { + log.Printf("failed to terminate container: %s", err) + } + }) +} diff --git a/internal/util/testinfra/testinfra.go b/internal/util/testinfra/testinfra.go index f64a8696..cb0c945b 100644 --- a/internal/util/testinfra/testinfra.go +++ b/internal/util/testinfra/testinfra.go @@ -43,11 +43,15 @@ func initConfig() { if !strings.Contains(localstackURL, "http://") { localstackURL = "http://" + localstackURL } + rabbitmqURL := v.GetString("TEST_RABBITMQ_URL") + if !strings.Contains(rabbitmqURL, "amqp://") { + rabbitmqURL = "amqp://guest:guest@" + rabbitmqURL + } cfg = &Config{ TestInfra: v.GetBool("TESTINFRA"), ClickHouseURL: v.GetString("TEST_CLICKHOUSE_URL"), LocalStackURL: localstackURL, - RabbitMQURL: v.GetString("TEST_RABBITMQ_URL"), + RabbitMQURL: rabbitmqURL, } return } diff --git a/internal/util/testutil/rabbitmq.go b/internal/util/testutil/rabbitmq.go index 5356c568..5654a171 100644 --- a/internal/util/testutil/rabbitmq.go +++ b/internal/util/testutil/rabbitmq.go @@ -12,6 +12,7 @@ func DeclareTestRabbitMQInfrastructure(ctx context.Context, config *mqs.RabbitMQ if err != nil { return err } + defer conn.Close() ch, err := conn.Channel() if err != nil { return err @@ -48,3 +49,23 @@ func DeclareTestRabbitMQInfrastructure(ctx context.Context, config *mqs.RabbitMQ nil, ) } + +func TeardownTestRabbitMQInfrastructure(ctx context.Context, cfg *mqs.RabbitMQConfig) error { + conn, err := amqp091.Dial(cfg.ServerURL) + if err != nil { + return err + } + defer conn.Close() + ch, err := conn.Channel() + if err != nil { + return err + } + defer ch.Close() + if _, err := ch.QueueDelete(cfg.Queue, false, false, false); err != nil { + return err + } + if err := ch.ExchangeDelete(cfg.Exchange, false, false); err != nil { + return err + } + return nil +} From f82b8750d250a5af0478412eab3efee937795e9f Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 25 Nov 2024 18:06:13 +0700 Subject: [PATCH 05/11] refactor: Confirm integration test before starting testinfra --- cmd/e2e/configs/basic.go | 2 +- .../destinationadapter/adapters/rabbitmq/rabbitmq_test.go | 2 +- internal/models/log_test.go | 2 +- internal/util/testinfra/testinfra.go | 5 ++++- internal/util/testutil/testutil.go | 6 ++++++ 5 files changed, 13 insertions(+), 4 deletions(-) diff --git a/cmd/e2e/configs/basic.go b/cmd/e2e/configs/basic.go index d494baef..d960bb64 100644 --- a/cmd/e2e/configs/basic.go +++ b/cmd/e2e/configs/basic.go @@ -16,7 +16,7 @@ func Basic(t *testing.T) (*config.Config, func(), error) { } } - t.Cleanup(testinfra.Start()) + t.Cleanup(testinfra.Start(t)) // Config redisConfig := testutil.CreateTestRedisConfig(t) diff --git a/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go b/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go index d2672756..f0c9ba35 100644 --- a/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go +++ b/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go @@ -130,7 +130,7 @@ func TestIntegrationRabbitMQDestination_Publish(t *testing.T) { t.Parallel() - t.Cleanup(testinfra.Start()) + t.Cleanup(testinfra.Start(t)) mq := testinfra.NewMQRabbitMQConfig(t) rabbitmqDestination := rabbitmq.New() diff --git a/internal/models/log_test.go b/internal/models/log_test.go index 7f4269bb..83f12fdc 100644 --- a/internal/models/log_test.go +++ b/internal/models/log_test.go @@ -17,7 +17,7 @@ import ( ) func setupClickHouseConnection(t *testing.T) clickhouse.DB { - t.Cleanup(testinfra.Start()) + t.Cleanup(testinfra.Start(t)) chConfig := testinfra.NewClickHouseConfig(t) diff --git a/internal/util/testinfra/testinfra.go b/internal/util/testinfra/testinfra.go index cb0c945b..83eb1d4f 100644 --- a/internal/util/testinfra/testinfra.go +++ b/internal/util/testinfra/testinfra.go @@ -6,7 +6,9 @@ import ( "path/filepath" "strings" "sync" + "testing" + "github.com/hookdeck/outpost/internal/util/testutil" "github.com/spf13/viper" ) @@ -69,7 +71,8 @@ func ReadConfig() *Config { return cfg } -func Start() func() { +func Start(t *testing.T) func() { + testutil.CheckIntegrationTest(t) suiteCounter += 1 return func() { suiteCounter -= 1 diff --git a/internal/util/testutil/testutil.go b/internal/util/testutil/testutil.go index a6aa03c8..bac38aef 100644 --- a/internal/util/testutil/testutil.go +++ b/internal/util/testutil/testutil.go @@ -22,6 +22,12 @@ var TestTopics = []string{ "user.updated", } +func CheckIntegrationTest(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } +} + func CreateTestRedisConfig(t *testing.T) *internalredis.RedisConfig { mr := miniredis.RunT(t) From 807fa7c41599a145e737755066ab62104ada75af Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 25 Nov 2024 18:18:06 +0700 Subject: [PATCH 06/11] test: Apply testinfra to destination adapter integration tests --- .../adapters/aws/aws_test.go | 74 ++++--------------- .../adapters/rabbitmq/rabbitmq_test.go | 7 +- 2 files changed, 14 insertions(+), 67 deletions(-) diff --git a/internal/destinationadapter/adapters/aws/aws_test.go b/internal/destinationadapter/adapters/aws/aws_test.go index bbcad6cf..284bce29 100644 --- a/internal/destinationadapter/adapters/aws/aws_test.go +++ b/internal/destinationadapter/adapters/aws/aws_test.go @@ -3,21 +3,17 @@ package aws_test import ( "context" "encoding/json" - "errors" "log" "testing" "time" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "github.com/aws/smithy-go" "github.com/google/uuid" "github.com/hookdeck/outpost/internal/destinationadapter/adapters" awsadapter "github.com/hookdeck/outpost/internal/destinationadapter/adapters/aws" - "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/hookdeck/outpost/internal/util/awsutil" + "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -111,41 +107,21 @@ func TestAWSDestination_Validate(t *testing.T) { } func TestIntegrationAWSDestination_Publish(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - t.Parallel() + t.Cleanup(testinfra.Start(t)) - // Setup SQS - awsEndpoint, terminate, err := testutil.StartTestcontainerLocalstack() - require.Nil(t, err) - defer terminate() - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - queueName := "destination_sqs_queue" - awsRegion := "eu-central-1" - - sdkConfig, err := config.LoadDefaultConfig(ctx, - config.WithRegion(awsRegion), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("test", "test", "")), - ) - require.Nil(t, err) - sqsClient := sqs.NewFromConfig(sdkConfig, func(o *sqs.Options) { - o.BaseEndpoint = aws.String(awsEndpoint) - }) - queueURL, err := ensureQueue(ctx, sqsClient, queueName) - require.Nil(t, err) - - // Setup Destination & Event + mq := testinfra.NewMQAWSConfig(t, nil) + sqsClient, err := awsutil.SQSClientFromConfig(context.Background(), mq.AWSSQS) + require.NoError(t, err) + queueURL, err := awsutil.EnsureQueue(context.Background(), sqsClient, mq.AWSSQS.Topic, nil) + require.NoError(t, err) awsdestination := awsadapter.New() destination := adapters.DestinationAdapterValue{ ID: uuid.New().String(), Type: "aws", Config: map[string]string{ - "endpoint": awsEndpoint, + "endpoint": mq.AWSSQS.Endpoint, "queue_url": queueURL, }, Credentials: map[string]string{ @@ -158,6 +134,9 @@ func TestIntegrationAWSDestination_Publish(t *testing.T) { errchan := make(chan error) msgchan := make(chan *types.Message) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + go func() { for { out, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ @@ -212,8 +191,7 @@ func TestIntegrationAWSDestination_Publish(t *testing.T) { "mykey": "myvaluee", }, } - err = awsdestination.Publish(context.Background(), destination, event) - require.Nil(t, err) + require.NoError(t, awsdestination.Publish(context.Background(), destination, event)) // Assert log.Println("waiting for msg...") @@ -237,29 +215,3 @@ func TestIntegrationAWSDestination_Publish(t *testing.T) { assert.Equal(t, "anothermetadatavalue", *msg.MessageAttributes["another_metadata"].StringValue) } } - -func ensureQueue(ctx context.Context, sqsClient *sqs.Client, queueName string) (string, error) { - queue, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{ - QueueName: aws.String(queueName), - }) - if err != nil { - var apiErr smithy.APIError - if errors.As(err, &apiErr) { - switch apiErr.(type) { - case *types.QueueDoesNotExist: - log.Println("Queue does not exist, creating...") - createdQueue, err := sqsClient.CreateQueue(ctx, &sqs.CreateQueueInput{ - QueueName: aws.String(queueName), - }) - if err != nil { - return "", err - } - return *createdQueue.QueueUrl, nil - default: - return "", err - } - } - return "", err - } - return *queue.QueueUrl, nil -} diff --git a/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go b/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go index f0c9ba35..a1ed9713 100644 --- a/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go +++ b/internal/destinationadapter/adapters/rabbitmq/rabbitmq_test.go @@ -124,15 +124,10 @@ func TestRabbitMQDestination_Publish(t *testing.T) { } func TestIntegrationRabbitMQDestination_Publish(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - t.Parallel() - t.Cleanup(testinfra.Start(t)) - mq := testinfra.NewMQRabbitMQConfig(t) + mq := testinfra.NewMQRabbitMQConfig(t) rabbitmqDestination := rabbitmq.New() destination := adapters.DestinationAdapterValue{ From fdb5c610a4242494402c4b6e104053f128f818b5 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 25 Nov 2024 18:32:33 +0700 Subject: [PATCH 07/11] test: Refactor internal/mqs integration tests --- internal/idempotence/idempotence_test.go | 48 ++++++------------------ internal/mqs/queue_test.go | 32 +++------------- 2 files changed, 17 insertions(+), 63 deletions(-) diff --git a/internal/idempotence/idempotence_test.go b/internal/idempotence/idempotence_test.go index abcda9fe..7b97ff0f 100644 --- a/internal/idempotence/idempotence_test.go +++ b/internal/idempotence/idempotence_test.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/mqs" + "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -197,15 +198,10 @@ func TestIdempotence_Failure(t *testing.T) { // - 1 message, failed twice before success // The 2 failed execution won't ack or nack the message to test visibility timeout. func TestIntegrationIdempotence_WithUnackedFailures(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - t.Parallel() visibilityTimeout := 5 * time.Second - mq, cleanup := startAWSSQSQueueWithVisibilityTimeout(context.Background(), t, visibilityTimeout) - defer cleanup() + mq := startAWSSQSQueueWithVisibilityTimeout(context.Background(), t, visibilityTimeout) ctx, cancel := context.WithTimeout(context.Background(), visibilityTimeout*3-visibilityTimeout/2) defer cancel() @@ -260,15 +256,10 @@ func TestIntegrationIdempotence_WithUnackedFailures(t *testing.T) { // - message will sleep for 2s before success // - publisher will publish the same message twice to test idempotency func TestIntegrationIdempotence_WithConcurrentHandlerAndSuccess(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - t.Parallel() visibilityTimeout := 5 * time.Second - mq, cleanup := startAWSSQSQueueWithVisibilityTimeout(context.Background(), t, visibilityTimeout) - defer cleanup() + mq := startAWSSQSQueueWithVisibilityTimeout(context.Background(), t, visibilityTimeout) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // exec should only take 2-4s defer cancel() @@ -349,15 +340,10 @@ func TestIntegrationIdempotence_WithConcurrentHandlerAndSuccess(t *testing.T) { // - message will failed twice, each exec taking 2s // - publisher will publish the same message twice to test idempotency func TestIntegrationIdempotence_WithConcurrentHandlerAndFailedExecution(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - t.Parallel() visibilityTimeout := 5 * time.Second - mq, cleanup := startAWSSQSQueueWithVisibilityTimeout(context.Background(), t, visibilityTimeout) - defer cleanup() + mq := startAWSSQSQueueWithVisibilityTimeout(context.Background(), t, visibilityTimeout) ctx, cancel := context.WithTimeout(context.Background(), visibilityTimeout*3-visibilityTimeout/2) defer cancel() @@ -445,24 +431,14 @@ func (m *MockMsg) ToMessage() (*mqs.Message, error) { return &mqs.Message{Body: []byte(m.ID)}, nil } -func startAWSSQSQueueWithVisibilityTimeout(ctx context.Context, t *testing.T, visibilityTimeout time.Duration) (mqs.Queue, func()) { - endpoint, cleanup, err := testutil.StartTestcontainerLocalstack() - require.Nil(t, err) - config := &mqs.QueueConfig{ - AWSSQS: &mqs.AWSSQSConfig{ - Endpoint: endpoint, - Region: "us-east-1", - ServiceAccountCredentials: "test:test:", - Topic: testutil.RandomString(10), - }, - } - testutil.DeclareTestAWSInfrastructure(ctx, config.AWSSQS, map[string]string{ +func startAWSSQSQueueWithVisibilityTimeout(ctx context.Context, t *testing.T, visibilityTimeout time.Duration) mqs.Queue { + t.Cleanup(testinfra.Start(t)) + mqConfig := testinfra.NewMQAWSConfig(t, map[string]string{ "VisibilityTimeout": strconv.Itoa(int(visibilityTimeout.Seconds())), }) - mq := mqs.NewQueue(config) - cleanupQueue, err := mq.Init(ctx) - return mq, func() { - cleanupQueue() - cleanup() - } + mq := mqs.NewQueue(&mqConfig) + cleanup, err := mq.Init(ctx) + t.Cleanup(cleanup) + require.NoError(t, err) + return mq } diff --git a/internal/mqs/queue_test.go b/internal/mqs/queue_test.go index 1aa18015..e95e7530 100644 --- a/internal/mqs/queue_test.go +++ b/internal/mqs/queue_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hookdeck/outpost/internal/mqs" + "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,39 +28,16 @@ func TestIntegrationMQ_InMemory(t *testing.T) { } func TestIntegrationMQ_RabbitMQ(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - t.Parallel() - - rabbitmqURL, terminate, err := testutil.StartTestcontainerRabbitMQ() - require.Nil(t, err) - defer terminate() - - config := mqs.QueueConfig{RabbitMQ: &mqs.RabbitMQConfig{ - ServerURL: rabbitmqURL, - Exchange: "outpost", - Queue: "outpost.delivery", - }} - testutil.DeclareTestRabbitMQInfrastructure(context.Background(), config.RabbitMQ) + t.Cleanup(testinfra.Start(t)) + config := testinfra.NewMQRabbitMQConfig(t) testMQ(t, func() mqs.QueueConfig { return config }) } func TestIntegrationMQ_AWS(t *testing.T) { t.Parallel() - - awsEndpoint, terminate, err := testutil.StartTestcontainerLocalstack() - require.Nil(t, err) - defer terminate() - - config := mqs.QueueConfig{AWSSQS: &mqs.AWSSQSConfig{ - Endpoint: awsEndpoint, - Region: "eu-central-1", - ServiceAccountCredentials: "test:test:", - Topic: "outpost", - }} - testutil.DeclareTestAWSInfrastructure(context.Background(), config.AWSSQS, nil) + t.Cleanup(testinfra.Start(t)) + config := testinfra.NewMQAWSConfig(t, nil) testMQ(t, func() mqs.QueueConfig { return config }) } From 3a9805058d00d31d49510de199d41b2159772f05 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 25 Nov 2024 18:33:35 +0700 Subject: [PATCH 08/11] test: Remove unused testutil code --- internal/util/testutil/rabbitmq.go | 23 +++++ internal/util/testutil/testcontainers.go | 119 ----------------------- 2 files changed, 23 insertions(+), 119 deletions(-) delete mode 100644 internal/util/testutil/testcontainers.go diff --git a/internal/util/testutil/rabbitmq.go b/internal/util/testutil/rabbitmq.go index 5654a171..e43cca49 100644 --- a/internal/util/testutil/rabbitmq.go +++ b/internal/util/testutil/rabbitmq.go @@ -2,6 +2,7 @@ package testutil import ( "context" + "strings" "github.com/hookdeck/outpost/internal/mqs" "github.com/rabbitmq/amqp091-go" @@ -69,3 +70,25 @@ func TeardownTestRabbitMQInfrastructure(ctx context.Context, cfg *mqs.RabbitMQCo } return nil } + +// ExtractRabbitURL extracts the address from the endpoint +// for example: amqp://guest:guest@localhost:5672 -> localhost:5672 +func ExtractRabbitURL(endpoint string) string { + return strings.Split(endpoint, "@")[1] +} + +// ExtractRabbitUsername extracts the username from the endpoint +// for example: amqp://u:p@localhost:5672 -> u +func ExtractRabbitUsername(endpoint string) string { + first := strings.Split(endpoint, "@")[0] + creds := strings.Split(first, "://")[1] + return strings.Split(creds, ":")[0] +} + +// ExtractRabbitPassword extracts the password from the endpoint +// for example: amqp://u:p@localhost:5672 -> p +func ExtractRabbitPassword(endpoint string) string { + first := strings.Split(endpoint, "@")[0] + creds := strings.Split(first, "://")[1] + return strings.Split(creds, ":")[1] +} diff --git a/internal/util/testutil/testcontainers.go b/internal/util/testutil/testcontainers.go deleted file mode 100644 index fdef8a06..00000000 --- a/internal/util/testutil/testcontainers.go +++ /dev/null @@ -1,119 +0,0 @@ -package testutil - -import ( - "context" - "log" - "strings" - - "github.com/testcontainers/testcontainers-go/modules/clickhouse" - "github.com/testcontainers/testcontainers-go/modules/localstack" - "github.com/testcontainers/testcontainers-go/modules/rabbitmq" -) - -func StartTestcontainerRabbitMQ() (string, func(), error) { - ctx := context.Background() - - rabbitmqContainer, err := rabbitmq.Run(ctx, - "rabbitmq:3-management-alpine", - ) - - if err != nil { - log.Printf("failed to start container: %s", err) - return "", func() {}, err - } - - endpoint, err := rabbitmqContainer.PortEndpoint(ctx, "5672/tcp", "") - if err != nil { - log.Printf("failed to get endpoint: %s", err) - return "", func() {}, err - } - log.Printf("RabbitMQ running at %s", endpoint) - return "amqp://guest:guest@" + endpoint, - func() { - if err := rabbitmqContainer.Terminate(ctx); err != nil { - log.Printf("failed to terminate container: %s", err) - } - }, - nil -} - -// ExtractRabbitURL extracts the address from the endpoint -// for example: amqp://guest:guest@localhost:5672 -> localhost:5672 -func ExtractRabbitURL(endpoint string) string { - return strings.Split(endpoint, "@")[1] -} - -// ExtractRabbitUsername extracts the username from the endpoint -// for example: amqp://u:p@localhost:5672 -> u -func ExtractRabbitUsername(endpoint string) string { - first := strings.Split(endpoint, "@")[0] - creds := strings.Split(first, "://")[1] - return strings.Split(creds, ":")[0] -} - -// ExtractRabbitPassword extracts the password from the endpoint -// for example: amqp://u:p@localhost:5672 -> p -func ExtractRabbitPassword(endpoint string) string { - first := strings.Split(endpoint, "@")[0] - creds := strings.Split(first, "://")[1] - return strings.Split(creds, ":")[1] -} - -func StartTestcontainerLocalstack() (string, func(), error) { - ctx := context.Background() - - localstackContainer, err := localstack.Run(ctx, - "localstack/localstack:latest", - ) - - if err != nil { - log.Printf("failed to start container: %s", err) - return "", func() {}, err - } - - endpoint, err := localstackContainer.PortEndpoint(ctx, "4566/tcp", "") - if err != nil { - log.Printf("failed to get endpoint: %s", err) - return "", func() {}, err - } - if !strings.Contains(endpoint, "http://") { - endpoint = "http://" + endpoint - } - log.Printf("Localstack running at %s", endpoint) - return endpoint, - func() { - if err := localstackContainer.Terminate(ctx); err != nil { - log.Printf("failed to terminate container: %s", err) - } - }, - nil -} - -func StartTestContainerClickHouse() (string, func(), error) { - ctx := context.Background() - - clickHouseContainer, err := clickhouse.Run(ctx, - "clickhouse/clickhouse-server:latest", - clickhouse.WithUsername("default"), - clickhouse.WithPassword(""), - clickhouse.WithDatabase("default"), - ) - if err != nil { - log.Printf("failed to start container: %s", err) - return "", func() {}, err - } - - endpoint, err := clickHouseContainer.PortEndpoint(ctx, "9000/tcp", "") - if err != nil { - log.Printf("failed to get endpoint: %s", err) - return "", func() {}, err - } - log.Printf("ClickHouse running at %s", endpoint) - return endpoint, - func() { - if err := clickHouseContainer.Terminate(ctx); err != nil { - log.Printf("failed to terminate container: %s", err) - } - }, - nil -} From 818f4ebbd9aed8938b4a75d4a4c62aae8d44a573 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 25 Nov 2024 18:56:10 +0700 Subject: [PATCH 09/11] test: Fix flaky test --- internal/publishmq/eventhandler_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/publishmq/eventhandler_test.go b/internal/publishmq/eventhandler_test.go index d37a8480..681e9fbd 100644 --- a/internal/publishmq/eventhandler_test.go +++ b/internal/publishmq/eventhandler_test.go @@ -9,16 +9,17 @@ import ( "github.com/google/uuid" "github.com/hookdeck/outpost/internal/deliverymq" "github.com/hookdeck/outpost/internal/models" - "github.com/hookdeck/outpost/internal/mqs" "github.com/hookdeck/outpost/internal/publishmq" + "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/sdk/trace/tracetest" ) // NOTE: This test seems to be a bit flaky. -func TestPublishMQEventHandler_Concurrency(t *testing.T) { +func TestIntegrationPublishMQEventHandler_Concurrency(t *testing.T) { t.Parallel() + t.Cleanup(testinfra.Start(t)) exporter := tracetest.NewInMemoryExporter() mockEventTracer := testutil.NewMockEventTracer(exporter) @@ -27,9 +28,8 @@ func TestPublishMQEventHandler_Concurrency(t *testing.T) { logger := testutil.CreateTestLogger(t) redisClient := testutil.CreateTestRedisClient(t) entityStore := models.NewEntityStore(redisClient, models.NewAESCipher("secret"), testutil.TestTopics) - deliveryMQ := deliverymq.New(deliverymq.WithQueue(&mqs.QueueConfig{ - InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}, - })) + mqConfig := testinfra.NewMQAWSConfig(t, nil) + deliveryMQ := deliverymq.New(deliverymq.WithQueue(&mqConfig)) cleanup, err := deliveryMQ.Init(ctx) require.NoError(t, err) defer cleanup() From c9e90c375bcf88abf9b22adef508fcc934049743 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 25 Nov 2024 19:15:29 +0700 Subject: [PATCH 10/11] docs: How to use test infra & integration test template --- docs/contributing/test.md | 46 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/docs/contributing/test.md b/docs/contributing/test.md index e3bfb950..321d073e 100644 --- a/docs/contributing/test.md +++ b/docs/contributing/test.md @@ -39,7 +39,7 @@ $ TESTARGS='-v -run "TestJWT"'' make test # go test $(go list ./...) -v -run "TestJWT" ``` -Keep in mind you can't use `-run "Test..."` along with `make test/integration` as the integration test already specify integration tests with `-run` option. However, since you're already specifying which test to run, I assume this is a non-issue. +Keep in mind you can't use `-run "Test..."` along with `make test/integration` as the integration test already specify integration tests with `-run` option. However, since you're already specifying which test to run, we assume this is a non-issue. ## Coverage @@ -62,3 +62,47 @@ Running the coverage test command above will generate the `coverage.out` file. Y $ make test/coverage/html # go tool cover -html=coverage.out ``` + +## Integration & E2E Tests + +When running integration & e2e tests, we often times require some test infrastructure such as ClickHouse, LocalStack, RabbitMQ, etc. We use [Testcontainers](https://testcontainers.com/) for that. It usually takes a few seconds (10s or so) to spawn the necessary containers. To improve the feedback loop, you can run a persistent test infrastructure and skip spawning testcontainers. + +To run the test infrastructure: + +```sh +$ make up/test + +## to take the test infra down +# $ make down/test +``` + +It will run a Docker compose stack called `outpost-test` which runs the necessary services at ports ":30000 + port". For example, ClickHouse usually runs on port `:9000`, so in the test infra it will run on port `:39000`. + +From here, you can provide env variable `TESTINFRA=1` to tell the test suite to use these services instead of spawning testcontainers. + +```sh +$ TESTINFRA=1 make test +``` + +Tip: You can `$ export TESTINFRA=1` to use the test infra for the whole terminal session. + +### Integration Test Template + +Here's a short template for how you can write integration tests that require an external test infra: + +```golang +// Integration test should always start with "TestIntegration...() {}" +func TestIntegrationMyIntegrationTest(t *testing.T) { + t.Parallel() + + // call testinfra.Start(t) to signal that you require the test infra. + // This helps the test runner properly terminate resources at the end. + t.Cleanup(testinfra.Start(t)) + + // use whichever infra you need + chConfig := testinfra.NewClickHouseConfig(t) + awsMQConfig := testinfra.NewMQAWSConfig(t, attributesMap) + rabbitmqConfig := testinfra.NewMQRabbitMQConfig(t) + // ... +} +``` From 5fe357a6af8a953535be9f5b6bf167a209b662dd Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 25 Nov 2024 19:21:38 +0700 Subject: [PATCH 11/11] chore: Remove no-longer-valid comment --- internal/publishmq/eventhandler_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/publishmq/eventhandler_test.go b/internal/publishmq/eventhandler_test.go index 681e9fbd..8856294e 100644 --- a/internal/publishmq/eventhandler_test.go +++ b/internal/publishmq/eventhandler_test.go @@ -16,7 +16,6 @@ import ( "go.opentelemetry.io/otel/sdk/trace/tracetest" ) -// NOTE: This test seems to be a bit flaky. func TestIntegrationPublishMQEventHandler_Concurrency(t *testing.T) { t.Parallel() t.Cleanup(testinfra.Start(t))