Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ jobs:
- checkout
- run:
name: install dependencies
command: pip3 install -r requirements.txt --user
- run:
name: install test-dependencies
command: pip3 install -r test-requirements.txt --user
command: pip3 install -r requirements.txt -r test-requirements.txt --user
- run:
name: confirm black version
command: black --version
Expand All @@ -41,9 +38,6 @@ jobs:
- run:
name: install dependencies
command: pip3 install -r requirements.txt --user
- run:
name: install test-dependencies
command: pip3 install -r test-requirements.txt --user
- run:
name: run tests
command: python -m unittest
Expand Down Expand Up @@ -72,7 +66,7 @@ jobs:
- run:
name: install dependencies
command: >
pip3 install --user -r requirements.txt -r test-requirements.txt
pip3 install --user -r requirements.txt
- run:
name: check version
command: |
Expand Down Expand Up @@ -172,6 +166,7 @@ workflows:
requires:
- ensure_formatting
- linter
- test
- deploy:
requires:
- build
Expand Down
27 changes: 3 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ $ python3 -m pip install -e .[dev,doc]
$ pre-commit install
# Create your feature/fix
# Create tests for your changes
$ pytest
$ python -m unittest
# Push you feature/fix on Github
$ git add [file(s)]
$ git commit -m "[descriptive message]"
Expand All @@ -62,31 +62,10 @@ To learn about the methods available for executing queries and retrieving their

## Tests

### Install dependencies
The standard `unittest` library is used for running the tests.

```bash
$ pip install -r ./test-requirements.txt
```

[pytest](https://docs.pytest.org/en/7.2.x/) is used to launch the tests.

### Launch tests

#### Prerequisite

Your OpenBAS API should be running.
Your conftest.py should be configured with your API url, your token, and if applicable, your mTLS cert/key.

#### Launching

Unit tests
```bash
$ pytest ./tests/01-unit/
```

Integration testing
```bash
$ pytest ./tests/02-integration/
$ python -m unittest
```

## About
Expand Down
6 changes: 3 additions & 3 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

# -- Project information -----------------------------------------------------

project = "OpenCTI client for Python"
project = "OpenBAS client for Python"
copyright = "2024, Filigran"
author = "OpenCTI Project"
author = "OpenBAS Project"

# The full version, including alpha/beta/rc tags
release = "5.12.20"
release = "1.10.1"

master_doc = "index"

Expand Down
6 changes: 3 additions & 3 deletions docs/index.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
OpenBAS client for Python
=========================

The pycti library is designed to help OpenBAS users and developers to interact
with the OpenBAS platform GraphQL API.
The pyobas library is designed to help OpenBAS users and developers to interact
with the OpenBAS platform API.

The Python library requires Python >= 3.

Expand All @@ -11,7 +11,7 @@ The Python library requires Python >= 3.
:caption: Contents:

client_usage/getting_started.rst
pycti/pycti
pyobas/pyobas


Indices and tables
Expand Down
2 changes: 2 additions & 0 deletions pyobas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
__title__,
)
from pyobas.client import OpenBAS # noqa: F401
from pyobas.configuration import * # noqa: F401,F403,F405
from pyobas.contracts import * # noqa: F401,F403,F405
from pyobas.exceptions import * # noqa: F401,F403,F405
from pyobas.signatures import * # noqa: F401,F403,F405

