Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
91a7eff
[client] Update dependency black to >=25.11.0,<25.12.0
renovate[bot] Dec 1, 2025
678799d
[client] fix(configuration): correctly eval config.data to avoid fall…
antoinemzs Dec 3, 2025
acc7189
Merge remote-tracking branch 'origin/main' into release/current
guillaumejparis Dec 8, 2025
9cedee6
[client] Update dependency isort to >=6.1.0,<6.2.0
renovate[bot] Dec 10, 2025
e365510
[client] Update dependency build to >=1.3.0,<1.4.0
renovate[bot] Dec 10, 2025
d7e3090
Merge remote-tracking branch 'origin/main' into release/current
guillaumejparis Dec 11, 2025
e0d05d9
Merge remote-tracking branch 'origin/main' into release/current
guillaumejparis Dec 15, 2025
cccff9e
[client] feat(SCV): add domains management on contract creations (#4266)
gabriel-peze Dec 16, 2025
931a072
Merge remote-tracking branch 'origin/main' into release/current
guillaumejparis Dec 16, 2025
8659f13
[client] fix(SCV): rename wrong domain name (#4266)
gabriel-peze Jan 6, 2026
ff0d61d
[client] fix(SCV): rename last wrong domain name (#4266)
gabriel-peze Jan 6, 2026
c4607f3
Merge remote-tracking branch 'origin/main' into release/current
guillaumejparis Jan 8, 2026
1de6fdd
[all] chore(merge): merge back 2.0.11
antoinemzs Jan 14, 2026
9294e7f
Merge branch 'main' into release/current
antoinemzs Jan 20, 2026
74cbbd3
[client] feat(backend): expose config schema (#4386)
camrrx Jan 20, 2026
bf5183d
Merge branch 'main' into release/current
antoinemzs Jan 21, 2026
c2a050b
[client] fix(settings): remove platform setting from base configs
antoinemzs Jan 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pyoaev/configuration/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
from .configuration import Configuration
from .settings_loader import (
BaseConfigModel,
ConfigLoaderCollector,
ConfigLoaderOAEV,
SettingsLoader,
)

__all__ = ["Configuration"]
__all__ = [
"Configuration",
"ConfigLoaderOAEV",
"ConfigLoaderCollector",
"SettingsLoader",
"BaseConfigModel",
]
27 changes: 26 additions & 1 deletion pyoaev/configuration/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

import yaml
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings

from pyoaev.configuration.connector_config_schema_generator import (
ConnectorConfigSchemaGenerator,
)
from pyoaev.configuration.sources import DictionarySource, EnvironmentSource

CONFIGURATION_TYPES = str | int | bool | Any | None
Expand Down Expand Up @@ -111,6 +115,7 @@ def __init__(
config_hints: Dict[str, dict | str],
config_values: dict = None,
config_file_path: str = os.path.join(os.curdir, "config.yml"),
config_base_model: BaseSettings = None,
):
self.__config_hints = {
key: (
Expand All @@ -129,6 +134,8 @@ def __init__(

self.__config_values = (config_values or {}) | file_contents

self.__base_model = config_base_model

def get(self, config_key: str) -> CONFIGURATION_TYPES:
"""Gets the value pointed to by the configuration key. If the key is defined
with actual hints (as opposed to a discrete value), it will use those hints to
Expand All @@ -146,7 +153,12 @@ def get(self, config_key: str) -> CONFIGURATION_TYPES:
return None

return self.__process_value_to_type(
config.data or self.__dig_config_sources_for_key(config), config.is_number
(
self.__dig_config_sources_for_key(config)
if config.data is None
else config.data
),
config.is_number,
)

def set(self, config_key: str, value: CONFIGURATION_TYPES):
Expand All @@ -164,6 +176,19 @@ def set(self, config_key: str, value: CONFIGURATION_TYPES):
else:
self.__config_hints[config_key].data = value

def schema(self):
"""
Generates the complete connector schema using a custom schema generator compatible with Pydantic.
Isolate custom class generator, Pydantic expects a class, not an instance
Always subclass GenerateJsonSchema and pass the class to Pydantic, not an instance
:return: The generated connector schema as a dictionary.
"""
return self.__base_model.model_json_schema(
by_alias=False,
schema_generator=ConnectorConfigSchemaGenerator,
mode="validation",
)

@staticmethod
def __process_value_to_type(value: CONFIGURATION_TYPES, is_number_hint: bool):
if value is None:
Expand Down
127 changes: 127 additions & 0 deletions pyoaev/configuration/connector_config_schema_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
## ADAPTED FROM https://github.com/OpenCTI-Platform/connectors/blob/5c8cf1235f62f5651c9c08d0b67f1bd182662c8a/shared/tools/composer/generate_connectors_config_schemas/generate_connector_config_json_schema.py.sample

from copy import deepcopy
from typing import override

from pydantic.json_schema import GenerateJsonSchema

# attributes filtered from the connector configuration before generating the manifest
__FILTERED_ATTRIBUTES__ = [
# connector id is generated
"CONNECTOR_ID",
]


class ConnectorConfigSchemaGenerator(GenerateJsonSchema):
@staticmethod
def dereference_schema(schema_with_refs):
"""Return a new schema with all internal $ref resolved."""

def _resolve(schema, root):
if isinstance(schema, dict):
if "$ref" in schema:
ref_path = schema["$ref"]
if ref_path.startswith("#/$defs/"):
def_name = ref_path.split("/")[-1]
# Deep copy to avoid mutating $defs
resolved = deepcopy(root["$defs"][def_name])
return _resolve(resolved, root)
else:
raise ValueError(f"Unsupported ref format: {ref_path}")
else:
return {
schema_key: _resolve(schema_value, root)
for schema_key, schema_value in schema.items()
}
elif isinstance(schema, list):
return [_resolve(item, root) for item in schema]
else:
return schema

return _resolve(deepcopy(schema_with_refs), schema_with_refs)

@staticmethod
def flatten_config_loader_schema(root_schema: dict):
"""
Flatten config loader schema so all config vars are described at root level.

:param root_schema: Original schema.
:return: Flatten schema.
"""
flat_json_schema = {
"$schema": root_schema["$schema"],
"$id": root_schema["$id"],
"type": "object",
"properties": {},
"required": [],
"additionalProperties": root_schema.get("additionalProperties", True),
}

for (
config_loader_namespace_name,
config_loader_namespace_schema,
) in root_schema["properties"].items():
config_schema = config_loader_namespace_schema.get("properties", {})
required_config_vars = config_loader_namespace_schema.get("required", [])

for config_var_name, config_var_schema in config_schema.items():
property_name = (
f"{config_loader_namespace_name.upper()}_{config_var_name.upper()}"
)

config_var_schema.pop("title", None)

flat_json_schema["properties"][property_name] = config_var_schema

if config_var_name in required_config_vars:
flat_json_schema["required"].append(property_name)

return flat_json_schema

@staticmethod
def filter_schema(schema):
for filtered_attribute in __FILTERED_ATTRIBUTES__:
if filtered_attribute in schema["properties"]:
del schema["properties"][filtered_attribute]
schema.update(
{
"required": [
item
for item in schema["required"]
if item != filtered_attribute
]
}
)

return schema

@override
def generate(self, schema, mode="validation"):
json_schema = super().generate(schema, mode=mode)

json_schema["$schema"] = self.schema_dialect
json_schema["$id"] = "config.schema.json"
dereferenced_schema = self.dereference_schema(json_schema)
flattened_schema = self.flatten_config_loader_schema(dereferenced_schema)
return self.filter_schema(flattened_schema)

@override
def nullable_schema(self, schema):
"""Generates a JSON schema that matches a schema that allows null values.

Args:
schema: The core schema.

Returns:
The generated JSON schema.

Notes:
This method overrides `GenerateJsonSchema.nullable_schema` to generate schemas without `anyOf` keyword.
"""
null_schema = {"type": "null"}
inner_json_schema = self.generate_inner(schema["schema"])

if inner_json_schema == null_schema:
return null_schema
else:
return inner_json_schema
125 changes: 125 additions & 0 deletions pyoaev/configuration/settings_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import os
from abc import ABC
from datetime import timedelta
from pathlib import Path
from typing import Annotated, Literal

from pydantic import BaseModel, ConfigDict, Field, HttpUrl, PlainSerializer
from pydantic_settings import (
BaseSettings,
DotEnvSettingsSource,
PydanticBaseSettingsSource,
SettingsConfigDict,
YamlConfigSettingsSource,
)


class BaseConfigModel(BaseModel, ABC):
"""Base class for global config models
To prevent attributes from being modified after initialization.
"""

model_config = ConfigDict(extra="allow", frozen=True, validate_default=True)


class SettingsLoader(BaseSettings):
model_config = SettingsConfigDict(
frozen=True,
extra="allow",
env_nested_delimiter="_",
env_nested_max_split=1,
enable_decoding=False,
)

@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
"""Customise the sources of settings for the connector.

This method is called by the Pydantic BaseSettings class to determine the order of sources.
The configuration come in this order either from:
1. Environment variables
2. YAML file
3. .env file
4. Default values

The variables loading order will remain the same as in `pycti.get_config_variable()`:
1. If a config.yml file is found, the order will be: `ENV VAR` → config.yml → default value
2. If a .env file is found, the order will be: `ENV VAR` → .env → default value
"""
_main_path = os.curdir

settings_cls.model_config["env_file"] = f"{_main_path}/../.env"

if not settings_cls.model_config["yaml_file"]:
if Path(f"{_main_path}/config.yml").is_file():
settings_cls.model_config["yaml_file"] = f"{_main_path}/config.yml"
if Path(f"{_main_path}/../config.yml").is_file():
settings_cls.model_config["yaml_file"] = f"{_main_path}/../config.yml"

if Path(settings_cls.model_config["yaml_file"] or "").is_file(): # type: ignore
return (
env_settings,
YamlConfigSettingsSource(settings_cls),
)
if Path(settings_cls.model_config["env_file"] or "").is_file(): # type: ignore
return (
env_settings,
DotEnvSettingsSource(settings_cls),
)
return (env_settings,)


LogLevelToLower = Annotated[
Literal["debug", "info", "warn", "error"],
PlainSerializer(lambda v: "".join(v), return_type=str),
]

HttpUrlToString = Annotated[HttpUrl, PlainSerializer(str, return_type=str)]
TimedeltaInSeconds = Annotated[
timedelta, PlainSerializer(lambda v: int(v.total_seconds()), return_type=int)
]


class ConfigLoaderOAEV(BaseConfigModel):
"""OpenAEV/OpenAEV platform configuration settings.

Contains URL and authentication token for connecting to the OpenAEV platform.
"""

url: HttpUrlToString = Field(
description="The OpenAEV platform URL.",
)
token: str = Field(
description="The token for the OpenAEV platform.",
)


class ConfigLoaderCollector(BaseConfigModel):
"""Base collector configuration settings.

Contains common collector settings including identification, logging,
scheduling, and platform information.
"""

id: str = Field(description="ID of the collector.")

name: str = Field(description="Name of the collector")

log_level: LogLevelToLower | None = Field(
default="error",
description="Determines the verbosity of the logs.",
)
period: timedelta | None = Field(
default=timedelta(minutes=1),
description="Duration between two scheduled runs of the collector (ISO 8601 format).",
)
icon_filepath: str | None = Field(
description="Path to the icon file of the collector.",
)
13 changes: 13 additions & 0 deletions pyoaev/contracts/contract_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ class ContractConfig:
color_light: str


@dataclass
class Domain:
domain_id: str
domain_name: str
domain_color: str
domain_created_at: str
domain_updated_at: str


@dataclass
class Contract:
contract_id: str
Expand All @@ -141,6 +150,7 @@ class Contract:
is_atomic_testing: bool = True
platforms: List[str] = field(default_factory=list)
external_id: str = None
domains: List[Domain] = None

def add_attack_pattern(self, var: str):
self.contract_attack_patterns_external_ids.append(var)
Expand All @@ -163,6 +173,7 @@ def to_contract_add_input(self, source_id: str):
"contract_content": json.dumps(self, cls=utils.EnhancedJSONEncoder),
"is_atomic_testing": self.is_atomic_testing,
"contract_platforms": self.platforms,
"contract_domains": self.domains,
}

def to_contract_update_input(self):
Expand All @@ -174,6 +185,7 @@ def to_contract_update_input(self):
"contract_content": json.dumps(self, cls=utils.EnhancedJSONEncoder),
"is_atomic_testing": self.is_atomic_testing,
"contract_platforms": self.platforms,
"contract_domains": self.domains,
}


Expand Down Expand Up @@ -203,6 +215,7 @@ def prepare_contracts(contracts):
"contract_attack_patterns_external_ids": c.contract_attack_patterns_external_ids,
"contract_content": json.dumps(c, cls=utils.EnhancedJSONEncoder),
"contract_platforms": c.platforms,
"contract_domains": c.domains,
},
contracts,
)
Expand Down
Loading