diff --git a/assemblyline/common/caching.py b/assemblyline/common/caching.py index 4c8c2276c..65ac95b59 100644 --- a/assemblyline/common/caching.py +++ b/assemblyline/common/caching.py @@ -39,8 +39,7 @@ def __init__(self, timeout: float, expiry_rate: float = 5, raise_on_error: bool self.raise_on_error = raise_on_error self.cache: dict[Hashable, T] = {} self.timeout_list: list[Tuple[float, Hashable]] = [] - timeout_thread = threading.Thread(target=self._process_timeouts, name="_process_timeouts") - timeout_thread.setDaemon(True) + timeout_thread = threading.Thread(target=self._process_timeouts, name="_process_timeouts", daemon=True) timeout_thread.start() def __len__(self): @@ -150,7 +149,7 @@ def keys(self): return self.cache.keys() -def generate_conf_key(service_tool_version: Optional[str] = None, task: Optional[Task] = None) -> str: +def generate_conf_key(service_tool_version: Optional[str] = None, task: Optional[Task] = None, partial_result: bool = False) -> str: ignore_salt = None service_config = None submission_params_str = None @@ -165,7 +164,7 @@ def generate_conf_key(service_tool_version: Optional[str] = None, task: Optional } submission_params_str = json.dumps(sorted(submission_params.items())) - if task.ignore_cache: + if task.ignore_cache or partial_result: ignore_salt = get_random_id() if service_tool_version is None and \ diff --git a/assemblyline/datastore/helper.py b/assemblyline/datastore/helper.py index 38eb7427e..12102c7af 100644 --- a/assemblyline/datastore/helper.py +++ b/assemblyline/datastore/helper.py @@ -487,7 +487,7 @@ def get_single_result(self, key, cl_engine=forge.get_classification(), as_obj=Fa if key.endswith(".e"): data = self.create_empty_result_from_key(key, cl_engine, as_obj=as_obj) else: - data = self.result.get(key, as_obj=False) + data = self.result.get(key, as_obj=as_obj) return data diff --git a/assemblyline/odm/__init__.py b/assemblyline/odm/__init__.py index 60c3c9b84..4b494ebf7 100644 --- a/assemblyline/odm/__init__.py +++ b/assemblyline/odm/__init__.py @@ -5,7 +5,7 @@ # Imports that have the same effect as some part of the one above so that # type checking can use this file properly. -from assemblyline.odm.base import Keyword, Optional, Boolean, Integer, List, Compound, Mapping, Date +from assemblyline.odm.base import Keyword, Optional, Boolean, Integer, List, Compound, Mapping, Date, Enum from datetime import datetime _InnerType = typing.TypeVar("_InnerType") @@ -53,3 +53,7 @@ def mapping(child_type: _InnerType, **kwargs) -> dict[str, _InnerType]: def compound(child_type: typing.Callable[..., _InnerType], **kwargs) -> _InnerType: return typing.cast(_InnerType, Compound(child_type, **kwargs)) + + +def enum(values: typing.Iterable[str], **kwargs) -> str: + return typing.cast(str, Enum(values, **kwargs)) diff --git a/assemblyline/odm/models/config.py b/assemblyline/odm/models/config.py index 690945ebb..b3b55745a 100644 --- a/assemblyline/odm/models/config.py +++ b/assemblyline/odm/models/config.py @@ -1316,6 +1316,13 @@ class Verdicts(odm.Model): } +TEMPORARY_KEY_TYPE = [ + 'union', + 'overwrite', + 'ignore', +] + + @odm.model(index=False, store=False, description="Default values for parameters for submissions that may be overridden on a per submission basis") class Submission(odm.Model): @@ -1336,8 +1343,16 @@ class Submission(odm.Model): description="Tag types that show up in the submission summary") verdicts = odm.Compound(Verdicts, default=DEFAULT_VERDICTS, description="Minimum score value to get the specified verdict.") + temporary_keys: dict[str, str] = odm.mapping(odm.enum(TEMPORARY_KEY_TYPE), + description="Set the operation that will be used to update values " + "using this key in the temporary submission data.") +DEFAULT_TEMPORARY_KEYS = { + 'passwords': 'union', + 'ancestry': 'ignore', +} + DEFAULT_SUBMISSION = { 'default_max_extracted': 500, 'default_max_supplementary': 500, @@ -1350,7 +1365,8 @@ class Submission(odm.Model): 'max_temp_data_length': 4096, 'sha256_sources': [], 'tag_types': DEFAULT_TAG_TYPES, - 'verdicts': DEFAULT_VERDICTS + 'verdicts': DEFAULT_VERDICTS, + 'temporary_keys': DEFAULT_TEMPORARY_KEYS, } diff --git a/assemblyline/odm/models/result.py b/assemblyline/odm/models/result.py index be28dcb11..8dd561489 100644 --- a/assemblyline/odm/models/result.py +++ b/assemblyline/odm/models/result.py @@ -131,6 +131,7 @@ class Result(odm.Model): type = odm.Optional(odm.Keyword()) size = odm.Optional(odm.Integer()) drop_file = odm.Boolean(default=False, description="Use to not pass to other stages after this run") + partial = odm.Boolean(default=False, description="Invalidate the current result cache creation") from_archive = odm.Boolean(index=False, default=False, description="Was loaded from the archive") def build_key(self, service_tool_version=None, task=None): @@ -140,16 +141,17 @@ def build_key(self, service_tool_version=None, task=None): self.response.service_version, self.is_empty(), service_tool_version=service_tool_version, - task=task + task=task, + partial=self.partial ) @staticmethod - def help_build_key(sha256, service_name, service_version, is_empty, service_tool_version=None, task=None): + def help_build_key(sha256, service_name, service_version, is_empty, service_tool_version=None, task=None, partial=False): key_list = [ sha256, service_name.replace('.', '_'), f"v{service_version.replace('.', '_')}", - f"c{generate_conf_key(service_tool_version=service_tool_version, task=task)}", + f"c{generate_conf_key(service_tool_version=service_tool_version, task=task, partial_result=partial)}", ] if is_empty: @@ -173,6 +175,6 @@ def is_empty(self) -> bool: if len(self.response.extracted) == 0 and \ len(self.response.supplementary) == 0 and \ len(self.result.sections) == 0 and \ - self.result.score == 0: + self.result.score == 0 and not self.partial: return True return False diff --git a/assemblyline/odm/models/service.py b/assemblyline/odm/models/service.py index bc9e04015..2e1bf7991 100644 --- a/assemblyline/odm/models/service.py +++ b/assemblyline/odm/models/service.py @@ -139,6 +139,9 @@ class Service(odm.Model): default=False, description="Does this service use temp data from other services for analysis?") uses_metadata: bool = odm.Boolean( default=False, description="Does this service use submission metadata for analysis?") + monitored_keys: list[str] = odm.sequence( + odm.keyword(), default=[], + description="This service watches these temporary keys for changes when partial results are produced.") name: str = odm.Keyword(store=True, copyto="__text__", description="Name of service") version = odm.Keyword(store=True, description="Version of service") @@ -150,9 +153,8 @@ class Service(odm.Model): stage = odm.Keyword(store=True, default="CORE", copyto="__text__", description="Which execution stage does this service run in?") - submission_params: SubmissionParams = odm.List( - odm.Compound(SubmissionParams), - index=False, default=[], + submission_params: list[SubmissionParams] = odm.sequence( + odm.compound(SubmissionParams), index=False, default=[], description="Submission parameters of service") timeout: int = odm.Integer(default=60, description="Service task timeout, in seconds") diff --git a/assemblyline/odm/models/service_delta.py b/assemblyline/odm/models/service_delta.py index 5d699fa76..22f78ceea 100644 --- a/assemblyline/odm/models/service_delta.py +++ b/assemblyline/odm/models/service_delta.py @@ -105,6 +105,7 @@ class ServiceDelta(odm.Model): uses_tag_scores: bool = odm.Optional(odm.Boolean(), description=REF_SERVICE) uses_temp_submission_data: bool = odm.Optional(odm.Boolean(), description=REF_SERVICE) uses_metadata: bool = odm.Optional(odm.Boolean(), description=REF_SERVICE) + monitored_keys = odm.optional(odm.sequence(odm.keyword())) name = odm.Optional(odm.Keyword(), store=True, copyto="__text__", description=REF_SERVICE) version = odm.Keyword(store=True, description=REF_SERVICE)