11 changes: 8 additions & 3 deletions Makefile
@@ -1,4 +1,5 @@
TEST?=$$(go list ./...)
RUN?=

# Build targets
.PHONY: build
@@ -122,13 +123,17 @@ test/setup:
@echo ""

test:
go test $(TEST) $(TESTARGS)
@if [ "$(RUN)" != "" ]; then \
$(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS) -run "$(RUN)"; \
else \
$(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS); \
fi

test/unit:
go test $(TEST) $(TESTARGS) -short
$(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS) -short

test/integration:
go test $(TEST) $(TESTARGS) -run "Integration"
$(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS) -run "Integration"

test/e2e/rediscluster:
@echo "Running Redis cluster e2e tests in Docker container..."
8 changes: 5 additions & 3 deletions cmd/e2e/configs/basic.go
@@ -25,8 +25,9 @@ const (
)

type BasicOpts struct {
LogStorage LogStorageType
RedisConfig *redis.RedisConfig // Optional Redis config override
LogStorage LogStorageType
RedisConfig *redis.RedisConfig // Optional Redis config override
DeploymentID string // Optional deployment ID for multi-tenancy testing
}

func Basic(t *testing.T, opts BasicOpts) config.Config {
@@ -83,6 +84,7 @@ func Basic(t *testing.T, opts BasicOpts) config.Config {
c.RetryMaxLimit = 3
c.LogBatchThresholdSeconds = 1
c.LogBatchSize = 100
c.DeploymentID = opts.DeploymentID

// Setup cleanup
t.Cleanup(func() {
@@ -132,7 +134,7 @@ func CreateRedisClusterConfig(t *testing.T) *redis.RedisConfig {
// Test Redis connection before returning
t.Logf("Testing Redis cluster connection to %s:%d", redisHost, redisPort)
testCtx := context.Background()
_, err := redis.NewClient(testCtx, redisConfig)
_, err := redis.New(testCtx, redisConfig)
if err != nil {
t.Fatalf("Failed to create Redis client: %v", err)
}
18 changes: 16 additions & 2 deletions cmd/e2e/suites_test.go
@@ -126,6 +126,7 @@ type basicSuite struct {
e2eSuite
logStorageType configs.LogStorageType
redisConfig *redis.RedisConfig // Optional Redis config override
deploymentID string // Optional deployment ID
alertServer *alert.AlertMockServer
}

@@ -143,8 +144,9 @@ func (suite *basicSuite) SetupSuite() {

// Configure alert callback URL
cfg := configs.Basic(t, configs.BasicOpts{
LogStorage: suite.logStorageType,
RedisConfig: suite.redisConfig,
LogStorage: suite.logStorageType,
RedisConfig: suite.redisConfig,
DeploymentID: suite.deploymentID,
})
cfg.Alert.CallbackURL = alertServer.GetCallbackURL()

@@ -204,3 +206,15 @@ func TestRedisClusterBasicSuite(t *testing.T) {
redisConfig: redisConfig,
})
}

func TestBasicSuiteWithDeploymentID(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping e2e test")
}

suite.Run(t, &basicSuite{
logStorageType: configs.LogStorageTypePostgres,
deploymentID: "dp_e2e_test",
})
}
12 changes: 9 additions & 3 deletions cmd/outpost-migrate-redis/README.md
@@ -109,9 +109,15 @@ Migrates Redis keys from legacy format to hash-tagged format for Redis Cluster c
**Purpose:** Ensures all keys for a tenant are routed to the same Redis Cluster node by using hash tags.

**Key Transformations:**
- `tenant:<id>:*` → `{tenant:<id>}:*`
- `destination_summary:<tenant>:<dest>` → `{tenant:<tenant>}:destination_summary:<dest>`
- Individual destination keys are properly hash-tagged by tenant
- `tenant:123` → `tenant:{123}:tenant`
- `tenant:123:destinations` → `tenant:{123}:destinations`
- `tenant:123:destination:abc` → `tenant:{123}:destination:abc`
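
As a rough illustration of these rules, here is a minimal Go sketch of the transformation; the function and its names are assumptions for illustration, not the migration's actual code.

```go
package main

import (
	"fmt"
	"strings"
)

// legacyToHashTagged illustrates the documented key transformation.
// It is a sketch only; the real logic lives in migration/001_hash_tags.
func legacyToHashTagged(key string) string {
	parts := strings.SplitN(key, ":", 3)
	if len(parts) < 2 || parts[0] != "tenant" {
		return key // not a legacy tenant key; left untouched
	}
	if strings.HasPrefix(parts[1], "{") {
		return key // already hash-tagged; the migration is idempotent
	}
	id := parts[1]
	if len(parts) == 2 {
		return fmt.Sprintf("tenant:{%s}:tenant", id) // tenant:123 -> tenant:{123}:tenant
	}
	return fmt.Sprintf("tenant:{%s}:%s", id, parts[2]) // tenant:123:<rest> -> tenant:{123}:<rest>
}

func main() {
	for _, k := range []string{"tenant:123", "tenant:123:destinations", "tenant:123:destination:abc"} {
		fmt.Printf("%s -> %s\n", k, legacyToHashTagged(k))
	}
}
```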

**Deployment Mode Note:** If you are using the `DEPLOYMENT_ID` configuration, this migration is **not needed**. Deployment-scoped keys already include hash tags:
- `dp_001:tenant:{123}:tenant` (already has hash tags)
- `dp_001:tenant:{123}:destinations` (already has hash tags)

See [001_hash_tags/README.md](./migration/001_hash_tags/README.md) for details.

**Safety:** This migration preserves original keys. Use the cleanup command after verification to remove old keys.

41 changes: 40 additions & 1 deletion cmd/outpost-migrate-redis/migration/001_hash_tags/README.md
@@ -44,8 +44,47 @@ After verification, removes all legacy keys:
- Requires confirmation unless `-force` flag is used
- Processes deletions in batches of 100 keys
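
For illustration, a minimal sketch of batched deletion against the `redis.Cmdable` interface used elsewhere in this PR; the helper is an assumption, not the cleanup command's actual code.

```go
package cleanupexample

import (
	"context"

	"github.com/hookdeck/outpost/internal/redis"
)

// deleteInBatches deletes keys in fixed-size batches (100, matching the note
// above). Illustrative only; the real cleanup lives in outpost-migrate-redis.
func deleteInBatches(ctx context.Context, client redis.Cmdable, keys []string) error {
	const batchSize = 100
	for start := 0; start < len(keys); start += batchSize {
		end := start + batchSize
		if end > len(keys) {
			end = len(keys)
		}
		if err := client.Del(ctx, keys[start:end]...).Err(); err != nil {
			return err
		}
	}
	return nil
}
```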

## Deployment Mode Compatibility

### When This Migration is NOT Needed

If you are using the `DEPLOYMENT_ID` configuration option (or `deployment_id` in YAML), **you can skip this migration entirely**. Deployments using deployment IDs already have keys in the correct format:

```
dp_001:tenant:{123}:tenant
dp_001:tenant:{123}:destinations
dp_001:tenant:{123}:destination:abc
```

These keys already include hash tags `{123}` and are Redis Cluster compatible.
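
To see why, here is a self-contained sketch (not from this PR) of the Redis Cluster slot rule: when a key contains a `{...}` hash tag, only the tag's contents are hashed, so every key tagged `{123}` maps to the same slot regardless of the `dp_001:` prefix.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 is CRC-16/XMODEM (polynomial 0x1021), the checksum Redis Cluster
// uses to assign keys to slots.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot applies the hash-tag rule: if the key contains "{...}" with a
// non-empty body, only that body is hashed.
func hashSlot(key string) uint16 {
	if open := strings.IndexByte(key, '{'); open >= 0 {
		if end := strings.IndexByte(key[open+1:], '}'); end > 0 {
			key = key[open+1 : open+1+end]
		}
	}
	return crc16([]byte(key)) % 16384
}

func main() {
	for _, k := range []string{
		"dp_001:tenant:{123}:tenant",
		"dp_001:tenant:{123}:destinations",
		"tenant:{123}:destination:abc",
	} {
		fmt.Printf("%-36s slot %d\n", k, hashSlot(k)) // all three print the same slot
	}
}
```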

### When This Migration IS Needed

This migration is only required for legacy deployments that:
1. Started before hash tag support was added
2. Are **NOT** using `DEPLOYMENT_ID` configuration
3. Have keys in the old format without curly braces:
```
tenant:123
tenant:123:destinations
tenant:123:destination:abc
```

### Checking If You Need This Migration

Run the migration planner to check:
```bash
outpost-migrate-redis plan
```

If the output shows `0 tenants to migrate`, your deployment falls into one of these cases:
- Already has hash tags (you're good!)
- Is using deployment IDs (you're good!)
- Has no data yet (you're good!)

## Notes

- Original keys are preserved during the Apply phase for rollback safety
- Migration is idempotent - can be run multiple times safely
- Skips tenants that are already migrated
- Skips tenants that are already migrated
- Does not touch deployment-prefixed keys (e.g., `dp_001:*`)
12 changes: 11 additions & 1 deletion cmd/outpost-migrate-redis/migration/001_hash_tags/hash_tags.go
@@ -10,7 +10,17 @@ import (
"github.com/hookdeck/outpost/internal/redis"
)

// HashTagsMigration migrates from legacy format (tenant:*) to hash-tagged format ({tenant}:*)
// HashTagsMigration migrates from legacy format (tenant:*) to hash-tagged format (tenant:{ID}:*)
//
// NOTE: This migration only handles non-deployment-prefixed keys.
// If you are using DEPLOYMENT_ID configuration, your keys already use deployment
// prefixes (dp_001:tenant:{TENANT_ID}:*) with hash tags in place, so this migration
// can be safely skipped.
//
// This migration is only needed for legacy deployments that:
// - Started before hash tag support was added
// - Are NOT using DEPLOYMENT_ID configuration
// - Have keys in the old format: tenant:ID:* (without curly braces)
type HashTagsMigration struct {
client redis.Client
logger migration.Logger
4 changes: 4 additions & 0 deletions docs/pages/references/configuration.mdx
@@ -43,6 +43,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM
| `AZURE_SERVICEBUS_TENANT_ID` | Azure Active Directory tenant ID | `nil` | Yes |
| `DELIVERY_MAX_CONCURRENCY` | Maximum number of delivery attempts to process concurrently. | `1` | No |
| `DELIVERY_TIMEOUT_SECONDS` | Timeout in seconds for HTTP requests made during event delivery to webhook destinations. | `5` | No |
| `DEPLOYMENT_ID` | Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation. | `nil` | No |
| `DESTINATIONS_AWS_KINESIS_METADATA_IN_PAYLOAD` | If true, includes Outpost metadata (event ID, topic, etc.) within the Kinesis record payload. | `true` | No |
| `DESTINATIONS_INCLUDE_MILLISECOND_TIMESTAMP` | If true, includes a 'timestamp-ms' field with millisecond precision in destination metadata. Useful for load testing and debugging. | `false` | No |
| `DESTINATIONS_METADATA_PATH` | Path to the directory containing custom destination type definitions. This can be overridden by the root-level 'destination_metadata_path' if also set. | `config/outpost/destinations` | No |
@@ -163,6 +164,9 @@ delivery_max_concurrency: 1
# Timeout in seconds for HTTP requests made during event delivery to webhook destinations.
delivery_timeout_seconds: 5

# Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation.
deployment_id: ""

# Path to the directory containing custom destination type definitions. Overrides 'destinations.metadata_path' if set.
destination_metadata_path: ""

20 changes: 14 additions & 6 deletions internal/alert/monitor.go
@@ -73,6 +73,13 @@ func WithLogger(logger *logging.Logger) AlertOption {
}
}

// WithDeploymentID sets the deployment ID for the monitor
func WithDeploymentID(deploymentID string) AlertOption {
return func(m *alertMonitor) {
m.deploymentID = deploymentID
}
}
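
A hedged sketch of how this option might be wired next to the existing `WithLogger` option, using the `NewAlertMonitor` signature visible later in this diff; the helper function and the `logging` import path are assumptions.

```go
package alertexample

import (
	"github.com/hookdeck/outpost/internal/alert"
	"github.com/hookdeck/outpost/internal/logging" // path assumed from repository layout
	"github.com/hookdeck/outpost/internal/redis"
)

// wireAlertMonitor is illustrative only; it shows the new option being passed
// alongside WithLogger. An empty deploymentID keeps today's unprefixed alert keys.
func wireAlertMonitor(logger *logging.Logger, client redis.Cmdable, deploymentID string) {
	// Return value intentionally discarded in this sketch.
	alert.NewAlertMonitor(
		logger,
		client,
		alert.WithLogger(logger),
		alert.WithDeploymentID(deploymentID),
	)
}
```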

// DeliveryAttempt represents a single delivery attempt
type DeliveryAttempt struct {
Success bool
@@ -83,11 +90,12 @@ type DeliveryAttempt struct {
}

type alertMonitor struct {
logger *logging.Logger
store AlertStore
evaluator AlertEvaluator
notifier AlertNotifier
disabler DestinationDisabler
logger *logging.Logger
store AlertStore
evaluator AlertEvaluator
notifier AlertNotifier
disabler DestinationDisabler
deploymentID string

// autoDisableFailureCount is the number of consecutive failures before auto-disabling
autoDisableFailureCount int
@@ -119,7 +127,7 @@ func NewAlertMonitor(logger *logging.Logger, redisClient redis.Cmdable, opts ...
}

if alertMonitor.store == nil {
alertMonitor.store = NewRedisAlertStore(redisClient)
alertMonitor.store = NewRedisAlertStore(redisClient, alertMonitor.deploymentID)
}

if alertMonitor.evaluator == nil {
19 changes: 15 additions & 4 deletions internal/alert/store.go
@@ -20,12 +20,16 @@ type AlertStore interface {
}

type redisAlertStore struct {
client redis.Cmdable
client redis.Cmdable
deploymentID string
}

// NewRedisAlertStore creates a new Redis-backed alert store
func NewRedisAlertStore(client redis.Cmdable) AlertStore {
return &redisAlertStore{client: client}
func NewRedisAlertStore(client redis.Cmdable, deploymentID string) AlertStore {
return &redisAlertStore{
client: client,
deploymentID: deploymentID,
}
}

func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) (int, error) {
@@ -57,6 +61,13 @@ func (s *redisAlertStore) ResetConsecutiveFailureCount(ctx context.Context, tena
return s.client.Del(ctx, s.getFailuresKey(destinationID)).Err()
}

func (s *redisAlertStore) deploymentPrefix() string {
if s.deploymentID == "" {
return ""
}
return fmt.Sprintf("%s:", s.deploymentID)
}

func (s *redisAlertStore) getFailuresKey(destinationID string) string {
return fmt.Sprintf("%s:%s:%s", keyPrefixAlert, destinationID, keyFailures)
return fmt.Sprintf("%s%s:%s:%s", s.deploymentPrefix(), keyPrefixAlert, destinationID, keyFailures)
}
77 changes: 75 additions & 2 deletions internal/alert/store_test.go
@@ -16,7 +16,7 @@ func TestRedisAlertStore(t *testing.T) {
t.Run("increment consecutive failures", func(t *testing.T) {
t.Parallel()
redisClient := testutil.CreateTestRedisClient(t)
store := alert.NewRedisAlertStore(redisClient)
store := alert.NewRedisAlertStore(redisClient, "")

// First increment
count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1")
@@ -32,7 +32,7 @@
t.Run("reset consecutive failures", func(t *testing.T) {
t.Parallel()
redisClient := testutil.CreateTestRedisClient(t)
store := alert.NewRedisAlertStore(redisClient)
store := alert.NewRedisAlertStore(redisClient, "")

// Set up initial failures
count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2")
@@ -49,3 +49,76 @@
assert.Equal(t, 1, count)
})
}

func TestRedisAlertStore_WithDeploymentID(t *testing.T) {
t.Parallel()

redisClient := testutil.CreateTestRedisClient(t)
store := alert.NewRedisAlertStore(redisClient, "dp_test_001")

// Test increment with deployment ID
count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1")
require.NoError(t, err)
assert.Equal(t, 1, count)

// Second increment
count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1")
require.NoError(t, err)
assert.Equal(t, 2, count)

// Test reset with deployment ID
err = store.ResetConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1")
require.NoError(t, err)

// Verify counter is reset
count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1")
require.NoError(t, err)
assert.Equal(t, 1, count)
}

func TestAlertStoreIsolation(t *testing.T) {
t.Parallel()

redisClient := testutil.CreateTestRedisClient(t)

// Create two stores with different deployment IDs
store1 := alert.NewRedisAlertStore(redisClient, "dp_001")
store2 := alert.NewRedisAlertStore(redisClient, "dp_002")

// Use same tenant/destination IDs for both
tenantID := "tenant_shared"
destinationID := "dest_shared"

// Increment in store1
count1, err := store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
require.NoError(t, err)
assert.Equal(t, 1, count1)

count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
require.NoError(t, err)
assert.Equal(t, 2, count1)

// Increment in store2 - should start at 1 (isolated from store1)
count2, err := store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
require.NoError(t, err)
assert.Equal(t, 1, count2, "Store 2 should have its own counter")

// Increment store1 again - should continue from 2
count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
require.NoError(t, err)
assert.Equal(t, 3, count1, "Store 1 counter should be unaffected by store 2")

// Reset store1 - should not affect store2
err = store1.ResetConsecutiveFailureCount(context.Background(), tenantID, destinationID)
require.NoError(t, err)

// Verify store1 is reset
count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
require.NoError(t, err)
assert.Equal(t, 1, count1, "Store 1 should be reset")

// Verify store2 is unaffected
count2, err = store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
require.NoError(t, err)
assert.Equal(t, 2, count2, "Store 2 should be unaffected by store 1 reset")
}