
Update datasets.rst issue with running example code #35035

Merged (1 commit, Oct 26, 2023)
4 changes: 2 additions & 2 deletions docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -230,8 +230,8 @@ Example:
     @task
     def print_triggering_dataset_events(triggering_dataset_events=None):
         for dataset, dataset_list in triggering_dataset_events.items():
-            print(dataset, dataset_list, dataset_list[dataset])
-            print(dataset_list[dataset][0].source_dag_run.dag_run_id)
+            print(dataset, dataset_list)
+            print(dataset_list[0].source_dag_run.dag_id)
Contributor:
Checking the signature of triggering_dataset_events in airflow/utils/context.pyi, it is declared with the type Mapping[str, Collection[DatasetEvent | DatasetEventPydantic]].

I assume the example is still not correct even after this change; it rather needs to be the following (see airflow/models/dataset.py:275):

Suggested change
-            print(dataset_list[0].source_dag_run.dag_id)
+            print(dataset_list[0].source_dag_id)

Contributor Author:

Both are correct:

Running:

        for dataset, dataset_list in triggering_dataset_events.items():
            print(dataset, dataset_list)
            print(f"{dataset_list[0].source_dag_run.dag_id=}")
            print(f"{dataset_list[0].source_dag_id=}")

results in:

[2023-10-18, 19:34:38 UTC] {logging_mixin.py:151} INFO - local://some_file.txt [DatasetEvent(id=6, dataset_id=6, extra={}, source_task_id='producer_task', source_dag_id='producer_dag', source_run_id='manual__2023-10-18T19:34:37.507108+00:00', source_map_index=-1)]
[2023-10-18, 19:34:38 UTC] {logging_mixin.py:151} INFO - dataset_list[0].source_dag_run.dag_id='producer_dag'
[2023-10-18, 19:34:38 UTC] {logging_mixin.py:151} INFO - dataset_list[0].source_dag_id='producer_dag'

(Granted, my test DAG is producer_dag and not load_snowflake_data from the demo.)

I'm open to either, but I like showing source_dag_run because it also hints at accessing attributes like source_dag_run.data_interval_start, which could be useful for the consumer DAG.
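For example, the consumer could align its work with the producer's data interval (a sketch, not from the docs; data_interval_start is a standard DagRun attribute, and the loop shape matches the example above):

        for dataset, dataset_list in triggering_dataset_events.items():
            source_run = dataset_list[0].source_dag_run
            # The producer's data interval, read off the DagRun relation.
            print(f"{source_run.dag_id} interval starts at {source_run.data_interval_start}")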

Contributor:

Aaah, yes, you are right. I overlooked /home/jscheffl/Workspace/airflow/airflow/models/dataset.py:304, where the relation is modelled. The danger zone is that, per the typing, the event can also be of type DatasetEventPydantic if Airflow switches to using the internal API, and that class does not carry the relation attribute. The internal API is coming, adding a bit more API complexity here for the future benefit of multi-tenancy.
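In that scenario, a defensive variant of the example could fall back to the plain column (a sketch, assuming DatasetEventPydantic carries the column fields such as source_dag_id but not the ORM relation):

    for dataset, dataset_list in triggering_dataset_events.items():
        event = dataset_list[0]
        # source_dag_run is an ORM relation and may be absent on the
        # Pydantic variant; source_dag_id is a plain column either way.
        source_run = getattr(event, "source_dag_run", None)
        print(source_run.dag_id if source_run is not None else event.source_dag_id)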


print_triggering_dataset_events()
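For reference, a self-contained pair of DAGs that reproduces the log output quoted in this thread might look like the following (a sketch: the DAG ids and dataset URI mirror the test run above, while start_date and catchup are assumptions):

    import pendulum

    from airflow.datasets import Dataset
    from airflow.decorators import dag, task

    # URI taken from the log output above.
    some_file = Dataset("local://some_file.txt")


    @dag(start_date=pendulum.datetime(2023, 10, 1), schedule=None, catchup=False)
    def producer_dag():
        @task(outlets=[some_file])
        def producer_task():
            # Completing a task that declares outlets emits the DatasetEvent;
            # actually writing the file is not required for the demo.
            pass

        producer_task()


    @dag(start_date=pendulum.datetime(2023, 10, 1), schedule=[some_file], catchup=False)
    def consumer_dag():
        @task
        def print_triggering_dataset_events(triggering_dataset_events=None):
            for dataset, dataset_list in triggering_dataset_events.items():
                print(dataset, dataset_list)
                print(dataset_list[0].source_dag_run.dag_id)

        print_triggering_dataset_events()


    producer_dag()
    consumer_dag()

Triggering producer_dag manually should then cause consumer_dag to run and log the producer's dag_id, as in the output quoted above.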
