Skip to content
4 changes: 4 additions & 0 deletions uncoder-core/app/translator/core/exceptions/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,9 @@ class InvalidJSONStructure(InvalidRuleStructure):
rule_type: str = "JSON"


class InvalidTOMLStructure(InvalidRuleStructure):
rule_type: str = "TOML"


class InvalidXMLStructure(InvalidRuleStructure):
rule_type: str = "XML"
19 changes: 18 additions & 1 deletion uncoder-core/app/translator/core/mixins/rule.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import json
from typing import Union

import toml
import xmltodict
import yaml

from app.translator.core.exceptions.core import InvalidJSONStructure, InvalidXMLStructure, InvalidYamlStructure
from app.translator.core.exceptions.core import (
InvalidJSONStructure,
InvalidTOMLStructure,
InvalidXMLStructure,
InvalidYamlStructure,
)
from app.translator.core.mitre import MitreConfig, MitreInfoContainer


Expand Down Expand Up @@ -50,3 +56,14 @@ def load_rule(text: Union[str, bytes]) -> dict:
return xmltodict.parse(text)
except Exception as err:
raise InvalidXMLStructure(error=str(err)) from err


class TOMLRuleMixin:
mitre_config: MitreConfig = MitreConfig()

@staticmethod
def load_rule(text: str) -> dict:
try:
return toml.loads(text)
except toml.TomlDecodeError as err:
raise InvalidTOMLStructure(error=str(err)) from err
12 changes: 12 additions & 0 deletions uncoder-core/app/translator/core/models/query_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,26 @@ def __init__(
trigger_threshold: Optional[str] = None,
query_frequency: Optional[str] = None,
query_period: Optional[str] = None,
from_: Optional[str] = None,
interval: Optional[str] = None,
) -> None:
self.trigger_operator = trigger_operator
self.trigger_threshold = trigger_threshold
self.query_frequency = query_frequency
self.query_period = query_period
self.from_ = from_
self.interval = interval


class MetaInfoContainer:
def __init__(
self,
*,
id_: Optional[str] = None,
index: Optional[list[str]] = None,
language: Optional[str] = None,
risk_score: Optional[int] = None,
type_: Optional[str] = None,
title: Optional[str] = None,
description: Optional[str] = None,
author: Optional[list[str]] = None,
Expand All @@ -73,6 +81,10 @@ def __init__(
) -> None:
self.id = id_ or str(uuid.uuid4())
self.title = title or ""
self.index = index or []
self.language = language or ""
self.risk_score = risk_score
self.type_ = type_ or ""
self.description = description or ""
self.author = [v.strip() for v in author] if author else []
self.date = date or datetime.now().date().strftime("%Y-%m-%d")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from app.translator.platforms.elasticsearch.parsers.detection_rule import ElasticSearchRuleParser # noqa: F401
from app.translator.platforms.elasticsearch.parsers.detection_rule import (
ElasticSearchRuleParser, # noqa: F401
ElasticSearchRuleTOMLParser, # noqa: F401
)
from app.translator.platforms.elasticsearch.parsers.elasticsearch import ElasticSearchQueryParser # noqa: F401
from app.translator.platforms.elasticsearch.renders.detection_rule import ElasticSearchRuleRender # noqa: F401
from app.translator.platforms.elasticsearch.renders.elast_alert import ElastAlertRuleRender # noqa: F401
Expand Down
10 changes: 10 additions & 0 deletions uncoder-core/app/translator/platforms/elasticsearch/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

_ELASTIC_LUCENE_QUERY = "elastic-lucene-query"
_ELASTIC_LUCENE_RULE = "elastic-lucene-rule"
_ELASTIC_LUCENE_RULE_TOML = "elastic-lucene-rule-toml"
_ELASTIC_KIBANA_RULE = "elastic-kibana-rule"
_ELASTALERT_LUCENE_RULE = "elastalert-lucene-rule"
_ELASTIC_WATCHER_RULE = "elastic-watcher-rule"
Expand Down Expand Up @@ -50,6 +51,14 @@
**PLATFORM_DETAILS,
}

