From 63a4975082fa5c4e7268e679695802f627987ce0 Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Sun, 7 Jul 2024 10:57:30 -0500 Subject: [PATCH 01/11] Initial work on distributed lock. --- cmd/fleet/serve.go | 2 + ee/server/service/calendar.go | 60 +++++++- ee/server/service/service.go | 3 + server/fleet/calendar.go | 9 ++ server/service/redis_lock/redis_lock.go | 128 ++++++++++++++++ server/service/redis_lock/redis_lock_test.go | 146 +++++++++++++++++++ 6 files changed, 343 insertions(+), 5 deletions(-) create mode 100644 server/service/redis_lock/redis_lock.go create mode 100644 server/service/redis_lock/redis_lock_test.go diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index d645f419b87..6d4b3a0989c 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -50,6 +50,7 @@ import ( "github.com/fleetdm/fleet/v4/server/pubsub" "github.com/fleetdm/fleet/v4/server/service" "github.com/fleetdm/fleet/v4/server/service/async" + "github.com/fleetdm/fleet/v4/server/service/redis_lock" "github.com/fleetdm/fleet/v4/server/service/redis_policy_set" "github.com/fleetdm/fleet/v4/server/sso" "github.com/fleetdm/fleet/v4/server/version" @@ -730,6 +731,7 @@ the way that the Fleet server works. ssoSessionStore, profileMatcher, softwareInstallStore, + redis_lock.NewLock(redisPool), ) if err != nil { initFatal(err, "initial Fleet Premium service") diff --git a/ee/server/service/calendar.go b/ee/server/service/calendar.go index eee0e84fa48..7d998c631be 100644 --- a/ee/server/service/calendar.go +++ b/ee/server/service/calendar.go @@ -11,8 +11,50 @@ import ( "sync" ) +const calendarCallbackKeyPrefix = "calendar:callback:" + func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, channelID string, resourceState string) error { + if resourceState == "sync" { + // This is a sync notification, not a real event + svc.authz.SkipAuthorization(ctx) + return nil + } + + lockValue := "1" + result, err := svc.distributedLock.AcquireLock(ctx, calendarCallbackKeyPrefix+eventUUID, lockValue, 0) + if err != nil { + svc.authz.SkipAuthorization(ctx) + return ctxerr.Wrap(ctx, err, "acquire calendar lock") + } + if result == "" { + // Could not acquire lock, so we are already processing this event. In this case, we increment the lock value to indicate + // that we should re-process the event. + incrementResult, err := svc.distributedLock.Increment(ctx, calendarCallbackKeyPrefix+eventUUID, 0) + if err != nil { + return ctxerr.Wrap(ctx, err, "increment calendar lock") + } + // An increment result of 1 indicates that we acquired the lock, so we will continue processing the event. + if incrementResult != 1 { + svc.authz.SkipAuthorization(ctx) + return nil + } + } + + unlocked := false + defer func() { + if !unlocked { + ok, err := svc.distributedLock.ReleaseLock(ctx, calendarCallbackKeyPrefix+eventUUID, lockValue) + if err != nil { + level.Warn(svc.logger).Log("msg", "Failed to release calendar lock", "err", err) + } + if !ok { + // If the lock was not released, it will expire on its own. 
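+				// (A failed release means the stored value no longer matched ours:
+				// the lock expired and may have been re-acquired by another server.)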
+ level.Warn(svc.logger).Log("msg", "Failed to release calendar lock") + } + } + }() + appConfig, err := svc.ds.AppConfig(ctx) if err != nil { return fmt.Errorf("load app config: %w", err) @@ -25,11 +67,7 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann } googleCalendarIntegrationConfig := appConfig.Integrations.GoogleCalendar[0] - if resourceState == "sync" { - // This is a sync notification, not a real event - svc.authz.SkipAuthorization(ctx) - return nil - } + fmt.Printf("VICTOR callback - eventUUID: %s, channelID: %s\n", eventUUID, channelID) eventDetails, err := svc.ds.GetCalendarEventDetailsByUUID(ctx, eventUUID) if err != nil { @@ -129,5 +167,17 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann return ctxerr.Wrap(ctx, err, "create or update calendar event") } } + + // Release the lock + ok, err := svc.distributedLock.ReleaseLock(ctx, calendarCallbackKeyPrefix+eventUUID, lockValue) + if err != nil { + level.Warn(svc.logger).Log("msg", "Failed to release calendar lock", "err", err) + } + if !ok { + // TODO: Do another loop + // If the lock was not released, it will expire on its own. + level.Warn(svc.logger).Log("msg", "Failed to release calendar lock") + } + return nil } diff --git a/ee/server/service/service.go b/ee/server/service/service.go index 0a0eab68613..3a1ac66a161 100644 --- a/ee/server/service/service.go +++ b/ee/server/service/service.go @@ -28,6 +28,7 @@ type Service struct { depService *apple_mdm.DEPService profileMatcher fleet.ProfileMatcher softwareInstallStore fleet.SoftwareInstallerStore + distributedLock fleet.Lock } func NewService( @@ -42,6 +43,7 @@ func NewService( sso sso.SessionStore, profileMatcher fleet.ProfileMatcher, softwareInstallStore fleet.SoftwareInstallerStore, + distributedLock fleet.Lock, ) (*Service, error) { authorizer, err := authz.NewAuthorizer() if err != nil { @@ -61,6 +63,7 @@ func NewService( depService: apple_mdm.NewDEPService(ds, depStorage, logger), profileMatcher: profileMatcher, softwareInstallStore: softwareInstallStore, + distributedLock: distributedLock, } // Override methods that can't be easily overriden via diff --git a/server/fleet/calendar.go b/server/fleet/calendar.go index b9184f52ebf..6117336c74d 100644 --- a/server/fleet/calendar.go +++ b/server/fleet/calendar.go @@ -41,6 +41,15 @@ type UserCalendar interface { Get(event *CalendarEvent, key string) (interface{}, error) } +// Lock interface for managing distributed locks. +type Lock interface { + AcquireLock(ctx context.Context, name string, value string, expireMs uint64) (result string, err error) + ReleaseLock(ctx context.Context, name string, value string) (ok bool, err error) + // Increment increments a counter stored in Redis. Creates a counter if it doesn't exist. Returns the new value of the counter. 
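+	// Passing an expireMs of 0 applies the implementation's default expiration when the counter is created.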
+ Increment(ctx context.Context, name string, expireMs uint64) (int64, error) + Get(ctx context.Context, name string) (*string, error) +} + type CalendarWebhookPayload struct { Timestamp time.Time `json:"timestamp"` HostID uint `json:"host_id"` diff --git a/server/service/redis_lock/redis_lock.go b/server/service/redis_lock/redis_lock.go new file mode 100644 index 00000000000..fab8f31a513 --- /dev/null +++ b/server/service/redis_lock/redis_lock.go @@ -0,0 +1,128 @@ +package redis_lock + +import ( + "context" + "fmt" + "github.com/fleetdm/fleet/v4/server/contexts/ctxerr" + "github.com/fleetdm/fleet/v4/server/datastore/redis" + "github.com/fleetdm/fleet/v4/server/fleet" +) + +// This package implements a distributed lock using Redis. The lock can be used +// to prevent multiple Fleet servers from accessing a shared resource. + +const ( + defaultExpireMs = 60 * 1000 +) + +type redisLock struct { + pool fleet.RedisPool + testPrefix string // for tests, the key prefix to use to avoid conflicts +} + +func NewLock(pool fleet.RedisPool) fleet.Lock { + lock := &redisLock{ + pool: pool, + } + return fleet.Lock(lock) +} + +func (r *redisLock) AcquireLock(ctx context.Context, name string, value string, expireMs uint64) (result string, err error) { + conn := redis.ConfigureDoer(r.pool, r.pool.Get()) + defer conn.Close() + + if expireMs == 0 { + expireMs = defaultExpireMs + } + + // Reference: https://redis.io/docs/latest/commands/set/ + // NX -- Only set the key if it does not already exist. + res, err := conn.Do("SET", r.testPrefix+name, value, "NX", "PX", expireMs) + if err != nil { + return "", ctxerr.Wrap(ctx, err, "redis acquire lock") + } + var ok bool + result, ok = res.(string) + if !ok { + return "", nil + } + + return result, nil +} + +func (r *redisLock) ReleaseLock(ctx context.Context, name string, value string) (ok bool, err error) { + conn := redis.ConfigureDoer(r.pool, r.pool.Get()) + defer conn.Close() + + const unlockScript = ` + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + ` + + // Reference: https://redis.io/docs/latest/commands/set/ + // Only release the lock if the value matches. + res, err := conn.Do("EVAL", unlockScript, 1, r.testPrefix+name, value) + if err != nil { + return false, ctxerr.Wrap(ctx, err, "redis release lock") + } + var result int64 + var castOk bool + if result, castOk = res.(int64); !castOk { + return false, nil + } + + return result > 0, nil +} + +func (r *redisLock) Increment(ctx context.Context, name string, expireMs uint64) (int64, error) { + conn := redis.ConfigureDoer(r.pool, r.pool.Get()) + defer conn.Close() + + res, err := conn.Do("INCR", r.testPrefix+name) + if err != nil { + return 0, ctxerr.Wrap(ctx, err, "redis increment") + } + + var result int64 + var ok bool + if result, ok = res.(int64); !ok { + return 0, ctxerr.Errorf(ctx, "redis increment: unexpected result type %T", res) + } + + // A result of 1 indicates that the key was created. So we must also add an expiration to it. 
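+	// (Note: INCR and PEXPIRE are separate commands, so a crash between them
+	// would leave the counter without a TTL.)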
+	if result == 1 {
+		if expireMs == 0 {
+			expireMs = defaultExpireMs
+		}
+		// Reference: https://redis.io/docs/latest/commands/pexpire/
+		expireResult, err := conn.Do("PEXPIRE", r.testPrefix+name, expireMs)
+		if err != nil {
+			return 0, ctxerr.Wrap(ctx, err, "redis increment expire")
+		}
+		if expireResult != int64(1) {
+			return 0, ctxerr.Errorf(ctx, "redis increment expire: unexpected result %v", expireResult)
+		}
+	}
+
+	return result, nil
+}
+
+func (r *redisLock) Get(ctx context.Context, name string) (*string, error) {
+	conn := redis.ConfigureDoer(r.pool, r.pool.Get())
+	defer conn.Close()
+
+	res, err := conn.Do("GET", r.testPrefix+name)
+	if err != nil {
+		return nil, ctxerr.Wrap(ctx, err, "redis get")
+	}
+
+	if res == nil {
+		return nil, nil
+	}
+
+	result := fmt.Sprintf("%s", res)
+	return &result, nil
+}
diff --git a/server/service/redis_lock/redis_lock_test.go b/server/service/redis_lock/redis_lock_test.go
new file mode 100644
index 00000000000..ecdeaa083c6
--- /dev/null
+++ b/server/service/redis_lock/redis_lock_test.go
@@ -0,0 +1,146 @@
+package redis_lock
+
+import (
+	"context"
+	"github.com/fleetdm/fleet/v4/server/datastore/redis/redistest"
+	"github.com/fleetdm/fleet/v4/server/fleet"
+	"github.com/fleetdm/fleet/v4/server/test"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"testing"
+	"time"
+)
+
+func TestRedisLock(t *testing.T) {
+	for _, f := range []func(*testing.T, fleet.Lock){
+		testRedisAcquireLock,
+		testRedisIncrement,
+	} {
+		t.Run(test.FunctionName(f), func(t *testing.T) {
+			t.Run("standalone", func(t *testing.T) {
+				lock := setupRedis(t, false, false)
+				f(t, lock)
+			})
+
+			t.Run("cluster", func(t *testing.T) {
+				lock := setupRedis(t, true, true)
+				f(t, lock)
+			})
+
+			t.Run("cluster-no-redir", func(t *testing.T) {
+				lock := setupRedis(t, true, false)
+				f(t, lock)
+			})
+		})
+	}
+}
+
+func setupRedis(t testing.TB, cluster, redir bool) fleet.Lock {
+	pool := redistest.SetupRedis(t, t.Name(), cluster, redir, true)
+	return NewLockTest(t, pool)
+}
+
+type TestName interface {
+	Name() string
+}
+
+// NewLockTest creates a redis lock with a per-test key prefix, to be used
+// only in tests.
+func NewLockTest(t TestName, pool fleet.RedisPool) fleet.Lock { + lock := &redisLock{ + pool: pool, + testPrefix: t.Name() + ":", + } + return fleet.Lock(lock) +} + +func testRedisAcquireLock(t *testing.T, lock fleet.Lock) { + ctx := context.Background() + result, err := lock.AcquireLock(ctx, "test", "1", 0) + require.NoError(t, err) + assert.Equal(t, "OK", result) + + // Try to acquire the same lock + result, err = lock.AcquireLock(ctx, "test", "1", 0) + assert.NoError(t, err) + assert.Equal(t, "", result) + + // Try to release the lock with a wrong value + ok, err := lock.ReleaseLock(ctx, "test", "2") + require.NoError(t, err) + assert.False(t, ok) + + // Try to release the lock with the wrong key + ok, err = lock.ReleaseLock(ctx, "bad", "1") + require.NoError(t, err) + assert.False(t, ok) + + // Try to release the lock with the correct key/value + ok, err = lock.ReleaseLock(ctx, "test", "1") + require.NoError(t, err) + assert.True(t, ok) + + // Acquire the lock again + result, err = lock.AcquireLock(ctx, "test", "1", 0) + require.NoError(t, err) + assert.Equal(t, "OK", result) + + // Try to set lock with expiration + var expire uint64 = 10 + result, err = lock.AcquireLock(ctx, "testE", "1", expire) + require.NoError(t, err) + assert.Equal(t, "OK", result) + + // Try to acquire the same lock after waiting + duration := time.Duration(expire+1) * time.Millisecond + time.Sleep(duration) + result, err = lock.AcquireLock(ctx, "testE", "1", 0) + require.NoError(t, err) + assert.Equal(t, "OK", result) +} + +func testRedisIncrement(t *testing.T, lock fleet.Lock) { + ctx := context.Background() + + // Increment non-existent key + var expire uint64 = 10 + num, err := lock.Increment(ctx, "newKey", expire) + assert.NoError(t, err) + assert.Equal(t, int64(1), num) + + // Try to acquire the same lock after waiting + duration := time.Duration(expire+1) * time.Millisecond + time.Sleep(duration) + result, err := lock.AcquireLock(ctx, "newKey", "1", 0) + require.NoError(t, err) + assert.Equal(t, "OK", result) + + // Get non-existent key + getResult, err := lock.Get(ctx, "test") + assert.NoError(t, err) + assert.Nil(t, getResult) + + // Save non-integer value + result, err = lock.AcquireLock(ctx, "test", "foo", 0) + require.NoError(t, err) + assert.Equal(t, "OK", result) + + // Increment non-integer value + _, err = lock.Increment(ctx, "test", 0) + assert.Error(t, err) + + // Save an integer value + result, err = lock.AcquireLock(ctx, "testI", "1", 0) + require.NoError(t, err) + assert.Equal(t, "OK", result) + + num, err = lock.Increment(ctx, "testI", 0) + assert.NoError(t, err) + assert.Equal(t, int64(2), num) + + getResult, err = lock.Get(ctx, "testI") + assert.NoError(t, err) + require.NotNil(t, getResult) + assert.Equal(t, "2", *getResult) + +} From 483dd2300de7d857ddec4a9be7a9061a1596c0c6 Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Sun, 7 Jul 2024 15:00:11 -0500 Subject: [PATCH 02/11] Service lock implementation done. 
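
This commit replaces the Increment-based retry counter with a Redis set used
as a re-processing queue. A minimal sketch of the intended webhook flow, using
the fleet.Lock interface from this patch (the ctx, lock, and eventUUID wiring
is assumed; the key strings match the calendarLockKeyPrefix and
calendarQueueKey constants):

    lockValue := uuid.New().String()
    res, err := lock.AcquireLock(ctx, "calendar:lock:"+eventUUID, lockValue, 0)
    if err != nil {
        return err
    }
    if res == "" {
        // Another server holds the lock: queue the event for re-processing...
        if err := lock.AddToSet(ctx, "calendar:queue", eventUUID); err != nil {
            return err
        }
        // ...then retry once in case the holder released the lock meanwhile.
        res, err = lock.AcquireLock(ctx, "calendar:lock:"+eventUUID, lockValue, 0)
        if err != nil || res == "" {
            return err
        }
    }
    defer lock.ReleaseLock(ctx, "calendar:lock:"+eventUUID, lockValue)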
--- ee/server/service/calendar.go | 237 +++++++++++++++---- server/fleet/calendar.go | 5 +- server/service/redis_lock/redis_lock.go | 55 +++-- server/service/redis_lock/redis_lock_test.go | 71 +++--- 4 files changed, 264 insertions(+), 104 deletions(-) diff --git a/ee/server/service/calendar.go b/ee/server/service/calendar.go index 7d998c631be..a763d734ff1 100644 --- a/ee/server/service/calendar.go +++ b/ee/server/service/calendar.go @@ -8,52 +8,20 @@ import ( "github.com/fleetdm/fleet/v4/server/fleet" "github.com/fleetdm/fleet/v4/server/service/calendar" "github.com/go-kit/log/level" + "github.com/google/uuid" "sync" ) -const calendarCallbackKeyPrefix = "calendar:callback:" - -func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, channelID string, resourceState string) error { - - if resourceState == "sync" { - // This is a sync notification, not a real event - svc.authz.SkipAuthorization(ctx) - return nil - } +const ( + calendarLockKeyPrefix = "calendar:lock:" + calendarReservedLockKeyPrefix = "calendar:reserved:" + calendarQueueKey = "calendar:queue" +) - lockValue := "1" - result, err := svc.distributedLock.AcquireLock(ctx, calendarCallbackKeyPrefix+eventUUID, lockValue, 0) - if err != nil { - svc.authz.SkipAuthorization(ctx) - return ctxerr.Wrap(ctx, err, "acquire calendar lock") - } - if result == "" { - // Could not acquire lock, so we are already processing this event. In this case, we increment the lock value to indicate - // that we should re-process the event. - incrementResult, err := svc.distributedLock.Increment(ctx, calendarCallbackKeyPrefix+eventUUID, 0) - if err != nil { - return ctxerr.Wrap(ctx, err, "increment calendar lock") - } - // An increment result of 1 indicates that we acquired the lock, so we will continue processing the event. - if incrementResult != 1 { - svc.authz.SkipAuthorization(ctx) - return nil - } - } +var asyncCalendarProcessing bool +var asyncMutex sync.Mutex - unlocked := false - defer func() { - if !unlocked { - ok, err := svc.distributedLock.ReleaseLock(ctx, calendarCallbackKeyPrefix+eventUUID, lockValue) - if err != nil { - level.Warn(svc.logger).Log("msg", "Failed to release calendar lock", "err", err) - } - if !ok { - // If the lock was not released, it will expire on its own. 
- level.Warn(svc.logger).Log("msg", "Failed to release calendar lock") - } - } - }() +func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, channelID string, resourceState string) error { appConfig, err := svc.ds.AppConfig(ctx) if err != nil { @@ -69,6 +37,12 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann fmt.Printf("VICTOR callback - eventUUID: %s, channelID: %s\n", eventUUID, channelID) + if resourceState == "sync" { + // This is a sync notification, not a real event + svc.authz.SkipAuthorization(ctx) + return nil + } + eventDetails, err := svc.ds.GetCalendarEventDetailsByUUID(ctx, eventUUID) if err != nil { svc.authz.SkipAuthorization(ctx) @@ -101,6 +75,50 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann return authz.ForbiddenWithInternal(fmt.Sprintf("calendar channel ID mismatch: %s != %s", savedChannelID, channelID), nil, nil, nil) } + lockValue, err := svc.getCalendarLock(ctx, eventUUID, true) + if err != nil { + return err + } + if lockValue == "" { + // We did not get a lock, so there is nothing to do here + return nil + } + + unlocked := false + defer func() { + if !unlocked { + svc.releaseCalendarLock(ctx, eventUUID, lockValue) + } + }() + + err = svc.processCalendarEvent(ctx, eventDetails, googleCalendarIntegrationConfig, userCalendar) + if err != nil { + return err + } + svc.releaseCalendarLock(ctx, eventUUID, lockValue) + unlocked = true + + // Now, we need to check if there are any events in the queue that need to be re-processed. + asyncMutex.Lock() + defer asyncMutex.Unlock() + if !asyncCalendarProcessing { + eventIDs, err := svc.distributedLock.GetSet(ctx, calendarQueueKey) + if err != nil { + return ctxerr.Wrap(ctx, err, "get calendar event queue") + } + if len(eventIDs) > 0 { + asyncCalendarProcessing = true + go svc.processCalendarAsync(context.WithoutCancel(ctx), eventIDs) + } + return nil + } + + return nil +} + +func (svc *Service) processCalendarEvent(ctx context.Context, eventDetails *fleet.CalendarEventDetails, + googleCalendarIntegrationConfig *fleet.GoogleCalendarIntegration, userCalendar fleet.UserCalendar) error { + genBodyFn := func(conflict bool) (body string, ok bool, err error) { // This function is called when a new event is being created. @@ -151,7 +169,7 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann return calendar.GenerateCalendarEventBody(ctx, svc.ds, team.Name, host, &sync.Map{}, conflict, svc.logger), true, nil } - err = userCalendar.Configure(eventDetails.Email) + err := userCalendar.Configure(eventDetails.Email) if err != nil { return ctxerr.Wrap(ctx, err, "configure calendar") } @@ -168,16 +186,141 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann } } - // Release the lock - ok, err := svc.distributedLock.ReleaseLock(ctx, calendarCallbackKeyPrefix+eventUUID, lockValue) + return nil +} + +func (svc *Service) releaseCalendarLock(ctx context.Context, eventUUID string, lockValue string) { + ok, err := svc.distributedLock.ReleaseLock(ctx, calendarLockKeyPrefix+eventUUID, lockValue) if err != nil { - level.Warn(svc.logger).Log("msg", "Failed to release calendar lock", "err", err) + level.Error(svc.logger).Log("msg", "Failed to release calendar lock", "err", err) } if !ok { - // TODO: Do another loop // If the lock was not released, it will expire on its own. 
level.Warn(svc.logger).Log("msg", "Failed to release calendar lock") } +} - return nil +func (svc *Service) getCalendarLock(ctx context.Context, eventUUID string, addToQueue bool) (string, error) { + // Check if lock has been reserved, which means we can't have it. + reserved, err := svc.distributedLock.Get(ctx, calendarReservedLockKeyPrefix+eventUUID) + if err != nil { + return "", ctxerr.Wrap(ctx, err, "get calendar reserved lock") + } + if reserved != nil { + // We assume that the lock is reserved by cron, which will fully process this event. Nothing to do here. + return "", nil + } + // Try to acquire the lock + lockValue := uuid.New().String() + result, err := svc.distributedLock.AcquireLock(ctx, calendarLockKeyPrefix+eventUUID, lockValue, 0) + if err != nil { + return "", ctxerr.Wrap(ctx, err, "acquire calendar lock") + } + if result == "" && addToQueue { + // Could not acquire lock, so we are already processing this event. In this case, we add the event to + // the queue (actually a set) to indicate that we need to re-process the event. + err = svc.distributedLock.AddToSet(ctx, calendarQueueKey, eventUUID) + if err != nil { + return "", ctxerr.Wrap(ctx, err, "add calendar event to queue") + } + + // Try to acquire the lock again in case it was released while we were adding the event to the queue. + result, err = svc.distributedLock.AcquireLock(ctx, calendarLockKeyPrefix+eventUUID, lockValue, 0) + if err != nil { + return "", ctxerr.Wrap(ctx, err, "acquire calendar lock again") + } + + if result == "" { + // We could not acquire the lock, so we are done here. + return "", nil + } + } + return lockValue, nil +} + +func (svc *Service) processCalendarAsync(ctx context.Context, eventIDs []string) { + defer func() { + asyncMutex.Lock() + asyncCalendarProcessing = false + asyncMutex.Unlock() + }() + for { + if len(eventIDs) == 0 { + return + } + for _, eventUUID := range eventIDs { + if ok := svc.processCalendarEventAsync(ctx, eventUUID); !ok { + return + } + } + + // Now we check whether there are any more events in the queue. + var err error + eventIDs, err = svc.distributedLock.GetSet(ctx, calendarQueueKey) + if err != nil { + level.Error(svc.logger).Log("msg", "Failed to get calendar event queue", "err", err) + return + } + } +} + +func (svc *Service) processCalendarEventAsync(ctx context.Context, eventUUID string) bool { + lockValue, err := svc.getCalendarLock(ctx, eventUUID, false) + if err != nil { + level.Error(svc.logger).Log("msg", "Failed to get calendar lock", "err", err) + return false + } + if lockValue == "" { + // We did not get a lock, so there is nothing to do here + return true + } + defer svc.releaseCalendarLock(ctx, eventUUID, lockValue) + + // Remove event from the queue so that we don't process this event again. + // Note: This item can be added back to the queue while we are processing it. 
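+	// That is intentional: a callback arriving mid-processing re-queues the event,
+	// so the next pass picks up the latest calendar change.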
+ err = svc.distributedLock.RemoveFromSet(ctx, calendarQueueKey, eventUUID) + if err != nil { + level.Error(svc.logger).Log("msg", "Failed to remove calendar event from queue", "err", err) + return false + } + + appConfig, err := svc.ds.AppConfig(ctx) + if err != nil { + level.Error(svc.logger).Log("msg", "Failed to load app config", "err", err) + return false + } + + if len(appConfig.Integrations.GoogleCalendar) == 0 { + // Google Calendar integration is not configured + return true + } + googleCalendarIntegrationConfig := appConfig.Integrations.GoogleCalendar[0] + + eventDetails, err := svc.ds.GetCalendarEventDetailsByUUID(ctx, eventUUID) + if err != nil { + if fleet.IsNotFound(err) { + // We found this event when the callback initially came in. So the event may have been removed from DB since then. + return true + } + level.Error(svc.logger).Log("msg", "Failed to get calendar event details", "err", err) + return false + } + if eventDetails.TeamID == nil { + // Should not happen + level.Error(svc.logger).Log("msg", "Calendar event has no team ID", "uuid", eventUUID) + return false + } + + localConfig := &calendar.CalendarConfig{ + GoogleCalendarIntegration: *googleCalendarIntegrationConfig, + ServerURL: appConfig.ServerSettings.ServerURL, + } + userCalendar := calendar.CreateUserCalendarFromConfig(ctx, localConfig, svc.logger) + + err = svc.processCalendarEvent(ctx, eventDetails, googleCalendarIntegrationConfig, userCalendar) + if err != nil { + level.Error(svc.logger).Log("msg", "Failed to process calendar event", "err", err) + return false + } + return true } diff --git a/server/fleet/calendar.go b/server/fleet/calendar.go index 6117336c74d..5521515f128 100644 --- a/server/fleet/calendar.go +++ b/server/fleet/calendar.go @@ -45,9 +45,10 @@ type UserCalendar interface { type Lock interface { AcquireLock(ctx context.Context, name string, value string, expireMs uint64) (result string, err error) ReleaseLock(ctx context.Context, name string, value string) (ok bool, err error) - // Increment increments a counter stored in Redis. Creates a counter if it doesn't exist. Returns the new value of the counter. 
-	Increment(ctx context.Context, name string, expireMs uint64) (int64, error)
 	Get(ctx context.Context, name string) (*string, error)
+	AddToSet(ctx context.Context, key string, value string) error
+	RemoveFromSet(ctx context.Context, key string, value string) error
+	GetSet(ctx context.Context, key string) ([]string, error)
 }
 
 type CalendarWebhookPayload struct {
 	Timestamp time.Time `json:"timestamp"`
 	HostID    uint      `json:"host_id"`
diff --git a/server/service/redis_lock/redis_lock.go b/server/service/redis_lock/redis_lock.go
index fab8f31a513..3fff3420b4a 100644
--- a/server/service/redis_lock/redis_lock.go
+++ b/server/service/redis_lock/redis_lock.go
@@ -77,37 +77,48 @@ func (r *redisLock) ReleaseLock(ctx context.Context, name string, value string)
 	return result > 0, nil
 }
 
-func (r *redisLock) Increment(ctx context.Context, name string, expireMs uint64) (int64, error) {
+func (r *redisLock) AddToSet(ctx context.Context, key string, value string) error {
 	conn := redis.ConfigureDoer(r.pool, r.pool.Get())
 	defer conn.Close()
 
-	res, err := conn.Do("INCR", r.testPrefix+name)
+	// Reference: https://redis.io/docs/latest/commands/sadd/
+	_, err := conn.Do("SADD", r.testPrefix+key, value)
 	if err != nil {
-		return 0, ctxerr.Wrap(ctx, err, "redis increment")
+		return ctxerr.Wrap(ctx, err, "redis add to set")
 	}
+	return nil
+}
 
-	var result int64
-	var ok bool
-	if result, ok = res.(int64); !ok {
-		return 0, ctxerr.Errorf(ctx, "redis increment: unexpected result type %T", res)
-	}
+func (r *redisLock) RemoveFromSet(ctx context.Context, key string, value string) error {
+	conn := redis.ConfigureDoer(r.pool, r.pool.Get())
+	defer conn.Close()
 
-	// A result of 1 indicates that the key was created. So we must also add an expiration to it.
-	if result == 1 {
-		if expireMs == 0 {
-			expireMs = defaultExpireMs
-		}
-		// Reference: https://redis.io/docs/latest/commands/pexpire/
-		expireResult, err := conn.Do("PEXPIRE", r.testPrefix+name, expireMs)
-		if err != nil {
-			return 0, ctxerr.Wrap(ctx, err, "redis increment expire")
-		}
-		if expireResult != int64(1) {
-			return 0, ctxerr.Errorf(ctx, "redis increment expire: unexpected result %v", expireResult)
-		}
+	// Reference: https://redis.io/docs/latest/commands/srem/
+	_, err := conn.Do("SREM", r.testPrefix+key, value)
+	if err != nil {
+		return ctxerr.Wrap(ctx, err, "redis remove from set")
+	}
+	return nil
+}
 
-	return result, nil
+func (r *redisLock) GetSet(ctx context.Context, key string) ([]string, error) {
+	conn := redis.ConfigureDoer(r.pool, r.pool.Get())
+	defer conn.Close()
+
+	// Reference: https://redis.io/docs/latest/commands/smembers/
+	raw, err := conn.Do("SMEMBERS", r.testPrefix+key)
+	if err != nil {
+		return nil, ctxerr.Wrap(ctx, err, "redis get set")
+	}
+	rawMembers, ok := raw.([]interface{})
+	if !ok {
+		return nil, ctxerr.Errorf(ctx, "redis get set: unexpected result type %T", raw)
+	}
+	var members []string
+	for _, member := range rawMembers {
+		members = append(members, fmt.Sprintf("%s", member))
+	}
+	return members, nil
 }
 
 func (r *redisLock) Get(ctx context.Context, name string) (*string, error) {
diff --git a/server/service/redis_lock/redis_lock_test.go b/server/service/redis_lock/redis_lock_test.go
index ecdeaa083c6..de496092dfd 100644
--- a/server/service/redis_lock/redis_lock_test.go
+++ b/server/service/redis_lock/redis_lock_test.go
@@ -14,7 +14,7 @@ import (
 func TestRedisLock(t *testing.T) {
 	for _, f := range []func(*testing.T, fleet.Lock){
 		testRedisAcquireLock,
-		testRedisIncrement,
+		testRedisSet,
 	} {
 		t.Run(test.FunctionName(f), func(t *testing.T) {
 			t.Run("standalone", func(t *testing.T) {
@@ -85,6 +85,12 
@@ func testRedisAcquireLock(t *testing.T, lock fleet.Lock) { require.NoError(t, err) assert.Equal(t, "OK", result) + // Get lock + getResult, err := lock.Get(ctx, "test") + assert.NoError(t, err) + require.NotNil(t, getResult) + assert.Equal(t, "1", *getResult) + // Try to set lock with expiration var expire uint64 = 10 result, err = lock.AcquireLock(ctx, "testE", "1", expire) @@ -97,50 +103,49 @@ func testRedisAcquireLock(t *testing.T, lock fleet.Lock) { result, err = lock.AcquireLock(ctx, "testE", "1", 0) require.NoError(t, err) assert.Equal(t, "OK", result) + + // Get non-existent key + getResult, err = lock.Get(ctx, "testNonExistent") + assert.NoError(t, err) + assert.Nil(t, getResult) + } -func testRedisIncrement(t *testing.T, lock fleet.Lock) { +func testRedisSet(t *testing.T, lock fleet.Lock) { ctx := context.Background() - // Increment non-existent key - var expire uint64 = 10 - num, err := lock.Increment(ctx, "newKey", expire) + // Get a non-existent set + result, err := lock.GetSet(ctx, "missingSet") assert.NoError(t, err) - assert.Equal(t, int64(1), num) + assert.Empty(t, result) - // Try to acquire the same lock after waiting - duration := time.Duration(expire+1) * time.Millisecond - time.Sleep(duration) - result, err := lock.AcquireLock(ctx, "newKey", "1", 0) - require.NoError(t, err) - assert.Equal(t, "OK", result) - - // Get non-existent key - getResult, err := lock.Get(ctx, "test") + // Add to a set + values := []string{"foo", "bar"} + err = lock.AddToSet(ctx, "testSet", values[0]) + assert.NoError(t, err) + err = lock.AddToSet(ctx, "testSet", values[1]) assert.NoError(t, err) - assert.Nil(t, getResult) - - // Save non-integer value - result, err = lock.AcquireLock(ctx, "test", "foo", 0) - require.NoError(t, err) - assert.Equal(t, "OK", result) - // Increment non-integer value - _, err = lock.Increment(ctx, "test", 0) - assert.Error(t, err) + // Get the set + result, err = lock.GetSet(ctx, "testSet") + assert.NoError(t, err) + assert.ElementsMatch(t, values, result) - // Save an integer value - result, err = lock.AcquireLock(ctx, "testI", "1", 0) - require.NoError(t, err) - assert.Equal(t, "OK", result) + // Remove from set + err = lock.RemoveFromSet(ctx, "testSet", values[0]) + assert.NoError(t, err) - num, err = lock.Increment(ctx, "testI", 0) + // Get the set + result, err = lock.GetSet(ctx, "testSet") assert.NoError(t, err) - assert.Equal(t, int64(2), num) + assert.Equal(t, []string{values[1]}, result) - getResult, err = lock.Get(ctx, "testI") + // Remove from set + err = lock.RemoveFromSet(ctx, "testSet", values[1]) assert.NoError(t, err) - require.NotNil(t, getResult) - assert.Equal(t, "2", *getResult) + // Get the set + result, err = lock.GetSet(ctx, "testSet") + assert.NoError(t, err) + assert.Empty(t, result) } From 77044f7895d8b4eb5040a9eed25574cd9ddd3c3b Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Mon, 8 Jul 2024 11:27:20 -0500 Subject: [PATCH 03/11] Fixed compile issues. 
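
This patch also threads the distributed lock through the calendar cron, which
reserves an event before waiting for the webhook handler to release the main
lock. A simplified sketch of that reserve-then-wait flow, assuming the retry
loop is meant to poll the main lock key and that a single deadline bounds the
whole wait (the constants and key prefixes come from this patch):

    const timeoutMs = 2 * 60 * 1000
    lockValue := uuid.New().String()
    res, err := lock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0)
    if err != nil {
        return err
    }
    if res == "" {
        // A webhook handler holds the lock: reserve the event so new callbacks
        // back off, then poll until the holder releases the main lock.
        res, err = lock.AcquireLock(ctx, calendar.ReservedLockKeyPrefix+eventUUID, lockValue, timeoutMs)
        if err != nil || res == "" {
            return err // another cron instance already reserved this event
        }
        deadline := time.Now().Add(timeoutMs * time.Millisecond)
        for {
            res, err = lock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0)
            if err != nil {
                return err
            }
            if res != "" {
                break // lock acquired
            }
            if time.Now().After(deadline) {
                return errors.New("could not acquire calendar lock in time")
            }
            time.Sleep(100 * time.Millisecond)
        }
    }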
--- cmd/fleet/serve.go | 6 +- ee/server/service/calendar.go | 101 ++++++++-------- ee/server/service/mdm_external_test.go | 1 + server/cron/calendar_cron.go | 108 +++++++++++++++--- server/cron/calendar_cron_test.go | 24 ++-- ...0707134035_AddUUIDToCalendarEvents_test.go | 2 +- server/service/calendar/calendar.go | 10 +- server/service/integration_enterprise_test.go | 12 +- server/service/testing_utils.go | 4 + 9 files changed, 187 insertions(+), 81 deletions(-) diff --git a/cmd/fleet/serve.go b/cmd/fleet/serve.go index 6d4b3a0989c..44bdd5fe831 100644 --- a/cmd/fleet/serve.go +++ b/cmd/fleet/serve.go @@ -692,6 +692,7 @@ the way that the Fleet server works. } var softwareInstallStore fleet.SoftwareInstallerStore + var distributedLock fleet.Lock if license.IsPremium() { profileMatcher := apple_mdm.NewProfileMatcher(redisPool) if config.S3.SoftwareInstallersBucket != "" { @@ -719,6 +720,7 @@ the way that the Fleet server works. } } + distributedLock = redis_lock.NewLock(redisPool) svc, err = eeservice.NewService( svc, ds, @@ -731,7 +733,7 @@ the way that the Fleet server works. ssoSessionStore, profileMatcher, softwareInstallStore, - redis_lock.NewLock(redisPool), + distributedLock, ) if err != nil { initFatal(err, "initial Fleet Premium service") @@ -872,7 +874,7 @@ the way that the Fleet server works. } else { config.Calendar.Periodicity = 5 * time.Minute } - return cron.NewCalendarSchedule(ctx, instanceID, ds, config.Calendar, logger) + return cron.NewCalendarSchedule(ctx, instanceID, ds, distributedLock, config.Calendar, logger) }, ); err != nil { initFatal(err, "failed to register calendar schedule") diff --git a/ee/server/service/calendar.go b/ee/server/service/calendar.go index a763d734ff1..452d3f54f20 100644 --- a/ee/server/service/calendar.go +++ b/ee/server/service/calendar.go @@ -3,19 +3,14 @@ package service import ( "context" "fmt" + "sync" + "github.com/fleetdm/fleet/v4/server/authz" "github.com/fleetdm/fleet/v4/server/contexts/ctxerr" "github.com/fleetdm/fleet/v4/server/fleet" "github.com/fleetdm/fleet/v4/server/service/calendar" "github.com/go-kit/log/level" "github.com/google/uuid" - "sync" -) - -const ( - calendarLockKeyPrefix = "calendar:lock:" - calendarReservedLockKeyPrefix = "calendar:reserved:" - calendarQueueKey = "calendar:queue" ) var asyncCalendarProcessing bool @@ -59,7 +54,7 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann return fmt.Errorf("calendar event %s has no team ID", eventUUID) } - localConfig := &calendar.CalendarConfig{ + localConfig := &calendar.Config{ GoogleCalendarIntegration: *googleCalendarIntegrationConfig, ServerURL: appConfig.ServerSettings.ServerURL, } @@ -75,34 +70,44 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann return authz.ForbiddenWithInternal(fmt.Sprintf("calendar channel ID mismatch: %s != %s", savedChannelID, channelID), nil, nil, nil) } - lockValue, err := svc.getCalendarLock(ctx, eventUUID, true) + lockValue, reserved, err := svc.getCalendarLock(ctx, eventUUID, true) if err != nil { return err } - if lockValue == "" { + // If lock has been reserved by cron, we will need to re-process this event in case the calendar event was changed after the cron job read it. 
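+	// (getCalendarLock has already added the event to the re-processing queue in
+	// that case, so it is handled once the reservation clears.)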
+ if lockValue == "" && !reserved { // We did not get a lock, so there is nothing to do here return nil } - unlocked := false - defer func() { - if !unlocked { - svc.releaseCalendarLock(ctx, eventUUID, lockValue) + if !reserved { + unlocked := false + defer func() { + if !unlocked { + svc.releaseCalendarLock(ctx, eventUUID, lockValue) + } + }() + + // Remove event from the queue so that we don't process this event again. + // Note: This item can be added back to the queue while we are processing it. + err = svc.distributedLock.RemoveFromSet(ctx, calendar.QueueKey, eventUUID) + if err != nil { + return ctxerr.Wrap(ctx, err, "remove calendar event from queue") } - }() - err = svc.processCalendarEvent(ctx, eventDetails, googleCalendarIntegrationConfig, userCalendar) - if err != nil { - return err + err = svc.processCalendarEvent(ctx, eventDetails, googleCalendarIntegrationConfig, userCalendar) + if err != nil { + return err + } + svc.releaseCalendarLock(ctx, eventUUID, lockValue) + unlocked = true } - svc.releaseCalendarLock(ctx, eventUUID, lockValue) - unlocked = true // Now, we need to check if there are any events in the queue that need to be re-processed. asyncMutex.Lock() defer asyncMutex.Unlock() if !asyncCalendarProcessing { - eventIDs, err := svc.distributedLock.GetSet(ctx, calendarQueueKey) + eventIDs, err := svc.distributedLock.GetSet(ctx, calendar.QueueKey) if err != nil { return ctxerr.Wrap(ctx, err, "get calendar event queue") } @@ -190,7 +195,7 @@ func (svc *Service) processCalendarEvent(ctx context.Context, eventDetails *flee } func (svc *Service) releaseCalendarLock(ctx context.Context, eventUUID string, lockValue string) { - ok, err := svc.distributedLock.ReleaseLock(ctx, calendarLockKeyPrefix+eventUUID, lockValue) + ok, err := svc.distributedLock.ReleaseLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue) if err != nil { level.Error(svc.logger).Log("msg", "Failed to release calendar lock", "err", err) } @@ -200,42 +205,46 @@ func (svc *Service) releaseCalendarLock(ctx context.Context, eventUUID string, l } } -func (svc *Service) getCalendarLock(ctx context.Context, eventUUID string, addToQueue bool) (string, error) { +func (svc *Service) getCalendarLock(ctx context.Context, eventUUID string, addToQueue bool) (lockValue string, reserved bool, err error) { // Check if lock has been reserved, which means we can't have it. - reserved, err := svc.distributedLock.Get(ctx, calendarReservedLockKeyPrefix+eventUUID) + reservedValue, err := svc.distributedLock.Get(ctx, calendar.ReservedLockKeyPrefix+eventUUID) if err != nil { - return "", ctxerr.Wrap(ctx, err, "get calendar reserved lock") - } - if reserved != nil { - // We assume that the lock is reserved by cron, which will fully process this event. Nothing to do here. - return "", nil - } - // Try to acquire the lock - lockValue := uuid.New().String() - result, err := svc.distributedLock.AcquireLock(ctx, calendarLockKeyPrefix+eventUUID, lockValue, 0) - if err != nil { - return "", ctxerr.Wrap(ctx, err, "acquire calendar lock") + return "", false, ctxerr.Wrap(ctx, err, "get calendar reserved lock") + } + reserved = reservedValue != nil + if reserved && !addToQueue { + // We flag the lock as reserved. 
+ return "", reserved, nil + } + var result string + if !reserved { + // Try to acquire the lock + lockValue = uuid.New().String() + result, err = svc.distributedLock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0) + if err != nil { + return "", false, ctxerr.Wrap(ctx, err, "acquire calendar lock") + } } - if result == "" && addToQueue { + if (result == "" || reserved) && addToQueue { // Could not acquire lock, so we are already processing this event. In this case, we add the event to // the queue (actually a set) to indicate that we need to re-process the event. - err = svc.distributedLock.AddToSet(ctx, calendarQueueKey, eventUUID) + err = svc.distributedLock.AddToSet(ctx, calendar.QueueKey, eventUUID) if err != nil { - return "", ctxerr.Wrap(ctx, err, "add calendar event to queue") + return "", false, ctxerr.Wrap(ctx, err, "add calendar event to queue") } // Try to acquire the lock again in case it was released while we were adding the event to the queue. - result, err = svc.distributedLock.AcquireLock(ctx, calendarLockKeyPrefix+eventUUID, lockValue, 0) + result, err = svc.distributedLock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0) if err != nil { - return "", ctxerr.Wrap(ctx, err, "acquire calendar lock again") + return "", false, ctxerr.Wrap(ctx, err, "acquire calendar lock again") } if result == "" { // We could not acquire the lock, so we are done here. - return "", nil + return "", reserved, nil } } - return lockValue, nil + return lockValue, false, nil } func (svc *Service) processCalendarAsync(ctx context.Context, eventIDs []string) { @@ -256,7 +265,7 @@ func (svc *Service) processCalendarAsync(ctx context.Context, eventIDs []string) // Now we check whether there are any more events in the queue. var err error - eventIDs, err = svc.distributedLock.GetSet(ctx, calendarQueueKey) + eventIDs, err = svc.distributedLock.GetSet(ctx, calendar.QueueKey) if err != nil { level.Error(svc.logger).Log("msg", "Failed to get calendar event queue", "err", err) return @@ -265,7 +274,7 @@ func (svc *Service) processCalendarAsync(ctx context.Context, eventIDs []string) } func (svc *Service) processCalendarEventAsync(ctx context.Context, eventUUID string) bool { - lockValue, err := svc.getCalendarLock(ctx, eventUUID, false) + lockValue, _, err := svc.getCalendarLock(ctx, eventUUID, false) if err != nil { level.Error(svc.logger).Log("msg", "Failed to get calendar lock", "err", err) return false @@ -278,7 +287,7 @@ func (svc *Service) processCalendarEventAsync(ctx context.Context, eventUUID str // Remove event from the queue so that we don't process this event again. // Note: This item can be added back to the queue while we are processing it. 
- err = svc.distributedLock.RemoveFromSet(ctx, calendarQueueKey, eventUUID) + err = svc.distributedLock.RemoveFromSet(ctx, calendar.QueueKey, eventUUID) if err != nil { level.Error(svc.logger).Log("msg", "Failed to remove calendar event from queue", "err", err) return false @@ -311,7 +320,7 @@ func (svc *Service) processCalendarEventAsync(ctx context.Context, eventUUID str return false } - localConfig := &calendar.CalendarConfig{ + localConfig := &calendar.Config{ GoogleCalendarIntegration: *googleCalendarIntegrationConfig, ServerURL: appConfig.ServerSettings.ServerURL, } diff --git a/ee/server/service/mdm_external_test.go b/ee/server/service/mdm_external_test.go index 424d417ee6c..9c3c3fbeb09 100644 --- a/ee/server/service/mdm_external_test.go +++ b/ee/server/service/mdm_external_test.go @@ -82,6 +82,7 @@ func setupMockDatastorePremiumService() (*mock.Store, *eeservice.Service, contex nil, nil, nil, + nil, ) if err != nil { panic(err) diff --git a/server/cron/calendar_cron.go b/server/cron/calendar_cron.go index c1aaab2217b..81eda580186 100644 --- a/server/cron/calendar_cron.go +++ b/server/cron/calendar_cron.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/google/uuid" "slices" "sync" "time" @@ -27,6 +28,7 @@ func NewCalendarSchedule( ctx context.Context, instanceID string, ds fleet.Datastore, + distributedLock fleet.Lock, serverConfig config.CalendarConfig, logger kitlog.Logger, ) (*schedule.Schedule, error) { @@ -47,7 +49,7 @@ func NewCalendarSchedule( schedule.WithJob( "calendar_events", func(ctx context.Context) error { - return cronCalendarEvents(ctx, ds, serverConfig, logger) + return cronCalendarEvents(ctx, ds, distributedLock, serverConfig, logger) }, ), ) @@ -55,7 +57,8 @@ func NewCalendarSchedule( return s, nil } -func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, serverConfig config.CalendarConfig, logger kitlog.Logger) error { +func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, distributedLock fleet.Lock, serverConfig config.CalendarConfig, + logger kitlog.Logger) error { appConfig, err := ds.AppConfig(ctx) if err != nil { return fmt.Errorf("load app config: %w", err) @@ -78,14 +81,14 @@ func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, serverConfig co return fmt.Errorf("list teams: %w", err) } - localConfig := &calendar.CalendarConfig{ + localConfig := &calendar.Config{ CalendarConfig: serverConfig, GoogleCalendarIntegration: *googleCalendarIntegrationConfig, ServerURL: appConfig.ServerSettings.ServerURL, } for _, team := range teams { if err := cronCalendarEventsForTeam( - ctx, ds, localConfig, *team, appConfig.OrgInfo.OrgName, domain, logger, + ctx, ds, distributedLock, localConfig, *team, appConfig.OrgInfo.OrgName, domain, logger, ); err != nil { level.Info(logger).Log("msg", "events calendar cron", "team_id", team.ID, "err", err) } @@ -97,7 +100,8 @@ func cronCalendarEvents(ctx context.Context, ds fleet.Datastore, serverConfig co func cronCalendarEventsForTeam( ctx context.Context, ds fleet.Datastore, - calendarConfig *calendar.CalendarConfig, + distributedLock fleet.Lock, + calendarConfig *calendar.Config, team fleet.Team, orgName string, domain string, @@ -179,7 +183,7 @@ func cronCalendarEventsForTeam( // Process hosts that are failing calendar policies. 
start = time.Now() - processCalendarFailingHosts(ctx, ds, calendarConfig, orgName, failingHosts, logger) + processCalendarFailingHosts(ctx, ds, distributedLock, calendarConfig, orgName, failingHosts, logger) level.Debug(logger).Log( "msg", "failing_hosts", "took", time.Since(start), ) @@ -197,7 +201,8 @@ func cronCalendarEventsForTeam( func processCalendarFailingHosts( ctx context.Context, ds fleet.Datastore, - calendarConfig *calendar.CalendarConfig, + distributedLock fleet.Lock, + calendarConfig *calendar.Config, orgName string, hosts []fleet.HostPolicyMembershipData, logger kitlog.Logger, @@ -253,7 +258,8 @@ func processCalendarFailingHosts( switch { case err == nil && !expiredEvent: if err := processFailingHostExistingCalendarEvent( - ctx, ds, userCalendar, orgName, hostCalendarEvent, calendarEvent, host, &policyIDtoPolicy, calendarConfig, logger, + ctx, ds, distributedLock, userCalendar, orgName, hostCalendarEvent, calendarEvent, host, &policyIDtoPolicy, + calendarConfig, logger, ); err != nil { level.Info(logger).Log("msg", "process failing host existing calendar event", "err", err) continue // continue with next host @@ -303,15 +309,87 @@ func filterHostsWithSameEmail(hosts []fleet.HostPolicyMembershipData) []fleet.Ho func processFailingHostExistingCalendarEvent( ctx context.Context, ds fleet.Datastore, + distributedLock fleet.Lock, userCalendar fleet.UserCalendar, orgName string, hostCalendarEvent *fleet.HostCalendarEvent, calendarEvent *fleet.CalendarEvent, host fleet.HostPolicyMembershipData, policyIDtoPolicy *sync.Map, - calendarConfig *calendar.CalendarConfig, + calendarConfig *calendar.Config, logger kitlog.Logger, ) error { + + // Try to acquire the lock. Lock is needed to ensure calendar callback is not processed for this event at the same time. + eventUUID := calendarEvent.UUID + lockValue := uuid.New().String() + result, err := distributedLock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0) + if err != nil { + return fmt.Errorf("acquire calendar lock: %w", err) + } + lockReserved := false + if result == "" { + // Lock was not acquired. We reserve the lock and try to acquire it until we do. + var timeoutMs uint64 = 2 * 60 * 1000 + result, err = distributedLock.AcquireLock(ctx, calendar.ReservedLockKeyPrefix+eventUUID, lockValue, timeoutMs) + if err != nil { + return fmt.Errorf("reserve calendar lock: %w", err) + } + if result == "" { + // Lock was not reserved. Another cron job is processing this event. This is not expected. + level.Warn(logger).Log("msg", "could not reserve calendar lock") + return nil + } + lockReserved = true + done := make(chan struct{}) + for { + // Keep trying to get the lock. + result, err = distributedLock.AcquireLock(ctx, calendar.ReservedLockKeyPrefix+eventUUID, lockValue, timeoutMs) + if err != nil { + return fmt.Errorf("reserve calendar lock: %w", err) + } + if result != "" { + break + } + time.Sleep(100 * time.Millisecond) + } + select { + case <-done: + // Lock was acquired. + case <-time.After(time.Duration(timeoutMs) * time.Millisecond): + // We couldn't acquire the lock in time. + return errors.New("could not acquire calendar lock in time") + } + } + defer func() { + // Release locks. + if lockReserved { + ok, err := distributedLock.ReleaseLock(ctx, calendar.ReservedLockKeyPrefix+eventUUID, lockValue) + if err != nil { + level.Error(logger).Log("msg", "Failed to release calendar reserve lock", "err", err) + } + if !ok { + // If the lock was not released, it will expire on its own. 
+ level.Warn(logger).Log("msg", "Failed to release calendar reserve lock") + } + } + ok, err := distributedLock.ReleaseLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue) + if err != nil { + level.Error(logger).Log("msg", "Failed to release calendar lock", "err", err) + } + if !ok { + // If the lock was not released, it will expire on its own. + level.Warn(logger).Log("msg", "Failed to release calendar lock") + } + }() + + // Remove event from the queue so that we don't process this event again. + // Note: This item can be added back to the queue while we are processing it. + err = distributedLock.RemoveFromSet(ctx, calendar.QueueKey, eventUUID) + if err != nil { + return fmt.Errorf("remove calendar event from queue: %w", err) + } + updatedEvent := calendarEvent updated := false now := time.Now() @@ -492,7 +570,7 @@ func addBusinessDay(date time.Time) time.Time { func removeCalendarEventsFromPassingHosts( ctx context.Context, ds fleet.Datastore, - calendarConfig *calendar.CalendarConfig, + calendarConfig *calendar.Config, hosts []fleet.HostPolicyMembershipData, logger kitlog.Logger, ) { @@ -601,9 +679,9 @@ func cronCalendarEventsCleanup(ctx context.Context, ds fleet.Datastore, logger k } var userCalendar fleet.UserCalendar - var calConfig *calendar.CalendarConfig + var calConfig *calendar.Config if len(appConfig.Integrations.GoogleCalendar) > 0 { - calConfig = &calendar.CalendarConfig{ + calConfig = &calendar.Config{ GoogleCalendarIntegration: *appConfig.Integrations.GoogleCalendar[0], ServerURL: appConfig.ServerSettings.ServerURL, } @@ -657,7 +735,7 @@ func cronCalendarEventsCleanup(ctx context.Context, ds fleet.Datastore, logger k func deleteAllCalendarEvents( ctx context.Context, ds fleet.Datastore, - calendarConfig *calendar.CalendarConfig, + calendarConfig *calendar.Config, teamID *uint, logger kitlog.Logger, ) error { @@ -670,7 +748,7 @@ func deleteAllCalendarEvents( } func deleteCalendarEventsInParallel( - ctx context.Context, ds fleet.Datastore, calendarConfig *calendar.CalendarConfig, calendarEvents []*fleet.CalendarEvent, + ctx context.Context, ds fleet.Datastore, calendarConfig *calendar.Config, calendarEvents []*fleet.CalendarEvent, logger kitlog.Logger, ) { if len(calendarEvents) > 0 { @@ -703,7 +781,7 @@ func deleteCalendarEventsInParallel( func cleanupTeamCalendarEvents( ctx context.Context, ds fleet.Datastore, - calendarConfig *calendar.CalendarConfig, + calendarConfig *calendar.Config, team fleet.Team, logger kitlog.Logger, ) error { diff --git a/server/cron/calendar_cron_test.go b/server/cron/calendar_cron_test.go index 85c4761e845..c4081371829 100644 --- a/server/cron/calendar_cron_test.go +++ b/server/cron/calendar_cron_test.go @@ -11,14 +11,15 @@ import ( "testing" "time" - "github.com/fleetdm/fleet/v4/server/config" - "github.com/fleetdm/fleet/v4/server/ptr" - "github.com/stretchr/testify/assert" - "github.com/fleetdm/fleet/v4/ee/server/calendar" + "github.com/fleetdm/fleet/v4/server/config" + "github.com/fleetdm/fleet/v4/server/datastore/redis/redistest" "github.com/fleetdm/fleet/v4/server/fleet" "github.com/fleetdm/fleet/v4/server/mock" + "github.com/fleetdm/fleet/v4/server/ptr" + "github.com/fleetdm/fleet/v4/server/service/redis_lock" kitlog "github.com/go-kit/log" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -199,7 +200,8 @@ func TestEventForDifferentHost(t *testing.T) { return hcEvent, calEvent, nil } - err := cronCalendarEvents(ctx, ds, defaultCalendarConfig, logger) + pool := redistest.SetupRedis(t, t.Name(), false, 
false, false) + err := cronCalendarEvents(ctx, ds, redis_lock.NewLock(pool), defaultCalendarConfig, logger) require.NoError(t, err) } @@ -373,7 +375,8 @@ func TestCalendarEventsMultipleHosts(t *testing.T) { return nil, nil } - err := cronCalendarEvents(ctx, ds, defaultCalendarConfig, logger) + pool := redistest.SetupRedis(t, t.Name(), false, false, false) + err := cronCalendarEvents(ctx, ds, redis_lock.NewLock(pool), defaultCalendarConfig, logger) require.NoError(t, err) eventsMu.Lock() @@ -662,7 +665,9 @@ func TestCalendarEvents1KHosts(t *testing.T) { return nil, nil } - err := cronCalendarEvents(ctx, ds, defaultCalendarConfig, logger) + pool := redistest.SetupRedis(t, t.Name(), false, false, false) + distributedLock := redis_lock.NewLock(pool) + err := cronCalendarEvents(ctx, ds, distributedLock, defaultCalendarConfig, logger) require.NoError(t, err) createdCalendarEvents := calendar.ListGoogleMockEvents() @@ -699,7 +704,7 @@ func TestCalendarEvents1KHosts(t *testing.T) { return nil } - err = cronCalendarEvents(ctx, ds, defaultCalendarConfig, logger) + err = cronCalendarEvents(ctx, ds, distributedLock, defaultCalendarConfig, logger) require.NoError(t, err) createdCalendarEvents = calendar.ListGoogleMockEvents() @@ -941,7 +946,8 @@ func TestEventDescription(t *testing.T) { return nil, nil } - err := cronCalendarEvents(ctx, ds, defaultCalendarConfig, logger) + pool := redistest.SetupRedis(t, t.Name(), false, false, false) + err := cronCalendarEvents(ctx, ds, redis_lock.NewLock(pool), defaultCalendarConfig, logger) require.NoError(t, err) numberOfEvents := 7 diff --git a/server/datastore/mysql/migrations/tables/20240707134035_AddUUIDToCalendarEvents_test.go b/server/datastore/mysql/migrations/tables/20240707134035_AddUUIDToCalendarEvents_test.go index 851997f2510..4372053c53c 100644 --- a/server/datastore/mysql/migrations/tables/20240707134035_AddUUIDToCalendarEvents_test.go +++ b/server/datastore/mysql/migrations/tables/20240707134035_AddUUIDToCalendarEvents_test.go @@ -21,7 +21,7 @@ func TestUp_20240707134035(t *testing.T) { // Apply current migration. 
applyNext(t, db) - // check that it's NULL + // check that UUID is not NULL const selectUUIDStmt = `SELECT uuid FROM calendar_events WHERE id = ?` var uuid1, uuid2 string err := db.Get(&uuid1, selectUUIDStmt, event1ID) diff --git a/server/service/calendar/calendar.go b/server/service/calendar/calendar.go index 1e7eff60d0c..14f3f5f0056 100644 --- a/server/service/calendar/calendar.go +++ b/server/service/calendar/calendar.go @@ -16,13 +16,19 @@ import ( "github.com/go-kit/log/level" ) -type CalendarConfig struct { +const ( + LockKeyPrefix = "calendar:lock:" + ReservedLockKeyPrefix = "calendar:reserved:" + QueueKey = "calendar:queue" +) + +type Config struct { config.CalendarConfig fleet.GoogleCalendarIntegration ServerURL string } -func CreateUserCalendarFromConfig(ctx context.Context, config *CalendarConfig, logger kitlog.Logger) fleet.UserCalendar { +func CreateUserCalendarFromConfig(ctx context.Context, config *Config, logger kitlog.Logger) fleet.UserCalendar { googleCalendarConfig := calendar.GoogleCalendarConfig{ Context: ctx, IntegrationConfig: &config.GoogleCalendarIntegration, diff --git a/server/service/integration_enterprise_test.go b/server/service/integration_enterprise_test.go index f8fa76a59b7..132755f5aea 100644 --- a/server/service/integration_enterprise_test.go +++ b/server/service/integration_enterprise_test.go @@ -87,7 +87,7 @@ func (s *integrationEnterpriseTestSuite) SetupSuite() { cronLog = kitlog.NewNopLogger() } calendarSchedule, err = cron.NewCalendarSchedule( - ctx, s.T().Name(), s.ds, config.CalendarConfig{Periodicity: 24 * time.Hour}, cronLog, + ctx, s.T().Name(), s.ds, nil, config.CalendarConfig{Periodicity: 24 * time.Hour}, cronLog, ) return calendarSchedule, err } @@ -10621,9 +10621,9 @@ func (s *integrationEnterpriseTestSuite) TestCalendarCallback() { }, ) require.NoError(t, err) - team1Policy2, err := s.ds.NewTeamPolicy( + team1Policy2Calendar, err := s.ds.NewTeamPolicy( ctx, team1.ID, nil, fleet.PolicyPayload{ - Name: "team1Policy2", + Name: "team1Policy2Calendar", Query: "SELECT 2;", CalendarEventsEnabled: true, }, @@ -10674,7 +10674,7 @@ func (s *integrationEnterpriseTestSuite) TestCalendarCallback() { host1Team1, map[uint]*bool{ team1Policy1Calendar.ID: ptr.Bool(false), - team1Policy2.ID: ptr.Bool(true), + team1Policy2Calendar.ID: ptr.Bool(true), globalPolicy.ID: nil, }, ), http.StatusOK, &distributedResp) @@ -10684,7 +10684,7 @@ func (s *integrationEnterpriseTestSuite) TestCalendarCallback() { host2Team1, map[uint]*bool{ team1Policy1Calendar.ID: ptr.Bool(true), - team1Policy2.ID: ptr.Bool(false), + team1Policy2Calendar.ID: ptr.Bool(false), globalPolicy.ID: nil, }, ), http.StatusOK, &distributedResp) @@ -10841,7 +10841,7 @@ func (s *integrationEnterpriseTestSuite) TestCalendarCallback() { host1Team1, map[uint]*bool{ team1Policy1Calendar.ID: ptr.Bool(true), - team1Policy2.ID: ptr.Bool(true), + team1Policy2Calendar.ID: ptr.Bool(true), globalPolicy.ID: nil, }, ), http.StatusOK, &distributedResp) diff --git a/server/service/testing_utils.go b/server/service/testing_utils.go index 73c0d495c04..d23574e842c 100644 --- a/server/service/testing_utils.go +++ b/server/service/testing_utils.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "encoding/json" "fmt" + "github.com/fleetdm/fleet/v4/server/service/redis_lock" "io" "net/http" "net/http/httptest" @@ -69,6 +70,7 @@ func newTestServiceWithConfig(t *testing.T, ds fleet.Datastore, fleetConfig conf ssoStore sso.SessionStore profMatcher fleet.ProfileMatcher softwareInstallStore fleet.SoftwareInstallerStore + distributedLock 
fleet.Lock ) if len(opts) > 0 { if opts[0].Clock != nil { @@ -95,6 +97,7 @@ func newTestServiceWithConfig(t *testing.T, ds fleet.Datastore, fleetConfig conf if opts[0].Pool != nil { ssoStore = sso.NewSessionStore(opts[0].Pool) profMatcher = apple_mdm.NewProfileMatcher(opts[0].Pool) + distributedLock = redis_lock.NewLock(opts[0].Pool) } if opts[0].ProfileMatcher != nil { profMatcher = opts[0].ProfileMatcher @@ -194,6 +197,7 @@ func newTestServiceWithConfig(t *testing.T, ds fleet.Datastore, fleetConfig conf ssoStore, profMatcher, softwareInstallStore, + distributedLock, ) if err != nil { panic(err) From 2a81f63a5c6a4556add694ff85e4bab35e57ed00 Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Mon, 8 Jul 2024 13:31:50 -0500 Subject: [PATCH 04/11] Fixing null pointer in test. --- server/service/integration_enterprise_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/service/integration_enterprise_test.go b/server/service/integration_enterprise_test.go index 132755f5aea..1d4b5390d4a 100644 --- a/server/service/integration_enterprise_test.go +++ b/server/service/integration_enterprise_test.go @@ -35,6 +35,7 @@ import ( "github.com/fleetdm/fleet/v4/server/mdm" "github.com/fleetdm/fleet/v4/server/ptr" "github.com/fleetdm/fleet/v4/server/pubsub" + "github.com/fleetdm/fleet/v4/server/service/redis_lock" "github.com/fleetdm/fleet/v4/server/service/schedule" "github.com/fleetdm/fleet/v4/server/test" "github.com/go-kit/log" @@ -87,7 +88,8 @@ func (s *integrationEnterpriseTestSuite) SetupSuite() { cronLog = kitlog.NewNopLogger() } calendarSchedule, err = cron.NewCalendarSchedule( - ctx, s.T().Name(), s.ds, nil, config.CalendarConfig{Periodicity: 24 * time.Hour}, cronLog, + ctx, s.T().Name(), s.ds, redis_lock.NewLock(s.redisPool), config.CalendarConfig{Periodicity: 24 * time.Hour}, + cronLog, ) return calendarSchedule, err } From f8427437071ff59f06367c6c9e3076df3580f451 Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Mon, 8 Jul 2024 15:02:26 -0500 Subject: [PATCH 05/11] Minor updates after manual testing. --- ee/server/service/calendar.go | 11 +++++----- tools/calendar/delete-events/delete-events.go | 20 ++++++++++++++++++- tools/calendar/move-events/move-events.go | 20 ++++++++++++++++++- 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/ee/server/service/calendar.go b/ee/server/service/calendar.go index 452d3f54f20..10d08c877c3 100644 --- a/ee/server/service/calendar.go +++ b/ee/server/service/calendar.go @@ -18,6 +18,9 @@ var asyncMutex sync.Mutex func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, channelID string, resourceState string) error { + // We don't want the sender to cancel the context since we want to make sure we process the webhook. 
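+	// context.WithoutCancel keeps the parent's values but is never canceled,
+	// so processing continues even if the HTTP client disconnects.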
+ ctx = context.WithoutCancel(ctx) + appConfig, err := svc.ds.AppConfig(ctx) if err != nil { return fmt.Errorf("load app config: %w", err) @@ -30,8 +33,6 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann } googleCalendarIntegrationConfig := appConfig.Integrations.GoogleCalendar[0] - fmt.Printf("VICTOR callback - eventUUID: %s, channelID: %s\n", eventUUID, channelID) - if resourceState == "sync" { // This is a sync notification, not a real event svc.authz.SkipAuthorization(ctx) @@ -43,9 +44,9 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann svc.authz.SkipAuthorization(ctx) if fleet.IsNotFound(err) { // We could try to stop the channel callbacks here, but that may not be secure since we don't know if the request is legitimate - level.Warn(svc.logger).Log("msg", "Received calendar callback, but did not find corresponding event in database", "event_uuid", + level.Info(svc.logger).Log("msg", "Received calendar callback, but did not find corresponding event in database", "event_uuid", eventUUID, "channel_id", channelID) - return err + return nil } return err } @@ -113,7 +114,7 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann } if len(eventIDs) > 0 { asyncCalendarProcessing = true - go svc.processCalendarAsync(context.WithoutCancel(ctx), eventIDs) + go svc.processCalendarAsync(ctx, eventIDs) } return nil } diff --git a/tools/calendar/delete-events/delete-events.go b/tools/calendar/delete-events/delete-events.go index 676da9d88f2..f3f1355548d 100644 --- a/tools/calendar/delete-events/delete-events.go +++ b/tools/calendar/delete-events/delete-events.go @@ -70,16 +70,20 @@ func main() { log.Fatalf("Unable to create Calendar service: %v", err) } numberDeleted := 0 + var maxResults int64 = 1000 + pageToken := "" + now := time.Now() for { list, err := withRetry( func() (any, error) { return service.Events.List("primary"). EventTypes("default"). - MaxResults(1000). + MaxResults(maxResults). OrderBy("startTime"). SingleEvents(true). ShowDeleted(false). Q(eventTitle). + PageToken(pageToken). Do() }, ) @@ -89,7 +93,17 @@ func main() { if len(list.(*calendar.Events).Items) == 0 { break } + foundNewEvents := false for _, item := range list.(*calendar.Events).Items { + created, err := time.Parse(time.RFC3339, item.Created) + if err != nil { + log.Fatalf("Unable to parse event created time: %v", err) + } + if created.After(now) { + // Found events created after we started deleting events, so we should stop + foundNewEvents = true + continue // Skip this event but finish the loop to make sure we don't miss something + } if item.Summary == eventTitle { _, err := withRetry( func() (any, error) { @@ -105,6 +119,10 @@ func main() { } } } + pageToken = list.(*calendar.Events).NextPageToken + if pageToken == "" || foundNewEvents { + break + } } log.Printf("DONE. Deleted %d events total for %s", numberDeleted, userEmail) }(userEmail) diff --git a/tools/calendar/move-events/move-events.go b/tools/calendar/move-events/move-events.go index 3e22032c99a..616418b1969 100644 --- a/tools/calendar/move-events/move-events.go +++ b/tools/calendar/move-events/move-events.go @@ -81,16 +81,20 @@ func main() { } numberMoved := 0 + var maxResults int64 = 1000 + pageToken := "" + now := time.Now() for { list, err := withRetry( func() (any, error) { return service.Events.List("primary").EventTypes("default"). - MaxResults(1000). + MaxResults(maxResults). OrderBy("startTime"). SingleEvents(true). ShowDeleted(false). 
TimeMin(dateTimeEndStr). Q(eventTitle). + PageToken(pageToken). Do() }, ) @@ -101,7 +105,17 @@ func main() { if len(list.(*calendar.Events).Items) == 0 { break } + foundNewEvents := false for _, item := range list.(*calendar.Events).Items { + created, err := time.Parse(time.RFC3339, item.Created) + if err != nil { + log.Fatalf("Unable to parse event created time: %v", err) + } + if created.After(now) { + // Found events created after we started moving events, so we should stop + foundNewEvents = true + continue // Skip this event but finish the loop to make sure we don't miss something + } if item.Summary == eventTitle { item.Start.DateTime = dateTime.Format(time.RFC3339) item.End.DateTime = dateTime.Add(30 * time.Minute).Format(time.RFC3339) @@ -120,6 +134,10 @@ func main() { } } + pageToken = list.(*calendar.Events).NextPageToken + if pageToken == "" || foundNewEvents { + break + } } log.Printf("DONE. Moved total %d events for %s", numberMoved, userEmail) }(userEmail) From ace15aba8210e6b0ad56310100a7cd99f9f27e45 Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Mon, 8 Jul 2024 15:21:08 -0500 Subject: [PATCH 06/11] Fix test --- server/service/integration_enterprise_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/service/integration_enterprise_test.go b/server/service/integration_enterprise_test.go index 1d4b5390d4a..a346dd2ddb6 100644 --- a/server/service/integration_enterprise_test.go +++ b/server/service/integration_enterprise_test.go @@ -10790,8 +10790,8 @@ func (s *integrationEnterpriseTestSuite) TestCalendarCallback() { assert.NotEqual(t, event.EndTime, eventRecreated.EndTime) assert.Equal(t, 1, calendar.MockChannelsCount()) - // The previous event UUID should not work anymore - _ = s.DoRawWithHeaders("POST", "/api/v1/fleet/calendar/webhook/"+event.UUID, []byte(""), http.StatusNotFound, map[string]string{ + // The previous event UUID should not work anymore, but API returns OK because this is a common occurrence. + _ = s.DoRawWithHeaders("POST", "/api/v1/fleet/calendar/webhook/"+event.UUID, []byte(""), http.StatusOK, map[string]string{ "X-Goog-Channel-Id": details.ChannelID, "X-Goog-Resource-State": "exists", }) From 472fc478bcd0fb2f04e03c5c30bfa36352844786 Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Mon, 8 Jul 2024 16:00:22 -0500 Subject: [PATCH 07/11] Refactored redis implementation. --- server/service/redis_lock/redis_lock.go | 51 ++++++-------------- server/service/redis_lock/redis_lock_test.go | 6 --- server/service/testing_utils.go | 2 +- 3 files changed, 16 insertions(+), 43 deletions(-) diff --git a/server/service/redis_lock/redis_lock.go b/server/service/redis_lock/redis_lock.go index 3fff3420b4a..8c53d2cfec6 100644 --- a/server/service/redis_lock/redis_lock.go +++ b/server/service/redis_lock/redis_lock.go @@ -2,10 +2,12 @@ package redis_lock import ( "context" - "fmt" + "errors" + "github.com/fleetdm/fleet/v4/server/contexts/ctxerr" "github.com/fleetdm/fleet/v4/server/datastore/redis" "github.com/fleetdm/fleet/v4/server/fleet" + redigo "github.com/gomodule/redigo/redis" ) // This package implements a distributed lock using Redis. The lock can be used @@ -37,16 +39,10 @@ func (r *redisLock) AcquireLock(ctx context.Context, name string, value string, // Reference: https://redis.io/docs/latest/commands/set/ // NX -- Only set the key if it does not already exist. 
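 	// PX -- Set the expiry in milliseconds, so a crashed holder's lock frees
 	// itself. As an illustration only (key and values here are hypothetical),
 	// the raw command is:
 	//
 	//	SET calendar:lock:<event-uuid> <holder-id> NX PX 60000
 	//
 	// which replies OK when the lock is taken and nil (redigo.ErrNil) when it
 	// is already held by someone else.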
- res, err := conn.Do("SET", r.testPrefix+name, value, "NX", "PX", expireMs) - if err != nil { + result, err = redigo.String(conn.Do("SET", r.testPrefix+name, value, "NX", "PX", expireMs)) + if err != nil && !errors.Is(err, redigo.ErrNil) { return "", ctxerr.Wrap(ctx, err, "redis acquire lock") } - var ok bool - result, ok = res.(string) - if !ok { - return "", nil - } - return result, nil } @@ -64,17 +60,11 @@ func (r *redisLock) ReleaseLock(ctx context.Context, name string, value string) // Reference: https://redis.io/docs/latest/commands/set/ // Only release the lock if the value matches. - res, err := conn.Do("EVAL", unlockScript, 1, r.testPrefix+name, value) - if err != nil { + res, err := redigo.Int64(conn.Do("EVAL", unlockScript, 1, r.testPrefix+name, value)) + if err != nil && !errors.Is(err, redigo.ErrNil) { return false, ctxerr.Wrap(ctx, err, "redis release lock") } - var result int64 - var castOk bool - if result, castOk = res.(int64); !castOk { - return false, nil - } - - return result > 0, nil + return res > 0, nil } func (r *redisLock) AddToSet(ctx context.Context, key string, value string) error { @@ -106,17 +96,9 @@ func (r *redisLock) GetSet(ctx context.Context, key string) ([]string, error) { defer conn.Close() // Reference: https://redis.io/docs/latest/commands/smembers/ - raw, err := conn.Do("SMEMBERS", r.testPrefix+key) + members, err := redigo.Strings(conn.Do("SMEMBERS", r.testPrefix+key)) if err != nil { - return nil, ctxerr.Wrap(ctx, err, "redis get set") - } - rawMembers, ok := raw.([]interface{}) - if !ok { - return nil, ctxerr.Errorf(ctx, "redis get set: unexpected result type %T", raw) - } - var members []string - for _, member := range rawMembers { - members = append(members, fmt.Sprintf("%s", member)) + return nil, ctxerr.Wrap(ctx, err, "redis get set members") } return members, nil } @@ -125,15 +107,12 @@ func (r *redisLock) Get(ctx context.Context, name string) (*string, error) { conn := redis.ConfigureDoer(r.pool, r.pool.Get()) defer conn.Close() - res, err := conn.Do("GET", r.testPrefix+name) + res, err := redigo.String(conn.Do("GET", r.testPrefix+name)) + if errors.Is(err, redigo.ErrNil) { + return nil, nil + } if err != nil { return nil, ctxerr.Wrap(ctx, err, "redis get") } - - if res == nil { - return nil, nil - } - - result := fmt.Sprintf("%s", res) - return &result, nil + return &res, nil } diff --git a/server/service/redis_lock/redis_lock_test.go b/server/service/redis_lock/redis_lock_test.go index de496092dfd..5c5e9575cce 100644 --- a/server/service/redis_lock/redis_lock_test.go +++ b/server/service/redis_lock/redis_lock_test.go @@ -21,16 +21,10 @@ func TestRedisLock(t *testing.T) { lock := setupRedis(t, false, false) f(t, lock) }) - t.Run("cluster", func(t *testing.T) { lock := setupRedis(t, true, true) f(t, lock) }) - - t.Run("cluster-no-redir", func(t *testing.T) { - lock := setupRedis(t, true, false) - f(t, lock) - }) }) } } diff --git a/server/service/testing_utils.go b/server/service/testing_utils.go index d23574e842c..df5fa5c219f 100644 --- a/server/service/testing_utils.go +++ b/server/service/testing_utils.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/fleetdm/fleet/v4/server/service/redis_lock" "io" "net/http" "net/http/httptest" @@ -35,6 +34,7 @@ import ( "github.com/fleetdm/fleet/v4/server/ptr" "github.com/fleetdm/fleet/v4/server/service/async" "github.com/fleetdm/fleet/v4/server/service/mock" + "github.com/fleetdm/fleet/v4/server/service/redis_lock" "github.com/fleetdm/fleet/v4/server/sso" 
"github.com/fleetdm/fleet/v4/server/test" kitlog "github.com/go-kit/log" From 38562ac3fa7ab84c5b269813dd3008a13dab8085 Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Mon, 8 Jul 2024 16:57:49 -0500 Subject: [PATCH 08/11] Fixed minor lock-related bug in cron job. --- server/cron/calendar_cron.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/cron/calendar_cron.go b/server/cron/calendar_cron.go index 81eda580186..ad08b7fde9e 100644 --- a/server/cron/calendar_cron.go +++ b/server/cron/calendar_cron.go @@ -344,9 +344,9 @@ func processFailingHostExistingCalendarEvent( done := make(chan struct{}) for { // Keep trying to get the lock. - result, err = distributedLock.AcquireLock(ctx, calendar.ReservedLockKeyPrefix+eventUUID, lockValue, timeoutMs) + result, err = distributedLock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0) if err != nil { - return fmt.Errorf("reserve calendar lock: %w", err) + return fmt.Errorf("try to acquire calendar lock: %w", err) } if result != "" { break From 52f69fac35542737342a9b9c9f790b1eac44ce33 Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Tue, 9 Jul 2024 07:10:00 -0500 Subject: [PATCH 09/11] Fix bug found with WIP unit test. --- ee/server/service/calendar.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ee/server/service/calendar.go b/ee/server/service/calendar.go index 10d08c877c3..f9d1c27ba75 100644 --- a/ee/server/service/calendar.go +++ b/ee/server/service/calendar.go @@ -234,6 +234,11 @@ func (svc *Service) getCalendarLock(ctx context.Context, eventUUID string, addTo return "", false, ctxerr.Wrap(ctx, err, "add calendar event to queue") } + if reserved { + // We flag the lock as reserved. + return "", reserved, nil + } + // Try to acquire the lock again in case it was released while we were adding the event to the queue. result, err = svc.distributedLock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0) if err != nil { From 5f4f5254b167b38d6715c5376937de9123b10b53 Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Tue, 9 Jul 2024 07:52:34 -0500 Subject: [PATCH 10/11] Fix another bug found with WIP unit test. --- server/cron/calendar_cron.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/server/cron/calendar_cron.go b/server/cron/calendar_cron.go index ad08b7fde9e..83c80cbf3ac 100644 --- a/server/cron/calendar_cron.go +++ b/server/cron/calendar_cron.go @@ -342,20 +342,23 @@ func processFailingHostExistingCalendarEvent( } lockReserved = true done := make(chan struct{}) - for { - // Keep trying to get the lock. - result, err = distributedLock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0) - if err != nil { - return fmt.Errorf("try to acquire calendar lock: %w", err) - } - if result != "" { - break + go func() { + for { + // Keep trying to get the lock. + result, err = distributedLock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0) + if err != nil || result != "" { + done <- struct{}{} + return + } + time.Sleep(100 * time.Millisecond) } - time.Sleep(100 * time.Millisecond) - } + }() select { case <-done: // Lock was acquired. + if err != nil { + return fmt.Errorf("try to acquire calendar lock: %w", err) + } case <-time.After(time.Duration(timeoutMs) * time.Millisecond): // We couldn't acquire the lock in time. 
return errors.New("could not acquire calendar lock in time") From f0ba1feccda2ed61eeac82eb7e5515d55727d91d Mon Sep 17 00:00:00 2001 From: Victor Lyuboslavsky Date: Tue, 9 Jul 2024 10:40:53 -0500 Subject: [PATCH 11/11] Added integration test and updated Lock interface documentation. --- server/fleet/calendar.go | 12 +- server/service/integration_enterprise_test.go | 148 +++++++++++++++++- server/service/redis_lock/redis_lock.go | 12 +- 3 files changed, 158 insertions(+), 14 deletions(-) diff --git a/server/fleet/calendar.go b/server/fleet/calendar.go index 5521515f128..f6e1a72b9a9 100644 --- a/server/fleet/calendar.go +++ b/server/fleet/calendar.go @@ -43,11 +43,17 @@ type UserCalendar interface { // Lock interface for managing distributed locks. type Lock interface { - AcquireLock(ctx context.Context, name string, value string, expireMs uint64) (result string, err error) - ReleaseLock(ctx context.Context, name string, value string) (ok bool, err error) - Get(ctx context.Context, name string) (*string, error) + // AcquireLock attempts to acquire a lock with the given key. value is the value to set for the key, which is used to release the lock. + AcquireLock(ctx context.Context, key string, value string, expireMs uint64) (result string, err error) + // ReleaseLock attempts to release a lock with the given key and value. If key does not exist or value does not match, the lock is not released. + ReleaseLock(ctx context.Context, key string, value string) (ok bool, err error) + // Get retrieves the value of the given key. If the key does not exist, nil is returned. + Get(ctx context.Context, key string) (*string, error) + // AddToSet adds the value to the set identified by the given key. AddToSet(ctx context.Context, key string, value string) error + // RemoveFromSet removes the value from the set identified by the given key. RemoveFromSet(ctx context.Context, key string, value string) error + // GetSet retrieves a slice of string values from the set identified by the given key. GetSet(ctx context.Context, key string) ([]string, error) } diff --git a/server/service/integration_enterprise_test.go b/server/service/integration_enterprise_test.go index a346dd2ddb6..51624200b17 100644 --- a/server/service/integration_enterprise_test.go +++ b/server/service/integration_enterprise_test.go @@ -35,6 +35,7 @@ import ( "github.com/fleetdm/fleet/v4/server/mdm" "github.com/fleetdm/fleet/v4/server/ptr" "github.com/fleetdm/fleet/v4/server/pubsub" + commonCalendar "github.com/fleetdm/fleet/v4/server/service/calendar" "github.com/fleetdm/fleet/v4/server/service/redis_lock" "github.com/fleetdm/fleet/v4/server/service/schedule" "github.com/fleetdm/fleet/v4/server/test" @@ -10770,25 +10771,92 @@ func (s *integrationEnterpriseTestSuite) TestCalendarCallback() { // Delete the event on the calendar calendar.ClearMockEvents() - // This callback should recreate the event + // Grab the distributed lock for this event + distributedLock := redis_lock.NewLock(s.redisPool) + lockValue := uuid.New().String() + result, err := distributedLock.AcquireLock(ctx, commonCalendar.LockKeyPrefix+event.UUID, lockValue, 0) + require.NoError(t, err) + assert.NotEmpty(t, result) + + // This callback should put the event processing in a queue for async processing. It does not start async + // processing because it assumes another server is handling this webhook, and that server will start + // async processing. 
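+	// (Sketch of the handoff under test, not code from this patch -- when the
+	// lock is already held, webhook processing looks roughly like:
+	//
+	//	if acquired, _ := lock.AcquireLock(ctx, calendar.LockKeyPrefix+uuid, v, 0); acquired == "" {
+	//		_ = lock.AddToSet(ctx, calendar.QueueKey, uuid)               // queue for the holder
+	//		if r, _ := lock.Get(ctx, calendar.ReservedLockKeyPrefix+uuid); r != nil {
+	//			return nil // a cron run holds the reservation and will drain the queue
+	//		}
+	//		// otherwise retry the acquire and process the queue ourselves
+	//	}
+	// )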
_ = s.DoRawWithHeaders("POST", "/api/v1/fleet/calendar/webhook/"+event.UUID, []byte(""), http.StatusOK, map[string]string{ "X-Goog-Channel-Id": details.ChannelID, "X-Goog-Resource-State": "exists", }) - team1CalendarEvents, err = s.ds.ListCalendarEvents(ctx, &team1.ID) + uuids, err := distributedLock.GetSet(ctx, commonCalendar.QueueKey) require.NoError(t, err) - require.Len(t, team1CalendarEvents, 1) + assert.ElementsMatch(t, []string{event.UUID}, uuids) + // The calendar should still be empty since event hasn't processed yet + assert.Zero(t, len(calendar.ListGoogleMockEvents())) + // We clear the queue + assert.NoError(t, distributedLock.RemoveFromSet(ctx, commonCalendar.QueueKey, event.UUID)) + + // We release the normal lock, but grab the reserve lock instead + ok, err := distributedLock.ReleaseLock(ctx, commonCalendar.LockKeyPrefix+event.UUID, lockValue) + require.NoError(t, err) + assert.True(t, ok) + result, err = distributedLock.AcquireLock(ctx, commonCalendar.ReservedLockKeyPrefix+event.UUID, lockValue, 0) + require.NoError(t, err) + assert.NotEmpty(t, result) + + // This callback should put the event processing in a queue for async processing, AND start the async processing + _ = s.DoRawWithHeaders("POST", "/api/v1/fleet/calendar/webhook/"+event.UUID, []byte(""), http.StatusOK, map[string]string{ + "X-Goog-Channel-Id": details.ChannelID, + "X-Goog-Resource-State": "exists", + }) + + uuids, err = distributedLock.GetSet(ctx, commonCalendar.QueueKey) + require.NoError(t, err) + assert.ElementsMatch(t, []string{event.UUID}, uuids) + // The calendar should still be empty since event hasn't processed yet + assert.Zero(t, len(calendar.ListGoogleMockEvents())) + + // We grab the normal lock again. + lockValue2 := uuid.New().String() + result, err = distributedLock.AcquireLock(ctx, commonCalendar.LockKeyPrefix+event.UUID, lockValue2, 0) + require.NoError(t, err) + assert.NotEmpty(t, result) + // We release the reserve lock. + ok, err = distributedLock.ReleaseLock(ctx, commonCalendar.ReservedLockKeyPrefix+event.UUID, lockValue) + require.NoError(t, err) + assert.True(t, ok) + // We release the normal lock. + ok, err = distributedLock.ReleaseLock(ctx, commonCalendar.LockKeyPrefix+event.UUID, lockValue2) + require.NoError(t, err) + assert.True(t, ok) + + done := make(chan struct{}) + go func() { + for { + time.Sleep(100 * time.Millisecond) + team1CalendarEvents, err = s.ds.ListCalendarEvents(ctx, &team1.ID) + require.NoError(t, err) + require.Len(t, team1CalendarEvents, 1) + if event.UUID != team1CalendarEvents[0].UUID { + done <- struct{}{} + return + } + } + }() + select { + case <-done: // All good + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for calendar event processing") + } + eventRecreated := team1CalendarEvents[0] assert.NotZero(t, eventRecreated.ID) assert.Equal(t, user1Email, eventRecreated.Email) assert.NotZero(t, eventRecreated.StartTime) assert.NotZero(t, eventRecreated.EndTime) assert.NotEmpty(t, eventRecreated.UUID) - assert.NotEqual(t, event.UUID, eventRecreated.UUID) assert.NotEqual(t, event.StartTime, eventRecreated.StartTime) assert.NotEqual(t, event.EndTime, eventRecreated.EndTime) assert.Equal(t, 1, calendar.MockChannelsCount()) + assert.Equal(t, 1, len(calendar.ListGoogleMockEvents())) // The previous event UUID should not work anymore, but API returns OK because this is a common occurrence. 
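 	// (Illustration, not part of this test: the handler treats a missing event
 	// as success so that stale Google channels stop mattering, roughly:
 	//
 	//	if fleet.IsNotFound(err) {
 	//		level.Info(logger).Log("msg", "callback for unknown event", "event_uuid", eventUUID)
 	//		return nil // respond 200 OK; nothing to re-process
 	//	}
 	// )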
_ = s.DoRawWithHeaders("POST", "/api/v1/fleet/calendar/webhook/"+event.UUID, []byte(""), http.StatusOK, map[string]string{ @@ -10835,6 +10903,75 @@ func (s *integrationEnterpriseTestSuite) TestCalendarCallback() { assert.Equal(t, eventRecreated.EndTime, eventUpdated.EndTime) assert.Equal(t, 1, calendar.MockChannelsCount()) + // Update the time of the event again + events = calendar.ListGoogleMockEvents() + require.Len(t, events, 1) + for _, e := range events { + st, err := time.Parse(time.RFC3339, e.Start.DateTime) + require.NoError(t, err) + newStartTime := st.Add(5 * time.Minute).Format(time.RFC3339) + e.Start.DateTime = newStartTime + } + + // Grab the lock + event = eventUpdated + lockValue = uuid.New().String() + result, err = distributedLock.AcquireLock(ctx, commonCalendar.LockKeyPrefix+event.UUID, lockValue, 0) + require.NoError(t, err) + assert.NotEmpty(t, result) + + mysql.ExecAdhocSQL(t, s.ds, func(db sqlx.ExtContext) error { + // Update updated_at so the event gets updated (the event is updated regularly) + _, err := db.ExecContext(ctx, + `UPDATE calendar_events SET updated_at = DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 25 HOUR) WHERE id = ?`, event.ID) + return err + }) + + // Trigger the calendar cron async. It should wait for the lock and set reserve lock. + go triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 10*time.Second) + done = make(chan struct{}) + go func() { + for { + time.Sleep(100 * time.Millisecond) + reserveLock, err := distributedLock.Get(ctx, commonCalendar.ReservedLockKeyPrefix+event.UUID) + require.NoError(t, err) + if reserveLock != nil { + done <- struct{}{} + return + } + } + }() + select { + case <-done: // All good + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for cron to set reserve lock") + } + + // Release the normal lock + ok, err = distributedLock.ReleaseLock(ctx, commonCalendar.LockKeyPrefix+event.UUID, lockValue) + require.NoError(t, err) + assert.True(t, ok) + + // Wait for the event to update + done = make(chan struct{}) + go func() { + for { + time.Sleep(100 * time.Millisecond) + team1CalendarEvents, err = s.ds.ListCalendarEvents(ctx, &team1.ID) + require.NoError(t, err) + if len(team1CalendarEvents) == 1 && team1CalendarEvents[0].UUID == event.UUID && + team1CalendarEvents[0].StartTime.After(event.StartTime) { + done <- struct{}{} + return + } + } + }() + select { + case <-done: // All good + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for event to update during cron") + } + // Delete the event on the calendar calendar.ClearMockEvents() @@ -10856,10 +10993,11 @@ func (s *integrationEnterpriseTestSuite) TestCalendarCallback() { }) assert.Equal(t, 0, calendar.MockChannelsCount()) + previousEvent := team1CalendarEvents[0] team1CalendarEvents, err = s.ds.ListCalendarEvents(ctx, &team1.ID) require.NoError(t, err) require.Len(t, team1CalendarEvents, 1) - assert.Equal(t, eventUpdated, team1CalendarEvents[0]) + assert.Equal(t, previousEvent, team1CalendarEvents[0]) // Trigger calendar should cleanup the events triggerAndWait(ctx, t, s.ds, s.calendarSchedule, 5*time.Second) diff --git a/server/service/redis_lock/redis_lock.go b/server/service/redis_lock/redis_lock.go index 8c53d2cfec6..b4bfbcfdbc8 100644 --- a/server/service/redis_lock/redis_lock.go +++ b/server/service/redis_lock/redis_lock.go @@ -29,7 +29,7 @@ func NewLock(pool fleet.RedisPool) fleet.Lock { return fleet.Lock(lock) } -func (r *redisLock) AcquireLock(ctx context.Context, name string, value string, expireMs uint64) (result string, err error) { +func (r 
*redisLock) AcquireLock(ctx context.Context, key string, value string, expireMs uint64) (result string, err error) { conn := redis.ConfigureDoer(r.pool, r.pool.Get()) defer conn.Close() @@ -39,14 +39,14 @@ func (r *redisLock) AcquireLock(ctx context.Context, name string, value string, // Reference: https://redis.io/docs/latest/commands/set/ // NX -- Only set the key if it does not already exist. - result, err = redigo.String(conn.Do("SET", r.testPrefix+name, value, "NX", "PX", expireMs)) + result, err = redigo.String(conn.Do("SET", r.testPrefix+key, value, "NX", "PX", expireMs)) if err != nil && !errors.Is(err, redigo.ErrNil) { return "", ctxerr.Wrap(ctx, err, "redis acquire lock") } return result, nil } -func (r *redisLock) ReleaseLock(ctx context.Context, name string, value string) (ok bool, err error) { +func (r *redisLock) ReleaseLock(ctx context.Context, key string, value string) (ok bool, err error) { conn := redis.ConfigureDoer(r.pool, r.pool.Get()) defer conn.Close() @@ -60,7 +60,7 @@ func (r *redisLock) ReleaseLock(ctx context.Context, name string, value string) // Reference: https://redis.io/docs/latest/commands/set/ // Only release the lock if the value matches. - res, err := redigo.Int64(conn.Do("EVAL", unlockScript, 1, r.testPrefix+name, value)) + res, err := redigo.Int64(conn.Do("EVAL", unlockScript, 1, r.testPrefix+key, value)) if err != nil && !errors.Is(err, redigo.ErrNil) { return false, ctxerr.Wrap(ctx, err, "redis release lock") } @@ -103,11 +103,11 @@ func (r *redisLock) GetSet(ctx context.Context, key string) ([]string, error) { return members, nil } -func (r *redisLock) Get(ctx context.Context, name string) (*string, error) { +func (r *redisLock) Get(ctx context.Context, key string) (*string, error) { conn := redis.ConfigureDoer(r.pool, r.pool.Get()) defer conn.Close() - res, err := redigo.String(conn.Do("GET", r.testPrefix+name)) + res, err := redigo.String(conn.Do("GET", r.testPrefix+key)) if errors.Is(err, redigo.ErrNil) { return nil, nil }
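 	// (Usage sketch, not part of this hunk: the nil/nil return lets callers
 	// separate "key absent" from a real Redis failure:
 	//
 	//	v, err := lock.Get(ctx, calendar.ReservedLockKeyPrefix+eventUUID)
 	//	switch {
 	//	case err != nil: // Redis problem; retry or surface the error
 	//	case v == nil:   // no reservation in place
 	//	default:         // reserved; *v is the holder's lock value
 	//	}
 	// )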