diff --git a/uncoder-core/app/translator/core/mapping.py b/uncoder-core/app/translator/core/mapping.py index 1486acad..886cfdc3 100644 --- a/uncoder-core/app/translator/core/mapping.py +++ b/uncoder-core/app/translator/core/mapping.py @@ -116,7 +116,7 @@ def prepare_mapping(self) -> dict[str, SourceMapping]: default_mapping = SourceMapping(source_id=DEFAULT_MAPPING_NAME) for mapping_dict in self._loader.load_platform_mappings(self._platform_dir): log_source_signature = self.prepare_log_source_signature(mapping=mapping_dict) - if (source_id := mapping_dict["source"]) == DEFAULT_MAPPING_NAME: + if (source_id := mapping_dict.get("source")) == DEFAULT_MAPPING_NAME: default_mapping.log_source_signature = log_source_signature if self.skip_load_default_mappings: continue diff --git a/uncoder-core/app/translator/core/mitre.py b/uncoder-core/app/translator/core/mitre.py index 681054f6..2e86a3be 100644 --- a/uncoder-core/app/translator/core/mitre.py +++ b/uncoder-core/app/translator/core/mitre.py @@ -189,7 +189,7 @@ def __load_mitre_configs_from_files(self) -> None: technique_id=technique_data["technique_id"], name=technique_data["technique"], url=technique_data["url"], - tactic=technique_data["tactic"], + tactic=technique_data.get("tactic", []), ) self.techniques.insert(technique_id, technique) except JSONDecodeError: diff --git a/uncoder-core/app/translator/core/parser.py b/uncoder-core/app/translator/core/parser.py index 28a8e13f..2d8ba1cc 100644 --- a/uncoder-core/app/translator/core/parser.py +++ b/uncoder-core/app/translator/core/parser.py @@ -83,4 +83,3 @@ def get_source_mappings( source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, log_sources=log_sources) self.tokenizer.set_field_tokens_generic_names_map(field_tokens, source_mappings, self.mappings.default_mapping) return source_mappings - diff --git a/uncoder-core/app/translator/platforms/base/aql/mapping.py b/uncoder-core/app/translator/platforms/base/aql/mapping.py index a7849513..984b85f2 100644 --- a/uncoder-core/app/translator/platforms/base/aql/mapping.py +++ b/uncoder-core/app/translator/platforms/base/aql/mapping.py @@ -48,7 +48,7 @@ class AQLMappings(BasePlatformMappings): def prepare_log_source_signature(self, mapping: dict) -> AQLLogSourceSignature: log_source = mapping.get("log_source", {}) - default_log_source = mapping["default_log_source"] + default_log_source = mapping.get("default_log_source") return AQLLogSourceSignature( device_types=log_source.get("devicetype"), categories=log_source.get("category"), diff --git a/uncoder-core/app/translator/platforms/base/spl/str_value_manager.py b/uncoder-core/app/translator/platforms/base/spl/str_value_manager.py new file mode 100644 index 00000000..84ebaab7 --- /dev/null +++ b/uncoder-core/app/translator/platforms/base/spl/str_value_manager.py @@ -0,0 +1,55 @@ +""" +Uncoder IO Community Edition License +----------------------------------------------------------------- +Copyright (c) 2023 SOC Prime, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +----------------------------------------------------------------- +""" +from typing import ClassVar + +from app.translator.core.str_value_manager import BaseSpecSymbol, StrValue, StrValueManager, UnboundLenWildCard +from app.translator.platforms.base.spl.escape_manager import spl_escape_manager + + +class SplStrValueManager(StrValueManager): + escape_manager = spl_escape_manager + str_spec_symbols_map: ClassVar[dict[str, type[BaseSpecSymbol]]] = {"*": UnboundLenWildCard} + + def from_str_to_container(self, value: str) -> StrValue: + split = [] + prev_char = None + for char in value: + if char == "\\": + if prev_char == "\\": + split.append("\\") + prev_char = None + continue + elif char in self.str_spec_symbols_map: + if prev_char == "\\": + split.append(char) + else: + split.append(self.str_spec_symbols_map[char]()) + elif char in ('"', "=", "|", "<", ">"): + split.append(char) + else: + if prev_char == "\\": + split.append(prev_char) + split.append(char) + + prev_char = char + + return StrValue(self.escape_manager.remove_escape(value), self._concat(split)) + + +spl_str_value_manager = SplStrValueManager() diff --git a/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py b/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py index 03c7ed70..4f04335a 100644 --- a/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py +++ b/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py @@ -18,7 +18,6 @@ """ from datetime import timedelta -from re import I from typing import Optional, Union from app.translator.core.exceptions.core import SigmaRuleValidationException diff --git a/uncoder-core/app/translator/tools/utils.py b/uncoder-core/app/translator/tools/utils.py index d61aa086..1c69efc7 100644 --- a/uncoder-core/app/translator/tools/utils.py +++ b/uncoder-core/app/translator/tools/utils.py @@ -4,6 +4,13 @@ from typing import Optional +def execute_module(path: str) -> None: + with suppress(FileNotFoundError): + spec = importlib.util.spec_from_file_location("__init__", path) + init_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(init_module) + + def execute_module(path: str) -> None: with suppress(FileNotFoundError): spec = importlib.util.spec_from_file_location("__init__", path)