Skip to content

Commit

Permalink
Feed Elastic: fetch in batches (#31377)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilaner committed Dec 13, 2023
1 parent abce390 commit be51007
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

'''VARIABLES FOR FETCH INDICATORS'''
FETCH_SIZE = 50
FETCH_LIMIT = 10000
API_KEY_PREFIX = '_api_key_id:'
MODULE_TO_FEEDMAP_KEY = 'moduleToFeedMap'
FEED_TYPE_GENERIC = 'Generic Feed'
Expand Down Expand Up @@ -95,8 +96,8 @@ def _get_api_key_header_val(api_key):
<https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html>`
:arg api_key, either a tuple or a base64 encoded string
"""
if isinstance(api_key, (tuple, list)):
s = "{0}:{1}".format(api_key[0], api_key[1]).encode('utf-8')
if isinstance(api_key, tuple | list):
s = f"{api_key[0]}:{api_key[1]}".encode()
return "ApiKey " + base64.b64encode(s).decode('utf-8')
return "ApiKey " + api_key

Expand Down Expand Up @@ -159,7 +160,7 @@ def test_command(client, feed_type, src_val, src_type, default_type, time_method

else:
# if it is unknown error - get the message from the error itself
return_error("Failed to connect. The following error occurred: {}".format(str(e)))
return_error(f"Failed to connect. The following error occurred: {str(e)}")

except requests.exceptions.RequestException as e:
return_error("Failed to connect. Check Server URL field and port number.\nError message: " + str(e))
Expand All @@ -178,7 +179,7 @@ def get_indicators_command(client, feed_type, src_val, src_type, default_type):
# Insight is the name of the indicator object as it's saved into the database
search = get_scan_insight_format(client, now, feed_type=feed_type)
ioc_lst, ioc_enrch_lst = get_demisto_indicators(search, client.tags, client.tlp_color)
hr = tableToMarkdown('Indicators', list(set(map(lambda ioc: ioc.get('name'), ioc_lst))), 'Name')
hr = tableToMarkdown('Indicators', list({ioc.get('name') for ioc in ioc_lst}), 'Name')
if ioc_enrch_lst:
for ioc_enrch in ioc_enrch_lst:
hr += tableToMarkdown('Enrichment', ioc_enrch, ['value', 'sourceBrand', 'score'])
Expand All @@ -188,9 +189,9 @@ def get_indicators_command(client, feed_type, src_val, src_type, default_type):
def get_generic_indicators(search, src_val, src_type, default_type, tags, tlp_color):
    """Collect indicators in generic format from a single executed search.

    :param search: search object exposing ``execute()``; the query is run once
        (no scrolling), so the number of hits is whatever that one request returns.
    :param src_val: passed through to ``extract_indicators_from_generic_hit``.
    :param src_type: passed through to ``extract_indicators_from_generic_hit``.
    :param default_type: passed through to ``extract_indicators_from_generic_hit``.
    :param tags: passed through to ``extract_indicators_from_generic_hit``.
    :param tlp_color: passed through to ``extract_indicators_from_generic_hit``.
    :return: flat list with the indicators extracted from every hit.
    """
    ioc_lst: list = []
    # The extraction helper works on one hit at a time (as in fetch_indicators_command),
    # so walk the hits of the response instead of handing it the whole response object.
    for hit in search.execute():
        hit_lst = extract_indicators_from_generic_hit(hit, src_val, src_type, default_type, tags, tlp_color)
        ioc_lst.extend(hit_lst)
    return ioc_lst


Expand All @@ -208,36 +209,53 @@ def get_demisto_indicators(search, tags, tlp_color):
return ioc_lst, ioc_enrch_lst


def fetch_indicators_command(client, feed_type, src_val, src_type, default_type, last_fetch):
def update_last_fetch(client, ioc_lst):
    """Compute the checkpoint (time and ids) to persist after a fetch.

    The list is walked from its last element backwards. The first parsed time
    becomes the checkpoint time, and the ids of all trailing indicators that
    share that time are collected so they can be de-duplicated on the next run.
    NOTE(review): this assumes ``ioc_lst`` is ordered oldest to newest by
    ``client.time_field`` - confirm that the search is sorted accordingly.

    :param client: object exposing ``time_field``, the key holding each indicator's time.
    :param ioc_lst: list of indicator dicts (may be empty).
    :return: tuple ``(last_calculated_time, last_ids)``; when no usable time is
        found the time falls back to ``datetime.now()`` and ``last_ids`` is empty.
    """
    last_calculated_time = None
    last_ids = []
    for ioc in reversed(ioc_lst):
        raw_time = ioc.get(client.time_field)
        # dateparser.parse() expects a string; a missing or non-string value (e.g. when
        # no time field is configured) is treated as "no usable timestamp".
        calculate_time = dateparser.parse(raw_time) if isinstance(raw_time, str) else None
        if calculate_time is None:
            break
        if last_calculated_time is not None and calculate_time < last_calculated_time:
            # Reached an indicator older than the checkpoint time - nothing more to collect.
            break
        last_calculated_time = calculate_time
        last_ids.append(ioc.get('id'))
    if last_calculated_time is None:
        last_calculated_time = datetime.now()
    return last_calculated_time, last_ids


def fetch_indicators_command(client, feed_type, src_val, src_type, default_type, last_fetch,
                             fetch_limit=FETCH_LIMIT):
    """Implements fetch-indicators command.

    Runs the search for the configured feed type, creates the new indicators
    (and their enrichments) in batches, and stores the checkpoint for the next run.

    :param client: Elasticsearch client wrapper; ``time_field``, ``time_method``,
        ``fetch_time``, ``tags`` and ``tlp_color`` are read from it.
    :param feed_type: configured feed type; decides between insight and generic format.
    :param src_val: generic format only - passed to the hit extraction helper.
    :param src_type: generic format only - passed to the hit extraction helper.
    :param default_type: generic format only - passed to the hit extraction helper.
    :param last_fetch: previously stored fetch time, resolved by ``get_last_fetch_timestamp``.
    :param fetch_limit: maximum number of hits requested per run.
    """
    last_fetch_timestamp = get_last_fetch_timestamp(last_fetch, client.time_method, client.fetch_time)
    # ids stored by the previous run; the range query uses 'gte', so these can be returned again
    prev_iocs_ids = demisto.getLastRun().get("ids", [])
    now = datetime.now()
    ioc_lst: list = []
    ioc_enrch_lst: list = []
    if FEED_TYPE_GENERIC not in feed_type:
        # Insight is the name of the indicator object as it's saved into the database
        search = get_scan_insight_format(client, now, last_fetch_timestamp, feed_type, fetch_limit=fetch_limit)
        # if time field is not set we have to scan all
        hits = search if client.time_field else search.scan()
        for hit in hits:
            hit_lst, hit_enrch_lst = extract_indicators_from_insight_hit(hit, tags=client.tags,
                                                                         tlp_color=client.tlp_color)
            ioc_lst.extend(hit_lst)
            ioc_enrch_lst.extend(hit_enrch_lst)
    else:
        search = get_scan_generic_format(client, now, last_fetch_timestamp, fetch_limit)
        # if time field is not set we have to scan all
        hits = search if client.time_field else search.scan()
        for hit in hits:
            ioc_lst.extend(extract_indicators_from_generic_hit(hit, src_val, src_type, default_type, client.tags,
                                                               client.tlp_color))

    # drop indicators that were already created by the previous run
    ioc_lst = [ioc for ioc in ioc_lst if ioc.get("id") not in prev_iocs_ids]
    if ioc_lst:
        for b in batch(ioc_lst, batch_size=2000):
            demisto.createIndicators(b)
    # Computed unconditionally so the checkpoint below is always defined, even when
    # nothing new was fetched.
    last_calculated_time, last_ids = update_last_fetch(client, ioc_lst)
    if ioc_enrch_lst:
        ioc_enrch_batches = create_enrichment_batches(ioc_enrch_lst)
        for enrch_batch in ioc_enrch_batches:
            # ensure batch sizes don't exceed 2000
            for b in batch(enrch_batch, batch_size=2000):
                demisto.createIndicators(b)
    demisto.setLastRun({'time': int(last_calculated_time.timestamp() * 1000), 'ids': last_ids})


def get_last_fetch_timestamp(last_fetch, time_method, fetch_time):
Expand All @@ -253,7 +271,7 @@ def get_last_fetch_timestamp(last_fetch, time_method, fetch_time):
return last_fetch_timestamp


def get_scan_generic_format(client, now, last_fetch_timestamp=None):
def get_scan_generic_format(client, now, last_fetch_timestamp=None, fetch_limit=FETCH_LIMIT):
"""Gets a scan object in generic format"""
# if method is simple date - convert the date string to datetime
es = client.es
Expand All @@ -264,9 +282,9 @@ def get_scan_generic_format(client, now, last_fetch_timestamp=None):
if time_field:
query = QueryString(query=time_field + ':*')
range_field = {
time_field: {'gt': last_fetch_timestamp, 'lte': now}} if last_fetch_timestamp else {
time_field: {'gte': last_fetch_timestamp, 'lte': now}} if last_fetch_timestamp else {
time_field: {'lte': now}}
search = Search(using=es, index=fetch_index).filter({'range': range_field}).query(query)
search = Search(using=es, index=fetch_index).filter({'range': range_field}).extra(size=fetch_limit).sort().query(query)
else:
search = Search(using=es, index=fetch_index).query(QueryString(query=client.query))
return search
Expand All @@ -281,11 +299,11 @@ def extract_indicators_from_generic_hit(hit, src_val, src_type, default_type, ta
return ioc_lst


def get_scan_insight_format(client, now, last_fetch_timestamp=None, feed_type=None):
def get_scan_insight_format(client, now, last_fetch_timestamp=None, feed_type=None, fetch_limit=FETCH_LIMIT):
"""Gets a scan object in insight format"""
time_field = client.time_field
range_field = {
time_field: {'gt': last_fetch_timestamp, 'lte': now}} if last_fetch_timestamp else {
time_field: {'gte': last_fetch_timestamp, 'lte': now}} if last_fetch_timestamp else {
time_field: {'lte': now}}
es = client.es
query = QueryString(query=time_field + ":*")
Expand All @@ -298,7 +316,9 @@ def get_scan_insight_format(client, now, last_fetch_timestamp=None, feed_type=No
indices += f',-*{tenant_hash}*-shared*'
elif not indices:
indices = '_all'
search = Search(using=es, index=indices).filter({'range': range_field}).query(query)
search = Search(using=es, index=indices).filter({'range': range_field}).extra(
size=fetch_limit).sort({time_field: {'order': 'asc'}}).query(query)

return search


Expand Down Expand Up @@ -362,7 +382,7 @@ def create_enrichment_batches(ioc_enrch_lst):

def main():
try:
LOG('command is %s' % (demisto.command(),))
LOG(f'command is {demisto.command()}')
params = demisto.params()
server = params.get('url', '').rstrip('/')
creds = params.get('credentials')
Expand All @@ -375,6 +395,9 @@ def main():
time_method = params.get('time_method')
fetch_index = params.get('fetch_index')
fetch_time = params.get('fetch_time', '3 days')
fetch_limit = arg_to_number(params.get('fetch_limit', 10000))
if not fetch_limit or fetch_limit > 10_000:
raise DemistoException(f"Fetch limit must be between 1-10,000, got {fetch_limit}")
query = params.get('es_query')
api_id, api_key = extract_api_from_username_password(username, password)
client = ElasticsearchClient(insecure, server, username, password, api_key, api_id, time_field, time_method,
Expand All @@ -388,11 +411,11 @@ def main():
test_command(client, feed_type, src_val, src_type, default_type, time_method, time_field, fetch_time, query,
username, password, api_key, api_id)
elif demisto.command() == 'fetch-indicators':
fetch_indicators_command(client, feed_type, src_val, src_type, default_type, last_fetch)
fetch_indicators_command(client, feed_type, src_val, src_type, default_type, last_fetch, fetch_limit)
elif demisto.command() == 'es-get-indicators':
get_indicators_command(client, feed_type, src_val, src_type, default_type)
except Exception as e:
return_error("Failed executing {}.\nError message: {}".format(demisto.command(), str(e)))
return_error(f"Failed executing {demisto.command()}.\nError message: {str(e)}")


if __name__ in ("__main__", "__builtin__", "builtins"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ configuration:
name: fetch_time
type: 0
required: false
- defaultvalue: '10000'
display: Fetch Limit
name: fetch_limit
type: 0
required: false
- additionalinfo: Indicators from this integration instance will be marked with this reputation
defaultvalue: feedInstanceReputationNotSet
display: Indicator Reputation
Expand All @@ -60,10 +65,9 @@ configuration:
- Good
- Suspicious
- Bad
type: 18
required: false
type: 18
- additionalinfo: Reliability of the source providing the intelligence data
defaultvalue: F - Reliability cannot be judged
display: Source Reliability
name: feedReliability
options:
Expand All @@ -73,10 +77,10 @@ configuration:
- D - Not usually reliable
- E - Unreliable
- F - Reliability cannot be judged
required: true
type: 15
- additionalinfo: The Traffic Light Protocol (TLP) designation to apply to indicators fetched from the feed
display: Traffic Light Protocol Color
required: true
defaultvalue: F - Reliability cannot be judged
- display: 'Traffic Light Protocol Color'
name: tlp_color
options:
- RED
Expand All @@ -85,26 +89,27 @@ configuration:
- WHITE
type: 15
required: false
- defaultvalue: indicatorType
additionalinfo: The Traffic Light Protocol (TLP) designation to apply to indicators fetched from the feed
- defaultvalue: 'indicatorType'
display: ''
name: feedExpirationPolicy
type: 17
required: false
options:
- never
- interval
- indicatorType
- suddenDeath
type: 17
required: false
- defaultvalue: '20160'
display: ''
name: feedExpirationInterval
type: 1
required: false
- defaultvalue: '240'
display: Feed Fetch Interval
- display: Feed Fetch Interval
name: feedFetchInterval
type: 19
required: false
defaultvalue: '1'
- additionalinfo: Supports CSV values.
display: Tags
name: feedTags
Expand All @@ -130,20 +135,20 @@ configuration:
name: default_type
type: 0
required: false
- additionalinfo: A comma-separated list of indexes. If empty, searches all indexes.
display: Index from Which To Fetch Indicators
- display: Index from Which To Fetch Indicators
name: fetch_index
type: 0
required: false
- defaultvalue: Simple-Date
display: Time Field Type
additionalinfo: A comma-separated list of indexes. If empty, searches all indexes.
- display: Time Field Type
name: time_method
type: 15
required: false
defaultvalue: Simple-Date
options:
- Simple-Date
- Timestamp-Seconds
- Timestamp-Milliseconds
type: 15
required: false
- additionalinfo: Used for sorting and limiting data. If empty, results are not sorted.
display: Index Time Field
name: time_field
Expand All @@ -167,7 +172,7 @@ script:
required: true
description: Gets indicators available in the configured Elasticsearch database.
name: es-get-indicators
dockerimage: demisto/elasticsearch:1.0.0.77444
dockerimage: demisto/elasticsearch:1.0.0.83352
feed: true
runonce: false
script: '-'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
from CommonServerPython import *
from dataclasses import dataclass


@dataclass
class MockClient:
    """Minimal stand-in for the Elasticsearch client used by the tests."""

    # Annotated so that @dataclass registers it as a real field (settable via the constructor).
    time_field: str = "calculatedTime"


class MockHit:
Expand Down Expand Up @@ -291,3 +297,14 @@ def test_extract_api_from_username_password_username_api_key():
import FeedElasticsearch as esf
username = esf.API_KEY_PREFIX + 'api_id'
assert esf.extract_api_from_username_password(username, 'api_key') == ('api_id', 'api_key')


def test_last_run():
    """
    Given: indicators ordered oldest to newest, the last two sharing the same time.
    When: computing the checkpoint with update_last_fetch.
    Then: the checkpoint time is the newest time and the ids are those of the
          indicators sharing it.
    """
    from FeedElasticsearch import update_last_fetch

    id_and_time = (
        ("1", "2023-01-17T14:30:00.000Z"),
        ("2", "2023-01-17T14:32:00.000Z"),
        ("3", "2023-01-17T14:33:00.000Z"),
        ("4", "2023-01-17T14:33:00.000Z"),
    )
    indicators = [{"id": ioc_id, "calculatedTime": ioc_time} for ioc_id, ioc_time in id_and_time]

    checkpoint_time, checkpoint_ids = update_last_fetch(MockClient(), indicators)

    assert set(checkpoint_ids) == {"4", "3"}
    assert checkpoint_time.isoformat() == "2023-01-17T14:33:00+00:00"
7 changes: 7 additions & 0 deletions Packs/FeedElasticsearch/ReleaseNotes/1_1_3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

#### Integrations

##### Elasticsearch Feed
- Updated the Docker image to: *demisto/elasticsearch:1.0.0.83352*.

- Added the **Fetch Limit** parameter (`fetch_limit`, 1-10,000) to fetch indicators in batches.
2 changes: 1 addition & 1 deletion Packs/FeedElasticsearch/pack_metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "Elasticsearch Feed",
"description": "Indicators feed from Elasticsearch database",
"support": "xsoar",
"currentVersion": "1.1.2",
"currentVersion": "1.1.3",
"author": "Cortex XSOAR",
"url": "https://www.paloaltonetworks.com/cortex",
"email": "",
Expand Down

0 comments on commit be51007

Please sign in to comment.