Flush timeout error handling for Kafka writer #76
Conversation
Walkthrough

The pull request adds resilience to Kafka message flushing by introducing retry logic with backoff, timeout handling with fallback support, and improved error tracking. Dependencies are updated, and documentation is modified to use pip3. New test coverage validates retry behavior and edge cases.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant write_messages
    participant flush_with_timeout
    participant Producer
    participant Kafka
    Client->>write_messages: write_messages(records)
    write_messages->>Producer: produce(record) [×N]
    write_messages->>flush_with_timeout: flush_with_timeout(producer, timeout)
    rect rgb(100, 150, 255)
        Note over flush_with_timeout: Retry Loop (up to MAX_RETRIES)
        loop Each retry
            flush_with_timeout->>Producer: flush(timeout)
            alt Messages pending
                Producer-->>flush_with_timeout: remaining > 0
                flush_with_timeout->>flush_with_timeout: wait RETRY_BACKOFF_SEC
            else All flushed
                Producer-->>flush_with_timeout: remaining = 0
                flush_with_timeout-->>write_messages: success (0)
            end
        end
    end
    alt Retries exhausted & pending remain
        flush_with_timeout-->>write_messages: warning + return remaining
    else TypeError (stub fallback)
        flush_with_timeout->>Producer: flush() [no timeout]
        Producer-->>flush_with_timeout: result
    end
    write_messages->>write_messages: aggregate errors (Produce + Flush)
    write_messages-->>Client: result (success/failure)
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/test_writer_kafka.py (1)
52-60: Remove unused `noqa` directive.

The `FakeProducerTypeError` correctly tests the TypeError fallback path. However, line 58 has an unnecessary `noqa: D401` directive since there's no docstring to which D401 would apply. Apply this diff:

```diff
-    def flush(self):  # noqa: D401
+    def flush(self):
         self.flush_calls += 1
         return 0
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- `DEVELOPER.md` (1 hunks)
- `requirements.txt` (1 hunks)
- `src/writer_kafka.py` (3 hunks)
- `tests/test_writer_kafka.py` (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/test_writer_kafka.py (1)

- src/writer_kafka.py (1): `write` (80-133)

src/writer_kafka.py (2)

- tests/test_event_gate_lambda.py (1): `flush` (51-52)
- tests/test_writer_kafka.py (4): `flush` (15-16), `flush` (31-38), `flush` (47-49), `flush` (58-60)
🪛 Ruff (0.14.3)
tests/test_writer_kafka.py
31-31: Unused method argument: a
(ARG002)
31-31: Unused method argument: kw
(ARG002)
47-47: Unused method argument: a
(ARG002)
47-47: Unused method argument: kw
(ARG002)
58-58: Unused noqa directive (non-enabled: D401)
Remove unused noqa directive
(RUF100)
src/writer_kafka.py
128-128: Consider moving this statement to an else block
(TRY300)
🔇 Additional comments (9)
DEVELOPER.md (1)
31-31: LGTM! Clear Python 3 specification.

The change to `pip3` aligns with the Python 3.13 requirement and ensures developers explicitly use the correct package manager.

src/writer_kafka.py (4)
25-25: LGTM! Import required for retry backoff.

The `time` module is appropriately added to support the retry sleep mechanism.
38-41: LGTM! Well-designed configurable retry parameters.

The configuration defaults are sensible:

- Increased flush timeout (5s → 7s) provides more tolerance for transient delays
- 3 retries with 0.5s backoff yields ~8.5s max wait time
- Environment-driven configuration enables tuning without code changes
105-128: Retry logic is well-implemented, but verify the pending-message success behavior.

The retry mechanism correctly:

- Attempts flush up to `_MAX_RETRIES` times with backoff
- Handles stub producers that return `None` from flush
- Aggregates errors from produce callbacks
- Logs appropriate warnings during retry attempts

However, lines 123-128 return `True` even when messages remain pending after all retries. This means the function succeeds despite potential message loss. Please confirm this aligns with the intended behavior for handling transient Kafka issues. Consider documenting this behavior in the function docstring to clarify that the function may return success with a warning when messages are still pending but no explicit errors occurred.
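A minimal sketch of the retry-with-backoff flush behavior described in this comment; names such as `flush_with_retries`, `_MAX_RETRIES`, and `_RETRY_BACKOFF_SEC` follow the review's description, and the actual implementation in `src/writer_kafka.py` may differ:

```python
import logging
import time

logger = logging.getLogger(__name__)

# Defaults mirroring the values described in the review
_KAFKA_FLUSH_TIMEOUT_SEC = 7.0
_MAX_RETRIES = 3
_RETRY_BACKOFF_SEC = 0.5


def flush_with_retries(producer, flush_with_timeout):
    """Flush until nothing is pending or retries are exhausted."""
    remaining = None
    for attempt in range(1, _MAX_RETRIES + 1):
        remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC)
        # Stub producers may return None from flush(); treat it like 0 pending.
        if remaining is None or remaining == 0:
            break
        if attempt < _MAX_RETRIES:
            logger.warning(
                "Kafka flush pending (%s message(s) remain) attempt %d/%d",
                remaining, attempt, _MAX_RETRIES,
            )
            time.sleep(_RETRY_BACKOFF_SEC)
    # Per the review note: pending messages after retries produce a warning,
    # not a failure.
    if isinstance(remaining, int) and remaining > 0:
        logger.warning(
            "Kafka flush timeout after %ss: %d message(s) still pending",
            _KAFKA_FLUSH_TIMEOUT_SEC, remaining,
        )
    return True, None
```

A fake flush callable returning a decreasing pending count (2, 1, 0) exercises the loop: the function retries twice, then succeeds on the third attempt.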
130-133: LGTM! Clear exception handling with proper logging.

The KafkaException handling appropriately logs the full exception context and returns a descriptive error message.
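A hedged sketch of this catch-log-and-return pattern; the `KafkaException` class below is a local stand-in for `confluent_kafka.KafkaException` so the snippet is self-contained, and `write_with_error_handling` is a hypothetical name, not the project's function:

```python
import logging

logger = logging.getLogger(__name__)


class KafkaException(Exception):
    """Stand-in for confluent_kafka.KafkaException (assumption for this sketch)."""


def write_with_error_handling(produce_and_flush):
    """Run the produce/flush callable, converting KafkaException
    into a (success, error_message) result instead of raising."""
    try:
        produce_and_flush()
        return True, None
    except KafkaException as exc:
        # Log the full exception context, return a descriptive error
        logger.exception("Kafka write failed")
        return False, f"Kafka error: {exc}"
```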
tests/test_writer_kafka.py (3)
25-38: LGTM! Well-designed test fixture for retry scenarios.

The `FakeProducerFlushSequence` correctly simulates decreasing pending message counts across retry attempts. The `*a, **kw` parameters in the flush method are intentional to accept the timeout argument used in production code.
41-49: LGTM! Appropriate fixture for timeout scenarios.

The `FakeProducerTimeout` effectively simulates persistent pending messages to test the timeout warning path. The `*a, **kw` signature is intentional.
102-141: LGTM! Comprehensive test coverage for new retry and timeout behavior.

The three new tests effectively cover:

- Retry until success (102-115): Verifies retry loop breaks on zero remaining messages and logs appropriate warnings
- Timeout warning (118-130): Confirms the function returns success with a warning when messages remain pending after all retries
- TypeError fallback (133-141): Tests the compatibility layer for stub producers without timeout parameter support

The test assertions are precise and the use of `monkeypatch` for configuration is appropriate.

requirements.txt (1)
14-14: Confluent-kafka 2.12.1 is current and secure.The version 2.12.1 is the latest release on PyPI, and no security vulnerabilities were found. The minor version bump is backward compatible and requires no action.
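A minimal sketch of the kind of fake-producer fixture and drain loop the tests above exercise; the class name follows the review's description, while `flush_until_drained` is a hypothetical helper, and the real tests in `tests/test_writer_kafka.py` may differ:

```python
class FakeProducerFlushSequence:
    """Fake producer whose flush() reports a decreasing pending count."""

    def __init__(self, counts):
        self._counts = list(counts)
        self.flush_calls = 0

    def flush(self, *a, **kw):  # *a/**kw accept the timeout used in production
        self.flush_calls += 1
        # Pop the next pending count; report 0 once the sequence is exhausted.
        return self._counts.pop(0) if self._counts else 0


def flush_until_drained(producer, timeout, max_retries):
    """Call flush() until nothing is pending or retries run out."""
    remaining = None
    for _ in range(max_retries):
        remaining = producer.flush(timeout)
        if not remaining:
            break
    return remaining
```

With a `[2, 1, 0]` sequence and three retries, the loop drains on the third call, matching the retry-until-success scenario the review describes.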
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/writer_kafka.py (2)
38-41: Add validation for environment variable values.

The configuration values are read from environment variables and cast to float/int without validation. Invalid or negative values could cause unexpected behavior or runtime errors. Consider adding validation:

```diff
 # Configurable flush timeouts and retries via env variables to avoid hanging indefinitely
-_KAFKA_FLUSH_TIMEOUT_SEC = float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "7"))
-_MAX_RETRIES = int(os.environ.get("KAFKA_FLUSH_RETRIES", "3"))
-_RETRY_BACKOFF_SEC = float(os.environ.get("KAFKA_RETRY_BACKOFF", "0.5"))
+_KAFKA_FLUSH_TIMEOUT_SEC = max(0.1, float(os.environ.get("KAFKA_FLUSH_TIMEOUT", "7")))
+_MAX_RETRIES = max(1, int(os.environ.get("KAFKA_FLUSH_RETRIES", "3")))
+_RETRY_BACKOFF_SEC = max(0.0, float(os.environ.get("KAFKA_RETRY_BACKOFF", "0.5")))
```
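Alternatively, a small parsing helper also covers non-numeric values, which the `max()` clamping above would still crash on. This is a hedged sketch; `env_float` is a hypothetical name, not part of the project:

```python
import os


def env_float(name, default, minimum=0.0):
    """Read a float from the environment, falling back to the default
    on missing or unparsable values and clamping to a minimum."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = float(raw)
    except ValueError:
        # Garbage input falls back to the default rather than raising
        return default
    return max(minimum, value)


KAFKA_FLUSH_TIMEOUT_SEC = env_float("KAFKA_FLUSH_TIMEOUT", 7.0, minimum=0.1)
RETRY_BACKOFF_SEC = env_float("KAFKA_RETRY_BACKOFF", 0.5, minimum=0.0)
```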
105-116: Consider breaking early when delivery errors are detected.

The retry loop continues even when the produce callback has already reported delivery errors. Once a message fails to deliver (the errors list is populated), retrying flush won't help, since Kafka won't retry a failed delivery; it only helps with timeouts/pending messages. Consider adding an early break on errors:

```diff
 remaining: Optional[int] = None
 for attempt in range(1, _MAX_RETRIES + 1):
     remaining = flush_with_timeout(producer, _KAFKA_FLUSH_TIMEOUT_SEC)
+    # Break early if delivery failed
+    if errors:
+        break
     # Treat None (flush returns None in some stubs) as success equivalent to 0 pending
-    if (remaining is None or remaining == 0) and not errors:
+    if remaining is None or remaining == 0:
         break
     if attempt < _MAX_RETRIES:
         logger.warning(
             "Kafka flush pending (%s message(s) remain) attempt %d/%d", remaining, attempt, _MAX_RETRIES
         )
         time.sleep(_RETRY_BACKOFF_SEC)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `src/writer_kafka.py` (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/writer_kafka.py (3)

- tests/test_extract_token.py (1): `get` (37-38)
- tests/test_event_gate_lambda.py (2): `get` (85-86), `flush` (51-52)
- tests/test_writer_kafka.py (4): `flush` (15-16), `flush` (31-38), `flush` (47-49), `flush` (58-60)
🪛 Ruff (0.14.3)
src/writer_kafka.py
128-128: Consider moving this statement to an else block
(TRY300)
🔇 Additional comments (2)
src/writer_kafka.py (2)
122-128: Clarify success criteria when messages remain pending.

The function returns `True` (success) even when `remaining > 0` (messages still pending after exhausting retries). This means messages that were never delivered are treated as successful writes, which could mask delivery failures from the caller's perspective. Is this intentional? If messages are still pending, should this be considered a failure rather than just a warning? Please verify that returning success for pending messages aligns with the expected behavior for downstream consumers of this API.

Note: The static analysis tool (Ruff TRY300) also suggests moving the return statement to an `else` block for better structure:

```diff
-# Log a warning if there are still pending messages after retries
-if isinstance(remaining, int) and remaining > 0:
-    logger.warning(
-        "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining
-    )
-
-return True, None
+# Log a warning if there are still pending messages after retries
+if isinstance(remaining, int) and remaining > 0:
+    logger.warning(
+        "Kafka flush timeout after %ss: %d message(s) still pending", _KAFKA_FLUSH_TIMEOUT_SEC, remaining
+    )
+else:
+    return True, None
```

However, you may want to reconsider whether pending messages should return `False` instead.
136-149: LGTM! Return type correctly specified.

The return type `Optional[int]` correctly reflects that stub producers may return `None`. The TypeError fallback for stub producers is also handled appropriately.
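A minimal sketch of the timeout-with-fallback behavior this comment describes; the real `flush_with_timeout` in `src/writer_kafka.py` may differ:

```python
from typing import Optional


def flush_with_timeout(producer, timeout: float) -> Optional[int]:
    """Flush with a timeout; fall back to a bare flush() for stub
    producers whose flush() does not accept a timeout argument."""
    try:
        return producer.flush(timeout)
    except TypeError:
        # Test stubs may implement flush() without a timeout parameter.
        # Note: this also catches TypeErrors raised inside flush() itself,
        # which is a limitation of this simple sketch.
        return producer.flush()
```

A stub whose `flush()` takes no arguments triggers exactly one fallback call; a producer-like object accepting a timeout never hits the except branch.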
oto-macenauer-absa left a comment
Have a look at the publish flow, please.
Trivy has completed a full security repository scan ✅ You can find the analysis results for this PR branch on this overview.
Overview
This pull request focuses on making Kafka message delivery more robust against transient failures, timeouts and ensuring that warnings and errors are properly logged and handled.
Release Notes:
Closes #75