[Cato Networks][Event] Add cato networks event data-stream #17775
sharadcrest wants to merge 5 commits into elastic:feature/cato_networks-0.1.0 from
Conversation
Reviewers
Buildkite won't run for external contributors automatically; you need to add a comment:
NOTE: https://github.com/elastic/integrations/blob/main/.buildkite/pull-requests.json contains all those details.
ShourieG
left a comment
🤖 AI-Generated Review | Elastic Integration PR Review Bot
⚠️ This is an automated review generated by an AI assistant. Please verify all suggestions before applying changes. This review does not represent a human reviewer's opinion.
PR Review | elastic/integrations #17775
Field Mapping
Data Stream: event (package: cato_networks)
File: packages/cato_networks/data_stream/event/fields/fields.yml
Issue 1: All custom fields missing description properties
Severity: 🟡 Medium
Location: packages/cato_networks/data_stream/event/fields/fields.yml line 7
Problem: All ~90 custom fields under cato_networks.event are missing description properties. Every custom field definition must include a meaningful description explaining what the field contains.
Recommendation:
```yaml
- name: account_id
  type: keyword
  description: |
    Unique identifier of the Cato Networks account associated with this event.
- name: event_message
  type: match_only_text
  description: |
    Human-readable message describing the event.
```
Issue 2: time_str field name implies string but is typed as date
Severity: 🟡 Medium
Location: packages/cato_networks/data_stream/event/fields/fields.yml line 183
Problem: time_str is named with a _str suffix implying a raw string representation, yet it is typed as date. If the pipeline parses this into a proper date, the name is misleading; if it is stored as-is (a string), the type is wrong.
Recommendation:
```yaml
# Option A: if the pipeline parses it
- name: time_parsed
  type: date
  description: Parsed timestamp of the event.
# Option B: if stored as a raw string
- name: time_str
  type: keyword
  description: Raw string representation of the event timestamp.
```
Issue 3: src_is_site_or_vpn uses boolean-style is_ prefix but is typed as keyword
Severity: 🔵 Low
Location: packages/cato_networks/data_stream/event/fields/fields.yml line 159
Problem: src_is_site_or_vpn uses the is_ boolean-style prefix but is typed as keyword. The name implies a true/false flag, but the type suggests it holds string values.
Recommendation:
```yaml
# If boolean:
- name: src_is_site_or_vpn
  type: boolean
# If string enum ("site" / "vpn"):
- name: src_connection_type
  type: keyword
```
💡 Suggestions
- user_agent (lines 193–194) — Consider adding a multi_fields entry with type: text to enable full-text search on user agent strings, which are often queried with partial matches.
- mitre_attack_* fields (lines 117–122) — These fields likely hold arrays of values (multiple tactics/techniques per event). Consider documenting this in the description and verifying the pipeline sets them correctly as arrays.
- categories and device_categories (lines 31–32, 59–60) — Similarly likely to be multi-value; descriptions should clarify this.
- host_mac (lines 97–98) — MAC addresses are sometimes stored as keyword; this is acceptable, but a description clarifying the format (e.g., colon-separated hex) would be helpful.
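As a sketch of the multi_fields suggestion above (the description text is an assumption, since the PR notes no schema is available for the event data), the user_agent definition could look like:

```yaml
- name: user_agent
  type: keyword
  description: User agent string reported with the event.
  # Sub-field enabling full-text (analyzed) search on the same value.
  multi_fields:
    - name: text
      type: text
```

With this in place, exact filtering uses `cato_networks.event.user_agent` while partial-match queries can target `cato_networks.event.user_agent.text`.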
Pipeline
Data Stream: event (package: cato_networks)
File: packages/cato_networks/data_stream/event/elasticsearch/ingest_pipeline/default.yml
Issue 1: Unsafe null-dereference in rename processor if condition
Severity: 🟠 High
Location: packages/cato_networks/data_stream/event/elasticsearch/ingest_pipeline/default.yml line 472
Problem: The if condition ctx.cato_networks.event.internal_id == null uses unguarded dot-notation access. If cato_networks or cato_networks.event does not exist in the document context (e.g., the preceding rename of json.internalId found no field), this will throw a NullPointerException and route the document to the global on_failure handler unexpectedly.
Recommendation:
```yaml
- rename:
    field: json.internal_id
    tag: rename_internal_id
    target_field: cato_networks.event.internal_id
    ignore_missing: true
    if: ctx.cato_networks?.event?.internal_id == null
```
Issue 2: script processor for host.os.type mapping has no on_failure handler
Severity: 🟡 Medium
Location: packages/cato_networks/data_stream/event/elasticsearch/ingest_pipeline/default.yml line 565
Problem: The script processor for host.os.type mapping has no on_failure handler. If the script throws a runtime exception (e.g., unexpected type for os_type), the error will propagate to the global handler without a descriptive tag-based message.
Recommendation:
```yaml
- script:
    tag: script_map_host_os_type
    lang: painless
    if: ctx.cato_networks?.event?.os_type != null
    # ... params and source ...
    on_failure:
      - append:
          field: error.message
          value: 'Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} failed with message: {{{_ingest.on_failure_message}}}'
```
Issue 3: Global on_failure handler sets event.kind after error.message append
Severity: 🔵 Low
Location: packages/cato_networks/data_stream/event/elasticsearch/ingest_pipeline/default.yml line 1057
Problem: The global on_failure handler appends to error.message before setting event.kind: pipeline_error. The recommended order is to set event.kind first so that if the append itself fails, the document is still correctly classified.
Recommendation:
```yaml
on_failure:
  - set:
      field: event.kind
      tag: set_pipeline_error_to_event_kind
      value: pipeline_error
  - append:
      field: error.message
      value: >-
        Processor '{{{_ingest.on_failure_processor_type}}}'
        {{{#_ingest.on_failure_processor_tag}}}with tag '{{{_ingest.on_failure_processor_tag}}}'
        {{{/_ingest.on_failure_processor_tag}}}failed with message '{{{_ingest.on_failure_message}}}'
  - append:
      field: tags
      value: preserve_original_event
      allow_duplicates: false
```
💡 Suggestions
- event.category not set — The pipeline sets event.kind: event and event.type: info but never sets event.category. Consider conditionally setting event.category based on cato_networks.event.event_type or cato_networks.event.event_sub_type (e.g., network, authentication, intrusion_detection).
- @timestamp fallback missing — @timestamp is set from cato_networks.event.time_str (ISO8601) but there is no fallback to cato_networks.event.time (UNIX_MS) when time_str is absent. Consider adding a conditional set for @timestamp from cato_networks.event.time when time_str is null.
- cato_networks.event.time not removed in cleanup — cato_networks.event.time_str is removed in the cleanup block (line 1007), but cato_networks.event.time (the UNIX_MS parsed date) is not removed. If the intent is to keep only the parsed timestamp in @timestamp, consider removing cato_networks.event.time as well, or document why it is retained.
- destination.address and source.address not populated — ECS recommends populating source.address and destination.address alongside source.ip / destination.ip for full ECS compliance.
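One possible shape for the @timestamp fallback suggested above, as a hedged sketch (the tag name and processor placement are assumptions, and it presumes cato_networks.event.time still holds the raw UNIX_MS value at this point in the pipeline):

```yaml
- date:
    field: cato_networks.event.time
    tag: date_fallback_event_time
    target_field: '@timestamp'
    formats:
      - UNIX_MS
    # Only fall back when time_str is absent, matching the suggestion above.
    if: ctx.cato_networks?.event?.time_str == null && ctx.cato_networks?.event?.time != null
    on_failure:
      - append:
          field: error.message
          value: 'Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} failed with message: {{{_ingest.on_failure_message}}}'
```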
Input Configuration
Data Stream: event (package: cato_networks)
File: packages/cato_networks/data_stream/event/agent/stream/cel.yml.hbs
Issue 1: Unsafe unguarded access to body.errors[0].message
Severity: 🟠 High
Location: packages/cato_networks/data_stream/event/agent/stream/cel.yml.hbs line 72
Problem: body.errors[0].message is accessed without first checking that body.errors exists and is non-empty. When eventsFeed is null (e.g., during rate limiting), the API may not include an errors array, causing a runtime panic.
Recommendation:
```cel
"message": "POST " + state.url.trim_right("/") + "/api/v1/graphql2" + ": " + (
  has(body.errors) && size(body.errors) > 0 ?
    body.errors[0].message
  :
    "eventsFeed returned null (possible rate limit or invalid API key)"
),
```
Issue 2: Only accounts[0] processed — multi-account responses silently dropped
Severity: 🟡 Medium
Location: packages/cato_networks/data_stream/event/agent/stream/cel.yml.hbs line 60
Problem: accounts[0] is hardcoded — only the first account's records are processed. If the API returns data for multiple accounts, all subsequent accounts' records are silently dropped.
Recommendation:
```cel
"events": body.data.eventsFeed.accounts.flatMap(acct, acct.records.map(record, {
  "message": record.fieldsMap.encode_json(),
})),
```
Issue 3: Pagination termination uses strict equality (==) instead of >=
Severity: 🟡 Medium
Location: packages/cato_networks/data_stream/event/agent/stream/cel.yml.hbs line 66
Problem: body.data.eventsFeed.fetchedCount == state.max_page_limit uses == instead of >=. If fetchedCount ever exceeds max_page_limit, the condition will be false and pagination will stop prematurely even though more data exists.
Recommendation:
```cel
"want_more": body.data.eventsFeed.fetchedCount >= state.max_page_limit,
```
Issue 4: CEL program does not match celfmt canonical formatting
Severity: 🔵 Low
Location: packages/cato_networks/data_stream/event/agent/stream/cel.yml.hbs line 38
Problem: The CEL program compiles correctly but does not match celfmt canonical formatting. Changes include trailing commas, parenthesized ternary conditions, .with() argument style, and .map() body indentation.
Recommendation:
```shell
# Run celfmt on the CEL program and apply the resulting changes
celfmt packages/cato_networks/data_stream/event/agent/stream/cel.yml.hbs
```
💡 Suggestions
- Cursor not preserved in error paths — The error paths (lines 69–76 and 79–93) do not return a "cursor" key. If an error occurs mid-pagination, the cursor is not preserved and the next execution will restart from the beginning. Consider returning the current cursor in error paths.
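A hedged sketch of what cursor preservation in an error branch might look like (the `state.?cursor.marker` access assumes the previous marker is available in state; adapt to the actual program):

```cel
{
  "events": {
    "error": {
      "message": "POST " + state.url.trim_right("/") + "/api/v1/graphql2" + ": " + (
        has(body.errors) && size(body.errors) > 0 ?
          body.errors[0].message
        :
          "eventsFeed returned null (possible rate limit or invalid API key)"
      ),
    },
  },
  // Preserve the last known marker so the next run resumes instead of restarting.
  "cursor": {
    ?"marker": state.?cursor.marker,
  },
  "want_more": false,
}
```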
Transform
✅ No transform files in this PR.
Summary
| Severity | Count |
|---|---|
| 🔴 Critical | 0 |
| 🟠 High | 3 |
| 🟡 Medium | 6 |
| 🔵 Low | 3 |
Total Actionable Items: 12
```yaml
- name: event
  type: group
  fields:
    - name: account_id
```
🤖 AI Review: 🟡 All custom fields missing description properties — All ~90 custom fields under cato_networks.event are missing description properties. Every custom field definition must include a meaningful description explaining what the field contains.
Recommendation:
```yaml
- name: account_id
  type: keyword
  description: |
    Unique identifier of the Cato Networks account associated with this event.
- name: event_message
  type: match_only_text
  description: |
    Human-readable message describing the event.
```
🤖 AI-generated comment by Elastic Integration PR Review Bot
The schema is not available for the event data, hence descriptions are not provided.
```yaml
  type: keyword
- name: time
  type: date
- name: time_str
```
🤖 AI Review: 🟡 time_str field name implies string but is typed as date — time_str is named with a _str suffix implying a raw string representation, yet it is typed as date. If the pipeline parses this into a proper date, the name is misleading; if it is stored as-is (a string), the type is wrong.
Recommendation:
```yaml
# Option A: if the pipeline parses it
- name: time_parsed
  type: date
  description: Parsed timestamp of the event.
# Option B: if stored as a raw string
- name: time_str
  type: keyword
  description: Raw string representation of the event timestamp.
```
🤖 AI-generated comment by Elastic Integration PR Review Bot
time_str is converted to a date in the pipeline, so the actual type will be date.
```yaml
  type: keyword
- name: src_ip
  type: ip
- name: src_is_site_or_vpn
```
🤖 AI Review: 🟡 src_is_site_or_vpn uses boolean-style is_ prefix but is typed as keyword — src_is_site_or_vpn uses the is_ boolean-style prefix but is typed as keyword. The name implies a true/false flag, but the type suggests it holds string values.
Recommendation:
```yaml
# If boolean:
- name: src_is_site_or_vpn
  type: boolean
# If string enum ("site" / "vpn"):
- name: src_connection_type
  type: keyword
```
💡 Suggestions
- user_agent (lines 193–194) — Consider adding a multi_fields entry with type: text to enable full-text search on user agent strings, which are often queried with partial matches.
- mitre_attack_* fields (lines 117–122) — These fields likely hold arrays of values (multiple tactics/techniques per event). Consider documenting this in the description and verifying the pipeline sets them correctly as arrays.
- categories and device_categories (lines 31–32, 59–60) — Similarly likely to be multi-value; descriptions should clarify this.
- host_mac (lines 97–98) — MAC addresses are sometimes stored as keyword; this is acceptable, but a description clarifying the format (e.g., colon-separated hex) would be helpful.
🤖 AI-generated comment by Elastic Integration PR Review Bot
It contains keyword-type values, e.g. SDP User.
```yaml
    tag: rename_internal_id
    target_field: cato_networks.event.internal_id
    ignore_missing: true
    if: ctx.cato_networks.event.internal_id == null
```
🤖 AI Review: 🔴 Unsafe null-dereference in rename processor if condition — The if condition ctx.cato_networks.event.internal_id == null uses unguarded dot-notation access. If cato_networks or cato_networks.event does not exist in the document context (e.g., the preceding rename of json.internalId found no field), this will throw a NullPointerException and route the document to the global on_failure handler unexpectedly.
Recommendation:
```yaml
- rename:
    field: json.internal_id
    tag: rename_internal_id
    target_field: cato_networks.event.internal_id
    ignore_missing: true
    if: ctx.cato_networks?.event?.internal_id == null
```
🤖 AI-generated comment by Elastic Integration PR Review Bot
```yaml
    tag: rename_os_type
    target_field: cato_networks.event.os_type
    ignore_missing: true
- script:
```
🤖 AI Review: 🟡 The script processor for host.os.type mapping has no on_failure handler. If the script throws a runtime exception (e.g., unexpected type for os_type), the error will propagate to the global handler without a descriptive tag-based message.
Recommendation:
```yaml
- script:
    tag: script_map_host_os_type
    lang: painless
    if: ctx.cato_networks?.event?.os_type != null
    # ... params and source ...
    on_failure:
      - append:
          field: error.message
          value: 'Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} failed with message: {{{_ingest.on_failure_message}}}'
```
🤖 AI-generated comment by Elastic Integration PR Review Bot
```yaml
      value: preserve_original_event
      allow_duplicates: false
      if: ctx.error?.message != null
on_failure:
```
🤖 AI Review: 💡 Global on_failure handler sets event.kind after error.message append — The global on_failure handler appends to error.message before setting event.kind: pipeline_error. The recommended order is to set event.kind first so that if the append itself fails, the document is still correctly classified.
Recommendation:
```yaml
on_failure:
  - set:
      field: event.kind
      tag: set_pipeline_error_to_event_kind
      value: pipeline_error
  - append:
      field: error.message
      value: >-
        Processor '{{{_ingest.on_failure_processor_type}}}'
        {{{#_ingest.on_failure_processor_tag}}}with tag '{{{_ingest.on_failure_processor_tag}}}'
        {{{/_ingest.on_failure_processor_tag}}}failed with message '{{{_ingest.on_failure_message}}}'
  - append:
      field: tags
      value: preserve_original_event
      allow_duplicates: false
```
💡 Suggestions
- event.category not set — The pipeline sets event.kind: event and event.type: info but never sets event.category. Consider conditionally setting event.category based on cato_networks.event.event_type or cato_networks.event.event_sub_type (e.g., network, authentication, intrusion_detection).
- @timestamp fallback missing — @timestamp is set from cato_networks.event.time_str (ISO8601) but there is no fallback to cato_networks.event.time (UNIX_MS) when time_str is absent. Consider adding a conditional set for `@time
🤖 AI-generated comment by Elastic Integration PR Review Bot
In fact, time_str and time both carry the same value in different formats, and time_str has been observed to be present in every log.
The event_type field values do not comply with the valid values defined for the event.category field, which is why it is not mapped.
```cel
{
  "events": {
    "error": {
      "message": "POST " + state.url.trim_right("/") + "/api/v1/graphql2" + ": " + body.errors[0].message
```
🤖 AI Review: 🔴 Unsafe unguarded access to body.errors[0].message — body.errors[0].message is accessed without first checking that body.errors exists and is non-empty. When eventsFeed is null (e.g., during rate limiting), the API may not include an errors array, causing a runtime panic.
Recommendation:
```cel
"message": "POST " + state.url.trim_right("/") + "/api/v1/graphql2" + ": " + (
  has(body.errors) && size(body.errors) > 0 ?
    body.errors[0].message
  :
    "eventsFeed returned null (possible rate limit or invalid API key)"
),
```
🤖 AI-generated comment by Elastic Integration PR Review Bot
```cel
// in case of rate limit and wrong API key, StatusCode will be 200 and eventsFeed will be null
body.?data.eventsFeed.orValue(null) != null ?
  {
    "events": body.data.eventsFeed.accounts[0].records.map(record, {
```
🤖 AI Review: 🔴 Only accounts[0] processed — multi-account responses silently dropped — accounts[0] is hardcoded, so only the first account's records are processed. If the API returns data for multiple accounts, all subsequent accounts' records are silently dropped.
Recommendation:
```cel
"events": body.data.eventsFeed.accounts.flatMap(acct, acct.records.map(record, {
  "message": record.fieldsMap.encode_json(),
})),
```
🤖 AI-generated comment by Elastic Integration PR Review Bot
We are fetching data for only a single account, so there is always exactly one object inside accounts.
```cel
"cursor": {
  ?"marker": body.?data.eventsFeed.marker,
},
"want_more": body.data.eventsFeed.fetchedCount == state.max_page_limit,
```
🤖 AI Review: 🟡 Pagination termination uses strict equality (==) instead of >= — body.data.eventsFeed.fetchedCount == state.max_page_limit uses == instead of >=. If fetchedCount ever exceeds max_page_limit, the condition will be false and pagination will stop prematurely even though more data exists.
Recommendation:
```cel
"want_more": body.data.eventsFeed.fetchedCount >= state.max_page_limit,
```
🤖 AI-generated comment by Elastic Integration PR Review Bot
```yaml
fields:
  - api_key
program: |
  state.with(
```
🤖 AI Review: 🟡 CEL program does not match celfmt canonical formatting — The CEL program compiles correctly but does not match celfmt canonical formatting. Changes include trailing commas, parenthesized ternary conditions, .with() argument style, and .map() body indentation.
Recommendation:
```shell
# Run celfmt on the CEL program and apply the resulting changes
celfmt packages/cato_networks/data_stream/event/agent/stream/cel.yml.hbs
```
💡 Suggestions
- Cursor not preserved in error paths — The error paths (lines 69–76 and 79–93) do not return a "cursor" key. If an error occurs mid-pagination, the cursor is not preserved and the next execution will restart from the beginning. Consider returning the current cursor in error paths.
🤖 AI-generated comment by Elastic Integration PR Review Bot
Resolved the below-mentioned applicable comments given by the bot:
- Pipeline suggestions
- Input Configuration
Thanks @sharadcrest. Data coverage looks strong, and traffic direction, DNS record types, risk levels, auth types, the geo map, and MITRE are all presented well.
Can we make risk level more prominent on top versus raw event count? Right now the dashboard opens with a raw event count time series, but it would be really useful if an analyst could see how many High and Critical events are in the current window and whether that number is trending up or down. Can we stack or split the existing time series by risk level?
This may also tie to the 44.18% UNKNOWN risk level in “Events By Risk Level” - nearly half of events have an unknown risk level. Is this a mapping issue?
> Can we make risk level more prominent on top versus raw event count? Right now the dashboard opens with a raw event count time series, but it would be really useful if an analyst could see how many High and Critical events are in the current window and whether that number is trending up or down. Can we stack or split the existing time series by risk level?
Updated the line chart to include a breakdown by host.risk_level.
> This may also tie to the 44.18% UNKNOWN risk level in “Events By Risk Level” - nearly half of events have an unknown risk level. Is this a mapping issue?
This does not appear to be a mapping issue. After collecting a larger set of events, the distribution changed, and the majority of events are now categorized under the Medium risk level.
Proposed commit message
The initial release includes the event data stream and an associated dashboard.
Cato Networks fields are mapped to their corresponding ECS fields where possible.
Test samples were derived from live data samples, which were subsequently
sanitized.
Checklist
- changelog.yml file.
How to test this PR locally
To test the cato networks package:
Screenshots
Implementation Details
API Docs Referred
Default Values
Note: the API returns only event data generated between two requests, hence there is no initial interval and data is collected live.
Rate Limit