Skip to content

Commit

Permalink
feat: cleanup done queue
Browse files Browse the repository at this point in the history
  • Loading branch information
didil committed May 15, 2023
1 parent f3a9e42 commit 6a1515e
Show file tree
Hide file tree
Showing 17 changed files with 468 additions and 51 deletions.
3 changes: 3 additions & 0 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func main() {
logger.Fatal("failed to init ProcessingRecoveryService", zap.Error(err))
}

cleanupSvc := services.NewCleanupService(redisStore, timeSvc)

svisor := supervisor.NewSupervisor(
supervisor.WithLogger(logger),
supervisor.WithMessageFetcher(messageFetcher),
Expand All @@ -114,6 +116,7 @@ func main() {
supervisor.WithProcessingResultsService(processingResultsSvc),
supervisor.WithSchedulerService(schedulerSvc),
supervisor.WithProcessingRecoveryService(processingRecoverySvc),
supervisor.WithCleanupService(cleanupSvc),
)

wg.Add(1)
Expand Down
11 changes: 9 additions & 2 deletions pkg/lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,18 @@ type RedisConfig struct {

// Supervisor queues handling settings
type SupervisorConfig struct {
ReadyWaitTime time.Duration `env:"SUPERVISOR_READY_WAIT_TIME,default=5s"`
ErrSleepTime time.Duration `env:"SUPERVISOR_ERR_SLEEP_TIME,default=5s"`
ReadyWaitTime time.Duration `env:"SUPERVISOR_READY_WAIT_TIME,default=5s"`
ErrSleepTime time.Duration `env:"SUPERVISOR_ERR_SLEEP_TIME,default=5s"`
// interval between scheduler runs to move scheduled jobs to "ready for processing" queue
SchedulerInterval time.Duration `env:"SUPERVISOR_SCHEDULER_INTERVAL,default=30s"`
// interval to move back stuck messages from processing to ready queue
ProcessingRecoveryInterval time.Duration `env:"SUPERVISOR_PROCESSING_RECOVERY_INTERVAL,default=5m"`
// enables deleting done messages from the database after DoneQueueCleanupDelay
DoneQueueCleanupEnabled bool `env:"SUPERVISOR_DONE_QUEUE_CLEANUP_ENABLED,default=false"`
// delay after which done messages are deleted from the database. Default 14 days = 336 hours
DoneQueueCleanupDelay time.Duration `env:"SUPERVISOR_DONE_QUEUE_CLEANUP_DELAY,default=336h"`
// interval between done queue cleanup runs
DoneQueueCleanupInterval time.Duration `env:"SUPERVISOR_DONE_QUEUE_CLEANUP_INTERVAL,default=60m"`
}

type HTTPClientConfig struct {
Expand Down
62 changes: 62 additions & 0 deletions pkg/services/cleanup_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package services

import (
"context"
"time"

"github.com/didil/inhooks/pkg/lib"
"github.com/didil/inhooks/pkg/models"
"github.com/pkg/errors"
)

type CleanupService interface {
CleanupDoneQueue(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error)
}

func NewCleanupService(redisStore RedisStore, timeSvc TimeService) CleanupService {
return &cleanupService{
redisStore: redisStore,
timeSvc: timeSvc,
}
}

type cleanupService struct {
redisStore RedisStore
timeSvc TimeService
}

func (s *cleanupService) CleanupDoneQueue(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error) {
doneQueueKey := queueKey(f.ID, sink.ID, models.QueueStatusDone)

cutOffTimeEpoch := s.timeSvc.Now().Add(-doneQueueCleanupDelay).Unix()
mIDs, err := s.redisStore.ZRangeBelowScore(ctx, doneQueueKey, float64(cutOffTimeEpoch))
if err != nil {
return 0, err
}
if err != nil {
return 0, errors.Wrapf(err, "failed to zrange below score")
}
if len(mIDs) == 0 {
// no messages do cleanup
return 0, nil
}

// move message ids in chunks
chunkSize := 50
mIDChunks := lib.ChunkSliceBy(mIDs, chunkSize)

for i := 0; i < len(mIDChunks); i++ {
messageKeys := make([]string, 0, len(mIDChunks[i]))
for _, mId := range mIDChunks[i] {
mKey := messageKey(f.ID, sink.ID, mId)
messageKeys = append(messageKeys, mKey)
}

err := s.redisStore.ZRemDel(ctx, doneQueueKey, mIDChunks[i], messageKeys)
if err != nil {
return 0, errors.Wrapf(err, "failed to zremdel")
}
}

return len(mIDs), nil
}
52 changes: 52 additions & 0 deletions pkg/services/cleanup_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package services

import (
"context"
"testing"
"time"

"github.com/didil/inhooks/pkg/models"
"github.com/didil/inhooks/pkg/testsupport/mocks"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)

func TestCleanUpServiceCleanupDoneQueue(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

redisStore := mocks.NewMockRedisStore(ctrl)
timeSvc := mocks.NewMockTimeService(ctrl)

now := time.Date(2023, 05, 5, 8, 46, 20, 0, time.UTC)
timeSvc.EXPECT().Now().Return(now)

ctx := context.Background()

flowId := "flow-1"
sinkID := "sink-1"

flow := &models.Flow{
ID: flowId,
}
sink := &models.Sink{
ID: sinkID,
}

queueKey := "f:flow-1:s:sink-1:q:done"

doneQueueCleanupDelay := 30 * time.Minute
cutoffTime := time.Date(2023, 05, 5, 8, 16, 20, 0, time.UTC)

mIds := []string{"message-1", "message-2"}
messageKeys := []string{"f:flow-1:s:sink-1:m:message-1", "f:flow-1:s:sink-1:m:message-2"}

redisStore.EXPECT().ZRangeBelowScore(ctx, queueKey, float64(cutoffTime.Unix())).Return(mIds, nil)
redisStore.EXPECT().ZRemDel(ctx, queueKey, mIds, messageKeys).Return(nil)

s := NewCleanupService(redisStore, timeSvc)
count, err := s.CleanupDoneQueue(ctx, flow, sink, doneQueueCleanupDelay)
assert.NoError(t, err)

assert.Equal(t, 2, count)
}
36 changes: 36 additions & 0 deletions pkg/services/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package services
import (
"context"
"fmt"
"strconv"
"time"

"github.com/pkg/errors"
Expand All @@ -22,6 +23,8 @@ type RedisStore interface {
ZRemRpush(ctx context.Context, messageIDs []string, sourceQueueKey string, destQueueKey string) error
LRangeAll(ctx context.Context, queueKey string) ([]string, error)
LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey string, messageIDs []string) error
ZRemRangeBelowScore(ctx context.Context, queueKey string, maxScore int) (int, error)
ZRemDel(ctx context.Context, queueKey string, messageIDs []string, messageKeys []string) error
}

type redisStore struct {
Expand Down Expand Up @@ -252,3 +255,36 @@ func (s *redisStore) LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey

return nil
}

func (s *redisStore) ZRemRangeBelowScore(ctx context.Context, queueKey string, maxScore int) (int, error) {
queueKeyWithPrefix := s.keyWithPrefix(queueKey)

count, err := s.client.ZRemRangeByScore(ctx, queueKeyWithPrefix, "-inf", strconv.Itoa(maxScore)).Result()
if err != nil {
return 0, errors.Wrapf(err, "failed to zremrangebyscore. queueKey: %s", queueKeyWithPrefix)
}

return int(count), nil
}

func (s *redisStore) ZRemDel(ctx context.Context, queueKey string, messageIDs []string, messageKeys []string) error {
pipe := s.client.TxPipeline()

queueKeyWithPrefix := s.keyWithPrefix(queueKey)
pipe.ZRem(ctx, queueKeyWithPrefix, messageIDs)

messageKeysWithPrefix := []string{}
for _, messageKey := range messageKeys {
messageKeyWithPrefix := s.keyWithPrefix(messageKey)
messageKeysWithPrefix = append(messageKeysWithPrefix, messageKeyWithPrefix)
}

pipe.Del(ctx, messageKeysWithPrefix...)

_, err := pipe.Exec(ctx)
if err != nil {
return err
}

return nil
}
84 changes: 84 additions & 0 deletions pkg/services/redis_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,3 +441,87 @@ func (s *RedisStoreSuite) TestLRemRPush() {
s.NoError(err)
s.Equal([]string{`message-4`, "message-1", "message-3"}, results)
}

func (s *RedisStoreSuite) TestZRemRangeBelowScore() {
ctx := context.Background()
prefix := fmt.Sprintf("inhooks:%s", s.appConf.Redis.InhooksDBName)
defer func() {
err := testsupport.DeleteAllRedisKeys(ctx, s.client, prefix)
s.NoError(err)
}()

now := time.Date(2023, 05, 5, 8, 9, 24, 0, time.UTC)

queueKey := "q:scheduled"
queueKeyWithPrefix := fmt.Sprintf("%s:%s", prefix, queueKey)

m1ID := "message-1"
m2ID := "message-2"
m3ID := "message-3"
m4ID := "message-4"

_, err := s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Unix()), Member: m1ID}).Result()
s.NoError(err)

_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(5 * time.Minute).Unix()), Member: m2ID}).Result()
s.NoError(err)

