Skip to content
6 changes: 5 additions & 1 deletion cmd/fleet/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/fleetdm/fleet/v4/server/pubsub"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Imported the redis_lock package to support distributed locking.

"github.com/fleetdm/fleet/v4/server/service"
"github.com/fleetdm/fleet/v4/server/service/async"
"github.com/fleetdm/fleet/v4/server/service/redis_lock"
"github.com/fleetdm/fleet/v4/server/service/redis_policy_set"
"github.com/fleetdm/fleet/v4/server/sso"
"github.com/fleetdm/fleet/v4/server/version"
Expand Down Expand Up @@ -691,6 +692,7 @@ the way that the Fleet server works.
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Initialized a distributedLock variable to hold the lock instance.


var softwareInstallStore fleet.SoftwareInstallerStore
var distributedLock fleet.Lock
if license.IsPremium() {
profileMatcher := apple_mdm.NewProfileMatcher(redisPool)
if config.S3.SoftwareInstallersBucket != "" {
Expand Down Expand Up @@ -718,6 +720,7 @@ the way that the Fleet server works.
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Created a new distributed lock using redis_lock.NewLock(redisPool).

}

distributedLock = redis_lock.NewLock(redisPool)
svc, err = eeservice.NewService(
svc,
ds,
Expand All @@ -730,6 +733,7 @@ the way that the Fleet server works.
ssoSessionStore,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Passed the distributedLock to the eeservice.NewService function.

profileMatcher,
softwareInstallStore,
distributedLock,
)
if err != nil {
initFatal(err, "initial Fleet Premium service")
Expand Down Expand Up @@ -870,7 +874,7 @@ the way that the Fleet server works.
} else {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Updated the calendar schedule to use the distributedLock.

config.Calendar.Periodicity = 5 * time.Minute
}
return cron.NewCalendarSchedule(ctx, instanceID, ds, config.Calendar, logger)
return cron.NewCalendarSchedule(ctx, instanceID, ds, distributedLock, config.Calendar, logger)
},
); err != nil {
initFatal(err, "failed to register calendar schedule")
Expand Down
218 changes: 213 additions & 5 deletions ee/server/service/calendar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@ package service
import (
"context"
"fmt"
"sync"

"github.com/fleetdm/fleet/v4/server/authz"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/service/calendar"
"github.com/go-kit/log/level"
"sync"
"github.com/google/uuid"
)

var asyncCalendarProcessing bool
var asyncMutex sync.Mutex

func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, channelID string, resourceState string) error {

// We don't want the sender to cancel the context since we want to make sure we process the webhook.
ctx = context.WithoutCancel(ctx)
Comment on lines +21 to +22
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Using context.WithoutCancel ensures the webhook processing is not prematurely canceled, which is crucial for reliable processing.


appConfig, err := svc.ds.AppConfig(ctx)
if err != nil {
return fmt.Errorf("load app config: %w", err)
Expand All @@ -36,9 +44,9 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann
svc.authz.SkipAuthorization(ctx)
if fleet.IsNotFound(err) {
// We could try to stop the channel callbacks here, but that may not be secure since we don't know if the request is legitimate
level.Warn(svc.logger).Log("msg", "Received calendar callback, but did not find corresponding event in database", "event_uuid",
level.Info(svc.logger).Log("msg", "Received calendar callback, but did not find corresponding event in database", "event_uuid",
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may occur when a calendar event is created and another change to the calendar occurs at the same time, triggering a callback. The callback comes in before we have saved the event in the DB (or before it has replicated to the replica). This is OK since we don't expect the event to be modified immediately upon creation.

eventUUID, "channel_id", channelID)
return err
return nil
Comment on lines +47 to +49
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Changed log level from Warn to Info for missing event in the database. This reduces the severity of the log message.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return nil so that we don't get retries?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to happen often enough, and for legitimate reasons, that it doesn't make sense to flag it as an error. Google Calendar doesn't seem to retry on error. I just don't want the load balancer to see a bunch of 4XX errors and customers to start complaining about it.

}
return err
}
Expand All @@ -47,7 +55,7 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann
return fmt.Errorf("calendar event %s has no team ID", eventUUID)
}

localConfig := &calendar.CalendarConfig{
localConfig := &calendar.Config{
GoogleCalendarIntegration: *googleCalendarIntegrationConfig,
ServerURL: appConfig.ServerSettings.ServerURL,
}
Expand All @@ -63,6 +71,60 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann
return authz.ForbiddenWithInternal(fmt.Sprintf("calendar channel ID mismatch: %s != %s", savedChannelID, channelID), nil, nil, nil)
}

lockValue, reserved, err := svc.getCalendarLock(ctx, eventUUID, true)
if err != nil {
return err
}
// If lock has been reserved by cron, we will need to re-process this event in case the calendar event was changed after the cron job read it.
if lockValue == "" && !reserved {
// We did not get a lock, so there is nothing to do here
return nil
Comment on lines +74 to +81
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Acquiring a distributed lock to ensure only one instance processes the event at a time.

Comment on lines +74 to +81
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Ensures that if the lock is reserved, the event will be re-processed to handle any changes made after the cron job read it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this scenario? Should we log here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is when our server could not get a lock (because another server has the lock). In the svc.getCalendarLock method, we added this event to the queue. So, the other server will re-process this event.

I could add a debug log here, but this might happen often in production.

}

if !reserved {
unlocked := false
defer func() {
if !unlocked {
svc.releaseCalendarLock(ctx, eventUUID, lockValue)
}
}()

// Remove event from the queue so that we don't process this event again.
// Note: This item can be added back to the queue while we are processing it.
err = svc.distributedLock.RemoveFromSet(ctx, calendar.QueueKey, eventUUID)
if err != nil {
return ctxerr.Wrap(ctx, err, "remove calendar event from queue")
}
Comment on lines +92 to +97
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Removing the event from the queue to prevent re-processing while it is being handled.

Comment on lines +92 to +97
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Removes the event from the queue to prevent re-processing while it is being handled.


err = svc.processCalendarEvent(ctx, eventDetails, googleCalendarIntegrationConfig, userCalendar)
if err != nil {
return err
}
svc.releaseCalendarLock(ctx, eventUUID, lockValue)
unlocked = true
}

// Now, we need to check if there are any events in the queue that need to be re-processed.
asyncMutex.Lock()
defer asyncMutex.Unlock()
if !asyncCalendarProcessing {
eventIDs, err := svc.distributedLock.GetSet(ctx, calendar.QueueKey)
if err != nil {
return ctxerr.Wrap(ctx, err, "get calendar event queue")
}
if len(eventIDs) > 0 {
asyncCalendarProcessing = true
go svc.processCalendarAsync(ctx, eventIDs)
}
Comment on lines +107 to +118
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Checking and processing any events in the queue that need re-processing asynchronously.

return nil
}
Comment on lines +107 to +120
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any issues with two Fleet instances running this async simultaneously and/or a Fleet instance running the cron simultaneously?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should work with multiple instances running it. They loop through the UUIDs and try to get a lock. If they don't get a lock, they try a different UUID. The load test should cover this.


return nil
}

