diff --git a/uncoder-core/app/translator/core/exceptions/core.py b/uncoder-core/app/translator/core/exceptions/core.py index 68c8851c..e6358cce 100644 --- a/uncoder-core/app/translator/core/exceptions/core.py +++ b/uncoder-core/app/translator/core/exceptions/core.py @@ -84,5 +84,9 @@ class InvalidJSONStructure(InvalidRuleStructure): rule_type: str = "JSON" +class InvalidTOMLStructure(InvalidRuleStructure): + rule_type: str = "TOML" + + class InvalidXMLStructure(InvalidRuleStructure): rule_type: str = "XML" diff --git a/uncoder-core/app/translator/core/mixins/rule.py b/uncoder-core/app/translator/core/mixins/rule.py index 8f6bc080..60439f6e 100644 --- a/uncoder-core/app/translator/core/mixins/rule.py +++ b/uncoder-core/app/translator/core/mixins/rule.py @@ -1,10 +1,16 @@ import json from typing import Union +import toml import xmltodict import yaml -from app.translator.core.exceptions.core import InvalidJSONStructure, InvalidXMLStructure, InvalidYamlStructure +from app.translator.core.exceptions.core import ( + InvalidJSONStructure, + InvalidTOMLStructure, + InvalidXMLStructure, + InvalidYamlStructure, +) from app.translator.core.mitre import MitreConfig, MitreInfoContainer @@ -50,3 +56,14 @@ def load_rule(text: Union[str, bytes]) -> dict: return xmltodict.parse(text) except Exception as err: raise InvalidXMLStructure(error=str(err)) from err + + +class TOMLRuleMixin: + mitre_config: MitreConfig = MitreConfig() + + @staticmethod + def load_rule(text: str) -> dict: + try: + return toml.loads(text) + except toml.TomlDecodeError as err: + raise InvalidTOMLStructure(error=str(err)) from err diff --git a/uncoder-core/app/translator/core/models/query_container.py b/uncoder-core/app/translator/core/models/query_container.py index 7dca8e7f..bb95f9b4 100644 --- a/uncoder-core/app/translator/core/models/query_container.py +++ b/uncoder-core/app/translator/core/models/query_container.py @@ -39,11 +39,15 @@ def __init__( trigger_threshold: Optional[str] = None, query_frequency: Optional[str] = None, query_period: Optional[str] = None, + from_: Optional[str] = None, + interval: Optional[str] = None, ) -> None: self.trigger_operator = trigger_operator self.trigger_threshold = trigger_threshold self.query_frequency = query_frequency self.query_period = query_period + self.from_ = from_ + self.interval = interval class MetaInfoContainer: @@ -51,6 +55,10 @@ def __init__( self, *, id_: Optional[str] = None, + index: Optional[list[str]] = None, + language: Optional[str] = None, + risk_score: Optional[int] = None, + type_: Optional[str] = None, title: Optional[str] = None, description: Optional[str] = None, author: Optional[list[str]] = None, @@ -73,6 +81,10 @@ def __init__( ) -> None: self.id = id_ or str(uuid.uuid4()) self.title = title or "" + self.index = index or [] + self.language = language or "" + self.risk_score = risk_score + self.type_ = type_ or "" self.description = description or "" self.author = [v.strip() for v in author] if author else [] self.date = date or datetime.now().date().strftime("%Y-%m-%d") diff --git a/uncoder-core/app/translator/platforms/elasticsearch/__init__.py b/uncoder-core/app/translator/platforms/elasticsearch/__init__.py index 6c1fca9e..91a7d362 100644 --- a/uncoder-core/app/translator/platforms/elasticsearch/__init__.py +++ b/uncoder-core/app/translator/platforms/elasticsearch/__init__.py @@ -1,4 +1,7 @@ -from app.translator.platforms.elasticsearch.parsers.detection_rule import ElasticSearchRuleParser # noqa: F401 +from app.translator.platforms.elasticsearch.parsers.detection_rule import ( + ElasticSearchRuleParser, # noqa: F401 + ElasticSearchRuleTOMLParser, # noqa: F401 +) from app.translator.platforms.elasticsearch.parsers.elasticsearch import ElasticSearchQueryParser # noqa: F401 from app.translator.platforms.elasticsearch.renders.detection_rule import ElasticSearchRuleRender # noqa: F401 from app.translator.platforms.elasticsearch.renders.elast_alert import ElastAlertRuleRender # noqa: F401 diff --git a/uncoder-core/app/translator/platforms/elasticsearch/const.py b/uncoder-core/app/translator/platforms/elasticsearch/const.py index a87d5e84..b48c4f0b 100644 --- a/uncoder-core/app/translator/platforms/elasticsearch/const.py +++ b/uncoder-core/app/translator/platforms/elasticsearch/const.py @@ -5,6 +5,7 @@ _ELASTIC_LUCENE_QUERY = "elastic-lucene-query" _ELASTIC_LUCENE_RULE = "elastic-lucene-rule" +_ELASTIC_LUCENE_RULE_TOML = "elastic-lucene-rule-toml" _ELASTIC_KIBANA_RULE = "elastic-kibana-rule" _ELASTALERT_LUCENE_RULE = "elastalert-lucene-rule" _ELASTIC_WATCHER_RULE = "elastic-watcher-rule" @@ -50,6 +51,14 @@ **PLATFORM_DETAILS, } +ELASTICSEARCH_RULE_TOML_DETAILS = { + "platform_id": _ELASTIC_LUCENE_RULE_TOML, + "name": "Elastic Rule TOML", + "platform_name": "Detection Rule (Lucene) TOML", + "first_choice": 0, + **PLATFORM_DETAILS, +} + KIBANA_DETAILS = { "platform_id": _ELASTIC_KIBANA_RULE, "name": "Elastic Kibana Saved Search", @@ -78,6 +87,7 @@ elasticsearch_esql_query_details = PlatformDetails(**ELASTICSEARCH_ESQL_QUERY_DETAILS) elasticsearch_esql_rule_details = PlatformDetails(**ELASTICSEARCH_ESQL_RULE_DETAILS) elasticsearch_rule_details = PlatformDetails(**ELASTICSEARCH_RULE_DETAILS) +elasticsearch_rule_toml_details = PlatformDetails(**ELASTICSEARCH_RULE_TOML_DETAILS) elastalert_details = PlatformDetails(**ELASTALERT_DETAILS) kibana_rule_details = PlatformDetails(**KIBANA_DETAILS) xpack_watcher_details = PlatformDetails(**XPACK_WATCHER_DETAILS) diff --git a/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py b/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py index 91ff35c6..6d04b229 100644 --- a/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py +++ b/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py @@ -15,12 +15,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ----------------------------------------------------------------- """ +from datetime import datetime -from app.translator.core.mixins.rule import JsonRuleMixin +from app.translator.core.mixins.rule import JsonRuleMixin, TOMLRuleMixin from app.translator.core.models.platform_details import PlatformDetails -from app.translator.core.models.query_container import MetaInfoContainer, RawQueryContainer +from app.translator.core.models.query_container import MetaInfoContainer, RawMetaInfoContainer, RawQueryContainer from app.translator.managers import parser_manager -from app.translator.platforms.elasticsearch.const import elasticsearch_rule_details +from app.translator.platforms.elasticsearch.const import elasticsearch_rule_details, elasticsearch_rule_toml_details from app.translator.platforms.elasticsearch.parsers.elasticsearch import ElasticSearchQueryParser from app.translator.tools.utils import parse_rule_description_str @@ -53,3 +54,45 @@ def parse_raw_query(self, text: str, language: str) -> RawQueryContainer: mitre_attack=mitre_attack, ), ) + + +@parser_manager.register +class ElasticSearchRuleTOMLParser(ElasticSearchQueryParser, TOMLRuleMixin): + details: PlatformDetails = elasticsearch_rule_toml_details + + def parse_raw_query(self, text: str, language: str) -> RawQueryContainer: + raw_rule = self.load_rule(text=text) + rule = raw_rule.get("rule") + metadata = raw_rule.get("metadata") + techniques = [] + for threat_data in rule.get("threat", []): + if threat_data.get("technique"): + techniques.append(threat_data["technique"][0]["id"].lower()) + mitre_attack = self.mitre_config.get_mitre_info( + tactics=[threat_data["tactic"]["name"].replace(" ", "_").lower() for threat_data in rule.get("threat", [])], + techniques=techniques, + ) + date = None + if metadata.get("creation_date"): + date = datetime.strptime(metadata.get("creation_date"), "%Y/%m/%d").strftime("%Y-%m-%d") + return RawQueryContainer( + query=rule["query"], + language=language, + meta_info=MetaInfoContainer( + id_=rule.get("rule_id"), + title=rule.get("name"), + description=rule.get("description"), + author=rule.get("author"), + date=date, + license_=rule.get("license"), + severity=rule.get("severity"), + references=rule.get("references"), + tags=rule.get("tags"), + mitre_attack=mitre_attack, + index=rule.get("index"), + language=rule.get("language"), + risk_score=rule.get("risk_score"), + type_=rule.get("type"), + raw_metainfo_container=RawMetaInfoContainer(from_=rule.get("from"), interval=rule.get("interval")), + ), + )