From 47cd48231ab971f14705583c3d174e8ce9893536 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Tue, 11 Nov 2025 13:05:17 +0100 Subject: [PATCH 1/2] New project folder structure. --- src/event_gate_lambda.py | 14 +- src/utils/__init__.py | 15 + src/{ => utils}/conf_path.py | 10 +- src/{ => utils}/logging_levels.py | 0 src/{ => utils}/trace_logging.py | 4 +- src/writers/__init__.py | 15 + src/{ => writers}/writer_eventbridge.py | 198 +++--- src/{ => writers}/writer_kafka.py | 316 ++++----- src/{ => writers}/writer_postgres.py | 606 +++++++++--------- tests/__init__.py | 15 + tests/utils/__init__.py | 15 + tests/{ => utils}/test_conf_path.py | 21 +- tests/{ => utils}/test_conf_validation.py | 2 +- tests/{ => utils}/test_extract_token.py | 0 tests/{ => utils}/test_safe_serialization.py | 2 - tests/{ => utils}/test_trace_logging.py | 8 +- tests/writers/__init__.py | 15 + .../{ => writers}/test_writer_eventbridge.py | 4 +- tests/{ => writers}/test_writer_kafka.py | 2 +- tests/{ => writers}/test_writer_postgres.py | 3 +- 20 files changed, 670 insertions(+), 595 deletions(-) create mode 100644 src/utils/__init__.py rename src/{ => utils}/conf_path.py (78%) rename src/{ => utils}/logging_levels.py (100%) rename src/{ => utils}/trace_logging.py (94%) create mode 100644 src/writers/__init__.py rename src/{ => writers}/writer_eventbridge.py (95%) rename src/{ => writers}/writer_kafka.py (96%) rename src/{ => writers}/writer_postgres.py (96%) create mode 100644 tests/__init__.py create mode 100644 tests/utils/__init__.py rename tests/{ => utils}/test_conf_path.py (88%) rename tests/{ => utils}/test_conf_validation.py (99%) rename tests/{ => utils}/test_extract_token.py (100%) rename tests/{ => utils}/test_safe_serialization.py (99%) rename tests/{ => utils}/test_trace_logging.py (93%) create mode 100644 tests/writers/__init__.py rename tests/{ => writers}/test_writer_eventbridge.py (95%) rename tests/{ => writers}/test_writer_kafka.py (99%) rename tests/{ => 
writers}/test_writer_postgres.py (99%) diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 6f19ef5..163e119 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -33,19 +33,17 @@ # Import writer modules with explicit ImportError fallback try: - from . import writer_eventbridge - from . import writer_kafka - from . import writer_postgres + from .writers import writer_eventbridge, writer_kafka, writer_postgres except ImportError: # fallback when executed outside package context - import writer_eventbridge # type: ignore[no-redef] - import writer_kafka # type: ignore[no-redef] - import writer_postgres # type: ignore[no-redef] + from src.writers import writer_eventbridge # type: ignore[no-redef] + from src.writers import writer_kafka # type: ignore[no-redef] + from src.writers import writer_postgres # type: ignore[no-redef] # Import configuration directory symbols with explicit ImportError fallback try: - from .conf_path import CONF_DIR, INVALID_CONF_ENV # type: ignore[no-redef] + from .utils.conf_path import CONF_DIR, INVALID_CONF_ENV # type: ignore[no-redef] except ImportError: # fallback when executed outside package context - from conf_path import CONF_DIR, INVALID_CONF_ENV # type: ignore[no-redef] + from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV # type: ignore[no-redef] # Internal aliases used by rest of module _CONF_DIR = CONF_DIR diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..f7115cb --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/src/conf_path.py b/src/utils/conf_path.py similarity index 78% rename from src/conf_path.py rename to src/utils/conf_path.py index e1c37fc..0998987 100644 --- a/src/conf_path.py +++ b/src/utils/conf_path.py @@ -17,7 +17,7 @@ """Module providing reusable configuration directory resolution. Resolution order: 1. CONF_DIR env var if it exists and points to a directory -2. /conf (project_root = parent of this file's directory) +2. /conf 3. /conf (flattened deployment) 4. Fallback to /conf even if missing (subsequent file operations will raise) """ @@ -34,9 +34,12 @@ def resolve_conf_dir(env_var: str = "CONF_DIR"): Tuple (conf_dir, invalid_env) where conf_dir is the chosen directory path and invalid_env is the rejected env var path if provided but invalid, else None. 
""" - project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) + # Simplified project root: two levels up from this file (../../) + parent_utils_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) + project_root = os.path.abspath(os.path.join(parent_utils_dir, "..")) current_dir = os.path.dirname(__file__) + # Scenario 1: Environment variable if it exists and points to a directory env_conf = os.environ.get(env_var) invalid_env = None conf_dir = None @@ -48,16 +51,19 @@ def resolve_conf_dir(env_var: str = "CONF_DIR"): else: invalid_env = candidate + # Scenario 2: Use /conf if present and not already satisfied by env var if conf_dir is None: parent_conf = os.path.join(project_root, "conf") if os.path.isdir(parent_conf): conf_dir = parent_conf + # Scenario 3: Use /conf for flattened deployments. if conf_dir is None: current_conf = os.path.join(current_dir, "conf") if os.path.isdir(current_conf): conf_dir = current_conf + # Scenario 4: Final fallback to /conf even if it does not exist. 
if conf_dir is None: conf_dir = os.path.join(project_root, "conf") diff --git a/src/logging_levels.py b/src/utils/logging_levels.py similarity index 100% rename from src/logging_levels.py rename to src/utils/logging_levels.py diff --git a/src/trace_logging.py b/src/utils/trace_logging.py similarity index 94% rename from src/trace_logging.py rename to src/utils/trace_logging.py index 4fd2a1e..9684734 100644 --- a/src/trace_logging.py +++ b/src/utils/trace_logging.py @@ -22,8 +22,8 @@ import logging from typing import Any, Dict -from .logging_levels import TRACE_LEVEL -from .safe_serialization import safe_serialize_for_log +from src.utils.logging_levels import TRACE_LEVEL +from src.safe_serialization import safe_serialize_for_log def log_payload_at_trace(logger: logging.Logger, writer_name: str, topic_name: str, message: Dict[str, Any]) -> None: diff --git a/src/writers/__init__.py b/src/writers/__init__.py new file mode 100644 index 0000000..f7115cb --- /dev/null +++ b/src/writers/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# diff --git a/src/writer_eventbridge.py b/src/writers/writer_eventbridge.py similarity index 95% rename from src/writer_eventbridge.py rename to src/writers/writer_eventbridge.py index 66da65c..37e28a8 100644 --- a/src/writer_eventbridge.py +++ b/src/writers/writer_eventbridge.py @@ -1,99 +1,99 @@ -# -# Copyright 2025 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""EventBridge writer module. - -Provides initialization and write functionality for publishing events to AWS EventBridge. -""" - -import json -import logging -from typing import Any, Dict, Optional, Tuple, List - -import boto3 -from botocore.exceptions import BotoCoreError, ClientError - -from .trace_logging import log_payload_at_trace - -STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None} - - -def init(logger: logging.Logger, config: Dict[str, Any]) -> None: - """Initialize the EventBridge writer. - - Args: - logger: Shared application logger. - config: Configuration dictionary (expects optional 'event_bus_arn'). 
- """ - STATE["logger"] = logger - STATE["client"] = boto3.client("events") - STATE["event_bus_arn"] = config.get("event_bus_arn", "") - STATE["logger"].debug("Initialized EVENTBRIDGE writer") - - -def _format_failed_entries(entries: List[Dict[str, Any]]) -> str: - failed = [e for e in entries if "ErrorCode" in e or "ErrorMessage" in e] - # Keep message concise but informative - return json.dumps(failed) if failed else "[]" - - -def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """Publish a message to EventBridge. - - Args: - topic_name: Source topic name used as event Source. - message: JSON-serializable payload. - Returns: - Tuple of success flag and optional error message. - """ - logger = STATE["logger"] - event_bus_arn = STATE["event_bus_arn"] - client = STATE["client"] - - if not event_bus_arn: - logger.debug("No EventBus Arn - skipping") - return True, None - if client is None: # defensive - logger.debug("EventBridge client not initialized - skipping") - return True, None - - log_payload_at_trace(logger, "EventBridge", topic_name, message) - - try: - logger.debug("Sending to eventBridge %s", topic_name) - response = client.put_events( - Entries=[ - { - "Source": topic_name, - "DetailType": "JSON", - "Detail": json.dumps(message), - "EventBusName": event_bus_arn, - } - ] - ) - failed_count = response.get("FailedEntryCount", 0) - if failed_count > 0: - entries = response.get("Entries", []) - failed_repr = _format_failed_entries(entries) - msg = f"{failed_count} EventBridge entries failed: {failed_repr}" - logger.error(msg) - return False, msg - except (BotoCoreError, ClientError) as err: # explicit AWS client-related errors - logger.exception("EventBridge put_events call failed") - return False, str(err) - - # Let any unexpected exception propagate for upstream handler (avoids broad except BLE001 / TRY400) - return True, None +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the 
"License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""EventBridge writer module. + +Provides initialization and write functionality for publishing events to AWS EventBridge. +""" + +import json +import logging +from typing import Any, Dict, Optional, Tuple, List + +import boto3 +from botocore.exceptions import BotoCoreError, ClientError + +from src.utils.trace_logging import log_payload_at_trace + +STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None} + + +def init(logger: logging.Logger, config: Dict[str, Any]) -> None: + """Initialize the EventBridge writer. + + Args: + logger: Shared application logger. + config: Configuration dictionary (expects optional 'event_bus_arn'). + """ + STATE["logger"] = logger + STATE["client"] = boto3.client("events") + STATE["event_bus_arn"] = config.get("event_bus_arn", "") + STATE["logger"].debug("Initialized EVENTBRIDGE writer") + + +def _format_failed_entries(entries: List[Dict[str, Any]]) -> str: + failed = [e for e in entries if "ErrorCode" in e or "ErrorMessage" in e] + # Keep message concise but informative + return json.dumps(failed) if failed else "[]" + + +def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """Publish a message to EventBridge. + + Args: + topic_name: Source topic name used as event Source. + message: JSON-serializable payload. + Returns: + Tuple of success flag and optional error message. 
+ """ + logger = STATE["logger"] + event_bus_arn = STATE["event_bus_arn"] + client = STATE["client"] + + if not event_bus_arn: + logger.debug("No EventBus Arn - skipping") + return True, None + if client is None: # defensive + logger.debug("EventBridge client not initialized - skipping") + return True, None + + log_payload_at_trace(logger, "EventBridge", topic_name, message) + + try: + logger.debug("Sending to eventBridge %s", topic_name) + response = client.put_events( + Entries=[ + { + "Source": topic_name, + "DetailType": "JSON", + "Detail": json.dumps(message), + "EventBusName": event_bus_arn, + } + ] + ) + failed_count = response.get("FailedEntryCount", 0) + if failed_count > 0: + entries = response.get("Entries", []) + failed_repr = _format_failed_entries(entries) + msg = f"{failed_count} EventBridge entries failed: {failed_repr}" + logger.error(msg) + return False, msg + except (BotoCoreError, ClientError) as err: # explicit AWS client-related errors + logger.exception("EventBridge put_events call failed") + return False, str(err) + + # Let any unexpected exception propagate for upstream handler (avoids broad except BLE001 / TRY400) + return True, None diff --git a/src/writer_kafka.py b/src/writers/writer_kafka.py similarity index 96% rename from src/writer_kafka.py rename to src/writers/writer_kafka.py index 1b4b526..ad85935 100644 --- a/src/writer_kafka.py +++ b/src/writers/writer_kafka.py @@ -1,158 +1,158 @@ -# -# Copyright 2025 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Kafka writer module. - -Initializes a Confluent Kafka Producer and publishes messages for a topic. -""" - -import json -import logging -import os -import time -from typing import Any, Dict, Optional, Tuple -from confluent_kafka import Producer - -from .trace_logging import log_payload_at_trace - -try: # KafkaException may not exist in stubbed test module - from confluent_kafka import KafkaException # type: ignore -except (ImportError, ModuleNotFoundError): # pragma: no cover - fallback for test stub - - class KafkaException(Exception): # type: ignore - """Fallback KafkaException if confluent_kafka is not installed.""" - - -STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "producer": None} -# Configurable flush timeouts and retries via env variables to avoid hanging indefinitely -_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "7")) -_MAX_RETRIES = int(os.environ.get("KAFKA_FLUSH_RETRIES", "3")) -_RETRY_BACKOFF_SEC = float(os.environ.get("KAFKA_RETRY_BACKOFF", "0.5")) - - -def init(logger: logging.Logger, config: Dict[str, Any]) -> None: - """Initialize Kafka producer. - - Args: - logger: Shared application logger. - config: Configuration dictionary (expects 'kafka_bootstrap_server' plus optional SASL/SSL fields). - Raises: - ValueError: if required 'kafka_bootstrap_server' is missing or empty. 
- """ - STATE["logger"] = logger - - if "kafka_bootstrap_server" not in config or not config.get("kafka_bootstrap_server"): - raise ValueError("Missing required config: kafka_bootstrap_server") - bootstrap = config["kafka_bootstrap_server"] - - producer_config: Dict[str, Any] = {"bootstrap.servers": bootstrap} - if "kafka_sasl_kerberos_principal" in config and "kafka_ssl_key_path" in config: - producer_config.update( - { - "security.protocol": "SASL_SSL", - "sasl.mechanism": "GSSAPI", - "sasl.kerberos.service.name": "kafka", - "sasl.kerberos.keytab": config["kafka_sasl_kerberos_keytab_path"], - "sasl.kerberos.principal": config["kafka_sasl_kerberos_principal"], - "ssl.ca.location": config["kafka_ssl_ca_path"], - "ssl.certificate.location": config["kafka_ssl_cert_path"], - "ssl.key.location": config["kafka_ssl_key_path"], - "ssl.key.password": config["kafka_ssl_key_password"], - } - ) - STATE["logger"].debug("Kafka producer will use SASL_SSL") - - STATE["producer"] = Producer(producer_config) - STATE["logger"].debug("Initialized KAFKA writer") - - -def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """Publish a message to Kafka. - - Args: - topic_name: Kafka topic to publish to. - message: JSON-serializable payload. - Returns: - Tuple[success flag, optional error message]. 
- """ - logger = STATE["logger"] - producer: Optional[Producer] = STATE.get("producer") # type: ignore[assignment] - if producer is None: - logger.debug("Kafka producer not initialized - skipping") - return True, None - - log_payload_at_trace(logger, "Kafka", topic_name, message) - - errors: list[str] = [] - has_exception = False - - # Produce step - try: - logger.debug("Sending to kafka %s", topic_name) - producer.produce( - topic_name, - key="", - value=json.dumps(message).encode("utf-8"), - callback=lambda err, msg: (errors.append(str(err)) if err is not None else None), - ) - except KafkaException as e: - errors.append(f"Produce exception: {e}") - has_exception = True - - # Flush step (always attempted) - remaining: Optional[int] = None - for attempt in range(1, _MAX_RETRIES + 1): - try: - remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC) - except KafkaException as e: - errors.append(f"Flush exception: {e}") - has_exception = True - - # Treat None (flush returns None in some stubs) as success equivalent to 0 pending - if (remaining is None or remaining == 0) and not errors: - break - if attempt < _MAX_RETRIES: - logger.warning("Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES) - time.sleep(_RETRY_BACKOFF_SEC) - - # Warn if messages still pending after retries - if isinstance(remaining, int) and remaining > 0: - logger.warning( - "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining - ) - - if errors: - failure_text = "Kafka writer failed: " + "; ".join(errors) - (logger.exception if has_exception else logger.error)(failure_text) - return False, failure_text - - return True, None - - -def flush_with_timeout(producer, timeout: float) -> Optional[int]: - """Flush the Kafka producer with a timeout, handling TypeError for stubs. - - Args: - producer: Kafka Producer instance. - timeout: Timeout in seconds. 
- Returns: - Number of messages still pending after the flush call (0 all messages delivered). - None is returned only if the underlying (stub/mock) producer.flush() does not provide a count. - """ - try: - return producer.flush(timeout) - except TypeError: # Fallback for stub producers without timeout parameter - return producer.flush() +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Kafka writer module. + +Initializes a Confluent Kafka Producer and publishes messages for a topic. 
+""" + +import json +import logging +import os +import time +from typing import Any, Dict, Optional, Tuple +from confluent_kafka import Producer + +from src.utils.trace_logging import log_payload_at_trace + +try: # KafkaException may not exist in stubbed test module + from confluent_kafka import KafkaException # type: ignore +except (ImportError, ModuleNotFoundError): # pragma: no cover - fallback for test stub + + class KafkaException(Exception): # type: ignore + """Fallback KafkaException if confluent_kafka is not installed.""" + + +STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "producer": None} +# Configurable flush timeouts and retries via env variables to avoid hanging indefinitely +_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "7")) +_MAX_RETRIES = int(os.environ.get("KAFKA_FLUSH_RETRIES", "3")) +_RETRY_BACKOFF_SEC = float(os.environ.get("KAFKA_RETRY_BACKOFF", "0.5")) + + +def init(logger: logging.Logger, config: Dict[str, Any]) -> None: + """Initialize Kafka producer. + + Args: + logger: Shared application logger. + config: Configuration dictionary (expects 'kafka_bootstrap_server' plus optional SASL/SSL fields). + Raises: + ValueError: if required 'kafka_bootstrap_server' is missing or empty. 
+ """ + STATE["logger"] = logger + + if "kafka_bootstrap_server" not in config or not config.get("kafka_bootstrap_server"): + raise ValueError("Missing required config: kafka_bootstrap_server") + bootstrap = config["kafka_bootstrap_server"] + + producer_config: Dict[str, Any] = {"bootstrap.servers": bootstrap} + if "kafka_sasl_kerberos_principal" in config and "kafka_ssl_key_path" in config: + producer_config.update( + { + "security.protocol": "SASL_SSL", + "sasl.mechanism": "GSSAPI", + "sasl.kerberos.service.name": "kafka", + "sasl.kerberos.keytab": config["kafka_sasl_kerberos_keytab_path"], + "sasl.kerberos.principal": config["kafka_sasl_kerberos_principal"], + "ssl.ca.location": config["kafka_ssl_ca_path"], + "ssl.certificate.location": config["kafka_ssl_cert_path"], + "ssl.key.location": config["kafka_ssl_key_path"], + "ssl.key.password": config["kafka_ssl_key_password"], + } + ) + STATE["logger"].debug("Kafka producer will use SASL_SSL") + + STATE["producer"] = Producer(producer_config) + STATE["logger"].debug("Initialized KAFKA writer") + + +def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """Publish a message to Kafka. + + Args: + topic_name: Kafka topic to publish to. + message: JSON-serializable payload. + Returns: + Tuple[success flag, optional error message]. 
+ """ + logger = STATE["logger"] + producer: Optional[Producer] = STATE.get("producer") # type: ignore[assignment] + if producer is None: + logger.debug("Kafka producer not initialized - skipping") + return True, None + + log_payload_at_trace(logger, "Kafka", topic_name, message) + + errors: list[str] = [] + has_exception = False + + # Produce step + try: + logger.debug("Sending to kafka %s", topic_name) + producer.produce( + topic_name, + key="", + value=json.dumps(message).encode("utf-8"), + callback=lambda err, msg: (errors.append(str(err)) if err is not None else None), + ) + except KafkaException as e: + errors.append(f"Produce exception: {e}") + has_exception = True + + # Flush step (always attempted) + remaining: Optional[int] = None + for attempt in range(1, _MAX_RETRIES + 1): + try: + remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC) + except KafkaException as e: + errors.append(f"Flush exception: {e}") + has_exception = True + + # Treat None (flush returns None in some stubs) as success equivalent to 0 pending + if (remaining is None or remaining == 0) and not errors: + break + if attempt < _MAX_RETRIES: + logger.warning("Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES) + time.sleep(_RETRY_BACKOFF_SEC) + + # Warn if messages still pending after retries + if isinstance(remaining, int) and remaining > 0: + logger.warning( + "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining + ) + + if errors: + failure_text = "Kafka writer failed: " + "; ".join(errors) + (logger.exception if has_exception else logger.error)(failure_text) + return False, failure_text + + return True, None + + +def flush_with_timeout(producer, timeout: float) -> Optional[int]: + """Flush the Kafka producer with a timeout, handling TypeError for stubs. + + Args: + producer: Kafka Producer instance. + timeout: Timeout in seconds. 
+ Returns: + Number of messages still pending after the flush call (0 all messages delivered). + None is returned only if the underlying (stub/mock) producer.flush() does not provide a count. + """ + try: + return producer.flush(timeout) + except TypeError: # Fallback for stub producers without timeout parameter + return producer.flush() diff --git a/src/writer_postgres.py b/src/writers/writer_postgres.py similarity index 96% rename from src/writer_postgres.py rename to src/writers/writer_postgres.py index 6f8d603..ff46e34 100644 --- a/src/writer_postgres.py +++ b/src/writers/writer_postgres.py @@ -1,303 +1,303 @@ -# -# Copyright 2025 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Postgres writer module. - -Handles optional initialization via AWS Secrets Manager and topic-based inserts into Postgres. 
-""" - -import json -import os -import logging -from typing import Any, Dict, Tuple, Optional - -import boto3 - -try: - import psycopg2 # noqa: F401 -except ImportError: # pragma: no cover - environment without psycopg2 - psycopg2 = None # type: ignore - -from .trace_logging import log_payload_at_trace - -# Define a unified psycopg2 error base for safe exception handling even if psycopg2 missing -if psycopg2 is not None: # type: ignore - try: # pragma: no cover - attribute presence depends on installed psycopg2 variant - PsycopgError = psycopg2.Error # type: ignore[attr-defined] - except AttributeError: # pragma: no cover - - class PsycopgError(Exception): # type: ignore - """Shim psycopg2 error base when psycopg2 provides no Error attribute.""" - -else: # fallback shim when psycopg2 absent - - class PsycopgError(Exception): # type: ignore - """Shim psycopg2 error base when psycopg2 is not installed.""" - - -# Module level globals for typing -logger: logging.Logger = logging.getLogger(__name__) -POSTGRES: Dict[str, Any] = {"database": ""} - - -def init(logger_instance: logging.Logger) -> None: - """Initialize Postgres credentials either from AWS Secrets Manager or fallback empty config. - - Args: - logger_instance: Shared application logger. - """ - global logger # pylint: disable=global-statement - global POSTGRES # pylint: disable=global-statement - - logger = logger_instance - - secret_name = os.environ.get("POSTGRES_SECRET_NAME", "") - secret_region = os.environ.get("POSTGRES_SECRET_REGION", "") - - if secret_name and secret_region: - aws_secrets = boto3.Session().client(service_name="secretsmanager", region_name=secret_region) - postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"] - POSTGRES = json.loads(postgres_secret) - else: - POSTGRES = {"database": ""} - - logger.debug("Initialized POSTGRES writer") - - -def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: - """Insert a dlchange style event row. 
- - Args: - cursor: Database cursor. - table: Target table name. - message: Event payload. - """ - logger.debug("Sending to Postgres - %s", table) - cursor.execute( - f""" - INSERT INTO {table} - ( - event_id, - tenant_id, - source_app, - source_app_version, - environment, - timestamp_event, - catalog_id, - operation, - "location", - "format", - format_options, - additional_info - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s - )""", - ( - message["event_id"], - message["tenant_id"], - message["source_app"], - message["source_app_version"], - message["environment"], - message["timestamp_event"], - message["catalog_id"], - message["operation"], - message.get("location"), - message["format"], - (json.dumps(message.get("format_options")) if "format_options" in message else None), - (json.dumps(message.get("additional_info")) if "additional_info" in message else None), - ), - ) - - -def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[str, Any]) -> None: - """Insert a run event row plus related job rows. - - Args: - cursor: Database cursor. - table_runs: Runs table name. - table_jobs: Jobs table name. - message: Event payload (includes jobs array). 
- """ - logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs) - cursor.execute( - f""" - INSERT INTO {table_runs} - ( - event_id, - job_ref, - tenant_id, - source_app, - source_app_version, - environment, - timestamp_start, - timestamp_end - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s - )""", - ( - message["event_id"], - message["job_ref"], - message["tenant_id"], - message["source_app"], - message["source_app_version"], - message["environment"], - message["timestamp_start"], - message["timestamp_end"], - ), - ) - - for job in message["jobs"]: - cursor.execute( - f""" - INSERT INTO {table_jobs} - ( - event_id, - catalog_id, - status, - timestamp_start, - timestamp_end, - message, - additional_info - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s, - %s - )""", - ( - message["event_id"], - job["catalog_id"], - job["status"], - job["timestamp_start"], - job["timestamp_end"], - job.get("message"), - (json.dumps(job.get("additional_info")) if "additional_info" in job else None), - ), - ) - - -def postgres_test_write(cursor, table: str, message: Dict[str, Any]) -> None: - """Insert a test topic row. - - Args: - cursor: Database cursor. - table: Target table name. - message: Event payload. - """ - logger.debug("Sending to Postgres - %s", table) - cursor.execute( - f""" - INSERT INTO {table} - ( - event_id, - tenant_id, - source_app, - environment, - timestamp_event, - additional_info - ) - VALUES - ( - %s, - %s, - %s, - %s, - %s, - %s - )""", - ( - message["event_id"], - message["tenant_id"], - message["source_app"], - message["environment"], - message["timestamp"], - (json.dumps(message.get("additional_info")) if "additional_info" in message else None), - ), - ) - - -def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """Dispatch insertion for a topic into the correct Postgres table(s). - - Skips if Postgres not configured or psycopg2 unavailable. Returns success flag and optional error. 
- - Args: - topic_name: Incoming topic identifier. - message: Event payload. - """ - try: - if not POSTGRES.get("database"): - logger.debug("No Postgres - skipping") - return True, None - if psycopg2 is None: # type: ignore - logger.debug("psycopg2 not available - skipping actual Postgres write") - return True, None - - log_payload_at_trace(logger, "Postgres", topic_name, message) - - with psycopg2.connect( # type: ignore[attr-defined] - database=POSTGRES["database"], - host=POSTGRES["host"], - user=POSTGRES["user"], - password=POSTGRES["password"], - port=POSTGRES["port"], - ) as connection: # type: ignore[call-arg] - with connection.cursor() as cursor: # type: ignore - if topic_name == "public.cps.za.dlchange": - postgres_edla_write(cursor, "public_cps_za_dlchange", message) - elif topic_name == "public.cps.za.runs": - postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message) - elif topic_name == "public.cps.za.test": - postgres_test_write(cursor, "public_cps_za_test", message) - else: - msg = f"unknown topic for postgres {topic_name}" - logger.error(msg) - return False, msg - - connection.commit() # type: ignore - except (RuntimeError, PsycopgError) as e: # narrowed exception set - err_msg = f"The Postgres writer with failed unknown error: {str(e)}" - logger.exception(err_msg) - return False, err_msg - - return True, None +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Postgres writer module. + +Handles optional initialization via AWS Secrets Manager and topic-based inserts into Postgres. +""" + +import json +import os +import logging +from typing import Any, Dict, Tuple, Optional + +import boto3 + +try: + import psycopg2 # noqa: F401 +except ImportError: # pragma: no cover - environment without psycopg2 + psycopg2 = None # type: ignore + +from src.utils.trace_logging import log_payload_at_trace + +# Define a unified psycopg2 error base for safe exception handling even if psycopg2 missing +if psycopg2 is not None: # type: ignore + try: # pragma: no cover - attribute presence depends on installed psycopg2 variant + PsycopgError = psycopg2.Error # type: ignore[attr-defined] + except AttributeError: # pragma: no cover + + class PsycopgError(Exception): # type: ignore + """Shim psycopg2 error base when psycopg2 provides no Error attribute.""" + +else: # fallback shim when psycopg2 absent + + class PsycopgError(Exception): # type: ignore + """Shim psycopg2 error base when psycopg2 is not installed.""" + + +# Module level globals for typing +logger: logging.Logger = logging.getLogger(__name__) +POSTGRES: Dict[str, Any] = {"database": ""} + + +def init(logger_instance: logging.Logger) -> None: + """Initialize Postgres credentials either from AWS Secrets Manager or fallback empty config. + + Args: + logger_instance: Shared application logger. 
+ """ + global logger # pylint: disable=global-statement + global POSTGRES # pylint: disable=global-statement + + logger = logger_instance + + secret_name = os.environ.get("POSTGRES_SECRET_NAME", "") + secret_region = os.environ.get("POSTGRES_SECRET_REGION", "") + + if secret_name and secret_region: + aws_secrets = boto3.Session().client(service_name="secretsmanager", region_name=secret_region) + postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"] + POSTGRES = json.loads(postgres_secret) + else: + POSTGRES = {"database": ""} + + logger.debug("Initialized POSTGRES writer") + + +def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: + """Insert a dlchange style event row. + + Args: + cursor: Database cursor. + table: Target table name. + message: Event payload. + """ + logger.debug("Sending to Postgres - %s", table) + cursor.execute( + f""" + INSERT INTO {table} + ( + event_id, + tenant_id, + source_app, + source_app_version, + environment, + timestamp_event, + catalog_id, + operation, + "location", + "format", + format_options, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", + ( + message["event_id"], + message["tenant_id"], + message["source_app"], + message["source_app_version"], + message["environment"], + message["timestamp_event"], + message["catalog_id"], + message["operation"], + message.get("location"), + message["format"], + (json.dumps(message.get("format_options")) if "format_options" in message else None), + (json.dumps(message.get("additional_info")) if "additional_info" in message else None), + ), + ) + + +def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[str, Any]) -> None: + """Insert a run event row plus related job rows. + + Args: + cursor: Database cursor. + table_runs: Runs table name. + table_jobs: Jobs table name. + message: Event payload (includes jobs array). 
+ """ + logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs) + cursor.execute( + f""" + INSERT INTO {table_runs} + ( + event_id, + job_ref, + tenant_id, + source_app, + source_app_version, + environment, + timestamp_start, + timestamp_end + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", + ( + message["event_id"], + message["job_ref"], + message["tenant_id"], + message["source_app"], + message["source_app_version"], + message["environment"], + message["timestamp_start"], + message["timestamp_end"], + ), + ) + + for job in message["jobs"]: + cursor.execute( + f""" + INSERT INTO {table_jobs} + ( + event_id, + catalog_id, + status, + timestamp_start, + timestamp_end, + message, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s, + %s + )""", + ( + message["event_id"], + job["catalog_id"], + job["status"], + job["timestamp_start"], + job["timestamp_end"], + job.get("message"), + (json.dumps(job.get("additional_info")) if "additional_info" in job else None), + ), + ) + + +def postgres_test_write(cursor, table: str, message: Dict[str, Any]) -> None: + """Insert a test topic row. + + Args: + cursor: Database cursor. + table: Target table name. + message: Event payload. + """ + logger.debug("Sending to Postgres - %s", table) + cursor.execute( + f""" + INSERT INTO {table} + ( + event_id, + tenant_id, + source_app, + environment, + timestamp_event, + additional_info + ) + VALUES + ( + %s, + %s, + %s, + %s, + %s, + %s + )""", + ( + message["event_id"], + message["tenant_id"], + message["source_app"], + message["environment"], + message["timestamp"], + (json.dumps(message.get("additional_info")) if "additional_info" in message else None), + ), + ) + + +def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """Dispatch insertion for a topic into the correct Postgres table(s). + + Skips if Postgres not configured or psycopg2 unavailable. Returns success flag and optional error. 
+ + Args: + topic_name: Incoming topic identifier. + message: Event payload. + """ + try: + if not POSTGRES.get("database"): + logger.debug("No Postgres - skipping") + return True, None + if psycopg2 is None: # type: ignore + logger.debug("psycopg2 not available - skipping actual Postgres write") + return True, None + + log_payload_at_trace(logger, "Postgres", topic_name, message) + + with psycopg2.connect( # type: ignore[attr-defined] + database=POSTGRES["database"], + host=POSTGRES["host"], + user=POSTGRES["user"], + password=POSTGRES["password"], + port=POSTGRES["port"], + ) as connection: # type: ignore[call-arg] + with connection.cursor() as cursor: # type: ignore + if topic_name == "public.cps.za.dlchange": + postgres_edla_write(cursor, "public_cps_za_dlchange", message) + elif topic_name == "public.cps.za.runs": + postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message) + elif topic_name == "public.cps.za.test": + postgres_test_write(cursor, "public_cps_za_test", message) + else: + msg = f"unknown topic for postgres {topic_name}" + logger.error(msg) + return False, msg + + connection.commit() # type: ignore + except (RuntimeError, PsycopgError) as e: # narrowed exception set + err_msg = f"The Postgres writer failed with unknown error: {str(e)}" + logger.exception(err_msg) + return False, err_msg + + return True, None diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..f7115cb --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 0000000..f7115cb --- /dev/null +++ b/tests/utils/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# diff --git a/tests/test_conf_path.py b/tests/utils/test_conf_path.py similarity index 88% rename from tests/test_conf_path.py rename to tests/utils/test_conf_path.py index ff97e3b..77ee12d 100644 --- a/tests/test_conf_path.py +++ b/tests/utils/test_conf_path.py @@ -4,9 +4,7 @@ import tempfile from pathlib import Path -import pytest - -import src.conf_path as conf_path_module +import src.utils.conf_path as conf_path_module def test_env_var_valid_directory(monkeypatch): @@ -23,8 +21,9 @@ def test_env_var_invalid_directory_falls_back_parent(monkeypatch): missing_path = "/nonexistent/path/xyz_does_not_exist" monkeypatch.setenv("CONF_DIR", missing_path) conf_dir, invalid = conf_path_module.resolve_conf_dir() - # Should fall back to repository conf directory - assert conf_dir.endswith(os.path.join("EventGate", "conf")) + # Should fall back to repository conf directory /conf + expected_root_conf = (Path(conf_path_module.__file__).resolve().parent.parent.parent / "conf").resolve() + assert Path(conf_dir).resolve() == expected_root_conf assert invalid == os.path.abspath(missing_path) @@ -34,7 +33,6 @@ def _load_isolated_conf_path(structure_builder): structure_builder receives base temp directory and returns path to module directory containing conf_path.py and the code to write (copied from original). 
""" - import inspect code = Path(conf_path_module.__file__).read_text(encoding="utf-8") tmp = tempfile.TemporaryDirectory() @@ -82,8 +80,8 @@ def build(base: Path, code: str): try: conf_dir, invalid = mod.resolve_conf_dir() # Parent conf path returned even though it does not exist - expected_parent_conf = (Path(mod.__file__).parent.parent / "conf").resolve() - assert Path(conf_dir).resolve() == expected_parent_conf + expected_root_conf = (Path(mod.__file__).resolve().parent.parent.parent / "conf").resolve() + assert Path(conf_dir).resolve() == expected_root_conf assert invalid is None finally: mod._tmp.cleanup() # type: ignore[attr-defined] @@ -124,8 +122,8 @@ def build(base: Path, code: str): bad_path = "/also/not/there/xyz987" monkeypatch.setenv("CONF_DIR", bad_path) conf_dir, invalid = mod.resolve_conf_dir() - expected_parent_conf = (Path(mod.__file__).parent.parent / "conf").resolve() - assert Path(conf_dir).resolve() == expected_parent_conf + expected_root_conf = (Path(mod.__file__).resolve().parent.parent.parent / "conf").resolve() + assert Path(conf_dir).resolve() == expected_root_conf assert invalid == os.path.abspath(bad_path) finally: mod._tmp.cleanup() # type: ignore[attr-defined] @@ -155,5 +153,6 @@ def test_module_level_constants_env_invalid(monkeypatch): sys.modules[spec.name] = mod spec.loader.exec_module(mod) # type: ignore[attr-defined] # Module constant should fall back to repository conf directory - assert mod.CONF_DIR.endswith(os.path.join("EventGate", "conf")) # type: ignore[attr-defined] + expected_root_conf = (Path(mod.__file__).resolve().parent.parent.parent / "conf").resolve() + assert Path(mod.CONF_DIR).resolve() == expected_root_conf # type: ignore[attr-defined] assert mod.INVALID_CONF_ENV == os.path.abspath(bad_path) # type: ignore[attr-defined] diff --git a/tests/test_conf_validation.py b/tests/utils/test_conf_validation.py similarity index 99% rename from tests/test_conf_validation.py rename to tests/utils/test_conf_validation.py index 
01db885..7ed62ec 100644 --- a/tests/test_conf_validation.py +++ b/tests/utils/test_conf_validation.py @@ -19,7 +19,7 @@ from glob import glob import pytest -CONF_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "conf") +CONF_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "..", "conf") REQUIRED_CONFIG_KEYS = { "access_config", diff --git a/tests/test_extract_token.py b/tests/utils/test_extract_token.py similarity index 100% rename from tests/test_extract_token.py rename to tests/utils/test_extract_token.py diff --git a/tests/test_safe_serialization.py b/tests/utils/test_safe_serialization.py similarity index 99% rename from tests/test_safe_serialization.py rename to tests/utils/test_safe_serialization.py index 638bc1b..bb34c5c 100644 --- a/tests/test_safe_serialization.py +++ b/tests/utils/test_safe_serialization.py @@ -20,8 +20,6 @@ import os from unittest.mock import patch -import pytest - from src.safe_serialization import safe_serialize_for_log diff --git a/tests/test_trace_logging.py b/tests/utils/test_trace_logging.py similarity index 93% rename from tests/test_trace_logging.py rename to tests/utils/test_trace_logging.py index 751c35d..a5cce96 100644 --- a/tests/test_trace_logging.py +++ b/tests/utils/test_trace_logging.py @@ -1,10 +1,10 @@ import logging from unittest.mock import MagicMock -from src.logging_levels import TRACE_LEVEL -import src.writer_eventbridge as we -import src.writer_kafka as wk -import src.writer_postgres as wp +from src.utils.logging_levels import TRACE_LEVEL +import src.writers.writer_eventbridge as we +import src.writers.writer_kafka as wk +import src.writers.writer_postgres as wp def test_trace_eventbridge(caplog): diff --git a/tests/writers/__init__.py b/tests/writers/__init__.py new file mode 100644 index 0000000..f7115cb --- /dev/null +++ b/tests/writers/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may 
not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/test_writer_eventbridge.py b/tests/writers/test_writer_eventbridge.py similarity index 95% rename from tests/test_writer_eventbridge.py rename to tests/writers/test_writer_eventbridge.py index 0d67ea9..737b4af 100644 --- a/tests/test_writer_eventbridge.py +++ b/tests/writers/test_writer_eventbridge.py @@ -1,7 +1,7 @@ import logging -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock -import src.writer_eventbridge as we +import src.writers.writer_eventbridge as we def test_write_skips_when_no_event_bus(): diff --git a/tests/test_writer_kafka.py b/tests/writers/test_writer_kafka.py similarity index 99% rename from tests/test_writer_kafka.py rename to tests/writers/test_writer_kafka.py index bb880bb..7cad07b 100644 --- a/tests/test_writer_kafka.py +++ b/tests/writers/test_writer_kafka.py @@ -1,6 +1,6 @@ import logging from types import SimpleNamespace -import src.writer_kafka as wk +import src.writers.writer_kafka as wk class FakeProducerSuccess: diff --git a/tests/test_writer_postgres.py b/tests/writers/test_writer_postgres.py similarity index 99% rename from tests/test_writer_postgres.py rename to tests/writers/test_writer_postgres.py index f7329d7..678b664 100644 --- a/tests/test_writer_postgres.py +++ b/tests/writers/test_writer_postgres.py @@ -18,9 +18,8 @@ import os import types import pytest -from unittest.mock import patch -from src import writer_postgres +from src.writers import writer_postgres @pytest.fixture(scope="module", autouse=True) From 
1acf137d8219a68e081591e77fb07e43720837ac Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Tue, 11 Nov 2025 13:23:15 +0100 Subject: [PATCH 2/2] Pylint project fixes. --- src/event_gate_lambda.py | 15 +--- src/{ => utils}/safe_serialization.py | 0 src/utils/trace_logging.py | 2 +- src/writers/writer_postgres.py | 94 +++++++++++++------------- tests/utils/test_safe_serialization.py | 4 +- 5 files changed, 52 insertions(+), 63 deletions(-) rename src/{ => utils}/safe_serialization.py (100%) diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 163e119..34dcb81 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -31,19 +31,8 @@ from jsonschema import validate from jsonschema.exceptions import ValidationError -# Import writer modules with explicit ImportError fallback -try: - from .writers import writer_eventbridge, writer_kafka, writer_postgres -except ImportError: # fallback when executed outside package context - import src.writers.writer_eventbridge # type: ignore[no-redef] - import src.writers.writer_kafka # type: ignore[no-redef] - import src.writers.writer_postgres # type: ignore[no-redef] - -# Import configuration directory symbols with explicit ImportError fallback -try: - from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV # type: ignore[no-redef] -except ImportError: # fallback when executed outside package context - from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV # type: ignore[no-redef] +from src.writers import writer_eventbridge, writer_kafka, writer_postgres +from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV # Internal aliases used by rest of module _CONF_DIR = CONF_DIR diff --git a/src/safe_serialization.py b/src/utils/safe_serialization.py similarity index 100% rename from src/safe_serialization.py rename to src/utils/safe_serialization.py diff --git a/src/utils/trace_logging.py b/src/utils/trace_logging.py index 9684734..2117600 100644 --- a/src/utils/trace_logging.py +++ 
b/src/utils/trace_logging.py @@ -23,7 +23,7 @@ from typing import Any, Dict from src.utils.logging_levels import TRACE_LEVEL -from src.safe_serialization import safe_serialize_for_log +from src.utils.safe_serialization import safe_serialize_for_log def log_payload_at_trace(logger: logging.Logger, writer_name: str, topic_name: str, message: Dict[str, Any]) -> None: diff --git a/src/writers/writer_postgres.py b/src/writers/writer_postgres.py index ff46e34..ecce377 100644 --- a/src/writers/writer_postgres.py +++ b/src/writers/writer_postgres.py @@ -88,34 +88,34 @@ def postgres_edla_write(cursor, table: str, message: Dict[str, Any]) -> None: logger.debug("Sending to Postgres - %s", table) cursor.execute( f""" - INSERT INTO {table} + INSERT INTO {table} ( - event_id, - tenant_id, - source_app, - source_app_version, - environment, - timestamp_event, - catalog_id, - operation, - "location", - "format", - format_options, + event_id, + tenant_id, + source_app, + source_app_version, + environment, + timestamp_event, + catalog_id, + operation, + "location", + "format", + format_options, additional_info - ) + ) VALUES ( - %s, - %s, - %s, - %s, - %s, - %s, - %s, - %s, %s, - %s, - %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, + %s, %s )""", ( @@ -147,7 +147,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs) cursor.execute( f""" - INSERT INTO {table_runs} + INSERT INTO {table_runs} ( event_id, job_ref, @@ -157,16 +157,16 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s environment, timestamp_start, timestamp_end - ) + ) VALUES ( - %s, - %s, - %s, - %s, %s, - %s, - %s, + %s, + %s, + %s, + %s, + %s, + %s, %s )""", ( @@ -184,7 +184,7 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s for job in message["jobs"]: cursor.execute( f""" - INSERT INTO {table_jobs} + INSERT INTO {table_jobs} ( event_id, 
catalog_id, @@ -193,15 +193,15 @@ def postgres_run_write(cursor, table_runs: str, table_jobs: str, message: Dict[s timestamp_end, message, additional_info - ) + ) VALUES ( - %s, - %s, - %s, - %s, - %s, - %s, + %s, + %s, + %s, + %s, + %s, + %s, %s )""", ( @@ -227,22 +227,22 @@ def postgres_test_write(cursor, table: str, message: Dict[str, Any]) -> None: logger.debug("Sending to Postgres - %s", table) cursor.execute( f""" - INSERT INTO {table} + INSERT INTO {table} ( event_id, tenant_id, - source_app, - environment, - timestamp_event, + source_app, + environment, + timestamp_event, additional_info - ) + ) VALUES ( %s, %s, - %s, - %s, - %s, + %s, + %s, + %s, %s )""", ( diff --git a/tests/utils/test_safe_serialization.py b/tests/utils/test_safe_serialization.py index bb34c5c..f979c3d 100644 --- a/tests/utils/test_safe_serialization.py +++ b/tests/utils/test_safe_serialization.py @@ -20,7 +20,7 @@ import os from unittest.mock import patch -from src.safe_serialization import safe_serialize_for_log +from src.utils.safe_serialization import safe_serialize_for_log class TestSafeSerializeForLog: @@ -151,7 +151,7 @@ def __repr__(self): # Mock json.dumps to raise an exception message = {"obj": "normal"} - with patch("src.safe_serialization.json.dumps", side_effect=TypeError("Cannot serialize")): + with patch("src.utils.safe_serialization.json.dumps", side_effect=TypeError("Cannot serialize")): result = safe_serialize_for_log(message, redact_keys=[], max_bytes=10000) assert result == ""