__all__ = [
"__author__",
Expand Down
1 change: 1 addition & 0 deletions pyobas/apis/inject_expectation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .inject_expectation import * # noqa: F401,F403
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from typing import Any, Dict

from pyobas import exceptions as exc
from pyobas.apis.inject_expectation.model import (
DetectionExpectation,
ExpectationTypeEnum,
PreventionExpectation,
)
from pyobas.base import RESTManager, RESTObject
from pyobas.mixins import ListMixin, UpdateMixin
from pyobas.utils import RequiredOptional
Expand Down Expand Up @@ -29,6 +34,45 @@ def expectations_assets_for_source(
result = self.openbas.http_get(path, **kwargs)
return result

def expectations_models_for_source(self, source_id: str, **kwargs: Any):
"""Returns all expectations from OpenBAS that have had no result yet
from the source_id (e.g. collector).

:param source_id: the identifier of the collector requesting expectations
:type source_id: str
:param kwargs: additional data to pass to the endpoint
:type kwargs: dict, optional

:return: a list of expectation objects
:rtype: list[DetectionExpectation|PreventionExpectation]
"""
# TODO: we should implement a more clever mechanism to obtain
# specialised Expectation instances rather than just if/elseing
# through this list of possibilities.
expectations = []
for expectation_dict in self.expectations_assets_for_source(
source_id=source_id, **kwargs
):
if (
expectation_dict["inject_expectation_type"]
== ExpectationTypeEnum.Detection.value
):
expectations.append(
DetectionExpectation(**expectation_dict, api_client=self)
)
elif (
expectation_dict["inject_expectation_type"]
== ExpectationTypeEnum.Prevention.value
):
expectations.append(
PreventionExpectation(**expectation_dict, api_client=self)
)
else:
expectations.append(
PreventionExpectation(**expectation_dict, api_client=self)
)
return expectations

@exc.on_http_error(exc.OpenBASUpdateError)
def prevention_expectations_for_source(
self, source_id: str, **kwargs: Any
Expand Down
7 changes: 7 additions & 0 deletions pyobas/apis/inject_expectation/model/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .expectation import (
DetectionExpectation,
ExpectationTypeEnum,
PreventionExpectation,
)

__all__ = ["DetectionExpectation", "ExpectationTypeEnum", "PreventionExpectation"]
172 changes: 172 additions & 0 deletions pyobas/apis/inject_expectation/model/expectation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
from enum import Enum
from typing import List
from uuid import UUID

from pydantic import BaseModel
from thefuzz import fuzz

from pyobas.signatures.signature_type import SignatureType
from pyobas.signatures.types import MatchTypes, SignatureTypes


class ExpectationTypeEnum(str, Enum):
"""Types of Expectations"""

Detection = "DETECTION"
Prevention = "PREVENTION"
Other = "other"

@classmethod
def _missing_(cls, value):
return cls.Other


class ExpectationSignature(BaseModel):
"""An expectation signature describes a known marker potentially
found in alerting data in security software. For example, an
expectation signature can be a process image name, a command
line, or any other relevant piece of data.
"""

type: SignatureTypes
value: str


class Expectation(BaseModel):
"""An expectation represents an expected outcome of a BAS run.
For example, in the case of running an attack command line, the
expectation may be that security software has _detected_ it, while
another expectation may be that the attack was _prevented_.
"""

inject_expectation_id: UUID
inject_expectation_signatures: List[ExpectationSignature]

success_label: str = "Success"
failure_label: str = "Failure"

def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self.__api_client = kw["api_client"]

def update(self, success, sender_id, metadata):
"""Update the expectation object in OpenBAS with the supplied outcome.

:param success: whether the expectation was fulfilled (true) or not (false)
:type success: bool
:param sender_id: identifier of the collector that is updating the expectation
:type sender_id: string
:param metadata: arbitrary dictionary of additional data relevant to updating the expectation
:type metadata: dict[string,string]
"""
self.__api_client.update(
self.inject_expectation_id,
inject_expectation={
"collector_id": sender_id,
"result": (self.success_label if success else self.failure_label),
"is_success": success,
"metadata": metadata,
},
)

def match_alert(self, relevant_signature_types: list[SignatureType], alert_data):
"""Matches an alert's data against the current expectation signatures
to see if the alert is relevant to the current expectation's inject,
i.e. this alert was triggered by the execution of the inject to which
belongs the expectation.

:param relevant_signature_types: filter of signature types that we want to consider.
Only the signature types listed in this collection may be checked for matching.
:type relevant_signature_types: list[SignatureType]
:param alert_data: list of possibly relevant markers found in an alert.
:type alert_data: dict[SignatureTypes, dict]

:return: whether the alert matches the expectation signatures or not.
:rtype: bool
"""
relevant_expectation_signatures = [
signature
for signature in self.inject_expectation_signatures
if signature.type in [type.label for type in relevant_signature_types]
]
if not any(relevant_expectation_signatures):
return False

for relevant_expectation_signature in relevant_expectation_signatures:
if not (
alert_signature_for_type := alert_data.get(
relevant_expectation_signature.type.value
)
):
return False

if alert_signature_for_type[
"type"
] == MatchTypes.MATCH_TYPE_FUZZY and not self.match_fuzzy(
alert_signature_for_type["data"],
relevant_expectation_signature.value,
alert_signature_for_type["score"],
):
return False
if alert_signature_for_type[
"type"
] == MatchTypes.MATCH_TYPE_SIMPLE and not self.match_simple(
alert_signature_for_type["data"], relevant_expectation_signature.value
):
return False

return True

@staticmethod
def match_fuzzy(tested: list[str], reference: str, threshold: int):
"""Applies a fuzzy match against a known reference to a list of candidates

:param tested: list of strings candidate for fuzzy matching
:type tested: list[str]
:param reference: the reference against which to try to fuzzy match
:type reference: str
:param threshold: string overlap percentage threshold above which to declare a match
:type threshold: int

:return: whether any of the candidate is a match against the reference
:rtype: bool
"""
actual_tested = [tested] if isinstance(tested, str) else tested
for value in actual_tested:
ratio = fuzz.ratio(value, reference)
if ratio >= threshold:
return True
return False

@staticmethod
def match_simple(tested: list[str], reference: str):
"""A simple strict, case-sensitive string matching between a list of
candidates and a reference.

:param tested: list of strings candidate for fuzzy matching
:type tested: list[str]
:param reference: the reference against which to try to fuzzy match
:type reference: str

:return: whether any of the candidate is a match against the reference
:rtype: bool
"""
return Expectation.match_fuzzy(tested, reference, threshold=100)


class DetectionExpectation(Expectation):
"""An expectation that is specific to Detection, i.e. that is used
by OpenBAS to assert that an inject's execution was detected.
"""

success_label: str = "Detected"
failure_label: str = "Not Detected"


class PreventionExpectation(Expectation):
"""An expectation that is specific to Prevention, i.e. that is used
by OpenBAS to assert that an inject's execution was prevented.
"""

success_label: str = "Prevented"
failure_label: str = "Not Prevented"
3 changes: 3 additions & 0 deletions pyobas/configuration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .configuration import Configuration

__all__ = ["Configuration"]
Loading