diff --git a/chapter06/dags/figure_6_1.py b/chapter06/dags/figure_6_1.py index 2ffd8351..2a7cda5b 100644 --- a/chapter06/dags/figure_6_1.py +++ b/chapter06/dags/figure_6_1.py @@ -1,18 +1,17 @@ import pendulum from airflow import DAG -from airflow.operators.dummy import DummyOperator +from airflow.operators.empty import EmptyOperator -dag = DAG( +with DAG( dag_id="figure_6_01", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 16 * * *", + schedule="0 16 * * *", description="A batch workflow for ingesting supermarket promotions data, demonstrating the FileSensor.", default_args={"depends_on_past": True}, -) +): + create_metrics = EmptyOperator(task_id="create_metrics") -create_metrics = DummyOperator(task_id="create_metrics", dag=dag) - -for supermarket_id in [1, 2, 3, 4]: - copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag) - process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag) - copy >> process >> create_metrics + for supermarket_id in [1, 2, 3, 4]: + copy = EmptyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}") + process = EmptyOperator(task_id=f"process_supermarket_{supermarket_id}") + copy >> process >> create_metrics diff --git a/chapter06/dags/figure_6_11.py b/chapter06/dags/figure_6_11.py index 8afea435..dff13049 100644 --- a/chapter06/dags/figure_6_11.py +++ b/chapter06/dags/figure_6_11.py @@ -2,17 +2,9 @@ import pendulum from airflow import DAG -from airflow.operators.dummy import DummyOperator +from airflow.operators.empty import EmptyOperator from airflow.sensors.python import PythonSensor -dag = DAG( - dag_id="figure_6_11", - start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 16 * * *", - description="A batch workflow for ingesting supermarket promotions data, demonstrating the PythonSensor.", - default_args={"depends_on_past": True}, -) - def _wait_for_supermarket(supermarket_id_): supermarket_path = Path("/data/" + supermarket_id_) @@ -21,15 
+13,21 @@ def _wait_for_supermarket(supermarket_id_): return data_files and success_file.exists() -for supermarket_id in range(1, 5): - wait = PythonSensor( - task_id=f"wait_for_supermarket_{supermarket_id}", - python_callable=_wait_for_supermarket, - op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"}, - timeout=600, - dag=dag, - ) - copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag) - process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag) - create_metrics = DummyOperator(task_id=f"create_metrics_{supermarket_id}", dag=dag) - wait >> copy >> process >> create_metrics +with DAG( + dag_id="figure_6_11", + start_date=pendulum.today("UTC").add(days=-3), + schedule="0 16 * * *", + description="A batch workflow for ingesting supermarket promotions data, demonstrating the PythonSensor.", + default_args={"depends_on_past": True}, +): + for supermarket_id in range(1, 5): + wait = PythonSensor( + task_id=f"wait_for_supermarket_{supermarket_id}", + python_callable=_wait_for_supermarket, + op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"}, + timeout=600, + ) + copy = EmptyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}") + process = EmptyOperator(task_id=f"process_supermarket_{supermarket_id}") + create_metrics = EmptyOperator(task_id=f"create_metrics_{supermarket_id}") + wait >> copy >> process >> create_metrics diff --git a/chapter06/dags/figure_6_12.py b/chapter06/dags/figure_6_12.py index db69cb90..42f96cd4 100644 --- a/chapter06/dags/figure_6_12.py +++ b/chapter06/dags/figure_6_12.py @@ -2,17 +2,9 @@ import pendulum from airflow import DAG -from airflow.operators.dummy import DummyOperator +from airflow.operators.empty import EmptyOperator from airflow.sensors.python import PythonSensor -dag = DAG( - dag_id="figure_6_12", - start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 16 * * *", - description="A batch workflow for ingesting supermarket promotions data.", 
- default_args={"depends_on_past": True}, -) - def _wait_for_supermarket(supermarket_id_): supermarket_path = Path("/data/" + supermarket_id_) @@ -21,28 +13,34 @@ def _wait_for_supermarket(supermarket_id_): return data_files and success_file.exists() -for supermarket_id in [1, 2, 3, 4]: - wait = PythonSensor( - task_id=f"wait_for_supermarket_{supermarket_id}", - python_callable=_wait_for_supermarket, - op_kwargs={"supermarket_id": f"supermarket{supermarket_id}"}, - dag=dag, - ) - copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag) - process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag) - generate_metrics = DummyOperator(task_id=f"generate_metrics_supermarket_{supermarket_id}", dag=dag) - compute_differences = DummyOperator(task_id=f"compute_differences_supermarket_{supermarket_id}", dag=dag) - update_dashboard = DummyOperator(task_id=f"update_dashboard_supermarket_{supermarket_id}", dag=dag) - notify_new_data = DummyOperator(task_id=f"notify_new_data_supermarket_{supermarket_id}", dag=dag) +with DAG( + dag_id="figure_6_12", + start_date=pendulum.today("UTC").add(days=-3), + schedule="0 16 * * *", + description="A batch workflow for ingesting supermarket promotions data.", + default_args={"depends_on_past": True}, +): + for supermarket_id in [1, 2, 3, 4]: + wait = PythonSensor( + task_id=f"wait_for_supermarket_{supermarket_id}", + python_callable=_wait_for_supermarket, + op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"}, + ) + copy = EmptyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}") + process = EmptyOperator(task_id=f"process_supermarket_{supermarket_id}") + generate_metrics = EmptyOperator(task_id=f"generate_metrics_supermarket_{supermarket_id}") + compute_differences = EmptyOperator(task_id=f"compute_differences_supermarket_{supermarket_id}") + update_dashboard = EmptyOperator(task_id=f"update_dashboard_supermarket_{supermarket_id}") + notify_new_data = 
EmptyOperator(task_id=f"notify_new_data_supermarket_{supermarket_id}") - ( - wait - >> copy - >> process - >> generate_metrics - >> [ - compute_differences, - notify_new_data, - ] - ) - compute_differences >> update_dashboard + ( + wait + >> copy + >> process + >> generate_metrics + >> [ + compute_differences, + notify_new_data, + ] + ) + compute_differences >> update_dashboard diff --git a/chapter06/dags/figure_6_17.py b/chapter06/dags/figure_6_17.py index 0f8f8d86..15ac6ab5 100644 --- a/chapter06/dags/figure_6_17.py +++ b/chapter06/dags/figure_6_17.py @@ -1,6 +1,6 @@ import pendulum from airflow import DAG -from airflow.operators.dummy import DummyOperator +from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator @@ -9,15 +9,15 @@ example_1_dag_1 = DAG( dag_id="figure_6_17_example_1_dag_1", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 0 * * *", + schedule="0 0 * * *", ) example_1_dag_2 = DAG( dag_id="figure_6_17_example_1_dag_2", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval=None, + schedule=None, ) -DummyOperator(task_id="etl", dag=example_1_dag_1) >> TriggerDagRunOperator( +EmptyOperator(task_id="etl", dag=example_1_dag_1) >> TriggerDagRunOperator( task_id="trigger_dag2", trigger_dag_id="figure_6_17_example_1_dag_2", dag=example_1_dag_1, @@ -29,26 +29,26 @@ example_2_dag_1 = DAG( dag_id="figure_6_17_example_2_dag_1", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 0 * * *", + schedule="0 0 * * *", ) example_2_dag_2 = DAG( dag_id="figure_6_17_example_2_dag_2", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 0 * * *", + schedule="0 0 * * *", ) example_2_dag_3 = DAG( dag_id="figure_6_17_example_2_dag_3", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 0 * * *", + schedule="0 0 * * *", ) example_2_dag_4 = DAG( 
dag_id="figure_6_17_example_2_dag_4", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval=None, + schedule=None, ) for dag_ in [example_2_dag_1, example_2_dag_2, example_2_dag_3]: - DummyOperator(task_id="etl", dag=dag_) >> TriggerDagRunOperator( + EmptyOperator(task_id="etl", dag=dag_) >> TriggerDagRunOperator( task_id="trigger_dag4", trigger_dag_id="figure_6_17_example_2_dag_4", dag=dag_ ) @@ -59,25 +59,25 @@ example_3_dag_1 = DAG( dag_id="figure_6_17_example_3_dag_1", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 0 * * *", + schedule="0 0 * * *", ) example_3_dag_2 = DAG( dag_id="figure_6_17_example_3_dag_2", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval=None, + schedule=None, ) example_3_dag_3 = DAG( dag_id="figure_6_17_example_3_dag_3", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval=None, + schedule=None, ) example_3_dag_4 = DAG( dag_id="figure_6_17_example_3_dag_4", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval=None, + schedule=None, ) -DummyOperator(task_id="etl", dag=example_3_dag_1) >> [ +EmptyOperator(task_id="etl", dag=example_3_dag_1) >> [ TriggerDagRunOperator( task_id="trigger_dag2", trigger_dag_id="figure_6_17_example_3_dag_2", diff --git a/chapter06/dags/figure_6_19.py b/chapter06/dags/figure_6_19.py index bc00e7e3..969735de 100644 --- a/chapter06/dags/figure_6_19.py +++ b/chapter06/dags/figure_6_19.py @@ -1,33 +1,33 @@ import pendulum from airflow import DAG -from airflow.operators.dummy import DummyOperator +from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator from airflow.sensors.external_task import ExternalTaskSensor dag1 = DAG( dag_id="figure_6_19_dag_1", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 0 * * *", + schedule="0 0 * * *", ) dag2 = DAG( dag_id="figure_6_19_dag_2", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 0 * * *", + schedule="0 0 * * 
*", ) dag3 = DAG( dag_id="figure_6_19_dag_3", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 0 * * *", + schedule="0 0 * * *", ) dag4 = DAG( dag_id="figure_6_19_dag_4", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval=None, + schedule=None, ) -DummyOperator(task_id="etl", dag=dag1) -DummyOperator(task_id="etl", dag=dag2) -DummyOperator(task_id="etl", dag=dag3) +EmptyOperator(task_id="etl", dag=dag1) +EmptyOperator(task_id="etl", dag=dag2) +EmptyOperator(task_id="etl", dag=dag3) [ ExternalTaskSensor( task_id="wait_for_etl_dag1", diff --git a/chapter06/dags/figure_6_20.py b/chapter06/dags/figure_6_20.py index 127a2c42..ae9b7bb7 100644 --- a/chapter06/dags/figure_6_20.py +++ b/chapter06/dags/figure_6_20.py @@ -2,21 +2,21 @@ import pendulum from airflow import DAG -from airflow.operators.dummy import DummyOperator +from airflow.operators.empty import EmptyOperator from airflow.sensors.external_task import ExternalTaskSensor dag1 = DAG( dag_id="figure_6_20_dag_1", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 16 * * *", + schedule="0 16 * * *", ) dag2 = DAG( dag_id="figure_6_20_dag_2", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 18 * * *", + schedule="0 18 * * *", ) -DummyOperator(task_id="copy_to_raw", dag=dag1) >> DummyOperator(task_id="process_supermarket", dag=dag1) +EmptyOperator(task_id="copy_to_raw", dag=dag1) >> EmptyOperator(task_id="process_supermarket", dag=dag1) wait = ExternalTaskSensor( task_id="wait_for_process_supermarket", @@ -25,5 +25,5 @@ execution_delta=datetime.timedelta(hours=6), dag=dag2, ) -report = DummyOperator(task_id="report", dag=dag2) +report = EmptyOperator(task_id="report", dag=dag2) wait >> report diff --git a/chapter06/dags/figure_6_5.py b/chapter06/dags/figure_6_5.py index ebb7452b..ab433506 100644 --- a/chapter06/dags/figure_6_5.py +++ b/chapter06/dags/figure_6_5.py @@ -1,24 +1,22 @@ import pendulum from airflow import DAG -from 
airflow.operators.dummy import DummyOperator +from airflow.operators.empty import EmptyOperator from airflow.sensors.filesystem import FileSensor -dag = DAG( +with DAG( dag_id="figure_6_05", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 16 * * *", + schedule="0 16 * * *", description="A batch workflow for ingesting supermarket promotions data, demonstrating the FileSensor.", default_args={"depends_on_past": True}, -) +): + create_metrics = EmptyOperator(task_id="create_metrics") -create_metrics = DummyOperator(task_id="create_metrics", dag=dag) - -for supermarket_id in [1, 2, 3, 4]: - wait = FileSensor( - task_id=f"wait_for_supermarket_{supermarket_id}", - filepath=f"/data/supermarket{supermarket_id}/data.csv", - dag=dag, - ) - copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag) - process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag) - wait >> copy >> process >> create_metrics + for supermarket_id in [1, 2, 3, 4]: + wait = FileSensor( + task_id=f"wait_for_supermarket_{supermarket_id}", + filepath=f"/data/supermarket{supermarket_id}/data.csv", + ) + copy = EmptyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}") + process = EmptyOperator(task_id=f"process_supermarket_{supermarket_id}") + wait >> copy >> process >> create_metrics diff --git a/chapter06/dags/figure_6_6.py b/chapter06/dags/figure_6_6.py index bcfbb464..1867c06b 100644 --- a/chapter06/dags/figure_6_6.py +++ b/chapter06/dags/figure_6_6.py @@ -2,19 +2,9 @@ import pendulum from airflow import DAG -from airflow.operators.dummy import DummyOperator +from airflow.operators.empty import EmptyOperator from airflow.sensors.python import PythonSensor -dag = DAG( - dag_id="figure_6_06", - start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 16 * * *", - description="A batch workflow for ingesting supermarket promotions data, demonstrating the PythonSensor.", - default_args={"depends_on_past": True}, -) - 
-create_metrics = DummyOperator(task_id="create_metrics", dag=dag) - def _wait_for_supermarket(supermarket_id_): supermarket_path = Path("/data/" + supermarket_id_) @@ -23,14 +13,22 @@ def _wait_for_supermarket(supermarket_id_): return data_files and success_file.exists() -for supermarket_id in range(1, 5): - wait = PythonSensor( - task_id=f"wait_for_supermarket_{supermarket_id}", - python_callable=_wait_for_supermarket, - op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"}, - timeout=600, - dag=dag, - ) - copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag) - process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag) - wait >> copy >> process >> create_metrics +with DAG( + dag_id="figure_6_06", + start_date=pendulum.today("UTC").add(days=-3), + schedule="0 16 * * *", + description="A batch workflow for ingesting supermarket promotions data, demonstrating the PythonSensor.", + default_args={"depends_on_past": True}, +): + create_metrics = EmptyOperator(task_id="create_metrics") + + for supermarket_id in range(1, 5): + wait = PythonSensor( + task_id=f"wait_for_supermarket_{supermarket_id}", + python_callable=_wait_for_supermarket, + op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"}, + timeout=600, + ) + copy = EmptyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}") + process = EmptyOperator(task_id=f"process_supermarket_{supermarket_id}") + wait >> copy >> process >> create_metrics diff --git a/chapter06/dags/figure_6_8.py b/chapter06/dags/figure_6_8.py index c0486581..fce75430 100644 --- a/chapter06/dags/figure_6_8.py +++ b/chapter06/dags/figure_6_8.py @@ -1,21 +1,19 @@ import pendulum from airflow import DAG -from airflow.operators.dummy import DummyOperator +from airflow.operators.empty import EmptyOperator from airflow.sensors.filesystem import FileSensor -dag = DAG( +with DAG( dag_id="figure_6_08", start_date=pendulum.today("UTC").add(days=-14), - schedule_interval="0 16 * * 
*", + schedule="0 16 * * *", description="Create a file /data/supermarket1/data.csv, and behold a sensor deadlock.", -) - -create_metrics = DummyOperator(task_id="create_metrics", dag=dag) -for supermarket_id in [1, 2, 3, 4]: - copy = FileSensor( - task_id=f"copy_to_raw_supermarket_{supermarket_id}", - filepath=f"/data/supermarket{supermarket_id}/data.csv", - dag=dag, - ) - process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag) - copy >> process >> create_metrics +): + create_metrics = EmptyOperator(task_id="create_metrics") + for supermarket_id in [1, 2, 3, 4]: + copy = FileSensor( + task_id=f"copy_to_raw_supermarket_{supermarket_id}", + filepath=f"/data/supermarket{supermarket_id}/data.csv", + ) + process = EmptyOperator(task_id=f"process_supermarket_{supermarket_id}") + copy >> process >> create_metrics diff --git a/chapter06/dags/figure_6_9.py b/chapter06/dags/figure_6_9.py index 65a5da6b..ee8335de 100644 --- a/chapter06/dags/figure_6_9.py +++ b/chapter06/dags/figure_6_9.py @@ -2,18 +2,9 @@ import pendulum from airflow import DAG -from airflow.operators.dummy import DummyOperator +from airflow.operators.empty import EmptyOperator from airflow.sensors.python import PythonSensor -dag = DAG( - dag_id="figure_6_09", - start_date=pendulum.today("UTC").add(days=-14), - schedule_interval="0 16 * * *", - description="A batch workflow for ingesting supermarket promotions data, demonstrating the PythonSensor.", -) - -create_metrics = DummyOperator(task_id="create_metrics", dag=dag) - def _wait_for_supermarket(supermarket_id_): supermarket_path = Path("/data/" + supermarket_id_) @@ -22,15 +13,22 @@ def _wait_for_supermarket(supermarket_id_): return data_files and success_file.exists() -for supermarket_id in range(1, 5): - wait = PythonSensor( - task_id=f"wait_for_supermarket_{supermarket_id}", - python_callable=_wait_for_supermarket, - op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"}, - timeout=600, - mode="reschedule", - dag=dag, - 
) - copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag) - process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag) - wait >> copy >> process >> create_metrics +with DAG( + dag_id="figure_6_09", + start_date=pendulum.today("UTC").add(days=-14), + schedule="0 16 * * *", + description="A batch workflow for ingesting supermarket promotions data, demonstrating the PythonSensor.", +): + create_metrics = EmptyOperator(task_id="create_metrics") + + for supermarket_id in range(1, 5): + wait = PythonSensor( + task_id=f"wait_for_supermarket_{supermarket_id}", + python_callable=_wait_for_supermarket, + op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"}, + timeout=600, + mode="reschedule", + ) + copy = EmptyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}") + process = EmptyOperator(task_id=f"process_supermarket_{supermarket_id}") + wait >> copy >> process >> create_metrics diff --git a/chapter06/dags/listing_6_1.py b/chapter06/dags/listing_6_1.py index bb7ee975..deb5d444 100644 --- a/chapter06/dags/listing_6_1.py +++ b/chapter06/dags/listing_6_1.py @@ -2,12 +2,11 @@ from airflow import DAG from airflow.sensors.filesystem import FileSensor -dag = DAG( +with DAG( dag_id="listing_6_01", start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 16 * * *", + schedule="0 16 * * *", description="A batch workflow for ingesting supermarket promotions data, demonstrating the FileSensor.", default_args={"depends_on_past": True}, -) - -wait = FileSensor(task_id="wait_for_supermarket_1", filepath="/data/supermarket1/data.csv", dag=dag) +): + wait = FileSensor(task_id="wait_for_supermarket_1", filepath="/data/supermarket1/data.csv") diff --git a/chapter06/dags/listing_6_2.py b/chapter06/dags/listing_6_2.py index 8e55ce8b..a45aa191 100644 --- a/chapter06/dags/listing_6_2.py +++ b/chapter06/dags/listing_6_2.py @@ -4,14 +4,6 @@ from airflow import DAG from airflow.sensors.python import PythonSensor -dag = 
DAG( - dag_id="listing_6_02", - start_date=pendulum.today("UTC").add(days=-3), - schedule_interval="0 16 * * *", - description="A batch workflow for ingesting supermarket promotions data.", - default_args={"depends_on_past": True}, -) - def _wait_for_supermarket(supermarket_id_): supermarket_path = Path("/data/" + supermarket_id_) @@ -20,9 +12,15 @@ def _wait_for_supermarket(supermarket_id_): return data_files and success_file.exists() -wait_for_supermarket_1 = PythonSensor( - task_id="wait_for_supermarket_1", - python_callable=_wait_for_supermarket, - op_kwargs={"supermarket_id": "supermarket1"}, - dag=dag, -) +with DAG( + dag_id="listing_6_02", + start_date=pendulum.today("UTC").add(days=-3), + schedule="0 16 * * *", + description="A batch workflow for ingesting supermarket promotions data.", + default_args={"depends_on_past": True}, +): + wait_for_supermarket_1 = PythonSensor( + task_id="wait_for_supermarket_1", + python_callable=_wait_for_supermarket, + op_kwargs={"supermarket_id_": "supermarket1"}, + )