Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FeedTAXII2: return report object with the relationships #27860

Merged
merged 12 commits into from Jul 4, 2023
119 changes: 70 additions & 49 deletions Packs/ApiModules/Scripts/TAXII2ApiModule/TAXII2ApiModule.py
Expand Up @@ -3,7 +3,7 @@
from CommonServerPython import *
from CommonServerUserPython import *

from typing import Optional, List, Dict, Tuple
from typing import Optional
from requests.sessions import merge_setting, CaseInsensitiveDict
import re
import copy
Expand Down Expand Up @@ -187,7 +187,7 @@ def __init__(
collection_to_fetch,
proxies,
verify: bool,
objects_to_fetch: List[str],
objects_to_fetch: list[str],
skip_complex_mode: bool = False,
username: Optional[str] = None,
password: Optional[str] = None,
Expand Down Expand Up @@ -266,7 +266,7 @@ def __init__(
re.compile(CIDR_ISSUBSET_VAL_PATTERN),
re.compile(CIDR_ISUPPERSET_VAL_PATTERN),
]
self.id_to_object: Dict[str, Any] = {}
self.id_to_object: dict[str, Any] = {}
self.objects_to_fetch = objects_to_fetch
self.default_api_root = default_api_root
self.update_custom_fields = update_custom_fields
Expand Down Expand Up @@ -349,7 +349,7 @@ def init_collections(self):
"""
Collects available taxii collections
"""
self.collections = [x for x in self.api_root.collections] # type: ignore[union-attr, attr-defined, assignment]
self.collections = list(self.api_root.collections) # type: ignore[union-attr, attr-defined, assignment]

def init_collection_to_fetch(self, collection_to_fetch=None):
"""
Expand Down Expand Up @@ -391,7 +391,7 @@ def build_certificate(cert_var):
return cf.name

@staticmethod
def get_indicator_publication(indicator: Dict[str, Any]):
def get_indicator_publication(indicator: dict[str, Any]):
"""
Build publications grid field from the indicator external_references field

Expand All @@ -410,23 +410,42 @@ def get_indicator_publication(indicator: Dict[str, Any]):
return publications

@staticmethod
def change_attack_pattern_to_stix_attack_pattern(indicator: Dict[str, Any]):
def change_attack_pattern_to_stix_attack_pattern(indicator: dict[str, Any]):
indicator['type'] = f'STIX {indicator["type"]}'
indicator['fields']['stixkillchainphases'] = indicator['fields'].pop('killchainphases', None)
indicator['fields']['stixdescription'] = indicator['fields'].pop('description', None)

return indicator

@staticmethod
def is_sub_report(report_obj: Dict[str, Any]) -> bool:
def is_sub_report(report_obj: dict[str, Any]) -> bool:
obj_refs = report_obj.get('object_refs', [])
for obj_ref in obj_refs:
if obj_ref.startswith('report--'):
return False
return True
return all(not obj_ref.startswith("report--") for obj_ref in obj_refs)

@staticmethod
def get_ioc_type(indicator: str, id_to_object: Dict[str, Dict[str, Any]]) -> str:
def parse_report_relationships(report_obj: dict[str, Any],
id_to_object: dict[str, dict[str, Any]]) -> list[EntityRelationship]:
obj_refs = report_obj.get('object_refs', [])
relationships: list[EntityRelationship] = []
for related_obj in obj_refs:
# relationships objects ref handled in parse_relationships
if not related_obj.startswith('relationship--'):
entity_b_obj = id_to_object.get(related_obj, {})
entity_b_name = entity_b_obj and entity_b_obj.get('name') or related_obj
entity_b_type = entity_b_obj and entity_b_obj.get('type') or related_obj.split('--')[0]
relationships.append(
EntityRelationship(
name='related-to',
entity_a=report_obj.get('name'),
entity_a_type=ThreatIntel.ObjectsNames.REPORT,
entity_b=entity_b_name,
entity_b_type=STIX_2_TYPES_TO_CORTEX_TYPES.get(entity_b_type, '')
)
)
return relationships

@staticmethod
def get_ioc_type(indicator: str, id_to_object: dict[str, dict[str, Any]]) -> str:
"""
Get IOC type by extracting it from the pattern field.

Expand Down Expand Up @@ -500,7 +519,7 @@ def parse_custom_fields(extensions):
break
return custom_fields, score

def parse_indicator(self, indicator_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_indicator(self, indicator_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single indicator object
:param indicator_obj: indicator object
Expand Down Expand Up @@ -541,7 +560,7 @@ def parse_indicator(self, indicator_obj: Dict[str, Any]) -> List[Dict[str, Any]]

return indicators

def parse_attack_pattern(self, attack_pattern_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_attack_pattern(self, attack_pattern_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single attack pattern object
:param attack_pattern_obj: attack pattern object
Expand Down Expand Up @@ -580,15 +599,12 @@ def parse_attack_pattern(self, attack_pattern_obj: Dict[str, Any]) -> List[Dict[

return [attack_pattern]

def parse_report(self, report_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_report(self, report_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single report object
:param report_obj: report object
:return: report extracted from the report object in cortex format
"""
if self.is_sub_report(report_obj):
return []

report = {
"type": ThreatIntel.ObjectsNames.REPORT,
"value": report_obj.get('name'),
Expand All @@ -613,9 +629,11 @@ def parse_report(self, report_obj: Dict[str, Any]) -> List[Dict[str, Any]]:

report["fields"] = fields

report['relationships'] = self.parse_report_relationships(report_obj, self.id_to_object)

return [report]

def parse_threat_actor(self, threat_actor_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_threat_actor(self, threat_actor_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single threat actor object
:param threat_actor_obj: report object
Expand Down Expand Up @@ -653,7 +671,7 @@ def parse_threat_actor(self, threat_actor_obj: Dict[str, Any]) -> List[Dict[str,

return [threat_actor]

def parse_infrastructure(self, infrastructure_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_infrastructure(self, infrastructure_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single infrastructure object
:param infrastructure_obj: infrastructure object
Expand Down Expand Up @@ -689,7 +707,7 @@ def parse_infrastructure(self, infrastructure_obj: Dict[str, Any]) -> List[Dict[

return [infrastructure]

def parse_malware(self, malware_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_malware(self, malware_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single malware object
:param malware_obj: malware object
Expand Down Expand Up @@ -731,7 +749,7 @@ def parse_malware(self, malware_obj: Dict[str, Any]) -> List[Dict[str, Any]]:

return [malware]

def parse_tool(self, tool_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_tool(self, tool_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single tool object
:param tool_obj: tool object
Expand Down Expand Up @@ -766,7 +784,7 @@ def parse_tool(self, tool_obj: Dict[str, Any]) -> List[Dict[str, Any]]:

return [tool]

def parse_course_of_action(self, coa_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_course_of_action(self, coa_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single course of action object
:param coa_obj: course of action object
Expand Down Expand Up @@ -799,7 +817,7 @@ def parse_course_of_action(self, coa_obj: Dict[str, Any]) -> List[Dict[str, Any]

return [course_of_action]

def parse_campaign(self, campaign_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_campaign(self, campaign_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single campaign object
:param campaign_obj: campaign object
Expand Down Expand Up @@ -827,7 +845,7 @@ def parse_campaign(self, campaign_obj: Dict[str, Any]) -> List[Dict[str, Any]]:

return [campaign]

def parse_intrusion_set(self, intrusion_set_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_intrusion_set(self, intrusion_set_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single intrusion set object
:param intrusion_set_obj: intrusion set object
Expand Down Expand Up @@ -864,8 +882,8 @@ def parse_intrusion_set(self, intrusion_set_obj: Dict[str, Any]) -> List[Dict[st
return [intrusion_set]

def parse_general_sco_indicator(
self, sco_object: Dict[str, Any], value_mapping: str = 'value'
) -> List[Dict[str, Any]]:
self, sco_object: dict[str, Any], value_mapping: str = 'value'
) -> list[dict[str, Any]]:
"""
Parses a single SCO indicator.

Expand Down Expand Up @@ -893,7 +911,7 @@ def parse_general_sco_indicator(

return [sco_indicator]

def parse_sco_autonomous_system_indicator(self, autonomous_system_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_sco_autonomous_system_indicator(self, autonomous_system_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses autonomous_system indicator type to cortex format.

Expand All @@ -905,7 +923,7 @@ def parse_sco_autonomous_system_indicator(self, autonomous_system_obj: Dict[str,

return autonomous_system_indicator

def parse_sco_file_indicator(self, file_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_sco_file_indicator(self, file_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses file indicator type to cortex format.

Expand Down Expand Up @@ -933,7 +951,7 @@ def parse_sco_file_indicator(self, file_obj: Dict[str, Any]) -> List[Dict[str, A

return file_indicator

def parse_sco_mutex_indicator(self, mutex_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_sco_mutex_indicator(self, mutex_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses mutex indicator type to cortex format.

Expand All @@ -942,7 +960,7 @@ def parse_sco_mutex_indicator(self, mutex_obj: Dict[str, Any]) -> List[Dict[str,
"""
return self.parse_general_sco_indicator(sco_object=mutex_obj, value_mapping='name')

def parse_sco_account_indicator(self, account_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_sco_account_indicator(self, account_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses account indicator type to cortex format.

Expand All @@ -958,7 +976,7 @@ def parse_sco_account_indicator(self, account_obj: Dict[str, Any]) -> List[Dict[
)
return account_indicator

def parse_sco_windows_registry_key_indicator(self, registry_key_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_sco_windows_registry_key_indicator(self, registry_key_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses registry_key indicator type to cortex format.

Expand All @@ -975,7 +993,7 @@ def parse_sco_windows_registry_key_indicator(self, registry_key_obj: Dict[str, A
)
return registry_key_indicator

def parse_identity(self, identity_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_identity(self, identity_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single identity object
:param identity_obj: identity object
Expand Down Expand Up @@ -1006,7 +1024,7 @@ def parse_identity(self, identity_obj: Dict[str, Any]) -> List[Dict[str, Any]]:

return [identity]

def parse_location(self, location_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_location(self, location_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single location object
:param location_obj: location object
Expand Down Expand Up @@ -1039,7 +1057,7 @@ def parse_location(self, location_obj: Dict[str, Any]) -> List[Dict[str, Any]]:

return [location]

def parse_vulnerability(self, vulnerability_obj: Dict[str, Any]) -> List[Dict[str, Any]]:
def parse_vulnerability(self, vulnerability_obj: dict[str, Any]) -> list[dict[str, Any]]:
"""
Parses a single vulnerability object
:param vulnerability_obj: vulnerability object
Expand Down Expand Up @@ -1073,7 +1091,7 @@ def parse_vulnerability(self, vulnerability_obj: Dict[str, Any]) -> List[Dict[st

return [cve]

def parse_relationships(self, relationships_lst: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def parse_relationships(self, relationships_lst: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Parse the Relationships objects retrieved from the feed.

Returns:
Expand Down Expand Up @@ -1127,7 +1145,7 @@ def parse_relationships(self, relationships_lst: List[Dict[str, Any]]) -> List[D
}
return [dummy_indicator] if dummy_indicator else []

def build_iterator(self, limit: int = -1, **kwargs) -> List[Dict[str, str]]:
def build_iterator(self, limit: int = -1, **kwargs) -> list[dict[str, str]]:
"""
Polls the taxii server and builds a list of cortex indicators objects from the result
:param limit: max amount of indicators to fetch
Expand Down Expand Up @@ -1201,6 +1219,9 @@ def parse_generator_type_envelope(self, envelopes: types.GeneratorType, parse_ob
# no fetched objects
break

# we should build the id_to_object dict before iteration as some object reference each other
self.id_to_object.update({obj.get('id'): obj for obj in stix_objects})

# now we have a list of objects, go over each obj, save id with obj, parse the obj
for obj in stix_objects:
obj_type = obj.get('type')
Expand All @@ -1212,7 +1233,6 @@ def parse_generator_type_envelope(self, envelopes: types.GeneratorType, parse_ob
relationships_lst.append(obj)
continue

self.id_to_object[obj.get('id')] = obj
if not parse_objects_func.get(obj_type):
demisto.debug(f'There is no parsing function for object type {obj_type}, '
f'available parsing functions are for types: {",".join(parse_objects_func.keys())}.')
Expand Down Expand Up @@ -1257,8 +1277,8 @@ def get_page_size(self, max_limit: int, cur_limit: int) -> int:

@staticmethod
def extract_indicators_from_stix_objects(
stix_objs: List[Dict[str, str]], required_objects: List[str]
) -> List[Dict[str, str]]:
stix_objs: list[dict[str, str]], required_objects: list[str]
) -> list[dict[str, str]]:
"""
Extracts indicators from taxii objects
:param stix_objs: taxii objects
Expand All @@ -1272,11 +1292,11 @@ def extract_indicators_from_stix_objects(

def get_indicators_from_indicator_groups(
self,
indicator_groups: List[Tuple[str, str]],
indicator_obj: Dict[str, str],
indicator_types: Dict[str, str],
field_map: Dict[str, str],
) -> List[Dict[str, str]]:
indicator_groups: list[tuple[str, str]],
indicator_obj: dict[str, str],
indicator_types: dict[str, str],
field_map: dict[str, str],
) -> list[dict[str, str]]:
"""
Get indicators from indicator regex groups
:param indicator_groups: caught regex group in pattern of: [`type`, `indicator`]
Expand All @@ -1288,7 +1308,7 @@ def get_indicators_from_indicator_groups(
indicators = []
if indicator_groups:
for term in indicator_groups:
for taxii_type in indicator_types.keys():
for taxii_type in indicator_types:
# term should be list with 2 argument parsed with regex - [`type`, `indicator`]
if len(term) == 2 and taxii_type in term[0]:
type_ = indicator_types[taxii_type]
Expand Down Expand Up @@ -1360,15 +1380,15 @@ def create_indicator(self, indicator_obj, type_, value, field_map):

@staticmethod
def extract_indicator_groups_from_pattern(
pattern: str, regexes: List
) -> List[Tuple[str, str]]:
pattern: str, regexes: list
) -> list[tuple[str, str]]:
"""
Extracts indicator [`type`, `indicator`] groups from pattern
:param pattern: stix pattern
:param regexes: regexes to run to pattern
:return: extracted indicators list from pattern
"""
groups: List[Tuple[str, str]] = []
groups: list[tuple[str, str]] = []
for regex in regexes:
find_result = regex.findall(pattern)
if find_result:
Expand Down Expand Up @@ -1407,6 +1427,7 @@ def get_ioc_value(ioc, id_to_obj):
return Taxii2FeedClient.get_ioc_value_from_ioc_name(ioc_obj)
else:
return name
return None

@staticmethod
def get_ioc_value_from_ioc_name(ioc_obj):
Expand Down