_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(-5 * time.Minute).Unix()), Member: m3ID}).Result()
s.NoError(err)

_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(20 * time.Minute).Unix()), Member: m4ID}).Result()
s.NoError(err)

count, err := s.redisStore.ZRemRangeBelowScore(ctx, queueKey, int(now.Unix()))
s.NoError(err)

s.Equal(2, count)

queueResults, err := s.client.ZRange(ctx, queueKeyWithPrefix, 0, -1).Result()
s.NoError(err)

s.Equal([]string{"message-2", "message-4"}, queueResults)
}

func (s *RedisStoreSuite) TestZRemDel() {
ctx := context.Background()
prefix := fmt.Sprintf("inhooks:%s", s.appConf.Redis.InhooksDBName)
defer func() {
err := testsupport.DeleteAllRedisKeys(ctx, s.client, prefix)
s.NoError(err)
}()

now := time.Date(2023, 05, 5, 8, 9, 24, 0, time.UTC)

queueKey := "q:scheduled"
queueKeyWithPrefix := fmt.Sprintf("%s:%s", prefix, queueKey)

m1ID := "message-1"
m2ID := "message-2"
m3ID := "message-3"
m4ID := "message-4"

messageIDs := []string{m1ID, m3ID}
messageKeys := []string{fmt.Sprintf("m:%s", m1ID), fmt.Sprintf("m:%s", m3ID)}

_, err := s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Unix()), Member: m1ID}).Result()
s.NoError(err)

