Skip to content

Commit

Permalink
x-pack/filebeat/input/entityanalytics/provider/azuread: avoid work on…
Browse files Browse the repository at this point in the history
… unwanted datasets

During full sync the provider may have state from a previous dataset. So
in the case that the user has changed dataset from users to devices or
vice versa the provider may publish already existing state in the entity
graph. This change adds conditional checks to ensure that unwanted
dataset records are not published.
  • Loading branch information
efd6 committed Oct 5, 2023
1 parent febe538 commit 99c1b96
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ is collected by it.
- Add CEL partial value debug function. {pull}36652[36652]
- Added support for new features and removed partial save mechanism in the GCS input. {issue}35847[35847] {pull}36713[36713]
- Re-use buffers to optimise memory allocation in fingerprint mode of filestream {pull}36736[36736]
- Avoid unwanted publication of Azure entity records. {pull}36753[36753]

*Auditbeat*

Expand Down
105 changes: 59 additions & 46 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -151,21 +150,22 @@ func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client be
return err
}

if len(state.users) != 0 || len(state.devices) != 0 {
wantUsers := p.conf.wantUsers()
wantDevices := p.conf.wantDevices()
if (len(state.users) != 0 && wantUsers) || (len(state.devices) != 0 && wantDevices) {
tracker := kvstore.NewTxTracker(ctx)

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)

if len(state.users) != 0 {
if len(state.users) != 0 && wantUsers {
p.logger.Debugw("publishing users", "count", len(state.devices))
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}

}

if len(state.devices) != 0 {
if len(state.devices) != 0 && wantDevices {
p.logger.Debugw("publishing devices", "count", len(state.devices))
for _, d := range state.devices {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
Expand Down Expand Up @@ -212,10 +212,12 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
return err
}

if updatedUsers.Len() != 0 || updatedDevices.Len() != 0 {
wantUsers := p.conf.wantUsers()
wantDevices := p.conf.wantDevices()
if (updatedUsers.Len() != 0 && wantUsers) || (updatedDevices.Len() != 0 && wantDevices) {
tracker := kvstore.NewTxTracker(ctx)

if updatedUsers.Len() != 0 {
if updatedUsers.Len() != 0 && wantUsers {
updatedUsers.ForEach(func(id uuid.UUID) {
u, ok := state.users[id]
if !ok {
Expand All @@ -224,10 +226,9 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
}
p.publishUser(u, state, inputCtx.ID, client, tracker)
})

}

if updatedDevices.Len() != 0 {
if updatedDevices.Len() != 0 && wantDevices {
updatedDevices.ForEach(func(id uuid.UUID) {
d, ok := state.devices[id]
if !ok {
Expand All @@ -236,7 +237,6 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
}
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})

}

tracker.Wait()
Expand Down Expand Up @@ -269,32 +269,32 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

var (
wantUsers = p.conf.wantUsers()
changedUsers []*fetcher.User
userLink string
)
switch strings.ToLower(p.conf.Dataset) {
case "", "all", "users":
if wantUsers {
changedUsers, userLink, err = p.fetcher.Users(ctx, usersDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d users from API", len(changedUsers))
default:
} else {
p.logger.Debugf("Skipping user collection from API: dataset=%s", p.conf.Dataset)
}

var (
wantDevices = p.conf.wantDevices()
changedDevices []*fetcher.Device
deviceLink string
)
switch strings.ToLower(p.conf.Dataset) {
case "", "all", "devices":
if wantDevices {
changedDevices, deviceLink, err = p.fetcher.Devices(ctx, devicesDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d devices from API", len(changedDevices))
default:
} else {
p.logger.Debugf("Skipping device collection from API: dataset=%s", p.conf.Dataset)
}

Expand Down Expand Up @@ -337,6 +337,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
for _, member := range g.Members {
switch member.Type {
case fetcher.MemberGroup:
if !wantUsers {
break
}
for _, u := range state.users {
if u.TransitiveMemberOf.Contains(member.ID) {
updatedUsers.Add(u.ID)
Expand All @@ -349,6 +352,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

case fetcher.MemberUser:
if !wantUsers {
break
}
if u, ok := state.users[member.ID]; ok {
updatedUsers.Add(u.ID)
if member.Deleted {
Expand All @@ -359,6 +365,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

case fetcher.MemberDevice:
if !wantDevices {
break
}
if d, ok := state.devices[member.ID]; ok {
updatedDevices.Add(d.ID)
if member.Deleted {
Expand All @@ -372,42 +381,46 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

// Expand user group memberships.
updatedUsers.ForEach(func(userID uuid.UUID) {
u, ok := state.users[userID]
if !ok {
p.logger.Errorf("Unable to find user %q in state", userID)
return
}
u.Modified = true
if u.Deleted {
p.logger.Debugw("not expanding membership for deleted user", "user", userID)
return
}
if wantUsers {
updatedUsers.ForEach(func(userID uuid.UUID) {
u, ok := state.users[userID]
if !ok {
p.logger.Errorf("Unable to find user %q in state", userID)
return
}
u.Modified = true
if u.Deleted {
p.logger.Debugw("not expanding membership for deleted user", "user", userID)
return
}

u.TransitiveMemberOf = u.MemberOf
state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) {
u.TransitiveMemberOf.Add(elem)
u.TransitiveMemberOf = u.MemberOf
state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) {
u.TransitiveMemberOf.Add(elem)
})
})
})
}

// Expand device group memberships.
updatedDevices.ForEach(func(devID uuid.UUID) {
d, ok := state.devices[devID]
if !ok {
p.logger.Errorf("Unable to find device %q in state", devID)
return
}
d.Modified = true
if d.Deleted {
p.logger.Debugw("not expanding membership for deleted device", "device", devID)
return
}
if wantDevices {
updatedDevices.ForEach(func(devID uuid.UUID) {
d, ok := state.devices[devID]
if !ok {
p.logger.Errorf("Unable to find device %q in state", devID)
return
}
d.Modified = true
if d.Deleted {
p.logger.Debugw("not expanding membership for deleted device", "device", devID)
return
}

d.TransitiveMemberOf = d.MemberOf
state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) {
d.TransitiveMemberOf.Add(elem)
d.TransitiveMemberOf = d.MemberOf
state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) {
d.TransitiveMemberOf.Add(elem)
})
})
})
}

return updatedUsers, updatedDevices, nil
}
Expand Down
18 changes: 18 additions & 0 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,21 @@ func defaultConf() conf {
UpdateInterval: defaultUpdateInterval,
}
}

func (c *conf) wantUsers() bool {
switch strings.ToLower(c.Dataset) {
case "", "all", "users":
return true
default:
return false
}
}

func (c *conf) wantDevices() bool {
switch strings.ToLower(c.Dataset) {
case "", "all", "devices":
return true
default:
return false
}
}

0 comments on commit 99c1b96

Please sign in to comment.