func (svc *Service) processCalendarEvent(ctx context.Context, eventDetails *fleet.CalendarEventDetails,
googleCalendarIntegrationConfig *fleet.GoogleCalendarIntegration, userCalendar fleet.UserCalendar) error {

genBodyFn := func(conflict bool) (body string, ok bool, err error) {

// This function is called when a new event is being created.
Expand Down Expand Up @@ -113,7 +175,7 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann
return calendar.GenerateCalendarEventBody(ctx, svc.ds, team.Name, host, &sync.Map{}, conflict, svc.logger), true, nil
}

err = userCalendar.Configure(eventDetails.Email)
err := userCalendar.Configure(eventDetails.Email)
if err != nil {
return ctxerr.Wrap(ctx, err, "configure calendar")
}
Expand All @@ -129,5 +191,151 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann
return ctxerr.Wrap(ctx, err, "create or update calendar event")
}
}

return nil
}

// releaseCalendarLock releases the distributed lock held for a calendar event.
//
// The lock key is calendar.LockKeyPrefix+eventUUID, and lockValue is forwarded
// to ReleaseLock unchanged (presumably it must be the value that was stored
// when the lock was acquired — confirm against the fleet.Lock implementation).
//
// Nothing is returned: a failed release is only logged, at Error level when
// ReleaseLock returns an error and at Warn level when it reports ok == false.
// The two checks are independent, so a call that returns both an error and
// ok == false produces both log lines.
func (svc *Service) releaseCalendarLock(ctx context.Context, eventUUID string, lockValue string) {
ok, err := svc.distributedLock.ReleaseLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue)
if err != nil {
level.Error(svc.logger).Log("msg", "Failed to release calendar lock", "err", err)
}
if !ok {
// If the lock was not released, it will expire on its own.
level.Warn(svc.logger).Log("msg", "Failed to release calendar lock")
}
Comment on lines +198 to +206
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Releasing the distributed lock and handling potential errors.

Comment on lines +198 to +206
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Releases the distributed lock and handles potential errors.

}