_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(5 * time.Minute).Unix()), Member: m2ID}).Result()
s.NoError(err)

_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(-5 * time.Minute).Unix()), Member: m3ID}).Result()
s.NoError(err)

_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(20 * time.Minute).Unix()), Member: m4ID}).Result()
s.NoError(err)

err = s.redisStore.ZRemDel(ctx, queueKey, messageIDs, messageKeys)
s.NoError(err)

queueResults, err := s.client.ZRange(ctx, queueKeyWithPrefix, 0, -1).Result()
s.NoError(err)

s.Equal([]string{"message-2", "message-4"}, queueResults)

}
33 changes: 33 additions & 0 deletions pkg/supervisor/done.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package supervisor

import (
"time"

"github.com/didil/inhooks/pkg/models"
"go.uber.org/zap"
)

func (s *Supervisor) HandleDoneQueue(f *models.Flow, sink *models.Sink) {
logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID))
for {
if s.appConf.Supervisor.DoneQueueCleanupEnabled {
count, err := s.cleanupSvc.CleanupDoneQueue(s.ctx, f, sink, s.appConf.Supervisor.DoneQueueCleanupDelay)
if err != nil {
logger.Error("failed to cleanup done queue", zap.Error(err))
}
if count > 0 {
logger.Info("done queue cleanup ok. messages removed", zap.Int("messagesCount", count))
}
}

// wait before next check
timer := time.NewTimer(s.appConf.Supervisor.DoneQueueCleanupInterval)

select {
case <-s.ctx.Done():
return
case <-timer.C:
continue
}
}
}
61 changes: 61 additions & 0 deletions pkg/supervisor/done_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package supervisor

import (
"context"
"testing"
"time"

"github.com/didil/inhooks/pkg/models"
"github.com/didil/inhooks/pkg/testsupport"
"github.com/didil/inhooks/pkg/testsupport/mocks"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func TestSupervisor_HandleDoneQueue(t *testing.T) {
appConf, err := testsupport.InitAppConfig(context.Background())
assert.NoError(t, err)

appConf.Supervisor.DoneQueueCleanupInterval = 45 * time.Second
appConf.Supervisor.DoneQueueCleanupDelay = 5 * time.Hour
appConf.Supervisor.DoneQueueCleanupEnabled = true

ctrl := gomock.NewController(t)
defer ctrl.Finish()

flowId1 := "flow-1"
sinkID1 := "sink-1"

sink1 := &models.Sink{
ID: sinkID1,
}

flow1 := &models.Flow{
ID: flowId1,
Sinks: []*models.Sink{sink1},
}

cleanupSvc := mocks.NewMockCleanupService(ctrl)

logger, err := zap.NewDevelopment()
assert.NoError(t, err)

s := NewSupervisor(
WithCleanupService(cleanupSvc),
WithAppConfig(appConf),
WithLogger(logger),
)

count := 2
cleanupSvc.EXPECT().
CleanupDoneQueue(gomock.Any(), flow1, sink1, appConf.Supervisor.DoneQueueCleanupDelay).
DoAndReturn(func(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error) {
s.Shutdown()

return count, nil
})

s.HandleDoneQueue(flow1, sink1)

}
Loading

0 comments on commit 6a1515e

Please sign in to comment.