Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 171 additions & 2 deletions pycti/connector/opencti_connector_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,21 @@


def killProgramHook(etype, value, tb):
"""Exception hook to terminate the program.

:param etype: Exception type
:param value: Exception value
:param tb: Traceback object
"""
os.kill(os.getpid(), signal.SIGTERM)


def start_loop(loop):
"""Start an asyncio event loop.

:param loop: The asyncio event loop to start
:type loop: asyncio.AbstractEventLoop
"""
asyncio.set_event_loop(loop)
loop.run_forever()

Expand Down Expand Up @@ -93,10 +104,24 @@ def get_config_variable(


def is_memory_certificate(certificate):
"""Check if a certificate is provided as a PEM string in memory.

:param certificate: The certificate data to check
:type certificate: str
:return: True if the certificate is a PEM string, False otherwise
:rtype: bool
"""
return certificate.startswith("-----BEGIN")


def ssl_verify_locations(ssl_context, certdata):
"""Load certificate verification locations into SSL context.

:param ssl_context: The SSL context to configure
:type ssl_context: ssl.SSLContext
:param certdata: Certificate data (file path or PEM string)
:type certdata: str or None
"""
if certdata is None:
return

Expand All @@ -106,9 +131,17 @@ def ssl_verify_locations(ssl_context, certdata):
ssl_context.load_verify_locations(cafile=certdata)


# As cert must be written in files to be loaded in ssl context
# Creates a temporary file in the most secure manner possible
def data_to_temp_file(data):
"""Write data to a temporary file securely.

Creates a temporary file in the most secure manner possible.
The file is readable and writable only by the creating user ID.

:param data: The data to write to the temporary file
:type data: str
:return: Path to the created temporary file
:rtype: str
"""
# The file is readable and writable only by the creating user ID.
# If the operating system uses permission bits to indicate whether a
# file is executable, the file is executable by no one. The file
Expand All @@ -121,6 +154,17 @@ def data_to_temp_file(data):


def ssl_cert_chain(ssl_context, cert_data, key_data, passphrase):
"""Load certificate chain into SSL context.

:param ssl_context: The SSL context to configure
:type ssl_context: ssl.SSLContext
:param cert_data: Certificate data (file path or PEM string)
:type cert_data: str or None
:param key_data: Private key data (file path or PEM string)
:type key_data: str or None
:param passphrase: Passphrase for the private key
:type passphrase: str or None
"""
if cert_data is None:
return

Expand All @@ -147,6 +191,13 @@ def ssl_cert_chain(ssl_context, cert_data, key_data, passphrase):


def create_callback_ssl_context(config) -> ssl.SSLContext:
"""Create SSL context for API callback server.

:param config: Configuration dictionary
:type config: dict
:return: Configured SSL context
:rtype: ssl.SSLContext
"""
listen_protocol_api_ssl_key = get_config_variable(
"LISTEN_PROTOCOL_API_SSL_KEY",
["connector", "listen_protocol_api_ssl_key"],
Expand Down Expand Up @@ -176,6 +227,13 @@ def create_callback_ssl_context(config) -> ssl.SSLContext:


def create_mq_ssl_context(config) -> ssl.SSLContext:
"""Create SSL context for message queue connections.

:param config: Configuration dictionary
:type config: dict
:return: Configured SSL context for MQ connections
:rtype: ssl.SSLContext
"""
use_ssl_ca = get_config_variable("MQ_USE_SSL_CA", ["mq", "use_ssl_ca"], config)
use_ssl_cert = get_config_variable(
"MQ_USE_SSL_CERT", ["mq", "use_ssl_cert"], config
Expand Down Expand Up @@ -292,6 +350,11 @@ def _process_message(self, channel, method, properties, body) -> None:
)

def _set_draft_id(self, draft_id):
"""Set the draft ID for the helper and API instances.

:param draft_id: The draft ID to set
:type draft_id: str
"""
self.helper.draft_id = draft_id
self.helper.api.set_draft_id(draft_id)
self.helper.api_impersonate.set_draft_id(draft_id)
Expand Down Expand Up @@ -546,6 +609,11 @@ def run(self) -> None:
raise ValueError("Unsupported listen protocol type")

def stop(self):
"""Stop the ListenQueue thread and close connections.

This method sets the exit event, closes the RabbitMQ connection,
and waits for the processing thread to complete.
"""
self.helper.connector_logger.info("Preparing ListenQueue for clean shutdown")
self.exit_event.set()
self.pika_connection.close()
Expand Down Expand Up @@ -794,6 +862,10 @@ def run(self) -> None: # pylint: disable=too-many-branches
sys.excepthook(*sys.exc_info())

def stop(self):
"""Stop the ListenStream thread.

This method sets the exit event to signal the stream listening thread to stop.
"""
self.helper.connector_logger.info("Preparing ListenStream for clean shutdown")
self.exit_event.set()

Expand All @@ -817,6 +889,11 @@ def __init__(

@property
def all_details(self):
"""Get all connector information details as a dictionary.

:return: Dictionary containing all connector status information
:rtype: dict
"""
return {
"run_and_terminate": self._run_and_terminate,
"buffering": self._buffering,
Expand All @@ -832,6 +909,11 @@ def run_and_terminate(self) -> bool:

@run_and_terminate.setter
def run_and_terminate(self, value):
"""Set the run_and_terminate flag.

:param value: Whether the connector should run once and terminate
:type value: bool
"""
self._run_and_terminate = value

@property
Expand All @@ -840,6 +922,11 @@ def buffering(self) -> bool:

@buffering.setter
def buffering(self, value):
"""Set the buffering status.

:param value: Whether the connector is currently buffering
:type value: bool
"""
self._buffering = value

@property
Expand All @@ -848,6 +935,11 @@ def queue_threshold(self) -> float:

@queue_threshold.setter
def queue_threshold(self, value):
"""Set the queue threshold value.

:param value: The queue size threshold in MB
:type value: float
"""
self._queue_threshold = value

@property
Expand All @@ -856,6 +948,11 @@ def queue_messages_size(self) -> float:

@queue_messages_size.setter
def queue_messages_size(self, value):
"""Set the current queue messages size.

:param value: The current size of messages in the queue in MB
:type value: float
"""
self._queue_messages_size = value

@property
Expand All @@ -864,6 +961,11 @@ def next_run_datetime(self) -> datetime:

@next_run_datetime.setter
def next_run_datetime(self, value):
"""Set the next scheduled run datetime.

:param value: The datetime for the next scheduled run
:type value: datetime
"""
self._next_run_datetime = value

@property
Expand All @@ -872,6 +974,11 @@ def last_run_datetime(self) -> datetime:

@last_run_datetime.setter
def last_run_datetime(self, value):
"""Set the last run datetime.

:param value: The datetime of the last run
:type value: datetime
"""
self._last_run_datetime = value


Expand Down Expand Up @@ -1244,6 +1351,11 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
self.listen_queue = None

def stop(self) -> None:
"""Stop the connector and clean up resources.

This method stops all running threads (listen queue, ping thread) and
unregisters the connector from OpenCTI.
"""
self.connector_logger.info("Preparing connector for clean shutdown")
if self.listen_queue:
self.listen_queue.stop()
Expand All @@ -1253,9 +1365,20 @@ def stop(self) -> None:
self.api.connector.unregister(self.connector_id)

def get_name(self) -> Optional[Union[bool, int, str]]:
"""Get the connector name.

:return: The name of the connector
:rtype: Optional[Union[bool, int, str]]
"""
return self.connect_name

def get_stream_collection(self):
"""Get the stream collection configuration.

:return: Stream collection configuration dictionary
:rtype: dict
:raises ValueError: If no stream is connected
"""
if self.connect_live_stream_id is not None:
if self.connect_live_stream_id in ["live", "raw"]:
return {
Expand Down Expand Up @@ -1290,12 +1413,27 @@ def get_stream_collection(self):
raise ValueError("This connector is not connected to any stream")

def get_only_contextual(self) -> Optional[Union[bool, int, str]]:
"""Get the only_contextual configuration value.

:return: Whether the connector processes only contextual data
:rtype: Optional[Union[bool, int, str]]
"""
return self.connect_only_contextual

def get_run_and_terminate(self) -> Optional[Union[bool, int, str]]:
"""Get the run_and_terminate configuration value.

:return: Whether the connector should run once and terminate
:rtype: Optional[Union[bool, int, str]]
"""
return self.connect_run_and_terminate

def get_validate_before_import(self) -> Optional[Union[bool, int, str]]:
"""Get the validate_before_import configuration value.

:return: Whether to validate data before importing
:rtype: Optional[Union[bool, int, str]]
"""
return self.connect_validate_before_import

def set_state(self, state) -> None:
Expand Down Expand Up @@ -1326,6 +1464,11 @@ def get_state(self) -> Optional[Dict]:
return None

def force_ping(self):
"""Force a ping to the OpenCTI API to update connector state.

This method manually triggers a ping to synchronize the connector state
with the OpenCTI platform.
"""
try:
initial_state = self.get_state()
connector_info = self.connector_info.all_details
Expand Down Expand Up @@ -1738,12 +1881,27 @@ def listen_stream(
return self.listen_stream

def get_opencti_url(self) -> Optional[Union[bool, int, str]]:
"""Get the OpenCTI URL.

:return: The URL of the OpenCTI platform
:rtype: Optional[Union[bool, int, str]]
"""
return self.opencti_url

def get_opencti_token(self) -> Optional[Union[bool, int, str]]:
"""Get the OpenCTI API token.

:return: The API token for OpenCTI authentication
:rtype: Optional[Union[bool, int, str]]
"""
return self.opencti_token

def get_connector(self) -> OpenCTIConnector:
"""Get the OpenCTIConnector instance.

:return: The OpenCTIConnector instance
:rtype: OpenCTIConnector
"""
return self.connector

def date_now(self) -> str:
Expand Down Expand Up @@ -2293,6 +2451,17 @@ def get_attribute_in_mitre_extension(key, object) -> any:
return None

def get_data_from_enrichment(self, data, standard_id, opencti_entity):
"""Extract STIX entity and objects from enrichment data.

:param data: The enrichment data containing a bundle
:type data: dict
:param standard_id: The STIX standard ID of the entity
:type standard_id: str
:param opencti_entity: The OpenCTI entity object
:type opencti_entity: dict
:return: Dictionary containing stix_entity and stix_objects
:rtype: dict
"""
bundle = data.get("bundle", None)
# Extract main entity from bundle in case of playbook
if bundle is None:
Expand Down
Loading