diff --git a/cmd/e2e/api_test.go b/cmd/e2e/api_test.go index 782263ad..884370d0 100644 --- a/cmd/e2e/api_test.go +++ b/cmd/e2e/api_test.go @@ -274,6 +274,7 @@ func (suite *basicSuite) TestTenantsAPI() { func (suite *basicSuite) TestDestinationsAPI() { tenantID := idgen.String() sampleDestinationID := idgen.Destination() + destinationWithMetadataID := idgen.Destination() tests := []APITest{ { Name: "PUT /:tenantID", @@ -438,6 +439,177 @@ func (suite *basicSuite) TestDestinationsAPI() { }, }, }, + { + Name: "POST /:tenantID/destinations with delivery_metadata and metadata", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/" + tenantID + "/destinations", + Body: map[string]interface{}{ + "id": destinationWithMetadataID, + "type": "webhook", + "topics": []string{"user.created"}, + "config": map[string]interface{}{ + "url": "http://host.docker.internal:4444", + }, + "delivery_metadata": map[string]interface{}{ + "X-App-ID": "test-app", + "X-Version": "1.0", + }, + "metadata": map[string]interface{}{ + "environment": "test", + "team": "platform", + }, + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusCreated, + }, + }, + }, + { + Name: "GET /:tenantID/destinations/:destinationID with delivery_metadata and metadata", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/destinations/" + destinationWithMetadataID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + Body: map[string]interface{}{ + "id": destinationWithMetadataID, + "type": "webhook", + "topics": []string{"user.created"}, + "config": map[string]interface{}{ + "url": "http://host.docker.internal:4444", + }, + "credentials": map[string]interface{}{}, + "delivery_metadata": map[string]interface{}{ + "X-App-ID": "test-app", + "X-Version": "1.0", + }, + "metadata": map[string]interface{}{ + "environment": "test", + "team": "platform", + }, + }, + }, + }, + }, + { + Name: "PATCH /:tenantID/destinations/:destinationID update delivery_metadata", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPATCH, + Path: "/" + tenantID + "/destinations/" + destinationWithMetadataID, + Body: map[string]interface{}{ + "delivery_metadata": map[string]interface{}{ + "X-Version": "2.0", // Overwrite existing value (was "1.0") + "X-Region": "us-east-1", // Add new key + }, + // Note: X-App-ID not included, should be preserved from original + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + Body: map[string]interface{}{ + "id": destinationWithMetadataID, + "type": "webhook", + "topics": []string{"user.created"}, + "config": map[string]interface{}{ + "url": "http://host.docker.internal:4444", + }, + "credentials": map[string]interface{}{}, + "delivery_metadata": map[string]interface{}{ + "X-App-ID": "test-app", // PRESERVED: Not in PATCH request + "X-Version": "2.0", // OVERWRITTEN: Updated from "1.0" + "X-Region": "us-east-1", // NEW: Added by PATCH request + }, + "metadata": map[string]interface{}{ + "environment": "test", + "team": "platform", + }, + }, + }, + }, + }, + { + Name: "PATCH /:tenantID/destinations/:destinationID update metadata", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPATCH, + Path: "/" + tenantID + "/destinations/" + destinationWithMetadataID, + Body: map[string]interface{}{ + "metadata": map[string]interface{}{ + "team": "engineering", // Overwrite existing value (was "platform") + "region": "us", // Add new key + }, + // Note: environment not included, should be preserved from original + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + Body: map[string]interface{}{ + "id": destinationWithMetadataID, + "type": "webhook", + "topics": []string{"user.created"}, + "config": map[string]interface{}{ + "url": "http://host.docker.internal:4444", + }, + "credentials": map[string]interface{}{}, + "delivery_metadata": map[string]interface{}{ + "X-App-ID": "test-app", + "X-Version": "2.0", + "X-Region": "us-east-1", + }, + "metadata": map[string]interface{}{ + "environment": "test", // PRESERVED: Not in PATCH request + "team": "engineering", // OVERWRITTEN: Updated from "platform" + "region": "us", // NEW: Added by PATCH request + }, + }, + }, + }, + }, + { + Name: "GET /:tenantID/destinations/:destinationID verify merged fields", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/destinations/" + destinationWithMetadataID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + Body: map[string]interface{}{ + "id": destinationWithMetadataID, + "type": "webhook", + "topics": []string{"user.created"}, + "config": map[string]interface{}{ + "url": "http://host.docker.internal:4444", + }, + "credentials": map[string]interface{}{}, + // Verify delivery_metadata merge behavior persists: + // - Original: {"X-App-ID": "test-app", "X-Version": "1.0"} + // - After PATCH 1: {"X-Version": "2.0", "X-Region": "us-east-1"} + // - Result: Preserved X-App-ID, overwrote X-Version, added X-Region + "delivery_metadata": map[string]interface{}{ + "X-App-ID": "test-app", + "X-Version": "2.0", + "X-Region": "us-east-1", + }, + // Verify metadata merge behavior persists: + // - Original: {"environment": "test", "team": "platform"} + // - After PATCH 2: {"team": "engineering", "region": "us"} + // - Result: Preserved environment, overwrote team, added region + "metadata": map[string]interface{}{ + "environment": "test", + "team": "engineering", + "region": "us", + }, + }, + }, + }, + }, { Name: "POST /:tenantID/destinations with duplicate ID", Request: suite.AuthRequest(httpclient.Request{ @@ -468,7 +640,7 @@ func (suite *basicSuite) TestDestinationsAPI() { Path: "/" + tenantID + "/destinations", }), Expected: APITestExpectation{ - Validate: makeDestinationListValidator(2), + Validate: makeDestinationListValidator(3), }, }, { @@ -621,7 +793,7 @@ func (suite *basicSuite) TestDestinationsAPI() { Path: "/" + tenantID + "/destinations", }), Expected: APITestExpectation{ - Validate: makeDestinationListValidator(1), + Validate: makeDestinationListValidator(2), }, }, } diff --git a/docs/apis/openapi.yaml b/docs/apis/openapi.yaml index d6109bd9..673dd600 100644 --- a/docs/apis/openapi.yaml +++ b/docs/apis/openapi.yaml @@ -341,6 +341,20 @@ components: $ref: "#/components/schemas/WebhookConfig" credentials: $ref: "#/components/schemas/WebhookCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } target: type: string description: A human-readable representation of the destination target (e.g., URL host). Read-only. @@ -396,6 +410,20 @@ components: $ref: "#/components/schemas/AWSSQSConfig" credentials: $ref: "#/components/schemas/AWSSQSCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } target: type: string description: A human-readable representation of the destination target (SQS queue name). Read-only. @@ -451,6 +479,20 @@ components: $ref: "#/components/schemas/RabbitMQConfig" credentials: $ref: "#/components/schemas/RabbitMQCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } target: type: string description: A human-readable representation of the destination target (RabbitMQ exchange). Read-only. @@ -506,6 +548,20 @@ components: config: {} # Empty config credentials: $ref: "#/components/schemas/HookdeckCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } target: type: string description: A human-readable representation of the destination target (Hookdeck). Read-only. @@ -558,6 +614,20 @@ components: $ref: "#/components/schemas/AWSKinesisConfig" credentials: $ref: "#/components/schemas/AWSKinesisCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } target: type: string description: A human-readable representation of the destination target (Kinesis stream name). Read-only. @@ -613,6 +683,20 @@ components: $ref: "#/components/schemas/AzureServiceBusConfig" credentials: $ref: "#/components/schemas/AzureServiceBusCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } target: type: string description: A human-readable representation of the destination target (Azure Service Bus queue/topic name). Read-only. @@ -667,6 +751,20 @@ components: $ref: "#/components/schemas/AWSS3Config" credentials: $ref: "#/components/schemas/AWSS3Credentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } target: type: string description: A human-readable representation of the destination target (bucket and region). Read-only. @@ -722,6 +820,20 @@ components: $ref: "#/components/schemas/GCPPubSubConfig" credentials: $ref: "#/components/schemas/GCPPubSubCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } target: type: string description: A human-readable representation of the destination target (project/topic). Read-only. @@ -788,6 +900,20 @@ components: credentials: # Secret is optional on create for admin, forbidden for tenant $ref: "#/components/schemas/WebhookCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationCreateAWSSQS: type: object required: [type, topics, config, credentials] @@ -806,6 +932,20 @@ components: $ref: "#/components/schemas/AWSSQSConfig" credentials: $ref: "#/components/schemas/AWSSQSCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationCreateRabbitMQ: type: object required: [type, topics, config, credentials] @@ -824,6 +964,20 @@ components: $ref: "#/components/schemas/RabbitMQConfig" credentials: $ref: "#/components/schemas/RabbitMQCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationCreateHookdeck: type: object required: [type, topics, credentials] # No config @@ -841,6 +995,20 @@ components: config: {} credentials: $ref: "#/components/schemas/HookdeckCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationCreateAWSKinesis: type: object required: [type, topics, config, credentials] @@ -859,6 +1027,20 @@ components: $ref: "#/components/schemas/AWSKinesisConfig" credentials: $ref: "#/components/schemas/AWSKinesisCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationCreateAzureServiceBus: type: object @@ -878,6 +1060,20 @@ components: $ref: "#/components/schemas/AzureServiceBusConfig" credentials: $ref: "#/components/schemas/AzureServiceBusCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationCreateAWSS3: type: object @@ -897,6 +1093,20 @@ components: $ref: "#/components/schemas/AWSS3Config" credentials: $ref: "#/components/schemas/AWSS3Credentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationCreateGCPPubSub: type: object required: [type, topics, config, credentials] @@ -915,6 +1125,20 @@ components: $ref: "#/components/schemas/GCPPubSubConfig" credentials: $ref: "#/components/schemas/GCPPubSubCredentials" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } # Polymorphic Destination Creation Schema (for Request Bodies) DestinationCreate: @@ -966,6 +1190,20 @@ components: $ref: "#/components/schemas/WebhookConfig" # URL is required here, but PATCH means it's optional in the request credentials: $ref: "#/components/schemas/WebhookCredentialsUpdate" + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationUpdateAWSSQS: type: object # Properties duplicated from DestinationUpdateBase @@ -976,6 +1214,20 @@ components: $ref: "#/components/schemas/AWSSQSConfig" # queue_url is required here, but PATCH means it's optional credentials: $ref: "#/components/schemas/AWSSQSCredentials" # key/secret required here, but PATCH means optional + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationUpdateRabbitMQ: type: object # Properties duplicated from DestinationUpdateBase @@ -986,6 +1238,20 @@ components: $ref: "#/components/schemas/RabbitMQConfig" # server_url/exchange required here, but PATCH means optional credentials: $ref: "#/components/schemas/RabbitMQCredentials" # username/password required here, but PATCH means optional + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationUpdateHookdeck: type: object # Properties duplicated from DestinationUpdateBase @@ -995,6 +1261,20 @@ components: config: {} # Empty config, cannot be updated credentials: $ref: "#/components/schemas/HookdeckCredentials" # token required here, but PATCH means optional + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationUpdateAWSKinesis: type: object # Properties duplicated from DestinationUpdateBase @@ -1005,6 +1285,20 @@ components: $ref: "#/components/schemas/AWSKinesisConfig" # stream_name/region required here, but PATCH means optional credentials: $ref: "#/components/schemas/AWSKinesisCredentials" # key/secret required here, but PATCH means optional + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationUpdateAzureServiceBus: type: object # Properties duplicated from DestinationUpdateBase @@ -1015,6 +1309,20 @@ components: $ref: "#/components/schemas/AzureServiceBusConfig" # name required here, but PATCH means optional credentials: $ref: "#/components/schemas/AzureServiceBusCredentials" # connection_string required here, but PATCH means optional + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationUpdateAWSS3: type: object @@ -1026,6 +1334,20 @@ components: $ref: "#/components/schemas/AWSS3Config" # bucket/region required here, but PATCH means optional credentials: $ref: "#/components/schemas/AWSS3Credentials" # key/secret required here, but PATCH means optional + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } DestinationUpdateGCPPubSub: type: object # Properties duplicated from DestinationUpdateBase @@ -1036,6 +1358,20 @@ components: $ref: "#/components/schemas/GCPPubSubConfig" # project_id/topic required here, but PATCH means optional credentials: $ref: "#/components/schemas/GCPPubSubCredentials" # service_account_json required here, but PATCH means optional + delivery_metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Static key-value pairs merged into event metadata on every delivery. + example: { "app-id": "my-app", "region": "us-east-1" } + metadata: + type: object + additionalProperties: + type: string + nullable: true + description: Arbitrary contextual information stored with the destination. + example: { "internal-id": "123", "team": "platform" } # Polymorphic Destination Update Schema (for Request Bodies) DestinationUpdate: diff --git a/docs/pages/features/publish-events.mdx b/docs/pages/features/publish-events.mdx index a747e6e8..c153da1a 100644 --- a/docs/pages/features/publish-events.mdx +++ b/docs/pages/features/publish-events.mdx @@ -34,7 +34,13 @@ The events need to follow the given structure: Each event (without a `destination_id`) is evaluated against all the registered destinations. An event is delivered and logged for each eligible destination. -The `metadata` is translated to the destination's native metadata; for instance, with Webhooks, they are translated to HTTP headers. If the destination does not support metadata, the metadata will be included in the event payload. +The `metadata` field in published events is merged with the destination's `delivery_metadata` before being translated to the destination's native format. The merge priority is: system metadata (event-id, topic, timestamp) < destination's delivery_metadata < event metadata. This means event metadata has the highest priority and can override delivery_metadata values. + +For webhook destinations, the merged metadata is translated to HTTP headers. This allows you to: +- Set static headers (like `x-api-key`) on the destination via `delivery_metadata` +- Override or add dynamic headers on a per-event basis via the event's `metadata` field + +If the destination does not support metadata natively, it will be included in the event payload. ## Publishing from a message bus diff --git a/docs/pages/references/api.mdx b/docs/pages/references/api.mdx index fddc1b1e..8498f598 100644 --- a/docs/pages/references/api.mdx +++ b/docs/pages/references/api.mdx @@ -128,14 +128,30 @@ Empty body // Destination type specific credentials. AES encrypted. Schema depends on type "secret": "some***********" }, + "delivery_metadata": { + // Optional. Static key-value pairs automatically merged into event metadata on every delivery + "x-api-key": "sk_live_abc123", + "x-tenant-id": "tenant_456" + }, + "metadata": { + // Optional. Arbitrary contextual information stored with the destination + "name": "Production Webhook Server", + "description": "Main webhook endpoint for order processing" + }, "disabled_at": null, // null or ISO date if disabled "created_at": "2024-01-01T00:00:00Z" // Date the destination was created } ``` -The `topics` array can contain either a list of topics or a wildcard `*` implying that all topics are supported. If you do not wish to implement topics for your application, you set all destination topics to `*`. +#### Field Details + +- **`topics`**: Can contain either a list of topics or a wildcard `*` implying that all topics are supported. If you do not wish to implement topics for your application, set all destination topics to `*`. + +- **`credentials`** (optional): By default all destination credentials are obfuscated and the values cannot be read. This does not apply to the `webhook` type destination secret—each destination type can expose their own obfuscation logic. + +- **`delivery_metadata`** (optional): Static key-value pairs automatically merged into event metadata on every delivery. Values are encrypted like credentials. Useful for destination-specific metadata like API keys, authentication tokens, tenant identifiers, or any custom headers/attributes required by the receiving service. For webhook destinations, these are translated to HTTP headers. The merge priority is: system metadata (event-id, topic, timestamp) < delivery_metadata < event metadata. -By default all destination `credentials` are obfuscated and the values cannot be read. This does not apply to the `webhook` type destination secret and each destination can expose their own obfuscation logic. +- **`metadata`** (optional): Arbitrary contextual information stored with the destination for organizational purposes (e.g., name, description, notes). Not encrypted and not included in event deliveries. Useful for documenting what a destination is used for or adding labels for filtering. ### `GET` `/:tenant_id/destinations` diff --git a/internal/destregistry/baseprovider.go b/internal/destregistry/baseprovider.go index ecf4f613..49d63343 100644 --- a/internal/destregistry/baseprovider.go +++ b/internal/destregistry/baseprovider.go @@ -43,9 +43,11 @@ func NewBaseProvider(loader metadata.MetadataLoader, providerType string, opts . }, nil } -// NewPublisher creates a BasePublisher with provider-configured options -func (p *BaseProvider) NewPublisher() *BasePublisher { - return NewBasePublisher(p.basePublisherOpts...) +// NewPublisher creates a BasePublisher with provider-configured options plus any additional options +func (p *BaseProvider) NewPublisher(additionalOpts ...BasePublisherOption) *BasePublisher { + opts := append([]BasePublisherOption{}, p.basePublisherOpts...) + opts = append(opts, additionalOpts...) + return NewBasePublisher(opts...) } // Metadata returns the provider metadata diff --git a/internal/destregistry/basepublisher.go b/internal/destregistry/basepublisher.go index 79fab90d..98a75339 100644 --- a/internal/destregistry/basepublisher.go +++ b/internal/destregistry/basepublisher.go @@ -14,6 +14,7 @@ type BasePublisher struct { active sync.WaitGroup closed atomic.Bool includeMillisecondTimestamp bool + deliveryMetadata map[string]string } // BasePublisherOption is a functional option for configuring BasePublisher @@ -26,6 +27,15 @@ func WithMillisecondTimestamp(enabled bool) BasePublisherOption { } } +// WithDeliveryMetadata sets static metadata to be merged with every event delivery +func WithDeliveryMetadata(metadata map[string]string) BasePublisherOption { + return func(p *BasePublisher) { + if metadata != nil { + p.deliveryMetadata = metadata + } + } +} + // NewBasePublisher creates a new BasePublisher with the given options func NewBasePublisher(opts ...BasePublisherOption) *BasePublisher { p := &BasePublisher{} @@ -67,10 +77,17 @@ func (p *BasePublisher) MakeMetadata(event *models.Event, timestamp time.Time) m systemMetadata["timestamp-ms"] = fmt.Sprintf("%d", timestamp.UnixMilli()) } + // Merge with priority: system < deliveryMetadata < event.Metadata + // Start with system metadata (lowest priority) metadata := make(map[string]string) for k, v := range systemMetadata { metadata[k] = v } + // Merge delivery metadata (can override system metadata) + for k, v := range p.deliveryMetadata { + metadata[k] = v + } + // Merge event metadata (highest priority, can override both) for k, v := range event.Metadata { metadata[k] = v } diff --git a/internal/destregistry/basepublisher_test.go b/internal/destregistry/basepublisher_test.go new file mode 100644 index 00000000..a111e57d --- /dev/null +++ b/internal/destregistry/basepublisher_test.go @@ -0,0 +1,219 @@ +package destregistry_test + +import ( + "testing" + "time" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" +) + +func TestMakeMetadata_WithoutDeliveryMetadata(t *testing.T) { + t.Parallel() + + publisher := destregistry.NewBasePublisher() + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("user.created"), + testutil.EventFactory.WithMetadata(map[string]string{ + "user-id": "usr_456", + "action": "signup", + }), + ) + timestamp := time.Unix(1609459200, 0) // 2021-01-01 00:00:00 UTC + + metadata := publisher.MakeMetadata(&event, timestamp) + + // System metadata should be present + assert.Equal(t, "1609459200", metadata["timestamp"]) + assert.Equal(t, "evt_123", metadata["event-id"]) + assert.Equal(t, "user.created", metadata["topic"]) + + // Event metadata should be present + assert.Equal(t, "usr_456", metadata["user-id"]) + assert.Equal(t, "signup", metadata["action"]) + + // Should have exactly 5 keys (3 system + 2 event) + assert.Len(t, metadata, 5) +} + +func TestMakeMetadata_WithDeliveryMetadata(t *testing.T) { + t.Parallel() + + publisher := destregistry.NewBasePublisher( + destregistry.WithDeliveryMetadata(map[string]string{ + "app-id": "my-app", + "source": "outpost", + "region": "us-east-1", + }), + ) + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("user.created"), + testutil.EventFactory.WithMetadata(map[string]string{ + "user-id": "usr_456", + }), + ) + timestamp := time.Unix(1609459200, 0) + + metadata := publisher.MakeMetadata(&event, timestamp) + + // System metadata should be present + assert.Equal(t, "1609459200", metadata["timestamp"]) + assert.Equal(t, "evt_123", metadata["event-id"]) + assert.Equal(t, "user.created", metadata["topic"]) + + // Delivery metadata should be present + assert.Equal(t, "my-app", metadata["app-id"]) + assert.Equal(t, "outpost", metadata["source"]) + assert.Equal(t, "us-east-1", metadata["region"]) + + // Event metadata should be present + assert.Equal(t, "usr_456", metadata["user-id"]) + + // Should have 7 keys (3 system + 3 delivery + 1 event) + assert.Len(t, metadata, 7) +} + +func TestMakeMetadata_MergePriority(t *testing.T) { + t.Parallel() + + // Test the merge priority: System < DeliveryMetadata < Event + // Expected behavior: + // - System metadata has lowest priority + // - Delivery metadata can override system metadata + // - Event metadata has highest priority and can override both + + publisher := destregistry.NewBasePublisher( + destregistry.WithDeliveryMetadata(map[string]string{ + "timestamp": "999", // Should override system timestamp + "app-id": "my-app", // New key from delivery metadata + "source": "outpost", // Will be overridden by event metadata + }), + ) + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("user.created"), + testutil.EventFactory.WithMetadata(map[string]string{ + "source": "user-service", // Should override delivery metadata "source" + "user-id": "usr_456", // New key from event metadata + }), + ) + timestamp := time.Unix(1609459200, 0) // System timestamp: 1609459200 + + metadata := publisher.MakeMetadata(&event, timestamp) + + // System metadata + assert.Equal(t, "evt_123", metadata["event-id"], "system event-id should be present") + assert.Equal(t, "user.created", metadata["topic"], "system topic should be present") + + // Delivery metadata should override system timestamp + assert.Equal(t, "999", metadata["timestamp"], "delivery_metadata should override system timestamp") + + // Delivery metadata new keys + assert.Equal(t, "my-app", metadata["app-id"], "delivery_metadata app-id should be present") + + // Event metadata should override delivery metadata source + assert.Equal(t, "user-service", metadata["source"], "event metadata should override delivery_metadata source") + + // Event metadata new keys + assert.Equal(t, "usr_456", metadata["user-id"], "event user-id should be present") + + // Should have 6 keys: event-id, topic, timestamp, app-id, source, user-id + assert.Len(t, metadata, 6) +} + +func TestMakeMetadata_WithMillisecondTimestamp(t *testing.T) { + t.Parallel() + + publisher := destregistry.NewBasePublisher( + destregistry.WithMillisecondTimestamp(true), + ) + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("user.created"), + ) + timestamp := time.Unix(1609459200, 123456789) // With nanoseconds + + metadata := publisher.MakeMetadata(&event, timestamp) + + // Should include both timestamp and timestamp-ms + assert.Equal(t, "1609459200", metadata["timestamp"]) + assert.Equal(t, "1609459200123", metadata["timestamp-ms"]) +} + +func TestMakeMetadata_WithMillisecondTimestampAndDeliveryMetadata(t *testing.T) { + t.Parallel() + + // Test that delivery_metadata can override millisecond timestamp too + publisher := destregistry.NewBasePublisher( + destregistry.WithMillisecondTimestamp(true), + destregistry.WithDeliveryMetadata(map[string]string{ + "timestamp-ms": "999999999999", // Override the millisecond timestamp + }), + ) + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("user.created"), + ) + timestamp := time.Unix(1609459200, 123456789) + + metadata := publisher.MakeMetadata(&event, timestamp) + + // Delivery metadata should override system timestamp-ms + assert.Equal(t, "999999999999", metadata["timestamp-ms"]) +} + +func TestMakeMetadata_EmptyDeliveryMetadata(t *testing.T) { + t.Parallel() + + // Test with empty delivery metadata (should behave same as without) + publisher := destregistry.NewBasePublisher( + destregistry.WithDeliveryMetadata(map[string]string{}), + ) + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("user.created"), + testutil.EventFactory.WithMetadata(nil), // Explicitly set to nil + ) + timestamp := time.Unix(1609459200, 0) + + metadata := publisher.MakeMetadata(&event, timestamp) + + // Should only have system metadata + assert.Equal(t, "1609459200", metadata["timestamp"]) + assert.Equal(t, "evt_123", metadata["event-id"]) + assert.Equal(t, "user.created", metadata["topic"]) + assert.Len(t, metadata, 3) +} + +func TestMakeMetadata_NilEventMetadata(t *testing.T) { + t.Parallel() + + // Test with nil event metadata + publisher := destregistry.NewBasePublisher( + destregistry.WithDeliveryMetadata(map[string]string{ + "app-id": "my-app", + }), + ) + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("user.created"), + testutil.EventFactory.WithMetadata(nil), // Explicitly set to nil + ) + timestamp := time.Unix(1609459200, 0) + + metadata := publisher.MakeMetadata(&event, timestamp) + + // System metadata + assert.Equal(t, "1609459200", metadata["timestamp"]) + assert.Equal(t, "evt_123", metadata["event-id"]) + assert.Equal(t, "user.created", metadata["topic"]) + + // Delivery metadata + assert.Equal(t, "my-app", metadata["app-id"]) + + // Should have 4 keys (3 system + 1 delivery) + assert.Len(t, metadata, 4) +} diff --git a/internal/destregistry/providers/destawskinesis/destawskinesis.go b/internal/destregistry/providers/destawskinesis/destawskinesis.go index 042b0630..12bfdff0 100644 --- a/internal/destregistry/providers/destawskinesis/destawskinesis.go +++ b/internal/destregistry/providers/destawskinesis/destawskinesis.go @@ -102,7 +102,7 @@ func (p *AWSKinesisProvider) CreatePublisher(ctx context.Context, destination *m }) return &AWSKinesisPublisher{ - BasePublisher: p.BaseProvider.NewPublisher(), + BasePublisher: p.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), client: kinesisClient, streamName: config.StreamName, partitionKeyTemplate: config.PartitionKeyTemplate, diff --git a/internal/destregistry/providers/destawss3/destawss3.go b/internal/destregistry/providers/destawss3/destawss3.go index b1b12b11..72abb7ff 100644 --- a/internal/destregistry/providers/destawss3/destawss3.go +++ b/internal/destregistry/providers/destawss3/destawss3.go @@ -102,7 +102,7 @@ func (p *AWSS3Provider) CreatePublisher(ctx context.Context, destination *models } return NewAWSS3Publisher( - p.BaseProvider.NewPublisher(), + p.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), client, cfg.Bucket, cfg.KeyTemplate, diff --git a/internal/destregistry/providers/destawssqs/destawssqs.go b/internal/destregistry/providers/destawssqs/destawssqs.go index 95aa5ec9..a22ce033 100644 --- a/internal/destregistry/providers/destawssqs/destawssqs.go +++ b/internal/destregistry/providers/destawssqs/destawssqs.go @@ -90,7 +90,7 @@ func (p *AWSSQSDestination) CreatePublisher(ctx context.Context, destination *mo }) return &AWSSQSPublisher{ - BasePublisher: p.BaseProvider.NewPublisher(), + BasePublisher: p.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), client: sqsClient, queueURL: cfg.QueueURL, }, nil diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go index 9e779b9d..31667e80 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go @@ -50,7 +50,7 @@ func (d *AzureServiceBusDestination) CreatePublisher(ctx context.Context, destin } return &AzureServiceBusPublisher{ - BasePublisher: d.BaseProvider.NewPublisher(), + BasePublisher: d.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), connectionString: creds.ConnectionString, queueOrTopic: cfg.Name, }, nil diff --git a/internal/destregistry/providers/destgcppubsub/destgcppubsub.go b/internal/destregistry/providers/destgcppubsub/destgcppubsub.go index 5dd4c81d..07086ee7 100644 --- a/internal/destregistry/providers/destgcppubsub/destgcppubsub.go +++ b/internal/destregistry/providers/destgcppubsub/destgcppubsub.go @@ -81,7 +81,7 @@ func (d *GCPPubSubDestination) CreatePublisher(ctx context.Context, destination topic := client.Topic(cfg.Topic) return &GCPPubSubPublisher{ - BasePublisher: d.BaseProvider.NewPublisher(), + BasePublisher: d.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), client: client, topic: topic, projectID: cfg.ProjectID, diff --git a/internal/destregistry/providers/desthookdeck/desthookdeck.go b/internal/destregistry/providers/desthookdeck/desthookdeck.go index 08386a82..9780b191 100644 --- a/internal/destregistry/providers/desthookdeck/desthookdeck.go +++ b/internal/destregistry/providers/desthookdeck/desthookdeck.go @@ -160,7 +160,7 @@ func (p *HookdeckProvider) CreatePublisher(ctx context.Context, destination *mod // Create publisher with base publisher from provider publisher := &HookdeckPublisher{ - BasePublisher: p.BaseProvider.NewPublisher(), + BasePublisher: p.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), tokenString: tokenString, parsedToken: parsedToken, client: client, diff --git a/internal/destregistry/providers/destrabbitmq/destrabbitmq.go b/internal/destregistry/providers/destrabbitmq/destrabbitmq.go index 98040f56..da1b9861 100644 --- a/internal/destregistry/providers/destrabbitmq/destrabbitmq.go +++ b/internal/destregistry/providers/destrabbitmq/destrabbitmq.go @@ -65,7 +65,7 @@ func (d *RabbitMQDestination) CreatePublisher(ctx context.Context, destination * return nil, err } return &RabbitMQPublisher{ - BasePublisher: d.BaseProvider.NewPublisher(), + BasePublisher: d.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), url: rabbitURL(config, credentials), exchange: config.Exchange, }, nil diff --git a/internal/destregistry/providers/destwebhook/destwebhook.go b/internal/destregistry/providers/destwebhook/destwebhook.go index f7a120fa..91aa61ce 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook.go +++ b/internal/destregistry/providers/destwebhook/destwebhook.go @@ -234,7 +234,7 @@ func (d *WebhookDestination) CreatePublisher(ctx context.Context, destination *m } return &WebhookPublisher{ - BasePublisher: d.BaseProvider.NewPublisher(), + BasePublisher: d.BaseProvider.NewPublisher(destregistry.WithDeliveryMetadata(destination.DeliveryMetadata)), httpClient: httpClient, url: config.URL, headerPrefix: d.headerPrefix, diff --git a/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go b/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go index 0f0da10b..8614a41b 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go +++ b/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go @@ -430,6 +430,74 @@ func TestWebhookPublisher_DisableDefaultHeaders(t *testing.T) { } } +func TestWebhookPublisher_DeliveryMetadata(t *testing.T) { + t.Parallel() + + consumer := NewWebhookConsumer("x-outpost-") + defer consumer.Close() + + provider, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("webhook"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "url": consumer.server.URL + "/webhook", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "secret": "test-secret", + }), + testutil.DestinationFactory.WithDeliveryMetadata(map[string]string{ + "app-id": "my-app", + "source": "outpost-delivery", + "x-api-key": "secret-api-key", + "timestamp": "999", // Should override system timestamp + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithID("evt_123"), + testutil.EventFactory.WithTopic("user.created"), + testutil.EventFactory.WithMetadata(map[string]string{ + "user-id": "usr_456", + "source": "user-service", // Should override delivery_metadata source + }), + testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}), + ) + + _, err = publisher.Publish(context.Background(), &event) + require.NoError(t, err) + + select { + case msg := <-consumer.Consume(): + req := msg.Raw.(*http.Request) + + // Verify delivery_metadata headers are present + assert.Equal(t, "my-app", req.Header.Get("x-outpost-app-id"), "app-id from delivery_metadata should be present") + assert.Equal(t, "secret-api-key", req.Header.Get("x-outpost-x-api-key"), "x-api-key from delivery_metadata should be present") + + // Verify merge priority: delivery_metadata overrides system timestamp + assert.Equal(t, "999", req.Header.Get("x-outpost-timestamp"), "delivery_metadata timestamp should override system timestamp") + + // Verify merge priority: event metadata overrides delivery_metadata source + assert.Equal(t, "user-service", req.Header.Get("x-outpost-source"), "event metadata source should override delivery_metadata source") + + // Verify system metadata still present + assert.Equal(t, "evt_123", req.Header.Get("x-outpost-event-id")) + assert.Equal(t, "user.created", req.Header.Get("x-outpost-topic")) + + // Verify event metadata present + assert.Equal(t, "usr_456", req.Header.Get("x-outpost-user-id")) + + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for webhook delivery") + } +} + func TestWebhookPublisher_SignatureTemplates(t *testing.T) { dest := testutil.DestinationFactory.Any( testutil.DestinationFactory.WithType("webhook"), diff --git a/internal/destregistry/testing/publisher_suite.go b/internal/destregistry/testing/publisher_suite.go index ad8f3316..92da4f57 100644 --- a/internal/destregistry/testing/publisher_suite.go +++ b/internal/destregistry/testing/publisher_suite.go @@ -145,9 +145,19 @@ func (s *PublisherSuite) verifyMessage(msg Message, event models.Event) { s.Require().NoError(err, "failed to marshal message data") s.Require().JSONEq(string(eventDataJSON), string(msgDataJSON), "message data mismatch") - // Verify that expected metadata is a subset of received metadata + // Verify that system metadata is present (these should always be included) + s.Require().NotEmpty(msg.Metadata["timestamp"], "system metadata 'timestamp' should be present") + s.Require().Equal(event.ID, msg.Metadata["event-id"], "system metadata 'event-id' should match") + s.Require().Equal(event.Topic, msg.Metadata["topic"], "system metadata 'topic' should match") + + // Verify that delivery_metadata is present + for k, v := range s.dest.DeliveryMetadata { + s.Require().Equal(v, msg.Metadata[k], "delivery_metadata key %s should be present", k) + } + + // Verify that expected event metadata is a subset of received metadata for k, v := range event.Metadata { - s.Require().Equal(v, msg.Metadata[k], "metadata key %s should match expected value", k) + s.Require().Equal(v, msg.Metadata[k], "event metadata key %s should match expected value", k) } // Provider-specific assertions if available @@ -177,6 +187,44 @@ func (s *PublisherSuite) TestBasicPublish() { } } +func (s *PublisherSuite) TestPublishWithDeliveryMetadata() { + // Create a new destination with delivery_metadata + destWithMetadata := *s.dest + destWithMetadata.DeliveryMetadata = map[string]string{ + "app-id": "test-app", + "source": "delivery-source", + "region": "us-east-1", + } + + // Create a new publisher with the delivery_metadata + pub, err := s.provider.CreatePublisher(context.Background(), &destWithMetadata) + s.Require().NoError(err) + defer pub.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithData(map[string]interface{}{ + "test_key": "test_value", + }), + testutil.EventFactory.WithMetadata(map[string]string{ + "meta_key": "meta_value", + }), + ) + + _, err = pub.Publish(context.Background(), &event) + s.Require().NoError(err) + + select { + case msg := <-s.consumer.Consume(): + // Temporarily swap dest for verification + originalDest := s.dest + s.dest = &destWithMetadata + s.verifyMessage(msg, event) + s.dest = originalDest + case <-time.After(5 * time.Second): + s.Fail("timeout waiting for message") + } +} + func (s *PublisherSuite) TestConcurrentPublish() { const numMessages = 10 var wg sync.WaitGroup diff --git a/internal/models/destination.go b/internal/models/destination.go index 1748f42e..60ed9043 100644 --- a/internal/models/destination.go +++ b/internal/models/destination.go @@ -19,14 +19,16 @@ var ( ) type Destination struct { - ID string `json:"id" redis:"id"` - TenantID string `json:"tenant_id" redis:"-"` - Type string `json:"type" redis:"type"` - Topics Topics `json:"topics" redis:"-"` - Config Config `json:"config" redis:"-"` - Credentials Credentials `json:"credentials" redis:"-"` - CreatedAt time.Time `json:"created_at" redis:"created_at"` - DisabledAt *time.Time `json:"disabled_at" redis:"disabled_at"` + ID string `json:"id" redis:"id"` + TenantID string `json:"tenant_id" redis:"-"` + Type string `json:"type" redis:"type"` + Topics Topics `json:"topics" redis:"-"` + Config Config `json:"config" redis:"-"` + Credentials Credentials `json:"credentials" redis:"-"` + DeliveryMetadata DeliveryMetadata `json:"delivery_metadata,omitempty" redis:"-"` + Metadata Metadata `json:"metadata,omitempty" redis:"-"` + CreatedAt time.Time `json:"created_at" redis:"created_at"` + DisabledAt *time.Time `json:"disabled_at" redis:"disabled_at"` } func (d *Destination) parseRedisHash(cmd *redis.MapStringStringCmd, cipher Cipher) error { @@ -60,6 +62,24 @@ func (d *Destination) parseRedisHash(cmd *redis.MapStringStringCmd, cipher Ciphe if err != nil { return fmt.Errorf("invalid credentials: %w", err) } + // Decrypt and deserialize delivery_metadata if present + if deliveryMetadataStr, exists := hash["delivery_metadata"]; exists && deliveryMetadataStr != "" { + deliveryMetadataBytes, err := cipher.Decrypt([]byte(deliveryMetadataStr)) + if err != nil { + return fmt.Errorf("invalid delivery_metadata: %w", err) + } + err = d.DeliveryMetadata.UnmarshalBinary(deliveryMetadataBytes) + if err != nil { + return fmt.Errorf("invalid delivery_metadata: %w", err) + } + } + // Deserialize metadata if present + if metadataStr, exists := hash["metadata"]; exists && metadataStr != "" { + err = d.Metadata.UnmarshalBinary([]byte(metadataStr)) + if err != nil { + return fmt.Errorf("invalid metadata: %w", err) + } + } return nil } @@ -166,6 +186,7 @@ func TopicsFromString(s string) Topics { type Config = MapStringString type Credentials = MapStringString +type DeliveryMetadata = MapStringString type MapStringString map[string]string var _ encoding.BinaryMarshaler = &MapStringString{} diff --git a/internal/models/destination_test.go b/internal/models/destination_test.go index 11cc0316..4eed9328 100644 --- a/internal/models/destination_test.go +++ b/internal/models/destination_test.go @@ -1,6 +1,7 @@ package models_test import ( + "encoding/json" "fmt" "testing" @@ -84,3 +85,96 @@ func TestDestinationTopics_Validate(t *testing.T) { }) } } + +func TestDestination_JSONMarshalWithDeliveryMetadata(t *testing.T) { + t.Parallel() + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_123"), + testutil.DestinationFactory.WithType("webhook"), + testutil.DestinationFactory.WithTopics([]string{"user.created"}), + testutil.DestinationFactory.WithConfig(map[string]string{"url": "https://example.com"}), + testutil.DestinationFactory.WithDeliveryMetadata(map[string]string{ + "app-id": "my-app", + "source": "outpost", + "custom-key": "custom-value", + }), + testutil.DestinationFactory.WithMetadata(map[string]string{ + "description": "Production webhook", + "team": "platform", + }), + ) + + // Marshal to JSON + jsonBytes, err := json.Marshal(destination) + assert.NoError(t, err) + + // Unmarshal back + var unmarshaled models.Destination + err = json.Unmarshal(jsonBytes, &unmarshaled) + assert.NoError(t, err) + + // Verify new fields are preserved + assert.Equal(t, destination.DeliveryMetadata, unmarshaled.DeliveryMetadata) + assert.Equal(t, "my-app", unmarshaled.DeliveryMetadata["app-id"]) + assert.Equal(t, "outpost", unmarshaled.DeliveryMetadata["source"]) + assert.Equal(t, "custom-value", unmarshaled.DeliveryMetadata["custom-key"]) + + assert.Equal(t, destination.Metadata, unmarshaled.Metadata) + assert.Equal(t, "Production webhook", unmarshaled.Metadata["description"]) + assert.Equal(t, "platform", unmarshaled.Metadata["team"]) + + // Verify existing fields still work + assert.Equal(t, destination.ID, unmarshaled.ID) + assert.Equal(t, destination.Type, unmarshaled.Type) + assert.Equal(t, destination.Topics, unmarshaled.Topics) +} + +func TestDestination_JSONMarshalWithoutNewFields(t *testing.T) { + t.Parallel() + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_123"), + testutil.DestinationFactory.WithType("webhook"), + testutil.DestinationFactory.WithTopics([]string{"user.created"}), + ) + + // Marshal to JSON + jsonBytes, err := json.Marshal(destination) + assert.NoError(t, err) + + // Unmarshal back + var unmarshaled models.Destination + err = json.Unmarshal(jsonBytes, &unmarshaled) + assert.NoError(t, err) + + // Verify new fields are nil when not provided + assert.Nil(t, unmarshaled.DeliveryMetadata) + assert.Nil(t, unmarshaled.Metadata) +} + +func TestDestination_JSONUnmarshalEmptyMaps(t *testing.T) { + t.Parallel() + + jsonData := `{ + "id": "dest_123", + "tenant_id": "tenant_1", + "type": "webhook", + "topics": ["user.created"], + "config": {}, + "credentials": {}, + "delivery_metadata": {}, + "metadata": {}, + "created_at": "2024-01-01T00:00:00Z" + }` + + var destination models.Destination + err := json.Unmarshal([]byte(jsonData), &destination) + assert.NoError(t, err) + + // Empty maps should be preserved as empty, not nil + assert.NotNil(t, destination.DeliveryMetadata) + assert.Empty(t, destination.DeliveryMetadata) + assert.NotNil(t, destination.Metadata) + assert.Empty(t, destination.Metadata) +} diff --git a/internal/models/entity.go b/internal/models/entity.go index a3c2f708..2821359f 100644 --- a/internal/models/entity.go +++ b/internal/models/entity.go @@ -296,7 +296,7 @@ func (s *entityStoreImpl) CreateDestination(ctx context.Context, destination Des func (s *entityStoreImpl) UpsertDestination(ctx context.Context, destination Destination) error { key := s.redisDestinationID(destination.ID, destination.TenantID) - // Pre-marshal and encrypt credentials BEFORE starting Redis transaction + // Pre-marshal and encrypt credentials and delivery_metadata BEFORE starting Redis transaction // This isolates marshaling failures from Redis transaction failures credentialsBytes, err := destination.Credentials.MarshalBinary() if err != nil { @@ -307,6 +307,19 @@ func (s *entityStoreImpl) UpsertDestination(ctx context.Context, destination Des return fmt.Errorf("failed to encrypt destination credentials: %w", err) } + // Encrypt delivery_metadata if present (contains sensitive data like auth tokens) + var encryptedDeliveryMetadata []byte + if destination.DeliveryMetadata != nil { + deliveryMetadataBytes, err := destination.DeliveryMetadata.MarshalBinary() + if err != nil { + return fmt.Errorf("invalid destination delivery_metadata: %w", err) + } + encryptedDeliveryMetadata, err = s.cipher.Encrypt(deliveryMetadataBytes) + if err != nil { + return fmt.Errorf("failed to encrypt destination delivery_metadata: %w", err) + } + } + // All keys use same tenant prefix - cluster compatible transaction summaryKey := s.redisTenantDestinationSummaryKey(destination.TenantID) @@ -329,6 +342,20 @@ func (s *entityStoreImpl) UpsertDestination(ctx context.Context, destination Des pipe.HDel(ctx, key, "disabled_at") } + // Store encrypted delivery_metadata if present + if destination.DeliveryMetadata != nil { + pipe.HSet(ctx, key, "delivery_metadata", encryptedDeliveryMetadata) + } else { + pipe.HDel(ctx, key, "delivery_metadata") + } + + // Store metadata if present + if destination.Metadata != nil { + pipe.HSet(ctx, key, "metadata", &destination.Metadata) + } else { + pipe.HDel(ctx, key, "metadata") + } + // Update summary atomically pipe.HSet(ctx, summaryKey, destination.ID, destination.ToSummary()) return nil diff --git a/internal/models/entity_test.go b/internal/models/entity_test.go index b4221a28..2c8673f1 100644 --- a/internal/models/entity_test.go +++ b/internal/models/entity_test.go @@ -25,12 +25,12 @@ func TestEntityStore_WithDeploymentID(t *testing.T) { suite.Run(t, &EntityTestSuite{deploymentID: "dp_test_001"}) } -// TestDestinationCredentialsEncryption verifies that credentials are properly encrypted -// when stored in Redis. +// TestDestinationCredentialsEncryption verifies that credentials and delivery_metadata +// are properly encrypted when stored in Redis. // // NOTE: This test accesses Redis implementation details directly to verify encryption. // While this couples the test to the storage implementation, it's necessary to confirm -// that credentials are actually encrypted at rest. +// that sensitive fields are actually encrypted at rest. func TestDestinationCredentialsEncryption(t *testing.T) { t.Parallel() @@ -54,6 +54,10 @@ func TestDestinationCredentialsEncryption(t *testing.T) { "username": "guest", "password": "guest", }), + testutil.DestinationFactory.WithDeliveryMetadata(map[string]string{ + "Authorization": "Bearer secret-token", + "X-API-Key": "sensitive-key", + }), ) err := entityStore.UpsertDestination(ctx, input) @@ -67,11 +71,20 @@ func TestDestinationCredentialsEncryption(t *testing.T) { // Verify credentials are encrypted (not plaintext) assert.NotEqual(t, input.Credentials, actual["credentials"]) - // Verify we can decrypt back to original + // Verify we can decrypt credentials back to original decryptedCredentials, err := cipher.Decrypt([]byte(actual["credentials"])) require.NoError(t, err) jsonCredentials, _ := json.Marshal(input.Credentials) assert.Equal(t, string(jsonCredentials), string(decryptedCredentials)) + + // Verify delivery_metadata is encrypted (not plaintext) + assert.NotEqual(t, input.DeliveryMetadata, actual["delivery_metadata"]) + + // Verify we can decrypt delivery_metadata back to original + decryptedDeliveryMetadata, err := cipher.Decrypt([]byte(actual["delivery_metadata"])) + require.NoError(t, err) + jsonDeliveryMetadata, _ := json.Marshal(input.DeliveryMetadata) + assert.Equal(t, string(jsonDeliveryMetadata), string(decryptedDeliveryMetadata)) } // TestMaxDestinationsPerTenant verifies that the entity store properly enforces diff --git a/internal/models/entitysuite_test.go b/internal/models/entitysuite_test.go index 6991a30f..6486d5d8 100644 --- a/internal/models/entitysuite_test.go +++ b/internal/models/entitysuite_test.go @@ -22,6 +22,8 @@ func assertEqualDestination(t *testing.T, expected, actual models.Destination) { assert.Equal(t, expected.Topics, actual.Topics) assert.Equal(t, expected.Config, actual.Config) assert.Equal(t, expected.Credentials, actual.Credentials) + assert.Equal(t, expected.DeliveryMetadata, actual.DeliveryMetadata) + assert.Equal(t, expected.Metadata, actual.Metadata) assert.True(t, cmp.Equal(expected.CreatedAt, actual.CreatedAt)) assert.True(t, cmp.Equal(expected.DisabledAt, actual.DisabledAt)) } @@ -131,6 +133,14 @@ func (s *EntityTestSuite) TestDestinationCRUD() { "username": "guest", "password": "guest", }, + DeliveryMetadata: map[string]string{ + "app-id": "test-app", + "source": "outpost", + }, + Metadata: map[string]string{ + "environment": "test", + "team": "platform", + }, CreatedAt: time.Now(), DisabledAt: nil, TenantID: idgen.String(), @@ -155,6 +165,13 @@ func (s *EntityTestSuite) TestDestinationCRUD() { t.Run("updates", func(t *testing.T) { input.Topics = []string{"*"} + input.DeliveryMetadata = map[string]string{ + "app-id": "updated-app", + "version": "2.0", + } + input.Metadata = map[string]string{ + "environment": "staging", + } err := s.entityStore.UpsertDestination(s.ctx, input) require.NoError(s.T(), err) @@ -188,6 +205,22 @@ func (s *EntityTestSuite) TestDestinationCRUD() { // cleanup require.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, input.TenantID, input.ID)) }) + + t.Run("handles nil delivery_metadata and metadata", func(t *testing.T) { + // Factory defaults to nil for DeliveryMetadata and Metadata + inputWithNilFields := testutil.DestinationFactory.Any() + + err := s.entityStore.CreateDestination(s.ctx, inputWithNilFields) + require.NoError(s.T(), err) + + actual, err := s.entityStore.RetrieveDestination(s.ctx, inputWithNilFields.TenantID, inputWithNilFields.ID) + require.NoError(s.T(), err) + assert.Nil(t, actual.DeliveryMetadata) + assert.Nil(t, actual.Metadata) + + // cleanup + require.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, inputWithNilFields.TenantID, inputWithNilFields.ID)) + }) } func (s *EntityTestSuite) TestListDestinationEmpty() { diff --git a/internal/models/event.go b/internal/models/event.go index 9593cc61..e0f545f0 100644 --- a/internal/models/event.go +++ b/internal/models/event.go @@ -33,6 +33,7 @@ func (d *Data) UnmarshalBinary(data []byte) error { type Metadata map[string]string var _ fmt.Stringer = &Metadata{} +var _ encoding.BinaryMarshaler = &Metadata{} var _ encoding.BinaryUnmarshaler = &Metadata{} func (m *Metadata) String() string { @@ -43,6 +44,10 @@ func (m *Metadata) String() string { return string(metadata) } +func (m *Metadata) MarshalBinary() ([]byte, error) { + return json.Marshal(m) +} + func (m *Metadata) UnmarshalBinary(metadata []byte) error { if string(metadata) == "" { return nil diff --git a/internal/services/api/destination_handlers.go b/internal/services/api/destination_handlers.go index ba581e03..90dfc3d4 100644 --- a/internal/services/api/destination_handlers.go +++ b/internal/services/api/destination_handlers.go @@ -169,6 +169,12 @@ func (h *DestinationHandlers) Update(c *gin.Context) { shouldRevalidate = true updatedDestination.Credentials = maputil.MergeStringMaps(originalDestination.Credentials, input.Credentials) } + if input.DeliveryMetadata != nil { + updatedDestination.DeliveryMetadata = maputil.MergeStringMaps(originalDestination.DeliveryMetadata, input.DeliveryMetadata) + } + if input.Metadata != nil { + updatedDestination.Metadata = maputil.MergeStringMaps(originalDestination.Metadata, input.Metadata) + } // Always preprocess before updating if err := h.registry.PreprocessDestination(&updatedDestination, originalDestination, &destregistry.PreprocessDestinationOpts{ @@ -310,11 +316,13 @@ func (h *DestinationHandlers) handleUpsertDestinationError(c *gin.Context, err e // ===== Requests ===== type CreateDestinationRequest struct { - ID string `json:"id" binding:"-"` - Type string `json:"type" binding:"required"` - Topics models.Topics `json:"topics" binding:"required"` - Config models.Config `json:"config" binding:"-"` - Credentials models.Credentials `json:"credentials" binding:"-"` + ID string `json:"id" binding:"-"` + Type string `json:"type" binding:"required"` + Topics models.Topics `json:"topics" binding:"required"` + Config models.Config `json:"config" binding:"-"` + Credentials models.Credentials `json:"credentials" binding:"-"` + DeliveryMetadata models.DeliveryMetadata `json:"delivery_metadata,omitempty" binding:"-"` + Metadata models.Metadata `json:"metadata,omitempty" binding:"-"` } func (r *CreateDestinationRequest) ToDestination(tenantID string) models.Destination { @@ -329,22 +337,26 @@ func (r *CreateDestinationRequest) ToDestination(tenantID string) models.Destina } return models.Destination{ - ID: r.ID, - Type: r.Type, - Topics: r.Topics, - Config: r.Config, - Credentials: r.Credentials, - CreatedAt: time.Now(), - DisabledAt: nil, - TenantID: tenantID, + ID: r.ID, + Type: r.Type, + Topics: r.Topics, + Config: r.Config, + Credentials: r.Credentials, + DeliveryMetadata: r.DeliveryMetadata, + Metadata: r.Metadata, + CreatedAt: time.Now(), + DisabledAt: nil, + TenantID: tenantID, } } type UpdateDestinationRequest struct { - Type string `json:"type" binding:"-"` - Topics models.Topics `json:"topics" binding:"-"` - Config models.Config `json:"config" binding:"-"` - Credentials models.Credentials `json:"credentials" binding:"-"` + Type string `json:"type" binding:"-"` + Topics models.Topics `json:"topics" binding:"-"` + Config models.Config `json:"config" binding:"-"` + Credentials models.Credentials `json:"credentials" binding:"-"` + DeliveryMetadata models.DeliveryMetadata `json:"delivery_metadata,omitempty" binding:"-"` + Metadata models.Metadata `json:"metadata,omitempty" binding:"-"` } func mustRoleFromContext(c *gin.Context) string { diff --git a/internal/util/testutil/destination.go b/internal/util/testutil/destination.go index ae47d6f2..05631dc4 100644 --- a/internal/util/testutil/destination.go +++ b/internal/util/testutil/destination.go @@ -80,3 +80,15 @@ func (f *mockDestinationFactory) WithDisabledAt(disabledAt time.Time) func(*mode destination.DisabledAt = &disabledAt } } + +func (f *mockDestinationFactory) WithDeliveryMetadata(deliveryMetadata map[string]string) func(*models.Destination) { + return func(destination *models.Destination) { + destination.DeliveryMetadata = deliveryMetadata + } +} + +func (f *mockDestinationFactory) WithMetadata(metadata map[string]string) func(*models.Destination) { + return func(destination *models.Destination) { + destination.Metadata = metadata + } +}