// getCalendarLock tries to take the distributed lock for a calendar event.
//
// Parameters:
//   - eventUUID: UUID of the calendar event; the lock key is
//     calendar.LockKeyPrefix+eventUUID.
//   - addToQueue: when true and the lock is unavailable (held elsewhere or
//     reserved), the event UUID is added to the calendar.QueueKey set so that
//     the event gets re-processed later.
//
// Returns:
//   - lockValue: the random value stored with the lock when it was acquired;
//     pass it to releaseCalendarLock. It is "" whenever no lock was obtained,
//     and callers rely on "" to mean "nothing to do".
//   - reserved: true when a value exists under
//     calendar.ReservedLockKeyPrefix+eventUUID. No acquisition is attempted in
//     that case.
//   - err: a wrapped error from any distributedLock call.
func (svc *Service) getCalendarLock(ctx context.Context, eventUUID string, addToQueue bool) (lockValue string, reserved bool, err error) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Acquiring the distributed lock and adding the event to the queue if the lock is not available.

// Check if lock has been reserved, which means we can't have it.
reservedValue, err := svc.distributedLock.Get(ctx, calendar.ReservedLockKeyPrefix+eventUUID)
if err != nil {
return "", false, ctxerr.Wrap(ctx, err, "get calendar reserved lock")
}
reserved = reservedValue != nil
if reserved && !addToQueue {
// We flag the lock as reserved.
return "", reserved, nil
}
Comment on lines +209 to +219
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Acquires the distributed lock and adds the event to the queue if the lock is not available.

// result stays "" when the lock was not acquired (or when no attempt was made because it is reserved).
var result string
if !reserved {
// Try to acquire the lock
lockValue = uuid.New().String()
// NOTE(review): the trailing 0 presumably selects the default lock expiration — confirm against fleet.Lock.
result, err = svc.distributedLock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0)
if err != nil {
return "", false, ctxerr.Wrap(ctx, err, "acquire calendar lock")
}
}
if (result == "" || reserved) && addToQueue {
// Could not acquire lock, so we are already processing this event. In this case, we add the event to
// the queue (actually a set) to indicate that we need to re-process the event.
err = svc.distributedLock.AddToSet(ctx, calendar.QueueKey, eventUUID)
if err != nil {
return "", false, ctxerr.Wrap(ctx, err, "add calendar event to queue")
}

if reserved {
// We flag the lock as reserved.
return "", reserved, nil
}

// Try to acquire the lock again in case it was released while we were adding the event to the queue.
result, err = svc.distributedLock.AcquireLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue, 0)
if err != nil {
return "", false, ctxerr.Wrap(ctx, err, "acquire calendar lock again")
}

if result == "" {
// We could not acquire the lock, so we are done here.
return "", reserved, nil
}
Comment on lines +242 to +251
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this optimization necessary?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it catches the corner case where an event UUID was added to the queue, but no server actually processes the queue because all the locks have been released.

}
if result == "" {
// The lock is held elsewhere and the caller did not ask for queueing (addToQueue is false).
// Report "no lock" instead of handing back a lockValue that was never stored; otherwise the
// caller would process the event without holding the lock.
return "", false, nil
}
return lockValue, false, nil
}

// processCalendarAsync re-processes queued calendar events in the background.
//
// CalendarWebhook starts it as a goroutine after setting
// asyncCalendarProcessing to true under asyncMutex; the deferred function
// below resets that flag when the goroutine exits, which lets a later webhook
// start a new run.
//
// eventIDs is the initial batch of event UUIDs read from the calendar.QueueKey
// set. After each batch the set is read again, and the function returns when
// the set is empty, when reading it fails, or when processCalendarEventAsync
// returns false for any event.
//
// NOTE(review): processCalendarEventAsync returns true without removing the
// event from the set when it does not get the lock, so such events are fetched
// again right away by the GetSet call below, with no delay between rounds —
// confirm this is acceptable.
func (svc *Service) processCalendarAsync(ctx context.Context, eventIDs []string) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Processing calendar events asynchronously from the queue.

// Clear the "async run in progress" flag on every exit path.
defer func() {
asyncMutex.Lock()
asyncCalendarProcessing = false
asyncMutex.Unlock()
}()
Comment on lines +256 to +261
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Processes calendar events asynchronously from the queue.

for {
if len(eventIDs) == 0 {
return
}
for _, eventUUID := range eventIDs {
// A false result means the event hit an error that was already logged; stop the whole run.
if ok := svc.processCalendarEventAsync(ctx, eventUUID); !ok {
return
}
}

// Now we check whether there are any more events in the queue.
var err error
eventIDs, err = svc.distributedLock.GetSet(ctx, calendar.QueueKey)
if err != nil {
level.Error(svc.logger).Log("msg", "Failed to get calendar event queue", "err", err)
return
}
}
}

