Skip to content

Commit

Permalink
Updating Apache example DAGs to use XComArgs (#16869)
Browse files Browse the repository at this point in the history
  • Loading branch information
josh-fell committed Jul 23, 2021
1 parent f692fc9 commit 91f4d80
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,8 @@
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor
from airflow.utils.dates import days_ago

args = {
'owner': 'Airflow',
}

with DAG(
dag_id='example_cassandra_operator',
default_args=args,
schedule_interval=None,
start_date=days_ago(2),
tags=['example'],
Expand Down
28 changes: 9 additions & 19 deletions airflow/providers/apache/hive/example_dags/example_twitter_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,27 +66,17 @@ def transfertodb():
"""


# --------------------------------------------------------------------------------
# set default arguments
# --------------------------------------------------------------------------------

default_args = {
'owner': 'Ekhtiar',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

with DAG(
dag_id='example_twitter_dag',
default_args=default_args,
default_args={
'owner': 'Ekhtiar',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
schedule_interval="@daily",
start_date=days_ago(5),
tags=['example'],
Expand Down
14 changes: 6 additions & 8 deletions airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,8 @@
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from airflow.utils.dates import days_ago

args = {
'owner': 'airflow',
}

dag = DAG(
dag_id='example_kylin_operator',
default_args=args,
schedule_interval=None,
start_date=days_ago(1),
tags=['example'],
Expand All @@ -57,8 +52,8 @@ def gen_build_time(**kwargs):
project='learn_kylin',
cube='kylin_sales_cube',
command='build',
start_time="{{ task_instance.xcom_pull(task_ids='gen_build_time',key='date_start') }}",
end_time="{{ task_instance.xcom_pull(task_ids='gen_build_time',key='date_end') }}",
start_time=gen_build_time_task.output['date_start'],
end_time=gen_build_time_task.output['date_end'],
is_track_job=True,
dag=dag,
)
Expand Down Expand Up @@ -128,5 +123,8 @@ def gen_build_time(**kwargs):
dag=dag,
)

gen_build_time_task >> build_task1 >> build_task2 >> refresh_task1 >> merge_task
build_task1 >> build_task2 >> refresh_task1 >> merge_task
merge_task >> disable_task >> purge_task >> build_task3

# Task dependency created via `XComArgs`:
# gen_build_time_task >> build_task1
5 changes: 0 additions & 5 deletions airflow/providers/apache/pig/example_dags/example_pig.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@
from airflow.providers.apache.pig.operators.pig import PigOperator
from airflow.utils.dates import days_ago

args = {
'owner': 'airflow',
}

dag = DAG(
dag_id='example_pig_operator',
default_args=args,
schedule_interval=None,
start_date=days_ago(2),
tags=['example'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,8 @@
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago

args = {
'owner': 'Airflow',
}

with DAG(
dag_id='example_spark_operator',
default_args=args,
schedule_interval=None,
start_date=days_ago(2),
tags=['example'],
Expand Down

0 comments on commit 91f4d80

Please sign in to comment.