Add the example DAGs and fix test

sunank200 committed Feb 8, 2024
1 parent 0087abd commit caca0f0
Showing 2 changed files with 46 additions and 16 deletions.
57 changes: 42 additions & 15 deletions airflow/example_dags/example_datasets.py
@@ -15,34 +15,47 @@
# specific language governing permissions and limitations
# under the License.
"""
-Example DAG for demonstrating behavior of Datasets feature.
+Example DAG for demonstrating the behavior of the Datasets feature in Airflow, including conditional and
+dataset expression-based scheduling.

Notes on usage:

-Turn on all the dags.
+Turn on all the DAGs.

-DAG dataset_produces_1 should run because it's on a schedule.
+dataset_produces_1 is scheduled to run daily. Once it completes, it triggers several DAGs because its
+dataset has been updated. dataset_consumes_1 is triggered immediately, as it depends solely on the dataset
+produced by dataset_produces_1. consume_1_or_2_with_dataset_expressions will also be triggered, as its
+condition of either dataset_produces_1 or dataset_produces_2 being updated is satisfied by
+dataset_produces_1.

-After dataset_produces_1 runs, dataset_consumes_1 should be triggered immediately
-because its only dataset dependency is managed by dataset_produces_1.
+dataset_consumes_1_and_2 will not be triggered after dataset_produces_1 runs, because it also requires the
+dataset from dataset_produces_2, which has no schedule and must be triggered manually.

-No other dags should be triggered. Note that even though dataset_consumes_1_and_2 depends on
-the dataset in dataset_produces_1, it will not be triggered until dataset_produces_2 runs
-(and dataset_produces_2 is left with no schedule so that we can trigger it manually).
+After dataset_produces_2 is triggered manually, several DAGs are affected. dataset_consumes_1_and_2 should
+run because both of its dataset dependencies are now met. consume_1_and_2_with_dataset_expressions will be
+triggered, as it requires both the dataset_produces_1 and dataset_produces_2 datasets to be updated.
+consume_1_or_2_with_dataset_expressions will be triggered again, since it runs whenever either dataset is
+updated.

-Next, trigger dataset_produces_2. After dataset_produces_2 finishes,
-dataset_consumes_1_and_2 should run.
+consume_1_or_both_2_and_3_with_dataset_expressions demonstrates more complex dependency logic: it triggers
+if dataset_produces_1 is updated, or if both dataset_produces_2 and dag3_dataset are updated. This example
+highlights how updates from multiple datasets can be combined with logical expressions for advanced
+scheduling.

-Dags dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled should not run because
-they depend on datasets that never get updated.
+conditional_dataset_and_time_based_timetable illustrates the integration of time-based scheduling with
+dataset dependencies. It executes either when both the dataset_produces_1 and dataset_produces_2 datasets
+have been updated or according to a cron schedule, showcasing Airflow's flexibility in handling mixed
+dataset and time-based triggers.

+The DAGs dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled will not run
+automatically, as they depend on datasets that are never updated or are not produced by any scheduled task.
"""
from __future__ import annotations

import pendulum

from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.models.dataset import DatasetAll, DatasetAny
from airflow.operators.bash import BashOperator
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
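The basic producer and consumer DAGs that the docstring walks through (dataset_produces_1,
dataset_consumes_1, and the dag1_dataset/dag2_dataset/dag3_dataset objects used below) sit in the
collapsed portion of this diff. As a rough, self-contained sketch of that pattern, with bodies assumed
to resemble the other examples in this file (URIs and task ids are illustrative):

import pendulum

from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

dag1_dataset = Dataset("s3://dag1/output_1.txt")  # illustrative URI

with DAG(
    dag_id="dataset_produces_1",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
) as producer:
    # Listing the dataset in `outlets` marks it as updated when this task succeeds.
    BashOperator(task_id="producing_task_1", outlets=[dag1_dataset], bash_command="sleep 5")

with DAG(
    dag_id="dataset_consumes_1",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[dag1_dataset],  # runs each time dag1_dataset is updated
) as consumer:
    BashOperator(task_id="consuming_1", bash_command="sleep 5")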
@@ -154,12 +167,26 @@
bash_command="sleep 5",
)
with DAG(
dag_id="consume_1_or_-2_and_3_with_dataset_expressions",
dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
) as dag7:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consume_1_or_-2_and_3_with_dataset_expressions",
task_id="consume_1_or_both_2_and_3_with_dataset_expressions",
bash_command="sleep 5",
)
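The `|` and `&` operators in dag7's schedule build the DatasetAny and DatasetAll objects imported at the
top of this file, so the same condition could presumably also be spelled out explicitly:

from airflow.models.dataset import DatasetAll, DatasetAny

# Assumed-equivalent explicit form of (dag1_dataset | (dag2_dataset & dag3_dataset)):
schedule = DatasetAny(dag1_dataset, DatasetAll(dag2_dataset, dag3_dataset))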
+with DAG(
+    dag_id="conditional_dataset_and_time_based_timetable",
+    catchup=False,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=DatasetOrTimeSchedule(
+        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
+        datasets=(dag1_dataset & dag2_dataset),
+    ),
+    tags=["dataset-time-based-timetable"],
+) as dag8:
+    BashOperator(
+        outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
+        task_id="conditional_dataset_and_time_based_timetable",
+        bash_command="sleep 5",
+    )
5 changes: 4 additions & 1 deletion tests/models/test_taskinstance.py
@@ -2096,7 +2096,10 @@ def test_outlet_datasets(self, create_task_instance):
        assert session.query(DatasetDagRunQueue.target_dag_id).filter_by(
            dataset_id=event.dataset.id
        ).order_by(DatasetDagRunQueue.target_dag_id).all() == [
-            ("dataset_and_time_based_timetable",),
+            ("conditional_dataset_and_time_based_timetable",),
+            ("consume_1_and_2_with_dataset_expressions",),
+            ("consume_1_or_2_with_dataset_expressions",),
+            ("consume_1_or_both_2_and_3_with_dataset_expressions",),
            ("dataset_consumes_1",),
            ("dataset_consumes_1_and_2",),
            ("dataset_consumes_1_never_scheduled",),