x-pack/filebeat/input/entityanalytics/provider/azuread/fetcher: add device handling #35807

Merged
merged 4 commits into from Jun 21, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -322,6 +322,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Allow non-AWS endpoints for awss3 input. {issue}35496[35496] {pull}35520[35520]
- Add Okta input package for entity analytics. {pull}35611[35611]
- Expose harvester metrics from filestream input {pull}35835[35835] {issue}33771[33771]
- Add device support for Azure AD entity analytics. {pull}35807[35807]

*Auditbeat*
- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]
99 changes: 75 additions & 24 deletions x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc
@@ -54,6 +54,9 @@ provider to function properly:

|User.Read.All
|Application

|Device.Read.All
|Application
|===

For a full guide on how to set up the necessary App Registration, permission
@@ -66,50 +69,54 @@ granting, and secret configuration, follow this https://learn.microsoft.com/en-u
===== Overview

The Azure AD provider periodically contacts Azure Active Directory, retrieving
updates for users and groups, updates its internal cache of user metadata and
group membership information, and ships updated user metadata to Elasticsearch.

Fetching and shipping updates occurs in one of two processes: **full
synchronizations** and *incremental updates*. Full synchronizations will send the
entire list of users in state, along with write markers to indicate the start
and end of the synchronization event. Incremental updates will only send data
for changed users during that event. Changes on a user can come in many forms,
whether it be a change to the user's metadata, a user was added or deleted, or
group membership was changed (either direct or transitive).
updates for users, devices and groups, updates its internal cache of user and
device metadata and group membership information, and ships updated user and
device metadata to Elasticsearch.

Fetching and shipping updates occurs in one of two processes: *full
synchronizations* and *incremental updates*. Full synchronizations will send the
entire list of users and devices in state, along with write markers to indicate
the start and end of the synchronization event. Incremental updates will only
send data for changed users and devices during that event. Changes on a user or
device can come in many forms, whether it be a change to the user or device
metadata, a user/device was added or deleted, or group membership was changed
(either direct or transitive).

[float]
===== API Interactions

The provider periodically retrieves changes to user and group metadata from the
Microsoft Graph API for Azure Active Directory. This is done through calls to
two API endpoints:
The provider periodically retrieves changes to user, device and group metadata
from the Microsoft Graph API for Azure Active Directory. This is done through
calls to three API endpoints:

- https://learn.microsoft.com/en-us/graph/api/user-delta?view=graph-rest-1.0&tabs=http[/users/delta]
- https://learn.microsoft.com/en-us/graph/api/device-delta?view=graph-rest-1.0&tabs=http[/devices/delta]
- https://learn.microsoft.com/en-us/graph/api/group-delta?view=graph-rest-1.0&tabs=http[/groups/delta]

The `/delta` endpoint will provide changes that have occurred since the last
call, with state being tracked through a delta token. If the `/delta` endpoint is
called without a delta token, it will provide a full listing of users or groups,
similar to the non-delta endpoint. Since many results may be returned, there is
a paging mechanism that is used. In the response body, there are two fields that
may appear, `@odata.nextLink` and `@odata.deltaLink`.
called without a delta token, it will provide a full listing of users, devices
or groups, similar to the non-delta endpoint. Since many results may be returned,
there is a paging mechanism that is used. In the response body, there are two
fields that may appear, `@odata.nextLink` and `@odata.deltaLink`.

- If a `@odata.nextLink` is returned, then there are more results to fetch, and
the value of this field will contain the URL which should be immediately fetched.
- If a `@odata.deltaLink` is returned, then there are currently no more results,
and the value of this field (a URL) should be saved for the next time updates
need to be fetched (the delta token).
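
The paging rules above can be sketched as a small loop. This is a minimal illustration, not the fetcher's actual code: the `page` type, `fetchFunc`, and `collectDelta` are hypothetical names, and the HTTP request and JSON decoding are replaced by an in-memory stub.

```go
package main

import (
	"errors"
	"fmt"
)

// page is a hypothetical, simplified view of one /delta response body.
type page struct {
	Values    []string // stand-in for the returned user, device or group objects
	NextLink  string   // value of @odata.nextLink, if present
	DeltaLink string   // value of @odata.deltaLink, if present
}

// fetchFunc is a hypothetical stand-in for an HTTP GET that decodes one page.
type fetchFunc func(url string) (page, error)

// collectDelta follows @odata.nextLink until an @odata.deltaLink is returned.
// It returns the accumulated values and the delta link to save for next time.
func collectDelta(fetch fetchFunc, startURL string) ([]string, string, error) {
	var values []string
	url := startURL
	for url != "" {
		p, err := fetch(url)
		if err != nil {
			return nil, "", err
		}
		values = append(values, p.Values...)
		if p.DeltaLink != "" {
			// No more results for now; the caller persists this link.
			return values, p.DeltaLink, nil
		}
		// More results are available; fetch the next page immediately.
		url = p.NextLink
	}
	return nil, "", errors.New("response had neither nextLink nor deltaLink")
}

func main() {
	// In-memory stub for two pages of results.
	pages := map[string]page{
		"start": {Values: []string{"a", "b"}, NextLink: "page2"},
		"page2": {Values: []string{"c"}, DeltaLink: "delta-token-url"},
	}
	fetch := func(url string) (page, error) {
		p, ok := pages[url]
		if !ok {
			return page{}, fmt.Errorf("unexpected URL %q", url)
		}
		return p, nil
	}
	values, link, err := collectDelta(fetch, "start")
	fmt.Println(values, link, err)
}
```

On the next run, the saved delta link would be passed as `startURL` so that only changes since the previous call are returned.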

The group metadata will be used to enrich users with group membership information.
Direct memberships, along with transitive memberships, will provided on users.
The group metadata will be used to enrich users and devices with group membership
information. Direct memberships, along with transitive memberships, will be provided
for users and devices.

[float]
===== Sending User Metadata to Elasticsearch
===== Sending User and Device Metadata to Elasticsearch

During a full synchronization, all users stored in state will be sent to the
output, while incremental updates will only send users which have been updated.
Full synchronizations will be bounded on either side by write marker documents,
which will look something like this:
During a full synchronization, all users and devices stored in state will be sent
to the output, while incremental updates will only send users and devices which
have been updated. Full synchronizations will be bounded on either side by write
marker documents, which will look something like this:

["source","json",subs="attributes"]
----
@@ -165,6 +172,50 @@ Example user document:
}
----

Device documents will show the current state of the device.

Example device document:

["source","json",subs="attributes"]
----
{
"@timestamp": "2022-11-04T09:57:19.786056-05:00",
"event": {
"action": "device-discovered"
},
"azure_ad": {
"accountEnabled": true,
"displayName": "DESKTOP-LETW452G",
"operatingSystem": "Windows",
"operatingSystemVersion": "10.0.19043.1337",
"physicalIds": {
"extensionAttributes": {
"extensionAttribute1": "BYOD-Device"
}
},
"alternativeSecurityIds": [
{
"type": 2,
"identityProvider": null,
"key": "DGFSGHSGGTH345A...35DSFH0A"
}
]
},
"device": {
"id": "2fbbb8f9-ff67-4a21-b867-a344d18a4198",
"group": [
{
"id": "331676df-b8fd-4492-82ed-02b927f8dd80",
"name": "group1"
}
]
},
"labels": {
"identity_source": "azure-1"
}
}
----

[float]
==== Configuration

151 changes: 130 additions & 21 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go
@@ -146,17 +146,29 @@ func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client be

ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
p.logger.Debugf("Starting fetch...")
if _, err = p.doFetch(ctx, state, true); err != nil {
if _, _, err = p.doFetch(ctx, state, true); err != nil {
return err
}

if len(state.users) != 0 {
if len(state.users) != 0 || len(state.devices) != 0 {
tracker := kvstore.NewTxTracker(ctx)

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)

if len(state.users) != 0 {
p.logger.Debugw("publishing users", "count", len(state.users))
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}

}

Contributor

I'm wondering, do we want a single set of write markers to bound both users and devices? If we want them separate, do we want to attach some field to indicate what the write markers are associated with (device or user)?

Thoughts, @andrewkroh?

Member

Good point, either is acceptable to me. If there are two separate sets of markers, then someone who is querying would need to know whether a marker was associated with the users or the devices.

Contributor Author

ISTM a single set makes more sense. Devices and users are really pretty much the same kind of entity (at the moment more so, with the TODO additions less so).

if len(state.devices) != 0 {
p.logger.Debugw("publishing devices", "count", len(state.devices))
for _, d := range state.devices {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
}
}

end := time.Now()
@@ -194,21 +206,37 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
}()

ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
updatedUsers, err := p.doFetch(ctx, state, false)
updatedUsers, updatedDevices, err := p.doFetch(ctx, state, false)
if err != nil {
return err
}

if updatedUsers.Len() != 0 {
if updatedUsers.Len() != 0 || updatedDevices.Len() != 0 {
tracker := kvstore.NewTxTracker(ctx)
updatedUsers.ForEach(func(id uuid.UUID) {
u, ok := state.users[id]
if !ok {
p.logger.Warnf("Unable to lookup user %q", id, u.ID)
return
}
p.publishUser(u, state, inputCtx.ID, client, tracker)
})

if updatedUsers.Len() != 0 {
updatedUsers.ForEach(func(id uuid.UUID) {
u, ok := state.users[id]
if !ok {
p.logger.Warnf("Unable to lookup user %q", id)
return
}
p.publishUser(u, state, inputCtx.ID, client, tracker)
})

}

if updatedDevices.Len() != 0 {
updatedDevices.ForEach(func(id uuid.UUID) {
d, ok := state.devices[id]
if !ok {
p.logger.Warnf("Unable to lookup device %q", id)
return
}
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})

}

tracker.Wait()
}
@@ -229,37 +257,47 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
// and enriching users with group memberships. If fullSync is true, then any
// existing deltaLink will be ignored, forcing a full synchronization from
// Azure Active Directory. Returns the sets of modified users and devices by ID.
func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (collections.UUIDSet, error) {
var updatedUsers collections.UUIDSet
var usersDeltaLink string
var groupsDeltaLink string
func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (updatedUsers, updatedDevices collections.UUIDSet, err error) {
var usersDeltaLink, devicesDeltaLink, groupsDeltaLink string

// Get user changes.
if !fullSync {
usersDeltaLink = state.usersLink
devicesDeltaLink = state.devicesLink
groupsDeltaLink = state.groupsLink
}

changedUsers, userLink, err := p.fetcher.Users(ctx, usersDeltaLink)
if err != nil {
return updatedUsers, err
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d users from API", len(changedUsers))

changedDevices, deviceLink, err := p.fetcher.Devices(ctx, devicesDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d devices from API", len(changedDevices))

// Get group changes.
changedGroups, groupLink, err := p.fetcher.Groups(ctx, groupsDeltaLink)
if err != nil {
return updatedUsers, err
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d groups from API", len(changedGroups))

state.usersLink = userLink
state.devicesLink = deviceLink
state.groupsLink = groupLink

for _, v := range changedUsers {
updatedUsers.Add(v.ID)
state.storeUser(v)
}
for _, v := range changedDevices {
updatedDevices.Add(v.ID)
state.storeDevice(v)
}
for _, v := range changedGroups {
state.storeGroup(v)
}
@@ -299,6 +337,16 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
u.MemberOf.Add(g.ID)
}
}

case fetcher.MemberDevice:
if d, ok := state.devices[member.ID]; ok {
updatedDevices.Add(d.ID)
if member.Deleted {
d.MemberOf.Remove(g.ID)
} else {
d.MemberOf.Add(g.ID)
}
}
}
}
}
@@ -312,6 +360,7 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}
u.Modified = true
if u.Deleted {
p.logger.Debugw("not expanding membership for deleted user", "user", userID)
return
}

@@ -321,7 +370,26 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
})
})

return updatedUsers, nil
// Expand device group memberships.
updatedDevices.ForEach(func(devID uuid.UUID) {
d, ok := state.devices[devID]
if !ok {
p.logger.Errorf("Unable to find device %q in state", devID)
return
}
d.Modified = true
if d.Deleted {
p.logger.Debugw("not expanding membership for deleted device", "device", devID)
return
}

d.TransitiveMemberOf = d.MemberOf
state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) {
d.TransitiveMemberOf.Add(elem)
})
})

return updatedUsers, updatedDevices, nil
}
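
The membership expansion step at the end of `doFetch` can be illustrated in isolation. This is a minimal sketch under stated assumptions: `idSet`, `clone`, and `expand` are hypothetical names, IDs are plain strings rather than UUIDs, and the group hierarchy is a simple map. The real `collections.UUIDSet` and `state.relationships.ExpandFromSet` are not shown in this diff, so their behavior is not assumed here. The sketch copies the direct set before adding to it, so the direct memberships stay unchanged; whether the assignment `d.TransitiveMemberOf = d.MemberOf` shares storage depends on how `collections.UUIDSet` is implemented.

```go
package main

import "fmt"

// idSet is a hypothetical map-backed set of IDs, used only for illustration.
type idSet map[string]struct{}

// clone returns an independent copy of s.
func (s idSet) clone() idSet {
	out := make(idSet, len(s))
	for id := range s {
		out[id] = struct{}{}
	}
	return out
}

// expand returns the direct memberships plus every group reachable through
// the parents map (group ID -> IDs of the groups it is a member of).
func expand(direct idSet, parents map[string][]string) idSet {
	out := direct.clone() // copy, so direct is not modified
	queue := make([]string, 0, len(direct))
	for id := range direct {
		queue = append(queue, id)
	}
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		for _, p := range parents[id] {
			if _, seen := out[p]; !seen { // the seen check also guards against cycles
				out[p] = struct{}{}
				queue = append(queue, p)
			}
		}
	}
	return out
}

func main() {
	direct := idSet{"g1": {}}
	parents := map[string][]string{"g1": {"g2"}, "g2": {"g3"}}
	transitive := expand(direct, parents)
	fmt.Println(len(direct), len(transitive))
}
```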

// publishMarker will publish a write marker document using the given beat.Client.
@@ -394,6 +462,47 @@ func (p *azure) publishUser(u *fetcher.User, state *stateStore, inputID string,
client.Publish(event)
}

// publishDevice will publish a device document using the given beat.Client.
func (p *azure) publishDevice(d *fetcher.Device, state *stateStore, inputID string, client beat.Client, tracker *kvstore.TxTracker) {
deviceDoc := mapstr.M{}

_, _ = deviceDoc.Put("azure_ad", d.Fields)
_, _ = deviceDoc.Put("labels.identity_source", inputID)
_, _ = deviceDoc.Put("device.id", d.ID.String())

if d.Deleted {
_, _ = deviceDoc.Put("event.action", "device-deleted")
} else if d.Discovered {
_, _ = deviceDoc.Put("event.action", "device-discovered")
} else if d.Modified {
_, _ = deviceDoc.Put("event.action", "device-modified")
}

var groups []fetcher.GroupECS
d.TransitiveMemberOf.ForEach(func(groupID uuid.UUID) {
g, ok := state.groups[groupID]
if !ok {
p.logger.Warnf("Unable to lookup group %q for device %q", groupID, d.ID)
return
}
groups = append(groups, g.ToECS())
})
if len(groups) != 0 {
_, _ = deviceDoc.Put("device.group", groups)
}

event := beat.Event{
Timestamp: time.Now(),
Fields: deviceDoc,
Private: tracker,
}
tracker.Add()

p.logger.Debugf("Publishing device %q", d.ID)

client.Publish(event)
}
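
The `event.action` selection in `publishDevice` gives deletion precedence over discovery, and discovery over modification. A minimal sketch of that precedence as a standalone function follows; `deviceState` and `eventAction` are hypothetical names introduced for illustration, and only the three boolean flags and the action strings come from the diff.

```go
package main

import "fmt"

// deviceState is a hypothetical stand-in for the flags read by publishDevice.
type deviceState struct {
	Deleted    bool
	Discovered bool
	Modified   bool
}

// eventAction mirrors the if/else-if chain in publishDevice: the first flag
// that is set wins, and no action is reported when none is set.
func eventAction(d deviceState) string {
	switch {
	case d.Deleted:
		return "device-deleted"
	case d.Discovered:
		return "device-discovered"
	case d.Modified:
		return "device-modified"
	default:
		return ""
	}
}

func main() {
	// A deleted device that is also marked modified is reported as deleted.
	fmt.Println(eventAction(deviceState{Deleted: true, Modified: true}))
	fmt.Println(eventAction(deviceState{Discovered: true, Modified: true}))
	fmt.Println(eventAction(deviceState{Modified: true}))
}
```

Isolating the choice in a pure function like this would make the precedence easy to cover with a table-driven test.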

// configure configures this provider using the given configuration.
func (p *azure) configure(cfg *config.C) (kvstore.Input, error) {
var err error