Skip to content

Add cleanup to Kafka AwaitMessageTrigger for consumer management#64612

Open
jason810496 wants to merge 2 commits intoapache:mainfrom
jason810496:providers/apache-kafka/add-proper-cleanup-for-kafka-await-message
Open

Add cleanup to Kafka AwaitMessageTrigger for consumer management#64612
jason810496 wants to merge 2 commits intoapache:mainfrom
jason810496:providers/apache-kafka/add-proper-cleanup-for-kafka-await-message

Conversation

@jason810496
Copy link
Copy Markdown
Member

Why

When testing out Kafka external-event driven Dag for ##64205 scenario, I encounter the following error with only enabling one consumer Dag

max.poll.interval.ms default 300000 timeout ...

What

I verify locally that adding proper resource cleanup for Kafka Consumer can avoid the above error.


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below) Codex for unit tests following the guidelines

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds explicit Kafka consumer cleanup to AwaitMessageTrigger to prevent lingering consumers (and related max.poll.interval.ms issues) when triggers finish or are removed by the triggerer.

Changes:

  • Store the created Kafka consumer on the trigger instance for later cleanup.
  • Add a cleanup() implementation that closes the consumer.
  • Add unit tests covering consumer close on cleanup and no-op cleanup when no consumer exists.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
providers/apache/kafka/src/airflow/providers/apache/kafka/triggers/await_message.py Persist consumer reference and add cleanup() to close it.
providers/apache/kafka/tests/unit/apache/kafka/triggers/test_await_message.py Add unit tests validating cleanup behavior and extend mocked consumer with close().

@jason810496 jason810496 self-assigned this Apr 2, 2026
@jason810496 jason810496 force-pushed the providers/apache-kafka/add-proper-cleanup-for-kafka-await-message branch from a3fd51c to ab34e72 Compare April 7, 2026 02:38
self._consumer = None
try:
await sync_to_async(consumer.close)()
except Exception:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to narrow the exception?

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

Comment on lines +114 to +117
self._consumer = await async_get_consumer()

async_poll = sync_to_async(consumer.poll)
async_commit = sync_to_async(consumer.commit)
async_poll = sync_to_async(self._consumer.poll)
async_commit = sync_to_async(self._consumer.commit)
Comment on lines +149 to +156
async def cleanup(self) -> None:
consumer = self._consumer
if consumer is not None:
self._consumer = None
try:
await sync_to_async(consumer.close)()
except Exception:
log.warning("Failed to close Kafka consumer", exc_info=True)
try:
await sync_to_async(consumer.close)()
except Exception:
log.warning("Failed to close Kafka consumer", exc_info=True)
Comment on lines +164 to +167
generator = trigger.run()
await generator.__anext__()
await trigger.cleanup()
await generator.aclose()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants