Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle partial results #1346

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions assemblyline/common/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def __init__(self, timeout: float, expiry_rate: float = 5, raise_on_error: bool
self.raise_on_error = raise_on_error
self.cache: dict[Hashable, T] = {}
self.timeout_list: list[Tuple[float, Hashable]] = []
timeout_thread = threading.Thread(target=self._process_timeouts, name="_process_timeouts")
timeout_thread.setDaemon(True)
timeout_thread = threading.Thread(target=self._process_timeouts, name="_process_timeouts", daemon=True)
timeout_thread.start()

def __len__(self):
Expand Down Expand Up @@ -150,7 +149,7 @@ def keys(self):
return self.cache.keys()


def generate_conf_key(service_tool_version: Optional[str] = None, task: Optional[Task] = None) -> str:
def generate_conf_key(service_tool_version: Optional[str] = None, task: Optional[Task] = None, partial_result: bool = False) -> str:
ignore_salt = None
service_config = None
submission_params_str = None
Expand All @@ -165,7 +164,7 @@ def generate_conf_key(service_tool_version: Optional[str] = None, task: Optional
}
submission_params_str = json.dumps(sorted(submission_params.items()))

if task.ignore_cache:
if task.ignore_cache or partial_result:
ignore_salt = get_random_id()

if service_tool_version is None and \
Expand Down
2 changes: 1 addition & 1 deletion assemblyline/datastore/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ def get_single_result(self, key, cl_engine=forge.get_classification(), as_obj=Fa
if key.endswith(".e"):
data = self.create_empty_result_from_key(key, cl_engine, as_obj=as_obj)
else:
data = self.result.get(key, as_obj=False)
data = self.result.get(key, as_obj=as_obj)

return data

Expand Down
6 changes: 5 additions & 1 deletion assemblyline/odm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

# Imports that have the same effect as some part of the one above so that
# type checking can use this file properly.
from assemblyline.odm.base import Keyword, Optional, Boolean, Integer, List, Compound, Mapping, Date
from assemblyline.odm.base import Keyword, Optional, Boolean, Integer, List, Compound, Mapping, Date, Enum
from datetime import datetime

_InnerType = typing.TypeVar("_InnerType")
Expand Down Expand Up @@ -53,3 +53,7 @@ def mapping(child_type: _InnerType, **kwargs) -> dict[str, _InnerType]:

def compound(child_type: typing.Callable[..., _InnerType], **kwargs) -> _InnerType:
return typing.cast(_InnerType, Compound(child_type, **kwargs))


def enum(values: typing.Iterable[str], **kwargs) -> str:
return typing.cast(str, Enum(values, **kwargs))
18 changes: 17 additions & 1 deletion assemblyline/odm/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,13 @@ class Verdicts(odm.Model):
}


TEMPORARY_KEY_TYPE = [
'union',
'overwrite',
'ignore',
]


@odm.model(index=False, store=False,
description="Default values for parameters for submissions that may be overridden on a per submission basis")
class Submission(odm.Model):
Expand All @@ -1336,8 +1343,16 @@ class Submission(odm.Model):
description="Tag types that show up in the submission summary")
verdicts = odm.Compound(Verdicts, default=DEFAULT_VERDICTS,
description="Minimum score value to get the specified verdict.")
temporary_keys: dict[str, str] = odm.mapping(odm.enum(TEMPORARY_KEY_TYPE),
description="Set the operation that will be used to update values "
"using this key in the temporary submission data.")


DEFAULT_TEMPORARY_KEYS = {
'passwords': 'union',
'ancestry': 'ignore',
}

DEFAULT_SUBMISSION = {
'default_max_extracted': 500,
'default_max_supplementary': 500,
Expand All @@ -1350,7 +1365,8 @@ class Submission(odm.Model):
'max_temp_data_length': 4096,
'sha256_sources': [],
'tag_types': DEFAULT_TAG_TYPES,
'verdicts': DEFAULT_VERDICTS
'verdicts': DEFAULT_VERDICTS,
'temporary_keys': DEFAULT_TEMPORARY_KEYS,
}


Expand Down
10 changes: 6 additions & 4 deletions assemblyline/odm/models/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class Result(odm.Model):
type = odm.Optional(odm.Keyword())
size = odm.Optional(odm.Integer())
drop_file = odm.Boolean(default=False, description="Use to not pass to other stages after this run")
partial = odm.Boolean(default=False, description="Invalidate the current result cache creation")
from_archive = odm.Boolean(index=False, default=False, description="Was loaded from the archive")

def build_key(self, service_tool_version=None, task=None):
Expand All @@ -140,16 +141,17 @@ def build_key(self, service_tool_version=None, task=None):
self.response.service_version,
self.is_empty(),
service_tool_version=service_tool_version,
task=task
task=task,
partial=self.partial
)

@staticmethod
def help_build_key(sha256, service_name, service_version, is_empty, service_tool_version=None, task=None):
def help_build_key(sha256, service_name, service_version, is_empty, service_tool_version=None, task=None, partial=False):
key_list = [
sha256,
service_name.replace('.', '_'),
f"v{service_version.replace('.', '_')}",
f"c{generate_conf_key(service_tool_version=service_tool_version, task=task)}",
f"c{generate_conf_key(service_tool_version=service_tool_version, task=task, partial_result=partial)}",
]

if is_empty:
Expand All @@ -173,6 +175,6 @@ def is_empty(self) -> bool:
if len(self.response.extracted) == 0 and \
len(self.response.supplementary) == 0 and \
len(self.result.sections) == 0 and \
self.result.score == 0:
self.result.score == 0 and not self.partial:
return True
return False
8 changes: 5 additions & 3 deletions assemblyline/odm/models/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ class Service(odm.Model):
default=False, description="Does this service use temp data from other services for analysis?")
uses_metadata: bool = odm.Boolean(
default=False, description="Does this service use submission metadata for analysis?")
monitored_keys: list[str] = odm.sequence(
odm.keyword(), default=[],
description="This service watches these temporary keys for changes when partial results are produced.")

name: str = odm.Keyword(store=True, copyto="__text__", description="Name of service")
version = odm.Keyword(store=True, description="Version of service")
Expand All @@ -150,9 +153,8 @@ class Service(odm.Model):

stage = odm.Keyword(store=True, default="CORE", copyto="__text__",
description="Which execution stage does this service run in?")
submission_params: SubmissionParams = odm.List(
odm.Compound(SubmissionParams),
index=False, default=[],
submission_params: list[SubmissionParams] = odm.sequence(
odm.compound(SubmissionParams), index=False, default=[],
description="Submission parameters of service")
timeout: int = odm.Integer(default=60, description="Service task timeout, in seconds")

Expand Down
1 change: 1 addition & 0 deletions assemblyline/odm/models/service_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class ServiceDelta(odm.Model):
uses_tag_scores: bool = odm.Optional(odm.Boolean(), description=REF_SERVICE)
uses_temp_submission_data: bool = odm.Optional(odm.Boolean(), description=REF_SERVICE)
uses_metadata: bool = odm.Optional(odm.Boolean(), description=REF_SERVICE)
monitored_keys = odm.optional(odm.sequence(odm.keyword()))

name = odm.Optional(odm.Keyword(), store=True, copyto="__text__", description=REF_SERVICE)
version = odm.Keyword(store=True, description=REF_SERVICE)
Expand Down