diff --git a/cmd/api/main.go b/cmd/api/main.go index ab406aa..5f95401 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -105,6 +105,8 @@ func main() { logger.Fatal("failed to init ProcessingRecoveryService", zap.Error(err)) } + cleanupSvc := services.NewCleanupService(redisStore, timeSvc) + svisor := supervisor.NewSupervisor( supervisor.WithLogger(logger), supervisor.WithMessageFetcher(messageFetcher), @@ -114,6 +116,7 @@ func main() { supervisor.WithProcessingResultsService(processingResultsSvc), supervisor.WithSchedulerService(schedulerSvc), supervisor.WithProcessingRecoveryService(processingRecoverySvc), + supervisor.WithCleanupService(cleanupSvc), ) wg.Add(1) diff --git a/pkg/lib/config.go b/pkg/lib/config.go index 1a28d49..f934ef1 100644 --- a/pkg/lib/config.go +++ b/pkg/lib/config.go @@ -30,11 +30,18 @@ type RedisConfig struct { // Supervisor queues handling settings type SupervisorConfig struct { - ReadyWaitTime time.Duration `env:"SUPERVISOR_READY_WAIT_TIME,default=5s"` - ErrSleepTime time.Duration `env:"SUPERVISOR_ERR_SLEEP_TIME,default=5s"` + ReadyWaitTime time.Duration `env:"SUPERVISOR_READY_WAIT_TIME,default=5s"` + ErrSleepTime time.Duration `env:"SUPERVISOR_ERR_SLEEP_TIME,default=5s"` + // interval between scheduler runs to move scheduled jobs to "ready for processing" queue SchedulerInterval time.Duration `env:"SUPERVISOR_SCHEDULER_INTERVAL,default=30s"` // interval to move back stuck messages from processing to ready queue ProcessingRecoveryInterval time.Duration `env:"SUPERVISOR_PROCESSING_RECOVERY_INTERVAL,default=5m"` + // enables deleting done messages from the database after DoneQueueCleanupDelay + DoneQueueCleanupEnabled bool `env:"SUPERVISOR_DONE_QUEUE_CLEANUP_ENABLED,default=false"` + // delay after which done messages are deleted from the database. Default 14 days = 336 hours + DoneQueueCleanupDelay time.Duration `env:"SUPERVISOR_DONE_QUEUE_CLEANUP_DELAY,default=336h"` + // interval between done queue cleanup runs + DoneQueueCleanupInterval time.Duration `env:"SUPERVISOR_DONE_QUEUE_CLEANUP_INTERVAL,default=60m"` } type HTTPClientConfig struct { diff --git a/pkg/services/cleanup_service.go b/pkg/services/cleanup_service.go new file mode 100644 index 0000000..43b5aab --- /dev/null +++ b/pkg/services/cleanup_service.go @@ -0,0 +1,62 @@ +package services + +import ( + "context" + "time" + + "github.com/didil/inhooks/pkg/lib" + "github.com/didil/inhooks/pkg/models" + "github.com/pkg/errors" +) + +type CleanupService interface { + CleanupDoneQueue(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error) +} + +func NewCleanupService(redisStore RedisStore, timeSvc TimeService) CleanupService { + return &cleanupService{ + redisStore: redisStore, + timeSvc: timeSvc, + } +} + +type cleanupService struct { + redisStore RedisStore + timeSvc TimeService +} + +func (s *cleanupService) CleanupDoneQueue(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error) { + doneQueueKey := queueKey(f.ID, sink.ID, models.QueueStatusDone) + + cutOffTimeEpoch := s.timeSvc.Now().Add(-doneQueueCleanupDelay).Unix() + mIDs, err := s.redisStore.ZRangeBelowScore(ctx, doneQueueKey, float64(cutOffTimeEpoch)) + if err != nil { + return 0, err + } + if err != nil { + return 0, errors.Wrapf(err, "failed to zrange below score") + } + if len(mIDs) == 0 { + // no messages do cleanup + return 0, nil + } + + // move message ids in chunks + chunkSize := 50 + mIDChunks := lib.ChunkSliceBy(mIDs, chunkSize) + + for i := 0; i < len(mIDChunks); i++ { + messageKeys := make([]string, 0, len(mIDChunks[i])) + for _, mId := range mIDChunks[i] { + mKey := messageKey(f.ID, sink.ID, mId) + messageKeys = append(messageKeys, mKey) + } + + err := s.redisStore.ZRemDel(ctx, doneQueueKey, mIDChunks[i], messageKeys) + if err != nil { + return 0, errors.Wrapf(err, "failed to zremdel") + } + } + + return len(mIDs), nil +} diff --git a/pkg/services/cleanup_service_test.go b/pkg/services/cleanup_service_test.go new file mode 100644 index 0000000..16143d7 --- /dev/null +++ b/pkg/services/cleanup_service_test.go @@ -0,0 +1,52 @@ +package services + +import ( + "context" + "testing" + "time" + + "github.com/didil/inhooks/pkg/models" + "github.com/didil/inhooks/pkg/testsupport/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +func TestCleanUpServiceCleanupDoneQueue(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + redisStore := mocks.NewMockRedisStore(ctrl) + timeSvc := mocks.NewMockTimeService(ctrl) + + now := time.Date(2023, 05, 5, 8, 46, 20, 0, time.UTC) + timeSvc.EXPECT().Now().Return(now) + + ctx := context.Background() + + flowId := "flow-1" + sinkID := "sink-1" + + flow := &models.Flow{ + ID: flowId, + } + sink := &models.Sink{ + ID: sinkID, + } + + queueKey := "f:flow-1:s:sink-1:q:done" + + doneQueueCleanupDelay := 30 * time.Minute + cutoffTime := time.Date(2023, 05, 5, 8, 16, 20, 0, time.UTC) + + mIds := []string{"message-1", "message-2"} + messageKeys := []string{"f:flow-1:s:sink-1:m:message-1", "f:flow-1:s:sink-1:m:message-2"} + + redisStore.EXPECT().ZRangeBelowScore(ctx, queueKey, float64(cutoffTime.Unix())).Return(mIds, nil) + redisStore.EXPECT().ZRemDel(ctx, queueKey, mIds, messageKeys).Return(nil) + + s := NewCleanupService(redisStore, timeSvc) + count, err := s.CleanupDoneQueue(ctx, flow, sink, doneQueueCleanupDelay) + assert.NoError(t, err) + + assert.Equal(t, 2, count) +} diff --git a/pkg/services/redis_store.go b/pkg/services/redis_store.go index 73dda2c..06395f2 100644 --- a/pkg/services/redis_store.go +++ b/pkg/services/redis_store.go @@ -3,6 +3,7 @@ package services import ( "context" "fmt" + "strconv" "time" "github.com/pkg/errors" @@ -22,6 +23,8 @@ type RedisStore interface { ZRemRpush(ctx context.Context, messageIDs []string, sourceQueueKey string, destQueueKey string) error LRangeAll(ctx context.Context, queueKey string) ([]string, error) LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey string, messageIDs []string) error + ZRemRangeBelowScore(ctx context.Context, queueKey string, maxScore int) (int, error) + ZRemDel(ctx context.Context, queueKey string, messageIDs []string, messageKeys []string) error } type redisStore struct { @@ -252,3 +255,36 @@ func (s *redisStore) LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey return nil } + +func (s *redisStore) ZRemRangeBelowScore(ctx context.Context, queueKey string, maxScore int) (int, error) { + queueKeyWithPrefix := s.keyWithPrefix(queueKey) + + count, err := s.client.ZRemRangeByScore(ctx, queueKeyWithPrefix, "-inf", strconv.Itoa(maxScore)).Result() + if err != nil { + return 0, errors.Wrapf(err, "failed to zremrangebyscore. queueKey: %s", queueKeyWithPrefix) + } + + return int(count), nil +} + +func (s *redisStore) ZRemDel(ctx context.Context, queueKey string, messageIDs []string, messageKeys []string) error { + pipe := s.client.TxPipeline() + + queueKeyWithPrefix := s.keyWithPrefix(queueKey) + pipe.ZRem(ctx, queueKeyWithPrefix, messageIDs) + + messageKeysWithPrefix := []string{} + for _, messageKey := range messageKeys { + messageKeyWithPrefix := s.keyWithPrefix(messageKey) + messageKeysWithPrefix = append(messageKeysWithPrefix, messageKeyWithPrefix) + } + + pipe.Del(ctx, messageKeysWithPrefix...) + + _, err := pipe.Exec(ctx) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/services/redis_store_test.go b/pkg/services/redis_store_test.go index 2e8c086..7c66988 100644 --- a/pkg/services/redis_store_test.go +++ b/pkg/services/redis_store_test.go @@ -441,3 +441,87 @@ func (s *RedisStoreSuite) TestLRemRPush() { s.NoError(err) s.Equal([]string{`message-4`, "message-1", "message-3"}, results) } + +func (s *RedisStoreSuite) TestZRemRangeBelowScore() { + ctx := context.Background() + prefix := fmt.Sprintf("inhooks:%s", s.appConf.Redis.InhooksDBName) + defer func() { + err := testsupport.DeleteAllRedisKeys(ctx, s.client, prefix) + s.NoError(err) + }() + + now := time.Date(2023, 05, 5, 8, 9, 24, 0, time.UTC) + + queueKey := "q:scheduled" + queueKeyWithPrefix := fmt.Sprintf("%s:%s", prefix, queueKey) + + m1ID := "message-1" + m2ID := "message-2" + m3ID := "message-3" + m4ID := "message-4" + + _, err := s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Unix()), Member: m1ID}).Result() + s.NoError(err) + + _, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(5 * time.Minute).Unix()), Member: m2ID}).Result() + s.NoError(err) + + _, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(-5 * time.Minute).Unix()), Member: m3ID}).Result() + s.NoError(err) + + _, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(20 * time.Minute).Unix()), Member: m4ID}).Result() + s.NoError(err) + + count, err := s.redisStore.ZRemRangeBelowScore(ctx, queueKey, int(now.Unix())) + s.NoError(err) + + s.Equal(2, count) + + queueResults, err := s.client.ZRange(ctx, queueKeyWithPrefix, 0, -1).Result() + s.NoError(err) + + s.Equal([]string{"message-2", "message-4"}, queueResults) +} + +func (s *RedisStoreSuite) TestZRemDel() { + ctx := context.Background() + prefix := fmt.Sprintf("inhooks:%s", s.appConf.Redis.InhooksDBName) + defer func() { + err := testsupport.DeleteAllRedisKeys(ctx, s.client, prefix) + s.NoError(err) + }() + + now := time.Date(2023, 05, 5, 8, 9, 24, 0, time.UTC) + + queueKey := "q:scheduled" + queueKeyWithPrefix := fmt.Sprintf("%s:%s", prefix, queueKey) + + m1ID := "message-1" + m2ID := "message-2" + m3ID := "message-3" + m4ID := "message-4" + + messageIDs := []string{m1ID, m3ID} + messageKeys := []string{fmt.Sprintf("m:%s", m1ID), fmt.Sprintf("m:%s", m3ID)} + + _, err := s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Unix()), Member: m1ID}).Result() + s.NoError(err) + + _, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(5 * time.Minute).Unix()), Member: m2ID}).Result() + s.NoError(err) + + _, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(-5 * time.Minute).Unix()), Member: m3ID}).Result() + s.NoError(err) + + _, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(20 * time.Minute).Unix()), Member: m4ID}).Result() + s.NoError(err) + + err = s.redisStore.ZRemDel(ctx, queueKey, messageIDs, messageKeys) + s.NoError(err) + + queueResults, err := s.client.ZRange(ctx, queueKeyWithPrefix, 0, -1).Result() + s.NoError(err) + + s.Equal([]string{"message-2", "message-4"}, queueResults) + +} diff --git a/pkg/supervisor/done.go b/pkg/supervisor/done.go new file mode 100644 index 0000000..e0ae904 --- /dev/null +++ b/pkg/supervisor/done.go @@ -0,0 +1,33 @@ +package supervisor + +import ( + "time" + + "github.com/didil/inhooks/pkg/models" + "go.uber.org/zap" +) + +func (s *Supervisor) HandleDoneQueue(f *models.Flow, sink *models.Sink) { + logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID)) + for { + if s.appConf.Supervisor.DoneQueueCleanupEnabled { + count, err := s.cleanupSvc.CleanupDoneQueue(s.ctx, f, sink, s.appConf.Supervisor.DoneQueueCleanupDelay) + if err != nil { + logger.Error("failed to cleanup done queue", zap.Error(err)) + } + if count > 0 { + logger.Info("done queue cleanup ok. messages removed", zap.Int("messagesCount", count)) + } + } + + // wait before next check + timer := time.NewTimer(s.appConf.Supervisor.DoneQueueCleanupInterval) + + select { + case <-s.ctx.Done(): + return + case <-timer.C: + continue + } + } +} diff --git a/pkg/supervisor/done_test.go b/pkg/supervisor/done_test.go new file mode 100644 index 0000000..7450d28 --- /dev/null +++ b/pkg/supervisor/done_test.go @@ -0,0 +1,61 @@ +package supervisor + +import ( + "context" + "testing" + "time" + + "github.com/didil/inhooks/pkg/models" + "github.com/didil/inhooks/pkg/testsupport" + "github.com/didil/inhooks/pkg/testsupport/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestSupervisor_HandleDoneQueue(t *testing.T) { + appConf, err := testsupport.InitAppConfig(context.Background()) + assert.NoError(t, err) + + appConf.Supervisor.DoneQueueCleanupInterval = 45 * time.Second + appConf.Supervisor.DoneQueueCleanupDelay = 5 * time.Hour + appConf.Supervisor.DoneQueueCleanupEnabled = true + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + flowId1 := "flow-1" + sinkID1 := "sink-1" + + sink1 := &models.Sink{ + ID: sinkID1, + } + + flow1 := &models.Flow{ + ID: flowId1, + Sinks: []*models.Sink{sink1}, + } + + cleanupSvc := mocks.NewMockCleanupService(ctrl) + + logger, err := zap.NewDevelopment() + assert.NoError(t, err) + + s := NewSupervisor( + WithCleanupService(cleanupSvc), + WithAppConfig(appConf), + WithLogger(logger), + ) + + count := 2 + cleanupSvc.EXPECT(). + CleanupDoneQueue(gomock.Any(), flow1, sink1, appConf.Supervisor.DoneQueueCleanupDelay). + DoAndReturn(func(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error) { + s.Shutdown() + + return count, nil + }) + + s.HandleDoneQueue(flow1, sink1) + +} diff --git a/pkg/supervisor/processing.go b/pkg/supervisor/processing.go index f169cac..add8abd 100644 --- a/pkg/supervisor/processing.go +++ b/pkg/supervisor/processing.go @@ -1,7 +1,6 @@ package supervisor import ( - "context" "time" "github.com/didil/inhooks/pkg/models" @@ -9,10 +8,13 @@ import ( ) // move stuck messages from processing to ready queue periodically -func (s *Supervisor) HandleProcessingQueue(ctx context.Context, f *models.Flow, sink *models.Sink) { +func (s *Supervisor) HandleProcessingQueue(f *models.Flow, sink *models.Sink) { logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID)) for { - movedMessageIds, err := s.MoveProcessingToReady(ctx, f, sink) + // cache keys for twice the processing recovery interval + // this avoids the recovery process from interfering with legitimate retry attempts + ttl := 2 * s.appConf.Supervisor.ProcessingRecoveryInterval + movedMessageIds, err := s.processingRecoverySvc.MoveProcessingToReady(s.ctx, f, sink, ttl) if err != nil { logger.Error("failed to move processing to ready", zap.Error(err)) } @@ -31,15 +33,3 @@ func (s *Supervisor) HandleProcessingQueue(ctx context.Context, f *models.Flow, } } } - -func (s *Supervisor) MoveProcessingToReady(ctx context.Context, f *models.Flow, sink *models.Sink) ([]string, error) { - // cache keys for twice the processing recovery interval - // this avoids the recovery process from interfering with legitimate retry attempts - ttl := 2 * s.appConf.Supervisor.ProcessingRecoveryInterval - movedMessageIds, err := s.processingRecoverySvc.MoveProcessingToReady(ctx, f, sink, ttl) - if err != nil { - return nil, err - } - - return movedMessageIds, nil -} diff --git a/pkg/supervisor/processing_test.go b/pkg/supervisor/processing_test.go index 34e7c43..28babfc 100644 --- a/pkg/supervisor/processing_test.go +++ b/pkg/supervisor/processing_test.go @@ -3,6 +3,7 @@ package supervisor import ( "context" "testing" + "time" "github.com/didil/inhooks/pkg/models" "github.com/didil/inhooks/pkg/testsupport" @@ -12,10 +13,8 @@ import ( "go.uber.org/zap" ) -func TestSupervisorMoveProcessingToReady(t *testing.T) { - ctx := context.Background() - - appConf, err := testsupport.InitAppConfig(ctx) +func TestSupervisorHandleProcessingQueue(t *testing.T) { + appConf, err := testsupport.InitAppConfig(context.Background()) assert.NoError(t, err) appConf.Supervisor.ErrSleepTime = 0 @@ -37,7 +36,6 @@ func TestSupervisorMoveProcessingToReady(t *testing.T) { processingRecoverySvc := mocks.NewMockProcessingRecoveryService(ctrl) movedMessageIds := []string{"message-1", "message-2"} - processingRecoverySvc.EXPECT().MoveProcessingToReady(ctx, flow1, sink1, 2*appConf.Supervisor.ProcessingRecoveryInterval).Return(movedMessageIds, nil) logger, err := zap.NewDevelopment() assert.NoError(t, err) @@ -48,7 +46,12 @@ func TestSupervisorMoveProcessingToReady(t *testing.T) { WithLogger(logger), ) - messageIds, err := s.MoveProcessingToReady(ctx, flow1, sink1) - assert.NoError(t, err) - assert.Equal(t, movedMessageIds, messageIds) + processingRecoverySvc.EXPECT().MoveProcessingToReady(gomock.Any(), flow1, sink1, 2*appConf.Supervisor.ProcessingRecoveryInterval). + DoAndReturn(func(ctx context.Context, flow *models.Flow, sink *models.Sink, processingRecoveryInterval time.Duration) ([]string, error) { + s.Shutdown() + return movedMessageIds, nil + }). + Return(movedMessageIds, nil) + + s.HandleProcessingQueue(flow1, sink1) } diff --git a/pkg/supervisor/ready.go b/pkg/supervisor/ready.go index 4327f10..72d5f64 100644 --- a/pkg/supervisor/ready.go +++ b/pkg/supervisor/ready.go @@ -9,11 +9,11 @@ import ( "go.uber.org/zap" ) -func (s *Supervisor) HandleReadyQueue(ctx context.Context, f *models.Flow, sink *models.Sink) { +func (s *Supervisor) HandleReadyQueue(f *models.Flow, sink *models.Sink) { logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID)) for { - err := s.FetchAndProcess(ctx, f, sink) + err := s.FetchAndProcess(s.ctx, f, sink) if err != nil && !errors.Is(err, context.Canceled) { logger.Error("failed to fetch and processed", zap.Error(err)) // wait before retrying diff --git a/pkg/supervisor/scheduled.go b/pkg/supervisor/scheduled.go index da9d1d5..ad4faa5 100644 --- a/pkg/supervisor/scheduled.go +++ b/pkg/supervisor/scheduled.go @@ -1,17 +1,16 @@ package supervisor import ( - "context" "time" "github.com/didil/inhooks/pkg/models" "go.uber.org/zap" ) -func (s *Supervisor) HandleScheduledQueue(ctx context.Context, f *models.Flow, sink *models.Sink) { +func (s *Supervisor) HandleScheduledQueue(f *models.Flow, sink *models.Sink) { logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID)) for { - err := s.MoveDueScheduled(ctx, f, sink) + err := s.schedulerSvc.MoveDueScheduled(s.ctx, f, sink) if err != nil { logger.Error("failed to move due scheduled", zap.Error(err)) } @@ -27,12 +26,3 @@ func (s *Supervisor) HandleScheduledQueue(ctx context.Context, f *models.Flow, s } } } - -func (s *Supervisor) MoveDueScheduled(ctx context.Context, f *models.Flow, sink *models.Sink) error { - err := s.schedulerSvc.MoveDueScheduled(ctx, f, sink) - if err != nil { - return err - } - - return nil -} diff --git a/pkg/supervisor/scheduled_test.go b/pkg/supervisor/scheduled_test.go index 6829c26..4777633 100644 --- a/pkg/supervisor/scheduled_test.go +++ b/pkg/supervisor/scheduled_test.go @@ -12,10 +12,8 @@ import ( "go.uber.org/zap" ) -func TestSupervisorMoveDueScheduled(t *testing.T) { - ctx := context.Background() - - appConf, err := testsupport.InitAppConfig(ctx) +func TestSupervisor_HandleScheduledQueue(t *testing.T) { + appConf, err := testsupport.InitAppConfig(context.Background()) assert.NoError(t, err) appConf.Supervisor.ErrSleepTime = 0 @@ -37,8 +35,6 @@ func TestSupervisorMoveDueScheduled(t *testing.T) { schedulerSvc := mocks.NewMockSchedulerService(ctrl) - schedulerSvc.EXPECT().MoveDueScheduled(ctx, flow1, sink1) - logger, err := zap.NewDevelopment() assert.NoError(t, err) @@ -48,6 +44,11 @@ func TestSupervisorMoveDueScheduled(t *testing.T) { WithLogger(logger), ) - err = s.MoveDueScheduled(ctx, flow1, sink1) - assert.NoError(t, err) + schedulerSvc.EXPECT().MoveDueScheduled(gomock.Any(), flow1, sink1). + Do(func(ctx context.Context, f *models.Flow, sink *models.Sink) error { + s.Shutdown() + return nil + }) + + s.HandleScheduledQueue(flow1, sink1) } diff --git a/pkg/supervisor/supervisor.go b/pkg/supervisor/supervisor.go index 5bd583e..115aa0a 100644 --- a/pkg/supervisor/supervisor.go +++ b/pkg/supervisor/supervisor.go @@ -20,6 +20,7 @@ type Supervisor struct { processingResultsSvc services.ProcessingResultsService schedulerSvc services.SchedulerService processingRecoverySvc services.ProcessingRecoveryService + cleanupSvc services.CleanupService } type SupervisorOpt func(s *Supervisor) @@ -86,6 +87,12 @@ func WithProcessingRecoveryService(processingRecoverySvc services.ProcessingReco } } +func WithCleanupService(cleanupSvc services.CleanupService) SupervisorOpt { + return func(s *Supervisor) { + s.cleanupSvc = cleanupSvc + } +} + func (s *Supervisor) Start() { wg := &sync.WaitGroup{} flows := s.inhooksConfigSvc.GetFlows() @@ -96,25 +103,31 @@ func (s *Supervisor) Start() { sink := f.Sinks[j] logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID)) - wg.Add(3) + wg.Add(4) go func() { - s.HandleProcessingQueue(s.ctx, f, sink) + s.HandleProcessingQueue(f, sink) logger.Info("processing queue handler shutdown") wg.Done() }() go func() { - s.HandleReadyQueue(s.ctx, f, sink) + s.HandleReadyQueue(f, sink) logger.Info("ready queue handler shutdown") wg.Done() }() go func() { - s.HandleScheduledQueue(s.ctx, f, sink) + s.HandleScheduledQueue(f, sink) logger.Info("scheduled queue handler shutdown") wg.Done() }() + + go func() { + s.HandleDoneQueue(f, sink) + logger.Info("done queue handler shutdown") + wg.Done() + }() } } diff --git a/pkg/testsupport/mocks/gen_mocks.sh b/pkg/testsupport/mocks/gen_mocks.sh index 2acf4b8..d7c921a 100755 --- a/pkg/testsupport/mocks/gen_mocks.sh +++ b/pkg/testsupport/mocks/gen_mocks.sh @@ -12,6 +12,7 @@ services=( "scheduler_service" "retry_calculator" "processing_recovery_service" + "cleanup_service" ) for service in ${services[@]} diff --git a/pkg/testsupport/mocks/mock_cleanup_service.go b/pkg/testsupport/mocks/mock_cleanup_service.go new file mode 100644 index 0000000..7a7ce3f --- /dev/null +++ b/pkg/testsupport/mocks/mock_cleanup_service.go @@ -0,0 +1,52 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/services/cleanup_service.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + time "time" + + models "github.com/didil/inhooks/pkg/models" + gomock "github.com/golang/mock/gomock" +) + +// MockCleanupService is a mock of CleanupService interface. +type MockCleanupService struct { + ctrl *gomock.Controller + recorder *MockCleanupServiceMockRecorder +} + +// MockCleanupServiceMockRecorder is the mock recorder for MockCleanupService. +type MockCleanupServiceMockRecorder struct { + mock *MockCleanupService +} + +// NewMockCleanupService creates a new mock instance. +func NewMockCleanupService(ctrl *gomock.Controller) *MockCleanupService { + mock := &MockCleanupService{ctrl: ctrl} + mock.recorder = &MockCleanupServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCleanupService) EXPECT() *MockCleanupServiceMockRecorder { + return m.recorder +} + +// CleanupDoneQueue mocks base method. +func (m *MockCleanupService) CleanupDoneQueue(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CleanupDoneQueue", ctx, f, sink, doneQueueCleanupDelay) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CleanupDoneQueue indicates an expected call of CleanupDoneQueue. +func (mr *MockCleanupServiceMockRecorder) CleanupDoneQueue(ctx, f, sink, doneQueueCleanupDelay interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanupDoneQueue", reflect.TypeOf((*MockCleanupService)(nil).CleanupDoneQueue), ctx, f, sink, doneQueueCleanupDelay) +} diff --git a/pkg/testsupport/mocks/mock_redis_store.go b/pkg/testsupport/mocks/mock_redis_store.go index 2d9181f..5022bb0 100644 --- a/pkg/testsupport/mocks/mock_redis_store.go +++ b/pkg/testsupport/mocks/mock_redis_store.go @@ -194,6 +194,35 @@ func (mr *MockRedisStoreMockRecorder) ZRangeBelowScore(ctx, queueKey, score inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ZRangeBelowScore", reflect.TypeOf((*MockRedisStore)(nil).ZRangeBelowScore), ctx, queueKey, score) } +// ZRemDel mocks base method. +func (m *MockRedisStore) ZRemDel(ctx context.Context, queueKey string, messageIDs, messageKeys []string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ZRemDel", ctx, queueKey, messageIDs, messageKeys) + ret0, _ := ret[0].(error) + return ret0 +} + +// ZRemDel indicates an expected call of ZRemDel. +func (mr *MockRedisStoreMockRecorder) ZRemDel(ctx, queueKey, messageIDs, messageKeys interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ZRemDel", reflect.TypeOf((*MockRedisStore)(nil).ZRemDel), ctx, queueKey, messageIDs, messageKeys) +} + +// ZRemRangeBelowScore mocks base method. +func (m *MockRedisStore) ZRemRangeBelowScore(ctx context.Context, queueKey string, maxScore int) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ZRemRangeBelowScore", ctx, queueKey, maxScore) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ZRemRangeBelowScore indicates an expected call of ZRemRangeBelowScore. +func (mr *MockRedisStoreMockRecorder) ZRemRangeBelowScore(ctx, queueKey, maxScore interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ZRemRangeBelowScore", reflect.TypeOf((*MockRedisStore)(nil).ZRemRangeBelowScore), ctx, queueKey, maxScore) +} + // ZRemRpush mocks base method. func (m *MockRedisStore) ZRemRpush(ctx context.Context, messageIDs []string, sourceQueueKey, destQueueKey string) error { m.ctrl.T.Helper()