Returning TriggerEvents on exception in BaseEventTrigger children #54804

@jroachgolf84

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I'm working on adding the PubsubPullTrigger as a supported queue to be used by the MessageQueueTrigger for event-driven scheduling. However, in the code snippet below, a TriggerEvent is yielded when an exception occurs in the run method. When the trigger is used for Asset-driven scheduling, that event actually triggers a DAG run. In my case, the exception was caused by a missing project_id when instantiating the Trigger, and it triggered DAG runs over and over until I provided a project_id.

    async def run(self) -> AsyncIterator[TriggerEvent]:
        try:
            while True:
                if pulled_messages := await self.hook.pull(
                    project_id=self.project_id,
                    subscription=self.subscription,
                    max_messages=self.max_messages,
                    return_immediately=True,
                ):
                    if self.ack_messages:
                        await self.message_acknowledgement(pulled_messages)
                    messages_json = [ReceivedMessage.to_dict(m) for m in pulled_messages]
                    yield TriggerEvent({"status": "success", "message": messages_json})
                    return
                self.log.info("Sleeping for %s seconds.", self.poke_interval)
                await asyncio.sleep(self.poke_interval)
        except Exception as e:
            yield TriggerEvent({"status": "error", "message": str(e)})
            return

When a TriggerEvent is not yielded, the Triggerer throws an exception that says something along the lines of "TriggerEvent was not returned".
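To make the behavior easier to see in isolation, here is a minimal sketch that does not use Airflow at all. `TriggerEvent` is a stand-in dataclass and `FakePullTrigger` is a hypothetical class that only mirrors the try/except shape of the snippet above. It shows that, from the consumer's side, the error path still produces exactly one event, so anything that reacts to "an event was yielded" cannot tell it apart from a real message.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, List, Optional


@dataclass
class TriggerEvent:
    """Stand-in for Airflow's TriggerEvent (assumption: it only carries a payload)."""

    payload: Any


class FakePullTrigger:
    """Hypothetical trigger with the same try/except shape as the snippet above."""

    def __init__(self, project_id: Optional[str]):
        self.project_id = project_id

    async def pull(self) -> List[str]:
        # Simulates the failure described in this issue: no project_id was given.
        if self.project_id is None:
            raise ValueError("project_id is required")
        return ["msg-1"]

    async def run(self) -> AsyncIterator[TriggerEvent]:
        try:
            messages = await self.pull()
            yield TriggerEvent({"status": "success", "message": messages})
        except Exception as e:
            # The exception is swallowed and turned into an ordinary event.
            yield TriggerEvent({"status": "error", "message": str(e)})


async def collect(trigger: FakePullTrigger) -> List[TriggerEvent]:
    """Consume the generator the way a naive event consumer would."""
    return [event async for event in trigger.run()]


events = asyncio.run(collect(FakePullTrigger(project_id=None)))
```

Here `events` holds a single event whose payload has "status": "error", which is the event that ends up starting a DAG run in the scenario described above.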

Not really a bug, more of a question around weird behavior.

What you think should happen instead?

When a child of the BaseEventTrigger class returns a TriggerEvent with "status": "error", an AssetEvent should not be created.
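A rough sketch of the kind of check I have in mind is below. It is not Airflow code: `TriggerEvent` is a stand-in dataclass, `should_create_asset_event` is a hypothetical helper name, and it assumes the payload is a dict that uses the "status" key the way the snippet above does. Where such a check would actually live in the Triggerer or scheduler is an open question.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class TriggerEvent:
    """Stand-in for Airflow's TriggerEvent (assumption: it only carries a payload)."""

    payload: Any


def should_create_asset_event(event: TriggerEvent) -> bool:
    """Hypothetical guard: skip asset-event creation for error payloads.

    Assumes the {"status": "error"} convention from the snippet above.
    Payloads that are not dicts, or that have no "status" key, are treated
    as normal events.
    """
    payload = event.payload
    if isinstance(payload, dict) and payload.get("status") == "error":
        return False
    return True


error_event = TriggerEvent({"status": "error", "message": "project_id is required"})
success_event = TriggerEvent({"status": "success", "message": []})
```

With this guard, `should_create_asset_event(error_event)` is False and `should_create_asset_event(success_event)` is True, so only the success event would go on to create an AssetEvent.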

How to reproduce

Please see the existing PR for details around the changes that are needed to reproduce this.

#54684

Operating System

MacOS

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels: AIP-82, area:Triggerer, area:core, needs-triage