2 changes: 1 addition & 1 deletion DEVELOPER.md
@@ -28,7 +28,7 @@ cd EventGate
 ```shell
 python3 -m venv .venv
 source .venv/bin/activate
-pip install -r requirements.txt
+pip3 install -r requirements.txt
 ```
 
 ## Run Pylint Tool Locally
2 changes: 1 addition & 1 deletion requirements.txt
@@ -11,6 +11,6 @@ jsonschema==4.25.1
 PyJWT==2.10.1
 requests==2.32.5
 boto3==1.40.25
-confluent-kafka==2.11.1
+confluent-kafka==2.12.1
 # psycopg2-binary==2.9.10 # Ideal for local development, but not for long-term production use
 psycopg2==2.9.10
72 changes: 52 additions & 20 deletions src/writer_kafka.py
@@ -22,8 +22,8 @@
 import json
 import logging
 import os
+import time
 from typing import Any, Dict, Optional, Tuple
-
 from confluent_kafka import Producer
 
 try:  # KafkaException may not exist in stubbed test module
@@ -35,8 +35,10 @@ class KafkaException(Exception): # type: ignore
 
 
 STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "producer": None}
-# Configurable flush timeout (seconds) to avoid hanging indefinitely
-_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "5"))
+# Configurable flush timeouts and retries via env variables to avoid hanging indefinitely
+_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "7"))
+_MAX_RETRIES = int(os.environ.get("KAFKA_FLUSH_RETRIES", "3"))
+_RETRY_BACKOFF_SEC = float(os.environ.get("KAFKA_RETRY_BACKOFF", "0.5"))
 
 
 def init(logger: logging.Logger, config: Dict[str, Any]) -> None:
@@ -86,12 +88,14 @@ def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
     """
     logger = STATE["logger"]
     producer: Optional[Producer] = STATE.get("producer")  # type: ignore[assignment]
-
     if producer is None:
         logger.debug("Kafka producer not initialized - skipping")
         return True, None
 
-    errors: list[Any] = []
+    errors: list[str] = []
+    has_exception = False
+
+    # Produce step
     try:
         logger.debug("Sending to kafka %s", topic_name)
         producer.produce(
@@ -100,23 +104,51 @@
             value=json.dumps(message).encode("utf-8"),
             callback=lambda err, msg: (errors.append(str(err)) if err is not None else None),
         )
+    except KafkaException as e:
+        errors.append(f"Produce exception: {e}")
+        has_exception = True
+
+    # Flush step (always attempted)
+    remaining: Optional[int] = None
+    for attempt in range(1, _MAX_RETRIES + 1):
         try:
-            remaining = producer.flush(_KAFKA_FLUSH_TIMEOUT_SEC)  # type: ignore[arg-type]
-        except TypeError:  # Fallback for stub producers without timeout parameter
-            remaining = producer.flush()  # type: ignore[call-arg]
-        # remaining can be number of undelivered messages (confluent_kafka returns int)
-        if not errors and isinstance(remaining, int) and remaining > 0:
-            timeout_msg = f"Kafka flush timeout after {_KAFKA_FLUSH_TIMEOUT_SEC}s: {remaining} message(s) still pending"
-            logger.error(timeout_msg)
-            return False, timeout_msg
-    except KafkaException as e:  # narrow exception capture
-        err_msg = f"The Kafka writer failed with unknown error: {str(e)}"
-        logger.exception(err_msg)
-        return False, err_msg
+            remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC)
+        except KafkaException as e:
+            errors.append(f"Flush exception: {e}")
+            has_exception = True
+
+        # Treat None (flush returns None in some stubs) as success equivalent to 0 pending
+        if (remaining is None or remaining == 0) and not errors:
+            break
+        if attempt < _MAX_RETRIES:
+            logger.warning("Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES)
+            time.sleep(_RETRY_BACKOFF_SEC)
+
+    # Warn if messages are still pending after retries
+    if isinstance(remaining, int) and remaining > 0:
+        logger.warning(
+            "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining
+        )
 
     if errors:
-        msg = "; ".join(errors)
-        logger.error(msg)
-        return False, msg
+        failure_text = "Kafka writer failed: " + "; ".join(errors)
+        (logger.exception if has_exception else logger.error)(failure_text)
+        return False, failure_text
 
     return True, None
+
+
+def flush_with_timeout(producer, timeout: float) -> Optional[int]:
+    """Flush the Kafka producer with a timeout, handling TypeError for stubs.
+
+    Args:
+        producer: Kafka Producer instance.
+        timeout: Timeout in seconds.
+    Returns:
+        Number of messages still pending after the flush call (0 means all messages were delivered).
+        None is returned only if the underlying (stub/mock) producer.flush() does not provide a count.
+    """
+    try:
+        return producer.flush(timeout)
+    except TypeError:  # Fallback for stub producers without timeout parameter
+        return producer.flush()
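Reviewer note on the defaults above: with `KAFKA_FLUSH_TIMEOUT=7`, `KAFKA_FLUSH_RETRIES=3`, and `KAFKA_RETRY_BACKOFF=0.5`, the retry loop bounds how long a single `write()` call can block when the broker is unreachable. A minimal back-of-envelope sketch, derived only from the loop shown in this diff (the snippet is illustrative, not part of the PR):

```python
# Worst case for one write() call when every flush attempt times out.
flush_timeout = 7.0  # KAFKA_FLUSH_TIMEOUT default in this diff
retries = 3          # KAFKA_FLUSH_RETRIES default
backoff = 0.5        # KAFKA_RETRY_BACKOFF default

# Each attempt blocks for up to flush_timeout; the backoff sleep runs after
# every attempt except the last (attempt < _MAX_RETRIES).
worst_case_sec = retries * flush_timeout + (retries - 1) * backoff
print(worst_case_sec)  # 22.0 -> write() then returns (True, None) with a warning logged
```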
81 changes: 80 additions & 1 deletion tests/test_writer_kafka.py
@@ -1,4 +1,3 @@
-import json
 import logging
 from types import SimpleNamespace
 import src.writer_kafka as wk
@@ -23,6 +22,44 @@ def produce(self, topic, key, value, callback): # noqa: D401
callback("ERR", None)


class FakeProducerFlushSequence(FakeProducerSuccess):
def __init__(self, sequence): # sequence of remaining counts per flush call
super().__init__()
self.sequence = sequence
self.flush_calls = 0

def flush(self, *a, **kw):
# Simulate decreasing remaining messages
if self.flush_calls < len(self.sequence):
val = self.sequence[self.flush_calls]
else:
val = self.sequence[-1]
self.flush_calls += 1
return val


class FakeProducerTimeout(FakeProducerSuccess):
def __init__(self, remaining_value):
super().__init__()
self.remaining_value = remaining_value
self.flush_calls = 0

def flush(self, *a, **kw): # always returns same remaining >0 to force timeout warning
self.flush_calls += 1
return self.remaining_value


class FakeProducerTypeError(FakeProducerSuccess):
def __init__(self):
super().__init__()
self.flush_calls = 0

# Intentionally omit timeout parameter causing TypeError on first attempt inside flush_with_timeout
def flush(self): # noqa: D401
self.flush_calls += 1
return 0


def test_write_skips_when_producer_none(monkeypatch):
wk.STATE["logger"] = logging.getLogger("test")
wk.STATE["producer"] = None
@@ -60,3 +97,45 @@ def produce(self, *a, **kw): # noqa: D401
     wk.STATE["producer"] = RaisingProducer()
     ok, err = wk.write("topic", {"d": 4})
     assert not ok and "boom" in err
+
+
+def test_write_flush_retries_until_success(monkeypatch, caplog):
+    wk.STATE["logger"] = logging.getLogger("test")
+    caplog.set_level(logging.WARNING)
+    # Force a smaller max-retry count for a deterministic sequence length
+    monkeypatch.setattr(wk, "_MAX_RETRIES", 5, raising=False)
+    producer = FakeProducerFlushSequence([5, 4, 3, 1, 0])
+    wk.STATE["producer"] = producer
+    ok, err = wk.write("topic", {"e": 5})
+    assert ok and err is None
+    # It should break as soon as remaining == 0 (after the flush call returning 0)
+    assert producer.flush_calls == 5  # sequence consumed until 0
+    # A warning is logged for each attempt before success (flush_calls - 1 of them); the final attempt does not warn
+    warn_messages = [r.message for r in caplog.records if r.levelno == logging.WARNING]
+    assert any("attempt 1" in m or "attempt 2" in m for m in warn_messages)
+
+
+def test_write_timeout_warning_when_remaining_after_retries(monkeypatch, caplog):
+    wk.STATE["logger"] = logging.getLogger("test")
+    caplog.set_level(logging.WARNING)
+    monkeypatch.setattr(wk, "_MAX_RETRIES", 3, raising=False)
+    producer = FakeProducerTimeout(2)
+    wk.STATE["producer"] = producer
+    ok, err = wk.write("topic", {"f": 6})
+    timeout_warnings = [
+        r.message for r in caplog.records if "timeout" in r.message
+    ]  # the final warning should mention the timeout
+    assert ok and err is None  # write() still reports success even when the timeout warning fires
+    assert timeout_warnings, "Expected timeout warning logged"
+    assert producer.flush_calls == 3  # retried 3 times
+
+
+def test_flush_with_timeout_typeerror_fallback(monkeypatch):
+    wk.STATE["logger"] = logging.getLogger("test")
+    monkeypatch.setattr(wk, "_MAX_RETRIES", 4, raising=False)
+    producer = FakeProducerTypeError()
+    wk.STATE["producer"] = producer
+    ok, err = wk.write("topic", {"g": 7})
+    assert ok and err is None
+    # Since flush returns 0 immediately, only one flush call should be needed
+    assert producer.flush_calls == 1
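For completeness, a minimal usage sketch of the writer outside the test suite. The broker address and the config keys passed to `init()` are assumptions for illustration (the body of `init()` is collapsed in this diff), so treat the dict below as hypothetical rather than the PR's actual contract:

```python
import logging

import src.writer_kafka as wk

logging.basicConfig(level=logging.INFO)

# Hypothetical config: init() accepts a Dict[str, Any]; a confluent-kafka
# style dict with a bootstrap server is assumed here.
wk.init(logging.getLogger("eventgate"), {"bootstrap.servers": "localhost:9092"})

# write() returns (ok, error_message); delivery errors are aggregated,
# logged, and returned rather than raised.
ok, err = wk.write("events", {"type": "user.created", "id": 42})
if not ok:
    logging.error("Kafka write failed: %s", err)
```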