ELASTICSEARCH_RULE_TOML_DETAILS = {
"platform_id": _ELASTIC_LUCENE_RULE_TOML,
"name": "Elastic Rule TOML",
"platform_name": "Detection Rule (Lucene) TOML",
"first_choice": 0,
**PLATFORM_DETAILS,
}

KIBANA_DETAILS = {
"platform_id": _ELASTIC_KIBANA_RULE,
"name": "Elastic Kibana Saved Search",
Expand Down Expand Up @@ -78,6 +87,7 @@
elasticsearch_esql_query_details = PlatformDetails(**ELASTICSEARCH_ESQL_QUERY_DETAILS)
elasticsearch_esql_rule_details = PlatformDetails(**ELASTICSEARCH_ESQL_RULE_DETAILS)
elasticsearch_rule_details = PlatformDetails(**ELASTICSEARCH_RULE_DETAILS)
elasticsearch_rule_toml_details = PlatformDetails(**ELASTICSEARCH_RULE_TOML_DETAILS)
elastalert_details = PlatformDetails(**ELASTALERT_DETAILS)
kibana_rule_details = PlatformDetails(**KIBANA_DETAILS)
xpack_watcher_details = PlatformDetails(**XPACK_WATCHER_DETAILS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-----------------------------------------------------------------
"""
from datetime import datetime

from app.translator.core.mixins.rule import JsonRuleMixin
from app.translator.core.mixins.rule import JsonRuleMixin, TOMLRuleMixin
from app.translator.core.models.platform_details import PlatformDetails
from app.translator.core.models.query_container import MetaInfoContainer, RawQueryContainer
from app.translator.core.models.query_container import MetaInfoContainer, RawMetaInfoContainer, RawQueryContainer
from app.translator.managers import parser_manager
from app.translator.platforms.elasticsearch.const import elasticsearch_rule_details
from app.translator.platforms.elasticsearch.const import elasticsearch_rule_details, elasticsearch_rule_toml_details
from app.translator.platforms.elasticsearch.parsers.elasticsearch import ElasticSearchQueryParser
from app.translator.tools.utils import parse_rule_description_str

Expand Down Expand Up @@ -53,3 +54,45 @@ def parse_raw_query(self, text: str, language: str) -> RawQueryContainer:
mitre_attack=mitre_attack,
),
)


@parser_manager.register
class ElasticSearchRuleTOMLParser(ElasticSearchQueryParser, TOMLRuleMixin):
details: PlatformDetails = elasticsearch_rule_toml_details

def parse_raw_query(self, text: str, language: str) -> RawQueryContainer:
raw_rule = self.load_rule(text=text)
rule = raw_rule.get("rule")
metadata = raw_rule.get("metadata")
techniques = []
for threat_data in rule.get("threat", []):
if threat_data.get("technique"):
techniques.append(threat_data["technique"][0]["id"].lower())
mitre_attack = self.mitre_config.get_mitre_info(
tactics=[threat_data["tactic"]["name"].replace(" ", "_").lower() for threat_data in rule.get("threat", [])],
techniques=techniques,
)
date = None
if metadata.get("creation_date"):
date = datetime.strptime(metadata.get("creation_date"), "%Y/%m/%d").strftime("%Y-%m-%d")
return RawQueryContainer(
query=rule["query"],
language=language,
meta_info=MetaInfoContainer(
id_=rule.get("rule_id"),
title=rule.get("name"),
description=rule.get("description"),
author=rule.get("author"),
date=date,
license_=rule.get("license"),
severity=rule.get("severity"),
references=rule.get("references"),
tags=rule.get("tags"),
mitre_attack=mitre_attack,
index=rule.get("index"),
language=rule.get("language"),
risk_score=rule.get("risk_score"),
type_=rule.get("type"),
raw_metainfo_container=RawMetaInfoContainer(from_=rule.get("from"), interval=rule.get("interval")),
),
)