From ad4d5d74670b4383c7f5eb7a7b0cacdb5be2ebe4 Mon Sep 17 00:00:00 2001 From: Oto Macenauer Date: Wed, 10 Sep 2025 11:22:13 +0200 Subject: [PATCH] Add TRACE logging level for verbose payload logging across event writers --- src/event_gate_lambda.py | 6 +++ src/logging_levels.py | 21 +++++++++ src/writer_eventbridge.py | 13 ++++++ src/writer_kafka.py | 12 ++++++ src/writer_postgres.py | 12 ++++++ tests/test_trace_logging.py | 86 +++++++++++++++++++++++++++++++++++++ 6 files changed, 150 insertions(+) create mode 100644 src/logging_levels.py create mode 100644 tests/test_trace_logging.py diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index 36f1664..e3e08d7 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -43,6 +43,12 @@ except ImportError: # fallback when executed outside package context import writer_eventbridge, writer_kafka, writer_postgres # type: ignore[no-redef] +# Register custom TRACE level before using LOG_LEVEL env var +try: + from .logging_levels import TRACE_LEVEL # noqa: F401 +except Exception: # pragma: no cover - defensive + TRACE_LEVEL = 5 # type: ignore + # Import configuration directory symbols with explicit ImportError fallback try: from .conf_path import CONF_DIR, INVALID_CONF_ENV # type: ignore[no-redef] diff --git a/src/logging_levels.py b/src/logging_levels.py new file mode 100644 index 0000000..84bb903 --- /dev/null +++ b/src/logging_levels.py @@ -0,0 +1,21 @@ +"""Custom logging levels. + +Adds a TRACE level below DEBUG for very verbose payload logging. 
+""" + +from __future__ import annotations +import logging + +TRACE_LEVEL = 5 + +# Register level name only once (idempotent) +if not hasattr(logging, "TRACE"): + logging.addLevelName(TRACE_LEVEL, "TRACE") + + def trace(self: logging.Logger, message: str, *args, **kws): # type: ignore[override] + if self.isEnabledFor(TRACE_LEVEL): + self._log(TRACE_LEVEL, message, args, **kws) # pylint: disable=protected-access + + logging.Logger.trace = trace # type: ignore[attr-defined] + +__all__ = ["TRACE_LEVEL"] diff --git a/src/writer_eventbridge.py b/src/writer_eventbridge.py index eae6332..9dbe8d8 100644 --- a/src/writer_eventbridge.py +++ b/src/writer_eventbridge.py @@ -26,6 +26,10 @@ import boto3 from botocore.exceptions import BotoCoreError, ClientError +# Ensure TRACE level is registered +from . import logging_levels # noqa: F401 +from .logging_levels import TRACE_LEVEL + STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None} @@ -68,6 +72,15 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str] logger.debug("EventBridge client not initialized - skipping") return True, None + # TRACE-level payload logging + if logger.isEnabledFor(TRACE_LEVEL): + try: + logger.trace( # type: ignore[attr-defined] + "EventBridge payload topic=%s payload=%s", topic_name, json.dumps(message, separators=(",", ":")) + ) + except Exception: # pragma: no cover - defensive serialization guard + logger.trace("EventBridge payload topic=%s ", topic_name) # type: ignore[attr-defined] + try: logger.debug("Sending to eventBridge %s", topic_name) response = client.put_events( diff --git a/src/writer_kafka.py b/src/writer_kafka.py index 692aeaa..1ba5057 100644 --- a/src/writer_kafka.py +++ b/src/writer_kafka.py @@ -26,6 +26,9 @@ from confluent_kafka import Producer +# Add TRACE level import +from .logging_levels import TRACE_LEVEL # type: ignore + try: # KafkaException may not exist in stubbed test module from confluent_kafka 
import KafkaException # type: ignore except (ImportError, ModuleNotFoundError): # pragma: no cover - fallback for test stub @@ -91,6 +94,15 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str] logger.debug("Kafka producer not initialized - skipping") return True, None + # TRACE-level payload logging prior to produce + if logger.isEnabledFor(TRACE_LEVEL): + try: + logger.trace( # type: ignore[attr-defined] + "Kafka payload topic=%s payload=%s", topic_name, json.dumps(message, separators=(",", ":")) + ) + except Exception: # pragma: no cover - defensive + logger.trace("Kafka payload topic=%s payload=&lt;unserializable&gt;", topic_name) # type: ignore[attr-defined] + errors: list[Any] = [] try: logger.debug("Sending to kafka %s", topic_name) diff --git a/src/writer_postgres.py b/src/writer_postgres.py index 28289e8..dbbc984 100644 --- a/src/writer_postgres.py +++ b/src/writer_postgres.py @@ -31,6 +31,9 @@ except ImportError: # pragma: no cover - environment without psycopg2 psycopg2 = None # type: ignore +# Ensure TRACE level is registered +from .logging_levels import TRACE_LEVEL # type: ignore + # Define a unified psycopg2 error base for safe exception handling even if psycopg2 missing if psycopg2 is not None: # type: ignore try: # pragma: no cover - attribute presence depends on installed psycopg2 variant @@ -271,6 +274,15 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str] _logger.debug("psycopg2 not available - skipping actual Postgres write") return True, None + # TRACE-level payload logging (only when we intend to write) + if _logger.isEnabledFor(TRACE_LEVEL): + try: + _logger.trace( # type: ignore[attr-defined] + "Postgres payload topic=%s payload=%s", topic_name, json.dumps(message, separators=(",", ":")) + ) + except Exception: # pragma: no cover - defensive + _logger.trace("Postgres payload topic=%s payload=&lt;unserializable&gt;", topic_name) # type: ignore[attr-defined] + with psycopg2.connect( # type: ignore[attr-defined] database=POSTGRES["database"], 
host=POSTGRES["host"], diff --git a/tests/test_trace_logging.py b/tests/test_trace_logging.py new file mode 100644 index 0000000..0ef99ba --- /dev/null +++ b/tests/test_trace_logging.py @@ -0,0 +1,86 @@ +import logging +from unittest.mock import MagicMock + +from src.logging_levels import TRACE_LEVEL +import src.writer_eventbridge as we +import src.writer_kafka as wk +import src.writer_postgres as wp + + +def test_trace_eventbridge(caplog): + logger = logging.getLogger("trace.eventbridge") + logger.setLevel(TRACE_LEVEL) + we.STATE["logger"] = logger + we.STATE["event_bus_arn"] = "arn:aws:events:region:acct:event-bus/test" + mock_client = MagicMock() + mock_client.put_events.return_value = {"FailedEntryCount": 0, "Entries": []} + we.STATE["client"] = mock_client + caplog.set_level(TRACE_LEVEL) + ok, err = we.write("topic.eb", {"k": 1}) + assert ok and err is None + assert any("EventBridge payload" in rec.message for rec in caplog.records) + + +def test_trace_kafka(caplog): + class FakeProducer: + def produce(self, *a, **kw): + cb = kw.get("callback") + if cb: + cb(None, object()) + + def flush(self, *a, **kw): # noqa: D401 + return 0 + + logger = logging.getLogger("trace.kafka") + logger.setLevel(TRACE_LEVEL) + wk.STATE["logger"] = logger + wk.STATE["producer"] = FakeProducer() + caplog.set_level(TRACE_LEVEL) + ok, err = wk.write("topic.kf", {"k": 2}) + assert ok and err is None + assert any("Kafka payload" in rec.message for rec in caplog.records) + + +def test_trace_postgres(caplog, monkeypatch): + # Prepare dummy psycopg2 connection machinery + store = [] + + class DummyCursor: + def execute(self, sql, params): + store.append((sql, params)) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + class DummyConnection: + def cursor(self): + return DummyCursor() + + def commit(self): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + class DummyPsycopg2: + def 
connect(self, **kwargs): # noqa: D401 + return DummyConnection() + + monkeypatch.setattr(wp, "psycopg2", DummyPsycopg2()) + + logger = logging.getLogger("trace.postgres") + logger.setLevel(TRACE_LEVEL) + wp._logger = logger # type: ignore + wp.POSTGRES = {"database": "db", "host": "h", "user": "u", "password": "p", "port": 5432} + + caplog.set_level(TRACE_LEVEL) + message = {"event_id": "e", "tenant_id": "t", "source_app": "a", "environment": "dev", "timestamp": 1} + ok, err = wp.write("public.cps.za.test", message) + assert ok and err is None + assert any("Postgres payload" in rec.message for rec in caplog.records)