Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions airflow/providers/apache/beam/example_dags/example_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Example Airflow DAG for Apache Beam operators
"""
import os
from datetime import datetime
from urllib.parse import urlparse

from airflow import models
Expand All @@ -31,7 +32,7 @@
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCS_INPUT = os.environ.get('APACHE_BEAM_PYTHON', 'gs://INVALID BUCKET NAME/shakespeare/kinglear.txt')
Expand Down Expand Up @@ -73,18 +74,18 @@
GCS_JAR_FLINK_RUNNER_OBJECT_NAME = GCS_JAR_FLINK_RUNNER_PARTS.path[1:]


default_args = {
'default_pipeline_options': {
'output': '/tmp/example_beam',
},
"trigger_rule": "all_done",
DEFAULT_ARGS = {
'default_pipeline_options': {'output': '/tmp/example_beam'},
'trigger_rule': TriggerRule.ALL_DONE,
}
START_DATE = datetime(2021, 1, 1)


with models.DAG(
"example_beam_native_java_direct_runner",
schedule_interval=None, # Override to match your needs
start_date=days_ago(1),
start_date=START_DATE,
catchup=False,
tags=['example'],
) as dag_native_java_direct_runner:

Expand Down Expand Up @@ -112,7 +113,8 @@
with models.DAG(
"example_beam_native_java_dataflow_runner",
schedule_interval=None, # Override to match your needs
start_date=days_ago(1),
start_date=START_DATE,
catchup=False,
tags=['example'],
) as dag_native_java_dataflow_runner:
# [START howto_operator_start_java_dataflow_runner_pipeline]
Expand Down Expand Up @@ -142,7 +144,8 @@
with models.DAG(
"example_beam_native_java_spark_runner",
schedule_interval=None, # Override to match your needs
start_date=days_ago(1),
start_date=START_DATE,
catchup=False,
tags=['example'],
) as dag_native_java_spark_runner:

Expand All @@ -169,7 +172,8 @@
with models.DAG(
"example_beam_native_java_flink_runner",
schedule_interval=None, # Override to match your needs
start_date=days_ago(1),
start_date=START_DATE,
catchup=False,
tags=['example'],
) as dag_native_java_flink_runner:

Expand All @@ -196,9 +200,10 @@

with models.DAG(
"example_beam_native_python",
default_args=default_args,
start_date=days_ago(1),
start_date=START_DATE,
schedule_interval=None, # Override to match your needs
catchup=False,
default_args=DEFAULT_ARGS,
tags=['example'],
) as dag_native_python:

Expand Down Expand Up @@ -280,9 +285,10 @@

with models.DAG(
"example_beam_native_python_dataflow_async",
default_args=default_args,
start_date=days_ago(1),
default_args=DEFAULT_ARGS,
start_date=START_DATE,
schedule_interval=None, # Override to match your needs
catchup=False,
tags=['example'],
) as dag_native_python_dataflow_async:
# [START howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,22 @@
Example Airflow DAG to check if a Cassandra Table and a Records exists
or not using `CassandraTableSensor` and `CassandraRecordSensor`.
"""
from datetime import datetime

from airflow.models import DAG
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor
from airflow.utils.dates import days_ago

# [START howto_operator_cassandra_sensors]
with DAG(
dag_id='example_cassandra_operator',
schedule_interval=None,
start_date=days_ago(2),
start_date=datetime(2021, 1, 1),
default_args={'table': 'keyspace_name.table_name'},
catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_cassandra_table_sensor]
table_sensor = CassandraTableSensor(
task_id="cassandra_table_sensor",
cassandra_conn_id="cassandra_default",
table="keyspace_name.table_name",
)
# [END howto_operator_cassandra_table_sensor]
table_sensor = CassandraTableSensor(task_id="cassandra_table_sensor")

# [START howto_operator_cassandra_record_sensor]
record_sensor = CassandraRecordSensor(
task_id="cassandra_record_sensor",
cassandra_conn_id="cassandra_default",
table="keyspace_name.table_name",
keys={"p1": "v1", "p2": "v2"},
)
# [END howto_operator_cassandra_record_sensor]
record_sensor = CassandraRecordSensor(task_id="cassandra_record_sensor", keys={"p1": "v1", "p2": "v2"})
# [END howto_operator_cassandra_sensors]
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
"""
Example Airflow DAG to execute SQL in an Apache Drill environment using the `DrillOperator`.
"""
from datetime import datetime

from airflow.models import DAG
from airflow.providers.apache.drill.operators.drill import DrillOperator
from airflow.utils.dates import days_ago

with DAG(
dag_id='example_drill_dag',
schedule_interval=None,
start_date=days_ago(2),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_drill]
Expand Down
12 changes: 5 additions & 7 deletions airflow/providers/apache/druid/example_dags/example_druid_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@
"""
Example Airflow DAG to submit Apache Druid json index file using `DruidOperator`
"""
from datetime import datetime

from airflow.models import DAG
from airflow.providers.apache.druid.operators.druid import DruidOperator
from airflow.utils.dates import days_ago

