Merge pull request #112 from UncoderIO/gis-7814
Improve AQL mapping method is_suitable
Showing 3 changed files with 224 additions and 18 deletions.
New file (86 additions) — the AQL mapping module imported below as app.translator.platforms.base.aql.mapping:
from typing import Optional

from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping


class AQLLogSourceSignature(LogSourceSignature):
    def __init__(
        self,
        device_types: Optional[list[int]],
        categories: Optional[list[int]],
        qids: Optional[list[int]],
        qid_event_categories: Optional[list[int]],
        default_source: dict,
    ):
        self.device_types = set(device_types or [])
        self.categories = set(categories or [])
        self.qids = set(qids or [])
        self.qid_event_categories = set(qid_event_categories or [])
        self._default_source = default_source or {}

    def is_suitable(
        self,
        devicetype: Optional[list[int]],
        category: Optional[list[int]],
        qid: Optional[list[int]],
        qideventcategory: Optional[list[int]],
    ) -> bool:
        device_type_match = set(devicetype).issubset(self.device_types) if devicetype else None
        category_match = set(category).issubset(self.categories) if category else None
        qid_match = set(qid).issubset(self.qids) if qid else None
        qid_event_category_match = (
            set(qideventcategory).issubset(self.qid_event_categories) if qideventcategory else None
        )
        return all(
            condition
            for condition in (device_type_match, category_match, qid_match, qid_event_category_match)
            if condition is not None
        )

    def __str__(self) -> str:
        return self._default_source.get("table", "events")

    @property
    def extra_condition(self) -> str:
        default_source = self._default_source
        return " AND ".join(f"{key}={value}" for key, value in default_source.items() if key != "table" and value)


class AQLMappings(BasePlatformMappings):
    def prepare_log_source_signature(self, mapping: dict) -> AQLLogSourceSignature:
        log_source = mapping.get("log_source", {})
        default_log_source = mapping["default_log_source"]
        return AQLLogSourceSignature(
            device_types=log_source.get("devicetype"),
            categories=log_source.get("category"),
            qids=log_source.get("qid"),
            qid_event_categories=log_source.get("qideventcategory"),
            default_source=default_log_source,
        )

    def get_suitable_source_mappings(
        self,
        field_names: list[str],
        devicetype: Optional[list[int]] = None,
        category: Optional[list[int]] = None,
        qid: Optional[list[int]] = None,
        qideventcategory: Optional[list[int]] = None,
    ) -> list[SourceMapping]:
        suitable_source_mappings = []
        for source_mapping in self._source_mappings.values():
            if source_mapping.source_id == DEFAULT_MAPPING_NAME:
                continue

            log_source_signature: AQLLogSourceSignature = source_mapping.log_source_signature
            if log_source_signature.is_suitable(devicetype, category, qid, qideventcategory):
                if source_mapping.fields_mapping.is_suitable(field_names):
                    suitable_source_mappings.append(source_mapping)
            elif source_mapping.fields_mapping.is_suitable(field_names):
                suitable_source_mappings.append(source_mapping)

        if not suitable_source_mappings:
            suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]

        return suitable_source_mappings


aql_mappings = AQLMappings(platform_dir="qradar")
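
For context, the core of the improved is_suitable is that every log source key is now an optional subset check: a key is compared only when the query actually supplies values for it, and a signature with nothing to compare passes vacuously. The standalone sketch below reproduces that logic for two of the four keys so it can run without the project's LogSourceSignature base class; the device type and category IDs are made up for illustration.

from typing import Optional


def is_suitable_sketch(
    signature_device_types: set[int],
    signature_categories: set[int],
    devicetype: Optional[list[int]],
    category: Optional[list[int]],
) -> bool:
    # Mirror of the subset logic in AQLLogSourceSignature.is_suitable:
    # each key is checked only when the query provides values for it.
    device_type_match = set(devicetype).issubset(signature_device_types) if devicetype else None
    category_match = set(category).issubset(signature_categories) if category else None
    return all(condition for condition in (device_type_match, category_match) if condition is not None)


# Hypothetical signature covering two device type IDs and one category ID.
device_types = {12, 13}
categories = {4003}

print(is_suitable_sketch(device_types, categories, devicetype=[12], category=None))      # True: subset; category not checked
print(is_suitable_sketch(device_types, categories, devicetype=[12, 99], category=None))  # False: 99 is outside the signature
print(is_suitable_sketch(device_types, categories, devicetype=None, category=None))      # True: nothing to check, vacuous all()

The real class applies the same pattern to qid and qideventcategory as well.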
New file uncoder-core/app/translator/platforms/base/aql/parsers/aql.py (113 additions, 0 deletions):
""" | ||
Uncoder IO Commercial Edition License | ||
----------------------------------------------------------------- | ||
Copyright (c) 2024 SOC Prime, Inc. | ||
This file is part of the Uncoder IO Commercial Edition ("CE") and is | ||
licensed under the Uncoder IO Non-Commercial License (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
https://github.com/UncoderIO/UncoderIO/blob/main/LICENSE | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
----------------------------------------------------------------- | ||
""" | ||
|
||
import re | ||
from typing import Union | ||
|
||
from app.translator.core.models.query_container import RawQueryContainer, TokenizedQueryContainer | ||
from app.translator.core.parser import PlatformQueryParser | ||
from app.translator.platforms.base.aql.const import NUM_VALUE_PATTERN, SINGLE_QUOTES_VALUE_PATTERN | ||
from app.translator.platforms.base.aql.mapping import AQLMappings, aql_mappings | ||
from app.translator.platforms.base.aql.tokenizer import AQLTokenizer | ||
from app.translator.tools.utils import get_match_group | ||
|
||
|
||
class AQLQueryParser(PlatformQueryParser): | ||
tokenizer = AQLTokenizer() | ||
mappings: AQLMappings = aql_mappings | ||
|
||
log_source_functions = ("LOGSOURCENAME", "LOGSOURCEGROUPNAME", "LOGSOURCETYPENAME", "CATEGORYNAME") | ||
log_source_function_pattern = r"\(?(?P<key>___func_name___\([a-zA-Z]+\))(?:\s+like\s+|\s+ilike\s+|\s*=\s*)'(?P<value>[%a-zA-Z\s]+)'\s*\)?\s+(?:and|or)?\s" # noqa: E501 | ||
|
||
log_source_key_types = ("devicetype", "category", "qid", "qideventcategory") | ||
log_source_pattern = rf"___source_type___(?:\s+like\s+|\s+ilike\s+|\s*=\s*)(?:{SINGLE_QUOTES_VALUE_PATTERN}|{NUM_VALUE_PATTERN})(?:\s+(?:and|or)\s+|\s+)?" # noqa: E501 | ||
num_value_pattern = r"[0-9]+" | ||
multi_num_log_source_pattern = ( | ||
rf"___source_type___\s+in\s+\((?P<value>(?:{num_value_pattern}(?:\s*,\s*)?)+)\)(?:\s+(?:and|or)\s+|\s+)?" | ||
) | ||
str_value_pattern = r"""(?:')(?P<s_q_value>(?:[:a-zA-Z\*0-9=+%#\-\/\\,_".$&^@!\(\)\{\}\s]|'')+)(?:')""" | ||
multi_str_log_source_pattern = ( | ||
rf"""___source_type___\s+in\s+\((?P<value>(?:{str_value_pattern}(?:\s*,\s*)?)+)\)(?:\s+(?:and|or)\s+|\s+)?""" | ||
) | ||
|
||
table_pattern = r"\sFROM\s(?P<table>[a-zA-Z\.\-\*]+)\sWHERE\s" | ||
|
||
def __clean_query(self, query: str) -> str: | ||
for func_name in self.log_source_functions: | ||
pattern = self.log_source_function_pattern.replace("___func_name___", func_name) | ||
while search := re.search(pattern, query, flags=re.IGNORECASE): | ||
pos_start = search.start() | ||
pos_end = search.end() | ||
query = query[:pos_start] + query[pos_end:] | ||
|
||
return query | ||
|
||
@staticmethod | ||
def __parse_multi_value_log_source( | ||
match: re.Match, query: str, pattern: str | ||
) -> tuple[str, Union[list[str], list[int]]]: | ||
value = match.group("value") | ||
pos_start = match.start() | ||
pos_end = match.end() | ||
query = query[:pos_start] + query[pos_end:] | ||
return query, re.findall(pattern, value) | ||
|
||
def __parse_log_sources(self, query: str) -> tuple[dict[str, Union[list[str], list[int]]], str]: | ||
log_sources = {} | ||
|
||
if search := re.search(self.table_pattern, query, flags=re.IGNORECASE): | ||
pos_end = search.end() | ||
query = query[pos_end:] | ||
|
||
for log_source_key in self.log_source_key_types: | ||
pattern = self.log_source_pattern.replace("___source_type___", log_source_key) | ||
while search := re.search(pattern, query, flags=re.IGNORECASE): | ||
num_value = get_match_group(search, group_name="num_value") | ||
str_value = get_match_group(search, group_name="s_q_value") | ||
value = num_value and int(num_value) or str_value | ||
log_sources.setdefault(log_source_key, []).append(value) | ||
pos_start = search.start() | ||
pos_end = search.end() | ||
query = query[:pos_start] + query[pos_end:] | ||
|
||
pattern = self.multi_num_log_source_pattern.replace("___source_type___", log_source_key) | ||
if search := re.search(pattern, query, flags=re.IGNORECASE): | ||
query, values = self.__parse_multi_value_log_source(search, query, self.num_value_pattern) | ||
values = [int(v) for v in values] | ||
log_sources.setdefault(log_source_key, []).extend(values) | ||
|
||
pattern = self.multi_str_log_source_pattern.replace("___source_type___", log_source_key) | ||
if search := re.search(pattern, query, flags=re.IGNORECASE): | ||
query, values = self.__parse_multi_value_log_source(search, query, self.str_value_pattern) | ||
log_sources.setdefault(log_source_key, []).extend(values) | ||
|
||
return log_sources, query | ||
|
||
def _parse_query(self, text: str) -> tuple[str, dict[str, Union[list[str], list[int]]]]: | ||
query = self.__clean_query(text) | ||
log_sources, query = self.__parse_log_sources(query) | ||
return query, log_sources | ||
|
||
def parse(self, raw_query_container: RawQueryContainer) -> TokenizedQueryContainer: | ||
query, log_sources = self._parse_query(raw_query_container.query) | ||
tokens, source_mappings = self.get_tokens_and_source_mappings(query, log_sources) | ||
fields_tokens = self.get_fields_tokens(tokens=tokens) | ||
meta_info = raw_query_container.meta_info | ||
meta_info.query_fields = fields_tokens | ||
meta_info.source_mapping_ids = [source_mapping.source_id for source_mapping in source_mappings] | ||
return TokenizedQueryContainer(tokens=tokens, meta_info=meta_info) |
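
As a rough illustration of what __parse_log_sources hands to the mapping layer, the standalone sketch below extracts a multi-value devicetype filter from an invented AQL query using a simplified copy of multi_num_log_source_pattern. It is not the project's code and skips the log source function cleanup and the single-value patterns; it only shows the strip-table-prefix, extract-values, cut-the-match-out-of-the-query flow.

import re

# Simplified versions of the parser's patterns (num_value_pattern and
# multi_num_log_source_pattern with ___source_type___ already substituted).
num_value_pattern = r"[0-9]+"
devicetype_in_pattern = (
    rf"devicetype\s+in\s+\((?P<value>(?:{num_value_pattern}(?:\s*,\s*)?)+)\)(?:\s+(?:and|or)\s+|\s+)?"
)

# Invented sample query for illustration only.
query = "SELECT UTF8(payload) FROM events WHERE devicetype in (12, 13) and LOWER(UTF8(payload)) LIKE '%powershell%'"

# Drop everything up to and including "FROM <table> WHERE ", as __parse_log_sources does via table_pattern.
query = re.split(r"\sFROM\s[a-zA-Z\.\-\*]+\sWHERE\s", query, flags=re.IGNORECASE)[-1]

log_sources = {}
if search := re.search(devicetype_in_pattern, query, flags=re.IGNORECASE):
    # Collect the numeric IDs and remove the matched filter from the query text.
    values = [int(v) for v in re.findall(num_value_pattern, search.group("value"))]
    log_sources.setdefault("devicetype", []).extend(values)
    query = query[:search.start()] + query[search.end():]

print(log_sources)  # {'devicetype': [12, 13]}
print(query)        # the remaining condition, which is what gets tokenized

The resulting dictionary is what ends up driving the mapping lookup, which is where the improved is_suitable check above comes into play.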