Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions contributing/loadtest/overview.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Load Testing Overview

## Prerequisites

- k6 installed
- Docker running
- Outpost deployment with API key
- Node.js (for TypeScript compilation)

## Two-Phase Load Test

### 1. Throughput Test
Creates a tenant with one webhook destination and publishes events at a configured rate. Event IDs are stored in Redis for verification.

### 2. Verification Test
Queries the mock webhook to confirm delivery and measure latency metrics:
- End-to-end latency (publish to delivery)
- Receive latency (publish to Outpost receipt)
- Internal latency (Outpost processing time)

## Setup

### Start Supporting Services

```bash
cd loadtest
docker-compose up -d
```

This starts:
- **Redis** (`localhost:46379`): Coordinates test state between throughput and verification phases
- **Mock Webhook** (`localhost:48080`): Receives webhook deliveries and stores them for verification

### Configure Environment

Use `loadtest/config/environments/local.json` or create a new one (e.g., `staging.json`):

```json
{
"name": "local",
"api": {
"baseUrl": "http://localhost:3333",
"timeout": "30s"
},
"mockWebhook": {
"url": "http://localhost:48080",
"destinationUrl": "http://host.docker.internal:48080",
"verificationPollTimeout": "5s"
},
"redis": "redis://localhost:46379"
}
```

**Critical:** `mockWebhook.destinationUrl` must be accessible from your Outpost deployment:
- **Local Outpost in Docker**: `http://host.docker.internal:48080`
- **Local Outpost in Kubernetes**: `http://host.docker.internal:48080`
- **Remote Outpost**: Expose mock webhook publicly (e.g., ngrok tunnel) and use that URL

The mock webhook must be reachable by Outpost for event delivery to succeed.

### Configure Scenario

Use the default `basic.json` scenario, edit it locally, or create a new one.

Default scenario at `loadtest/config/scenarios/events-throughput/basic.json`:

```json
{
"options": {
"scenarios": {
"events": {
"rate": 100,
"timeUnit": "1s",
"duration": "30s",
"preAllocatedVUs": 20
}
}
}
}
```

To create a new scenario, add a file (e.g., `high-load.json`) in the same directory and reference it with `--scenario high-load`.

## Running Tests

### Throughput Test

```bash
export API_KEY=your-api-key
export TESTID=$(date +%s)

./run-test.sh events-throughput --environment local --scenario basic
```

### Verification Test

```bash
# Use same TESTID from throughput test
# MAX_ITERATIONS = rate × duration (e.g., 100 × 30 = 3000)
MAX_ITERATIONS=3000 ./run-test.sh events-verify --environment local --scenario basic
```

## Mock Webhook

The mock webhook service provides:
- `POST /webhook`: Receives event deliveries from Outpost
- `GET /events/{eventId}`: Returns event details for verification
- `GET /health`: Service status

Events are stored in an LRU cache with 10-minute expiration.

**Network Requirements:**
- k6 must reach mock webhook at `mockWebhook.url` to verify deliveries
- Outpost must reach mock webhook at `mockWebhook.destinationUrl` to deliver events
- For remote Outpost deployments, expose the mock webhook via tunnel or public endpoint

## Cleanup

```bash
docker-compose down
```
14 changes: 8 additions & 6 deletions internal/config/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (

// DestinationsConfig is the main configuration for all destination types
type DestinationsConfig struct {
MetadataPath string `yaml:"metadata_path" env:"DESTINATIONS_METADATA_PATH" desc:"Path to the directory containing custom destination type definitions. This can be overridden by the root-level 'destination_metadata_path' if also set." required:"N"`
Webhook DestinationWebhookConfig `yaml:"webhook" desc:"Configuration specific to webhook destinations."`
AWSKinesis DestinationAWSKinesisConfig `yaml:"aws_kinesis" desc:"Configuration specific to AWS Kinesis destinations."`
MetadataPath string `yaml:"metadata_path" env:"DESTINATIONS_METADATA_PATH" desc:"Path to the directory containing custom destination type definitions. This can be overridden by the root-level 'destination_metadata_path' if also set." required:"N"`
IncludeMillisecondTimestamp bool `yaml:"include_millisecond_timestamp" env:"DESTINATIONS_INCLUDE_MILLISECOND_TIMESTAMP" desc:"If true, includes a 'timestamp-ms' field with millisecond precision in destination metadata. Useful for load testing and debugging." required:"N"`
Webhook DestinationWebhookConfig `yaml:"webhook" desc:"Configuration specific to webhook destinations."`
AWSKinesis DestinationAWSKinesisConfig `yaml:"aws_kinesis" desc:"Configuration specific to AWS Kinesis destinations."`
}

func (c *DestinationsConfig) ToConfig(cfg *Config) destregistrydefault.RegisterDefaultDestinationOptions {
Expand All @@ -25,9 +26,10 @@ func (c *DestinationsConfig) ToConfig(cfg *Config) destregistrydefault.RegisterD
}

return destregistrydefault.RegisterDefaultDestinationOptions{
UserAgent: userAgent,
Webhook: c.Webhook.toConfig(),
AWSKinesis: c.AWSKinesis.toConfig(),
UserAgent: userAgent,
IncludeMillisecondTimestamp: c.IncludeMillisecondTimestamp,
Webhook: c.Webhook.toConfig(),
AWSKinesis: c.AWSKinesis.toConfig(),
}
}