// processCalendarEventAsync re-processes one queued calendar event.
//
// It asks getCalendarLock for the event's lock (without re-queueing the
// event), removes the event UUID from the calendar.QueueKey set, reloads the
// app config and the event details, and calls processCalendarEvent. Errors are
// logged here rather than returned.
//
// The boolean result tells processCalendarAsync whether to keep going:
//   - true: continue with the next event. This covers success, no lock
//     obtained, Google Calendar integration not configured, and the event no
//     longer being in the database.
//   - false: an error occurred (and was logged); the async run stops.
func (svc *Service) processCalendarEventAsync(ctx context.Context, eventUUID string) bool {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Processing individual calendar events asynchronously, ensuring locks are managed correctly.

// addToQueue is false: the event is already in the queue, so it is not added again.
lockValue, _, err := svc.getCalendarLock(ctx, eventUUID, false)
if err != nil {
level.Error(svc.logger).Log("msg", "Failed to get calendar lock", "err", err)
return false
}
if lockValue == "" {
// We did not get a lock, so there is nothing to do here
return true
}
// The lock is released on every return path below.
defer svc.releaseCalendarLock(ctx, eventUUID, lockValue)
Comment on lines +282 to +292
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Processes individual calendar events asynchronously, ensuring locks are managed correctly.


// Remove event from the queue so that we don't process this event again.
// Note: This item can be added back to the queue while we are processing it.
err = svc.distributedLock.RemoveFromSet(ctx, calendar.QueueKey, eventUUID)
if err != nil {
level.Error(svc.logger).Log("msg", "Failed to remove calendar event from queue", "err", err)
return false
}

appConfig, err := svc.ds.AppConfig(ctx)
if err != nil {
level.Error(svc.logger).Log("msg", "Failed to load app config", "err", err)
return false
}

if len(appConfig.Integrations.GoogleCalendar) == 0 {
// Google Calendar integration is not configured
return true
}
// Only the first configured Google Calendar integration is used.
googleCalendarIntegrationConfig := appConfig.Integrations.GoogleCalendar[0]

eventDetails, err := svc.ds.GetCalendarEventDetailsByUUID(ctx, eventUUID)
if err != nil {
if fleet.IsNotFound(err) {
// We found this event when the callback initially came in. So the event may have been removed or re-created since then.
return true
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an issue? (Maybe processCalendarEventAsync should return an error?)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This happens frequently because we create a new UUID when re-creating the event. The old UUID stuck in the queue will hit this. I'll update the comment to:

So the event may have been removed or re-created since then.

}
level.Error(svc.logger).Log("msg", "Failed to get calendar event details", "err", err)
return false
}
if eventDetails.TeamID == nil {
// Should not happen
level.Error(svc.logger).Log("msg", "Calendar event has no team ID", "uuid", eventUUID)
return false
}

localConfig := &calendar.Config{
GoogleCalendarIntegration: *googleCalendarIntegrationConfig,
ServerURL: appConfig.ServerSettings.ServerURL,
}
userCalendar := calendar.CreateUserCalendarFromConfig(ctx, localConfig, svc.logger)

err = svc.processCalendarEvent(ctx, eventDetails, googleCalendarIntegrationConfig, userCalendar)
if err != nil {
level.Error(svc.logger).Log("msg", "Failed to process calendar event", "err", err)
return false
}
return true
}
1 change: 1 addition & 0 deletions ee/server/service/mdm_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func setupMockDatastorePremiumService() (*mock.Store, *eeservice.Service, contex
nil,
nil,
nil,
nil,
)
if err != nil {
panic(err)
Expand Down
3 changes: 3 additions & 0 deletions ee/server/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Service struct {
depService *apple_mdm.DEPService
profileMatcher fleet.ProfileMatcher
softwareInstallStore fleet.SoftwareInstallerStore
distributedLock fleet.Lock
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Added distributedLock field to the Service struct to support distributed locking.

}

func NewService(
Expand All @@ -42,6 +43,7 @@ func NewService(
sso sso.SessionStore,
profileMatcher fleet.ProfileMatcher,
softwareInstallStore fleet.SoftwareInstallerStore,
distributedLock fleet.Lock,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ info: Included distributedLock parameter in the NewService function to initialize the new field.

) (*Service, error) {
authorizer, err := authz.NewAuthorizer()
if err != nil {
Expand All @@ -61,6 +63,7 @@ func NewService(
depService: apple_mdm.NewDEPService(ds, depStorage, logger),
profileMatcher: profileMatcher,
softwareInstallStore: softwareInstallStore,
distributedLock: distributedLock,
}

// Override methods that can't be easily overridden via
Expand Down
Loading