crowdstrike: refactor host collector and improve error handling
efd6 committed Apr 26, 2024
1 parent daebc22 commit cdcbfe2
Showing 2 changed files with 76 additions and 51 deletions.
2 changes: 1 addition & 1 deletion packages/crowdstrike/changelog.yml
@@ -1,7 +1,7 @@
# newer versions go on top
- version: "1.33.0"
  changes:
    - description: Refactor alert collector and improve error handling.
    - description: Refactor alert and host collector and improve error handling.
      type: enhancement
      link: https://github.com/elastic/integrations/pull/1
- version: "1.32.1"
125 changes: 75 additions & 50 deletions packages/crowdstrike/data_stream/host/agent/stream/cel.yml.hbs
@@ -26,61 +26,86 @@ state:
redact:
  fields: ~
program: |
  (
    state.with(
      (
        !state.want_more ?
          request("GET", state.url + "/devices/queries/devices/v1?sort=modified_timestamp.asc&offset=0&limit=" + string(state.batch_size) + '&filter=modified_timestamp:>"' + (
            has(state.cursor) && has(state.cursor.last_timestamp) && state.cursor.last_timestamp != null ?
              state.cursor.last_timestamp + '"'
            :
              (now - duration(state.initial_interval)).format(time_layout.RFC3339) + '"'
          ))
        :
          request("GET", state.url + "/devices/queries/devices/v1?sort=modified_timestamp.asc&offset=" + string(state.offset) + "&limit=" + string(state.batch_size) + '&filter=modified_timestamp:>"' + (
            has(state.cursor) && has(state.cursor.first_timestamp) && state.cursor.first_timestamp != null ?
              state.cursor.first_timestamp + '"'
            :
              '"'
          ))
      ).do_request().as(resp, bytes(resp.Body).decode_json().as(body, {
  state.with(
    (
      !state.want_more ?
        request(
          "GET",
          state.url.trim_right("/") + "/devices/queries/devices/v1?" + {
            "sort": ["modified_timestamp.asc"],
            "offset": ["0"],
            "limit": [string(state.batch_size)],
            "filter": ['modified_timestamp:>"'+state.?cursor.last_timestamp.orValue(string(now - duration(state.initial_interval)))+'"'],
          }.format_query()
        )
      :
        request(
          "GET",
          state.url.trim_right("/") + "/devices/queries/devices/v1?" + {
            "sort": ["modified_timestamp.asc"],
            "offset": [string(state.offset)],
            "limit": [string(state.batch_size)],
            ?"filter": has(state.?cursor.first_timestamp) ? optional.of(['modified_timestamp:>"'+state.cursor.first_timestamp+'"']) : optional.none(),
          }.format_query()
        )
    ).do_request().as(get_resp, get_resp.StatusCode == 200 ?
      bytes(get_resp.Body).decode_json().as(body, {
        "resources": has(body.resources) && body.resources.size() > 0 ? body.resources : "",
        "want_more": ((int(state.offset) + body.resources.size()) < body.meta.pagination.total),
        "offset": ((int(state.offset) + body.resources.size()) < body.meta.pagination.total) ?
          int(state.offset) + int(body.resources.size())
          int(state.offset) + body.resources.size()
        :
          0,
        "url": state.url,
        "batch_size": state.batch_size,
        "initial_interval": state.initial_interval,
      }))
    ).as(state, state.with(
      !has(state.resources) || state.resources == "" ? {"events": []} :
      post_request(
        state.url + "/devices/entities/devices/v2",
        "application/json",
        {"ids": state.resources }.encode_json()
      ).do_request().as(resp, bytes(resp.Body).decode_json().as(inner_body, {
        "events": inner_body.resources.map(e, {
          "message": e.encode_json(),
        }),
        "cursor": {
          "last_timestamp": (
            has(inner_body.resources) && inner_body.resources.size() > 0 ?
              inner_body.resources.map(e, e.modified_timestamp).max()
            : has(state.cursor) && has(state.cursor.last_timestamp) ?
              state.cursor.last_timestamp
            :
              null
          ),
          "first_timestamp": (
            has(state.cursor) && has(state.cursor.first_timestamp) && state.cursor.first_timestamp != null ?
              ( state.want_more ? state.cursor.first_timestamp : state.cursor.last_timestamp )
            :
              (now - duration(state.initial_interval)).format(time_layout.RFC3339)
          ),
        },
      }))
      })
    :
      {
        "events": {
          "error": {
            "code": string(get_resp.StatusCode),
            "id": string(get_resp.Status),
            "message": string(get_resp.Body)
          },
        },
        "want_more": false,
      }
    )
  ).as(state, state.with(
    !has(state.resources) ? state : // Exit early due to GET failure.
    post_request(
      state.url + "/devices/entities/devices/v2",
      "application/json",
      {"ids": state.resources }.encode_json()
    ).do_request().as(post_resp, post_resp.StatusCode == 200 ?
      bytes(post_resp.Body).decode_json().as(inner_body, {
        "events": inner_body.resources.map(e, {
          "message": e.encode_json(),
        }),
        "cursor": {
          ?"last_timestamp": (
            has(inner_body.resources) && inner_body.resources.size() > 0 ?
              optional.of(inner_body.resources.map(e, e.modified_timestamp).max())
            :
              state.?cursor.last_timestamp
          ),
          "first_timestamp": (
            state.?cursor.first_timestamp.orValue(null) != null ?
              (state.want_more ? state.cursor.first_timestamp : state.cursor.last_timestamp)
            :
              string(now - duration(state.initial_interval))
          ),
        },
      })
    :
      {
        "events": {
          "error": {
            "code": string(post_resp.StatusCode),
            "id": string(post_resp.Status),
            "message": string(post_resp.Body)
          },
        },
        "want_more": false,
      }
    )
  )
  )
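
For readers unfamiliar with the pattern, the error handling introduced here can be sketched in isolation. This is a rough sketch only, assuming the same CEL extensions the program above already uses (request, do_request, decode_json, format_query); the /example/path endpoint and the single "limit" query parameter are illustrative and are not the CrowdStrike API calls made by this package:

  request(
    "GET",
    state.url.trim_right("/") + "/example/path?" + {
      "limit": [string(state.batch_size)],
    }.format_query()
  ).do_request().as(resp, resp.StatusCode == 200 ?
    // Success: decode the body and publish its resources as events.
    bytes(resp.Body).decode_json().as(body, {
      "events": body.resources.map(e, {"message": e.encode_json()}),
    })
  :
    // Failure: surface the HTTP status as a structured error event and stop paging.
    {
      "events": {
        "error": {
          "code": string(resp.StatusCode),
          "id": string(resp.Status),
          "message": string(resp.Body),
        },
      },
      "want_more": false,
    }
  )

Compared with the previous revision, which decoded the response body unconditionally, a non-200 response now produces an error event instead of a decode failure, and want_more is cleared so the collector does not keep paging after a failed request.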
