diff --git a/translator/app/translator/const.py b/translator/app/translator/const.py index c4da7cad..80bb4339 100644 --- a/translator/app/translator/const.py +++ b/translator/app/translator/const.py @@ -5,4 +5,6 @@ CTI_MIN_LIMIT_QUERY = 10000 +CTI_IOCS_PER_QUERY_LIMIT = 25 + DEFAULT_VALUE_TYPE = Union[Union[int, str, List[int], List[str]]] diff --git a/translator/app/translator/core/parser_cti.py b/translator/app/translator/core/parser_cti.py index 502594db..77a32b33 100644 --- a/translator/app/translator/core/parser_cti.py +++ b/translator/app/translator/core/parser_cti.py @@ -21,10 +21,12 @@ def get_total_count(self) -> int: hash_len += len(value) return len(self.ip) + len(self.url) + len(self.domain) + hash_len - def return_iocs(self) -> dict: + def return_iocs(self, include_source_ip: bool = False) -> dict: if all(not value for value in [self.ip, self.url, self.domain, self.hash_dict]): raise EmptyIOCSException() result = {"DestinationIP": self.ip, "URL": self.url, "Domain": self.domain} + if include_source_ip: + result["SourceIP"] = self.ip for key, value in self.hash_dict.items(): result[HASH_MAP[key]] = value return result @@ -33,14 +35,15 @@ def return_iocs(self) -> dict: class CTIParser: def get_iocs_from_string( - self, - string: str, - include_ioc_types: Optional[List[IOCType]] = None, - include_hash_types: Optional[List[HashType]] = None, - exceptions: Optional[List[str]] = None, - ioc_parsing_rules: Optional[List[IocParsingRule]] = None, - limit: Optional[int] = None - ) -> Iocs: + self, + string: str, + include_ioc_types: Optional[List[IOCType]] = None, + include_hash_types: Optional[List[HashType]] = None, + exceptions: Optional[List[str]] = None, + ioc_parsing_rules: Optional[List[IocParsingRule]] = None, + limit: Optional[int] = None, + include_source_ip: bool = False + ) -> dict: iocs = Iocs() string = self.replace_dots_hxxp(string, ioc_parsing_rules) if not include_ioc_types or "ip" in include_ioc_types: @@ -62,7 +65,7 @@ def get_iocs_from_string( total_count = iocs.get_total_count() if total_count > limit: raise IocsLimitExceededException(f"IOCs count {total_count} exceeds limit {limit}.") - return iocs.return_iocs() + return iocs.return_iocs(include_source_ip) def replace_dots_hxxp(self, string, ioc_parsing_rules): if ioc_parsing_rules is None or "replace_dots" in ioc_parsing_rules: diff --git a/translator/app/translator/cti_translator.py b/translator/app/translator/cti_translator.py index e359a555..ce044456 100644 --- a/translator/app/translator/cti_translator.py +++ b/translator/app/translator/cti_translator.py @@ -1,7 +1,7 @@ import logging from typing import Dict, List -from app.translator.const import CTI_MIN_LIMIT_QUERY +from app.translator.const import CTI_MIN_LIMIT_QUERY, CTI_IOCS_PER_QUERY_LIMIT from app.translator.core.models.iocs import IocsChunkValue from app.translator.core.parser_cti import CTIParser, Iocs from app.translator.core.render_cti import RenderCTI @@ -17,44 +17,46 @@ def __init__(self): self.logger = logging.getLogger("cti_converter") self.parser = CTIParser() - def _get_render_mapping(self, platform: CTIPlatform, include_source_ip: bool = False) -> Dict[str, str]: - return self.renders.get(platform.name).default_mapping - @handle_translation_exceptions - def __parse_iocs_from_string(self, text: str, include_ioc_types: list = None, include_hash_types: list = None, - exceptions: list = None, ioc_parsing_rules: list = None) -> Iocs: + def __parse_iocs_from_string(self, text: str, + include_ioc_types: list = None, + include_hash_types: list = None, + exceptions: list = None, + ioc_parsing_rules: list = None, + include_source_ip: bool = False) -> dict: return self.parser.get_iocs_from_string(string=text, include_ioc_types=include_ioc_types, include_hash_types=include_hash_types, exceptions=exceptions, ioc_parsing_rules=ioc_parsing_rules, - limit=CTI_MIN_LIMIT_QUERY) + limit=CTI_MIN_LIMIT_QUERY, + include_source_ip=include_source_ip) @handle_translation_exceptions - def __render_translation(self, parsed_data: dict, platform_data: CTIPlatform, iocs_per_query: int, - include_source_ip: bool = False) -> List[str]: - mapping = self._get_render_mapping(platform=platform_data, include_source_ip=include_source_ip) + def __render_translation(self, parsed_data: dict, platform_data: CTIPlatform, iocs_per_query: int) -> List[str]: platform = self.renders.get(platform_data.name) platform_generation = self.generate(data=parsed_data, platform=platform, iocs_per_query=iocs_per_query, - mapping=mapping) + mapping=platform.default_mapping) return platform_generation def convert(self, text: str, platform_data: CTIPlatform, - iocs_per_query: int = 25, + iocs_per_query: int = None, include_ioc_types: list = None, include_hash_types: list = None, exceptions: list = None, ioc_parsing_rules: list = None, include_source_ip: bool = False) -> (bool, List[str]): + if not iocs_per_query: + iocs_per_query = CTI_IOCS_PER_QUERY_LIMIT status, parsed_data = self.__parse_iocs_from_string(text=text, include_ioc_types=include_ioc_types, include_hash_types=include_hash_types, exceptions=exceptions, - ioc_parsing_rules=ioc_parsing_rules) + ioc_parsing_rules=ioc_parsing_rules, + include_source_ip=include_source_ip) if status: return self.__render_translation(parsed_data=parsed_data, - include_source_ip=include_source_ip, platform_data=platform_data, iocs_per_query=iocs_per_query )