diff --git a/contributing/loadtest/overview.md b/contributing/loadtest/overview.md new file mode 100644 index 00000000..b25e47fb --- /dev/null +++ b/contributing/loadtest/overview.md @@ -0,0 +1,121 @@ +# Load Testing Overview + +## Prerequisites + +- k6 installed +- Docker running +- Outpost deployment with API key +- Node.js (for TypeScript compilation) + +## Two-Phase Load Test + +### 1. Throughput Test +Creates a tenant with one webhook destination and publishes events at a configured rate. Event IDs are stored in Redis for verification. + +### 2. Verification Test +Queries the mock webhook to confirm delivery and measure latency metrics: +- End-to-end latency (publish to delivery) +- Receive latency (publish to Outpost receipt) +- Internal latency (Outpost processing time) + +## Setup + +### Start Supporting Services + +```bash +cd loadtest +docker-compose up -d +``` + +This starts: +- **Redis** (`localhost:46379`): Coordinates test state between throughput and verification phases +- **Mock Webhook** (`localhost:48080`): Receives webhook deliveries and stores them for verification + +### Configure Environment + +Use `loadtest/config/environments/local.json` or create a new one (e.g., `staging.json`): + +```json +{ + "name": "local", + "api": { + "baseUrl": "http://localhost:3333", + "timeout": "30s" + }, + "mockWebhook": { + "url": "http://localhost:48080", + "destinationUrl": "http://host.docker.internal:48080", + "verificationPollTimeout": "5s" + }, + "redis": "redis://localhost:46379" +} +``` + +**Critical:** `mockWebhook.destinationUrl` must be accessible from your Outpost deployment: +- **Local Outpost in Docker**: `http://host.docker.internal:48080` +- **Local Outpost in Kubernetes**: `http://host.docker.internal:48080` +- **Remote Outpost**: Expose mock webhook publicly (e.g., ngrok tunnel) and use that URL + +The mock webhook must be reachable by Outpost for event delivery to succeed. + +### Configure Scenario + +Use the default `basic.json` scenario, edit it locally, or create a new one. + +Default scenario at `loadtest/config/scenarios/events-throughput/basic.json`: + +```json +{ + "options": { + "scenarios": { + "events": { + "rate": 100, + "timeUnit": "1s", + "duration": "30s", + "preAllocatedVUs": 20 + } + } + } +} +``` + +To create a new scenario, add a file (e.g., `high-load.json`) in the same directory and reference it with `--scenario high-load`. + +## Running Tests + +### Throughput Test + +```bash +export API_KEY=your-api-key +export TESTID=$(date +%s) + +./run-test.sh events-throughput --environment local --scenario basic +``` + +### Verification Test + +```bash +# Use same TESTID from throughput test +# MAX_ITERATIONS = rate × duration (e.g., 100 × 30 = 3000) +MAX_ITERATIONS=3000 ./run-test.sh events-verify --environment local --scenario basic +``` + +## Mock Webhook + +The mock webhook service provides: +- `POST /webhook`: Receives event deliveries from Outpost +- `GET /events/{eventId}`: Returns event details for verification +- `GET /health`: Service status + +Events are stored in an LRU cache with 10-minute expiration. + +**Network Requirements:** +- k6 must reach mock webhook at `mockWebhook.url` to verify deliveries +- Outpost must reach mock webhook at `mockWebhook.destinationUrl` to deliver events +- For remote Outpost deployments, expose the mock webhook via tunnel or public endpoint + +## Cleanup + +```bash +docker-compose down +``` diff --git a/internal/config/destinations.go b/internal/config/destinations.go index 75373b0b..f3ca4acb 100644 --- a/internal/config/destinations.go +++ b/internal/config/destinations.go @@ -9,9 +9,10 @@ import ( // DestinationsConfig is the main configuration for all destination types type DestinationsConfig struct { - MetadataPath string `yaml:"metadata_path" env:"DESTINATIONS_METADATA_PATH" desc:"Path to the directory containing custom destination type definitions. This can be overridden by the root-level 'destination_metadata_path' if also set." required:"N"` - Webhook DestinationWebhookConfig `yaml:"webhook" desc:"Configuration specific to webhook destinations."` - AWSKinesis DestinationAWSKinesisConfig `yaml:"aws_kinesis" desc:"Configuration specific to AWS Kinesis destinations."` + MetadataPath string `yaml:"metadata_path" env:"DESTINATIONS_METADATA_PATH" desc:"Path to the directory containing custom destination type definitions. This can be overridden by the root-level 'destination_metadata_path' if also set." required:"N"` + IncludeMillisecondTimestamp bool `yaml:"include_millisecond_timestamp" env:"DESTINATIONS_INCLUDE_MILLISECOND_TIMESTAMP" desc:"If true, includes a 'timestamp-ms' field with millisecond precision in destination metadata. Useful for load testing and debugging." required:"N"` + Webhook DestinationWebhookConfig `yaml:"webhook" desc:"Configuration specific to webhook destinations."` + AWSKinesis DestinationAWSKinesisConfig `yaml:"aws_kinesis" desc:"Configuration specific to AWS Kinesis destinations."` } func (c *DestinationsConfig) ToConfig(cfg *Config) destregistrydefault.RegisterDefaultDestinationOptions { @@ -25,9 +26,10 @@ func (c *DestinationsConfig) ToConfig(cfg *Config) destregistrydefault.RegisterD } return destregistrydefault.RegisterDefaultDestinationOptions{ - UserAgent: userAgent, - Webhook: c.Webhook.toConfig(), - AWSKinesis: c.AWSKinesis.toConfig(), + UserAgent: userAgent, + IncludeMillisecondTimestamp: c.IncludeMillisecondTimestamp, + Webhook: c.Webhook.toConfig(), + AWSKinesis: c.AWSKinesis.toConfig(), } } diff --git a/internal/destregistry/baseprovider.go b/internal/destregistry/baseprovider.go index 42daedce..4f4c7ecc 100644 --- a/internal/destregistry/baseprovider.go +++ b/internal/destregistry/baseprovider.go @@ -25,21 +25,28 @@ func ObfuscateValue(value string) string { // BaseProvider provides common functionality for all destination providers type BaseProvider struct { - metadata *metadata.ProviderMetadata + metadata *metadata.ProviderMetadata + basePublisherOpts []BasePublisherOption } // NewBaseProvider creates a new base provider with loaded metadata -func NewBaseProvider(loader metadata.MetadataLoader, providerType string) (*BaseProvider, error) { +func NewBaseProvider(loader metadata.MetadataLoader, providerType string, opts ...BasePublisherOption) (*BaseProvider, error) { meta, err := loader.Load(providerType) if err != nil { return nil, fmt.Errorf("loading provider metadata: %w", err) } return &BaseProvider{ - metadata: meta, + metadata: meta, + basePublisherOpts: opts, }, nil } +// NewPublisher creates a BasePublisher with provider-configured options +func (p *BaseProvider) NewPublisher() *BasePublisher { + return NewBasePublisher(p.basePublisherOpts...) +} + // Metadata returns the provider metadata func (p *BaseProvider) Metadata() *metadata.ProviderMetadata { return p.metadata diff --git a/internal/destregistry/basepublisher.go b/internal/destregistry/basepublisher.go index f544f613..79fab90d 100644 --- a/internal/destregistry/basepublisher.go +++ b/internal/destregistry/basepublisher.go @@ -11,8 +11,28 @@ import ( // BasePublisher provides common publisher functionality type BasePublisher struct { - active sync.WaitGroup - closed atomic.Bool + active sync.WaitGroup + closed atomic.Bool + includeMillisecondTimestamp bool +} + +// BasePublisherOption is a functional option for configuring BasePublisher +type BasePublisherOption func(*BasePublisher) + +// WithMillisecondTimestamp enables millisecond-precision timestamp in metadata +func WithMillisecondTimestamp(enabled bool) BasePublisherOption { + return func(p *BasePublisher) { + p.includeMillisecondTimestamp = enabled + } +} + +// NewBasePublisher creates a new BasePublisher with the given options +func NewBasePublisher(opts ...BasePublisherOption) *BasePublisher { + p := &BasePublisher{} + for _, opt := range opts { + opt(p) + } + return p } // StartPublish returns error if publisher is closed, otherwise adds to waitgroup @@ -41,6 +61,12 @@ func (p *BasePublisher) MakeMetadata(event *models.Event, timestamp time.Time) m "event-id": event.ID, "topic": event.Topic, } + + // Add millisecond timestamp if enabled + if p.includeMillisecondTimestamp { + systemMetadata["timestamp-ms"] = fmt.Sprintf("%d", timestamp.UnixMilli()) + } + metadata := make(map[string]string) for k, v := range systemMetadata { metadata[k] = v diff --git a/internal/destregistry/providers/default.go b/internal/destregistry/providers/default.go index aa276bd2..8261725d 100644 --- a/internal/destregistry/providers/default.go +++ b/internal/destregistry/providers/default.go @@ -28,9 +28,10 @@ type DestAWSKinesisConfig struct { } type RegisterDefaultDestinationOptions struct { - UserAgent string - Webhook *DestWebhookConfig - AWSKinesis *DestAWSKinesisConfig + UserAgent string + IncludeMillisecondTimestamp bool + Webhook *DestWebhookConfig + AWSKinesis *DestAWSKinesisConfig } // RegisterDefault registers the default destination providers with the registry. @@ -39,6 +40,12 @@ type RegisterDefaultDestinationOptions struct { func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestinationOptions) error { loader := registry.MetadataLoader() + // Build base publisher options that apply to all providers + basePublisherOpts := []destregistry.BasePublisherOption{} + if opts.IncludeMillisecondTimestamp { + basePublisherOpts = append(basePublisherOpts, destregistry.WithMillisecondTimestamp(opts.IncludeMillisecondTimestamp)) + } + webhookOpts := []destwebhook.Option{ destwebhook.WithUserAgent(opts.UserAgent), } @@ -55,20 +62,20 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina destwebhook.WithSignatureAlgorithm(opts.Webhook.SignatureAlgorithm), ) } - webhook, err := destwebhook.New(loader, webhookOpts...) + webhook, err := destwebhook.New(loader, basePublisherOpts, webhookOpts...) if err != nil { return err } registry.RegisterProvider("webhook", webhook) - hookdeck, err := desthookdeck.New(loader, + hookdeck, err := desthookdeck.New(loader, basePublisherOpts, desthookdeck.WithUserAgent(opts.UserAgent)) if err != nil { return err } registry.RegisterProvider("hookdeck", hookdeck) - awsSQS, err := destawssqs.New(loader) + awsSQS, err := destawssqs.New(loader, basePublisherOpts) if err != nil { return err } @@ -80,25 +87,25 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina destawskinesis.WithMetadataInPayload(opts.AWSKinesis.MetadataInPayload), ) } - awsKinesis, err := destawskinesis.New(loader, awsKinesisOpts...) + awsKinesis, err := destawskinesis.New(loader, basePublisherOpts, awsKinesisOpts...) if err != nil { return err } registry.RegisterProvider("aws_kinesis", awsKinesis) - awsS3, err := destawss3.New(loader) + awsS3, err := destawss3.New(loader, basePublisherOpts) if err != nil { return err } registry.RegisterProvider("aws_s3", awsS3) - azureServiceBus, err := destazureservicebus.New(loader) + azureServiceBus, err := destazureservicebus.New(loader, basePublisherOpts) if err != nil { return err } registry.RegisterProvider("azure_servicebus", azureServiceBus) - rabbitmq, err := destrabbitmq.New(loader) + rabbitmq, err := destrabbitmq.New(loader, basePublisherOpts) if err != nil { return err } diff --git a/internal/destregistry/providers/destawskinesis/destawskinesis.go b/internal/destregistry/providers/destawskinesis/destawskinesis.go index 5fb588ca..042b0630 100644 --- a/internal/destregistry/providers/destawskinesis/destawskinesis.go +++ b/internal/destregistry/providers/destawskinesis/destawskinesis.go @@ -51,8 +51,8 @@ func WithMetadataInPayload(include bool) Option { } // Constructor -func New(loader metadata.MetadataLoader, opts ...Option) (*AWSKinesisProvider, error) { - base, err := destregistry.NewBaseProvider(loader, "aws_kinesis") +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption, opts ...Option) (*AWSKinesisProvider, error) { + base, err := destregistry.NewBaseProvider(loader, "aws_kinesis", basePublisherOpts...) if err != nil { return nil, err } @@ -102,7 +102,7 @@ func (p *AWSKinesisProvider) CreatePublisher(ctx context.Context, destination *m }) return &AWSKinesisPublisher{ - BasePublisher: &destregistry.BasePublisher{}, + BasePublisher: p.BaseProvider.NewPublisher(), client: kinesisClient, streamName: config.StreamName, partitionKeyTemplate: config.PartitionKeyTemplate, diff --git a/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go b/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go index 09fc8e61..fc308a4c 100644 --- a/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go +++ b/internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go @@ -326,7 +326,7 @@ func (s *AWSKinesisSuite) SetupSuite() { require.NoError(t, err) // Create provider - provider, err := destawskinesis.New(testutil.Registry.MetadataLoader()) + provider, err := destawskinesis.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) // Create destination with partition key template diff --git a/internal/destregistry/providers/destawskinesis/destawskinesis_validate_test.go b/internal/destregistry/providers/destawskinesis/destawskinesis_validate_test.go index cf3f1e9a..24feb95a 100644 --- a/internal/destregistry/providers/destawskinesis/destawskinesis_validate_test.go +++ b/internal/destregistry/providers/destawskinesis/destawskinesis_validate_test.go @@ -29,7 +29,7 @@ func TestAWSKinesisDestination_Validate(t *testing.T) { }), ) - awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader()) + awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should validate valid destination", func(t *testing.T) { @@ -122,7 +122,7 @@ func TestAWSKinesisDestination_Validate(t *testing.T) { func TestAWSKinesisDestination_ComputeTarget(t *testing.T) { t.Parallel() - awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader()) + awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should return stream and region as target", func(t *testing.T) { diff --git a/internal/destregistry/providers/destawss3/destawss3.go b/internal/destregistry/providers/destawss3/destawss3.go index 21847d92..b1b12b11 100644 --- a/internal/destregistry/providers/destawss3/destawss3.go +++ b/internal/destregistry/providers/destawss3/destawss3.go @@ -49,8 +49,8 @@ type AWSS3Provider struct { var _ destregistry.Provider = (*AWSS3Provider)(nil) // New creates a new AWSS3Provider -func New(loader metadata.MetadataLoader) (*AWSS3Provider, error) { - base, err := destregistry.NewBaseProvider(loader, "aws_s3") +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*AWSS3Provider, error) { + base, err := destregistry.NewBaseProvider(loader, "aws_s3", basePublisherOpts...) if err != nil { return nil, err } @@ -102,6 +102,7 @@ func (p *AWSS3Provider) CreatePublisher(ctx context.Context, destination *models } return NewAWSS3Publisher( + p.BaseProvider.NewPublisher(), client, cfg.Bucket, cfg.KeyTemplate, @@ -349,6 +350,7 @@ func parseStorageClass(storageClass string) (types.StorageClass, error) { // NewAWSS3Publisher exposed for testing func NewAWSS3Publisher( + basePublisher *destregistry.BasePublisher, client *s3.Client, bucket, keyTemplateStr, storageClass string, ) *AWSS3Publisher { @@ -360,7 +362,7 @@ func NewAWSS3Publisher( } return &AWSS3Publisher{ - BasePublisher: &destregistry.BasePublisher{}, + BasePublisher: basePublisher, client: client, bucket: bucket, keyTemplate: tmpl, diff --git a/internal/destregistry/providers/destawss3/destawss3_format_test.go b/internal/destregistry/providers/destawss3/destawss3_format_test.go index 498890bb..e649ffff 100644 --- a/internal/destregistry/providers/destawss3/destawss3_format_test.go +++ b/internal/destregistry/providers/destawss3/destawss3_format_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/hookdeck/outpost/internal/destregistry" "github.com/hookdeck/outpost/internal/destregistry/providers/destawss3" "github.com/hookdeck/outpost/internal/models" "github.com/stretchr/testify/assert" @@ -30,6 +31,7 @@ func TestAWSS3Publisher_Format_DefaultTemplate(t *testing.T) { // Use default template publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", `join('', [time.rfc3339_nano, '_', metadata."event-id", '.json'])`, @@ -70,6 +72,7 @@ func TestAWSS3Publisher_Format_DatePartitionTemplate(t *testing.T) { // Use date partitioning template template := `join('/', ['year=', time.year, 'month=', time.month, 'day=', time.day, metadata."event-id", '.json'])` publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", template, @@ -94,6 +97,7 @@ func TestAWSS3Publisher_Format_TopicBasedTemplate(t *testing.T) { // Use topic-based organization template := `join('/', [metadata.topic, time.date, metadata."event-id"])` publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", template, @@ -120,6 +124,7 @@ func TestAWSS3Publisher_Format_DataFieldTemplate(t *testing.T) { // Use data fields in template template := `join('/', ['users', data.user_id, 'actions', data.action, metadata."event-id"])` publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", template, @@ -153,6 +158,7 @@ func TestAWSS3Publisher_Format_ComplexTemplate(t *testing.T) { // Complex template with multiple fields template := `join('/', [metadata.env, metadata.region, time.year, time.month, metadata.topic, data.customer_id, join('_', [data.order_id, time.unix])])` publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", template, @@ -169,6 +175,7 @@ func TestAWSS3Publisher_Format_InvalidTemplate(t *testing.T) { // This should panic since the template is invalid assert.Panics(t, func() { destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", "invalid[template", @@ -187,6 +194,7 @@ func TestAWSS3Publisher_Format_NilResult(t *testing.T) { // Template that accesses non-existent field template := `data.nonexistent` publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", template, @@ -210,6 +218,7 @@ func TestAWSS3Publisher_Format_EmptyResult(t *testing.T) { // Template that returns empty string template := `data.empty` publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", template, @@ -233,6 +242,7 @@ func TestAWSS3Publisher_Format_NumericResult(t *testing.T) { // Template that returns a number template := `data.count` publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", template, @@ -256,6 +266,7 @@ func TestAWSS3Publisher_Format_BooleanResult(t *testing.T) { // Template that returns a boolean template := `data.active` publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", template, @@ -296,6 +307,7 @@ func TestAWSS3Publisher_Format_TimeFields(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", tt.template, @@ -311,6 +323,7 @@ func TestAWSS3Publisher_Format_TimeFields(t *testing.T) { func TestAWSS3Publisher_Format_InvalidStorageClass(t *testing.T) { publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", `metadata."event-id"`, @@ -419,6 +432,7 @@ func TestAWSS3Publisher_Format_LegacyPatterns(t *testing.T) { t.Logf("Old config: %s", tt.oldConfig) publisher := destawss3.NewAWSS3Publisher( + destregistry.NewBasePublisher(), nil, "my-bucket", tt.template, diff --git a/internal/destregistry/providers/destawss3/destawss3_publish_test.go b/internal/destregistry/providers/destawss3/destawss3_publish_test.go index 18480f3a..ad36e896 100644 --- a/internal/destregistry/providers/destawss3/destawss3_publish_test.go +++ b/internal/destregistry/providers/destawss3/destawss3_publish_test.go @@ -196,7 +196,7 @@ func (s *S3PublishSuite) SetupSuite() { require.NoError(t, err) // Create provider - provider, err := destawss3.New(testutil.Registry.MetadataLoader()) + provider, err := destawss3.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) // Create destination configuration diff --git a/internal/destregistry/providers/destawss3/destawss3_validate_test.go b/internal/destregistry/providers/destawss3/destawss3_validate_test.go index d0917086..b07bfff4 100644 --- a/internal/destregistry/providers/destawss3/destawss3_validate_test.go +++ b/internal/destregistry/providers/destawss3/destawss3_validate_test.go @@ -29,7 +29,7 @@ func TestAWSS3Destination_Validate(t *testing.T) { }), ) - awsS3Destination, err := destawss3.New(testutil.Registry.MetadataLoader()) + awsS3Destination, err := destawss3.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should validate valid destination", func(t *testing.T) { @@ -137,7 +137,7 @@ func TestAWSS3Destination_Validate(t *testing.T) { func TestAWSS3Destination_ComputeTarget(t *testing.T) { t.Parallel() - awsS3Destination, err := destawss3.New(testutil.Registry.MetadataLoader()) + awsS3Destination, err := destawss3.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) destination := testutil.DestinationFactory.Any( diff --git a/internal/destregistry/providers/destawssqs/destawssqs.go b/internal/destregistry/providers/destawssqs/destawssqs.go index c9e35121..95aa5ec9 100644 --- a/internal/destregistry/providers/destawssqs/destawssqs.go +++ b/internal/destregistry/providers/destawssqs/destawssqs.go @@ -36,8 +36,8 @@ type AWSSQSDestinationCredentials struct { var _ destregistry.Provider = (*AWSSQSDestination)(nil) -func New(loader metadata.MetadataLoader) (*AWSSQSDestination, error) { - base, err := destregistry.NewBaseProvider(loader, "aws_sqs") +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*AWSSQSDestination, error) { + base, err := destregistry.NewBaseProvider(loader, "aws_sqs", basePublisherOpts...) if err != nil { return nil, err } @@ -90,7 +90,7 @@ func (p *AWSSQSDestination) CreatePublisher(ctx context.Context, destination *mo }) return &AWSSQSPublisher{ - BasePublisher: &destregistry.BasePublisher{}, + BasePublisher: p.BaseProvider.NewPublisher(), client: sqsClient, queueURL: cfg.QueueURL, }, nil diff --git a/internal/destregistry/providers/destawssqs/destawssqs_publish_test.go b/internal/destregistry/providers/destawssqs/destawssqs_publish_test.go index 650c80cd..2d58502b 100644 --- a/internal/destregistry/providers/destawssqs/destawssqs_publish_test.go +++ b/internal/destregistry/providers/destawssqs/destawssqs_publish_test.go @@ -128,7 +128,7 @@ func (s *AWSSQSSuite) SetupSuite() { s.consumer = NewSQSConsumer(sqsClient, queueURL) // Create provider - provider, err := destawssqs.New(testutil.Registry.MetadataLoader()) + provider, err := destawssqs.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) // Create destination diff --git a/internal/destregistry/providers/destawssqs/destawssqs_validate_test.go b/internal/destregistry/providers/destawssqs/destawssqs_validate_test.go index 6f75f0f6..c758f3e6 100644 --- a/internal/destregistry/providers/destawssqs/destawssqs_validate_test.go +++ b/internal/destregistry/providers/destawssqs/destawssqs_validate_test.go @@ -27,7 +27,7 @@ func TestAWSSQSDestination_Validate(t *testing.T) { }), ) - awsSQSDestination, err := destawssqs.New(testutil.Registry.MetadataLoader()) + awsSQSDestination, err := destawssqs.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should validate valid destination", func(t *testing.T) { @@ -103,7 +103,7 @@ func TestAWSSQSDestination_Validate(t *testing.T) { func TestAWSSQSDestination_ComputeTarget(t *testing.T) { t.Parallel() - awsSQSDestination, err := destawssqs.New(testutil.Registry.MetadataLoader()) + awsSQSDestination, err := destawssqs.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should return queue_url as target", func(t *testing.T) { diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go index 5f687606..9e779b9d 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go @@ -27,8 +27,8 @@ type AzureServiceBusDestinationCredentials struct { var _ destregistry.Provider = (*AzureServiceBusDestination)(nil) -func New(loader metadata.MetadataLoader) (*AzureServiceBusDestination, error) { - base, err := destregistry.NewBaseProvider(loader, "azure_servicebus") +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*AzureServiceBusDestination, error) { + base, err := destregistry.NewBaseProvider(loader, "azure_servicebus", basePublisherOpts...) if err != nil { return nil, err } @@ -50,7 +50,7 @@ func (d *AzureServiceBusDestination) CreatePublisher(ctx context.Context, destin } return &AzureServiceBusPublisher{ - BasePublisher: &destregistry.BasePublisher{}, + BasePublisher: d.BaseProvider.NewPublisher(), connectionString: creds.ConnectionString, queueOrTopic: cfg.Name, }, nil diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go index ec6b0d78..f4e7ef2c 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go @@ -153,7 +153,7 @@ func (s *AzureServiceBusSuite) SetupSuite() { s.consumer = consumer // Create provider - provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) // Create destination diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go index 68f50457..52666039 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go @@ -14,7 +14,7 @@ import ( ) func TestComputeTarget(t *testing.T) { - provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) tests := []struct { @@ -142,7 +142,7 @@ func TestParseNamespaceFromConnectionString(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // We need to test the parseNamespaceFromConnectionString function // Since it's not exported, we test it indirectly through ComputeTarget - provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) dest := models.Destination{ @@ -166,7 +166,7 @@ func TestParseNamespaceFromConnectionString(t *testing.T) { } func TestValidate(t *testing.T) { - provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) tests := []struct { diff --git a/internal/destregistry/providers/desthookdeck/desthookdeck.go b/internal/destregistry/providers/desthookdeck/desthookdeck.go index 1023a83d..030823c6 100644 --- a/internal/destregistry/providers/desthookdeck/desthookdeck.go +++ b/internal/destregistry/providers/desthookdeck/desthookdeck.go @@ -49,8 +49,8 @@ func WithUserAgent(userAgent string) ProviderOption { } // Constructor -func New(loader metadata.MetadataLoader, opts ...ProviderOption) (*HookdeckProvider, error) { - base, err := destregistry.NewBaseProvider(loader, "hookdeck") +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption, opts ...ProviderOption) (*HookdeckProvider, error) { + base, err := destregistry.NewBaseProvider(loader, "hookdeck", basePublisherOpts...) if err != nil { return nil, err } @@ -108,9 +108,10 @@ func NewPublisher(tokenString string, opts ...PublisherOption) (*HookdeckPublish return nil, err } - // Create publisher with default settings + // Note: This NewPublisher is called from CreatePublisher which has access to BaseProvider + // For now, we create a default BasePublisher here - this will be refactored publisher := &HookdeckPublisher{ - BasePublisher: &destregistry.BasePublisher{}, + BasePublisher: destregistry.NewBasePublisher(), tokenString: tokenString, parsedToken: parsedToken, client: &http.Client{Timeout: 30 * time.Second}, @@ -134,26 +135,31 @@ func (p *HookdeckProvider) CreatePublisher(ctx context.Context, destination *mod // Get the token from credentials tokenString := destination.Credentials["token"] - // Create publisher options - var opts []PublisherOption + // Parse the token + parsedToken, err := ParseHookdeckToken(tokenString) + if err != nil { + return nil, destregistry.NewErrDestinationPublishAttempt(err, "hookdeck", map[string]interface{}{ + "error": "invalid_token", + "message": err.Error(), + }) + } - // Use the provider's HTTP client if set + // Determine HTTP client + var client *http.Client if p.httpClient != nil { - opts = append(opts, PublisherWithClient(p.httpClient)) + client = p.httpClient } else { - httpClient := p.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{ + client = p.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{ UserAgent: &p.userAgent, }) - opts = append(opts, PublisherWithClient(httpClient)) } - // Use NewPublisher to create the publisher with options - publisher, err := NewPublisher(tokenString, opts...) - if err != nil { - return nil, destregistry.NewErrDestinationPublishAttempt(err, "hookdeck", map[string]interface{}{ - "error": "invalid_token", - "message": err.Error(), - }) + // Create publisher with base publisher from provider + publisher := &HookdeckPublisher{ + BasePublisher: p.BaseProvider.NewPublisher(), + tokenString: tokenString, + parsedToken: parsedToken, + client: client, } return publisher, nil diff --git a/internal/destregistry/providers/desthookdeck/desthookdeck_preprocess_test.go b/internal/destregistry/providers/desthookdeck/desthookdeck_preprocess_test.go index 0d98827c..364c7a74 100644 --- a/internal/destregistry/providers/desthookdeck/desthookdeck_preprocess_test.go +++ b/internal/destregistry/providers/desthookdeck/desthookdeck_preprocess_test.go @@ -168,6 +168,7 @@ func TestPreprocess(t *testing.T) { // Create the provider with our custom client provider, err := desthookdeck.New( testutil.Registry.MetadataLoader(), + nil, desthookdeck.WithHTTPClient(client), ) require.NoError(t, err) @@ -257,6 +258,7 @@ func TestPreprocess_ServerError(t *testing.T) { // Create the provider with our custom client provider, err := desthookdeck.New( testutil.Registry.MetadataLoader(), + nil, desthookdeck.WithHTTPClient(client), ) require.NoError(t, err) @@ -312,6 +314,7 @@ func TestPreprocess_Timeout(t *testing.T) { // Create the provider with our custom client provider, err := desthookdeck.New( testutil.Registry.MetadataLoader(), + nil, desthookdeck.WithHTTPClient(client), ) require.NoError(t, err) diff --git a/internal/destregistry/providers/desthookdeck/desthookdeck_publish_test.go b/internal/destregistry/providers/desthookdeck/desthookdeck_publish_test.go index eb2efeeb..f403b0f0 100644 --- a/internal/destregistry/providers/desthookdeck/desthookdeck_publish_test.go +++ b/internal/destregistry/providers/desthookdeck/desthookdeck_publish_test.go @@ -205,6 +205,7 @@ func (s *HookdeckPublishSuite) SetupSuite() { // Create provider with the custom HTTP client provider, err := desthookdeck.New( testutil.Registry.MetadataLoader(), + nil, desthookdeck.WithHTTPClient(customClient), ) require.NoError(s.T(), err) @@ -261,6 +262,7 @@ func TestHookdeckProvider_WithClientOption(t *testing.T) { // Create a provider with the custom client provider, err := desthookdeck.New( testutil.Registry.MetadataLoader(), + nil, desthookdeck.WithHTTPClient(customClient), ) require.NoError(t, err) diff --git a/internal/destregistry/providers/destrabbitmq/destrabbitmq.go b/internal/destregistry/providers/destrabbitmq/destrabbitmq.go index 701f0d32..98040f56 100644 --- a/internal/destregistry/providers/destrabbitmq/destrabbitmq.go +++ b/internal/destregistry/providers/destrabbitmq/destrabbitmq.go @@ -31,8 +31,8 @@ type RabbitMQDestinationCredentials struct { var _ destregistry.Provider = (*RabbitMQDestination)(nil) -func New(loader metadata.MetadataLoader) (*RabbitMQDestination, error) { - base, err := destregistry.NewBaseProvider(loader, "rabbitmq") +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*RabbitMQDestination, error) { + base, err := destregistry.NewBaseProvider(loader, "rabbitmq", basePublisherOpts...) if err != nil { return nil, err } @@ -65,7 +65,7 @@ func (d *RabbitMQDestination) CreatePublisher(ctx context.Context, destination * return nil, err } return &RabbitMQPublisher{ - BasePublisher: &destregistry.BasePublisher{}, + BasePublisher: d.BaseProvider.NewPublisher(), url: rabbitURL(config, credentials), exchange: config.Exchange, }, nil diff --git a/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go b/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go index e629c4c0..45ab0bc1 100644 --- a/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go +++ b/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go @@ -169,7 +169,7 @@ func (s *RabbitMQPublishSuite) SetupSuite() { rabbitURL := testinfra.EnsureRabbitMQ() exchange := uuid.New().String() - provider, err := destrabbitmq.New(testutil.Registry.MetadataLoader()) + provider, err := destrabbitmq.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) dest := testutil.DestinationFactory.Any( diff --git a/internal/destregistry/providers/destrabbitmq/destrabbitmq_validate_test.go b/internal/destregistry/providers/destrabbitmq/destrabbitmq_validate_test.go index e54ae56a..e28695bd 100644 --- a/internal/destregistry/providers/destrabbitmq/destrabbitmq_validate_test.go +++ b/internal/destregistry/providers/destrabbitmq/destrabbitmq_validate_test.go @@ -28,7 +28,7 @@ func TestRabbitMQDestination_Validate(t *testing.T) { }), ) - rabbitmqDestination, err := destrabbitmq.New(testutil.Registry.MetadataLoader()) + rabbitmqDestination, err := destrabbitmq.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should validate valid destination", func(t *testing.T) { @@ -186,7 +186,7 @@ func TestRabbitMQDestination_Validate(t *testing.T) { func TestRabbitMQDestination_ComputeTarget(t *testing.T) { t.Parallel() - rabbitmqDestination, err := destrabbitmq.New(testutil.Registry.MetadataLoader()) + rabbitmqDestination, err := destrabbitmq.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should return 'exchange -> *' as target for destination with all topics", func(t *testing.T) { diff --git a/internal/destregistry/providers/destwebhook/destwebhook.go b/internal/destregistry/providers/destwebhook/destwebhook.go index da975bdf..2729d265 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook.go +++ b/internal/destregistry/providers/destwebhook/destwebhook.go @@ -120,8 +120,8 @@ func WithSignatureAlgorithm(algorithm string) Option { } } -func New(loader metadata.MetadataLoader, opts ...Option) (*WebhookDestination, error) { - base, err := destregistry.NewBaseProvider(loader, "webhook") +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption, opts ...Option) (*WebhookDestination, error) { + base, err := destregistry.NewBaseProvider(loader, "webhook", basePublisherOpts...) if err != nil { return nil, err } @@ -218,7 +218,7 @@ func (d *WebhookDestination) CreatePublisher(ctx context.Context, destination *m }) return &WebhookPublisher{ - BasePublisher: &destregistry.BasePublisher{}, + BasePublisher: d.BaseProvider.NewPublisher(), httpClient: httpClient, url: config.URL, headerPrefix: d.headerPrefix, diff --git a/internal/destregistry/providers/destwebhook/destwebhook_config_test.go b/internal/destregistry/providers/destwebhook/destwebhook_config_test.go index 295982ee..47fa9480 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook_config_test.go +++ b/internal/destregistry/providers/destwebhook/destwebhook_config_test.go @@ -122,7 +122,7 @@ func TestWebhookDestination_SignatureOptions(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - dest, err := destwebhook.New(testutil.Registry.MetadataLoader(), tt.opts...) + dest, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil, tt.opts...) assert.NoError(t, err) assert.Equal(t, tt.wantEncoding, dest.GetSignatureEncoding()) assert.Equal(t, tt.wantAlgorithm, dest.GetSignatureAlgorithm()) diff --git a/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go b/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go index 3a8c4a52..0f0da10b 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go +++ b/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go @@ -148,7 +148,7 @@ func (s *WebhookPublishSuite) TearDownSuite() { func (s *WebhookPublishSuite) setupBasicSuite() { consumer := NewWebhookConsumer("x-outpost-") - provider, err := destwebhook.New(testutil.Registry.MetadataLoader()) + provider, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) require.NoError(s.T(), err) dest := testutil.DestinationFactory.Any( @@ -179,7 +179,7 @@ func (s *WebhookPublishSuite) setupBasicSuite() { func (s *WebhookPublishSuite) setupSingleSecretSuite() { consumer := NewWebhookConsumer("x-outpost-") - provider, err := destwebhook.New(testutil.Registry.MetadataLoader()) + provider, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) require.NoError(s.T(), err) dest := testutil.DestinationFactory.Any( @@ -210,7 +210,7 @@ func (s *WebhookPublishSuite) setupSingleSecretSuite() { func (s *WebhookPublishSuite) setupMultipleSecretsSuite() { consumer := NewWebhookConsumer("x-outpost-") - provider, err := destwebhook.New(testutil.Registry.MetadataLoader()) + provider, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) require.NoError(s.T(), err) now := time.Now() @@ -245,7 +245,7 @@ func (s *WebhookPublishSuite) setupMultipleSecretsSuite() { func (s *WebhookPublishSuite) setupExpiredSecretsSuite() { consumer := NewWebhookConsumer("x-outpost-") - provider, err := destwebhook.New(testutil.Registry.MetadataLoader()) + provider, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) require.NoError(s.T(), err) now := time.Now() @@ -283,6 +283,7 @@ func (s *WebhookPublishSuite) setupCustomHeaderSuite() { provider, err := destwebhook.New( testutil.Registry.MetadataLoader(), + nil, destwebhook.WithHeaderPrefix(customPrefix), ) require.NoError(s.T(), err) @@ -397,7 +398,7 @@ func TestWebhookPublisher_DisableDefaultHeaders(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - dest, err := destwebhook.New(testutil.Registry.MetadataLoader(), tt.options...) + dest, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil, tt.options...) require.NoError(t, err) destination := testutil.DestinationFactory.Any( @@ -498,6 +499,7 @@ func TestWebhookPublisher_SignatureTemplates(t *testing.T) { t.Run(tt.name, func(t *testing.T) { provider, err := destwebhook.New( testutil.Registry.MetadataLoader(), + nil, destwebhook.WithSignatureContentTemplate(tt.contentTemplate), destwebhook.WithSignatureHeaderTemplate(tt.headerTemplate), ) diff --git a/internal/destregistry/providers/destwebhook/destwebhook_validate_test.go b/internal/destregistry/providers/destwebhook/destwebhook_validate_test.go index 7fa4fcfb..f23bcf30 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook_validate_test.go +++ b/internal/destregistry/providers/destwebhook/destwebhook_validate_test.go @@ -27,7 +27,7 @@ func TestWebhookDestination_Validate(t *testing.T) { }), ) - webhookDestination, err := destwebhook.New(testutil.Registry.MetadataLoader()) + webhookDestination, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should validate valid destination", func(t *testing.T) { @@ -87,7 +87,7 @@ func TestWebhookDestination_ValidateSecrets(t *testing.T) { }), ) - webhookDestination, err := destwebhook.New(testutil.Registry.MetadataLoader()) + webhookDestination, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should validate valid destination", func(t *testing.T) { @@ -139,7 +139,7 @@ func TestWebhookDestination_ValidateSecrets(t *testing.T) { func TestWebhookDestination_ComputeTarget(t *testing.T) { t.Parallel() - webhookDestination, err := destwebhook.New(testutil.Registry.MetadataLoader()) + webhookDestination, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should return url as target", func(t *testing.T) { @@ -158,7 +158,7 @@ func TestWebhookDestination_ComputeTarget(t *testing.T) { func TestWebhookDestination_Preprocess(t *testing.T) { t.Parallel() - webhookDestination, err := destwebhook.New(testutil.Registry.MetadataLoader()) + webhookDestination, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) require.NoError(t, err) t.Run("should generate default secret if not provided", func(t *testing.T) { diff --git a/loadtest/src/tests/events-throughput.ts b/loadtest/src/tests/events-throughput.ts index ae0cff00..e7d9762d 100644 --- a/loadtest/src/tests/events-throughput.ts +++ b/loadtest/src/tests/events-throughput.ts @@ -173,8 +173,8 @@ export default function (data: { tenantId: string }) { { headers } ); - // Check if the event was published successfully - const isSuccess = eventResponse.status === 200; + // Check if the event was published successfully (202 Accepted is the success status) + const isSuccess = eventResponse.status === 202; // Record custom metrics publishSuccessRate.add(isSuccess); diff --git a/loadtest/src/tests/events-verify.ts b/loadtest/src/tests/events-verify.ts index f33ef10a..46083cee 100644 --- a/loadtest/src/tests/events-verify.ts +++ b/loadtest/src/tests/events-verify.ts @@ -137,19 +137,33 @@ export default async function () { } } - // Extract delivery start timestamp from headers (X-Acme-Timestamp or X-Outpost-Timestamp) + // Extract delivery start timestamp from headers + // First try millisecond timestamp (X-Outpost-Timestamp-Ms), then fall back to second precision if (responseData && responseData.headers) { - const timestampHeader = - responseData.headers["X-Acme-Timestamp"] || - responseData.headers["X-Outpost-Timestamp"]; - if (timestampHeader) { - const timestamp = parseInt(timestampHeader, 10); + const timestampMsHeader = + responseData.headers["X-Acme-Timestamp-Ms"] || + responseData.headers["X-Outpost-Timestamp-Ms"]; + + if (timestampMsHeader) { + // Millisecond timestamp - use directly + const timestamp = parseInt(timestampMsHeader, 10); if (!isNaN(timestamp)) { - // Convert seconds to milliseconds if needed - deliveryStartTimestamp = - timestamp.toString().length === 10 - ? timestamp * 1000 - : timestamp; + deliveryStartTimestamp = timestamp; + } + } else { + // Fall back to second precision timestamp + const timestampHeader = + responseData.headers["X-Acme-Timestamp"] || + responseData.headers["X-Outpost-Timestamp"]; + if (timestampHeader) { + const timestamp = parseInt(timestampHeader, 10); + if (!isNaN(timestamp)) { + // Convert seconds to milliseconds if needed + deliveryStartTimestamp = + timestamp.toString().length === 10 + ? timestamp * 1000 + : timestamp; + } } } } diff --git a/loadtest/src/tests/events.ts b/loadtest/src/tests/events.ts index a5c03f9a..a2d734c0 100644 --- a/loadtest/src/tests/events.ts +++ b/loadtest/src/tests/events.ts @@ -117,10 +117,10 @@ export default function (): void { ); check(eventResponse, { - "event published": (r) => r.status === 200, + "event published": (r) => r.status === 202, }); - if (eventResponse.status !== 200) { + if (eventResponse.status !== 202) { throw new Error( `Failed to publish event: ${eventResponse.status} ${eventResponse.body}` );