Expand Down
13 changes: 10 additions & 3 deletions internal/destregistry/baseprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,28 @@ func ObfuscateValue(value string) string {

// BaseProvider provides common functionality for all destination providers
type BaseProvider struct {
metadata *metadata.ProviderMetadata
metadata *metadata.ProviderMetadata
basePublisherOpts []BasePublisherOption
}

// NewBaseProvider creates a new base provider with loaded metadata
func NewBaseProvider(loader metadata.MetadataLoader, providerType string) (*BaseProvider, error) {
func NewBaseProvider(loader metadata.MetadataLoader, providerType string, opts ...BasePublisherOption) (*BaseProvider, error) {
meta, err := loader.Load(providerType)
if err != nil {
return nil, fmt.Errorf("loading provider metadata: %w", err)
}

return &BaseProvider{
metadata: meta,
metadata: meta,
basePublisherOpts: opts,
}, nil
}

// NewPublisher creates a BasePublisher with provider-configured options
func (p *BaseProvider) NewPublisher() *BasePublisher {
return NewBasePublisher(p.basePublisherOpts...)
}

// Metadata returns the provider metadata
func (p *BaseProvider) Metadata() *metadata.ProviderMetadata {
return p.metadata
Expand Down
30 changes: 28 additions & 2 deletions internal/destregistry/basepublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,28 @@ import (

// BasePublisher provides common publisher functionality
type BasePublisher struct {
active sync.WaitGroup
closed atomic.Bool
active sync.WaitGroup
closed atomic.Bool
includeMillisecondTimestamp bool
}

// BasePublisherOption is a functional option for configuring BasePublisher
type BasePublisherOption func(*BasePublisher)

// WithMillisecondTimestamp enables millisecond-precision timestamp in metadata
func WithMillisecondTimestamp(enabled bool) BasePublisherOption {
return func(p *BasePublisher) {
p.includeMillisecondTimestamp = enabled
}
}

// NewBasePublisher creates a new BasePublisher with the given options
func NewBasePublisher(opts ...BasePublisherOption) *BasePublisher {
p := &BasePublisher{}
for _, opt := range opts {
opt(p)
}
return p
}

// StartPublish returns error if publisher is closed, otherwise adds to waitgroup
Expand Down Expand Up @@ -41,6 +61,12 @@ func (p *BasePublisher) MakeMetadata(event *models.Event, timestamp time.Time) m
"event-id": event.ID,
"topic": event.Topic,
}

// Add millisecond timestamp if enabled
if p.includeMillisecondTimestamp {
systemMetadata["timestamp-ms"] = fmt.Sprintf("%d", timestamp.UnixMilli())
}

metadata := make(map[string]string)
for k, v := range systemMetadata {
metadata[k] = v
Expand Down
27 changes: 17 additions & 10 deletions internal/destregistry/providers/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type DestAWSKinesisConfig struct {
}

type RegisterDefaultDestinationOptions struct {
UserAgent string
Webhook *DestWebhookConfig
AWSKinesis *DestAWSKinesisConfig
UserAgent string
IncludeMillisecondTimestamp bool
Webhook *DestWebhookConfig
AWSKinesis *DestAWSKinesisConfig
}

// RegisterDefault registers the default destination providers with the registry.
Expand All @@ -39,6 +40,12 @@ type RegisterDefaultDestinationOptions struct {
func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestinationOptions) error {
loader := registry.MetadataLoader()

// Build base publisher options that apply to all providers
basePublisherOpts := []destregistry.BasePublisherOption{}
if opts.IncludeMillisecondTimestamp {
basePublisherOpts = append(basePublisherOpts, destregistry.WithMillisecondTimestamp(opts.IncludeMillisecondTimestamp))
}

webhookOpts := []destwebhook.Option{
destwebhook.WithUserAgent(opts.UserAgent),
}
Expand All @@ -55,20 +62,20 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina
destwebhook.WithSignatureAlgorithm(opts.Webhook.SignatureAlgorithm),
)
}
webhook, err := destwebhook.New(loader, webhookOpts...)
webhook, err := destwebhook.New(loader, basePublisherOpts, webhookOpts...)
if err != nil {
return err
}
registry.RegisterProvider("webhook", webhook)

hookdeck, err := desthookdeck.New(loader,
hookdeck, err := desthookdeck.New(loader, basePublisherOpts,
desthookdeck.WithUserAgent(opts.UserAgent))
if err != nil {
return err
}
registry.RegisterProvider("hookdeck", hookdeck)

awsSQS, err := destawssqs.New(loader)
awsSQS, err := destawssqs.New(loader, basePublisherOpts)
if err != nil {
return err
}
Expand All @@ -80,25 +87,25 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina
destawskinesis.WithMetadataInPayload(opts.AWSKinesis.MetadataInPayload),
)
}
awsKinesis, err := destawskinesis.New(loader, awsKinesisOpts...)
awsKinesis, err := destawskinesis.New(loader, basePublisherOpts, awsKinesisOpts...)
if err != nil {
return err
}
registry.RegisterProvider("aws_kinesis", awsKinesis)

awsS3, err := destawss3.New(loader)
awsS3, err := destawss3.New(loader, basePublisherOpts)
if err != nil {
return err
}
registry.RegisterProvider("aws_s3", awsS3)

azureServiceBus, err := destazureservicebus.New(loader)
azureServiceBus, err := destazureservicebus.New(loader, basePublisherOpts)
if err != nil {
return err
}
registry.RegisterProvider("azure_servicebus", azureServiceBus)

rabbitmq, err := destrabbitmq.New(loader)
rabbitmq, err := destrabbitmq.New(loader, basePublisherOpts)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func WithMetadataInPayload(include bool) Option {
}

// Constructor
func New(loader metadata.MetadataLoader, opts ...Option) (*AWSKinesisProvider, error) {
base, err := destregistry.NewBaseProvider(loader, "aws_kinesis")
func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption, opts ...Option) (*AWSKinesisProvider, error) {
base, err := destregistry.NewBaseProvider(loader, "aws_kinesis", basePublisherOpts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func (p *AWSKinesisProvider) CreatePublisher(ctx context.Context, destination *m
})

return &AWSKinesisPublisher{
BasePublisher: &destregistry.BasePublisher{},
BasePublisher: p.BaseProvider.NewPublisher(),
client: kinesisClient,
streamName: config.StreamName,
partitionKeyTemplate: config.PartitionKeyTemplate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (s *AWSKinesisSuite) SetupSuite() {
require.NoError(t, err)

// Create provider
provider, err := destawskinesis.New(testutil.Registry.MetadataLoader())
provider, err := destawskinesis.New(testutil.Registry.MetadataLoader(), nil)
require.NoError(t, err)

// Create destination with partition key template
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestAWSKinesisDestination_Validate(t *testing.T) {
}),
)

awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader())
awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader(), nil)
require.NoError(t, err)

t.Run("should validate valid destination", func(t *testing.T) {
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestAWSKinesisDestination_Validate(t *testing.T) {
func TestAWSKinesisDestination_ComputeTarget(t *testing.T) {
t.Parallel()

awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader())
awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader(), nil)
require.NoError(t, err)

t.Run("should return stream and region as target", func(t *testing.T) {
Expand Down
8 changes: 5 additions & 3 deletions internal/destregistry/providers/destawss3/destawss3.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type AWSS3Provider struct {
var _ destregistry.Provider = (*AWSS3Provider)(nil)

// New creates a new AWSS3Provider
func New(loader metadata.MetadataLoader) (*AWSS3Provider, error) {
base, err := destregistry.NewBaseProvider(loader, "aws_s3")
func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*AWSS3Provider, error) {
base, err := destregistry.NewBaseProvider(loader, "aws_s3", basePublisherOpts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -102,6 +102,7 @@ func (p *AWSS3Provider) CreatePublisher(ctx context.Context, destination *models
}

return NewAWSS3Publisher(
p.BaseProvider.NewPublisher(),
client,
cfg.Bucket,
cfg.KeyTemplate,
Expand Down Expand Up @@ -349,6 +350,7 @@ func parseStorageClass(storageClass string) (types.StorageClass, error) {

// NewAWSS3Publisher exposed for testing
func NewAWSS3Publisher(
basePublisher *destregistry.BasePublisher,
client *s3.Client,
bucket, keyTemplateStr, storageClass string,
) *AWSS3Publisher {
Expand All @@ -360,7 +362,7 @@ func NewAWSS3Publisher(
}

return &AWSS3Publisher{
BasePublisher: &destregistry.BasePublisher{},
BasePublisher: basePublisher,
client: client,
bucket: bucket,
keyTemplate: tmpl,
Expand Down
Loading
Loading