Conversation
```diff
 if fleet.IsNotFound(err) {
 	// We could try to stop the channel callbacks here, but that may not be secure since we don't know if the request is legitimate
-	level.Warn(svc.logger).Log("msg", "Received calendar callback, but did not find corresponding event in database", "event_uuid",
+	level.Info(svc.logger).Log("msg", "Received calendar callback, but did not find corresponding event in database", "event_uuid",
```

This may occur when a calendar event is created, and another change to the calendar occurs at the same time, triggering a callback. The callback comes in before we saved the event in the DB (or before it replicated to the replica). This is OK since we don't expect the event to be immediately modified upon creation.

@getvictor I'm trying out the PR review bot on this.
PR Summary
- Introduced distributed lock mechanism for Google calendar callbacks (`cmd/fleet/serve.go`, `ee/server/service/calendar.go`, `server/cron/calendar_cron.go`)
- Added `distributedLock` field to `Service` struct and updated initialization (`ee/server/service/service.go`)
- Implemented Redis-based distributed lock (`server/service/redis_lock/redis_lock.go`, `server/service/redis_lock/redis_lock_test.go`)
- Updated tests to include distributed lock functionality (`ee/server/service/mdm_external_test.go`, `server/cron/calendar_cron_test.go`)
- Modified utility functions to support distributed lock (`server/service/testing_utils.go`)

14 file(s) reviewed, 50 comment(s)
```diff
@@ -50,6 +50,7 @@ import (
 	"github.com/fleetdm/fleet/v4/server/pubsub"
```

ℹ️ info: Imported the redis_lock package to support distributed locking.
```diff
@@ -691,6 +692,7 @@ the way that the Fleet server works.
 	}
```

ℹ️ info: Initialized a distributedLock variable to hold the lock instance.
```diff
@@ -718,6 +720,7 @@ the way that the Fleet server works.
 	}
```

ℹ️ info: Created a new distributed lock using redis_lock.NewLock(redisPool).
```diff
@@ -730,6 +733,7 @@ the way that the Fleet server works.
 	ssoSessionStore,
```

ℹ️ info: Passed the distributedLock to the eeservice.NewService function.
```diff
@@ -870,7 +874,7 @@ the way that the Fleet server works.
 	} else {
```

ℹ️ info: Updated the calendar schedule to use the distributedLock.
```go
foundNewEvents := false
for _, item := range list.(*calendar.Events).Items {
	created, err := time.Parse(time.RFC3339, item.Created)
	if err != nil {
		log.Fatalf("Unable to parse event created time: %v", err)
	}
	if created.After(now) {
		// Found events created after we started deleting events, so we should stop
		foundNewEvents = true
		continue // Skip this event but finish the loop to make sure we don't miss something
	}
```

ℹ️ info: Added logic to check if an event was created after the script started running and skip its deletion.
```go
pageToken = list.(*calendar.Events).NextPageToken
if pageToken == "" || foundNewEvents {
	break
}
```

ℹ️ info: Handled NextPageToken to continue pagination until all events are processed or new events are found.
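The pagination loop can be sketched against a fake paged source; `page`, `pages`, and `collectAll` below are hypothetical stand-ins for the Calendar API's list call and its `NextPageToken`:

```go
package main

import "fmt"

// page is a hypothetical stand-in for one response from a paginated API.
type page struct {
	items     []string
	nextToken string
}

// pages simulates a paged API: the token selects which page is returned.
var pages = map[string]page{
	"":   {items: []string{"a", "b"}, nextToken: "t1"},
	"t1": {items: []string{"c"}, nextToken: ""}, // last page
}

// collectAll drains every page, stopping when the next-page token is empty,
// mirroring the NextPageToken loop in the script.
func collectAll() []string {
	var all []string
	pageToken := ""
	for {
		p := pages[pageToken]
		all = append(all, p.items...)
		pageToken = p.nextToken
		if pageToken == "" {
			break
		}
	}
	return all
}

func main() {
	fmt.Println(collectAll()) // [a b c]
}
```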
```go
numberMoved := 0
var maxResults int64 = 1000
pageToken := ""
```

ℹ️ info: Added pageToken to handle pagination for large sets of events.
```go
foundNewEvents := false
for _, item := range list.(*calendar.Events).Items {
	created, err := time.Parse(time.RFC3339, item.Created)
	if err != nil {
		log.Fatalf("Unable to parse event created time: %v", err)
	}
	if created.After(now) {
		// Found events created after we started moving events, so we should stop
		foundNewEvents = true
		continue // Skip this event but finish the loop to make sure we don't miss something
	}
```

ℹ️ info: Added logic to skip events created after the script started running to avoid moving new events.
```go
pageToken = list.(*calendar.Events).NextPageToken
if pageToken == "" || foundNewEvents {
	break
```

ℹ️ info: Ensure pageToken is updated and check for new events to decide when to break the loop.
PR Summary
(updates since last review)
- Integrated distributed lock mechanism for calendar callbacks in `server/cron/calendar_cron.go`
- Ensured lock acquisition and release around calendar event processing
- Updated cron job functions to handle lock reservation and event updates

1 file(s) reviewed, 3 comment(s)
```go
	logger kitlog.Logger,
) error {

	// Try to acquire the lock. Lock is needed to ensure calendar callback is not processed for this event at the same time.
```

ℹ️ info: Acquiring the lock to ensure calendar callback is not processed concurrently.
```go
// Remove event from the queue so that we don't process this event again.
// Note: This item can be added back to the queue while we are processing it.
err = distributedLock.RemoveFromSet(ctx, calendar.QueueKey, eventUUID)
```

ℹ️ info: Removing event from the queue to prevent reprocessing.
```go
		return errors.New("could not acquire calendar lock in time")
	}
}
defer func() {
```

ℹ️ info: Ensure locks are released after processing to avoid deadlocks.
Bot is chatty. Should I disable the "info" comments?
PR Summary
(updates since last review)
- Introduced distributed lock mechanism for Google Calendar callbacks in `ee/server/service/calendar.go`
- Ensured lock acquisition and release around event processing
- Added events to a queue if the lock is reserved
- Enabled asynchronous processing of events when needed

No major changes found since last review.

1 file(s) reviewed, 6 comment(s)
```go
lockValue, reserved, err := svc.getCalendarLock(ctx, eventUUID, true)
if err != nil {
	return err
}
// If lock has been reserved by cron, we will need to re-process this event in case the calendar event was changed after the cron job read it.
if lockValue == "" && !reserved {
	// We did not get a lock, so there is nothing to do here
	return nil
```

ℹ️ info: Ensures that if the lock is reserved, the event will be re-processed to handle any changes made after the cron job read it.
```go
// Remove event from the queue so that we don't process this event again.
// Note: This item can be added back to the queue while we are processing it.
err = svc.distributedLock.RemoveFromSet(ctx, calendar.QueueKey, eventUUID)
if err != nil {
	return ctxerr.Wrap(ctx, err, "remove calendar event from queue")
}
```

ℹ️ info: Removes the event from the queue to prevent re-processing while it is being handled.
```go
func (svc *Service) releaseCalendarLock(ctx context.Context, eventUUID string, lockValue string) {
	ok, err := svc.distributedLock.ReleaseLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue)
	if err != nil {
		level.Error(svc.logger).Log("msg", "Failed to release calendar lock", "err", err)
	}
	if !ok {
		// If the lock was not released, it will expire on its own.
		level.Warn(svc.logger).Log("msg", "Failed to release calendar lock")
	}
```

ℹ️ info: Releases the distributed lock and handles potential errors.
```go
func (svc *Service) getCalendarLock(ctx context.Context, eventUUID string, addToQueue bool) (lockValue string, reserved bool, err error) {
	// Check if lock has been reserved, which means we can't have it.
	reservedValue, err := svc.distributedLock.Get(ctx, calendar.ReservedLockKeyPrefix+eventUUID)
	if err != nil {
		return "", false, ctxerr.Wrap(ctx, err, "get calendar reserved lock")
	}
	reserved = reservedValue != nil
	if reserved && !addToQueue {
		// We flag the lock as reserved.
		return "", reserved, nil
	}
```

ℹ️ info: Acquires the distributed lock and adds the event to the queue if the lock is not available.
```go
func (svc *Service) processCalendarAsync(ctx context.Context, eventIDs []string) {
	defer func() {
		asyncMutex.Lock()
		asyncCalendarProcessing = false
		asyncMutex.Unlock()
	}()
```

ℹ️ info: Processes calendar events asynchronously from the queue.
```go
func (svc *Service) processCalendarEventAsync(ctx context.Context, eventUUID string) bool {
	lockValue, _, err := svc.getCalendarLock(ctx, eventUUID, false)
	if err != nil {
		level.Error(svc.logger).Log("msg", "Failed to get calendar lock", "err", err)
		return false
	}
	if lockValue == "" {
		// We did not get a lock, so there is nothing to do here
		return true
	}
	defer svc.releaseCalendarLock(ctx, eventUUID, lockValue)
```

ℹ️ info: Processes individual calendar events asynchronously, ensuring locks are managed correctly.
PR Summary
(updates since last review)
- Introduced distributed lock mechanism in `server/cron/calendar_cron.go`
- Acquired and released locks around calendar event processing
- Handled lock acquisition timeouts
- Ensured events are not reprocessed
- Prevented race conditions and ensured data consistency

1 file(s) reviewed, 1 comment(s)
```go
case <-done:
	// Lock was acquired.
	if err != nil {
		return fmt.Errorf("try to acquire calendar lock: %w", err)
	}
```

🧠 logic: Ensure that err is properly checked before returning it to avoid potential nil pointer dereference.
```diff
 level.Info(svc.logger).Log("msg", "Received calendar callback, but did not find corresponding event in database", "event_uuid",
 	eventUUID, "channel_id", channelID)
-return err
+return nil
```

Return nil so that we don't get retries?
This seems to happen often enough, for legitimate reasons, that it doesn't make sense to flag it as an error. Google Calendar doesn't seem to retry on error. I just don't want the load balancer to see a bunch of 4XX errors and customers start complaining about it.
PR Summary
(updates since last review)
- Introduced `Lock` interface in `/server/fleet/calendar.go` for distributed lock management
- Renamed parameter `name` to `key` in `/server/service/redis_lock/redis_lock.go` for consistency
- Ensured synchronized operations for Google Calendar callbacks across distributed systems

2 file(s) reviewed, 4 comment(s)
```go
type Lock interface {
	// AcquireLock attempts to acquire a lock with the given key. value is the value to set for the key, which is used to release the lock.
	AcquireLock(ctx context.Context, key string, value string, expireMs uint64) (result string, err error)
	// ReleaseLock attempts to release a lock with the given key and value. If key does not exist or value does not match, the lock is not released.
	ReleaseLock(ctx context.Context, key string, value string) (ok bool, err error)
	// Get retrieves the value of the given key. If the key does not exist, nil is returned.
	Get(ctx context.Context, key string) (*string, error)
	// AddToSet adds the value to the set identified by the given key.
	AddToSet(ctx context.Context, key string, value string) error
	// RemoveFromSet removes the value from the set identified by the given key.
	RemoveFromSet(ctx context.Context, key string, value string) error
	// GetSet retrieves a slice of string values from the set identified by the given key.
	GetSet(ctx context.Context, key string) ([]string, error)
```

ℹ️ info: Introduced a new Lock interface for managing distributed locks, which includes methods for acquiring, releasing, and managing locks and sets.
```go
	return fleet.Lock(lock)
}

func (r *redisLock) AcquireLock(ctx context.Context, key string, value string, expireMs uint64) (result string, err error) {
```

ℹ️ info: Renamed parameter name to key for consistency.
```go
	return result, nil
}

func (r *redisLock) ReleaseLock(ctx context.Context, key string, value string) (ok bool, err error) {
```

ℹ️ info: Renamed parameter name to key for consistency.
```go
	return members, nil
}

func (r *redisLock) Get(ctx context.Context, key string) (*string, error) {
```

ℹ️ info: Renamed parameter name to key for consistency.
Sorry for all the noise in the PR. I'm removing the bot review and will try to disable the "info" comments in the future.
lucasmrod
left a comment
LGTM!
Left a couple of questions.
```go
// If lock has been reserved by cron, we will need to re-process this event in case the calendar event was changed after the cron job read it.
if lockValue == "" && !reserved {
	// We did not get a lock, so there is nothing to do here
	return nil
```

What's this scenario? Should we log here?
This is when our server could not get a lock (because another server has the lock). In the svc.getCalendarLock method, we added this event to the queue. So, the other server will re-process this event.
I could add a debug log here, but this might happen often in production.
```go
// Try to acquire the lock again in case it was released while we were adding the event to the queue.
result, err = svc.distributedLock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0)
if err != nil {
	return "", false, ctxerr.Wrap(ctx, err, "acquire calendar lock again")
}

if result == "" {
	// We could not acquire the lock, so we are done here.
	return "", reserved, nil
}
```

Is this optimization necessary?
Yes, it catches the corner case where an event UUID was added to the queue, but no server actually processes the queue because all the locks have been released.
```go
// Lock interface for managing distributed locks.
type Lock interface {
	// AcquireLock attempts to acquire a lock with the given key. value is the value to set for the key, which is used to release the lock.
	AcquireLock(ctx context.Context, key string, value string, expireMs uint64) (result string, err error)
```

Let's document what expireMs is and what expireMs = 0 means.
ok, will add on the UUID branch
```go
// Now, we need to check if there are any events in the queue that need to be re-processed.
asyncMutex.Lock()
defer asyncMutex.Unlock()
if !asyncCalendarProcessing {
	eventIDs, err := svc.distributedLock.GetSet(ctx, calendar.QueueKey)
	if err != nil {
		return ctxerr.Wrap(ctx, err, "get calendar event queue")
	}
	if len(eventIDs) > 0 {
		asyncCalendarProcessing = true
		go svc.processCalendarAsync(ctx, eventIDs)
	}
	return nil
}
```

Any issues with two Fleet instances running this async simultaneously and/or a Fleet instance running the cron simultaneously?
It should work with multiple instances running it. They loop through the UUIDs and try to get a lock. If they don't get a lock, they try a different UUID. The load test should cover this.
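The claim that multiple instances can safely loop over queued UUIDs, because only one instance wins each lock, can be sketched with an in-memory stand-in for the Redis lock (`claimed` and `tryClaim` are hypothetical names, not part of the PR):

```go
package main

import (
	"fmt"
	"sync"
)

// claimed tracks which event UUIDs some worker has already locked; in the
// real system this is the Redis lock, here a mutex-guarded map stands in.
type claimed struct {
	mu   sync.Mutex
	held map[string]bool
}

// tryClaim returns true only for the first caller per event, mirroring
// "if they don't get a lock, they try a different UUID".
func (c *claimed) tryClaim(event string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.held[event] {
		return false
	}
	c.held[event] = true
	return true
}

func main() {
	queue := []string{"e1", "e2", "e3"}
	c := &claimed{held: map[string]bool{}}
	var processed sync.Map
	var wg sync.WaitGroup
	for worker := 0; worker < 3; worker++ { // three competing instances
		wg.Add(1)
		go func() {
			defer wg.Done()
			for _, ev := range queue {
				if c.tryClaim(ev) {
					processed.Store(ev, true) // each event handled exactly once
				}
			}
		}()
	}
	wg.Wait()
	n := 0
	processed.Range(func(_, _ any) bool { n++; return true })
	fmt.Println(n) // 3
}
```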
```go
if err != nil {
	if fleet.IsNotFound(err) {
		// We found this event when the callback initially came in. So the event may have been removed from DB since then.
		return true
```

Is this an issue? (Maybe processCalendarEventAsync should return an error?)
This happens frequently because we create a new UUID when re-creating the event. The old UUID stuck in the queue will hit this. I'll update the comment to:
So the event may have been removed or re-created since then.
```go
// AcquireLock attempts to acquire a lock with the given key. value is the value to set for the key, which is used to release the lock.
AcquireLock(ctx context.Context, key string, value string, expireMs uint64) (result string, err error)
// ReleaseLock attempts to release a lock with the given key and value. If key does not exist or value does not match, the lock is not released.
ReleaseLock(ctx context.Context, key string, value string) (ok bool, err error)
```

Let's document that it returns true if it was released, false otherwise (e.g. when value doesn't match).
```go
// Reference: https://redis.io/docs/latest/commands/sadd/
_, err := conn.Do("SADD", r.testPrefix+key, value)
if err != nil {
```

Double checking: do you need to check errors.Is(err, redigo.ErrNil) here?
No, redigo.ErrNil is only created when you use certain helper methods like redigo.String
```go
// Reference: https://redis.io/docs/latest/commands/srem/
_, err := conn.Do("SREM", r.testPrefix+key, value)
if err != nil {
```

Double checking: do you need to check errors.Is(err, redigo.ErrNil) here?
```go
// Reference: https://redis.io/docs/latest/commands/smembers/
members, err := redigo.Strings(conn.Do("SMEMBERS", r.testPrefix+key))
if err != nil {
```

Double checking: do you need to check errors.Is(err, redigo.ErrNil) here?
Yes, it is not documented, but I see the implementation can generate an ErrNil. I will add.
```go
// Lock interface for managing distributed locks.
type Lock interface {
	// AcquireLock attempts to acquire a lock with the given key. value is the value to set for the key, which is used to release the lock.
	AcquireLock(ctx context.Context, key string, value string, expireMs uint64) (result string, err error)
```
Yes, it looks like it should be bool. I'll work on it.
…20277)

#19352

Fix for code review comment: #20156 (comment)

Also includes changes from #20252

# Checklist for submitter
If some of the following don't apply, delete the relevant line.
<!-- Note that API documentation changes are now addressed by the product design team. -->
- [x] Added/updated tests
- [x] If database migrations are included, checked table schema to confirm autoupdate
- For database migrations:
  - [x] Checked schema for all modified tables for columns that will auto-update timestamps during migration.
  - [x] Confirmed that updating the timestamps is acceptable, and will not cause unwanted side effects.
  - [x] Ensured the correct collation is explicitly set for character columns (`COLLATE utf8mb4_unicode_ci`).
- [x] Manual QA for all new/changed functionality
(cherry picked from commit 7bcd61a)
This PR has been superseded by #20277
#19352
Adding distributed lock for Google calendar callback.
Checklist for submitter
If some of the following don't apply, delete the relevant line.