with DAG(
dag_id='example_druid_operator',
schedule_interval=None,
start_date=days_ago(2),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_druid_submit]
submit_job = DruidOperator(
task_id='spark_submit_job',
json_index_file='json_index.json',
druid_ingest_conn_id='druid_ingest_default',
)
submit_job = DruidOperator(task_id='spark_submit_job', json_index_file='json_index.json')
# Example content of json_index.json:
JSON_INDEX_STR = """
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
"""
This is an example dag for managing twitter data.
"""
from datetime import date, timedelta
from datetime import date, datetime, timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.utils.dates import days_ago


@task
Expand Down Expand Up @@ -77,8 +76,9 @@ def transfer_to_db():
'retries': 1,
},
schedule_interval="@daily",
start_date=days_ago(5),
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as dag:
fetch_tweets = fetch_tweets()
clean_tweets = clean_tweets()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
This is an example DAG which uses the KylinCubeOperator.
The tasks below include kylin build, refresh, merge operation.
"""
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from airflow.utils.dates import days_ago

dag = DAG(
dag_id='example_kylin_operator',
schedule_interval=None,
start_date=days_ago(1),
default_args={'kylin_conn_id': 'kylin_default', 'project': 'learn_kylin', 'cube': 'kylin_sales_cube'},
start_date=datetime(2021, 1, 1),
catchup=False,
default_args={'project': 'learn_kylin', 'cube': 'kylin_sales_cube'},
tags=['example'],
)

Expand Down
21 changes: 5 additions & 16 deletions airflow/providers/apache/livy/example_dags/example_livy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,29 @@
The tasks below trigger the computation of pi on the Spark instance
using the Java and Python executables provided in the example library.
"""
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
from airflow.utils.dates import days_ago

args = {'owner': 'airflow', 'email': ['airflow@example.com'], 'depends_on_past': False}

with DAG(
dag_id='example_livy_operator',
default_args=args,
default_args={'args': [10]},
schedule_interval='@daily',
start_date=days_ago(5),
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:

livy_java_task = LivyOperator(
task_id='pi_java_task',
dag=dag,
livy_conn_id='livy_conn_default',
file='/spark-examples.jar',
args=[10],
num_executors=1,
conf={
'spark.shuffle.compress': 'false',
},
class_name='org.apache.spark.examples.SparkPi',
)

livy_python_task = LivyOperator(
task_id='pi_python_task',
dag=dag,
livy_conn_id='livy_conn_default',
file='/pi.py',
args=[10],
polling_interval=60,
)
livy_python_task = LivyOperator(task_id='pi_python_task', file='/pi.py', polling_interval=60)

livy_java_task >> livy_python_task
5 changes: 3 additions & 2 deletions airflow/providers/apache/pig/example_dags/example_pig.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
# under the License.

"""Example DAG demonstrating the usage of the PigOperator."""
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.pig.operators.pig import PigOperator
from airflow.utils.dates import days_ago

dag = DAG(
dag_id='example_pig_operator',
schedule_interval=None,
start_date=days_ago(2),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@
Example Airflow DAG to submit Apache Spark applications using
`SparkSubmitOperator`, `SparkJDBCOperator` and `SparkSqlOperator`.
"""
from datetime import datetime

from airflow.models import DAG
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago

with DAG(
dag_id='example_spark_operator',
schedule_interval=None,
start_date=days_ago(2),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_spark_submit]
Expand Down
19 changes: 7 additions & 12 deletions docs/apache-airflow-providers-apache-cassandra/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,7 @@ Waiting for a Table to be created

The :class:`~airflow.providers.apache.cassandra.sensors.table.CassandraTableSensor` operator is used to check for the existence of a table in a Cassandra cluster.

Use the ``table`` parameter to poke until the provided table is found. Use dot notation to target a specific keyspace.

.. exampleinclude:: /../../airflow/providers/apache/cassandra/example_dags/example_cassandra_dag.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cassandra_table_sensor]
:end-before: [END howto_operator_cassandra_table_sensor]

Use the ``table`` parameter (set in ``default_args`` in the example below) to poke until the provided table is found. Use dot notation to target a specific keyspace.

.. _howto/operator:CassandraRecordSensor:

Expand All @@ -50,15 +43,17 @@ Waiting for a Record to be created

The :class:`~airflow.providers.apache.cassandra.sensors.record.CassandraRecordSensor` operator is used to check for the existence of a record of a table in the Cassandra cluster.

Use the ``table`` parameter to mention the keyspace and table for the record. Use dot notation to target a specific keyspace.
Use the ``table`` parameter (set in ``default_args`` in the example below) to mention the keyspace and table for the record. Use dot notation to target a specific keyspace.

Use the ``keys`` parameter to poke until the provided record is found. The existence of record is identified using key value pairs. In the given example, we're are looking for value ``v1`` in column ``p1`` and ``v2`` in column ``p2``.

Example use of these sensors
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. exampleinclude:: /../../airflow/providers/apache/cassandra/example_dags/example_cassandra_dag.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cassandra_record_sensor]
:end-before: [END howto_operator_cassandra_record_sensor]
:start-after: [START howto_operator_cassandra_sensors]
:end-before: [END howto_operator_cassandra_sensors]

Reference
^^^^^^^^^
Expand Down