From 1ed9cc81e0e93d46e8a28ceee9431fc4150fc8bc Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 5 Dec 2025 19:29:18 +0700 Subject: [PATCH] chore: remove unused service pkgs --- internal/services/api/api.go | 263 -------------------- internal/services/delivery/delivery.go | 237 ------------------ internal/services/delivery/delivery_test.go | 124 --------- internal/services/log/log.go | 207 --------------- 4 files changed, 831 deletions(-) delete mode 100644 internal/services/api/api.go delete mode 100644 internal/services/delivery/delivery.go delete mode 100644 internal/services/delivery/delivery_test.go delete mode 100644 internal/services/log/log.go diff --git a/internal/services/api/api.go b/internal/services/api/api.go deleted file mode 100644 index 730663bf..00000000 --- a/internal/services/api/api.go +++ /dev/null @@ -1,263 +0,0 @@ -package api - -import ( - "context" - "errors" - "fmt" - "net/http" - "sync" - "time" - - "github.com/hookdeck/outpost/internal/apirouter" - "github.com/hookdeck/outpost/internal/config" - "github.com/hookdeck/outpost/internal/consumer" - "github.com/hookdeck/outpost/internal/deliverymq" - "github.com/hookdeck/outpost/internal/destregistry" - destregistrydefault "github.com/hookdeck/outpost/internal/destregistry/providers" - "github.com/hookdeck/outpost/internal/eventtracer" - "github.com/hookdeck/outpost/internal/idempotence" - "github.com/hookdeck/outpost/internal/logging" - "github.com/hookdeck/outpost/internal/logstore" - "github.com/hookdeck/outpost/internal/models" - "github.com/hookdeck/outpost/internal/publishmq" - "github.com/hookdeck/outpost/internal/redis" - "github.com/hookdeck/outpost/internal/scheduler" - "github.com/hookdeck/outpost/internal/telemetry" - "go.uber.org/zap" -) - -type consumerOptions struct { - concurreny int -} - -type APIService struct { - cleanupFuncs []func(context.Context, *logging.LoggerWithCtx) - - registry destregistry.Registry - redisClient redis.Cmdable - server *http.Server - logger *logging.Logger - publishMQ *publishmq.PublishMQ - deliveryMQ *deliverymq.DeliveryMQ - entityStore models.EntityStore - eventHandler publishmq.EventHandler - deliverymqRetryScheduler scheduler.Scheduler - consumerOptions *consumerOptions -} - -func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, logger *logging.Logger, telemetry telemetry.Telemetry) (*APIService, error) { - wg.Add(1) - - var cleanupFuncs []func(context.Context, *logging.LoggerWithCtx) - - logger.Debug("initializing destination registry") - registry := destregistry.NewRegistry(&destregistry.Config{ - DestinationMetadataPath: cfg.Destinations.MetadataPath, - DeliveryTimeout: time.Duration(cfg.DeliveryTimeoutSeconds) * time.Second, - }, logger) - if err := destregistrydefault.RegisterDefault(registry, cfg.Destinations.ToConfig(cfg)); err != nil { - logger.Error("destination registry setup failed", zap.Error(err)) - return nil, err - } - - logger.Debug("configuring delivery message queue") - deliveryQueueConfig, err := cfg.MQs.ToQueueConfig(ctx, "deliverymq") - if err != nil { - logger.Error("delivery queue configuration failed", zap.Error(err)) - return nil, err - } - - logger.Debug("initializing delivery MQ connection", zap.String("mq_type", cfg.MQs.GetInfraType())) - deliveryMQ := deliverymq.New(deliverymq.WithQueue(deliveryQueueConfig)) - cleanupDeliveryMQ, err := deliveryMQ.Init(ctx) - if err != nil { - logger.Error("delivery MQ initialization failed", - zap.Error(err), - zap.String("mq_type", cfg.MQs.GetInfraType()), - zap.String("context", "This likely indicates message queue connectivity issues")) - return nil, err - } - cleanupFuncs = append(cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { cleanupDeliveryMQ() }) - - logger.Debug("initializing Redis client for API service") - // Create Redis client for all operations (now cluster-compatible) - redisClient, err := redis.New(ctx, cfg.Redis.ToConfig()) - if err != nil { - logger.Error("Redis client initialization failed in API service", zap.Error(err)) - return nil, err - } - - logger.Debug("configuring log store driver") - logStoreDriverOpts, err := logstore.MakeDriverOpts(logstore.Config{ - // ClickHouse: cfg.ClickHouse.ToConfig(), - Postgres: &cfg.PostgresURL, - }) - if err != nil { - logger.Error("log store driver configuration failed", zap.Error(err)) - return nil, err - } - cleanupFuncs = append(cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { - logStoreDriverOpts.Close() - }) - - logger.Debug("creating log store") - logStore, err := logstore.NewLogStore(ctx, logStoreDriverOpts) - if err != nil { - logger.Error("log store creation failed", zap.Error(err)) - return nil, err - } - - logger.Debug("setting up event tracer") - var eventTracer eventtracer.EventTracer - if cfg.OpenTelemetry.ToConfig() == nil { - eventTracer = eventtracer.NewNoopEventTracer() - } else { - eventTracer = eventtracer.NewEventTracer() - } - - logger.Debug("creating entity store with Redis client") - entityStore := models.NewEntityStore(redisClient, - models.WithCipher(models.NewAESCipher(cfg.AESEncryptionSecret)), - models.WithAvailableTopics(cfg.Topics), - models.WithMaxDestinationsPerTenant(cfg.MaxDestinationsPerTenant), - models.WithDeploymentID(cfg.DeploymentID), - ) - - logger.Debug("creating event handler and router") - publishIdempotence := idempotence.New(redisClient, - idempotence.WithTimeout(5*time.Second), - idempotence.WithSuccessfulTTL(time.Duration(cfg.PublishIdempotencyKeyTTL)*time.Second), - ) - eventHandler := publishmq.NewEventHandler(logger, deliveryMQ, entityStore, eventTracer, cfg.Topics, publishIdempotence) - router := apirouter.NewRouter( - apirouter.RouterConfig{ - ServiceName: cfg.OpenTelemetry.GetServiceName(), - APIKey: cfg.APIKey, - JWTSecret: cfg.APIJWTSecret, - Topics: cfg.Topics, - Registry: registry, - PortalConfig: cfg.GetPortalConfig(), - GinMode: cfg.GinMode, - }, - logger, - redisClient, - deliveryMQ, - entityStore, - logStore, - eventHandler, - telemetry, - ) - - // deliverymqRetryScheduler - logger.Debug("creating delivery MQ retry scheduler") - pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond - deliverymqRetryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger) - if err != nil { - logger.Error("failed to create delivery MQ retry scheduler", zap.Error(err)) - return nil, err - } - logger.Debug("initializing delivery MQ retry scheduler - this may perform Redis operations") - if err := deliverymqRetryScheduler.Init(ctx); err != nil { - logger.Error("delivery MQ retry scheduler initialization failed", - zap.Error(err), - zap.String("context", "This likely indicates Redis connectivity issues during actual operations")) - return nil, err - } - cleanupFuncs = append(cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { deliverymqRetryScheduler.Shutdown() }) - - logger.Debug("creating HTTP server") - httpServer := &http.Server{ - Addr: fmt.Sprintf(":%d", cfg.APIPort), - Handler: router, - } - cleanupFuncs = append(cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { - if err := httpServer.Shutdown(ctx); err != nil { - logger.Error("error shutting down http server", zap.Error(err)) - } - logger.Info("http server shutted down") - }) - - service := &APIService{} - service.logger = logger - service.redisClient = redisClient - service.server = httpServer - service.deliveryMQ = deliveryMQ - service.entityStore = entityStore - service.eventHandler = eventHandler - service.deliverymqRetryScheduler = deliverymqRetryScheduler - service.consumerOptions = &consumerOptions{ - concurreny: cfg.PublishMaxConcurrency, - } - service.cleanupFuncs = cleanupFuncs - if cfg.PublishMQ.GetQueueConfig() != nil { - service.publishMQ = publishmq.New(publishmq.WithQueue(cfg.PublishMQ.GetQueueConfig())) - } - - go func() { - defer wg.Done() - <-ctx.Done() - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - service.Shutdown(shutdownCtx) - }() - - return service, nil -} - -func (s *APIService) Run(ctx context.Context) error { - logger := s.logger.Ctx(ctx) - logger.Info("service running", zap.String("service", "api")) - - go s.startHTTPServer(ctx) - go s.startRetrySchedulerMonitor(ctx) - if s.publishMQ != nil { - go s.startPublishMQConsumer(ctx) - } - - return nil -} - -func (s *APIService) Shutdown(ctx context.Context) { - logger := s.logger.Ctx(ctx) - logger.Info("service shutting down", zap.String("service", "api")) - for _, cleanupFunc := range s.cleanupFuncs { - cleanupFunc(ctx, &logger) - } - logger.Info("service shutdown", zap.String("service", "api")) -} - -func (s *APIService) startHTTPServer(ctx context.Context) { - logger := s.logger.Ctx(ctx) - logger.Info("http server listening", zap.String("addr", s.server.Addr)) - if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logger.Error("error listening and serving", zap.Error(err)) - } -} - -func (s *APIService) startRetrySchedulerMonitor(ctx context.Context) { - logger := s.logger.Ctx(ctx) - logger.Info("retry scheduler monitor running") - if err := s.deliverymqRetryScheduler.Monitor(ctx); err != nil { - logger.Error("error starting retry scheduler monitor", zap.Error(err)) - return - } -} - -func (s *APIService) startPublishMQConsumer(ctx context.Context) { - logger := s.logger.Ctx(ctx) - logger.Info("publishmq consumer running") - subscription, err := s.publishMQ.Subscribe(ctx) - if err != nil { - logger.Error("error subscribing to publishmq", zap.Error(err)) - return - } - messageHandler := publishmq.NewMessageHandler(s.eventHandler) - csm := consumer.New(subscription, messageHandler, - consumer.WithName("publishmq"), - consumer.WithConcurrency(s.consumerOptions.concurreny), - ) - if err := csm.Run(ctx); !errors.Is(err, ctx.Err()) { - logger.Error("error running publishmq consumer", zap.Error(err)) - return - } -} diff --git a/internal/services/delivery/delivery.go b/internal/services/delivery/delivery.go deleted file mode 100644 index 26f3eba6..00000000 --- a/internal/services/delivery/delivery.go +++ /dev/null @@ -1,237 +0,0 @@ -package delivery - -import ( - "context" - "errors" - "sync" - "time" - - "github.com/hookdeck/outpost/internal/alert" - "github.com/hookdeck/outpost/internal/config" - "github.com/hookdeck/outpost/internal/consumer" - "github.com/hookdeck/outpost/internal/deliverymq" - "github.com/hookdeck/outpost/internal/destregistry" - destregistrydefault "github.com/hookdeck/outpost/internal/destregistry/providers" - "github.com/hookdeck/outpost/internal/eventtracer" - "github.com/hookdeck/outpost/internal/idempotence" - "github.com/hookdeck/outpost/internal/logging" - "github.com/hookdeck/outpost/internal/logmq" - "github.com/hookdeck/outpost/internal/logstore" - "github.com/hookdeck/outpost/internal/models" - "github.com/hookdeck/outpost/internal/redis" - "go.uber.org/zap" - _ "gocloud.dev/pubsub/mempubsub" -) - -type consumerOptions struct { - concurreny int -} - -type DeliveryService struct { - cleanupFuncs []func() - - consumerOptions *consumerOptions - Logger *logging.Logger - RedisClient redis.Cmdable - DeliveryMQ *deliverymq.DeliveryMQ - Handler consumer.MessageHandler -} - -func NewService(ctx context.Context, - wg *sync.WaitGroup, - cfg *config.Config, - logger *logging.Logger, - handler consumer.MessageHandler, -) (*DeliveryService, error) { - wg.Add(1) - - cleanupFuncs := []func(){} - - // Create Redis client for all operations (now cluster-compatible) - redisClient, err := redis.New(ctx, cfg.Redis.ToConfig()) - if err != nil { - return nil, err - } - - logmqConfig, err := cfg.MQs.ToQueueConfig(ctx, "logmq") - if err != nil { - return nil, err - } - deliverymqConfig, err := cfg.MQs.ToQueueConfig(ctx, "deliverymq") - if err != nil { - return nil, err - } - - logMQ := logmq.New(logmq.WithQueue(logmqConfig)) - cleanupLogMQ, err := logMQ.Init(ctx) - if err != nil { - return nil, err - } - cleanupFuncs = append(cleanupFuncs, cleanupLogMQ) - - deliveryMQ := deliverymq.New(deliverymq.WithQueue(deliverymqConfig)) - cleanupDeliveryMQ, err := deliveryMQ.Init(ctx) - if err != nil { - return nil, err - } - cleanupFuncs = append(cleanupFuncs, cleanupDeliveryMQ) - - if handler == nil { - registry := destregistry.NewRegistry(&destregistry.Config{ - DestinationMetadataPath: cfg.Destinations.MetadataPath, - DeliveryTimeout: time.Duration(cfg.DeliveryTimeoutSeconds) * time.Second, - }, logger) - if err := destregistrydefault.RegisterDefault(registry, cfg.Destinations.ToConfig(cfg)); err != nil { - return nil, err - } - var eventTracer eventtracer.EventTracer - if cfg.OpenTelemetry.ToConfig() == nil { - eventTracer = eventtracer.NewNoopEventTracer() - } else { - eventTracer = eventtracer.NewEventTracer() - } - entityStore := models.NewEntityStore(redisClient, - models.WithCipher(models.NewAESCipher(cfg.AESEncryptionSecret)), - models.WithAvailableTopics(cfg.Topics), - models.WithMaxDestinationsPerTenant(cfg.MaxDestinationsPerTenant), - models.WithDeploymentID(cfg.DeploymentID), - ) - - logstoreDriverOpts, err := logstore.MakeDriverOpts(logstore.Config{ - // ClickHouse: cfg.ClickHouse.ToConfig(), - Postgres: &cfg.PostgresURL, - }) - if err != nil { - return nil, err - } - cleanupFuncs = append(cleanupFuncs, func() { - logstoreDriverOpts.Close() - }) - logStore, err := logstore.NewLogStore(ctx, logstoreDriverOpts) - if err != nil { - return nil, err - } - - pollBackoff := time.Duration(cfg.RetryPollBackoffMs) * time.Millisecond - retryScheduler, err := deliverymq.NewRetryScheduler(deliveryMQ, cfg.Redis.ToConfig(), cfg.DeploymentID, pollBackoff, logger) - if err != nil { - return nil, err - } - if err := retryScheduler.Init(ctx); err != nil { - return nil, err - } - cleanupFuncs = append(cleanupFuncs, func() { - retryScheduler.Shutdown() - }) - - var alertNotifier alert.AlertNotifier - var destinationDisabler alert.DestinationDisabler - if cfg.Alert.CallbackURL != "" { - alertNotifier = alert.NewHTTPAlertNotifier(cfg.Alert.CallbackURL, alert.NotifierWithBearerToken(cfg.APIKey)) - } - if cfg.Alert.AutoDisableDestination { - destinationDisabler = newDestinationDisabler(entityStore) - } - alertMonitor := alert.NewAlertMonitor( - logger, - redisClient, - alert.WithNotifier(alertNotifier), - alert.WithDisabler(destinationDisabler), - alert.WithAutoDisableFailureCount(cfg.Alert.ConsecutiveFailureCount), - alert.WithDeploymentID(cfg.DeploymentID), - ) - - deliveryIdempotence := idempotence.New(redisClient, - idempotence.WithTimeout(5*time.Second), - idempotence.WithSuccessfulTTL(time.Duration(cfg.DeliveryIdempotencyKeyTTL)*time.Second), - ) - - retryBackoff, retryMaxLimit := cfg.GetRetryBackoff() - - handler = deliverymq.NewMessageHandler( - logger, - logMQ, - entityStore, - logStore, - registry, - eventTracer, - retryScheduler, - retryBackoff, - retryMaxLimit, - alertMonitor, - deliveryIdempotence, - ) - } - - service := &DeliveryService{ - Logger: logger, - RedisClient: redisClient, - Handler: handler, - DeliveryMQ: deliveryMQ, - consumerOptions: &consumerOptions{ - concurreny: cfg.DeliveryMaxConcurrency, - }, - cleanupFuncs: cleanupFuncs, - } - - go func() { - defer wg.Done() - <-ctx.Done() - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - service.Shutdown(shutdownCtx) - }() - - return service, nil -} - -func (s *DeliveryService) Run(ctx context.Context) error { - s.Logger.Ctx(ctx).Info("service running", zap.String("service", "delivery")) - - subscription, err := s.DeliveryMQ.Subscribe(ctx) - if err != nil { - s.Logger.Ctx(ctx).Error("failed to susbcribe to ingestion events", zap.Error(err)) - return err - } - - csm := consumer.New(subscription, s.Handler, - consumer.WithConcurrency(s.consumerOptions.concurreny), - consumer.WithName("deliverymq"), - ) - if err := csm.Run(ctx); !errors.Is(err, ctx.Err()) { - return err - } - return nil -} - -func (s *DeliveryService) Shutdown(ctx context.Context) { - logger := s.Logger.Ctx(ctx) - logger.Info("service shutting down", zap.String("service", "delivery")) - for _, cleanup := range s.cleanupFuncs { - cleanup() - } - logger.Info("service shutdown", zap.String("service", "delivery")) -} - -type destinationDisabler struct { - entityStore models.EntityStore -} - -func newDestinationDisabler(entityStore models.EntityStore) alert.DestinationDisabler { - return &destinationDisabler{ - entityStore: entityStore, - } -} - -func (d *destinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error { - destination, err := d.entityStore.RetrieveDestination(ctx, tenantID, destinationID) - if err != nil { - return err - } - if destination == nil { - return nil - } - now := time.Now() - destination.DisabledAt = &now - return d.entityStore.UpsertDestination(ctx, *destination) -} diff --git a/internal/services/delivery/delivery_test.go b/internal/services/delivery/delivery_test.go deleted file mode 100644 index 2f7c8782..00000000 --- a/internal/services/delivery/delivery_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package delivery - -import ( - "context" - "testing" - "time" - - "github.com/hookdeck/outpost/internal/consumer" - "github.com/hookdeck/outpost/internal/deliverymq" - "github.com/hookdeck/outpost/internal/idgen" - "github.com/hookdeck/outpost/internal/models" - "github.com/hookdeck/outpost/internal/mqs" - "github.com/hookdeck/outpost/internal/util/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" -) - -func setupTestDeliveryService(t *testing.T, - handler consumer.MessageHandler, - deliveryMQ *deliverymq.DeliveryMQ, -) *DeliveryService { - logger := testutil.CreateTestLogger(t) - redisClient := testutil.CreateTestRedisClient(t) - service := &DeliveryService{ - Logger: logger, - RedisClient: redisClient, - Handler: handler, - DeliveryMQ: deliveryMQ, - consumerOptions: &consumerOptions{ - concurreny: 1, - }, - } - return service -} - -func TestDeliveryService(t *testing.T) { - t.Parallel() - - t.Run("should run without error", func(t *testing.T) { - t.Parallel() - - deliveryMQ := deliverymq.New(deliverymq.WithQueue(&mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}})) - cleanup, err := deliveryMQ.Init(context.Background()) - require.Nil(t, err) - defer cleanup() - - service := setupTestDeliveryService(t, nil, deliveryMQ) - - errchan := make(chan error) - context, cancel := context.WithTimeout(context.Background(), time.Second/10) - defer cancel() - - go func() { - errchan <- service.Run(context) - }() - - err = <-errchan - - assert.Nil(t, err) - }) - - t.Run("should subscribe to ingest events", func(t *testing.T) { - t.Parallel() - - // Arrange - deliveryMQ := deliverymq.New(deliverymq.WithQueue(&mqs.QueueConfig{InMemory: &mqs.InMemoryConfig{Name: testutil.RandomString(5)}})) - cleanup, err := deliveryMQ.Init(context.Background()) - require.Nil(t, err) - defer cleanup() - - handler := new(MockEventHandler) - handler.On( - "Handle", - mock.MatchedBy(func(ctx context.Context) bool { return true }), - mock.MatchedBy(func(i *mqs.Message) bool { return true }), - ).Return(nil) - service := setupTestDeliveryService(t, handler, deliveryMQ) - - errchan := make(chan error) - ctx, cancel := context.WithTimeout(context.Background(), time.Second/2) - defer cancel() - - go func() { - errchan <- service.Run(ctx) - }() - - // Act - time.Sleep(time.Second / 5) // wait for service to start - expectedID := idgen.Event() - deliveryMQ.Publish(ctx, models.NewDeliveryEvent(models.Event{ID: expectedID}, idgen.Destination())) - - // Assert - // wait til service has stopped - err = <-errchan - require.Nil(t, err) - - handler.AssertCalled(t, "Handle", - mock.MatchedBy(func(ctx context.Context) bool { return true }), - mock.MatchedBy(func(i interface{}) bool { - e, ok := i.(*mqs.Message) - if !ok { - return false - } - event := models.DeliveryEvent{} - if err := event.FromMessage(e); err != nil { - return false - } - return expectedID == event.Event.ID - }), - ) - }) -} - -type MockEventHandler struct { - mock.Mock -} - -var _ consumer.MessageHandler = (*MockEventHandler)(nil) - -func (h *MockEventHandler) Handle(ctx context.Context, msg *mqs.Message) error { - args := h.Called(ctx, msg) - return args.Error(0) -} diff --git a/internal/services/log/log.go b/internal/services/log/log.go deleted file mode 100644 index aae4cfb0..00000000 --- a/internal/services/log/log.go +++ /dev/null @@ -1,207 +0,0 @@ -package log - -import ( - "context" - "errors" - "sync" - "time" - - "github.com/hookdeck/outpost/internal/config" - "github.com/hookdeck/outpost/internal/consumer" - "github.com/hookdeck/outpost/internal/logging" - "github.com/hookdeck/outpost/internal/logmq" - "github.com/hookdeck/outpost/internal/logstore" - "github.com/hookdeck/outpost/internal/models" - "github.com/hookdeck/outpost/internal/mqs" - "github.com/hookdeck/outpost/internal/redis" - "github.com/mikestefanello/batcher" - "go.uber.org/zap" -) - -type consumerOptions struct { - concurreny int -} - -type LogService struct { - cleanupFuncs []func(context.Context, *logging.LoggerWithCtx) - consumerOptions *consumerOptions - logger *logging.Logger - redisClient redis.Cmdable - logMQ *logmq.LogMQ - handler consumer.MessageHandler -} - -func NewService(ctx context.Context, - wg *sync.WaitGroup, - cfg *config.Config, - logger *logging.Logger, - handler consumer.MessageHandler, -) (*LogService, error) { - wg.Add(1) - var cleanupFuncs []func(context.Context, *logging.LoggerWithCtx) - - redisClient, err := redis.New(ctx, cfg.Redis.ToConfig()) - if err != nil { - return nil, err - } - - var eventBatcher *batcher.Batcher[*models.Event] - var deliveryBatcher *batcher.Batcher[*models.Delivery] - if handler == nil { - logstoreDriverOpts, err := logstore.MakeDriverOpts(logstore.Config{ - // ClickHouse: cfg.ClickHouse.ToConfig(), - Postgres: &cfg.PostgresURL, - }) - if err != nil { - return nil, err - } - cleanupFuncs = append(cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { - logstoreDriverOpts.Close() - }) - - logStore, err := logstore.NewLogStore(ctx, logstoreDriverOpts) - if err != nil { - return nil, err - } - - batcherCfg := batcherConfig{ - ItemCountThreshold: cfg.LogBatchSize, - DelayThreshold: time.Duration(cfg.LogBatchThresholdSeconds) * time.Second, - } - batcher, err := makeBatcher(ctx, logger, logStore, batcherCfg) - if err != nil { - return nil, err - } - - handler = logmq.NewMessageHandler(logger, &handlerBatcherImpl{batcher: batcher}) - } - cleanupFuncs = append(cleanupFuncs, func(ctx context.Context, logger *logging.LoggerWithCtx) { - if eventBatcher != nil { - eventBatcher.Shutdown() - } - if deliveryBatcher != nil { - deliveryBatcher.Shutdown() - } - }) - - logQueueConfig, err := cfg.MQs.ToQueueConfig(ctx, "logmq") - if err != nil { - return nil, err - } - - service := &LogService{} - service.logger = logger - service.redisClient = redisClient - service.logMQ = logmq.New(logmq.WithQueue(logQueueConfig)) - service.consumerOptions = &consumerOptions{ - concurreny: cfg.DeliveryMaxConcurrency, - } - service.handler = handler - service.cleanupFuncs = cleanupFuncs - - go func() { - defer wg.Done() - <-ctx.Done() - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - service.Shutdown(shutdownCtx) - }() - - return service, nil -} - -func (s *LogService) Run(ctx context.Context) error { - logger := s.logger.Ctx(ctx) - logger.Info("start service", zap.String("service", "log")) - - subscription, err := s.logMQ.Subscribe(ctx) - if err != nil { - logger.Error("failed to susbcribe to logmq", zap.Error(err)) - return err - } - - csm := consumer.New(subscription, s.handler, - consumer.WithConcurrency(s.consumerOptions.concurreny), - consumer.WithName("logmq"), - ) - if err := csm.Run(ctx); !errors.Is(err, ctx.Err()) { - logger.Error("failed to run logmq consumer", zap.Error(err)) - return err - } - return nil -} - -func (s *LogService) Shutdown(ctx context.Context) { - logger := s.logger.Ctx(ctx) - logger.Info("service shutting down", zap.String("service", "log")) - for _, cleanupFunc := range s.cleanupFuncs { - cleanupFunc(ctx, &logger) - } - logger.Info("service shutdown", zap.String("service", "log")) -} - -type batcherConfig struct { - ItemCountThreshold int - DelayThreshold time.Duration -} - -func makeBatcher(ctx context.Context, logger *logging.Logger, logStore logstore.LogStore, batcherCfg batcherConfig) (*batcher.Batcher[*mqs.Message], error) { - b, err := batcher.NewBatcher(batcher.Config[*mqs.Message]{ - GroupCountThreshold: 2, - ItemCountThreshold: batcherCfg.ItemCountThreshold, - DelayThreshold: batcherCfg.DelayThreshold, - NumGoroutines: 1, - Processor: func(_ string, msgs []*mqs.Message) { - logger := logger.Ctx(ctx) - logger.Info("processing batch", zap.Int("message_count", len(msgs))) - - nackAll := func() { - for _, msg := range msgs { - msg.Nack() - } - } - - deliveryEvents := make([]*models.DeliveryEvent, 0, len(msgs)) - for _, msg := range msgs { - deliveryEvent := models.DeliveryEvent{} - if err := deliveryEvent.FromMessage(msg); err != nil { - // TODO: consider nacking this individual message only - logger.Error("failed to parse delivery event", - zap.Error(err), - zap.String("message_id", msg.LoggableID)) - nackAll() - return - } - deliveryEvents = append(deliveryEvents, &deliveryEvent) - } - - if err := logStore.InsertManyDeliveryEvent(ctx, deliveryEvents); err != nil { - logger.Error("failed to insert delivery events", - zap.Error(err), - zap.Int("count", len(deliveryEvents))) - nackAll() - return - } - - logger.Info("batch processed successfully", zap.Int("count", len(msgs))) - - for _, msg := range msgs { - msg.Ack() - } - }, - }) - if err != nil { - logger.Ctx(ctx).Error("failed to create batcher", zap.Error(err)) - return nil, err - } - return b, nil -} - -type handlerBatcherImpl struct { - batcher *batcher.Batcher[*mqs.Message] -} - -func (b *handlerBatcherImpl) Add(ctx context.Context, msg *mqs.Message) error { - b.batcher.Add("", msg) - return nil -}