From b46f4b6956f7d8caf60776db28be9d69eab07b30 Mon Sep 17 00:00:00 2001
From: Alex Luong
Date: Mon, 6 Oct 2025 20:09:58 +0700
Subject: [PATCH 01/20] chore: add deployment_id config for multi-tenancy support

---
 internal/config/config.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/internal/config/config.go b/internal/config/config.go
index 23168bee..1d5a771f 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -52,6 +52,7 @@ type Config struct {
     GinMode string `yaml:"gin_mode" env:"GIN_MODE" desc:"Sets the Gin framework mode (e.g., 'debug', 'release', 'test'). See Gin documentation for details." required:"N"`
 
     // Application
+    DeploymentID string `yaml:"deployment_id" env:"DEPLOYMENT_ID" desc:"Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation." required:"N"`
     AESEncryptionSecret string `yaml:"aes_encryption_secret" env:"AES_ENCRYPTION_SECRET" desc:"A 16, 24, or 32 byte secret key used for AES encryption of sensitive data at rest." required:"Y"`
     Topics []string `yaml:"topics" env:"TOPICS" envSeparator:"," desc:"Comma-separated list of topics that this Outpost instance should subscribe to for event processing." required:"N"`
     OrganizationName string `yaml:"organization_name" env:"ORGANIZATION_NAME" desc:"Name of the organization, used for display purposes and potentially in user agent strings." required:"N"`

From b80a60a7a77aabace5e318bdf85b402825136314 Mon Sep 17 00:00:00 2001
From: Alex Luong
Date: Mon, 6 Oct 2025 23:27:10 +0700
Subject: [PATCH 02/20] test: refactor entity store test suite

---
 internal/models/entity_test.go      | 776 +---------------------------
 internal/models/entitysuite_test.go | 712 +++++++++++++++++++++++++
 2 files changed, 716 insertions(+), 772 deletions(-)
 create mode 100644 internal/models/entitysuite_test.go

diff --git a/internal/models/entity_test.go b/internal/models/entity_test.go
index f95d6654..431a3795 100644
--- a/internal/models/entity_test.go
+++ b/internal/models/entity_test.go
@@ -1,781 +1,13 @@
 package models_test
 
 import (
-    "context"
-    "encoding/json"
-    "fmt"
     "testing"
-    "time"
 
-    "github.com/google/go-cmp/cmp"
-    "github.com/google/uuid"
-    "github.com/hookdeck/outpost/internal/models"
-    "github.com/hookdeck/outpost/internal/redis"
-    "github.com/hookdeck/outpost/internal/util/testutil"
-    "github.com/stretchr/testify/assert"
-    "github.com/stretchr/testify/require"
+    "github.com/stretchr/testify/suite"
 )
 
-func TestEntityStore_TenantCRUD(t *testing.T) {
+// Run full entity store test suite
+func TestEntityStore(t *testing.T) {
     t.Parallel()
-
-    redisClient := testutil.CreateTestRedisClient(t)
-    entityStore := models.NewEntityStore(redisClient,
-        models.WithCipher(models.NewAESCipher("secret")),
-        models.WithAvailableTopics(testutil.TestTopics),
-    )
-
-    input := models.Tenant{
-        ID: uuid.New().String(),
-        CreatedAt: time.Now(),
-    }
-
-    t.Run("gets empty", func(t *testing.T) {
-        actual, err := entityStore.RetrieveTenant(context.Background(), input.ID)
-        assert.Nil(t, actual)
-        assert.NoError(t, err)
-    })
-
-    t.Run("sets", func(t *testing.T) {
-        err := entityStore.UpsertTenant(context.Background(), input)
-        require.NoError(t, err)
-
-        retrieved, err := entityStore.RetrieveTenant(context.Background(), input.ID)
-        require.NoError(t, err)
-        assert.Equal(t, input.ID, retrieved.ID)
-        assert.True(t, input.CreatedAt.Equal(retrieved.CreatedAt))
-    })
-
-    t.Run("gets", func(t *testing.T) {
-        actual, err := entityStore.RetrieveTenant(context.Background(),
input.ID) - require.NoError(t, err) - assert.Equal(t, input.ID, actual.ID) - assert.True(t, input.CreatedAt.Equal(actual.CreatedAt)) - }) - - t.Run("overrides", func(t *testing.T) { - input.CreatedAt = time.Now() - - err := entityStore.UpsertTenant(context.Background(), input) - require.NoError(t, err) - - actual, err := entityStore.RetrieveTenant(context.Background(), input.ID) - require.NoError(t, err) - assert.Equal(t, input.ID, actual.ID) - assert.True(t, input.CreatedAt.Equal(actual.CreatedAt)) - }) - - t.Run("clears", func(t *testing.T) { - require.NoError(t, entityStore.DeleteTenant(context.Background(), input.ID)) - - actual, err := entityStore.RetrieveTenant(context.Background(), input.ID) - assert.ErrorIs(t, err, models.ErrTenantDeleted) - assert.Nil(t, actual) - }) - - t.Run("deletes again", func(t *testing.T) { - assert.NoError(t, entityStore.DeleteTenant(context.Background(), input.ID)) - }) - - t.Run("deletes non-existent", func(t *testing.T) { - assert.ErrorIs(t, entityStore.DeleteTenant(context.Background(), "non-existent-tenant"), models.ErrTenantNotFound) - }) - - t.Run("creates & overrides deleted resource", func(t *testing.T) { - require.NoError(t, entityStore.UpsertTenant(context.Background(), input)) - - actual, err := entityStore.RetrieveTenant(context.Background(), input.ID) - require.NoError(t, err) - assert.Equal(t, input.ID, actual.ID) - assert.True(t, input.CreatedAt.Equal(actual.CreatedAt)) - }) -} - -func TestEntityStore_DestinationCRUD(t *testing.T) { - t.Parallel() - - redisClient := testutil.CreateTestRedisClient(t) - entityStore := models.NewEntityStore(redisClient, - models.WithCipher(models.NewAESCipher("secret")), - models.WithAvailableTopics(testutil.TestTopics), - ) - - input := models.Destination{ - ID: uuid.New().String(), - Type: "rabbitmq", - Topics: []string{"user.created", "user.updated"}, - Config: map[string]string{ - "server_url": "localhost:5672", - "exchange": "events", - }, - Credentials: map[string]string{ - "username": "guest", - "password": "guest", - }, - CreatedAt: time.Now(), - DisabledAt: nil, - TenantID: uuid.New().String(), - } - - t.Run("gets empty", func(t *testing.T) { - actual, err := entityStore.RetrieveDestination(context.Background(), input.TenantID, input.ID) - require.NoError(t, err) - assert.Nil(t, actual) - }) - - t.Run("sets", func(t *testing.T) { - err := entityStore.CreateDestination(context.Background(), input) - require.NoError(t, err) - }) - - t.Run("gets", func(t *testing.T) { - actual, err := entityStore.RetrieveDestination(context.Background(), input.TenantID, input.ID) - require.NoError(t, err) - assertEqualDestination(t, input, *actual) - }) - - t.Run("updates", func(t *testing.T) { - input.Topics = []string{"*"} - - err := entityStore.UpsertDestination(context.Background(), input) - require.NoError(t, err) - - actual, err := entityStore.RetrieveDestination(context.Background(), input.TenantID, input.ID) - require.NoError(t, err) - assertEqualDestination(t, input, *actual) - }) - - t.Run("clears", func(t *testing.T) { - err := entityStore.DeleteDestination(context.Background(), input.TenantID, input.ID) - require.NoError(t, err) - - actual, err := entityStore.RetrieveDestination(context.Background(), input.TenantID, input.ID) - assert.ErrorIs(t, err, models.ErrDestinationDeleted) - assert.Nil(t, actual) - }) - - t.Run("creates & overrides deleted resource", func(t *testing.T) { - err := entityStore.CreateDestination(context.Background(), input) - require.NoError(t, err) - - actual, err := 
entityStore.RetrieveDestination(context.Background(), input.TenantID, input.ID) - require.NoError(t, err) - assertEqualDestination(t, input, *actual) - }) - - t.Run("err when creates duplicate", func(t *testing.T) { - assert.ErrorIs(t, entityStore.CreateDestination(context.Background(), input), models.ErrDuplicateDestination) - - // cleanup - require.NoError(t, entityStore.DeleteDestination(context.Background(), input.TenantID, input.ID)) - }) -} - -func TestEntityStore_ListDestinationEmpty(t *testing.T) { - t.Parallel() - - redisClient := testutil.CreateTestRedisClient(t) - entityStore := models.NewEntityStore(redisClient, - models.WithCipher(models.NewAESCipher("secret")), - models.WithAvailableTopics(testutil.TestTopics), - ) - - destinations, err := entityStore.ListDestinationByTenant(context.Background(), uuid.New().String()) - require.NoError(t, err) - assert.Empty(t, destinations) -} - -func TestEntityStore_DeleteTenantAndAssociatedDestinations(t *testing.T) { - t.Parallel() - redisClient := testutil.CreateTestRedisClient(t) - entityStore := models.NewEntityStore(redisClient, - models.WithCipher(models.NewAESCipher("secret")), - models.WithAvailableTopics(testutil.TestTopics), - ) - tenant := models.Tenant{ - ID: uuid.New().String(), - CreatedAt: time.Now(), - } - // Arrange - require.NoError(t, entityStore.UpsertTenant(context.Background(), tenant)) - destinationIDs := []string{uuid.New().String(), uuid.New().String(), uuid.New().String()} - for _, id := range destinationIDs { - require.NoError(t, entityStore.UpsertDestination(context.Background(), testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithID(id), - testutil.DestinationFactory.WithTenantID(tenant.ID), - ))) - } - // Act - require.NoError(t, entityStore.DeleteTenant(context.Background(), tenant.ID)) - // Assert - _, err := entityStore.RetrieveTenant(context.Background(), tenant.ID) - assert.ErrorIs(t, err, models.ErrTenantDeleted) - for _, id := range destinationIDs { - _, err := entityStore.RetrieveDestination(context.Background(), tenant.ID, id) - assert.ErrorIs(t, err, models.ErrDestinationDeleted) - } -} - -func TestEntityStore_DestinationCredentialsEncryption(t *testing.T) { - t.Parallel() - - redisClient := testutil.CreateTestRedisClient(t) - cipher := models.NewAESCipher("secret") - entityStore := models.NewEntityStore(redisClient, - models.WithCipher(cipher), - models.WithAvailableTopics(testutil.TestTopics), - ) - - testEntityStoreDestinationCredentialsEncryption(t, redisClient, cipher, entityStore) -} - -// testEntityStoreDestinationCredentialsEncryption tests that credentials are properly encrypted in storage. -// This test intentionally accesses Redis directly (implementation detail) because the public interface -// transparently encrypts/decrypts credentials, making it impossible to verify encryption through the -// public API alone. We need to ensure that sensitive credentials are never stored in plaintext in Redis, -// which requires examining the actual stored values. If the key format changes, this test will need -// to be updated accordingly. 
-func testEntityStoreDestinationCredentialsEncryption(t *testing.T, redisClient redis.Cmdable, cipher models.Cipher, entityStore models.EntityStore) { - input := models.Destination{ - ID: uuid.New().String(), - Type: "rabbitmq", - Topics: []string{"user.created", "user.updated"}, - Config: map[string]string{ - "server_url": "localhost:5672", - "exchange": "events", - }, - Credentials: map[string]string{ - "username": "guest", - "password": "guest", - }, - CreatedAt: time.Now(), - DisabledAt: nil, - TenantID: uuid.New().String(), - } - - err := entityStore.UpsertDestination(context.Background(), input) - require.NoError(t, err) - - actual, err := redisClient.HGetAll(context.Background(), fmt.Sprintf("tenant:{%s}:destination:%s", input.TenantID, input.ID)).Result() - require.NoError(t, err) - assert.NotEqual(t, input.Credentials, actual["credentials"]) - decryptedCredentials, err := cipher.Decrypt([]byte(actual["credentials"])) - require.NoError(t, err) - jsonCredentials, _ := json.Marshal(input.Credentials) - assert.Equal(t, string(jsonCredentials), string(decryptedCredentials)) -} - -func assertEqualDestination(t *testing.T, expected, actual models.Destination) { - assert.Equal(t, expected.ID, actual.ID) - assert.Equal(t, expected.Type, actual.Type) - assert.Equal(t, expected.Topics, actual.Topics) - assert.Equal(t, expected.Config, actual.Config) - assert.Equal(t, expected.Credentials, actual.Credentials) - assert.True(t, cmp.Equal(expected.CreatedAt, actual.CreatedAt)) - assert.True(t, cmp.Equal(expected.DisabledAt, actual.DisabledAt)) -} - -type multiDestinationSuite struct { - ctx context.Context - redisClient redis.Client - entityStore models.EntityStore - tenant models.Tenant - destinations []models.Destination -} - -func (suite *multiDestinationSuite) SetupTest(t *testing.T) { - if suite.ctx == nil { - suite.ctx = context.Background() - } - suite.redisClient = testutil.CreateTestRedisClient(t) - suite.entityStore = models.NewEntityStore(suite.redisClient, - models.WithCipher(models.NewAESCipher("secret")), - models.WithAvailableTopics(testutil.TestTopics), - ) - suite.destinations = make([]models.Destination, 5) - suite.tenant = models.Tenant{ - ID: uuid.New().String(), - CreatedAt: time.Now(), - } - require.NoError(t, suite.entityStore.UpsertTenant(suite.ctx, suite.tenant)) - - ids := make([]string, 5) - destinationTopicList := [][]string{ - {"*"}, - {"user.created"}, - {"user.updated"}, - {"user.deleted"}, - {"user.created", "user.updated"}, - } - for i := 0; i < 5; i++ { - ids[i] = uuid.New().String() - suite.destinations[i] = testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithID(ids[i]), - testutil.DestinationFactory.WithTenantID(suite.tenant.ID), - testutil.DestinationFactory.WithTopics(destinationTopicList[i]), - ) - require.NoError(t, suite.entityStore.UpsertDestination(suite.ctx, suite.destinations[i])) - } - - // Insert & Delete destination to ensure it's cleaned up properly - toBeDeletedID := uuid.New().String() - require.NoError(t, suite.entityStore.UpsertDestination(suite.ctx, - testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithID(toBeDeletedID), - testutil.DestinationFactory.WithTenantID(suite.tenant.ID), - testutil.DestinationFactory.WithTopics([]string{"*"}), - ))) - require.NoError(t, suite.entityStore.DeleteDestination(suite.ctx, suite.tenant.ID, toBeDeletedID)) -} - -func TestMultiDestinationSuite_RetrieveTenant_DestinationsCount(t *testing.T) { - t.Parallel() - suite := multiDestinationSuite{} - suite.SetupTest(t) - - tenant, err 
:= suite.entityStore.RetrieveTenant(suite.ctx, suite.tenant.ID) - require.NoError(t, err) - require.Equal(t, 5, tenant.DestinationsCount) -} - -func TestMultiDestinationSuite_RetrieveTenant_Topics(t *testing.T) { - t.Parallel() - suite := multiDestinationSuite{} - suite.SetupTest(t) - - tenant, err := suite.entityStore.RetrieveTenant(suite.ctx, suite.tenant.ID) - require.NoError(t, err) - require.Equal(t, []string{"user.created", "user.deleted", "user.updated"}, tenant.Topics) - - require.NoError(t, suite.entityStore.DeleteDestination(suite.ctx, suite.tenant.ID, suite.destinations[0].ID)) - tenant, err = suite.entityStore.RetrieveTenant(suite.ctx, suite.tenant.ID) - require.NoError(t, err) - require.Equal(t, []string{"user.created", "user.deleted", "user.updated"}, tenant.Topics) - - require.NoError(t, suite.entityStore.DeleteDestination(suite.ctx, suite.tenant.ID, suite.destinations[1].ID)) - tenant, err = suite.entityStore.RetrieveTenant(suite.ctx, suite.tenant.ID) - require.NoError(t, err) - require.Equal(t, []string{"user.created", "user.deleted", "user.updated"}, tenant.Topics) - - require.NoError(t, suite.entityStore.DeleteDestination(suite.ctx, suite.tenant.ID, suite.destinations[2].ID)) - tenant, err = suite.entityStore.RetrieveTenant(suite.ctx, suite.tenant.ID) - require.NoError(t, err) - require.Equal(t, []string{"user.created", "user.deleted", "user.updated"}, tenant.Topics) - - require.NoError(t, suite.entityStore.DeleteDestination(suite.ctx, suite.tenant.ID, suite.destinations[3].ID)) - tenant, err = suite.entityStore.RetrieveTenant(suite.ctx, suite.tenant.ID) - require.NoError(t, err) - require.Equal(t, []string{"user.created", "user.updated"}, tenant.Topics) - - require.NoError(t, suite.entityStore.DeleteDestination(suite.ctx, suite.tenant.ID, suite.destinations[4].ID)) - tenant, err = suite.entityStore.RetrieveTenant(suite.ctx, suite.tenant.ID) - require.NoError(t, err) - require.Equal(t, []string{}, tenant.Topics) -} - -func TestMultiDestinationSuite_ListDestinationByTenant(t *testing.T) { - t.Parallel() - suite := multiDestinationSuite{} - suite.SetupTest(t) - - destinations, err := suite.entityStore.ListDestinationByTenant(suite.ctx, suite.tenant.ID) - require.NoError(t, err) - require.Len(t, destinations, 5) - for index, destination := range destinations { - require.Equal(t, suite.destinations[index].ID, destination.ID) - } -} - -func TestMultiDestinationSuite_ListDestination_WithOpts(t *testing.T) { - t.Parallel() - - suite := multiDestinationSuite{} - suite.SetupTest(t) - - t.Run("filter by type: webhook", func(t *testing.T) { - destinations, err := suite.entityStore.ListDestinationByTenant(suite.ctx, suite.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ - Type: []string{"webhook"}, - })) - require.NoError(t, err) - require.Len(t, destinations, 5) - }) - - t.Run("filter by type: rabbitmq", func(t *testing.T) { - destinations, err := suite.entityStore.ListDestinationByTenant(suite.ctx, suite.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ - Type: []string{"rabbitmq"}, - })) - require.NoError(t, err) - require.Len(t, destinations, 0) - }) - - t.Run("filter by type: webhook,rabbitmq", func(t *testing.T) { - destinations, err := suite.entityStore.ListDestinationByTenant(suite.ctx, suite.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ - Type: []string{"webhook", "rabbitmq"}, - })) - require.NoError(t, err) - require.Len(t, destinations, 5) - }) - - t.Run("filter by topic: user.created", func(t *testing.T) { - 
destinations, err := suite.entityStore.ListDestinationByTenant(suite.ctx, suite.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ - Topics: []string{"user.created"}, - })) - require.NoError(t, err) - require.Len(t, destinations, 3) - }) - - t.Run("filter by topic: user.created,user.updated", func(t *testing.T) { - destinations, err := suite.entityStore.ListDestinationByTenant(suite.ctx, suite.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ - Topics: []string{"user.created", "user.updated"}, - })) - require.NoError(t, err) - require.Len(t, destinations, 2) - }) - - t.Run("filter by type: rabbitmq, topic: user.created,user.updated", func(t *testing.T) { - destinations, err := suite.entityStore.ListDestinationByTenant(suite.ctx, suite.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ - Type: []string{"rabbitmq"}, - Topics: []string{"user.created", "user.updated"}, - })) - require.NoError(t, err) - require.Len(t, destinations, 0) - }) - - t.Run("filter by topic: *", func(t *testing.T) { - destinations, err := suite.entityStore.ListDestinationByTenant(suite.ctx, suite.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ - Topics: []string{"*"}, - })) - require.NoError(t, err) - require.Len(t, destinations, 1) - }) -} - -func TestMultiDestinationSuite_MatchEvent(t *testing.T) { - t.Parallel() - - suite := multiDestinationSuite{} - suite.SetupTest(t) - - t.Run("match by topic", func(t *testing.T) { - // Act - event := models.Event{ - ID: uuid.New().String(), - Topic: "user.created", - Time: time.Now(), - TenantID: suite.tenant.ID, - Metadata: map[string]string{}, - Data: map[string]interface{}{}, - } - matchedDestinationSummaryList, err := suite.entityStore.MatchEvent(suite.ctx, event) - require.NoError(t, err) - - // Assert - require.Len(t, matchedDestinationSummaryList, 3) - for _, summary := range matchedDestinationSummaryList { - require.Contains(t, []string{suite.destinations[0].ID, suite.destinations[1].ID, suite.destinations[4].ID}, summary.ID) - } - }) - - t.Run("match by topic & destination", func(t *testing.T) { - // Act - event := models.Event{ - ID: uuid.New().String(), - Topic: "user.created", - Time: time.Now(), - TenantID: suite.tenant.ID, - DestinationID: suite.destinations[1].ID, - Metadata: map[string]string{}, - Data: map[string]interface{}{}, - } - matchedDestinationSummaryList, err := suite.entityStore.MatchEvent(suite.ctx, event) - require.NoError(t, err) - - // Assert - require.Len(t, matchedDestinationSummaryList, 1) - require.Equal(t, suite.destinations[1].ID, matchedDestinationSummaryList[0].ID) - }) - - t.Run("destination not found", func(t *testing.T) { - // Act - event := models.Event{ - ID: uuid.New().String(), - Topic: "user.created", - Time: time.Now(), - TenantID: suite.tenant.ID, - DestinationID: "not-found", - Metadata: map[string]string{}, - Data: map[string]interface{}{}, - } - matchedDestinationSummaryList, err := suite.entityStore.MatchEvent(suite.ctx, event) - require.NoError(t, err) - - // Assert - require.Len(t, matchedDestinationSummaryList, 0) - }) - - t.Run("destination topic is invalid", func(t *testing.T) { - // Act - event := models.Event{ - ID: uuid.New().String(), - Topic: "user.created", - Time: time.Now(), - TenantID: suite.tenant.ID, - DestinationID: suite.destinations[3].ID, // "user-deleted" destination - Metadata: map[string]string{}, - Data: map[string]interface{}{}, - } - matchedDestinationSummaryList, err := suite.entityStore.MatchEvent(suite.ctx, event) - require.NoError(t, 
err) - - // Assert - require.Len(t, matchedDestinationSummaryList, 0) - }) - - t.Run("match after destination is updated", func(t *testing.T) { - updatedIndex := 2 - updatedTopics := []string{"user.created"} - updatedDestination := suite.destinations[updatedIndex] - updatedDestination.Topics = updatedTopics - require.NoError(t, suite.entityStore.UpsertDestination(suite.ctx, updatedDestination)) - - actual, err := suite.entityStore.RetrieveDestination(suite.ctx, updatedDestination.TenantID, updatedDestination.ID) - require.NoError(t, err) - assert.Equal(t, updatedDestination.Topics, actual.Topics) - - destinations, err := suite.entityStore.ListDestinationByTenant(suite.ctx, suite.tenant.ID) - require.NoError(t, err) - assert.Len(t, destinations, 5) - - // Match user.created - event := models.Event{ - ID: uuid.New().String(), - Topic: "user.created", - Time: time.Now(), - TenantID: suite.tenant.ID, - Metadata: map[string]string{}, - Data: map[string]interface{}{}, - } - matchedDestinationSummaryList, err := suite.entityStore.MatchEvent(suite.ctx, event) - require.NoError(t, err) - require.Len(t, matchedDestinationSummaryList, 4) - for _, summary := range matchedDestinationSummaryList { - require.Contains(t, []string{suite.destinations[0].ID, suite.destinations[1].ID, suite.destinations[2].ID, suite.destinations[4].ID}, summary.ID) - } - - // Match user.updated - event = models.Event{ - ID: uuid.New().String(), - Topic: "user.updated", - Time: time.Now(), - TenantID: suite.tenant.ID, - Metadata: map[string]string{}, - Data: map[string]interface{}{}, - } - matchedDestinationSummaryList, err = suite.entityStore.MatchEvent(suite.ctx, event) - require.NoError(t, err) - require.Len(t, matchedDestinationSummaryList, 2) - for _, summary := range matchedDestinationSummaryList { - require.Contains(t, []string{suite.destinations[0].ID, suite.destinations[4].ID}, summary.ID) - } - }) -} - -func TestDestinationEnableDisable(t *testing.T) { - t.Parallel() - - redisClient := testutil.CreateTestRedisClient(t) - entityStore := models.NewEntityStore(redisClient, - models.WithCipher(models.NewAESCipher("secret")), - models.WithAvailableTopics(testutil.TestTopics), - ) - - input := testutil.DestinationFactory.Any() - require.NoError(t, entityStore.UpsertDestination(context.Background(), input)) - - assertDestination := func(t *testing.T, expected models.Destination) { - actual, err := entityStore.RetrieveDestination(context.Background(), input.TenantID, input.ID) - require.NoError(t, err) - assert.Equal(t, expected.ID, actual.ID) - assert.True(t, cmp.Equal(expected.DisabledAt, actual.DisabledAt), "expected %v, got %v", expected.DisabledAt, actual.DisabledAt) - } - - t.Run("should disable", func(t *testing.T) { - now := time.Now() - input.DisabledAt = &now - require.NoError(t, entityStore.UpsertDestination(context.Background(), input)) - assertDestination(t, input) - }) - - t.Run("should enable", func(t *testing.T) { - input.DisabledAt = nil - require.NoError(t, entityStore.UpsertDestination(context.Background(), input)) - assertDestination(t, input) - }) -} - -func TestMultiSuite_DisableAndMatch(t *testing.T) { - t.Parallel() - - suite := multiDestinationSuite{} - suite.SetupTest(t) - - t.Run("initial match user.deleted", func(t *testing.T) { - event := testutil.EventFactory.Any( - testutil.EventFactory.WithTenantID(suite.tenant.ID), - testutil.EventFactory.WithTopic("user.deleted"), - ) - matchedDestinationSummaryList, err := suite.entityStore.MatchEvent(suite.ctx, event) - require.NoError(t, err) - 
require.Len(t, matchedDestinationSummaryList, 2) - for _, summary := range matchedDestinationSummaryList { - require.Contains(t, []string{suite.destinations[0].ID, suite.destinations[3].ID}, summary.ID) - } - }) - - t.Run("should not match disabled destination", func(t *testing.T) { - destination := suite.destinations[0] - now := time.Now() - destination.DisabledAt = &now - require.NoError(t, suite.entityStore.UpsertDestination(suite.ctx, destination)) - - event := testutil.EventFactory.Any( - testutil.EventFactory.WithTenantID(suite.tenant.ID), - testutil.EventFactory.WithTopic("user.deleted"), - ) - matchedDestinationSummaryList, err := suite.entityStore.MatchEvent(suite.ctx, event) - require.NoError(t, err) - require.Len(t, matchedDestinationSummaryList, 1) - for _, summary := range matchedDestinationSummaryList { - require.Contains(t, []string{suite.destinations[3].ID}, summary.ID) - } - }) - - t.Run("should match after re-enabled destination", func(t *testing.T) { - destination := suite.destinations[0] - destination.DisabledAt = nil - require.NoError(t, suite.entityStore.UpsertDestination(suite.ctx, destination)) - - event := testutil.EventFactory.Any( - testutil.EventFactory.WithTenantID(suite.tenant.ID), - testutil.EventFactory.WithTopic("user.deleted"), - ) - matchedDestinationSummaryList, err := suite.entityStore.MatchEvent(suite.ctx, event) - require.NoError(t, err) - require.Len(t, matchedDestinationSummaryList, 2) - for _, summary := range matchedDestinationSummaryList { - require.Contains(t, []string{suite.destinations[0].ID, suite.destinations[3].ID}, summary.ID) - } - }) -} - -func TestEntityStore_DeleteDestination(t *testing.T) { - t.Parallel() - - ctx := context.Background() - redisClient := testutil.CreateTestRedisClient(t) - entityStore := models.NewEntityStore(redisClient, - models.WithCipher(models.NewAESCipher("secret")), - models.WithAvailableTopics(testutil.TestTopics), - ) - - destination := testutil.DestinationFactory.Any() - require.NoError(t, entityStore.CreateDestination(ctx, destination)) - - t.Run("should not return error when deleting existing destination", func(t *testing.T) { - assert.NoError(t, entityStore.DeleteDestination(ctx, destination.TenantID, destination.ID)) - }) - - t.Run("should not return error when deleting already-deleted destination", func(t *testing.T) { - assert.NoError(t, entityStore.DeleteDestination(ctx, destination.TenantID, destination.ID)) - }) - - t.Run("should return error when deleting non-existent destination", func(t *testing.T) { - err := entityStore.DeleteDestination(ctx, destination.TenantID, uuid.New().String()) - assert.ErrorIs(t, err, models.ErrDestinationNotFound) - }) - - t.Run("should return ErrDestinationDeleted when retrieving deleted destination", func(t *testing.T) { - dest, err := entityStore.RetrieveDestination(ctx, destination.TenantID, destination.ID) - assert.ErrorIs(t, err, models.ErrDestinationDeleted) - assert.Nil(t, dest) - }) - - t.Run("should not return deleted destination in list", func(t *testing.T) { - destinations, err := entityStore.ListDestinationByTenant(ctx, destination.TenantID) - assert.NoError(t, err) - assert.Empty(t, destinations) - }) -} - -func TestMultiSuite_DeleteAndMatch(t *testing.T) { - t.Parallel() - - suite := multiDestinationSuite{} - suite.SetupTest(t) - - t.Run("delete first destination", func(t *testing.T) { - require.NoError(t, - suite.entityStore.DeleteDestination(suite.ctx, suite.tenant.ID, suite.destinations[0].ID), - ) - }) - - t.Run("match event", func(t *testing.T) { - 
event := testutil.EventFactory.Any( - testutil.EventFactory.WithTenantID(suite.tenant.ID), - testutil.EventFactory.WithTopic("user.created"), - ) - - matchedDestinationSummaryList, err := suite.entityStore.MatchEvent(suite.ctx, event) - require.NoError(t, err) - require.Len(t, matchedDestinationSummaryList, 2) - for _, summary := range matchedDestinationSummaryList { - require.Contains(t, []string{suite.destinations[1].ID, suite.destinations[4].ID}, summary.ID) - } - }) -} - -func TestEntityStore_MaxDestinationsPerTenant(t *testing.T) { - t.Parallel() - - redisClient := testutil.CreateTestRedisClient(t) - maxDestinations := 2 - entityStore := models.NewEntityStore(redisClient, - models.WithCipher(models.NewAESCipher("secret")), - models.WithAvailableTopics(testutil.TestTopics), - models.WithMaxDestinationsPerTenant(maxDestinations), - ) - - tenant := models.Tenant{ - ID: uuid.New().String(), - CreatedAt: time.Now(), - } - require.NoError(t, entityStore.UpsertTenant(context.Background(), tenant)) - - // Should be able to create up to maxDestinations - for i := 0; i < maxDestinations; i++ { - destination := testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithTenantID(tenant.ID), - ) - err := entityStore.CreateDestination(context.Background(), destination) - require.NoError(t, err, "Should be able to create destination %d", i+1) - } - - // Should fail when trying to create one more - destination := testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithTenantID(tenant.ID), - ) - err := entityStore.CreateDestination(context.Background(), destination) - require.Error(t, err) - require.ErrorIs(t, err, models.ErrMaxDestinationsPerTenantReached) - - // Should be able to create after deleting one - destinations, err := entityStore.ListDestinationByTenant(context.Background(), tenant.ID) - require.NoError(t, err) - require.NoError(t, entityStore.DeleteDestination(context.Background(), tenant.ID, destinations[0].ID)) - - destination = testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithTenantID(tenant.ID), - ) - err = entityStore.CreateDestination(context.Background(), destination) - require.NoError(t, err, "Should be able to create destination after deleting one") + suite.Run(t, &EntityTestSuite{}) } diff --git a/internal/models/entitysuite_test.go b/internal/models/entitysuite_test.go new file mode 100644 index 00000000..cfcc6ff2 --- /dev/null +++ b/internal/models/entitysuite_test.go @@ -0,0 +1,712 @@ +package models_test + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/redis" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +// Helper function used by test suite +func assertEqualDestination(t *testing.T, expected, actual models.Destination) { + assert.Equal(t, expected.ID, actual.ID) + assert.Equal(t, expected.Type, actual.Type) + assert.Equal(t, expected.Topics, actual.Topics) + assert.Equal(t, expected.Config, actual.Config) + assert.Equal(t, expected.Credentials, actual.Credentials) + assert.True(t, cmp.Equal(expected.CreatedAt, actual.CreatedAt)) + assert.True(t, cmp.Equal(expected.DisabledAt, actual.DisabledAt)) +} + +// EntityTestSuite contains all entity store tests +type EntityTestSuite struct { + suite.Suite + ctx context.Context + redisClient 
redis.Cmdable + entityStore models.EntityStore +} + +func (s *EntityTestSuite) SetupTest() { + s.ctx = context.Background() + s.redisClient = testutil.CreateTestRedisClient(s.T()) + + opts := []models.EntityStoreOption{ + models.WithCipher(models.NewAESCipher("secret")), + models.WithAvailableTopics(testutil.TestTopics), + } + s.entityStore = models.NewEntityStore(s.redisClient, opts...) +} + +func (s *EntityTestSuite) TestTenantCRUD() { + t := s.T() + input := models.Tenant{ + ID: uuid.New().String(), + CreatedAt: time.Now(), + } + + t.Run("gets empty", func(t *testing.T) { + actual, err := s.entityStore.RetrieveTenant(s.ctx, input.ID) + assert.Nil(s.T(), actual) + assert.NoError(s.T(), err) + }) + + t.Run("sets", func(t *testing.T) { + err := s.entityStore.UpsertTenant(s.ctx, input) + require.NoError(s.T(), err) + + retrieved, err := s.entityStore.RetrieveTenant(s.ctx, input.ID) + require.NoError(s.T(), err) + assert.Equal(s.T(), input.ID, retrieved.ID) + assert.True(s.T(), input.CreatedAt.Equal(retrieved.CreatedAt)) + }) + + t.Run("gets", func(t *testing.T) { + actual, err := s.entityStore.RetrieveTenant(s.ctx, input.ID) + require.NoError(s.T(), err) + assert.Equal(s.T(), input.ID, actual.ID) + assert.True(s.T(), input.CreatedAt.Equal(actual.CreatedAt)) + }) + + t.Run("overrides", func(t *testing.T) { + input.CreatedAt = time.Now() + + err := s.entityStore.UpsertTenant(s.ctx, input) + require.NoError(s.T(), err) + + actual, err := s.entityStore.RetrieveTenant(s.ctx, input.ID) + require.NoError(s.T(), err) + assert.Equal(s.T(), input.ID, actual.ID) + assert.True(s.T(), input.CreatedAt.Equal(actual.CreatedAt)) + }) + + t.Run("clears", func(t *testing.T) { + require.NoError(s.T(), s.entityStore.DeleteTenant(s.ctx, input.ID)) + + actual, err := s.entityStore.RetrieveTenant(s.ctx, input.ID) + assert.ErrorIs(s.T(), err, models.ErrTenantDeleted) + assert.Nil(s.T(), actual) + }) + + t.Run("deletes again", func(t *testing.T) { + assert.NoError(s.T(), s.entityStore.DeleteTenant(s.ctx, input.ID)) + }) + + t.Run("deletes non-existent", func(t *testing.T) { + assert.ErrorIs(s.T(), s.entityStore.DeleteTenant(s.ctx, "non-existent-tenant"), models.ErrTenantNotFound) + }) + + t.Run("creates & overrides deleted resource", func(t *testing.T) { + require.NoError(s.T(), s.entityStore.UpsertTenant(s.ctx, input)) + + actual, err := s.entityStore.RetrieveTenant(s.ctx, input.ID) + require.NoError(s.T(), err) + assert.Equal(s.T(), input.ID, actual.ID) + assert.True(s.T(), input.CreatedAt.Equal(actual.CreatedAt)) + }) +} + +func (s *EntityTestSuite) TestDestinationCRUD() { + t := s.T() + input := models.Destination{ + ID: uuid.New().String(), + Type: "rabbitmq", + Topics: []string{"user.created", "user.updated"}, + Config: map[string]string{ + "server_url": "localhost:5672", + "exchange": "events", + }, + Credentials: map[string]string{ + "username": "guest", + "password": "guest", + }, + CreatedAt: time.Now(), + DisabledAt: nil, + TenantID: uuid.New().String(), + } + + t.Run("gets empty", func(t *testing.T) { + actual, err := s.entityStore.RetrieveDestination(s.ctx, input.TenantID, input.ID) + require.NoError(s.T(), err) + assert.Nil(s.T(), actual) + }) + + t.Run("sets", func(t *testing.T) { + err := s.entityStore.CreateDestination(s.ctx, input) + require.NoError(s.T(), err) + }) + + t.Run("gets", func(t *testing.T) { + actual, err := s.entityStore.RetrieveDestination(s.ctx, input.TenantID, input.ID) + require.NoError(s.T(), err) + assertEqualDestination(t, input, *actual) + }) + + t.Run("updates", func(t 
*testing.T) { + input.Topics = []string{"*"} + + err := s.entityStore.UpsertDestination(s.ctx, input) + require.NoError(s.T(), err) + + actual, err := s.entityStore.RetrieveDestination(s.ctx, input.TenantID, input.ID) + require.NoError(s.T(), err) + assertEqualDestination(t, input, *actual) + }) + + t.Run("clears", func(t *testing.T) { + err := s.entityStore.DeleteDestination(s.ctx, input.TenantID, input.ID) + require.NoError(s.T(), err) + + actual, err := s.entityStore.RetrieveDestination(s.ctx, input.TenantID, input.ID) + assert.ErrorIs(s.T(), err, models.ErrDestinationDeleted) + assert.Nil(s.T(), actual) + }) + + t.Run("creates & overrides deleted resource", func(t *testing.T) { + err := s.entityStore.CreateDestination(s.ctx, input) + require.NoError(s.T(), err) + + actual, err := s.entityStore.RetrieveDestination(s.ctx, input.TenantID, input.ID) + require.NoError(s.T(), err) + assertEqualDestination(t, input, *actual) + }) + + t.Run("err when creates duplicate", func(t *testing.T) { + assert.ErrorIs(s.T(), s.entityStore.CreateDestination(s.ctx, input), models.ErrDuplicateDestination) + + // cleanup + require.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, input.TenantID, input.ID)) + }) +} + +func (s *EntityTestSuite) TestListDestinationEmpty() { + destinations, err := s.entityStore.ListDestinationByTenant(s.ctx, uuid.New().String()) + require.NoError(s.T(), err) + assert.Empty(s.T(), destinations) +} + +func (s *EntityTestSuite) TestDeleteTenantAndAssociatedDestinations() { + tenant := models.Tenant{ + ID: uuid.New().String(), + CreatedAt: time.Now(), + } + // Arrange + require.NoError(s.T(), s.entityStore.UpsertTenant(s.ctx, tenant)) + destinationIDs := []string{uuid.New().String(), uuid.New().String(), uuid.New().String()} + for _, id := range destinationIDs { + require.NoError(s.T(), s.entityStore.UpsertDestination(s.ctx, testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID(id), + testutil.DestinationFactory.WithTenantID(tenant.ID), + ))) + } + // Act + require.NoError(s.T(), s.entityStore.DeleteTenant(s.ctx, tenant.ID)) + // Assert + _, err := s.entityStore.RetrieveTenant(s.ctx, tenant.ID) + assert.ErrorIs(s.T(), err, models.ErrTenantDeleted) + for _, id := range destinationIDs { + _, err := s.entityStore.RetrieveDestination(s.ctx, tenant.ID, id) + assert.ErrorIs(s.T(), err, models.ErrDestinationDeleted) + } +} + +func (s *EntityTestSuite) TestDestinationCredentialsEncryption() { + cipher := models.NewAESCipher("secret") + + input := models.Destination{ + ID: uuid.New().String(), + Type: "rabbitmq", + Topics: []string{"user.created", "user.updated"}, + Config: map[string]string{ + "server_url": "localhost:5672", + "exchange": "events", + }, + Credentials: map[string]string{ + "username": "guest", + "password": "guest", + }, + CreatedAt: time.Now(), + DisabledAt: nil, + TenantID: uuid.New().String(), + } + + err := s.entityStore.UpsertDestination(s.ctx, input) + require.NoError(s.T(), err) + + actual, err := s.redisClient.HGetAll(s.ctx, fmt.Sprintf("tenant:{%s}:destination:%s", input.TenantID, input.ID)).Result() + require.NoError(s.T(), err) + assert.NotEqual(s.T(), input.Credentials, actual["credentials"]) + decryptedCredentials, err := cipher.Decrypt([]byte(actual["credentials"])) + require.NoError(s.T(), err) + jsonCredentials, _ := json.Marshal(input.Credentials) + assert.Equal(s.T(), string(jsonCredentials), string(decryptedCredentials)) +} + +// Helper struct for multi-destination tests +type multiDestinationData struct { + tenant models.Tenant + 
destinations []models.Destination +} + +func (s *EntityTestSuite) setupMultiDestination() multiDestinationData { + data := multiDestinationData{ + tenant: models.Tenant{ + ID: uuid.New().String(), + CreatedAt: time.Now(), + }, + destinations: make([]models.Destination, 5), + } + require.NoError(s.T(), s.entityStore.UpsertTenant(s.ctx, data.tenant)) + + destinationTopicList := [][]string{ + {"*"}, + {"user.created"}, + {"user.updated"}, + {"user.deleted"}, + {"user.created", "user.updated"}, + } + for i := 0; i < 5; i++ { + id := uuid.New().String() + data.destinations[i] = testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID(id), + testutil.DestinationFactory.WithTenantID(data.tenant.ID), + testutil.DestinationFactory.WithTopics(destinationTopicList[i]), + ) + require.NoError(s.T(), s.entityStore.UpsertDestination(s.ctx, data.destinations[i])) + } + + // Insert & Delete destination to ensure it's cleaned up properly + toBeDeletedID := uuid.New().String() + require.NoError(s.T(), s.entityStore.UpsertDestination(s.ctx, + testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID(toBeDeletedID), + testutil.DestinationFactory.WithTenantID(data.tenant.ID), + testutil.DestinationFactory.WithTopics([]string{"*"}), + ))) + require.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, data.tenant.ID, toBeDeletedID)) + + return data +} + +func (s *EntityTestSuite) TestMultiDestinationRetrieveTenantDestinationsCount() { + data := s.setupMultiDestination() + + tenant, err := s.entityStore.RetrieveTenant(s.ctx, data.tenant.ID) + require.NoError(s.T(), err) + require.Equal(s.T(), 5, tenant.DestinationsCount) +} + +func (s *EntityTestSuite) TestMultiDestinationRetrieveTenantTopics() { + data := s.setupMultiDestination() + + tenant, err := s.entityStore.RetrieveTenant(s.ctx, data.tenant.ID) + require.NoError(s.T(), err) + require.Equal(s.T(), []string{"user.created", "user.deleted", "user.updated"}, tenant.Topics) + + require.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, data.tenant.ID, data.destinations[0].ID)) + tenant, err = s.entityStore.RetrieveTenant(s.ctx, data.tenant.ID) + require.NoError(s.T(), err) + require.Equal(s.T(), []string{"user.created", "user.deleted", "user.updated"}, tenant.Topics) + + require.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, data.tenant.ID, data.destinations[1].ID)) + tenant, err = s.entityStore.RetrieveTenant(s.ctx, data.tenant.ID) + require.NoError(s.T(), err) + require.Equal(s.T(), []string{"user.created", "user.deleted", "user.updated"}, tenant.Topics) + + require.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, data.tenant.ID, data.destinations[2].ID)) + tenant, err = s.entityStore.RetrieveTenant(s.ctx, data.tenant.ID) + require.NoError(s.T(), err) + require.Equal(s.T(), []string{"user.created", "user.deleted", "user.updated"}, tenant.Topics) + + require.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, data.tenant.ID, data.destinations[3].ID)) + tenant, err = s.entityStore.RetrieveTenant(s.ctx, data.tenant.ID) + require.NoError(s.T(), err) + require.Equal(s.T(), []string{"user.created", "user.updated"}, tenant.Topics) + + require.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, data.tenant.ID, data.destinations[4].ID)) + tenant, err = s.entityStore.RetrieveTenant(s.ctx, data.tenant.ID) + require.NoError(s.T(), err) + require.Equal(s.T(), []string{}, tenant.Topics) +} + +func (s *EntityTestSuite) TestMultiDestinationListDestinationByTenant() { + data := s.setupMultiDestination() + + destinations, err := 
s.entityStore.ListDestinationByTenant(s.ctx, data.tenant.ID) + require.NoError(s.T(), err) + require.Len(s.T(), destinations, 5) + for index, destination := range destinations { + require.Equal(s.T(), data.destinations[index].ID, destination.ID) + } +} + +func (s *EntityTestSuite) TestMultiDestinationListDestinationWithOpts() { + t := s.T() + data := s.setupMultiDestination() + + t.Run("filter by type: webhook", func(t *testing.T) { + destinations, err := s.entityStore.ListDestinationByTenant(s.ctx, data.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ + Type: []string{"webhook"}, + })) + require.NoError(s.T(), err) + require.Len(s.T(), destinations, 5) + }) + + t.Run("filter by type: rabbitmq", func(t *testing.T) { + destinations, err := s.entityStore.ListDestinationByTenant(s.ctx, data.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ + Type: []string{"rabbitmq"}, + })) + require.NoError(s.T(), err) + require.Len(s.T(), destinations, 0) + }) + + t.Run("filter by type: webhook,rabbitmq", func(t *testing.T) { + destinations, err := s.entityStore.ListDestinationByTenant(s.ctx, data.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ + Type: []string{"webhook", "rabbitmq"}, + })) + require.NoError(s.T(), err) + require.Len(s.T(), destinations, 5) + }) + + t.Run("filter by topic: user.created", func(t *testing.T) { + destinations, err := s.entityStore.ListDestinationByTenant(s.ctx, data.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ + Topics: []string{"user.created"}, + })) + require.NoError(s.T(), err) + require.Len(s.T(), destinations, 3) + }) + + t.Run("filter by topic: user.created,user.updated", func(t *testing.T) { + destinations, err := s.entityStore.ListDestinationByTenant(s.ctx, data.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ + Topics: []string{"user.created", "user.updated"}, + })) + require.NoError(s.T(), err) + require.Len(s.T(), destinations, 2) + }) + + t.Run("filter by type: rabbitmq, topic: user.created,user.updated", func(t *testing.T) { + destinations, err := s.entityStore.ListDestinationByTenant(s.ctx, data.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ + Type: []string{"rabbitmq"}, + Topics: []string{"user.created", "user.updated"}, + })) + require.NoError(s.T(), err) + require.Len(s.T(), destinations, 0) + }) + + t.Run("filter by topic: *", func(t *testing.T) { + destinations, err := s.entityStore.ListDestinationByTenant(s.ctx, data.tenant.ID, models.WithDestinationFilter(models.DestinationFilter{ + Topics: []string{"*"}, + })) + require.NoError(s.T(), err) + require.Len(s.T(), destinations, 1) + }) +} + +func (s *EntityTestSuite) TestMultiDestinationMatchEvent() { + t := s.T() + data := s.setupMultiDestination() + + t.Run("match by topic", func(t *testing.T) { + event := models.Event{ + ID: uuid.New().String(), + Topic: "user.created", + Time: time.Now(), + TenantID: data.tenant.ID, + Metadata: map[string]string{}, + Data: map[string]interface{}{}, + } + matchedDestinationSummaryList, err := s.entityStore.MatchEvent(s.ctx, event) + require.NoError(s.T(), err) + + require.Len(s.T(), matchedDestinationSummaryList, 3) + for _, summary := range matchedDestinationSummaryList { + require.Contains(s.T(), []string{data.destinations[0].ID, data.destinations[1].ID, data.destinations[4].ID}, summary.ID) + } + }) + + t.Run("match by topic & destination", func(t *testing.T) { + event := models.Event{ + ID: uuid.New().String(), + Topic: "user.created", + Time: time.Now(), + 
TenantID: data.tenant.ID, + DestinationID: data.destinations[1].ID, + Metadata: map[string]string{}, + Data: map[string]interface{}{}, + } + matchedDestinationSummaryList, err := s.entityStore.MatchEvent(s.ctx, event) + require.NoError(s.T(), err) + + require.Len(s.T(), matchedDestinationSummaryList, 1) + require.Equal(s.T(), data.destinations[1].ID, matchedDestinationSummaryList[0].ID) + }) + + t.Run("destination not found", func(t *testing.T) { + event := models.Event{ + ID: uuid.New().String(), + Topic: "user.created", + Time: time.Now(), + TenantID: data.tenant.ID, + DestinationID: "not-found", + Metadata: map[string]string{}, + Data: map[string]interface{}{}, + } + matchedDestinationSummaryList, err := s.entityStore.MatchEvent(s.ctx, event) + require.NoError(s.T(), err) + + require.Len(s.T(), matchedDestinationSummaryList, 0) + }) + + t.Run("destination topic is invalid", func(t *testing.T) { + event := models.Event{ + ID: uuid.New().String(), + Topic: "user.created", + Time: time.Now(), + TenantID: data.tenant.ID, + DestinationID: data.destinations[3].ID, // "user-deleted" destination + Metadata: map[string]string{}, + Data: map[string]interface{}{}, + } + matchedDestinationSummaryList, err := s.entityStore.MatchEvent(s.ctx, event) + require.NoError(s.T(), err) + + require.Len(s.T(), matchedDestinationSummaryList, 0) + }) + + t.Run("match after destination is updated", func(t *testing.T) { + updatedIndex := 2 + updatedTopics := []string{"user.created"} + updatedDestination := data.destinations[updatedIndex] + updatedDestination.Topics = updatedTopics + require.NoError(s.T(), s.entityStore.UpsertDestination(s.ctx, updatedDestination)) + + actual, err := s.entityStore.RetrieveDestination(s.ctx, updatedDestination.TenantID, updatedDestination.ID) + require.NoError(s.T(), err) + assert.Equal(s.T(), updatedDestination.Topics, actual.Topics) + + destinations, err := s.entityStore.ListDestinationByTenant(s.ctx, data.tenant.ID) + require.NoError(s.T(), err) + assert.Len(s.T(), destinations, 5) + + // Match user.created + event := models.Event{ + ID: uuid.New().String(), + Topic: "user.created", + Time: time.Now(), + TenantID: data.tenant.ID, + Metadata: map[string]string{}, + Data: map[string]interface{}{}, + } + matchedDestinationSummaryList, err := s.entityStore.MatchEvent(s.ctx, event) + require.NoError(s.T(), err) + require.Len(s.T(), matchedDestinationSummaryList, 4) + for _, summary := range matchedDestinationSummaryList { + require.Contains(s.T(), []string{data.destinations[0].ID, data.destinations[1].ID, data.destinations[2].ID, data.destinations[4].ID}, summary.ID) + } + + // Match user.updated + event = models.Event{ + ID: uuid.New().String(), + Topic: "user.updated", + Time: time.Now(), + TenantID: data.tenant.ID, + Metadata: map[string]string{}, + Data: map[string]interface{}{}, + } + matchedDestinationSummaryList, err = s.entityStore.MatchEvent(s.ctx, event) + require.NoError(s.T(), err) + require.Len(s.T(), matchedDestinationSummaryList, 2) + for _, summary := range matchedDestinationSummaryList { + require.Contains(s.T(), []string{data.destinations[0].ID, data.destinations[4].ID}, summary.ID) + } + }) +} + +func (s *EntityTestSuite) TestDestinationEnableDisable() { + t := s.T() + input := testutil.DestinationFactory.Any() + require.NoError(s.T(), s.entityStore.UpsertDestination(s.ctx, input)) + + assertDestination := func(t *testing.T, expected models.Destination) { + actual, err := s.entityStore.RetrieveDestination(s.ctx, input.TenantID, input.ID) + require.NoError(s.T(), 
err) + assert.Equal(s.T(), expected.ID, actual.ID) + assert.True(s.T(), cmp.Equal(expected.DisabledAt, actual.DisabledAt), "expected %v, got %v", expected.DisabledAt, actual.DisabledAt) + } + + t.Run("should disable", func(t *testing.T) { + now := time.Now() + input.DisabledAt = &now + require.NoError(s.T(), s.entityStore.UpsertDestination(s.ctx, input)) + assertDestination(t, input) + }) + + t.Run("should enable", func(t *testing.T) { + input.DisabledAt = nil + require.NoError(s.T(), s.entityStore.UpsertDestination(s.ctx, input)) + assertDestination(t, input) + }) +} + +func (s *EntityTestSuite) TestMultiSuiteDisableAndMatch() { + t := s.T() + data := s.setupMultiDestination() + + t.Run("initial match user.deleted", func(t *testing.T) { + event := testutil.EventFactory.Any( + testutil.EventFactory.WithTenantID(data.tenant.ID), + testutil.EventFactory.WithTopic("user.deleted"), + ) + matchedDestinationSummaryList, err := s.entityStore.MatchEvent(s.ctx, event) + require.NoError(s.T(), err) + require.Len(s.T(), matchedDestinationSummaryList, 2) + for _, summary := range matchedDestinationSummaryList { + require.Contains(s.T(), []string{data.destinations[0].ID, data.destinations[3].ID}, summary.ID) + } + }) + + t.Run("should not match disabled destination", func(t *testing.T) { + destination := data.destinations[0] + now := time.Now() + destination.DisabledAt = &now + require.NoError(s.T(), s.entityStore.UpsertDestination(s.ctx, destination)) + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithTenantID(data.tenant.ID), + testutil.EventFactory.WithTopic("user.deleted"), + ) + matchedDestinationSummaryList, err := s.entityStore.MatchEvent(s.ctx, event) + require.NoError(s.T(), err) + require.Len(s.T(), matchedDestinationSummaryList, 1) + for _, summary := range matchedDestinationSummaryList { + require.Contains(s.T(), []string{data.destinations[3].ID}, summary.ID) + } + }) + + t.Run("should match after re-enabled destination", func(t *testing.T) { + destination := data.destinations[0] + destination.DisabledAt = nil + require.NoError(s.T(), s.entityStore.UpsertDestination(s.ctx, destination)) + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithTenantID(data.tenant.ID), + testutil.EventFactory.WithTopic("user.deleted"), + ) + matchedDestinationSummaryList, err := s.entityStore.MatchEvent(s.ctx, event) + require.NoError(s.T(), err) + require.Len(s.T(), matchedDestinationSummaryList, 2) + for _, summary := range matchedDestinationSummaryList { + require.Contains(s.T(), []string{data.destinations[0].ID, data.destinations[3].ID}, summary.ID) + } + }) +} + +func (s *EntityTestSuite) TestDeleteDestination() { + t := s.T() + destination := testutil.DestinationFactory.Any() + require.NoError(s.T(), s.entityStore.CreateDestination(s.ctx, destination)) + + t.Run("should not return error when deleting existing destination", func(t *testing.T) { + assert.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, destination.TenantID, destination.ID)) + }) + + t.Run("should not return error when deleting already-deleted destination", func(t *testing.T) { + assert.NoError(s.T(), s.entityStore.DeleteDestination(s.ctx, destination.TenantID, destination.ID)) + }) + + t.Run("should return error when deleting non-existent destination", func(t *testing.T) { + err := s.entityStore.DeleteDestination(s.ctx, destination.TenantID, uuid.New().String()) + assert.ErrorIs(s.T(), err, models.ErrDestinationNotFound) + }) + + t.Run("should return ErrDestinationDeleted when retrieving deleted 
destination", func(t *testing.T) { + dest, err := s.entityStore.RetrieveDestination(s.ctx, destination.TenantID, destination.ID) + assert.ErrorIs(s.T(), err, models.ErrDestinationDeleted) + assert.Nil(s.T(), dest) + }) + + t.Run("should not return deleted destination in list", func(t *testing.T) { + destinations, err := s.entityStore.ListDestinationByTenant(s.ctx, destination.TenantID) + assert.NoError(s.T(), err) + assert.Empty(s.T(), destinations) + }) +} + +func (s *EntityTestSuite) TestMultiSuiteDeleteAndMatch() { + t := s.T() + data := s.setupMultiDestination() + + t.Run("delete first destination", func(t *testing.T) { + require.NoError(s.T(), + s.entityStore.DeleteDestination(s.ctx, data.tenant.ID, data.destinations[0].ID), + ) + }) + + t.Run("match event", func(t *testing.T) { + event := testutil.EventFactory.Any( + testutil.EventFactory.WithTenantID(data.tenant.ID), + testutil.EventFactory.WithTopic("user.created"), + ) + + matchedDestinationSummaryList, err := s.entityStore.MatchEvent(s.ctx, event) + require.NoError(s.T(), err) + require.Len(s.T(), matchedDestinationSummaryList, 2) + for _, summary := range matchedDestinationSummaryList { + require.Contains(s.T(), []string{data.destinations[1].ID, data.destinations[4].ID}, summary.ID) + } + }) +} + +func (s *EntityTestSuite) TestMaxDestinationsPerTenant() { + // Create a new entity store with limited max destinations + maxDestinations := 2 + opts := []models.EntityStoreOption{ + models.WithCipher(models.NewAESCipher("secret")), + models.WithAvailableTopics(testutil.TestTopics), + models.WithMaxDestinationsPerTenant(maxDestinations), + } + limitedStore := models.NewEntityStore(s.redisClient, opts...) + + tenant := models.Tenant{ + ID: uuid.New().String(), + CreatedAt: time.Now(), + } + require.NoError(s.T(), limitedStore.UpsertTenant(s.ctx, tenant)) + + // Should be able to create up to maxDestinations + for i := 0; i < maxDestinations; i++ { + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithTenantID(tenant.ID), + ) + err := limitedStore.CreateDestination(s.ctx, destination) + require.NoError(s.T(), err, "Should be able to create destination %d", i+1) + } + + // Should fail when trying to create one more + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithTenantID(tenant.ID), + ) + err := limitedStore.CreateDestination(s.ctx, destination) + require.Error(s.T(), err) + require.ErrorIs(s.T(), err, models.ErrMaxDestinationsPerTenantReached) + + // Should be able to create after deleting one + destinations, err := limitedStore.ListDestinationByTenant(s.ctx, tenant.ID) + require.NoError(s.T(), err) + require.NoError(s.T(), limitedStore.DeleteDestination(s.ctx, tenant.ID, destinations[0].ID)) + + destination = testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithTenantID(tenant.ID), + ) + err = limitedStore.CreateDestination(s.ctx, destination) + require.NoError(s.T(), err, "Should be able to create destination after deleting one") +} From aa1724b283efaa96efb80b8a656a04142a849d95 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 6 Oct 2025 23:30:08 +0700 Subject: [PATCH 03/20] fix: catch unhandled err --- internal/models/entity.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/models/entity.go b/internal/models/entity.go index daf25c7f..56450761 100644 --- a/internal/models/entity.go +++ b/internal/models/entity.go @@ -212,6 +212,9 @@ func (s *entityStoreImpl) ListDestinationByTenant(ctx context.Context, tenantID } destinationSummaryList, 
err := s.listDestinationSummaryByTenant(ctx, tenantID, opts)
+    if err != nil {
+        return nil, err
+    }
 
     pipe := s.redisClient.Pipeline()
     cmds := make([]*redis.MapStringStringCmd, len(destinationSummaryList))

From ad06c9289ea1a21a74c057dedc2e72b2151add64 Mon Sep 17 00:00:00 2001
From: Alex Luong
Date: Mon, 6 Oct 2025 23:41:09 +0700
Subject: [PATCH 04/20] feat: entity store support deployment id

---
 internal/models/entity.go           | 89 +++++++++++++++++------------
 internal/models/entity_test.go      | 10 +++-
 internal/models/entitysuite_test.go | 21 +++++--
 3 files changed, 76 insertions(+), 44 deletions(-)

diff --git a/internal/models/entity.go b/internal/models/entity.go
index 56450761..e5210924 100644
--- a/internal/models/entity.go
+++ b/internal/models/entity.go
@@ -34,24 +34,33 @@ var (
     ErrMaxDestinationsPerTenantReached = errors.New("maximum number of destinations per tenant reached")
 )
 
-// New cluster-compatible key formats with hash tags
-func redisTenantID(tenantID string) string {
-    return fmt.Sprintf("tenant:{%s}:tenant", tenantID)
+type entityStoreImpl struct {
+    redisClient redis.Cmdable
+    cipher Cipher
+    availableTopics []string
+    maxDestinationsPerTenant int
+    deploymentID string
 }
 
-func redisTenantDestinationSummaryKey(tenantID string) string {
-    return fmt.Sprintf("tenant:{%s}:destinations", tenantID)
+// deploymentPrefix returns the deployment prefix for Redis keys
+func (s *entityStoreImpl) deploymentPrefix() string {
+    if s.deploymentID == "" {
+        return ""
+    }
+    return fmt.Sprintf("deployment:%s:", s.deploymentID)
 }
 
-func redisDestinationID(destinationID, tenantID string) string {
-    return fmt.Sprintf("tenant:{%s}:destination:%s", tenantID, destinationID)
+// New cluster-compatible key formats with hash tags
+func (s *entityStoreImpl) redisTenantID(tenantID string) string {
+    return fmt.Sprintf("%stenant:{%s}:tenant", s.deploymentPrefix(), tenantID)
 }
 
-type entityStoreImpl struct {
-    redisClient redis.Cmdable
-    cipher Cipher
-    availableTopics []string
-    maxDestinationsPerTenant int
+func (s *entityStoreImpl) redisTenantDestinationSummaryKey(tenantID string) string {
+    return fmt.Sprintf("%stenant:{%s}:destinations", s.deploymentPrefix(), tenantID)
+}
+
+func (s *entityStoreImpl) redisDestinationID(destinationID, tenantID string) string {
+    return fmt.Sprintf("%stenant:{%s}:destination:%s", s.deploymentPrefix(), tenantID, destinationID)
 }
 
 var _ EntityStore = (*entityStoreImpl)(nil)
@@ -76,6 +85,12 @@ func WithMaxDestinationsPerTenant(maxDestinationsPerTenant int) EntityStoreOptio
     }
 }
 
+func WithDeploymentID(deploymentID string) EntityStoreOption {
+    return func(s *entityStoreImpl) {
+        s.deploymentID = deploymentID
+    }
+}
+
 func NewEntityStore(redisClient redis.Cmdable, opts ...EntityStoreOption) EntityStore {
     store := &entityStoreImpl{
         redisClient: redisClient,
@@ -93,8 +108,8 @@ func NewEntityStore(redisClient redis.Cmdable, opts ...EntityStoreOption) Entity
 
 func (s *entityStoreImpl) RetrieveTenant(ctx context.Context, tenantID string) (*Tenant, error) {
     pipe := s.redisClient.Pipeline()
-    tenantCmd := pipe.HGetAll(ctx, redisTenantID(tenantID))
-    destinationListCmd := pipe.HGetAll(ctx, redisTenantDestinationSummaryKey(tenantID))
+    tenantCmd := pipe.HGetAll(ctx, s.redisTenantID(tenantID))
+    destinationListCmd := pipe.HGetAll(ctx, s.redisTenantDestinationSummaryKey(tenantID))
 
     if _, err := pipe.Exec(ctx); err != nil {
         return nil, err
@@ -123,7 +138,7 @@
 }
 
 func (s *entityStoreImpl) UpsertTenant(ctx
context.Context, tenant Tenant) error { - key := redisTenantID(tenant.ID) + key := s.redisTenantID(tenant.ID) // For cluster compatibility, execute commands individually instead of in a transaction // Support overriding deleted resources @@ -140,14 +155,14 @@ func (s *entityStoreImpl) UpsertTenant(ctx context.Context, tenant Tenant) error } func (s *entityStoreImpl) DeleteTenant(ctx context.Context, tenantID string) error { - if exists, err := s.redisClient.Exists(ctx, redisTenantID(tenantID)).Result(); err != nil { + if exists, err := s.redisClient.Exists(ctx, s.redisTenantID(tenantID)).Result(); err != nil { return err } else if exists == 0 { return ErrTenantNotFound } // Get destination IDs before transaction - destinationIDs, err := s.redisClient.HKeys(ctx, redisTenantDestinationSummaryKey(tenantID)).Result() + destinationIDs, err := s.redisClient.HKeys(ctx, s.redisTenantDestinationSummaryKey(tenantID)).Result() if err != nil { return err } @@ -158,15 +173,15 @@ func (s *entityStoreImpl) DeleteTenant(ctx context.Context, tenantID string) err // Delete all destinations atomically for _, destinationID := range destinationIDs { - destKey := redisDestinationID(destinationID, tenantID) + destKey := s.redisDestinationID(destinationID, tenantID) pipe.HSet(ctx, destKey, "deleted_at", now) pipe.Expire(ctx, destKey, 7*24*time.Hour) } // Delete summary and mark tenant as deleted - pipe.Del(ctx, redisTenantDestinationSummaryKey(tenantID)) - pipe.HSet(ctx, redisTenantID(tenantID), "deleted_at", now) - pipe.Expire(ctx, redisTenantID(tenantID), 7*24*time.Hour) + pipe.Del(ctx, s.redisTenantDestinationSummaryKey(tenantID)) + pipe.HSet(ctx, s.redisTenantID(tenantID), "deleted_at", now) + pipe.Expire(ctx, s.redisTenantID(tenantID), 7*24*time.Hour) return nil }) @@ -175,7 +190,7 @@ func (s *entityStoreImpl) DeleteTenant(ctx context.Context, tenantID string) err } func (s *entityStoreImpl) listDestinationSummaryByTenant(ctx context.Context, tenantID string, opts ListDestinationByTenantOpts) ([]DestinationSummary, error) { - return s.parseListDestinationSummaryByTenantCmd(s.redisClient.HGetAll(ctx, redisTenantDestinationSummaryKey(tenantID)), opts) + return s.parseListDestinationSummaryByTenantCmd(s.redisClient.HGetAll(ctx, s.redisTenantDestinationSummaryKey(tenantID)), opts) } func (s *entityStoreImpl) parseListDestinationSummaryByTenantCmd(cmd *redis.MapStringStringCmd, opts ListDestinationByTenantOpts) ([]DestinationSummary, error) { @@ -219,7 +234,7 @@ func (s *entityStoreImpl) ListDestinationByTenant(ctx context.Context, tenantID pipe := s.redisClient.Pipeline() cmds := make([]*redis.MapStringStringCmd, len(destinationSummaryList)) for i, destinationSummary := range destinationSummaryList { - cmds[i] = pipe.HGetAll(ctx, redisDestinationID(destinationSummary.ID, tenantID)) + cmds[i] = pipe.HGetAll(ctx, s.redisDestinationID(destinationSummary.ID, tenantID)) } _, err = pipe.Exec(ctx) if err != nil { @@ -244,7 +259,7 @@ func (s *entityStoreImpl) ListDestinationByTenant(ctx context.Context, tenantID } func (s *entityStoreImpl) RetrieveDestination(ctx context.Context, tenantID, destinationID string) (*Destination, error) { - cmd := s.redisClient.HGetAll(ctx, redisDestinationID(destinationID, tenantID)) + cmd := s.redisClient.HGetAll(ctx, s.redisDestinationID(destinationID, tenantID)) destination := &Destination{TenantID: tenantID} if err := destination.parseRedisHash(cmd, s.cipher); err != nil { if err == redis.Nil { @@ -255,10 +270,10 @@ func (s *entityStoreImpl) RetrieveDestination(ctx context.Context, 
tenantID, des return destination, nil } -func (m *entityStoreImpl) CreateDestination(ctx context.Context, destination Destination) error { - key := redisDestinationID(destination.ID, destination.TenantID) +func (s *entityStoreImpl) CreateDestination(ctx context.Context, destination Destination) error { + key := s.redisDestinationID(destination.ID, destination.TenantID) // Check if destination exists - if fields, err := m.redisClient.HGetAll(ctx, key).Result(); err != nil { + if fields, err := s.redisClient.HGetAll(ctx, key).Result(); err != nil { return err } else if len(fields) > 0 { if _, isDeleted := fields["deleted_at"]; !isDeleted { @@ -267,19 +282,19 @@ func (m *entityStoreImpl) CreateDestination(ctx context.Context, destination Des } // Check if tenant has reached max destinations by counting entries in the summary hash - count, err := m.redisClient.HLen(ctx, redisTenantDestinationSummaryKey(destination.TenantID)).Result() + count, err := s.redisClient.HLen(ctx, s.redisTenantDestinationSummaryKey(destination.TenantID)).Result() if err != nil { return err } - if count >= int64(m.maxDestinationsPerTenant) { + if count >= int64(s.maxDestinationsPerTenant) { return ErrMaxDestinationsPerTenantReached } - return m.UpsertDestination(ctx, destination) + return s.UpsertDestination(ctx, destination) } -func (m *entityStoreImpl) UpsertDestination(ctx context.Context, destination Destination) error { - key := redisDestinationID(destination.ID, destination.TenantID) +func (s *entityStoreImpl) UpsertDestination(ctx context.Context, destination Destination) error { + key := s.redisDestinationID(destination.ID, destination.TenantID) // Pre-marshal and encrypt credentials BEFORE starting Redis transaction // This isolates marshaling failures from Redis transaction failures @@ -287,15 +302,15 @@ func (m *entityStoreImpl) UpsertDestination(ctx context.Context, destination Des if err != nil { return fmt.Errorf("invalid destination credentials: %w", err) } - encryptedCredentials, err := m.cipher.Encrypt(credentialsBytes) + encryptedCredentials, err := s.cipher.Encrypt(credentialsBytes) if err != nil { return fmt.Errorf("failed to encrypt destination credentials: %w", err) } // All keys use same tenant prefix - cluster compatible transaction - summaryKey := redisTenantDestinationSummaryKey(destination.TenantID) + summaryKey := s.redisTenantDestinationSummaryKey(destination.TenantID) - _, err = m.redisClient.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + _, err = s.redisClient.TxPipelined(ctx, func(pipe redis.Pipeliner) error { // Clear deletion markers pipe.Persist(ctx, key) pipe.HDel(ctx, key, "deleted_at") @@ -323,8 +338,8 @@ func (m *entityStoreImpl) UpsertDestination(ctx context.Context, destination Des } func (s *entityStoreImpl) DeleteDestination(ctx context.Context, tenantID, destinationID string) error { - key := redisDestinationID(destinationID, tenantID) - summaryKey := redisTenantDestinationSummaryKey(tenantID) + key := s.redisDestinationID(destinationID, tenantID) + summaryKey := s.redisTenantDestinationSummaryKey(tenantID) // Check if destination exists if exists, err := s.redisClient.Exists(ctx, key).Result(); err != nil { diff --git a/internal/models/entity_test.go b/internal/models/entity_test.go index 431a3795..473b8db0 100644 --- a/internal/models/entity_test.go +++ b/internal/models/entity_test.go @@ -6,8 +6,12 @@ import ( "github.com/stretchr/testify/suite" ) -// Run full entity store test suite -func TestEntityStore(t *testing.T) { +func TestEntityStore_WithoutDeploymentID(t 
*testing.T) { t.Parallel() - suite.Run(t, &EntityTestSuite{}) + suite.Run(t, &EntityTestSuite{deploymentID: ""}) +} + +func TestEntityStore_WithDeploymentID(t *testing.T) { + t.Parallel() + suite.Run(t, &EntityTestSuite{deploymentID: "dp_test_001"}) } diff --git a/internal/models/entitysuite_test.go b/internal/models/entitysuite_test.go index cfcc6ff2..2f9eb6cc 100644 --- a/internal/models/entitysuite_test.go +++ b/internal/models/entitysuite_test.go @@ -31,9 +31,10 @@ func assertEqualDestination(t *testing.T, expected, actual models.Destination) { // EntityTestSuite contains all entity store tests type EntityTestSuite struct { suite.Suite - ctx context.Context - redisClient redis.Cmdable - entityStore models.EntityStore + ctx context.Context + redisClient redis.Cmdable + entityStore models.EntityStore + deploymentID string } func (s *EntityTestSuite) SetupTest() { @@ -44,6 +45,9 @@ func (s *EntityTestSuite) SetupTest() { models.WithCipher(models.NewAESCipher("secret")), models.WithAvailableTopics(testutil.TestTopics), } + if s.deploymentID != "" { + opts = append(opts, models.WithDeploymentID(s.deploymentID)) + } s.entityStore = models.NewEntityStore(s.redisClient, opts...) } @@ -242,7 +246,13 @@ func (s *EntityTestSuite) TestDestinationCredentialsEncryption() { err := s.entityStore.UpsertDestination(s.ctx, input) require.NoError(s.T(), err) - actual, err := s.redisClient.HGetAll(s.ctx, fmt.Sprintf("tenant:{%s}:destination:%s", input.TenantID, input.ID)).Result() + // Build key format based on deploymentID + keyFormat := "tenant:{%s}:destination:%s" + if s.deploymentID != "" { + keyFormat = fmt.Sprintf("deployment:%s:tenant:{%%s}:destination:%%s", s.deploymentID) + } + + actual, err := s.redisClient.HGetAll(s.ctx, fmt.Sprintf(keyFormat, input.TenantID, input.ID)).Result() require.NoError(s.T(), err) assert.NotEqual(s.T(), input.Credentials, actual["credentials"]) decryptedCredentials, err := cipher.Decrypt([]byte(actual["credentials"])) @@ -674,6 +684,9 @@ func (s *EntityTestSuite) TestMaxDestinationsPerTenant() { models.WithAvailableTopics(testutil.TestTopics), models.WithMaxDestinationsPerTenant(maxDestinations), } + if s.deploymentID != "" { + opts = append(opts, models.WithDeploymentID(s.deploymentID)) + } limitedStore := models.NewEntityStore(s.redisClient, opts...) tenant := models.Tenant{ From c5285f4b67e6fa8e1eac8ac1f37b71246751378b Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 6 Oct 2025 23:48:39 +0700 Subject: [PATCH 05/20] test: move credentials encryption test out of the suite --- internal/models/entity_test.go | 56 +++++++++++++++++++++++++++++ internal/models/entitysuite_test.go | 39 -------------------- 2 files changed, 56 insertions(+), 39 deletions(-) diff --git a/internal/models/entity_test.go b/internal/models/entity_test.go index 473b8db0..7e55fa80 100644 --- a/internal/models/entity_test.go +++ b/internal/models/entity_test.go @@ -1,8 +1,15 @@ package models_test import ( + "context" + "encoding/json" + "fmt" "testing" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" ) @@ -15,3 +22,52 @@ func TestEntityStore_WithDeploymentID(t *testing.T) { t.Parallel() suite.Run(t, &EntityTestSuite{deploymentID: "dp_test_001"}) } + +// TestDestinationCredentialsEncryption verifies that credentials are properly encrypted +// when stored in Redis. 
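+// The check reads the raw hash at tenant:{TENANT_ID}:destination:{DESTINATION_ID}, which is the key
+// layout the entity store uses when no deployment ID is configured, and confirms the stored
+// credentials field is ciphertext that decrypts back to the original map.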
+// +// NOTE: This test accesses Redis implementation details directly to verify encryption. +// While this couples the test to the storage implementation, it's necessary to confirm +// that credentials are actually encrypted at rest. +func TestDestinationCredentialsEncryption(t *testing.T) { + t.Parallel() + + ctx := context.Background() + redisClient := testutil.CreateTestRedisClient(t) + cipher := models.NewAESCipher("secret") + + entityStore := models.NewEntityStore(redisClient, + models.WithCipher(cipher), + models.WithAvailableTopics(testutil.TestTopics), + ) + + input := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("rabbitmq"), + testutil.DestinationFactory.WithTopics([]string{"user.created", "user.updated"}), + testutil.DestinationFactory.WithConfig(map[string]string{ + "server_url": "localhost:5672", + "exchange": "events", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "username": "guest", + "password": "guest", + }), + ) + + err := entityStore.UpsertDestination(ctx, input) + require.NoError(t, err) + + // Access Redis directly to verify encryption (implementation detail) + keyFormat := "tenant:{%s}:destination:%s" + actual, err := redisClient.HGetAll(ctx, fmt.Sprintf(keyFormat, input.TenantID, input.ID)).Result() + require.NoError(t, err) + + // Verify credentials are encrypted (not plaintext) + assert.NotEqual(t, input.Credentials, actual["credentials"]) + + // Verify we can decrypt back to original + decryptedCredentials, err := cipher.Decrypt([]byte(actual["credentials"])) + require.NoError(t, err) + jsonCredentials, _ := json.Marshal(input.Credentials) + assert.Equal(t, string(jsonCredentials), string(decryptedCredentials)) +} diff --git a/internal/models/entitysuite_test.go b/internal/models/entitysuite_test.go index 2f9eb6cc..f953af92 100644 --- a/internal/models/entitysuite_test.go +++ b/internal/models/entitysuite_test.go @@ -2,8 +2,6 @@ package models_test import ( "context" - "encoding/json" - "fmt" "testing" "time" @@ -223,43 +221,6 @@ func (s *EntityTestSuite) TestDeleteTenantAndAssociatedDestinations() { } } -func (s *EntityTestSuite) TestDestinationCredentialsEncryption() { - cipher := models.NewAESCipher("secret") - - input := models.Destination{ - ID: uuid.New().String(), - Type: "rabbitmq", - Topics: []string{"user.created", "user.updated"}, - Config: map[string]string{ - "server_url": "localhost:5672", - "exchange": "events", - }, - Credentials: map[string]string{ - "username": "guest", - "password": "guest", - }, - CreatedAt: time.Now(), - DisabledAt: nil, - TenantID: uuid.New().String(), - } - - err := s.entityStore.UpsertDestination(s.ctx, input) - require.NoError(s.T(), err) - - // Build key format based on deploymentID - keyFormat := "tenant:{%s}:destination:%s" - if s.deploymentID != "" { - keyFormat = fmt.Sprintf("deployment:%s:tenant:{%%s}:destination:%%s", s.deploymentID) - } - - actual, err := s.redisClient.HGetAll(s.ctx, fmt.Sprintf(keyFormat, input.TenantID, input.ID)).Result() - require.NoError(s.T(), err) - assert.NotEqual(s.T(), input.Credentials, actual["credentials"]) - decryptedCredentials, err := cipher.Decrypt([]byte(actual["credentials"])) - require.NoError(s.T(), err) - jsonCredentials, _ := json.Marshal(input.Credentials) - assert.Equal(s.T(), string(jsonCredentials), string(decryptedCredentials)) -} // Helper struct for multi-destination tests type multiDestinationData struct { From 4817a6cc758d22ae2fde804a6b200b6f02463e20 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 6 Oct 2025 
23:53:29 +0700 Subject: [PATCH 06/20] test: move max destination out of suite --- internal/models/entity_test.go | 52 +++++++++++++++++++++++++++++ internal/models/entitysuite_test.go | 47 -------------------------- 2 files changed, 52 insertions(+), 47 deletions(-) diff --git a/internal/models/entity_test.go b/internal/models/entity_test.go index 7e55fa80..60801373 100644 --- a/internal/models/entity_test.go +++ b/internal/models/entity_test.go @@ -5,7 +5,9 @@ import ( "encoding/json" "fmt" "testing" + "time" + "github.com/google/uuid" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/util/testutil" "github.com/stretchr/testify/assert" @@ -71,3 +73,53 @@ func TestDestinationCredentialsEncryption(t *testing.T) { jsonCredentials, _ := json.Marshal(input.Credentials) assert.Equal(t, string(jsonCredentials), string(decryptedCredentials)) } + +// TestMaxDestinationsPerTenant verifies that the entity store properly enforces +// the maximum destinations per tenant limit. +func TestMaxDestinationsPerTenant(t *testing.T) { + t.Parallel() + + ctx := context.Background() + redisClient := testutil.CreateTestRedisClient(t) + maxDestinations := 2 + + limitedStore := models.NewEntityStore(redisClient, + models.WithCipher(models.NewAESCipher("secret")), + models.WithAvailableTopics(testutil.TestTopics), + models.WithMaxDestinationsPerTenant(maxDestinations), + ) + + tenant := models.Tenant{ + ID: uuid.New().String(), + CreatedAt: time.Now(), + } + require.NoError(t, limitedStore.UpsertTenant(ctx, tenant)) + + // Should be able to create up to maxDestinations + for i := 0; i < maxDestinations; i++ { + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithTenantID(tenant.ID), + ) + err := limitedStore.CreateDestination(ctx, destination) + require.NoError(t, err, "Should be able to create destination %d", i+1) + } + + // Should fail when trying to create one more + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithTenantID(tenant.ID), + ) + err := limitedStore.CreateDestination(ctx, destination) + require.Error(t, err) + require.ErrorIs(t, err, models.ErrMaxDestinationsPerTenantReached) + + // Should be able to create after deleting one + destinations, err := limitedStore.ListDestinationByTenant(ctx, tenant.ID) + require.NoError(t, err) + require.NoError(t, limitedStore.DeleteDestination(ctx, tenant.ID, destinations[0].ID)) + + destination = testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithTenantID(tenant.ID), + ) + err = limitedStore.CreateDestination(ctx, destination) + require.NoError(t, err, "Should be able to create destination after deleting one") +} diff --git a/internal/models/entitysuite_test.go b/internal/models/entitysuite_test.go index f953af92..5764d7b6 100644 --- a/internal/models/entitysuite_test.go +++ b/internal/models/entitysuite_test.go @@ -637,50 +637,3 @@ func (s *EntityTestSuite) TestMultiSuiteDeleteAndMatch() { }) } -func (s *EntityTestSuite) TestMaxDestinationsPerTenant() { - // Create a new entity store with limited max destinations - maxDestinations := 2 - opts := []models.EntityStoreOption{ - models.WithCipher(models.NewAESCipher("secret")), - models.WithAvailableTopics(testutil.TestTopics), - models.WithMaxDestinationsPerTenant(maxDestinations), - } - if s.deploymentID != "" { - opts = append(opts, models.WithDeploymentID(s.deploymentID)) - } - limitedStore := models.NewEntityStore(s.redisClient, opts...) 
- - tenant := models.Tenant{ - ID: uuid.New().String(), - CreatedAt: time.Now(), - } - require.NoError(s.T(), limitedStore.UpsertTenant(s.ctx, tenant)) - - // Should be able to create up to maxDestinations - for i := 0; i < maxDestinations; i++ { - destination := testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithTenantID(tenant.ID), - ) - err := limitedStore.CreateDestination(s.ctx, destination) - require.NoError(s.T(), err, "Should be able to create destination %d", i+1) - } - - // Should fail when trying to create one more - destination := testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithTenantID(tenant.ID), - ) - err := limitedStore.CreateDestination(s.ctx, destination) - require.Error(s.T(), err) - require.ErrorIs(s.T(), err, models.ErrMaxDestinationsPerTenantReached) - - // Should be able to create after deleting one - destinations, err := limitedStore.ListDestinationByTenant(s.ctx, tenant.ID) - require.NoError(s.T(), err) - require.NoError(s.T(), limitedStore.DeleteDestination(s.ctx, tenant.ID, destinations[0].ID)) - - destination = testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithTenantID(tenant.ID), - ) - err = limitedStore.CreateDestination(s.ctx, destination) - require.NoError(s.T(), err, "Should be able to create destination after deleting one") -} From 4fd47360bd5b4082b9def63bb38e3903f19034e9 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 00:00:57 +0700 Subject: [PATCH 07/20] test: deployment isolation --- internal/models/entity_test.go | 86 ++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/internal/models/entity_test.go b/internal/models/entity_test.go index 60801373..7c89980f 100644 --- a/internal/models/entity_test.go +++ b/internal/models/entity_test.go @@ -123,3 +123,89 @@ func TestMaxDestinationsPerTenant(t *testing.T) { err = limitedStore.CreateDestination(ctx, destination) require.NoError(t, err, "Should be able to create destination after deleting one") } + +// TestDeploymentIsolation verifies that entity stores with different deployment IDs +// are completely isolated from each other, even when sharing the same Redis instance. 
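+// Each store prefixes its keys with deployment:<id>:, so the two deployments write to distinct
+// hashes (deployment:dp_001:tenant:{ID}:... vs deployment:dp_002:tenant:{ID}:...) and identical
+// tenant and destination IDs never collide.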
+func TestDeploymentIsolation(t *testing.T) { + t.Parallel() + + ctx := context.Background() + redisClient := testutil.CreateTestRedisClient(t) + + // Create two entity stores with different deployment IDs + store1 := models.NewEntityStore(redisClient, + models.WithCipher(models.NewAESCipher("secret")), + models.WithAvailableTopics(testutil.TestTopics), + models.WithDeploymentID("dp_001"), + ) + + store2 := models.NewEntityStore(redisClient, + models.WithCipher(models.NewAESCipher("secret")), + models.WithAvailableTopics(testutil.TestTopics), + models.WithDeploymentID("dp_002"), + ) + + // Use the SAME tenant ID and destination ID for both deployments + tenantID := uuid.New().String() + destinationID := uuid.New().String() + + // Create tenant in both deployments + tenant := models.Tenant{ + ID: tenantID, + CreatedAt: time.Now(), + } + require.NoError(t, store1.UpsertTenant(ctx, tenant)) + require.NoError(t, store2.UpsertTenant(ctx, tenant)) + + // Create destination with same ID but different config in each deployment + destination1 := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID(destinationID), + testutil.DestinationFactory.WithTenantID(tenantID), + testutil.DestinationFactory.WithConfig(map[string]string{ + "deployment": "dp_001", + }), + ) + destination2 := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID(destinationID), + testutil.DestinationFactory.WithTenantID(tenantID), + testutil.DestinationFactory.WithConfig(map[string]string{ + "deployment": "dp_002", + }), + ) + + require.NoError(t, store1.CreateDestination(ctx, destination1)) + require.NoError(t, store2.CreateDestination(ctx, destination2)) + + // Verify store1 only sees its own data + retrieved1, err := store1.RetrieveDestination(ctx, tenantID, destinationID) + require.NoError(t, err) + assert.Equal(t, "dp_001", retrieved1.Config["deployment"], "Store 1 should see its own data") + + // Verify store2 only sees its own data + retrieved2, err := store2.RetrieveDestination(ctx, tenantID, destinationID) + require.NoError(t, err) + assert.Equal(t, "dp_002", retrieved2.Config["deployment"], "Store 2 should see its own data") + + // Verify list operations are also isolated + list1, err := store1.ListDestinationByTenant(ctx, tenantID) + require.NoError(t, err) + require.Len(t, list1, 1, "Store 1 should only see 1 destination") + assert.Equal(t, "dp_001", list1[0].Config["deployment"]) + + list2, err := store2.ListDestinationByTenant(ctx, tenantID) + require.NoError(t, err) + require.Len(t, list2, 1, "Store 2 should only see 1 destination") + assert.Equal(t, "dp_002", list2[0].Config["deployment"]) + + // Verify deleting from one deployment doesn't affect the other + require.NoError(t, store1.DeleteDestination(ctx, tenantID, destinationID)) + + // Store1 should not find the destination + _, err = store1.RetrieveDestination(ctx, tenantID, destinationID) + require.ErrorIs(t, err, models.ErrDestinationDeleted) + + // Store2 should still have its destination + retrieved2Again, err := store2.RetrieveDestination(ctx, tenantID, destinationID) + require.NoError(t, err) + assert.Equal(t, "dp_002", retrieved2Again.Config["deployment"], "Store 2 data should be unaffected") +} From aee233e56b3444b71c35e1f525d9533181637bb0 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 00:09:29 +0700 Subject: [PATCH 08/20] feat: deployment id support to alert store --- internal/alert/monitor.go | 20 +++++++--- internal/alert/store.go | 19 +++++++-- internal/alert/store_test.go | 77 
+++++++++++++++++++++++++++++++++++- 3 files changed, 104 insertions(+), 12 deletions(-) diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index 156de7f0..42f6dfe9 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -73,6 +73,13 @@ func WithLogger(logger *logging.Logger) AlertOption { } } +// WithDeploymentID sets the deployment ID for the monitor +func WithDeploymentID(deploymentID string) AlertOption { + return func(m *alertMonitor) { + m.deploymentID = deploymentID + } +} + // DeliveryAttempt represents a single delivery attempt type DeliveryAttempt struct { Success bool @@ -83,11 +90,12 @@ type DeliveryAttempt struct { } type alertMonitor struct { - logger *logging.Logger - store AlertStore - evaluator AlertEvaluator - notifier AlertNotifier - disabler DestinationDisabler + logger *logging.Logger + store AlertStore + evaluator AlertEvaluator + notifier AlertNotifier + disabler DestinationDisabler + deploymentID string // autoDisableFailureCount is the number of consecutive failures before auto-disabling autoDisableFailureCount int @@ -119,7 +127,7 @@ func NewAlertMonitor(logger *logging.Logger, redisClient redis.Cmdable, opts ... } if alertMonitor.store == nil { - alertMonitor.store = NewRedisAlertStore(redisClient) + alertMonitor.store = NewRedisAlertStore(redisClient, alertMonitor.deploymentID) } if alertMonitor.evaluator == nil { diff --git a/internal/alert/store.go b/internal/alert/store.go index ca84ae29..8df529cb 100644 --- a/internal/alert/store.go +++ b/internal/alert/store.go @@ -20,12 +20,16 @@ type AlertStore interface { } type redisAlertStore struct { - client redis.Cmdable + client redis.Cmdable + deploymentID string } // NewRedisAlertStore creates a new Redis-backed alert store -func NewRedisAlertStore(client redis.Cmdable) AlertStore { - return &redisAlertStore{client: client} +func NewRedisAlertStore(client redis.Cmdable, deploymentID string) AlertStore { + return &redisAlertStore{ + client: client, + deploymentID: deploymentID, + } } func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) (int, error) { @@ -57,6 +61,13 @@ func (s *redisAlertStore) ResetConsecutiveFailureCount(ctx context.Context, tena return s.client.Del(ctx, s.getFailuresKey(destinationID)).Err() } +func (s *redisAlertStore) deploymentPrefix() string { + if s.deploymentID == "" { + return "" + } + return fmt.Sprintf("deployment:%s:", s.deploymentID) +} + func (s *redisAlertStore) getFailuresKey(destinationID string) string { - return fmt.Sprintf("%s:%s:%s", keyPrefixAlert, destinationID, keyFailures) + return fmt.Sprintf("%s%s:%s:%s", s.deploymentPrefix(), keyPrefixAlert, destinationID, keyFailures) } diff --git a/internal/alert/store_test.go b/internal/alert/store_test.go index 59f2598b..f70a0189 100644 --- a/internal/alert/store_test.go +++ b/internal/alert/store_test.go @@ -16,7 +16,7 @@ func TestRedisAlertStore(t *testing.T) { t.Run("increment consecutive failures", func(t *testing.T) { t.Parallel() redisClient := testutil.CreateTestRedisClient(t) - store := alert.NewRedisAlertStore(redisClient) + store := alert.NewRedisAlertStore(redisClient, "") // First increment count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") @@ -32,7 +32,7 @@ func TestRedisAlertStore(t *testing.T) { t.Run("reset consecutive failures", func(t *testing.T) { t.Parallel() redisClient := testutil.CreateTestRedisClient(t) - store := alert.NewRedisAlertStore(redisClient) + store := 
alert.NewRedisAlertStore(redisClient, "") // Set up initial failures count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2") @@ -49,3 +49,76 @@ func TestRedisAlertStore(t *testing.T) { assert.Equal(t, 1, count) }) } + +func TestRedisAlertStore_WithDeploymentID(t *testing.T) { + t.Parallel() + + redisClient := testutil.CreateTestRedisClient(t) + store := alert.NewRedisAlertStore(redisClient, "dp_test_001") + + // Test increment with deployment ID + count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + require.NoError(t, err) + assert.Equal(t, 1, count) + + // Second increment + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + require.NoError(t, err) + assert.Equal(t, 2, count) + + // Test reset with deployment ID + err = store.ResetConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + require.NoError(t, err) + + // Verify counter is reset + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + require.NoError(t, err) + assert.Equal(t, 1, count) +} + +func TestAlertStoreIsolation(t *testing.T) { + t.Parallel() + + redisClient := testutil.CreateTestRedisClient(t) + + // Create two stores with different deployment IDs + store1 := alert.NewRedisAlertStore(redisClient, "dp_001") + store2 := alert.NewRedisAlertStore(redisClient, "dp_002") + + // Use same tenant/destination IDs for both + tenantID := "tenant_shared" + destinationID := "dest_shared" + + // Increment in store1 + count1, err := store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + require.NoError(t, err) + assert.Equal(t, 1, count1) + + count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + require.NoError(t, err) + assert.Equal(t, 2, count1) + + // Increment in store2 - should start at 1 (isolated from store1) + count2, err := store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + require.NoError(t, err) + assert.Equal(t, 1, count2, "Store 2 should have its own counter") + + // Increment store1 again - should continue from 2 + count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + require.NoError(t, err) + assert.Equal(t, 3, count1, "Store 1 counter should be unaffected by store 2") + + // Reset store1 - should not affect store2 + err = store1.ResetConsecutiveFailureCount(context.Background(), tenantID, destinationID) + require.NoError(t, err) + + // Verify store1 is reset + count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + require.NoError(t, err) + assert.Equal(t, 1, count1, "Store 1 should be reset") + + // Verify store2 is unaffected + count2, err = store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + require.NoError(t, err) + assert.Equal(t, 2, count2, "Store 2 should be unaffected by store 1 reset") +} From 7bc0dd54d8b149f7a7b3f91b391a5ec0e79927c4 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 00:40:31 +0700 Subject: [PATCH 09/20] feat: namespace rsmq with deployment id --- internal/deliverymq/retry.go | 34 ++++++++++++++++-- internal/scheduler/scheduler.go | 53 +--------------------------- internal/scheduler/scheduler_test.go | 29 ++++++++++++--- 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/internal/deliverymq/retry.go b/internal/deliverymq/retry.go 
index 8232e2e4..c07d7fde 100644 --- a/internal/deliverymq/retry.go +++ b/internal/deliverymq/retry.go @@ -3,14 +3,43 @@ package deliverymq import ( "context" "encoding/json" + "fmt" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/redis" + "github.com/hookdeck/outpost/internal/rsmq" "github.com/hookdeck/outpost/internal/scheduler" ) -func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, logger *logging.Logger) scheduler.Scheduler { +func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, logger *logging.Logger) scheduler.Scheduler { + // Create Redis client for RSMQ + ctx := context.Background() + redisClient, err := redis.NewClient(ctx, redisConfig) + if err != nil { + panic(fmt.Sprintf("Failed to create Redis client for retry scheduler: %v", err)) + } + + // Create RSMQ adapter + adapter := rsmq.NewRedisAdapter(redisClient) + + // Construct RSMQ namespace with deployment prefix if provided + // This creates keys like: deployment:dp_001:rsmq:QUEUES, deployment:dp_001:rsmq:deliverymq-retry:Q + // Without deployment ID: rsmq:QUEUES, rsmq:deliverymq-retry:Q + namespace := "rsmq" + if deploymentID != "" { + namespace = fmt.Sprintf("deployment:%s:rsmq", deploymentID) + } + + // Create RSMQ client with deployment-aware namespace + var rsmqClient *rsmq.RedisSMQ + if logger != nil { + rsmqClient = rsmq.NewRedisSMQ(adapter, namespace, logger) + } else { + rsmqClient = rsmq.NewRedisSMQ(adapter, namespace) + } + + // Define execution function exec := func(ctx context.Context, msg string) error { retryMessage := RetryMessage{} if err := retryMessage.FromString(msg); err != nil { @@ -22,7 +51,8 @@ func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, l } return nil } - return scheduler.New("deliverymq-retry", redisConfig, exec, scheduler.WithLogger(logger)) + + return scheduler.New("deliverymq-retry", rsmqClient, exec) } type RetryMessage struct { diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 404ffc40..5d4a4ed7 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -2,13 +2,9 @@ package scheduler import ( "context" - "fmt" "time" - "github.com/hookdeck/outpost/internal/logging" - iredis "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/rsmq" - "go.uber.org/zap" ) type ScheduleOption func(*ScheduleOptions) @@ -40,7 +36,6 @@ type schedulerImpl struct { type config struct { visibilityTimeout uint - logger *logging.Logger } func WithVisibilityTimeout(vt uint) func(*config) { @@ -49,14 +44,7 @@ func WithVisibilityTimeout(vt uint) func(*config) { } } -func WithLogger(logger *logging.Logger) func(*config) { - return func(c *config) { - c.logger = logger - } -} - -func New(name string, redisConfig *iredis.RedisConfig, exec func(context.Context, string) error, opts ...func(*config)) Scheduler { - // Extract configuration including logger first +func New(name string, rsmqClient *rsmq.RedisSMQ, exec func(context.Context, string) error, opts ...func(*config)) Scheduler { config := &config{ visibilityTimeout: rsmq.UnsetVt, } @@ -64,45 +52,6 @@ func New(name string, redisConfig *iredis.RedisConfig, exec func(context.Context opt(config) } - var rsmqClient *rsmq.RedisSMQ - ctx := context.Background() - - // Create a new Redis client for this scheduler instance - // Each scheduler should have its own connection, not share the singleton - 
redisClient, err := iredis.NewClient(ctx, redisConfig) - if err != nil { - if config.logger != nil { - config.logger.Error("Redis client creation failed", - zap.Error(err), - zap.String("host", redisConfig.Host), - zap.Int("port", redisConfig.Port), - zap.Bool("tls", redisConfig.TLSEnabled)) - } - panic(fmt.Sprintf("Redis client creation failed: %v", err)) - } - - // Create adapter to make v9 client compatible with RSMQ - adapter := rsmq.NewRedisAdapter(redisClient) - - if config.logger != nil { - logFields := []zap.Field{ - zap.String("host", redisConfig.Host), - zap.Int("port", redisConfig.Port), - zap.Bool("tls", redisConfig.TLSEnabled), - } - if !redisConfig.ClusterEnabled { - logFields = append(logFields, zap.Int("database", redisConfig.Database)) - } - config.logger.Info("Redis client initialized successfully", logFields...) - } - - // Create RSMQ client with the adapter - if config.logger != nil { - rsmqClient = rsmq.NewRedisSMQ(adapter, "rsmq", config.logger) - } else { - rsmqClient = rsmq.NewRedisSMQ(adapter, "rsmq") - } - return &schedulerImpl{ rsmqClient: rsmqClient, config: config, diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index c1892945..55378079 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -3,19 +3,33 @@ package scheduler_test import ( "context" "errors" + "fmt" "testing" "time" "github.com/google/uuid" + iredis "github.com/hookdeck/outpost/internal/redis" + "github.com/hookdeck/outpost/internal/rsmq" "github.com/hookdeck/outpost/internal/scheduler" "github.com/hookdeck/outpost/internal/util/testutil" "github.com/stretchr/testify/require" ) +// createRSMQClient creates an RSMQ client for testing +func createRSMQClient(t *testing.T, redisConfig *iredis.RedisConfig) *rsmq.RedisSMQ { + ctx := context.Background() + redisClient, err := iredis.NewClient(ctx, redisConfig) + require.NoError(t, err) + + adapter := rsmq.NewRedisAdapter(redisClient) + return rsmq.NewRedisSMQ(adapter, "rsmq") +} + func TestScheduler_Basic(t *testing.T) { t.Parallel() redisConfig := testutil.CreateTestRedisConfig(t) + rsmqClient := createRSMQClient(t, redisConfig) msgs := []string{} exec := func(_ context.Context, id string) error { @@ -24,7 +38,7 @@ func TestScheduler_Basic(t *testing.T) { } ctx := context.Background() - s := scheduler.New("scheduler", redisConfig, exec) + s := scheduler.New("scheduler", rsmqClient, exec) require.NoError(t, s.Init(ctx)) defer s.Shutdown() go s.Monitor(ctx) @@ -57,6 +71,7 @@ func TestScheduler_ParallelMonitor(t *testing.T) { t.Parallel() redisConfig := testutil.CreateTestRedisConfig(t) + rsmqClient := createRSMQClient(t, redisConfig) msgs := []string{} exec := func(_ context.Context, id string) error { @@ -65,7 +80,7 @@ func TestScheduler_ParallelMonitor(t *testing.T) { } ctx := context.Background() - s := scheduler.New("scheduler", redisConfig, exec) + s := scheduler.New("scheduler", rsmqClient, exec) require.NoError(t, s.Init(ctx)) defer s.Shutdown() @@ -101,6 +116,7 @@ func TestScheduler_VisibilityTimeout(t *testing.T) { t.Parallel() redisConfig := testutil.CreateTestRedisConfig(t) + rsmqClient := createRSMQClient(t, redisConfig) msgs := []string{} exec := func(_ context.Context, id string) error { @@ -110,7 +126,7 @@ func TestScheduler_VisibilityTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) defer cancel() - s := scheduler.New("scheduler", redisConfig, exec, scheduler.WithVisibilityTimeout(1)) + s := 
scheduler.New("scheduler", rsmqClient, exec, scheduler.WithVisibilityTimeout(1)) require.NoError(t, s.Init(ctx)) defer s.Shutdown() @@ -139,7 +155,8 @@ func TestScheduler_CustomID(t *testing.T) { return nil } - s := scheduler.New(uuid.New().String(), redisConfig, exec) + rsmqClient := createRSMQClient(t, redisConfig, "") + s := scheduler.New(uuid.New().String(), rsmqClient, exec) require.NoError(t, s.Init(ctx)) go s.Monitor(ctx) @@ -243,7 +260,8 @@ func TestScheduler_Cancel(t *testing.T) { return nil } - s := scheduler.New(uuid.New().String(), redisConfig, exec) + rsmqClient := createRSMQClient(t, redisConfig, "") + s := scheduler.New(uuid.New().String(), rsmqClient, exec) require.NoError(t, s.Init(ctx)) go s.Monitor(ctx) @@ -281,3 +299,4 @@ func TestScheduler_Cancel(t *testing.T) { require.NoError(t, s.Cancel(ctx, id)) }) } + From a082f85c801d98118df22fe4ab0029f2df07579e Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 00:53:30 +0700 Subject: [PATCH 10/20] chore: makefile --- Makefile | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index da22db13..567d0dac 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ TEST?=$$(go list ./...) +RUN?= # Build targets .PHONY: build @@ -122,13 +123,17 @@ test/setup: @echo "" test: - go test $(TEST) $(TESTARGS) + @if [ "$(RUN)" != "" ]; then \ + $(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS) -run "$(RUN)"; \ + else \ + $(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS); \ + fi test/unit: - go test $(TEST) $(TESTARGS) -short + $(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS) -short test/integration: - go test $(TEST) $(TESTARGS) -run "Integration" + $(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS) -run "Integration" test/e2e/rediscluster: @echo "Running Redis cluster e2e tests in Docker container..." 
From dcfdcc7d74ed0bc599348e7101c90fe2d1cbd5b2 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 01:03:01 +0700 Subject: [PATCH 11/20] chore: application integration --- internal/deliverymq/retry_test.go | 2 +- internal/scheduler/scheduler_test.go | 5 ++--- internal/services/api/api.go | 3 ++- internal/services/delivery/delivery.go | 4 +++- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/internal/deliverymq/retry_test.go b/internal/deliverymq/retry_test.go index 1be5be14..ea341af0 100644 --- a/internal/deliverymq/retry_test.go +++ b/internal/deliverymq/retry_test.go @@ -45,7 +45,7 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) { require.NoError(t, err) // Setup retry scheduler - retryScheduler := deliverymq.NewRetryScheduler(s.deliveryMQ, testutil.CreateTestRedisConfig(t), testutil.CreateTestLogger(t)) + retryScheduler := deliverymq.NewRetryScheduler(s.deliveryMQ, testutil.CreateTestRedisConfig(t), "", testutil.CreateTestLogger(t)) require.NoError(t, retryScheduler.Init(s.ctx)) go retryScheduler.Monitor(s.ctx) diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 55378079..7b20325c 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -3,7 +3,6 @@ package scheduler_test import ( "context" "errors" - "fmt" "testing" "time" @@ -155,7 +154,7 @@ func TestScheduler_CustomID(t *testing.T) { return nil } - rsmqClient := createRSMQClient(t, redisConfig, "") + rsmqClient := createRSMQClient(t, redisConfig) s := scheduler.New(uuid.New().String(), rsmqClient, exec) require.NoError(t, s.Init(ctx)) go s.Monitor(ctx) @@ -260,7 +259,7 @@ func TestScheduler_Cancel(t *testing.T) { return nil } - rsmqClient := createRSMQClient(t, redisConfig, "") + rsmqClient := createRSMQClient(t, redisConfig) s := scheduler.New(uuid.New().String(), rsmqClient, exec) require.NoError(t, s.Init(ctx)) go s.Monitor(ctx) diff --git a/internal/services/api/api.go b/internal/services/api/api.go index 62e91669..76d2e9bf 100644 --- a/internal/services/api/api.go +++ b/internal/services/api/api.go @@ -118,6 +118,7 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log models.WithCipher(models.NewAESCipher(cfg.AESEncryptionSecret)), models.WithAvailableTopics(cfg.Topics), models.WithMaxDestinationsPerTenant(cfg.MaxDestinationsPerTenant), + models.WithDeploymentID(cfg.DeploymentID), ) logger.Debug("creating event handler and router") @@ -143,7 +144,7 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log // deliverymqRetryScheduler logger.Debug("creating delivery MQ retry scheduler") - deliverymqRetryScheduler := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), logger) + deliverymqRetryScheduler := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger) logger.Debug("initializing delivery MQ retry scheduler - this may perform Redis operations") if err := deliverymqRetryScheduler.Init(ctx); err != nil { logger.Error("delivery MQ retry scheduler initialization failed", diff --git a/internal/services/delivery/delivery.go b/internal/services/delivery/delivery.go index 635ed0d8..55cbaefc 100644 --- a/internal/services/delivery/delivery.go +++ b/internal/services/delivery/delivery.go @@ -94,6 +94,7 @@ func NewService(ctx context.Context, models.WithCipher(models.NewAESCipher(cfg.AESEncryptionSecret)), models.WithAvailableTopics(cfg.Topics), models.WithMaxDestinationsPerTenant(cfg.MaxDestinationsPerTenant), + 
models.WithDeploymentID(cfg.DeploymentID), ) logstoreDriverOpts, err := logstore.MakeDriverOpts(logstore.Config{ @@ -111,7 +112,7 @@ func NewService(ctx context.Context, return nil, err } - retryScheduler := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), logger) + retryScheduler := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger) if err := retryScheduler.Init(ctx); err != nil { return nil, err } @@ -133,6 +134,7 @@ func NewService(ctx context.Context, alert.WithNotifier(alertNotifier), alert.WithDisabler(destinationDisabler), alert.WithAutoDisableFailureCount(cfg.Alert.ConsecutiveFailureCount), + alert.WithDeploymentID(cfg.DeploymentID), ) handler = deliverymq.NewMessageHandler( From c57d81838fbe72c3d3db2b17f492d1ec9c90f9b4 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 01:07:44 +0700 Subject: [PATCH 12/20] docs: add comments to clarify migration tool --- cmd/outpost-migrate-redis/README.md | 12 ++++-- .../migration/001_hash_tags/README.md | 41 ++++++++++++++++++- .../migration/001_hash_tags/hash_tags.go | 12 +++++- 3 files changed, 60 insertions(+), 5 deletions(-) diff --git a/cmd/outpost-migrate-redis/README.md b/cmd/outpost-migrate-redis/README.md index 3d7144ad..0988bc16 100644 --- a/cmd/outpost-migrate-redis/README.md +++ b/cmd/outpost-migrate-redis/README.md @@ -109,9 +109,15 @@ Migrates Redis keys from legacy format to hash-tagged format for Redis Cluster c **Purpose:** Ensures all keys for a tenant are routed to the same Redis Cluster node by using hash tags. **Key Transformations:** -- `tenant::*` → `{tenant:}:*` -- `destination_summary::` → `{tenant:}:destination_summary:` -- Individual destination keys are properly hash-tagged by tenant +- `tenant:123` → `tenant:{123}:tenant` +- `tenant:123:destinations` → `tenant:{123}:destinations` +- `tenant:123:destination:abc` → `tenant:{123}:destination:abc` + +**Deployment Mode Note:** If you are using `DEPLOYMENT_ID` configuration, this migration is **not needed**. Deployment-scoped keys already include hash tags: +- `deployment:dp_001:tenant:{123}:tenant` (already has hash tags) +- `deployment:dp_001:tenant:{123}:destinations` (already has hash tags) + +See [001_hash_tags/README.md](./migration/001_hash_tags/README.md) for details. **Safety:** This migration preserves original keys. Use the cleanup command after verification to remove old keys. diff --git a/cmd/outpost-migrate-redis/migration/001_hash_tags/README.md b/cmd/outpost-migrate-redis/migration/001_hash_tags/README.md index 23cd6e7e..4a4cb218 100644 --- a/cmd/outpost-migrate-redis/migration/001_hash_tags/README.md +++ b/cmd/outpost-migrate-redis/migration/001_hash_tags/README.md @@ -44,8 +44,47 @@ After verification, removes all legacy keys: - Requires confirmation unless `-force` flag is used - Processes deletions in batches of 100 keys +## Deployment Mode Compatibility + +### When This Migration is NOT Needed + +If you are using the `DEPLOYMENT_ID` configuration option (or `deployment_id` in YAML), **you can skip this migration entirely**. Deployments using deployment IDs already have keys in the correct format: + +``` +deployment:dp_001:tenant:{123}:tenant +deployment:dp_001:tenant:{123}:destinations +deployment:dp_001:tenant:{123}:destination:abc +``` + +These keys already include hash tags `{123}` and are Redis Cluster compatible. + +### When This Migration IS Needed + +This migration is only required for legacy deployments that: +1. Started before hash tag support was added +2. 
Are **NOT** using `DEPLOYMENT_ID` configuration +3. Have keys in the old format without curly braces: + ``` + tenant:123 + tenant:123:destinations + tenant:123:destination:abc + ``` + +### Checking If You Need This Migration + +Run the migration planner to check: +```bash +outpost-migrate-redis plan +``` + +If the output shows `0 tenants to migrate`, your deployment either: +- Already has hash tags (you're good!) +- Is using deployment IDs (you're good!) +- Has no data yet (you're good!) + ## Notes - Original keys are preserved during Apply phase for rollback safety - Migration is idempotent - can be run multiple times safely -- Skips tenants that are already migrated \ No newline at end of file +- Skips tenants that are already migrated +- Does not touch deployment-prefixed keys (`deployment:*`) \ No newline at end of file diff --git a/cmd/outpost-migrate-redis/migration/001_hash_tags/hash_tags.go b/cmd/outpost-migrate-redis/migration/001_hash_tags/hash_tags.go index cd70ec62..75dc1c9d 100644 --- a/cmd/outpost-migrate-redis/migration/001_hash_tags/hash_tags.go +++ b/cmd/outpost-migrate-redis/migration/001_hash_tags/hash_tags.go @@ -10,7 +10,17 @@ import ( "github.com/hookdeck/outpost/internal/redis" ) -// HashTagsMigration migrates from legacy format (tenant:*) to hash-tagged format ({tenant}:*) +// HashTagsMigration migrates from legacy format (tenant:*) to hash-tagged format (tenant:{ID}:*) +// +// NOTE: This migration only handles non-deployment-prefixed keys. +// If you are using DEPLOYMENT_ID configuration, your keys already have the correct format +// with deployment prefixes (deployment:{ID}:tenant:{TENANT_ID}:*) and hash tags are already +// in place. In that case, this migration can be safely skipped. +// +// This migration is only needed for legacy deployments that: +// - Started before hash tag support was added +// - Are NOT using DEPLOYMENT_ID configuration +// - Have keys in the old format: tenant:ID:* (without curly braces) type HashTagsMigration struct { client redis.Client logger migration.Logger From a6be167d6d4d90bd35c88eebeac7219f2a0863bd Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 02:19:18 +0700 Subject: [PATCH 13/20] test: deployment id e2e suite --- cmd/e2e/configs/basic.go | 6 ++++-- cmd/e2e/suites_test.go | 18 ++++++++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/cmd/e2e/configs/basic.go b/cmd/e2e/configs/basic.go index 18d9a6b3..fe7f1fa5 100644 --- a/cmd/e2e/configs/basic.go +++ b/cmd/e2e/configs/basic.go @@ -25,8 +25,9 @@ const ( ) type BasicOpts struct { - LogStorage LogStorageType - RedisConfig *redis.RedisConfig // Optional Redis config override + LogStorage LogStorageType + RedisConfig *redis.RedisConfig // Optional Redis config override + DeploymentID string // Optional deployment ID for multi-tenancy testing } func Basic(t *testing.T, opts BasicOpts) config.Config { @@ -83,6 +84,7 @@ func Basic(t *testing.T, opts BasicOpts) config.Config { c.RetryMaxLimit = 3 c.LogBatchThresholdSeconds = 1 c.LogBatchSize = 100 + c.DeploymentID = opts.DeploymentID // Setup cleanup t.Cleanup(func() { diff --git a/cmd/e2e/suites_test.go b/cmd/e2e/suites_test.go index 9f00548c..78db3f37 100644 --- a/cmd/e2e/suites_test.go +++ b/cmd/e2e/suites_test.go @@ -126,6 +126,7 @@ type basicSuite struct { e2eSuite logStorageType configs.LogStorageType redisConfig *redis.RedisConfig // Optional Redis config override + deploymentID string // Optional deployment ID alertServer *alert.AlertMockServer } @@ -143,8 +144,9 @@ func (suite 
*basicSuite) SetupSuite() { // Configure alert callback URL cfg := configs.Basic(t, configs.BasicOpts{ - LogStorage: suite.logStorageType, - RedisConfig: suite.redisConfig, + LogStorage: suite.logStorageType, + RedisConfig: suite.redisConfig, + DeploymentID: suite.deploymentID, }) cfg.Alert.CallbackURL = alertServer.GetCallbackURL() @@ -204,3 +206,15 @@ func TestRedisClusterBasicSuite(t *testing.T) { redisConfig: redisConfig, }) } + +func TestBasicSuiteWithDeploymentID(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip("skipping e2e test") + } + + suite.Run(t, &basicSuite{ + logStorageType: configs.LogStorageTypePostgres, + deploymentID: "dp_e2e_test", + }) +} From 85c1f4f74e056c792db87a515429a6faf01d2ff2 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 02:52:40 +0700 Subject: [PATCH 14/20] fix: remove redis singleton pattern --- cmd/e2e/configs/basic.go | 2 +- internal/app/installation_test.go | 4 +- internal/deliverymq/retry.go | 2 +- internal/infra/infra_test.go | 6 +-- internal/redis/redis.go | 59 +++++++--------------------- internal/scheduler/scheduler_test.go | 2 +- 6 files changed, 23 insertions(+), 52 deletions(-) diff --git a/cmd/e2e/configs/basic.go b/cmd/e2e/configs/basic.go index fe7f1fa5..e765e251 100644 --- a/cmd/e2e/configs/basic.go +++ b/cmd/e2e/configs/basic.go @@ -134,7 +134,7 @@ func CreateRedisClusterConfig(t *testing.T) *redis.RedisConfig { // Test Redis connection before returning t.Logf("Testing Redis cluster connection to %s:%d", redisHost, redisPort) testCtx := context.Background() - _, err := redis.NewClient(testCtx, redisConfig) + _, err := redis.New(testCtx, redisConfig) if err != nil { t.Fatalf("Failed to create Redis client: %v", err) } diff --git a/internal/app/installation_test.go b/internal/app/installation_test.go index 23a8892a..03c9f8ac 100644 --- a/internal/app/installation_test.go +++ b/internal/app/installation_test.go @@ -17,7 +17,7 @@ func TestGetInstallationAtomic(t *testing.T) { ctx := context.Background() redisConfig := testutil.CreateTestRedisConfig(t) - redisClient, err := redis.NewClient(ctx, redisConfig) + redisClient, err := redis.New(ctx, redisConfig) require.NoError(t, err) // Clear any existing installation ID @@ -51,7 +51,7 @@ func TestGetInstallationConcurrency(t *testing.T) { // This test simulates concurrent access to verify atomicity ctx := context.Background() redisConfig := testutil.CreateTestRedisConfig(t) - redisClient, err := redis.NewClient(ctx, redisConfig) + redisClient, err := redis.New(ctx, redisConfig) require.NoError(t, err) // Clear any existing installation ID diff --git a/internal/deliverymq/retry.go b/internal/deliverymq/retry.go index c07d7fde..92676442 100644 --- a/internal/deliverymq/retry.go +++ b/internal/deliverymq/retry.go @@ -15,7 +15,7 @@ import ( func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, logger *logging.Logger) scheduler.Scheduler { // Create Redis client for RSMQ ctx := context.Background() - redisClient, err := redis.NewClient(ctx, redisConfig) + redisClient, err := redis.New(ctx, redisConfig) if err != nil { panic(fmt.Sprintf("Failed to create Redis client for retry scheduler: %v", err)) } diff --git a/internal/infra/infra_test.go b/internal/infra/infra_test.go index 6ef380f6..5537b15b 100644 --- a/internal/infra/infra_test.go +++ b/internal/infra/infra_test.go @@ -64,7 +64,7 @@ func newTestInfra(t *testing.T, provider infra.InfraProvider, lockKey string) *i redisConfig := testutil.CreateTestRedisConfig(t) ctx := 
context.Background() - client, err := redis.NewClient(ctx, redisConfig) + client, err := redis.New(ctx, redisConfig) require.NoError(t, err) return newTestInfraWithRedis(t, provider, lockKey, client) @@ -126,7 +126,7 @@ func TestInfra_ConcurrentNodes(t *testing.T) { lockKey := "test:lock:" + uuid.New().String() redisConfig := testutil.CreateTestRedisConfig(t) - client, err := redis.NewClient(ctx, redisConfig) + client, err := redis.New(ctx, redisConfig) require.NoError(t, err) const numNodes = 10 @@ -179,7 +179,7 @@ func TestInfra_LockExpiry(t *testing.T) { Password: "", Database: 0, } - client, err := redis.NewClient(ctx, redisConfig) + client, err := redis.New(ctx, redisConfig) require.NoError(t, err) // Create and acquire lock with 1 second TTL diff --git a/internal/redis/redis.go b/internal/redis/redis.go index bb13f2bf..815ccc33 100644 --- a/internal/redis/redis.go +++ b/internal/redis/redis.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "fmt" "strings" - "sync" "github.com/redis/go-redis/extra/redisotel/v9" r "github.com/redis/go-redis/v9" @@ -32,36 +31,25 @@ const ( TxFailedErr = r.TxFailedErr ) -var ( - once sync.Once - client Client - initializationError error -) - -func New(ctx context.Context, config *RedisConfig) (r.Cmdable, error) { - once.Do(func() { - initializeClient(ctx, config) - if initializationError == nil { - initializationError = instrumentOpenTelemetry() - } - }) +func New(ctx context.Context, config *RedisConfig) (Client, error) { + var client Client + var err error - // Ensure we never return nil client without an error - if client == nil && initializationError == nil { - initializationError = fmt.Errorf("redis client initialization failed: unexpected state") + if config.ClusterEnabled { + client, err = createClusterClient(ctx, config) + } else { + client, err = createRegularClient(ctx, config) } - return client, initializationError -} + if err != nil { + return nil, err + } -// NewClient creates a new Redis client without using the singleton -// This should be used by components that need their own Redis connection, -// such as libraries or in test scenarios where isolation is required -func NewClient(ctx context.Context, config *RedisConfig) (r.Cmdable, error) { - if config.ClusterEnabled { - return createClusterClient(ctx, config) + if err := instrumentOpenTelemetry(client); err != nil { + return nil, err } - return createRegularClient(ctx, config) + + return client, nil } func createClusterClient(ctx context.Context, config *RedisConfig) (Client, error) { @@ -127,7 +115,7 @@ func createRegularClient(ctx context.Context, config *RedisConfig) (Client, erro return regularClient, nil } -func instrumentOpenTelemetry() error { +func instrumentOpenTelemetry(client Client) error { // OpenTelemetry instrumentation requires a concrete client type for type assertions if concreteClient, ok := client.(*r.Client); ok { if err := redisotel.InstrumentTracing(concreteClient); err != nil { @@ -140,20 +128,3 @@ func instrumentOpenTelemetry() error { } return nil } - -func initializeClient(ctx context.Context, config *RedisConfig) { - var err error - if config.ClusterEnabled { - client, err = createClusterClient(ctx, config) - if err != nil { - initializationError = fmt.Errorf("redis cluster connection failed: %w", err) - return - } - } else { - client, err = createRegularClient(ctx, config) - if err != nil { - initializationError = fmt.Errorf("redis regular client connection failed: %w", err) - return - } - } -} diff --git a/internal/scheduler/scheduler_test.go 
b/internal/scheduler/scheduler_test.go index 7b20325c..69066ee6 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -17,7 +17,7 @@ import ( // createRSMQClient creates an RSMQ client for testing func createRSMQClient(t *testing.T, redisConfig *iredis.RedisConfig) *rsmq.RedisSMQ { ctx := context.Background() - redisClient, err := iredis.NewClient(ctx, redisConfig) + redisClient, err := iredis.New(ctx, redisConfig) require.NoError(t, err) adapter := rsmq.NewRedisAdapter(redisClient) From d5628ac15d582395d015bce23a8748e544e0388b Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 11:50:26 +0700 Subject: [PATCH 15/20] docs: config generate --- docs/pages/references/configuration.mdx | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/pages/references/configuration.mdx b/docs/pages/references/configuration.mdx index ba54360d..e7d77e02 100644 --- a/docs/pages/references/configuration.mdx +++ b/docs/pages/references/configuration.mdx @@ -43,6 +43,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `AZURE_SERVICEBUS_TENANT_ID` | Azure Active Directory tenant ID | `nil` | Yes | | `DELIVERY_MAX_CONCURRENCY` | Maximum number of delivery attempts to process concurrently. | `1` | No | | `DELIVERY_TIMEOUT_SECONDS` | Timeout in seconds for HTTP requests made during event delivery to webhook destinations. | `5` | No | +| `DEPLOYMENT_ID` | Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation. | `nil` | No | | `DESTINATIONS_AWS_KINESIS_METADATA_IN_PAYLOAD` | If true, includes Outpost metadata (event ID, topic, etc.) within the Kinesis record payload. | `true` | No | | `DESTINATIONS_INCLUDE_MILLISECOND_TIMESTAMP` | If true, includes a 'timestamp-ms' field with millisecond precision in destination metadata. Useful for load testing and debugging. | `false` | No | | `DESTINATIONS_METADATA_PATH` | Path to the directory containing custom destination type definitions. This can be overridden by the root-level 'destination_metadata_path' if also set. | `config/outpost/destinations` | No | @@ -163,6 +164,9 @@ delivery_max_concurrency: 1 # Timeout in seconds for HTTP requests made during event delivery to webhook destinations. delivery_timeout_seconds: 5 +# Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation. +deployment_id: "" + # Path to the directory containing custom destination type definitions. Overrides 'destinations.metadata_path' if set. 
destination_metadata_path: "" From 3c34f853bc69bd484154fce313d52840da71f26e Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 12:39:15 +0700 Subject: [PATCH 16/20] style: gofmt --- cmd/e2e/configs/basic.go | 2 +- cmd/e2e/suites_test.go | 2 +- internal/app/app.go | 8 ++++---- internal/destregistry/testing/publisher_suite.go | 4 ++-- internal/models/entitysuite_test.go | 2 -- internal/scheduler/scheduler_test.go | 1 - 6 files changed, 8 insertions(+), 11 deletions(-) diff --git a/cmd/e2e/configs/basic.go b/cmd/e2e/configs/basic.go index e765e251..ae624ec8 100644 --- a/cmd/e2e/configs/basic.go +++ b/cmd/e2e/configs/basic.go @@ -27,7 +27,7 @@ const ( type BasicOpts struct { LogStorage LogStorageType RedisConfig *redis.RedisConfig // Optional Redis config override - DeploymentID string // Optional deployment ID for multi-tenancy testing + DeploymentID string // Optional deployment ID for multi-tenancy testing } func Basic(t *testing.T, opts BasicOpts) config.Config { diff --git a/cmd/e2e/suites_test.go b/cmd/e2e/suites_test.go index 78db3f37..d76471fe 100644 --- a/cmd/e2e/suites_test.go +++ b/cmd/e2e/suites_test.go @@ -126,7 +126,7 @@ type basicSuite struct { e2eSuite logStorageType configs.LogStorageType redisConfig *redis.RedisConfig // Optional Redis config override - deploymentID string // Optional deployment ID + deploymentID string // Optional deployment ID alertServer *alert.AlertMockServer } diff --git a/internal/app/app.go b/internal/app/app.go index a2d0ecf5..b65047dd 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -198,10 +198,10 @@ func constructServices( // MIGRATION LOCK BEHAVIOR: // - Database locks are only acquired when migrations need to be performed // - When multiple nodes start simultaneously and migrations are pending: -// 1. One node acquires the lock and performs migrations (ideally < 5 seconds) -// 2. Other nodes fail with lock errors ("try lock failed", "can't acquire lock") -// 3. Failed nodes wait 5 seconds and retry -// 4. On retry, migrations are complete and nodes proceed successfully +// 1. One node acquires the lock and performs migrations (ideally < 5 seconds) +// 2. Other nodes fail with lock errors ("try lock failed", "can't acquire lock") +// 3. Failed nodes wait 5 seconds and retry +// 4. On retry, migrations are complete and nodes proceed successfully // // RETRY STRATEGY: // - Max 3 attempts with 5-second delays between retries diff --git a/internal/destregistry/testing/publisher_suite.go b/internal/destregistry/testing/publisher_suite.go index 9e852b74..ad8f3316 100644 --- a/internal/destregistry/testing/publisher_suite.go +++ b/internal/destregistry/testing/publisher_suite.go @@ -51,8 +51,8 @@ func AssertTimestampIsUnixSeconds(t TestingT, timestampStr string, msgAndArgs .. 
// Current time in seconds: ~1,700,000,000 (2023-2024) // Current time in millis: ~1,700,000,000,000 - minUnixSeconds := int64(946684800) // Jan 1, 2000 - maxUnixSeconds := int64(4102444800) // Jan 1, 2100 + minUnixSeconds := int64(946684800) // Jan 1, 2000 + maxUnixSeconds := int64(4102444800) // Jan 1, 2100 if timestampInt < minUnixSeconds || timestampInt > maxUnixSeconds { // Likely milliseconds - check if dividing by 1000 gives a reasonable timestamp diff --git a/internal/models/entitysuite_test.go b/internal/models/entitysuite_test.go index 5764d7b6..01756ade 100644 --- a/internal/models/entitysuite_test.go +++ b/internal/models/entitysuite_test.go @@ -221,7 +221,6 @@ func (s *EntityTestSuite) TestDeleteTenantAndAssociatedDestinations() { } } - // Helper struct for multi-destination tests type multiDestinationData struct { tenant models.Tenant @@ -636,4 +635,3 @@ func (s *EntityTestSuite) TestMultiSuiteDeleteAndMatch() { } }) } - diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 69066ee6..8ef07f15 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -298,4 +298,3 @@ func TestScheduler_Cancel(t *testing.T) { require.NoError(t, s.Cancel(ctx, id)) }) } - From bd4175610cbc7475e2ff4dcd591a5d8f78116134 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 19:33:47 +0700 Subject: [PATCH 17/20] fix: avoid panic --- internal/deliverymq/retry.go | 6 +++--- internal/deliverymq/retry_test.go | 3 ++- internal/services/api/api.go | 6 +++++- internal/services/delivery/delivery.go | 5 ++++- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/internal/deliverymq/retry.go b/internal/deliverymq/retry.go index 92676442..a82ad6cb 100644 --- a/internal/deliverymq/retry.go +++ b/internal/deliverymq/retry.go @@ -12,12 +12,12 @@ import ( "github.com/hookdeck/outpost/internal/scheduler" ) -func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, logger *logging.Logger) scheduler.Scheduler { +func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, deploymentID string, logger *logging.Logger) (scheduler.Scheduler, error) { // Create Redis client for RSMQ ctx := context.Background() redisClient, err := redis.New(ctx, redisConfig) if err != nil { - panic(fmt.Sprintf("Failed to create Redis client for retry scheduler: %v", err)) + return nil, fmt.Errorf("failed to create Redis client for retry scheduler: %w", err) } // Create RSMQ adapter @@ -52,7 +52,7 @@ func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, d return nil } - return scheduler.New("deliverymq-retry", rsmqClient, exec) + return scheduler.New("deliverymq-retry", rsmqClient, exec), nil } type RetryMessage struct { diff --git a/internal/deliverymq/retry_test.go b/internal/deliverymq/retry_test.go index ea341af0..334d4ee8 100644 --- a/internal/deliverymq/retry_test.go +++ b/internal/deliverymq/retry_test.go @@ -45,7 +45,8 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) { require.NoError(t, err) // Setup retry scheduler - retryScheduler := deliverymq.NewRetryScheduler(s.deliveryMQ, testutil.CreateTestRedisConfig(t), "", testutil.CreateTestLogger(t)) + retryScheduler, err := deliverymq.NewRetryScheduler(s.deliveryMQ, testutil.CreateTestRedisConfig(t), "", testutil.CreateTestLogger(t)) + require.NoError(t, err) require.NoError(t, retryScheduler.Init(s.ctx)) go retryScheduler.Monitor(s.ctx) diff --git a/internal/services/api/api.go 
b/internal/services/api/api.go index 76d2e9bf..e8e06913 100644 --- a/internal/services/api/api.go +++ b/internal/services/api/api.go @@ -144,7 +144,11 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log // deliverymqRetryScheduler logger.Debug("creating delivery MQ retry scheduler") - deliverymqRetryScheduler := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger) + deliverymqRetryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger) + if err != nil { + logger.Error("failed to create delivery MQ retry scheduler", zap.Error(err)) + return nil, err + } logger.Debug("initializing delivery MQ retry scheduler - this may perform Redis operations") if err := deliverymqRetryScheduler.Init(ctx); err != nil { logger.Error("delivery MQ retry scheduler initialization failed", diff --git a/internal/services/delivery/delivery.go b/internal/services/delivery/delivery.go index 55cbaefc..4dabfe20 100644 --- a/internal/services/delivery/delivery.go +++ b/internal/services/delivery/delivery.go @@ -112,7 +112,10 @@ func NewService(ctx context.Context, return nil, err } - retryScheduler := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger) + retryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, logger) + if err != nil { + return nil, err + } if err := retryScheduler.Init(ctx); err != nil { return nil, err } From a2f655d2aaceaa9bce75c68e06dd6093d2927b1f Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 19:38:01 +0700 Subject: [PATCH 18/20] chore: deployment validation --- internal/config/config.go | 1 + internal/config/validation.go | 31 +++++++++++++++ internal/config/validation_test.go | 63 ++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+) diff --git a/internal/config/config.go b/internal/config/config.go index 1d5a771f..e82b9142 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -107,6 +107,7 @@ var ( ErrMissingMQs = errors.New("config validation error: message queue configuration is required") ErrMissingAESSecret = errors.New("config validation error: AES encryption secret is required") ErrInvalidPortalProxyURL = errors.New("config validation error: invalid portal proxy url") + ErrInvalidDeploymentID = errors.New("config validation error: deployment_id must contain only alphanumeric characters, hyphens, and underscores (max 64 characters)") ) func (c *Config) InitDefaults() { diff --git a/internal/config/validation.go b/internal/config/validation.go index 7d464b1a..b0a4cc2a 100644 --- a/internal/config/validation.go +++ b/internal/config/validation.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/url" + "regexp" ) // Validate checks if the configuration is valid @@ -36,6 +37,10 @@ func (c *Config) Validate(flags Flags) error { return err } + if err := c.validateDeploymentID(); err != nil { + return err + } + if err := c.OpenTelemetry.Validate(); err != nil { return err } @@ -136,3 +141,29 @@ func (c *Config) validatePortal() error { } return nil } + +// validateDeploymentID validates the deployment ID format +// Empty string is allowed (optional field) +// If provided, must contain only alphanumeric characters, hyphens, and underscores +// Maximum length is 64 characters to prevent excessive key sizes +func (c *Config) validateDeploymentID() error { + if c.DeploymentID == "" { + return nil + } + + // Check length + if len(c.DeploymentID) > 64 { + 
return ErrInvalidDeploymentID + } + + // Check format: alphanumeric, hyphens, and underscores only + matched, err := regexp.MatchString(`^[a-zA-Z0-9_-]+$`, c.DeploymentID) + if err != nil { + return fmt.Errorf("failed to validate deployment_id: %w", err) + } + if !matched { + return ErrInvalidDeploymentID + } + + return nil +} diff --git a/internal/config/validation_test.go b/internal/config/validation_test.go index 1fcac429..423e1fe5 100644 --- a/internal/config/validation_test.go +++ b/internal/config/validation_test.go @@ -261,6 +261,69 @@ func TestMisc(t *testing.T) { }(), wantErr: config.ErrInvalidPortalProxyURL, }, + { + name: "empty deployment id is valid", + config: func() *config.Config { + c := validConfig() + c.DeploymentID = "" + return c + }(), + wantErr: nil, + }, + { + name: "valid deployment id with alphanumeric", + config: func() *config.Config { + c := validConfig() + c.DeploymentID = "deployment123" + return c + }(), + wantErr: nil, + }, + { + name: "valid deployment id with hyphens and underscores", + config: func() *config.Config { + c := validConfig() + c.DeploymentID = "deployment_123-abc" + return c + }(), + wantErr: nil, + }, + { + name: "invalid deployment id with colon", + config: func() *config.Config { + c := validConfig() + c.DeploymentID = "deployment:123" + return c + }(), + wantErr: config.ErrInvalidDeploymentID, + }, + { + name: "invalid deployment id with asterisk", + config: func() *config.Config { + c := validConfig() + c.DeploymentID = "deployment*" + return c + }(), + wantErr: config.ErrInvalidDeploymentID, + }, + { + name: "invalid deployment id with braces", + config: func() *config.Config { + c := validConfig() + c.DeploymentID = "deployment{123}" + return c + }(), + wantErr: config.ErrInvalidDeploymentID, + }, + { + name: "invalid deployment id exceeds max length", + config: func() *config.Config { + c := validConfig() + c.DeploymentID = "a123456789012345678901234567890123456789012345678901234567890123456" + return c + }(), + wantErr: config.ErrInvalidDeploymentID, + }, } for _, tt := range tests { From 1930e893851477e5fc1fe8ea9df9cc3757cdd0e4 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 19:39:44 +0700 Subject: [PATCH 19/20] chore: include deployment id on startup log --- internal/app/app.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index b65047dd..51d8cf4a 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -47,10 +47,14 @@ func run(mainContext context.Context, cfg *config.Config) error { } defer logger.Sync() - logger.Info("starting outpost", + logFields := []zap.Field{ zap.String("config_path", cfg.ConfigFilePath()), zap.String("service", cfg.MustGetService().String()), - ) + } + if cfg.DeploymentID != "" { + logFields = append(logFields, zap.String("deployment_id", cfg.DeploymentID)) + } + logger.Info("starting outpost", logFields...) 
if err := runMigration(mainContext, cfg, logger); err != nil { return err From 02aa491c004116a03238df4569657289118e24a5 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 7 Oct 2025 20:36:37 +0700 Subject: [PATCH 20/20] chore: remove deployment prefix --- cmd/outpost-migrate-redis/README.md | 4 ++-- .../migration/001_hash_tags/README.md | 8 ++++---- .../migration/001_hash_tags/hash_tags.go | 2 +- internal/alert/store.go | 2 +- internal/deliverymq/retry.go | 4 ++-- internal/models/entity.go | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/outpost-migrate-redis/README.md b/cmd/outpost-migrate-redis/README.md index 0988bc16..3484306b 100644 --- a/cmd/outpost-migrate-redis/README.md +++ b/cmd/outpost-migrate-redis/README.md @@ -114,8 +114,8 @@ Migrates Redis keys from legacy format to hash-tagged format for Redis Cluster c - `tenant:123:destination:abc` → `tenant:{123}:destination:abc` **Deployment Mode Note:** If you are using `DEPLOYMENT_ID` configuration, this migration is **not needed**. Deployment-scoped keys already include hash tags: -- `deployment:dp_001:tenant:{123}:tenant` (already has hash tags) -- `deployment:dp_001:tenant:{123}:destinations` (already has hash tags) +- `dp_001:tenant:{123}:tenant` (already has hash tags) +- `dp_001:tenant:{123}:destinations` (already has hash tags) See [001_hash_tags/README.md](./migration/001_hash_tags/README.md) for details. diff --git a/cmd/outpost-migrate-redis/migration/001_hash_tags/README.md b/cmd/outpost-migrate-redis/migration/001_hash_tags/README.md index 4a4cb218..79b847d6 100644 --- a/cmd/outpost-migrate-redis/migration/001_hash_tags/README.md +++ b/cmd/outpost-migrate-redis/migration/001_hash_tags/README.md @@ -51,9 +51,9 @@ After verification, removes all legacy keys: If you are using the `DEPLOYMENT_ID` configuration option (or `deployment_id` in YAML), **you can skip this migration entirely**. Deployments using deployment IDs already have keys in the correct format: ``` -deployment:dp_001:tenant:{123}:tenant -deployment:dp_001:tenant:{123}:destinations -deployment:dp_001:tenant:{123}:destination:abc +dp_001:tenant:{123}:tenant +dp_001:tenant:{123}:destinations +dp_001:tenant:{123}:destination:abc ``` These keys already include hash tags `{123}` and are Redis Cluster compatible. @@ -87,4 +87,4 @@ If the output shows `0 tenants to migrate`, your deployment either: - Original keys are preserved during Apply phase for rollback safety - Migration is idempotent - can be run multiple times safely - Skips tenants that are already migrated -- Does not touch deployment-prefixed keys (`deployment:*`) \ No newline at end of file +- Does not touch deployment-prefixed keys (e.g., `dp_001:*`) \ No newline at end of file diff --git a/cmd/outpost-migrate-redis/migration/001_hash_tags/hash_tags.go b/cmd/outpost-migrate-redis/migration/001_hash_tags/hash_tags.go index 75dc1c9d..d4b503d6 100644 --- a/cmd/outpost-migrate-redis/migration/001_hash_tags/hash_tags.go +++ b/cmd/outpost-migrate-redis/migration/001_hash_tags/hash_tags.go @@ -14,7 +14,7 @@ import ( // // NOTE: This migration only handles non-deployment-prefixed keys. // If you are using DEPLOYMENT_ID configuration, your keys already have the correct format -// with deployment prefixes (deployment:{ID}:tenant:{TENANT_ID}:*) and hash tags are already +// with deployment prefixes (dp_001:tenant:{TENANT_ID}:*) and hash tags are already // in place. In that case, this migration can be safely skipped. 
// // This migration is only needed for legacy deployments that: diff --git a/internal/alert/store.go b/internal/alert/store.go index 8df529cb..9aaf2424 100644 --- a/internal/alert/store.go +++ b/internal/alert/store.go @@ -65,7 +65,7 @@ func (s *redisAlertStore) deploymentPrefix() string { if s.deploymentID == "" { return "" } - return fmt.Sprintf("deployment:%s:", s.deploymentID) + return fmt.Sprintf("%s:", s.deploymentID) } func (s *redisAlertStore) getFailuresKey(destinationID string) string { diff --git a/internal/deliverymq/retry.go b/internal/deliverymq/retry.go index a82ad6cb..d99636a4 100644 --- a/internal/deliverymq/retry.go +++ b/internal/deliverymq/retry.go @@ -24,11 +24,11 @@ func NewRetryScheduler(deliverymq *DeliveryMQ, redisConfig *redis.RedisConfig, d adapter := rsmq.NewRedisAdapter(redisClient) // Construct RSMQ namespace with deployment prefix if provided - // This creates keys like: deployment:dp_001:rsmq:QUEUES, deployment:dp_001:rsmq:deliverymq-retry:Q + // This creates keys like: dp_001:rsmq:QUEUES, dp_001:rsmq:deliverymq-retry:Q // Without deployment ID: rsmq:QUEUES, rsmq:deliverymq-retry:Q namespace := "rsmq" if deploymentID != "" { - namespace = fmt.Sprintf("deployment:%s:rsmq", deploymentID) + namespace = fmt.Sprintf("%s:rsmq", deploymentID) } // Create RSMQ client with deployment-aware namespace diff --git a/internal/models/entity.go b/internal/models/entity.go index e5210924..a3c2f708 100644 --- a/internal/models/entity.go +++ b/internal/models/entity.go @@ -47,7 +47,7 @@ func (s *entityStoreImpl) deploymentPrefix() string { if s.deploymentID == "" { return "" } - return fmt.Sprintf("deployment:%s:", s.deploymentID) + return fmt.Sprintf("%s:", s.deploymentID) } // New cluster-compatible key formats with hash tags
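
For readers following the key-format change in this final patch, here is a minimal standalone Go sketch of the resulting behavior. It is illustrative only: the real logic lives on the entity store, alert store, and RSMQ scheduler shown in the diffs above, and the helper names below (`validDeploymentID`, `keyPrefix`) are invented for the example.

```go
package main

import (
	"fmt"
	"regexp"
)

// deploymentIDPattern mirrors the rule added in internal/config/validation.go:
// alphanumerics, hyphens, and underscores only; max 64 characters; empty means "unset".
var deploymentIDPattern = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)

func validDeploymentID(id string) bool {
	if id == "" {
		return true // optional field
	}
	return len(id) <= 64 && deploymentIDPattern.MatchString(id)
}

// keyPrefix mirrors deploymentPrefix() after patch 20: the deployment ID itself plus ":",
// with no "deployment:" literal in front, and an empty prefix when no deployment ID is set.
func keyPrefix(deploymentID string) string {
	if deploymentID == "" {
		return ""
	}
	return fmt.Sprintf("%s:", deploymentID)
}

func main() {
	for _, id := range []string{"", "dp_001", "deployment:123"} {
		fmt.Printf("id=%q valid=%v\n", id, validDeploymentID(id))
	}

	// Resulting Redis key shapes; the {123} hash tag keeps a tenant's keys in one cluster slot.
	fmt.Println(keyPrefix("dp_001") + "tenant:{123}:destinations") // dp_001:tenant:{123}:destinations
	fmt.Println(keyPrefix("dp_001") + "rsmq:QUEUES")               // dp_001:rsmq:QUEUES
	fmt.Println(keyPrefix("") + "tenant:{123}:destinations")       // tenant:{123}:destinations
	fmt.Println(keyPrefix("") + "rsmq:QUEUES")                     // rsmq:QUEUES
}
```

With a deployment ID configured, the ID itself is now the entire prefix; the former `deployment:` literal no longer appears in entity, alert, or RSMQ keys, while the tenant hash tags continue to make the keys Redis Cluster compatible, as described in the migration READMEs above.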