Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve AQL mapping method is_suitable #112

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions uncoder-core/app/translator/core/render.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def apply_field_value(self, field: str, operator: Identifier, value: DEFAULT_VAL

class QueryRender(ABC):
comment_symbol: str = None
details: PlatformDetails = None
is_single_line_comment: bool = False
unsupported_functions_text = "Unsupported functions were excluded from the result query:"

Expand All @@ -146,7 +147,6 @@ def generate(self, query_container: Union[RawQueryContainer, TokenizedQueryConta

class PlatformQueryRender(QueryRender):
mappings: BasePlatformMappings = None
details: PlatformDetails = None
is_strict_mapping: bool = False

or_token = "or"
Expand Down Expand Up @@ -295,28 +295,35 @@ def generate_raw_log_fields(self, fields: list[Field], source_mapping: SourceMap

def _generate_from_tokenized_query_container(self, query_container: TokenizedQueryContainer) -> str:
queries_map = {}
errors = []
source_mappings = self._get_source_mappings(query_container.meta_info.source_mapping_ids)

for source_mapping in source_mappings:
prefix = self.generate_prefix(source_mapping.log_source_signature)
if source_mapping.raw_log_fields:
defined_raw_log_fields = self.generate_raw_log_fields(
fields=query_container.meta_info.query_fields, source_mapping=source_mapping
try:
if source_mapping.raw_log_fields:
defined_raw_log_fields = self.generate_raw_log_fields(
fields=query_container.meta_info.query_fields, source_mapping=source_mapping
)
prefix += f"\n{defined_raw_log_fields}\n"
result = self.generate_query(tokens=query_container.tokens, source_mapping=source_mapping)
except StrictPlatformException as err:
errors.append(err)
continue
else:
rendered_functions = self.generate_functions(query_container.functions.functions, source_mapping)
not_supported_functions = query_container.functions.not_supported + rendered_functions.not_supported
finalized_query = self.finalize_query(
prefix=prefix,
query=result,
functions=rendered_functions.rendered,
not_supported_functions=not_supported_functions,
meta_info=query_container.meta_info,
source_mapping=source_mapping,
)
prefix += f"\n{defined_raw_log_fields}\n"
result = self.generate_query(tokens=query_container.tokens, source_mapping=source_mapping)
rendered_functions = self.generate_functions(query_container.functions.functions, source_mapping)
not_supported_functions = query_container.functions.not_supported + rendered_functions.not_supported
finalized_query = self.finalize_query(
prefix=prefix,
query=result,
functions=rendered_functions.rendered,
not_supported_functions=not_supported_functions,
meta_info=query_container.meta_info,
source_mapping=source_mapping,
)
queries_map[source_mapping.source_id] = finalized_query

queries_map[source_mapping.source_id] = finalized_query
if not queries_map and errors:
raise errors[0]
return self.finalize(queries_map)

def generate(self, query_container: Union[RawQueryContainer, TokenizedQueryContainer]) -> str:
Expand Down
86 changes: 86 additions & 0 deletions uncoder-core/app/translator/platforms/base/aql/mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from typing import Optional

from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping


class AQLLogSourceSignature(LogSourceSignature):
    """Log source signature for IBM QRadar AQL.

    Holds the sets of devicetype/category/qid/qideventcategory identifiers a
    mapping declares, plus the mapping's default log source definition.
    """

    def __init__(
        self,
        device_types: Optional[list[int]],
        categories: Optional[list[int]],
        qids: Optional[list[int]],
        qid_event_categories: Optional[list[int]],
        default_source: dict,
    ):
        # Normalize each identifier list (possibly None) to a set for subset checks.
        self.device_types = set(device_types) if device_types else set()
        self.categories = set(categories) if categories else set()
        self.qids = set(qids) if qids else set()
        self.qid_event_categories = set(qid_event_categories) if qid_event_categories else set()
        self._default_source = default_source if default_source else {}

    def is_suitable(
        self,
        devicetype: Optional[list[int]],
        category: Optional[list[int]],
        qid: Optional[list[int]],
        qideventcategory: Optional[list[int]],
    ) -> bool:
        """Return True when every provided identifier list is a subset of this signature.

        Absent (None/empty) identifier lists are skipped; if no identifiers are
        provided at all, the signature matches vacuously.
        """
        provided_vs_declared = (
            (devicetype, self.device_types),
            (category, self.categories),
            (qid, self.qids),
            (qideventcategory, self.qid_event_categories),
        )
        return all(
            set(provided).issubset(declared)
            for provided, declared in provided_vs_declared
            if provided
        )

    def __str__(self) -> str:
        # Table name of the default log source; falls back to QRadar's "events" table.
        return self._default_source.get("table", "events")

    @property
    def extra_condition(self) -> str:
        # Render each non-table, non-empty default-source field as "key=value", AND-joined.
        conditions = [
            f"{name}={value}"
            for name, value in self._default_source.items()
            if name != "table" and value
        ]
        return " AND ".join(conditions)


class AQLMappings(BasePlatformMappings):
    """Platform mappings for IBM QRadar AQL."""

    def prepare_log_source_signature(self, mapping: dict) -> AQLLogSourceSignature:
        """Build an AQL log source signature from one raw mapping dict.

        Raises KeyError when "default_log_source" is missing — every mapping
        must declare a default log source.
        """
        log_source = mapping.get("log_source", {})
        default_log_source = mapping["default_log_source"]
        return AQLLogSourceSignature(
            device_types=log_source.get("devicetype"),
            categories=log_source.get("category"),
            qids=log_source.get("qid"),
            qid_event_categories=log_source.get("qideventcategory"),
            default_source=default_log_source,
        )

    def get_suitable_source_mappings(
        self,
        field_names: list[str],
        devicetype: Optional[list[int]] = None,
        category: Optional[list[int]] = None,
        qid: Optional[list[int]] = None,
        qideventcategory: Optional[list[int]] = None,
    ) -> list[SourceMapping]:
        """Return all non-default mappings whose field mapping covers *field_names*,
        falling back to the default mapping when none match.

        NOTE(review): the original implementation branched on
        log_source_signature.is_suitable(...), but appended the mapping on a
        fields match in BOTH the if- and elif-branch, so the log source check
        never affected the result (AQLLogSourceSignature.is_suitable is pure).
        The redundant branch is collapsed here; if log-source-aware filtering
        or prioritization was intended, that needs a separate change.
        """
        suitable_source_mappings = []
        for source_mapping in self._source_mappings.values():
            # The default mapping is only used as a fallback, never matched directly.
            if source_mapping.source_id == DEFAULT_MAPPING_NAME:
                continue

            if source_mapping.fields_mapping.is_suitable(field_names):
                suitable_source_mappings.append(source_mapping)

        if not suitable_source_mappings:
            suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]

        return suitable_source_mappings


# Module-level singleton; mapping configs are loaded from the "qradar" platform directory.
aql_mappings = AQLMappings(platform_dir="qradar")
113 changes: 113 additions & 0 deletions uncoder-core/app/translator/platforms/base/aql/parsers/aql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
Uncoder IO Commercial Edition License
-----------------------------------------------------------------
Copyright (c) 2024 SOC Prime, Inc.

This file is part of the Uncoder IO Commercial Edition ("CE") and is
licensed under the Uncoder IO Non-Commercial License (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://github.com/UncoderIO/UncoderIO/blob/main/LICENSE

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-----------------------------------------------------------------
"""

import re
from typing import Union

from app.translator.core.models.query_container import RawQueryContainer, TokenizedQueryContainer
from app.translator.core.parser import PlatformQueryParser
from app.translator.platforms.base.aql.const import NUM_VALUE_PATTERN, SINGLE_QUOTES_VALUE_PATTERN
from app.translator.platforms.base.aql.mapping import AQLMappings, aql_mappings
from app.translator.platforms.base.aql.tokenizer import AQLTokenizer
from app.translator.tools.utils import get_match_group


class AQLQueryParser(PlatformQueryParser):
    """Parses a raw IBM QRadar AQL query into tokens plus mapping metadata.

    Log source conditions — the devicetype/category/qid/qideventcategory
    fields and the LOGSOURCENAME()-style functions — are extracted and
    stripped from the query text before tokenization so they can drive
    source mapping selection.
    """

    tokenizer = AQLTokenizer()
    mappings: AQLMappings = aql_mappings

    # Function-based log source conditions, e.g. LOGSOURCENAME(x) = '...'; these
    # are removed from the query entirely (their values are not collected).
    log_source_functions = ("LOGSOURCENAME", "LOGSOURCEGROUPNAME", "LOGSOURCETYPENAME", "CATEGORYNAME")
    log_source_function_pattern = r"\(?(?P<key>___func_name___\([a-zA-Z]+\))(?:\s+like\s+|\s+ilike\s+|\s*=\s*)'(?P<value>[%a-zA-Z\s]+)'\s*\)?\s+(?:and|or)?\s"  # noqa: E501

    # Field-based log source conditions, in single- and multi-value (IN) form.
    log_source_key_types = ("devicetype", "category", "qid", "qideventcategory")
    log_source_pattern = rf"___source_type___(?:\s+like\s+|\s+ilike\s+|\s*=\s*)(?:{SINGLE_QUOTES_VALUE_PATTERN}|{NUM_VALUE_PATTERN})(?:\s+(?:and|or)\s+|\s+)?"  # noqa: E501
    num_value_pattern = r"[0-9]+"
    multi_num_log_source_pattern = (
        rf"___source_type___\s+in\s+\((?P<value>(?:{num_value_pattern}(?:\s*,\s*)?)+)\)(?:\s+(?:and|or)\s+|\s+)?"
    )
    str_value_pattern = r"""(?:')(?P<s_q_value>(?:[:a-zA-Z\*0-9=+%#\-\/\\,_".$&^@!\(\)\{\}\s]|'')+)(?:')"""
    multi_str_log_source_pattern = (
        rf"""___source_type___\s+in\s+\((?P<value>(?:{str_value_pattern}(?:\s*,\s*)?)+)\)(?:\s+(?:and|or)\s+|\s+)?"""
    )

    table_pattern = r"\sFROM\s(?P<table>[a-zA-Z\.\-\*]+)\sWHERE\s"

    def __clean_query(self, query: str) -> str:
        """Strip every log-source-function condition from the query text."""
        for func_name in self.log_source_functions:
            pattern = self.log_source_function_pattern.replace("___func_name___", func_name)
            # Remove one match at a time until none remain.
            while search := re.search(pattern, query, flags=re.IGNORECASE):
                query = query[: search.start()] + query[search.end() :]

        return query

    @staticmethod
    def __parse_multi_value_log_source(
        match: re.Match, query: str, pattern: str
    ) -> tuple[str, Union[list[str], list[int]]]:
        """Cut the matched "<key> in (...)" condition out of the query and return
        the remaining query together with the raw values found inside the parens."""
        value = match.group("value")
        query = query[: match.start()] + query[match.end() :]
        return query, re.findall(pattern, value)

    def __parse_log_sources(self, query: str) -> tuple[dict[str, Union[list[str], list[int]]], str]:
        """Extract log source conditions from the query.

        Returns a mapping of log source key -> list of values, and the query
        with those conditions (and the leading "FROM <table> WHERE") removed.
        """
        log_sources = {}

        # Drop everything up to and including "FROM <table> WHERE".
        if search := re.search(self.table_pattern, query, flags=re.IGNORECASE):
            query = query[search.end() :]

        for log_source_key in self.log_source_key_types:
            # Single-value conditions: <key> = 123 / <key> like '...'.
            pattern = self.log_source_pattern.replace("___source_type___", log_source_key)
            while search := re.search(pattern, query, flags=re.IGNORECASE):
                num_value = get_match_group(search, group_name="num_value")
                str_value = get_match_group(search, group_name="s_q_value")
                # Fixed: the previous `num_value and int(num_value) or str_value`
                # mapped a numeric value of 0 to str_value (None), because
                # int("0") is falsy and the `or` fell through.
                value = int(num_value) if num_value else str_value
                log_sources.setdefault(log_source_key, []).append(value)
                query = query[: search.start()] + query[search.end() :]

            # Multi-value numeric conditions: <key> in (1, 2, 3).
            pattern = self.multi_num_log_source_pattern.replace("___source_type___", log_source_key)
            if search := re.search(pattern, query, flags=re.IGNORECASE):
                query, values = self.__parse_multi_value_log_source(search, query, self.num_value_pattern)
                log_sources.setdefault(log_source_key, []).extend(int(v) for v in values)

            # Multi-value string conditions: <key> in ('a', 'b').
            pattern = self.multi_str_log_source_pattern.replace("___source_type___", log_source_key)
            if search := re.search(pattern, query, flags=re.IGNORECASE):
                query, values = self.__parse_multi_value_log_source(search, query, self.str_value_pattern)
                log_sources.setdefault(log_source_key, []).extend(values)

        return log_sources, query

    def _parse_query(self, text: str) -> tuple[str, dict[str, Union[list[str], list[int]]]]:
        """Return the cleaned query body and its extracted log sources."""
        query = self.__clean_query(text)
        log_sources, query = self.__parse_log_sources(query)
        return query, log_sources

    def parse(self, raw_query_container: RawQueryContainer) -> TokenizedQueryContainer:
        """Tokenize the raw query and attach field and source-mapping metadata."""
        query, log_sources = self._parse_query(raw_query_container.query)
        tokens, source_mappings = self.get_tokens_and_source_mappings(query, log_sources)
        fields_tokens = self.get_fields_tokens(tokens=tokens)
        meta_info = raw_query_container.meta_info
        meta_info.query_fields = fields_tokens
        meta_info.source_mapping_ids = [source_mapping.source_id for source_mapping in source_mappings]
        return TokenizedQueryContainer(tokens=tokens, meta_info=meta_info)
Loading