Feed Elastic: fetch in batches #31377
Merged · 9 commits · Dec 13, 2023
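In short, this change replaces the open-ended `search.scan()` iteration with a single bounded request: the query is capped at a configurable fetch limit (at most 10,000), sorted ascending on the configured time field, and the newest timestamp plus the ids seen at that timestamp are saved to the last run so the next fetch resumes without duplicates. Below is a minimal sketch of that query pattern, assuming elasticsearch-dsl with placeholder connection, index, and field names rather than the integration's actual code.

# Sketch only: placeholder URL, index, and time field; mirrors the bounded,
# time-sorted query the PR introduces, not the integration's exact code.
from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from elasticsearch_dsl.query import QueryString

FETCH_LIMIT = 10_000               # hard cap, mirrors the new module constant
time_field = "calculatedTime"      # placeholder; the integration reads this from params
last_fetch = "2023-12-01T00:00:00Z"

es = Elasticsearch("http://localhost:9200")  # placeholder cluster URL

search = (
    Search(using=es, index="demisto-indicators")                                   # placeholder index
    .filter({"range": {time_field: {"gte": last_fetch, "lte": datetime.now()}}})   # closed time window
    .extra(size=FETCH_LIMIT)                      # bounded page instead of an open-ended scan()
    .sort({time_field: {"order": "asc"}})         # oldest first, so the last hit carries the newest time
    .query(QueryString(query=f"{time_field}:*"))  # only documents that actually have the time field
)
for hit in search.execute():                      # one request returning at most FETCH_LIMIT hits
    print(hit.meta.id)

Sorting ascending is what lets the new update_last_fetch helper read the newest timestamp off the tail of the batch.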
@@ -24,6 +24,7 @@

'''VARIABLES FOR FETCH INDICATORS'''
FETCH_SIZE = 50
FETCH_LIMIT = 10000
API_KEY_PREFIX = '_api_key_id:'
MODULE_TO_FEEDMAP_KEY = 'moduleToFeedMap'
FEED_TYPE_GENERIC = 'Generic Feed'
@@ -95,8 +96,8 @@ def _get_api_key_header_val(api_key):
<https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html>`
:arg api_key, either a tuple or a base64 encoded string
"""
if isinstance(api_key, (tuple, list)):
s = "{0}:{1}".format(api_key[0], api_key[1]).encode('utf-8')
if isinstance(api_key, tuple | list):
s = f"{api_key[0]}:{api_key[1]}".encode()
return "ApiKey " + base64.b64encode(s).decode('utf-8')
return "ApiKey " + api_key

@@ -159,7 +160,7 @@ def test_command(client, feed_type, src_val, src_type, default_type, time_method

else:
# if it is unknown error - get the message from the error itself
return_error("Failed to connect. The following error occurred: {}".format(str(e)))
return_error(f"Failed to connect. The following error occurred: {str(e)}")

except requests.exceptions.RequestException as e:
return_error("Failed to connect. Check Server URL field and port number.\nError message: " + str(e))
@@ -178,7 +179,7 @@ def get_indicators_command(client, feed_type, src_val, src_type, default_type):
# Insight is the name of the indicator object as it's saved into the database
search = get_scan_insight_format(client, now, feed_type=feed_type)
ioc_lst, ioc_enrch_lst = get_demisto_indicators(search, client.tags, client.tlp_color)
hr = tableToMarkdown('Indicators', list(set(map(lambda ioc: ioc.get('name'), ioc_lst))), 'Name')
hr = tableToMarkdown('Indicators', list({ioc.get('name') for ioc in ioc_lst}), 'Name')
if ioc_enrch_lst:
for ioc_enrch in ioc_enrch_lst:
hr += tableToMarkdown('Enrichment', ioc_enrch, ['value', 'sourceBrand', 'score'])
@@ -188,9 +189,9 @@ def get_indicators_command(client, feed_type, src_val, src_type, default_type):
def get_generic_indicators(search, src_val, src_type, default_type, tags, tlp_color):
"""Implements get indicators in generic format"""
ioc_lst: list = []
for hit in search.scan():
hit_lst = extract_indicators_from_generic_hit(hit, src_val, src_type, default_type, tags, tlp_color)
ioc_lst.extend(hit_lst)
hit = search.execute()
hit_lst = extract_indicators_from_generic_hit(hit, src_val, src_type, default_type, tags, tlp_color)
ioc_lst.extend(hit_lst)
return ioc_lst


@@ -208,36 +209,53 @@ def get_demisto_indicators(search, tags, tlp_color):
return ioc_lst, ioc_enrch_lst


def fetch_indicators_command(client, feed_type, src_val, src_type, default_type, last_fetch):
def update_last_fetch(client, ioc_lst):
last_calculated_time = None
last_ids = []
for ioc in reversed(ioc_lst):
calculate_time = dateparser.parse(ioc.get(client.time_field))
if calculate_time and (not last_calculated_time or calculate_time >= last_calculated_time):
last_calculated_time = calculate_time
last_ids.append(ioc.get('id'))
else:
break
if last_calculated_time is None:
last_calculated_time = datetime.now()
return last_calculated_time, last_ids


def fetch_indicators_command(client, feed_type, src_val, src_type, default_type, last_fetch, fetch_limit):
"""Implements fetch-indicators command"""
last_fetch_timestamp = get_last_fetch_timestamp(last_fetch, client.time_method, client.fetch_time)
prev_iocs_ids = demisto.getLastRun().get("ids", [])
now = datetime.now()
ioc_lst: list = []
ioc_enrch_lst: list = []
if FEED_TYPE_GENERIC not in feed_type:
# Insight is the name of the indicator object as it's saved into the database
search = get_scan_insight_format(client, now, last_fetch_timestamp, feed_type)
for hit in search.scan():
for hit in search if client.time_field else search.scan():  # if time field is not set we have to scan all
hit_lst, hit_enrch_lst = extract_indicators_from_insight_hit(hit, tags=client.tags,
tlp_color=client.tlp_color)
ioc_lst.extend(hit_lst)
ioc_enrch_lst.extend(hit_enrch_lst)
else:
search = get_scan_generic_format(client, now, last_fetch_timestamp)
for hit in search.scan():
search = get_scan_generic_format(client, now, last_fetch_timestamp, fetch_limit)
for hit in search if client.time_field else search.scan():  # if time field is not set we have to scan all
ioc_lst.extend(extract_indicators_from_generic_hit(hit, src_val, src_type, default_type, client.tags,
client.tlp_color))

ioc_lst = list(filter(lambda ioc: ioc.get("id") not in prev_iocs_ids, ioc_lst))
if ioc_lst:
for b in batch(ioc_lst, batch_size=2000):
demisto.createIndicators(b)
last_calculated_time, last_ids = update_last_fetch(client, ioc_lst)
if ioc_enrch_lst:
ioc_enrch_batches = create_enrichment_batches(ioc_enrch_lst)
for enrch_batch in ioc_enrch_batches:
# ensure batch sizes don't exceed 2000
for b in batch(enrch_batch, batch_size=2000):
demisto.createIndicators(b)
demisto.setLastRun({'time': int(now.timestamp() * 1000)})
demisto.setLastRun({'time': int(last_calculated_time.timestamp() * 1000), 'ids': last_ids})


def get_last_fetch_timestamp(last_fetch, time_method, fetch_time):
@@ -253,7 +271,7 @@ def get_last_fetch_timestamp(last_fetch, time_method, fetch_time):
return last_fetch_timestamp


def get_scan_generic_format(client, now, last_fetch_timestamp=None):
def get_scan_generic_format(client, now, last_fetch_timestamp=None, fetch_limit=FETCH_LIMIT):
"""Gets a scan object in generic format"""
# if method is simple date - convert the date string to datetime
es = client.es
@@ -264,9 +282,9 @@ def get_scan_generic_format(client, now, last_fetch_timestamp=None):
if time_field:
query = QueryString(query=time_field + ':*')
range_field = {
time_field: {'gt': last_fetch_timestamp, 'lte': now}} if last_fetch_timestamp else {
time_field: {'gte': last_fetch_timestamp, 'lte': now}} if last_fetch_timestamp else {
time_field: {'lte': now}}
search = Search(using=es, index=fetch_index).filter({'range': range_field}).query(query)
search = Search(using=es, index=fetch_index).filter({'range': range_field}).extra(size=fetch_limit).sort().query(query)
else:
search = Search(using=es, index=fetch_index).query(QueryString(query=client.query))
return search
@@ -281,11 +299,11 @@ def extract_indicators_from_generic_hit(hit, src_val, src_type, default_type, ta
return ioc_lst


def get_scan_insight_format(client, now, last_fetch_timestamp=None, feed_type=None):
def get_scan_insight_format(client, now, last_fetch_timestamp=None, feed_type=None, fetch_limit=FETCH_LIMIT):
"""Gets a scan object in insight format"""
time_field = client.time_field
range_field = {
time_field: {'gt': last_fetch_timestamp, 'lte': now}} if last_fetch_timestamp else {
time_field: {'gte': last_fetch_timestamp, 'lte': now}} if last_fetch_timestamp else {
time_field: {'lte': now}}
es = client.es
query = QueryString(query=time_field + ":*")
@@ -298,7 +316,9 @@ def get_scan_insight_format(client, now, last_fetch_timestamp=None, feed_type=No
indices += f',-*{tenant_hash}*-shared*'
elif not indices:
indices = '_all'
search = Search(using=es, index=indices).filter({'range': range_field}).query(query)
search = Search(using=es, index=indices).filter({'range': range_field}).extra(
size=fetch_limit).sort({time_field: {'order': 'asc'}}).query(query)

return search


@@ -362,7 +382,7 @@ def create_enrichment_batches(ioc_enrch_lst):

def main():
try:
LOG('command is %s' % (demisto.command(),))
LOG(f'command is {demisto.command()}')
params = demisto.params()
server = params.get('url', '').rstrip('/')
creds = params.get('credentials')
@@ -375,6 +395,9 @@ def main():
time_method = params.get('time_method')
fetch_index = params.get('fetch_index')
fetch_time = params.get('fetch_time', '3 days')
fetch_limit = arg_to_number(params.get('fetch_limit', 10000))
if not fetch_limit or fetch_limit > 10_000:
raise DemistoException(f"Fetch limit must be between 1-10,000, got {fetch_limit}")
query = params.get('es_query')
api_id, api_key = extract_api_from_username_password(username, password)
client = ElasticsearchClient(insecure, server, username, password, api_key, api_id, time_field, time_method,
@@ -388,11 +411,11 @@ def main():
test_command(client, feed_type, src_val, src_type, default_type, time_method, time_field, fetch_time, query,
username, password, api_key, api_id)
elif demisto.command() == 'fetch-indicators':
fetch_indicators_command(client, feed_type, src_val, src_type, default_type, last_fetch)
fetch_indicators_command(client, feed_type, src_val, src_type, default_type, last_fetch, fetch_limit)
elif demisto.command() == 'es-get-indicators':
get_indicators_command(client, feed_type, src_val, src_type, default_type)
except Exception as e:
return_error("Failed executing {}.\nError message: {}".format(demisto.command(), str(e)))
return_error(f"Failed executing {demisto.command()}.\nError message: {str(e)}")


if __name__ in ("__main__", "__builtin__", "builtins"):
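Because the range filter now uses gte instead of gt, indicators stamped exactly at the stored timestamp are fetched again on the next run, and the ids recorded in the previous run are what filter them back out. The standalone sketch below mirrors that bookkeeping with illustrative names; it is not the integration's own helpers.

# Self-contained sketch of the new last-run bookkeeping (illustrative names).
from datetime import datetime

import dateparser


def last_fetch_state(iocs: list, time_field: str = "calculatedTime"):
    """Return the newest timestamp in an ascending-ordered batch plus the ids
    of every indicator sharing it, so the next run can skip them."""
    newest = None
    ids = []
    for ioc in reversed(iocs):                  # batch is sorted ascending, so walk from the end
        ts = dateparser.parse(ioc.get(time_field))
        if ts and (newest is None or ts >= newest):
            newest = ts
            ids.append(ioc.get("id"))
        else:
            break                               # older or unparsable: stop collecting ids
    return newest or datetime.now(), ids


previous_ids = ["3", "4"]                       # pretend these came from demisto.getLastRun()
batch = [
    {"id": "3", "calculatedTime": "2023-01-17T14:33:00Z"},  # re-fetched because of 'gte'
    {"id": "4", "calculatedTime": "2023-01-17T14:33:00Z"},
    {"id": "5", "calculatedTime": "2023-01-17T14:35:00Z"},
]
fresh = [ioc for ioc in batch if ioc["id"] not in previous_ids]
newest, ids = last_fetch_state(fresh)
print(fresh)        # only the id "5" entry survives the dedup filter
print(newest, ids)  # 2023-01-17 14:35:00+00:00 ['5']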
@@ -51,6 +51,11 @@ configuration:
name: fetch_time
type: 0
required: false
- defaultvalue: '10000'
display: Fetch Limit
name: fetch_limit
type: 0
required: false
- additionalinfo: Indicators from this integration instance will be marked with this reputation
defaultvalue: feedInstanceReputationNotSet
display: Indicator Reputation
@@ -60,10 +65,9 @@ configuration:
- Good
- Suspicious
- Bad
type: 18
required: false
type: 18
- additionalinfo: Reliability of the source providing the intelligence data
defaultvalue: F - Reliability cannot be judged
display: Source Reliability
name: feedReliability
options:
@@ -73,10 +77,10 @@ configuration:
- D - Not usually reliable
- E - Unreliable
- F - Reliability cannot be judged
required: true
type: 15
- additionalinfo: The Traffic Light Protocol (TLP) designation to apply to indicators fetched from the feed
display: Traffic Light Protocol Color
required: true
defaultvalue: F - Reliability cannot be judged
- display: 'Traffic Light Protocol Color'
name: tlp_color
options:
- RED
@@ -85,26 +89,27 @@ configuration:
- WHITE
type: 15
required: false
- defaultvalue: indicatorType
additionalinfo: The Traffic Light Protocol (TLP) designation to apply to indicators fetched from the feed
- defaultvalue: 'indicatorType'
display: ''
name: feedExpirationPolicy
type: 17
required: false
options:
- never
- interval
- indicatorType
- suddenDeath
type: 17
required: false
- defaultvalue: '20160'
display: ''
name: feedExpirationInterval
type: 1
required: false
- defaultvalue: '240'
display: Feed Fetch Interval
- display: Feed Fetch Interval
name: feedFetchInterval
type: 19
required: false
defaultvalue: '1'
- additionalinfo: Supports CSV values.
display: Tags
name: feedTags
@@ -130,20 +135,20 @@ configuration:
name: default_type
type: 0
required: false
- additionalinfo: A comma-separated list of indexes. If empty, searches all indexes.
display: Index from Which To Fetch Indicators
- display: Index from Which To Fetch Indicators
name: fetch_index
type: 0
required: false
- defaultvalue: Simple-Date
display: Time Field Type
additionalinfo: A comma-separated list of indexes. If empty, searches all indexes.
- display: Time Field Type
name: time_method
type: 15
required: false
defaultvalue: Simple-Date
options:
- Simple-Date
- Timestamp-Seconds
- Timestamp-Milliseconds
type: 15
required: false
- additionalinfo: Used for sorting and limiting data. If empty, results are not sorted.
display: Index Time Field
name: time_field
@@ -167,7 +172,7 @@ script:
required: true
description: Gets indicators available in the configured Elasticsearch database.
name: es-get-indicators
dockerimage: demisto/elasticsearch:1.0.0.77444
dockerimage: demisto/elasticsearch:1.0.0.83352
feed: true
runonce: false
script: '-'
@@ -1,4 +1,10 @@
from CommonServerPython import *
from dataclasses import dataclass


@dataclass
class MockClient:
time_field = "calculatedTime"


class MockHit:
@@ -291,3 +297,14 @@ def test_extract_api_from_username_password_username_api_key():
import FeedElasticsearch as esf
username = esf.API_KEY_PREFIX + 'api_id'
assert esf.extract_api_from_username_password(username, 'api_key') == ('api_id', 'api_key')


def test_last_run():
from FeedElasticsearch import update_last_fetch
ioc_lst = [{"id": "1", "calculatedTime": "2023-01-17T14:30:00.000Z"},
{"id": "2", "calculatedTime": "2023-01-17T14:32:00.000Z"},
{"id": "3", "calculatedTime": "2023-01-17T14:33:00.000Z"},
{"id": "4", "calculatedTime": "2023-01-17T14:33:00.000Z"}]
last_update, last_ids = update_last_fetch(MockClient(), ioc_lst)
assert set(last_ids) == {"4", "3"}
assert last_update.isoformat() == "2023-01-17T14:33:00+00:00"
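The edge case where the newest hit's time value can't be parsed (update_last_fetch then falls back to datetime.now() and returns no ids) is not covered above. A possible extra test in the same style, assuming the existing test setup and that dateparser returns None for a non-date string, could look like this:

def test_last_run_unparsable_time():
    # Hypothetical edge-case test, not part of this PR: an unparsable time field
    # should make update_last_fetch fall back to roughly "now" and return no ids.
    from datetime import datetime, timedelta
    from FeedElasticsearch import update_last_fetch

    ioc_lst = [{"id": "1", "calculatedTime": "not a date"}]
    last_update, last_ids = update_last_fetch(MockClient(), ioc_lst)
    assert last_ids == []
    assert datetime.now() - last_update < timedelta(minutes=1)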
7 changes: 7 additions & 0 deletions Packs/FeedElasticsearch/ReleaseNotes/1_1_3.md
@@ -0,0 +1,7 @@

#### Integrations

##### Elasticsearch Feed
- Updated the Docker image to: *demisto/elasticsearch:1.0.0.83352*.

- Added the *Fetch Limit* parameter to fetch indicators in batches.
2 changes: 1 addition & 1 deletion Packs/FeedElasticsearch/pack_metadata.json
@@ -2,7 +2,7 @@
"name": "Elasticsearch Feed",
"description": "Indicators feed from Elasticsearch database",
"support": "xsoar",
"currentVersion": "1.1.2",
"currentVersion": "1.1.3",
"author": "Cortex XSOAR",
"url": "https://www.paloaltonetworks.com/cortex",
"email": "",