Skip to content

Implement comm handlers for callback supervisor#65269

Open
ferruzzi wants to merge 3 commits intoapache:mainfrom
aws-mwaa:ferruzzi/executor-callbacks/comms-v2
Open

Implement comm handlers for callback supervisor#65269
ferruzzi wants to merge 3 commits intoapache:mainfrom
aws-mwaa:ferruzzi/executor-callbacks/comms-v2

Conversation

@ferruzzi
Copy link
Copy Markdown
Contributor

@ferruzzi ferruzzi commented Apr 14, 2026

This enables limited API access from within the callback supervisor. With this change, callbacks can use GetVariable, GetConnection, GetXCom, GetAsset (ByName and ByUri), and MaskSecret. We can add more as they are requested, but this felt like a good foundation. These are common with the existing TaskInstance supervisor so those are extracted out into helpers in a new shared request_handlers.py location.

One thing to note is that Triggerer previously did not mask secrets conn.password or conn.extra and will now, bringing it inline with the other two. This is a change but felt like a bugfix since it's reasonable to assume that passwords, etc are masked by default.

Testing:
One way to test this manually, create a new dag with a custom callback:

In your dag bag:

@task.bash(task_id="sleep_task")
def sleep_10_secs():
    return "sleep 10"
    
with DAG(
    dag_id="comms_channel_testing",
    deadline=DeadlineAlert(
        reference=DeadlineReference.FIXED_DATETIME(datetime(1980, 8, 10, 2)),
        interval=timedelta(0),
        callback=SyncCallback(get_variable),
    ),
):
    sleep_10_secs()

place the get_variable callback in a file in your plugins directory:

def get_variable():
    # Be sure to manually create the Variable in the UI or CLI; environment variables use a different code path
    from airflow.sdk.definitions.variable import Variable
    val = Variable.get("test_key", default="no comms!")
    print(f"**************  Successful callback! Variable value: {val}  ************** ")

launch Airflow or Breeze and add the Variable named test_key via the UI or CLI (note that setting it via EnvVar is a different code path and won't hit this change).

Run the Dag. Since the fixed_datetime deadline is in the past, it will fire the callback immediately and you should see the success message bubble up into the Scheduler logs within a few seconds.

@kaxil - This is the final step to enable the Connections, Variables, etc that I believe you (and others) were asking about now that the previous PRs are merged.

Followup to #62645


Was generative AI tooling used to co-author this PR?
  • [ x ] Yes (please specify the tool below)
    Some assistance from Cline/Claude

Comment thread airflow-core/src/airflow/jobs/triggerer_job_runner.py
Comment thread task-sdk/src/airflow/sdk/execution_time/request_handlers.py
# This is a minimal subset of ToSupervisor: read-only access to Connections,
# Variables, XCom, and Assets, plus MaskSecret for the secrets masker.
CallbackToSupervisor = Annotated[
GetAssetByName | GetAssetByUri | GetConnection | GetVariable | GetXCom | MaskSecret,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetXCom requires dag_id, run_id, and task_id, but callbacks (deadline callbacks especially) do not have an implicit task context. A user calling XCom.get() inside a callback would need to know and pass these explicitly. Might be worth a note in the CallbackToSupervisor comment explaining that XCom access requires explicit identifiers since there is no task context -- or documenting which message types are practical in callbacks vs included for completeness.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually debated including this at all. Do you think it's useful to have in a callback?

Comment thread task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants