diff --git a/.circleci/config.yml b/.circleci/config.yml index 6e68194..250ca71 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -12,10 +12,7 @@ jobs: - checkout - run: name: install dependencies - command: pip3 install -r requirements.txt --user - - run: - name: install test-dependencies - command: pip3 install -r test-requirements.txt --user + command: pip3 install -r requirements.txt -r test-requirements.txt --user - run: name: confirm black version command: black --version @@ -41,9 +38,6 @@ jobs: - run: name: install dependencies command: pip3 install -r requirements.txt --user - - run: - name: install test-dependencies - command: pip3 install -r test-requirements.txt --user - run: name: run tests command: python -m unittest @@ -72,7 +66,7 @@ jobs: - run: name: install dependencies command: > - pip3 install --user -r requirements.txt -r test-requirements.txt + pip3 install --user -r requirements.txt - run: name: check version command: | @@ -172,6 +166,7 @@ workflows: requires: - ensure_formatting - linter + - test - deploy: requires: - build diff --git a/README.md b/README.md index c723f00..3c60390 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ $ python3 -m pip install -e .[dev,doc] $ pre-commit install # Create your feature/fix # Create tests for your changes -$ pytest +$ python -m unittest # Push you feature/fix on Github $ git add [file(s)] $ git commit -m "[descriptive message]" @@ -62,31 +62,10 @@ To learn about the methods available for executing queries and retrieving their ## Tests -### Install dependencies +The standard `unittest` library is used for running the tests. ```bash -$ pip install -r ./test-requirements.txt -``` - -[pytest](https://docs.pytest.org/en/7.2.x/) is used to launch the tests. - -### Launch tests - -#### Prerequisite - -Your OpenBAS API should be running. -Your conftest.py should be configured with your API url, your token, and if applicable, your mTLS cert/key. - -#### Launching - -Unit tests -```bash -$ pytest ./tests/01-unit/ -``` - -Integration testing -```bash -$ pytest ./tests/02-integration/ +$ python -m unittest ``` ## About diff --git a/docs/conf.py b/docs/conf.py index f247155..ead483a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -18,12 +18,12 @@ # -- Project information ----------------------------------------------------- -project = "OpenCTI client for Python" +project = "OpenBAS client for Python" copyright = "2024, Filigran" -author = "OpenCTI Project" +author = "OpenBAS Project" # The full version, including alpha/beta/rc tags -release = "5.12.20" +release = "1.10.1" master_doc = "index" diff --git a/docs/index.rst b/docs/index.rst index c77519c..0b72a3d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,8 +1,8 @@ OpenBAS client for Python ========================= -The pycti library is designed to help OpenBAS users and developers to interact -with the OpenBAS platform GraphQL API. +The pyobas library is designed to help OpenBAS users and developers to interact +with the OpenBAS platform API. The Python library requires Python >= 3. @@ -11,7 +11,7 @@ The Python library requires Python >= 3. :caption: Contents: client_usage/getting_started.rst - pycti/pycti + pyobas/pyobas Indices and tables diff --git a/pyobas/__init__.py b/pyobas/__init__.py index 7b6db6b..95672b0 100644 --- a/pyobas/__init__.py +++ b/pyobas/__init__.py @@ -9,8 +9,10 @@ __title__, ) from pyobas.client import OpenBAS # noqa: F401 +from pyobas.configuration import * # noqa: F401,F403,F405 from pyobas.contracts import * # noqa: F401,F403,F405 from pyobas.exceptions import * # noqa: F401,F403,F405 +from pyobas.signatures import * # noqa: F401,F403,F405 __all__ = [ "__author__", diff --git a/pyobas/apis/inject_expectation/__init__.py b/pyobas/apis/inject_expectation/__init__.py new file mode 100644 index 0000000..183bc0e --- /dev/null +++ b/pyobas/apis/inject_expectation/__init__.py @@ -0,0 +1 @@ +from .inject_expectation import * # noqa: F401,F403 diff --git a/pyobas/apis/inject_expectation.py b/pyobas/apis/inject_expectation/inject_expectation.py similarity index 53% rename from pyobas/apis/inject_expectation.py rename to pyobas/apis/inject_expectation/inject_expectation.py index 24af0c9..df41bf6 100644 --- a/pyobas/apis/inject_expectation.py +++ b/pyobas/apis/inject_expectation/inject_expectation.py @@ -1,6 +1,11 @@ from typing import Any, Dict from pyobas import exceptions as exc +from pyobas.apis.inject_expectation.model import ( + DetectionExpectation, + ExpectationTypeEnum, + PreventionExpectation, +) from pyobas.base import RESTManager, RESTObject from pyobas.mixins import ListMixin, UpdateMixin from pyobas.utils import RequiredOptional @@ -29,6 +34,45 @@ def expectations_assets_for_source( result = self.openbas.http_get(path, **kwargs) return result + def expectations_models_for_source(self, source_id: str, **kwargs: Any): + """Returns all expectations from OpenBAS that have had no result yet + from the source_id (e.g. collector). + + :param source_id: the identifier of the collector requesting expectations + :type source_id: str + :param kwargs: additional data to pass to the endpoint + :type kwargs: dict, optional + + :return: a list of expectation objects + :rtype: list[DetectionExpectation|PreventionExpectation] + """ + # TODO: we should implement a more clever mechanism to obtain + # specialised Expectation instances rather than just if/elseing + # through this list of possibilities. + expectations = [] + for expectation_dict in self.expectations_assets_for_source( + source_id=source_id, **kwargs + ): + if ( + expectation_dict["inject_expectation_type"] + == ExpectationTypeEnum.Detection.value + ): + expectations.append( + DetectionExpectation(**expectation_dict, api_client=self) + ) + elif ( + expectation_dict["inject_expectation_type"] + == ExpectationTypeEnum.Prevention.value + ): + expectations.append( + PreventionExpectation(**expectation_dict, api_client=self) + ) + else: + expectations.append( + PreventionExpectation(**expectation_dict, api_client=self) + ) + return expectations + @exc.on_http_error(exc.OpenBASUpdateError) def prevention_expectations_for_source( self, source_id: str, **kwargs: Any diff --git a/pyobas/apis/inject_expectation/model/__init__.py b/pyobas/apis/inject_expectation/model/__init__.py new file mode 100644 index 0000000..8cedb2d --- /dev/null +++ b/pyobas/apis/inject_expectation/model/__init__.py @@ -0,0 +1,7 @@ +from .expectation import ( + DetectionExpectation, + ExpectationTypeEnum, + PreventionExpectation, +) + +__all__ = ["DetectionExpectation", "ExpectationTypeEnum", "PreventionExpectation"] diff --git a/pyobas/apis/inject_expectation/model/expectation.py b/pyobas/apis/inject_expectation/model/expectation.py new file mode 100644 index 0000000..739b2bf --- /dev/null +++ b/pyobas/apis/inject_expectation/model/expectation.py @@ -0,0 +1,172 @@ +from enum import Enum +from typing import List +from uuid import UUID + +from pydantic import BaseModel +from thefuzz import fuzz + +from pyobas.signatures.signature_type import SignatureType +from pyobas.signatures.types import MatchTypes, SignatureTypes + + +class ExpectationTypeEnum(str, Enum): + """Types of Expectations""" + + Detection = "DETECTION" + Prevention = "PREVENTION" + Other = "other" + + @classmethod + def _missing_(cls, value): + return cls.Other + + +class ExpectationSignature(BaseModel): + """An expectation signature describes a known marker potentially + found in alerting data in security software. For example, an + expectation signature can be a process image name, a command + line, or any other relevant piece of data. + """ + + type: SignatureTypes + value: str + + +class Expectation(BaseModel): + """An expectation represents an expected outcome of a BAS run. + For example, in the case of running an attack command line, the + expectation may be that security software has _detected_ it, while + another expectation may be that the attack was _prevented_. + """ + + inject_expectation_id: UUID + inject_expectation_signatures: List[ExpectationSignature] + + success_label: str = "Success" + failure_label: str = "Failure" + + def __init__(self, *a, **kw): + super().__init__(*a, **kw) + self.__api_client = kw["api_client"] + + def update(self, success, sender_id, metadata): + """Update the expectation object in OpenBAS with the supplied outcome. + + :param success: whether the expectation was fulfilled (true) or not (false) + :type success: bool + :param sender_id: identifier of the collector that is updating the expectation + :type sender_id: string + :param metadata: arbitrary dictionary of additional data relevant to updating the expectation + :type metadata: dict[string,string] + """ + self.__api_client.update( + self.inject_expectation_id, + inject_expectation={ + "collector_id": sender_id, + "result": (self.success_label if success else self.failure_label), + "is_success": success, + "metadata": metadata, + }, + ) + + def match_alert(self, relevant_signature_types: list[SignatureType], alert_data): + """Matches an alert's data against the current expectation signatures + to see if the alert is relevant to the current expectation's inject, + i.e. this alert was triggered by the execution of the inject to which + belongs the expectation. + + :param relevant_signature_types: filter of signature types that we want to consider. + Only the signature types listed in this collection may be checked for matching. + :type relevant_signature_types: list[SignatureType] + :param alert_data: list of possibly relevant markers found in an alert. + :type alert_data: dict[SignatureTypes, dict] + + :return: whether the alert matches the expectation signatures or not. + :rtype: bool + """ + relevant_expectation_signatures = [ + signature + for signature in self.inject_expectation_signatures + if signature.type in [type.label for type in relevant_signature_types] + ] + if not any(relevant_expectation_signatures): + return False + + for relevant_expectation_signature in relevant_expectation_signatures: + if not ( + alert_signature_for_type := alert_data.get( + relevant_expectation_signature.type.value + ) + ): + return False + + if alert_signature_for_type[ + "type" + ] == MatchTypes.MATCH_TYPE_FUZZY and not self.match_fuzzy( + alert_signature_for_type["data"], + relevant_expectation_signature.value, + alert_signature_for_type["score"], + ): + return False + if alert_signature_for_type[ + "type" + ] == MatchTypes.MATCH_TYPE_SIMPLE and not self.match_simple( + alert_signature_for_type["data"], relevant_expectation_signature.value + ): + return False + + return True + + @staticmethod + def match_fuzzy(tested: list[str], reference: str, threshold: int): + """Applies a fuzzy match against a known reference to a list of candidates + + :param tested: list of strings candidate for fuzzy matching + :type tested: list[str] + :param reference: the reference against which to try to fuzzy match + :type reference: str + :param threshold: string overlap percentage threshold above which to declare a match + :type threshold: int + + :return: whether any of the candidate is a match against the reference + :rtype: bool + """ + actual_tested = [tested] if isinstance(tested, str) else tested + for value in actual_tested: + ratio = fuzz.ratio(value, reference) + if ratio >= threshold: + return True + return False + + @staticmethod + def match_simple(tested: list[str], reference: str): + """A simple strict, case-sensitive string matching between a list of + candidates and a reference. + + :param tested: list of strings candidate for fuzzy matching + :type tested: list[str] + :param reference: the reference against which to try to fuzzy match + :type reference: str + + :return: whether any of the candidate is a match against the reference + :rtype: bool + """ + return Expectation.match_fuzzy(tested, reference, threshold=100) + + +class DetectionExpectation(Expectation): + """An expectation that is specific to Detection, i.e. that is used + by OpenBAS to assert that an inject's execution was detected. + """ + + success_label: str = "Detected" + failure_label: str = "Not Detected" + + +class PreventionExpectation(Expectation): + """An expectation that is specific to Prevention, i.e. that is used + by OpenBAS to assert that an inject's execution was prevented. + """ + + success_label: str = "Prevented" + failure_label: str = "Not Prevented" diff --git a/pyobas/configuration/__init__.py b/pyobas/configuration/__init__.py new file mode 100644 index 0000000..f9bbec1 --- /dev/null +++ b/pyobas/configuration/__init__.py @@ -0,0 +1,3 @@ +from .configuration import Configuration + +__all__ = ["Configuration"] diff --git a/pyobas/configuration/configuration.py b/pyobas/configuration/configuration.py new file mode 100644 index 0000000..bbc541d --- /dev/null +++ b/pyobas/configuration/configuration.py @@ -0,0 +1,188 @@ +import os +import os.path +from typing import Dict, Optional + +import yaml +from pydantic import BaseModel, Field + +from pyobas.configuration.sources import DictionarySource, EnvironmentSource + +CONFIGURATION_TYPES = str | int | bool | None + + +def is_truthy(value: str) -> bool: + """Asserts whether a given string signals a "True" value + + :param value: value to test + :type value: str + + :return: whether the string represents True or not. + :rtype: bool + """ + return value.lower() in ["yes", "true"] + + +def is_falsy(value: str) -> bool: + """Asserts whether a given string signals a "False" value + + :param value: value to test + :type value: str + + :return: whether the string represents False or not. + :rtype: bool + """ + return value.lower() in ["no", "false"] + + +class ConfigurationHint(BaseModel): + """An individual configuration hint. This allows for specifying + where any given configuration key can be found, in env vars, + config files. Additionally, it may define a default value or + a discrete override value. + """ + + data: Optional[CONFIGURATION_TYPES] = Field(default=None) + """Override value; when set, getting the configuration value for + the key described in this instance returns this value. + """ + env: Optional[str] = Field(default=None) + """Defines which env var should be read for getting the value + """ + file_path: Optional[list[str]] = Field(default=None) + """Defines a JSON path (nested keys) to follow in the provided + config file for reaching the value. + + Example: ["toplevel", "subkey"] will hint for searching for + the config key at { "toplevel": { "subkey": { "config_key"}} + """ + is_number: Optional[bool] = Field(default=False) + """Hints at whteher the configuration value should be + interpreted as a number. + """ + default: Optional[CONFIGURATION_TYPES] = Field(default=None) + """When defined, provides a default value for whenever none of the + hinted locations or the data field have a value. + """ + + +class Configuration: + """A configuration object providing ways to an interface for getting + configuration values. It should be provided with a collection of hints + to enable its behaviour. + + :param config_hints: a dictionary of hints, for which the key is the + desired configuration key (e.g. "log_level") and the value is either + a dictionary of hints (see ConfigurationHint) or a standalone string. + In the latter case, the string will be interpreted as a default value. + + Example: + .. code-block:: python + { + "my_config_key": { + "env" : "MY_CONFIG_VALUE_ENV_VAR", + "file_path": ["first_level", "second_level"] + }, + "my_other_config_key: "discrete value" + } + :type config_hints: Dict[str, dict | str] + :param config_values: dictionary of config values to preemptively load into the + Configuration object. The format of this dictionary should follow the patterns + chosen in the file_path property of ConfigurationHint object passed + as config_hints, defaults to None + + Example: + .. code-block:: python + { + "first_level": { + "second_level": { + "my_config_key": "some value" + } + } + } + :type config_values: dict (json), optional + :param config_file_path: path to the configuration file. The file should + contain a json structure that matches the format of the config_values param, + defaults to './config.yml' (relative path). + :type config_file_path: str + """ + + def __init__( + self, + config_hints: Dict[str, dict | str], + config_values: dict = None, + config_file_path: str = os.path.join(os.curdir, "config.yml"), + ): + self.__config_hints = { + key: ( + ConfigurationHint(**value) + if isinstance(value, dict) + else ConfigurationHint(**{"default": value}) + ) + for key, value in config_hints.items() + } + + file_contents = ( + yaml.load(open(config_file_path), Loader=yaml.FullLoader) + if os.path.isfile(config_file_path) + else {} + ) + + self.__config_values = (config_values or {}) | file_contents + + def get(self, config_key: str) -> CONFIGURATION_TYPES: + """Gets the value pointed to by the configuration key. If the key is defined + with actual hints (as opposed to a discrete value), it will use those hints to + potentially find a value. If the key was not defined as part of the supplied + config_hints, this will always return None. + + :param config_key: the configuration key to search a value for. + :type config_key: str + + :return: the value pointed to by the configuration key, or None if not found + :rtype: CONFIGURATION_TYPES + """ + config = self.__config_hints.get(config_key) + if config is None: + return None + + return self.__process_value_to_type( + config.data or self.__dig_config_sources_for_key(config), config.is_number + ) + + def set(self, config_key: str, value: CONFIGURATION_TYPES): + """Sets an arbitrary value in the Configuration object, for + the supplied configuration key, after which any request for the value + of that key will return this new value. + + :param config_key: the configuration key to set a value for. + :type config_key: str + :param value: the new value to set for the configuration key. + :type value: CONFIGURATION_TYPES + """ + if config_key not in self.__config_hints: + self.__config_hints[config_key] = ConfigurationHint(**{"data": value}) + else: + self.__config_hints[config_key].data = value + + @staticmethod + def __process_value_to_type(value: CONFIGURATION_TYPES, is_number_hint: bool): + if value is None: + return value + if isinstance(value, int) or is_number_hint: + return int(value) + if isinstance(value, str): + if is_truthy(value): + return True + if is_falsy(value): + return False + if len(value) == 0: + return None + return value + + def __dig_config_sources_for_key( + self, config: ConfigurationHint + ) -> CONFIGURATION_TYPES: + result = EnvironmentSource.get(config.env) or DictionarySource.get( + config.file_path, self.__config_values + ) + return result or config.default diff --git a/pyobas/configuration/sources.py b/pyobas/configuration/sources.py new file mode 100644 index 0000000..43a5b88 --- /dev/null +++ b/pyobas/configuration/sources.py @@ -0,0 +1,44 @@ +import os + + +class EnvironmentSource: + """A utility for fecthing a value in the env vars.""" + + @classmethod + def get(cls, env_var: str) -> str | None: + """Gets the value for the specified env var + + :param env_var: the name of the env var to query + :type env_var: str + + :return: value of the env var, or None if not found + :rtype: str | None + """ + return os.getenv(env_var) + + +class DictionarySource: + """A utility for fetching a value from within a JSON-like (nested dict) structure""" + + # this is quite hacky + # it only strictly handles two levels of keys in a dict + @classmethod + def get(cls, config_key_path: list[str], source_dict: dict) -> str | None: + """Gets the value for the specified env var + + :param config_key_path: the two-level dictionary path to the config key + :type config_key_path: list[str] + :param source_dict: JSON-like (nested dict) structure containing config values. + :type source_dict: dict + + :return: value for the config key at specified path, or None if not found + :rtype: str | None + """ + assert ( + isinstance(config_key_path, list) + and len(config_key_path) == 2 + and all([len(path_part) > 0 for path_part in config_key_path]) + ) + return source_dict.get(config_key_path[0], {config_key_path[1]: None}).get( + config_key_path[1] + ) diff --git a/pyobas/daemons/__init__.py b/pyobas/daemons/__init__.py new file mode 100644 index 0000000..4a0be2a --- /dev/null +++ b/pyobas/daemons/__init__.py @@ -0,0 +1,4 @@ +from .base_daemon import BaseDaemon +from .collector_daemon import CollectorDaemon + +__all__ = ["BaseDaemon", "CollectorDaemon"] diff --git a/pyobas/daemons/base_daemon.py b/pyobas/daemons/base_daemon.py new file mode 100644 index 0000000..7d004ce --- /dev/null +++ b/pyobas/daemons/base_daemon.py @@ -0,0 +1,131 @@ +from abc import ABC, abstractmethod +from inspect import signature +from types import FunctionType + +from pyobas.client import OpenBAS +from pyobas.configuration import Configuration +from pyobas.exceptions import OpenBASError +from pyobas.utils import logger + + +class BaseDaemon(ABC): + """A base class for implementing a kind of daemon that periodically polls + a given callback. + + :param configuration: configuration to provide the daemon + (allowing for looking up values within the callback for example) + :type configuration: Configuration + :param callback: a method or function to periodically call, defaults to None. + :type callback: callable, optional + :param logger: a logger object, to log events. if not supplied, a default logger + will be spawned to provide this functionality. + :type logger: Any + :param api_client: an API client that will provide connectivity with other systems. + :type api_client: Any + """ + + def __init__( + self, + configuration: Configuration, + callback: callable = None, + logger=None, + api_client=None, + ): + self._configuration = configuration + self._callback = callback + self.api = api_client or BaseDaemon.__get_default_api_client( + url=self._configuration.get("openbas_url"), + token=self._configuration.get("openbas_token"), + ) + + # logging + # compatibility layer: in order for older configs to still work, search for legacy names + actual_log_level = ( + self._configuration.get("log_level") + or self._configuration.get("collector_log_level") + or self._configuration.get("injector_log_level") + or "info" + ) + actual_log_name = ( + self._configuration.get("name") + or self._configuration.get("collector_name") + or self._configuration.get("injector_name") + or "daemon" + ) + self.logger = logger or BaseDaemon.__get_default_logger( + actual_log_level.upper(), + actual_log_name, + ) + + @abstractmethod + def _setup(self): + """A run-once method that inheritors must implement. This serves to instantiate + all useful objects and functionality for the implementor to run. + """ + pass + + @abstractmethod + def _start_loop(self): + """Starts the daemon's main execution loop. Implementors should implement + the main execution logic in here. + """ + pass + + def _try_callback(self): + """Tries to call the configured callback. Note that if any error is thrown, + it is immediately swallowed (but still logged) allowing the collector to keep + running. This is useful for any transient issue (e.g. API endpoint down...). + """ + try: + # this is some black magic to allow injecting the collector daemon instance + # into an arbitrary callback that has a specific argument name + # this allow for avoiding subclassing the CollectorDaemon class just to provide the callback + # Example: + # + # def standalone_func(collector): + # collector.api.call_openbas() + # + # CollectorDaemon(config=, standalone_func).start() + if ( + isinstance(self._callback, FunctionType) + and "collector" in signature(self._callback).parameters + ): + self._callback(collector=self) + else: + self._callback() + except Exception as err: # pylint: disable=broad-except + self.logger.error(f"Error calling: {err}") + + def start(self): + """Start the daemon. This will run the implementor's run-once setup method and + follow-up with the main execution loop. Note that at this point, if there is no + configured callback, the method will abort and kill the daemon. + """ + if self._callback is None: + raise OpenBASError("This daemon has no configured callback.") + self._setup() + self._start_loop() + + def set_callback(self, callback: callable): + """Configures a callback to call in the main execution loop. If the callback + was not provided in the daemon's ctor, this should be set before calling start(). + """ + self._callback = callback + + def get_id(self): + """Returns the daemon instance's ID contained in configuration. Configuration + must define any of these keys: `id`, `collector_id`, `injector_id`. + """ + return ( + self._configuration.get("id") + or self._configuration.get("collector_id") + or self._configuration.get("injector_id") + ) + + @classmethod + def __get_default_api_client(cls, url, token): + return OpenBAS(url=url, token=token) + + @classmethod + def __get_default_logger(cls, log_level, name): + return logger(log_level)(name) diff --git a/pyobas/daemons/collector_daemon.py b/pyobas/daemons/collector_daemon.py new file mode 100644 index 0000000..ffa33d5 --- /dev/null +++ b/pyobas/daemons/collector_daemon.py @@ -0,0 +1,73 @@ +import sched +import time + +from pyobas.daemons import BaseDaemon +from pyobas.utils import PingAlive + + +class CollectorDaemon(BaseDaemon): + """Implementation of a daemon of Collector type. Note that it requires + specific configuration keys to run its setup. + `collector_icon_filepath`: relative path to an icon image (preferably PNG) + `collector_id`: unique identifier for the collector (UUIDv4) + `collector_period`: time to wait in seconds between each loop execution; note + that this time is added to the time the loop takes to run, so the actual total + time between each loop start is time_of_loop+period. + """ + + def _setup(self): + icon_path = self._configuration.get("collector_icon_filepath") + icon_name = self._configuration.get("collector_id") + ".png" + with open(icon_path, "rb") as icon_file_handle: + collector_icon = (icon_name, icon_file_handle, "image/png") + document = self.api.document.upsert(document={}, file=collector_icon) + if self._configuration.get("collector_platform") is not None: + security_platform = self.api.security_platform.upsert( + { + "asset_name": self._configuration.get("collector_name"), + "asset_external_reference": self._configuration.get( + "collector_id" + ), + "security_platform_type": self._configuration.get( + "collector_platform" + ), + "security_platform_logo_light": document.get("document_id"), + "security_platform_logo_dark": document.get("document_id"), + } + ) + else: + security_platform = {} + security_platform_id = security_platform.get("asset_id") + config = { + "collector_id": self._configuration.get("collector_id"), + "collector_name": self._configuration.get("collector_name"), + "collector_type": self._configuration.get("collector_type"), + "collector_period": self._configuration.get("collector_period"), + "collector_security_platform": security_platform_id, + } + with open(icon_path, "rb") as icon_file_handle: + collector_icon = (icon_name, icon_file_handle, "image/png") + self.api.collector.create(config, collector_icon) + + PingAlive(self.api, config, self.logger, "collector").start() + + def _start_loop(self): + scheduler = sched.scheduler(time.time, time.sleep) + delay = self._configuration.get("collector_period") + self._try_callback() + scheduler.enter( + delay=delay, + priority=1, + action=self.__schedule, + argument=(scheduler, self._try_callback, delay), + ) + scheduler.run() + + def __schedule(self, scheduler, callback, delay): + callback() + scheduler.enter( + delay=delay, + priority=1, + action=self.__schedule, + argument=(scheduler, callback, delay), + ) diff --git a/pyobas/exceptions.py b/pyobas/exceptions.py index dd6fc66..ebd8560 100644 --- a/pyobas/exceptions.py +++ b/pyobas/exceptions.py @@ -67,6 +67,10 @@ class OpenBASCreateError(OpenBASError): pass +class ConfigurationError(OpenBASError): + pass + + # For an explanation of how these type-hints work see: # https://mypy.readthedocs.io/en/stable/generics.html#declaring-decorators # @@ -90,6 +94,7 @@ def wrapped_f(*args: Any, **kwargs: Any) -> Any: # Export manually to keep mypy happy __all__ = [ + "ConfigurationError", "OpenBASAuthenticationError", "OpenBASHttpError", "OpenBASParsingError", diff --git a/pyobas/helpers.py b/pyobas/helpers.py index 8225f36..d07251c 100644 --- a/pyobas/helpers.py +++ b/pyobas/helpers.py @@ -1,6 +1,7 @@ import base64 import json import os +import os.path import re import sched import ssl @@ -8,14 +9,15 @@ import threading import time import traceback -from datetime import datetime, timezone -from typing import Callable, Dict, List, Optional, Union +from typing import Callable, Dict, List import pika -import yaml from thefuzz import fuzz from pyobas import OpenBAS, utils +from pyobas.configuration import Configuration +from pyobas.daemons import CollectorDaemon +from pyobas.exceptions import ConfigurationError TRUTHY: List[str] = ["yes", "true", "True"] FALSY: List[str] = ["no", "false", "False"] @@ -75,69 +77,39 @@ def ssl_verify_locations(ssl_context, certdata): ssl_context.load_verify_locations(cafile=certdata) -def get_config_variable( - env_var: str, - yaml_path: List, - config: Dict = {}, - isNumber: Optional[bool] = False, - default=None, - required=False, -) -> Union[bool, int, None, str]: - """[summary] - - :param env_var: environment variable name - :param yaml_path: path to yaml config - :param config: client config dict, defaults to {} - :param isNumber: specify if the variable is a number, defaults to False - :param default: default value - """ - - if os.getenv(env_var) is not None: - result = os.getenv(env_var) - elif yaml_path is not None: - if yaml_path[0] in config and yaml_path[1] in config[yaml_path[0]]: - result = config[yaml_path[0]][yaml_path[1]] - else: - return default - else: - return default - - if result in TRUTHY: - return True - if result in FALSY: - return False - if isNumber: - return int(result) - - if ( - required - and default is None - and (result is None or (isinstance(result, str) and len(result) == 0)) - ): - raise ValueError("The configuration " + env_var + " is required") - - if isinstance(result, str) and len(result) == 0: - return default - - return result - - def create_mq_ssl_context(config) -> ssl.SSLContext: - use_ssl_ca = get_config_variable("MQ_USE_SSL_CA", ["mq", "use_ssl_ca"], config) - use_ssl_cert = get_config_variable( - "MQ_USE_SSL_CERT", ["mq", "use_ssl_cert"], config - ) - use_ssl_key = get_config_variable("MQ_USE_SSL_KEY", ["mq", "use_ssl_key"], config) - use_ssl_reject_unauthorized = get_config_variable( - "MQ_USE_SSL_REJECT_UNAUTHORIZED", - ["mq", "use_ssl_reject_unauthorized"], - config, - False, - False, - ) - use_ssl_passphrase = get_config_variable( - "MQ_USE_SSL_PASSPHRASE", ["mq", "use_ssl_passphrase"], config + config_obj = Configuration( + config_hints={ + "MQ_USE_SSL_CA": { + "env": "MQ_USE_SSL_CA", + "file_path": ["mq", "use_ssl_ca"], + }, + "MQ_USE_SSL_CERT": { + "env": "MQ_USE_SSL_CERT", + "file_path": ["mq", "use_ssl_cert"], + }, + "MQ_USE_SSL_KEY": { + "env": "MQ_USE_SSL_KEY", + "file_path": ["mq", "use_ssl_key"], + }, + "MQ_USE_SSL_REJECT_UNAUTHORIZED": { + "env": "MQ_USE_SSL_REJECT_UNAUTHORIZED", + "file_path": ["mq", "use_ssl_reject_unauthorized"], + "is_number": False, + "default": False, + }, + "MQ_USE_SSL_PASSPHRASE": { + "env": "MQ_USE_SSL_PASSPHRASE", + "file_path": ["mq", "use_ssl_passphrase"], + }, + }, + config_values=config, ) + use_ssl_ca = config_obj.get("MQ_USE_SSL_CA") + use_ssl_cert = config_obj.get("MQ_USE_SSL_CERT") + use_ssl_key = config_obj.get("MQ_USE_SSL_KEY") + use_ssl_reject_unauthorized = config_obj.get("MQ_USE_SSL_REJECT_UNAUTHORIZED") + use_ssl_passphrase = config_obj.get("MQ_USE_SSL_PASSPHRASE") ssl_context = ssl.create_default_context() # If no rejection allowed, use private function to generate unverified context if not use_ssl_reject_unauthorized: @@ -154,7 +126,7 @@ class ListenQueue(threading.Thread): def __init__( self, config: Dict, - injector_config: Dict, + injector_config, logger, callback, ) -> None: @@ -250,64 +222,38 @@ def stop(self): self.thread.join() -class PingAlive(threading.Thread): - def __init__(self, api, config, logger, ping_type) -> None: - threading.Thread.__init__(self) - self.ping_type = ping_type - self.api = api - self.config = config - self.logger = logger - self.in_error = False - self.exit_event = threading.Event() - - def ping(self) -> None: - while not self.exit_event.is_set(): - try: - if self.ping_type == "injector": - self.api.injector.create(self.config, False) - else: - self.api.collector.create(self.config, False) - except Exception as err: # pylint: disable=broad-except - self.logger.error("Error pinging the API: " + str(err)) - self.exit_event.wait(40) - - def run(self) -> None: - self.logger.info("Starting PingAlive thread") - self.ping() - - def stop(self) -> None: - self.logger.info("Preparing PingAlive for clean shutdown") - self.exit_event.set() +class PingAlive(utils.PingAlive): + pass +### DEPRECATED class OpenBASConfigHelper: def __init__(self, base_path, variables: Dict): - config_file_path = os.path.dirname(os.path.abspath(base_path)) + "/config.yml" - self.file_config = ( - yaml.load(open(config_file_path), Loader=yaml.FullLoader) - if os.path.isfile(config_file_path) - else {} + self.__config_obj = Configuration( + config_hints=variables, + config_file_path=os.path.join( + os.path.dirname(os.path.abspath(base_path)), "config.yml" + ), ) - self.variables = variables def get_conf(self, variable, is_number=None, default=None, required=None): - var = self.variables.get(variable) - if var is None: - return default - # If direct variable - if var.get("data") is not None: - return var.get("data") - # Else if file or env variable - return get_config_variable( - env_var=var["env"], - yaml_path=var["file_path"], - config=self.file_config, - isNumber=var["is_number"] if "is_number" in var else is_number, - default=var["default"] if "default" in var else default, - required=required, - ) + result = None + try: + result = self.__config_obj.get(variable) or default + except ConfigurationError: + result = default + finally: + if result is None and default is None and required: + raise ValueError( + f"Could not find required key {variable} with no available default." + ) + return result + def to_configuration(self): + return self.__config_obj + +### DEPRECATED class OpenBASCollectorHelper: def __init__( self, @@ -316,81 +262,36 @@ def __init__( security_platform_type=None, connect_run_and_terminate: bool = False, ) -> None: - self.config_helper = config - self.api = OpenBAS( - url=config.get_conf("openbas_url"), - token=config.get_conf("openbas_token"), - ) + config_obj = config.to_configuration() + # ensure the icon path is set in config + config_obj.set("collector_icon_filepath", icon) + # override the platform in config if passed this way + if security_platform_type is not None: + config_obj.set("collector_platform", security_platform_type) - self.logger_class = utils.logger( - config.get_conf("collector_log_level", default="info").upper(), - config.get_conf("collector_json_logging", default=True), + self.__daemon = CollectorDaemon( + configuration=config_obj, + callback=None, ) - self.collector_logger = self.logger_class(config.get_conf("collector_name")) - - icon_name = config.get_conf("collector_id") + ".png" - security_platform_id = None - if security_platform_type is not None: - collector_icon = (icon_name, open(icon, "rb"), "image/png") - document = self.api.document.upsert(document={}, file=collector_icon) - security_platform = self.api.security_platform.upsert( - { - "asset_name": config.get_conf("collector_name"), - "asset_external_reference": config.get_conf("collector_id"), - "security_platform_type": security_platform_type, - "security_platform_logo_light": document.get("document_id"), - "security_platform_logo_dark": document.get("document_id"), - } - ) - security_platform_id = security_platform.get("asset_id") + self.__daemon.logger.warning( + f"DEPRECATED: this collector should be migrated to use {CollectorDaemon}." + ) + # backwards compatibility + self.collector_logger = self.__daemon.logger + self.api = self.__daemon.api + self.config_helper = config self.config = { - "collector_id": config.get_conf("collector_id"), - "collector_name": config.get_conf("collector_name"), - "collector_type": config.get_conf("collector_type"), - "collector_period": config.get_conf("collector_period"), - "collector_security_platform": security_platform_id, + "collector_id": config_obj.get("collector_id"), + "collector_name": config_obj.get("collector_name"), + "collector_type": config_obj.get("collector_type"), + "collector_period": config_obj.get("collector_period"), } - collector_icon = (icon_name, open(icon, "rb"), "image/png") - self.api.collector.create(self.config, collector_icon) - # self.api.injector.create(self.config) - self.scheduler = sched.scheduler(time.time, time.sleep) - # Start ping thread - if not connect_run_and_terminate: - self.ping = PingAlive( - self.api, self.config, self.collector_logger, "collector" - ) - self.ping.start() - self.listen_queue = None - - def _schedule(self, scheduler, message_callback, delay): - # Execute - try: - message_callback() - except Exception as err: # pylint: disable=broad-except - self.collector_logger.error("Error collecting: " + str(err)) - - # Then schedule the next execution - scheduler.enter(delay, 1, self._schedule, (scheduler, message_callback, delay)) - def schedule(self, message_callback, delay): - # Start execution directly - try: - message_callback() - now = datetime.now(timezone.utc).isoformat() - self.api.collector.update( - self.config_helper.get_conf("collector_id"), - {"collector_last_execution": now}, - ) - except Exception as err: # pylint: disable=broad-except - self.collector_logger.error("Error collecting: " + str(err)) - # Then schedule the next execution - self.scheduler.enter( - delay, 1, self._schedule, (self.scheduler, message_callback, delay) - ) - self.scheduler.run() + self.__daemon.set_callback(message_callback) + self.__daemon.start() class OpenBASInjectorHelper: diff --git a/pyobas/signatures/signature_type.py b/pyobas/signatures/signature_type.py index 8b2c410..a4471bf 100644 --- a/pyobas/signatures/signature_type.py +++ b/pyobas/signatures/signature_type.py @@ -3,6 +3,18 @@ class SignatureType: + """Describes a signature of some time and a matching policy + + :param label: Type specifier + :type label: SignatureTypes + :param match_type: the matching policy to use when trying + to match this signature type, e.g. fuzzy, simple... + :type match_type: MatchTypes + :param match_score: if the matching type is fuzzy, this is + the score to use as threshold, defaults to None + :type match_score: int, optional + """ + def __init__( self, label: SignatureTypes, @@ -12,15 +24,22 @@ def __init__( self.label = label self.match_policy = SignatureMatch(match_type, match_score) - # provided some `data`, formats a dictionary specifying the matching - # policy to use by the helper to match expected signatures (from expectations) - # with actual, alert signatures (from the security software) - # Output: { - # "type": str, - # "data": any, - # "score": (optional) int - # } def make_struct_for_matching(self, data): + """Provided some `data`, formats a dictionary specifying the matching + policy to use by the helper to match expected signatures (from expectations) + with actual, alert signatures (from the security software) + + :param data: arbitrary data, but most often string or a number primitive + :type: Any + + :return: dictionary of matching specifiers:: + { + "type": str, + "data": any, + "score": (optional) int + } + :rtype: dict + """ struct = { "type": self.match_policy.match_type.value, "data": data, diff --git a/pyobas/signatures/types.py b/pyobas/signatures/types.py index 195a720..cd2b7f3 100644 --- a/pyobas/signatures/types.py +++ b/pyobas/signatures/types.py @@ -1,11 +1,16 @@ from enum import Enum -class MatchTypes(Enum): +class MatchTypes(str, Enum): MATCH_TYPE_FUZZY = "fuzzy" MATCH_TYPE_SIMPLE = "simple" -class SignatureTypes(Enum): +class SignatureTypes(str, Enum): SIG_TYPE_PARENT_PROCESS_NAME = "parent_process_name" SIG_TYPE_HOSTNAME = "hostname" + SIG_TYPE_PROCESS_NAME = "process_name" + SIG_TYPE_COMMAND_LINE = "command_line" + SIG_TYPE_FILE_NAME = "file_name" + SIG_TYPE_IPV4 = "ipv4_address" + SIG_TYPE_IPV6 = "ipv6_address" diff --git a/pyobas/utils.py b/pyobas/utils.py index dbdb21c..2f51b82 100644 --- a/pyobas/utils.py +++ b/pyobas/utils.py @@ -3,6 +3,7 @@ import email.message import json import logging +import threading import urllib.parse from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union @@ -170,3 +171,33 @@ def error(self, message, meta=None): ) return AppLogger + + +class PingAlive(threading.Thread): + def __init__(self, api, config, logger, ping_type) -> None: + threading.Thread.__init__(self) + self.ping_type = ping_type + self.api = api + self.config = config + self.logger = logger + self.in_error = False + self.exit_event = threading.Event() + + def ping(self) -> None: + while not self.exit_event.is_set(): + try: + if self.ping_type == "injector": + self.api.injector.create(self.config, False) + else: + self.api.collector.create(self.config, False) + except Exception as err: # pylint: disable=broad-except + self.logger.error("Error pinging the API: " + str(err)) + self.exit_event.wait(40) + + def run(self) -> None: + self.logger.info("Starting PingAlive thread") + self.ping() + + def stop(self) -> None: + self.logger.info("Preparing PingAlive for clean shutdown") + self.exit_event.set() diff --git a/requirements.txt b/requirements.txt index a4d66db..06b28e7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ python-magic~=0.4.27; sys_platform == 'linux' or sys_platform == 'darwin' python-magic-bin~=0.4.14; sys_platform == 'win32' python_json_logger~=2.0.4 PyYAML~=6.0 +pydantic~=2.10.4 requests~=2.32.3 setuptools~=70.3.0 cachetools~=5.5.0 diff --git a/setup.cfg b/setup.cfg index c4e017e..9ab919c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,8 +27,12 @@ python_requires = >=3.7 packages = pyobas pyobas.apis + pyobas.apis.inject_expectation + pyobas.apis.inject_expectation.model pyobas.backends + pyobas.configuration pyobas.contracts + pyobas.daemons pyobas.signatures include_package_data = True @@ -40,6 +44,7 @@ install_requires = python-magic-bin~=0.4.14; sys_platform == 'win32' python_json_logger~=2.0.4 PyYAML~=6.0 + pydantic~=2.10.4 requests~=2.32.3 setuptools~=70.3.0 cachetools~=5.5.0 diff --git a/test-requirements.txt b/test-requirements.txt index 21f33f4..9b1a1db 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,12 +1,2 @@ black==24.10.0 -build>=0.7 -isort>=5.10 -pre-commit~=3.3 -pytest~=8.1 -pytest-cases~=3.6 -pytest-cov~=4.1 -pytest_randomly~=3.8 -types-python-dateutil>=2.8 -types-pytz>=2021.3.5 -wheel~=0.36 -thefuzz~=0.22 \ No newline at end of file +isort>=5.10 \ No newline at end of file diff --git a/test/apis/__init__.py b/test/apis/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/apis/expectation/__init__.py b/test/apis/expectation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/apis/expectation/test_expectation.py b/test/apis/expectation/test_expectation.py new file mode 100644 index 0000000..5bcbe0b --- /dev/null +++ b/test/apis/expectation/test_expectation.py @@ -0,0 +1,335 @@ +import unittest.mock +from uuid import uuid4 + +from pyobas.apis.inject_expectation.model import ( + DetectionExpectation, + PreventionExpectation, +) +from pyobas.signatures.signature_type import SignatureType +from pyobas.signatures.types import MatchTypes, SignatureTypes + + +def create_mock_api_client(): + return unittest.mock.MagicMock() + + +class TestExpectation(unittest.TestCase): + def test_when_detection_model_when_success_update_labels_correct(self): + inject_expectation_id = uuid4() + sender_id = uuid4() + api_client = create_mock_api_client() + model = DetectionExpectation( + **{ + "inject_expectation_id": inject_expectation_id, + "inject_expectation_signatures": [], + }, + api_client=api_client, + ) + + model.update(success=True, sender_id=sender_id, metadata={}) + + api_client.update.assert_called_once_with( + inject_expectation_id, + inject_expectation={ + "collector_id": sender_id, + "result": "Detected", + "is_success": True, + "metadata": {}, + }, + ) + + def test_when_detection_model_when_failure_update_labels_correct(self): + inject_expectation_id = uuid4() + sender_id = uuid4() + api_client = create_mock_api_client() + model = DetectionExpectation( + **{ + "inject_expectation_id": inject_expectation_id, + "inject_expectation_signatures": [], + }, + api_client=api_client, + ) # we don't care to construct a functional object + + model.update(success=False, sender_id=sender_id, metadata={}) + + api_client.update.assert_called_once_with( + inject_expectation_id, + inject_expectation={ + "collector_id": sender_id, + "result": "Not Detected", + "is_success": False, + "metadata": {}, + }, + ) + + def test_when_prevention_model_when_success_update_labels_correct(self): + inject_expectation_id = uuid4() + sender_id = uuid4() + api_client = create_mock_api_client() + model = PreventionExpectation( + **{ + "inject_expectation_id": inject_expectation_id, + "inject_expectation_signatures": [], + }, + api_client=api_client, + ) # we don't care to construct a functional object + + model.update(success=True, sender_id=sender_id, metadata={}) + + api_client.update.assert_called_once_with( + inject_expectation_id, + inject_expectation={ + "collector_id": sender_id, + "result": "Prevented", + "is_success": True, + "metadata": {}, + }, + ) + + def test_when_prevention_model_when_failure_update_labels_correct(self): + inject_expectation_id = uuid4() + sender_id = uuid4() + api_client = create_mock_api_client() + model = PreventionExpectation( + **{ + "inject_expectation_id": inject_expectation_id, + "inject_expectation_signatures": [], + }, + api_client=api_client, + ) # we don't care to construct a functional object + + model.update(success=False, sender_id=sender_id, metadata={}) + + api_client.update.assert_called_once_with( + inject_expectation_id, + inject_expectation={ + "collector_id": sender_id, + "result": "Not Prevented", + "is_success": False, + "metadata": {}, + }, + ) + + def test_when_no_expectation_signature_is_relevant_match_alert_return_false(self): + model = DetectionExpectation( + **{ + "inject_expectation_id": uuid4(), + "inject_expectation_signatures": [ + { + "type": SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + "value": "parent.exe", + }, + ], + }, + api_client=create_mock_api_client(), + ) + + relevant_signature_types = [ + SignatureType( + label=SignatureTypes.SIG_TYPE_HOSTNAME, + match_type=MatchTypes.MATCH_TYPE_SIMPLE, + ) + ] + + alert_data = ( + { # irrelevant but aligned with expectation sigs to validate no match + SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME.value: { + "type": MatchTypes.MATCH_TYPE_FUZZY.value, + "data": "parent.exe", + "score": 95, + }, + } + ) + + matched = model.match_alert(relevant_signature_types, alert_data) + + self.assertFalse(matched) + + def test_when_relevant_signature_when_none_match_alert_return_false(self): + model = DetectionExpectation( + **{ + "inject_expectation_id": uuid4(), + "inject_expectation_signatures": [ + { + "type": SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + "value": "parent.exe", + }, + ], + }, + api_client=create_mock_api_client(), + ) + + parent_process_signature_type = SignatureType( + label=SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + match_type=MatchTypes.MATCH_TYPE_FUZZY, + match_score=95, + ) + relevant_signature_types = [parent_process_signature_type] + + alert_data = { + parent_process_signature_type.label.value: parent_process_signature_type.make_struct_for_matching( + data="not_parent.exe" + ) + } + + matched = model.match_alert(relevant_signature_types, alert_data) + + self.assertFalse(matched) + + def test_when_relevant_signature_when_all_signatures_match_alert_return_true(self): + model = DetectionExpectation( + **{ + "inject_expectation_id": uuid4(), + "inject_expectation_signatures": [ + { + "type": SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + "value": "parent.exe", + }, + ], + }, + api_client=create_mock_api_client(), + ) + + parent_process_signature_type = SignatureType( + label=SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + match_type=MatchTypes.MATCH_TYPE_FUZZY, + match_score=95, + ) + relevant_signature_types = [parent_process_signature_type] + + alert_data = { + parent_process_signature_type.label.value: parent_process_signature_type.make_struct_for_matching( + data="parent.exe" + ) + } + + matched = model.match_alert(relevant_signature_types, alert_data) + + self.assertTrue(matched) + + def test_when_relevant_signature_when_all_signatures_match_alert_when_passing_array_return_true( + self, + ): + model = DetectionExpectation( + **{ + "inject_expectation_id": uuid4(), + "inject_expectation_signatures": [ + { + "type": SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + "value": "parent.exe", + }, + ], + }, + api_client=create_mock_api_client(), + ) + + parent_process_signature_type = SignatureType( + label=SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + match_type=MatchTypes.MATCH_TYPE_FUZZY, + match_score=95, + ) + relevant_signature_types = [parent_process_signature_type] + + alert_data = { + parent_process_signature_type.label.value: parent_process_signature_type.make_struct_for_matching( + data=["parent.exe", "some_other_process"] + ) + } + + matched = model.match_alert(relevant_signature_types, alert_data) + + self.assertTrue(matched) + + def test_when_relevant_signatures_when_alert_data_missing_for_some_relevant_signatures_return_false( + self, + ): + model = DetectionExpectation( + **{ + "inject_expectation_id": uuid4(), + "inject_expectation_signatures": [ + { + "type": SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + "value": "parent.exe", + }, + {"type": SignatureTypes.SIG_TYPE_FILE_NAME, "value": "filename"}, + ], + }, + api_client=create_mock_api_client(), + ) + + parent_process_signature_type = SignatureType( + label=SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + match_type=MatchTypes.MATCH_TYPE_FUZZY, + match_score=95, + ) + file_name_signature_type = SignatureType( + label=SignatureTypes.SIG_TYPE_FILE_NAME, + match_type=MatchTypes.MATCH_TYPE_FUZZY, + match_score=95, + ) + relevant_signature_types = [ + parent_process_signature_type, + file_name_signature_type, + ] + + alert_data = { + parent_process_signature_type.label.value: parent_process_signature_type.make_struct_for_matching( + data="parent.exe" + ) + } + + matched = model.match_alert(relevant_signature_types, alert_data) + + self.assertFalse(matched) + + def test_when_relevant_signatures_when_some_alert_data_dont_match_return_false( + self, + ): + model = DetectionExpectation( + **{ + "inject_expectation_id": uuid4(), + "inject_expectation_signatures": [ + { + "type": SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + "value": "parent.exe", + }, + { + "type": SignatureTypes.SIG_TYPE_FILE_NAME, + "value": "some_file.odt", + }, + ], + }, + api_client=create_mock_api_client(), + ) + + parent_process_signature_type = SignatureType( + label=SignatureTypes.SIG_TYPE_PARENT_PROCESS_NAME, + match_type=MatchTypes.MATCH_TYPE_FUZZY, + match_score=95, + ) + file_name_signature_type = SignatureType( + label=SignatureTypes.SIG_TYPE_FILE_NAME, + match_type=MatchTypes.MATCH_TYPE_FUZZY, + match_score=95, + ) + relevant_signature_types = [ + parent_process_signature_type, + file_name_signature_type, + ] + + alert_data = { + parent_process_signature_type.label.value: parent_process_signature_type.make_struct_for_matching( + data="parent.exe" + ), + file_name_signature_type.label.value: file_name_signature_type.make_struct_for_matching( + data="some_other_file.doc" + ), + } + + matched = model.match_alert(relevant_signature_types, alert_data) + + self.assertFalse(matched) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/configuration/__init__.py b/test/configuration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/configuration/test_configuration.py b/test/configuration/test_configuration.py new file mode 100644 index 0000000..3bf7fc1 --- /dev/null +++ b/test/configuration/test_configuration.py @@ -0,0 +1,257 @@ +import os +import unittest +from unittest.mock import patch + +from pyobas.configuration import Configuration + +TEST_CONFIG_HINTS = { + "string_config_direct": {"data": "this is string_config_direct"}, + "string_config_no_default": { + "env": "PYOBAS_TEST_CONFIGURATION_STRING_CONFIG_NO_DEFAULT", + "file_path": ["pyobas_test_configuration", "string_config_no_default"], + }, + "int_config_no_default": { + "env": "PYOBAS_TEST_CONFIGURATION_INT_CONFIG_NO_DEFAULT", + "file_path": ["pyobas_test_configuration", "int_config_no_default"], + "is_number": True, + }, + "int_config_no_default_not_marked_number": { + "env": "PYOBAS_TEST_CONFIGURATION_INT_CONFIG_NO_DEFAULT", + "file_path": [ + "pyobas_test_configuration", + "int_config_no_default_not_marked_number", + ], + }, + "string_config_with_default": { + "env": "PYOBAS_TEST_CONFIGURATION_STRING_CONFIG_WITH_DEFAULT", + "file_path": ["pyobas_test_configuration", "string_config_with_default"], + "default": "default string config", + }, + "int_config_with_default": { + "env": "PYOBAS_TEST_CONFIGURATION_INT_CONFIG_WITH_DEFAULT", + "file_path": ["pyobas_test_configuration", "int_config_with_default"], + "is_number": True, + "default": 777_777, + }, + "bool_config_with_default": { + "env": "PYOBAS_TEST_CONFIGURATION_BOOL_CONFIG_WITH_DEFAULT", + "file_path": ["pyobas_test_configuration", "bool_config_with_default"], + "default": True, + }, +} + + +class TestConfiguration(unittest.TestCase): + def test_when_string_config_has_no_default_when_key_not_found_return_None(self): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + + value = config_obj.get("string_config_no_default") + + self.assertIsNone(value) + + def test_when_config_has_no_default_when_key_is_set_as_bool_return_bool(self): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + config_obj.set("string_config_no_default", True) + + value = config_obj.get("string_config_no_default") + + self.assertEqual(value, True) + + def test_when_config_has_no_default_when_key_is_set_as_string_bool_return_bool( + self, + ): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + config_obj.set("string_config_no_default", "yes") + + value = config_obj.get("string_config_no_default") + + self.assertEqual(value, True) + + def test_when_string_config_has_default_when_key_not_found_return_default(self): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + + value = config_obj.get("string_config_with_default") + + self.assertEqual(value, "default string config") + + @patch.dict( + os.environ, + values={"PYOBAS_TEST_CONFIGURATION_STRING_CONFIG_NO_DEFAULT": "actual value"}, + clear=True, + ) + def test_when_string_config_has_no_default_when_key_is_in_env_return_env_value( + self, + ): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + + value = config_obj.get("string_config_no_default") + + self.assertEqual(value, "actual value") + + @patch.dict( + os.environ, + values={"PYOBAS_TEST_CONFIGURATION_STRING_CONFIG_WITH_DEFAULT": "actual value"}, + clear=True, + ) + def test_when_string_config_has_default_when_key_is_in_env_return_env_value(self): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + + value = config_obj.get("string_config_with_default") + + self.assertEqual(value, "actual value") + + @patch.dict( + os.environ, + values={"PYOBAS_TEST_CONFIGURATION_STRING_CONFIG_NO_DEFAULT": "actual value"}, + clear=True, + ) + def test_when_key_is_in_both_env_and_file_return_env_value(self): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + config_values={ + "pyobas_test_configuration": { + "string_config_no_default": "another value" + } + }, + ) + + value = config_obj.get("string_config_no_default") + + self.assertEqual(value, "actual value") + + @patch.dict( + os.environ, + values={"PYOBAS_TEST_CONFIGURATION_STRING_CONFIG_NO_DEFAULT": "env value"}, + clear=True, + ) + def test_when_key_is_in_both_env_and_file_when_value_is_set_return_set_value(self): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + config_values={ + "pyobas_test_configuration": {"string_config_no_default": "file value"} + }, + ) + config_obj.set("string_config_no_default", "set value") + + value = config_obj.get("string_config_no_default") + + self.assertEqual(value, "set value") + + def test_when_string_config_has_no_default_when_key_is_in_file_return_file_value( + self, + ): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + config_values={ + "pyobas_test_configuration": { + "string_config_no_default": "another value" + } + }, + ) + + value = config_obj.get("string_config_no_default") + + self.assertEqual(value, "another value") + + def test_when_int_config_has_no_default_when_key_is_not_found_return_None(self): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + + value = config_obj.get("int_config_no_default") + + self.assertIsNone(value) + + def test_when_int_config_has_default_when_key_is_not_found_return_default(self): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + + value = config_obj.get("int_config_with_default") + + self.assertEqual(value, 777_777) + + @patch.dict( + os.environ, + values={"PYOBAS_TEST_CONFIGURATION_INT_CONFIG_NO_DEFAULT": "1234"}, + clear=True, + ) + def test_when_int_config_has_no_default_when_key_is_in_env_return_env_value(self): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + + value = config_obj.get("int_config_no_default") + + self.assertEqual(value, 1234) + + def test_when_int_config_has_no_default_when_key_is_passed_as_int_return_int_value( + self, + ): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + config_values={ + "pyobas_test_configuration": {"int_config_no_default": 456_123} + }, + ) + + value = config_obj.get("int_config_no_default") + + self.assertEqual(value, 456_123) + + def test_when_int_config_has_no_default_when_key_is_passed_as_int_when_config_is_not_marked_as_number_return_int_value( + self, + ): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + config_values={ + "pyobas_test_configuration": { + "int_config_no_default_not_marked_number": 842_204 + } + }, + ) + + value = config_obj.get("int_config_no_default_not_marked_number") + + self.assertEqual(value, 842_204) + + @patch.dict( + os.environ, + values={"PYOBAS_TEST_CONFIGURATION_STRING_CONFIG_NO_DEFAULT": "yes"}, + clear=True, + ) + def test_when_string_config_has_no_default_when_string_is_boolean_when_key_is_in_env_return_env_value( + self, + ): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + + value = config_obj.get("string_config_no_default") + + self.assertEqual(value, True) + + def test_when_bool_config_has_default_when_key_is_not_found_return_default(self): + config_obj = Configuration( + config_hints=TEST_CONFIG_HINTS, + ) + + value = config_obj.get("bool_config_with_default") + + self.assertEqual(value, True) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/configuration/test_sources.py b/test/configuration/test_sources.py new file mode 100644 index 0000000..692a029 --- /dev/null +++ b/test/configuration/test_sources.py @@ -0,0 +1,69 @@ +import os +import unittest +from unittest.mock import patch + +from pyobas.configuration.sources import DictionarySource, EnvironmentSource + +TEST_ENV_VAR: str = "PYOBAS_TEST_ENV_VAR" + + +class TestEnvironmentSource(unittest.TestCase): + @patch.dict(os.environ, clear=True) + def test_when_env_var_not_set_return_none(self): + result = EnvironmentSource.get(TEST_ENV_VAR) + + self.assertIsNone(result) + + @patch.dict(os.environ, values={TEST_ENV_VAR: "some_value"}, clear=True) + def test_when_env_var_is_set_return_value(self): + result = EnvironmentSource.get(TEST_ENV_VAR) + + self.assertEqual(result, "some_value") + + +class TestDictionarySource(unittest.TestCase): + def test_when_config_key_path_is_not_list_it_throws(self): + string_key_path = "some string" + with self.assertRaises(AssertionError): + DictionarySource.get(string_key_path, {}) + + def test_when_config_key_path_has_not_2_elements_it_throws(self): + key_path = ["element 1"] + with self.assertRaises(AssertionError): + DictionarySource.get(key_path, {}) + + def test_when_config_key_path_has_empty_elements_it_throws(self): + key_path = ["element 1", ""] + with self.assertRaises(AssertionError): + DictionarySource.get(key_path, {}) + + def test_when_config_missing_first_path_part_return_None(self): + expected_value = None + key_path = ["element 1", "element 2"] + values = {"not element 1": None} + + result = DictionarySource.get(key_path, values) + + self.assertEqual(result, expected_value) + + def test_when_config_missing_second_path_part_return_None(self): + expected_value = None + key_path = ["element 1", "element 2"] + values = {"element 1": {"not element 2": "some value"}} + + result = DictionarySource.get(key_path, values) + + self.assertEqual(result, expected_value) + + def test_when_config_found_return_value(self): + expected_value = "expected!" + key_path = ["element 1", "element 2"] + values = {"element 1": {"element 2": expected_value}} + + result = DictionarySource.get(key_path, values) + + self.assertEqual(result, expected_value) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/daemons/__init__.py b/test/daemons/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/daemons/test_base_daemon.py b/test/daemons/test_base_daemon.py new file mode 100644 index 0000000..a85c40f --- /dev/null +++ b/test/daemons/test_base_daemon.py @@ -0,0 +1,109 @@ +import unittest +import unittest.mock + +from pyobas.configuration import Configuration +from pyobas.daemons import BaseDaemon +from pyobas.exceptions import OpenBASError + +TEST_DAEMON_CONFIG_HINTS = { + "openbas_url": {"data": "http://example.com"}, + "openbas_token": {"data": "test"}, + "log_level": {"data": "info"}, + "name": {"data": "my test daemon"}, +} + +TEST_DAEMON_CONFIGURATION = Configuration(config_hints=TEST_DAEMON_CONFIG_HINTS) + + +class DaemonForTest(BaseDaemon): + def _setup(self): + pass + + def _start_loop(self): + pass + + +def create_mock_daemon(callback: callable = None): + mock_setup = unittest.mock.MagicMock() + mock_start_loop = unittest.mock.MagicMock() + inner_mock_func = unittest.mock.MagicMock() + + class MockDaemon(BaseDaemon): + def _setup(self): + mock_setup() + + def _start_loop(self): + mock_start_loop() + + def bound_method(self): + inner_mock_func() + + return ( + MockDaemon( + configuration=TEST_DAEMON_CONFIGURATION, + callback=callback, + # silence logging in tests + logger=unittest.mock.MagicMock(), + ), + mock_setup, + mock_start_loop, + inner_mock_func, + ) + + +class TestBaseDaemon(unittest.TestCase): + def test_when_no_callback_when_complete_config_ctor_ok(self): + daemon = DaemonForTest(configuration=TEST_DAEMON_CONFIGURATION) + + self.assertIsInstance(daemon, BaseDaemon) + + def test_when_no_callback_when_lacking_config_key_ctor_throws(self): + with self.assertRaises(Exception): + DaemonForTest(configuration=Configuration(config_hints={})) + + def test_when_no_callback_daemon_cant_start(self): + daemon, mock_setup, mock_start_loop, _ = create_mock_daemon() + + with self.assertRaises(OpenBASError): + daemon.start() + + mock_setup.assert_not_called() + mock_start_loop.assert_not_called() + + def test_when_callback_daemon_can_start(self): + daemon, mock_setup, mock_start_loop, _ = create_mock_daemon(lambda: None) + + daemon.start() + + mock_setup.assert_called_once() + mock_start_loop.assert_called_once() + + def test_when_callback_is_bound_method_daemon_can_call(self): + daemon, mock_setup, mock_start_loop, inner_mock_func = create_mock_daemon() + daemon.set_callback(daemon.bound_method) + + daemon._try_callback() + + inner_mock_func.assert_called_once() + + def test_when_callback_is_func_with_collector_parameter_daemon_can_call(self): + daemon, mock_setup, mock_start_loop, _ = create_mock_daemon() + inner_mock_func = unittest.mock.MagicMock() + daemon.set_callback(lambda collector: inner_mock_func()) + + daemon._try_callback() + + inner_mock_func.assert_called_once() + + def test_when_callback_is_func_with_other_parameter_daemon_cant_call(self): + daemon, mock_setup, mock_start_loop, _ = create_mock_daemon() + inner_mock_func = unittest.mock.MagicMock() + daemon.set_callback(lambda other_parameter: inner_mock_func()) + + daemon._try_callback() + + inner_mock_func.assert_not_called() + + +if __name__ == "__main__": + unittest.main()