Skip to content

Commit

Permalink
address pr comment
Browse files Browse the repository at this point in the history
Wrap update and sync publication of users and devices together into single
tracker/write marker operations.
  • Loading branch information
efd6 committed Jun 21, 2023
1 parent 2d21dd8 commit 6f4faa5
Showing 1 changed file with 35 additions and 40 deletions.
75 changes: 35 additions & 40 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go
Expand Up @@ -150,32 +150,25 @@ func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client be
return err
}

if len(state.users) != 0 {
p.logger.Debugw("publishing users", "count", len(state.devices))

if len(state.users) != 0 || len(state.devices) != 0 {
tracker := kvstore.NewTxTracker(ctx)

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}

end := time.Now()
p.publishMarker(end, end, inputCtx.ID, false, client, tracker)

tracker.Wait()
}

if len(state.devices) != 0 {
p.logger.Debugw("publishing devices", "count", len(state.devices))
if len(state.users) != 0 {
p.logger.Debugw("publishing users", "count", len(state.devices))
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}

tracker := kvstore.NewTxTracker(ctx)
}

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)
for _, d := range state.devices {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
if len(state.devices) != 0 {
p.logger.Debugw("publishing devices", "count", len(state.devices))
for _, d := range state.devices {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
}
}

end := time.Now()
Expand Down Expand Up @@ -218,30 +211,32 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
return err
}

if updatedUsers.Len() != 0 {
if updatedUsers.Len() != 0 || updatedDevices.Len() != 0 {
tracker := kvstore.NewTxTracker(ctx)
updatedUsers.ForEach(func(id uuid.UUID) {
u, ok := state.users[id]
if !ok {
p.logger.Warnf("Unable to lookup user %q", id)
return
}
p.publishUser(u, state, inputCtx.ID, client, tracker)
})

tracker.Wait()
}
if updatedUsers.Len() != 0 {
updatedUsers.ForEach(func(id uuid.UUID) {
u, ok := state.users[id]
if !ok {
p.logger.Warnf("Unable to lookup user %q", id)
return
}
p.publishUser(u, state, inputCtx.ID, client, tracker)
})

if updatedDevices.Len() != 0 {
tracker := kvstore.NewTxTracker(ctx)
updatedDevices.ForEach(func(id uuid.UUID) {
d, ok := state.devices[id]
if !ok {
p.logger.Warnf("Unable to lookup device %q", id)
return
}
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})
}

if updatedDevices.Len() != 0 {
updatedDevices.ForEach(func(id uuid.UUID) {
d, ok := state.devices[id]
if !ok {
p.logger.Warnf("Unable to lookup device %q", id)
return
}
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})

}

tracker.Wait()
}
Expand Down

0 comments on commit 6f4faa5

Please sign in to comment.