Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add examples and howtos about sensors #27333

Merged
merged 1 commit into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions airflow/example_dags/example_sensors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from datetime import datetime, timedelta

import pendulum
from pytz import UTC

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.bash import BashSensor
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.python import PythonSensor
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
from airflow.sensors.time_sensor import TimeSensor, TimeSensorAsync
from airflow.sensors.weekday import DayOfWeekSensor
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weekday import WeekDay


# [START example_callables]
def success_callable():
return True


def failure_callable():
return False


# [END example_callables]


with DAG(
dag_id='example_sensors',
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
) as dag:
# [START example_time_delta_sensor]
t0 = TimeDeltaSensor(task_id='wait_some_seconds', delta=timedelta(seconds=2))
# [END example_time_delta_sensor]

# [START example_time_delta_sensor_async]
t0a = TimeDeltaSensorAsync(task_id='wait_some_seconds_async', delta=timedelta(seconds=2))
# [END example_time_delta_sensor_async]

# [START example_time_sensors]
t1 = TimeSensor(task_id='fire_immediately', target_time=datetime.now(tz=UTC).time())

t2 = TimeSensor(
task_id='timeout_after_second_date_in_the_future',
timeout=1,
soft_fail=True,
target_time=(datetime.now(tz=UTC) + timedelta(hours=1)).time(),
)
# [END example_time_sensors]

# [START example_time_sensors_async]
t1a = TimeSensorAsync(task_id='fire_immediately_async', target_time=datetime.now(tz=UTC).time())

t2a = TimeSensorAsync(
task_id='timeout_after_second_date_in_the_future_async',
timeout=1,
soft_fail=True,
target_time=(datetime.now(tz=UTC) + timedelta(hours=1)).time(),
)
# [END example_time_sensors_async]

# [START example_bash_sensors]
t3 = BashSensor(task_id='Sensor_succeeds', bash_command='exit 0')

t4 = BashSensor(task_id='Sensor_fails_after_3_seconds', timeout=3, soft_fail=True, bash_command='exit 1')
# [END example_bash_sensors]

t5 = BashOperator(task_id='remove_file', bash_command='rm -rf /tmp/temporary_file_for_testing')

# [START example_file_sensor]
t6 = FileSensor(task_id='wait_for_file', filepath="/tmp/temporary_file_for_testing")
# [END example_file_sensor]

t7 = BashOperator(
task_id='create_file_after_3_seconds', bash_command='sleep 3; touch /tmp/temporary_file_for_testing'
)

# [START example_python_sensors]
t8 = PythonSensor(task_id='success_sensor_python', python_callable=success_callable)

t9 = PythonSensor(
task_id='failure_timeout_sensor_python', timeout=3, soft_fail=True, python_callable=failure_callable
)
# [END example_python_sensors]

# [START example_day_of_week_sensor]
t10 = DayOfWeekSensor(
task_id='week_day_sensor_failing_on_timeout', timeout=3, soft_fail=True, week_day=WeekDay.MONDAY
)
# [END example_day_of_week_sensor]

tx = BashOperator(task_id='print_date_in_bash', bash_command='date')

tx.trigger_rule = TriggerRule.NONE_FAILED
[t0, t0a, t1, t1a, t2, t2a, t3, t4] >> tx
t5 >> t6 >> tx
t7 >> tx
[t8, t9] >> tx
t10 >> tx
5 changes: 5 additions & 0 deletions airflow/sensors/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ class BashSensor(BaseSensorOperator):
of inheriting the current process environment, which is the default
behavior. (templated)
:param output_encoding: output encoding of bash command.

.. seealso::
For more information on how to use this sensor,take a look at the guide:
:ref:`howto/operator:BashSensor`

"""

template_fields: Sequence[str] = ('bash_command', 'env')
Expand Down
6 changes: 6 additions & 0 deletions airflow/sensors/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ class FileSensor(BaseSensorOperator):
the base path set within the connection), can be a glob.
:param recursive: when set to ``True``, enables recursive directory matching behavior of
``**`` in glob filepath parameter. Defaults to ``False``.

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:FileSensor`


"""

template_fields: Sequence[str] = ('filepath',)
Expand Down
4 changes: 4 additions & 0 deletions airflow/sensors/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class PythonSensor(BaseSensorOperator):
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
in your callable's context after the template has been applied.

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:PythonSensor`
"""

template_fields: Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')
Expand Down
11 changes: 11 additions & 0 deletions airflow/sensors/time_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ class TimeDeltaSensor(BaseSensorOperator):
Waits for a timedelta after the run's data interval.

:param delta: time length to wait after the data interval before succeeding.

.. seealso::
For more information on how to use this sensor, take a look at the guide:
potiuk marked this conversation as resolved.
Show resolved Hide resolved
:ref:`howto/operator:TimeDeltaSensor`


"""

def __init__(self, *, delta, **kwargs):
Expand All @@ -47,6 +53,11 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
taking up a worker slot while it is waiting.

:param delta: time length to wait after the data interval before succeeding.

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:TimeDeltaSensorAsync`

"""

def execute(self, context: Context):
Expand Down
9 changes: 9 additions & 0 deletions airflow/sensors/time_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class TimeSensor(BaseSensorOperator):
Waits until the specified time of the day.

:param target_time: time after which the job succeeds

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:TimeSensor`

"""

def __init__(self, *, target_time, **kwargs):
Expand All @@ -47,6 +52,10 @@ class TimeSensorAsync(BaseSensorOperator):
it is waiting.

:param target_time: time after which the job succeeds

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:TimeSensorAsync`
"""

def __init__(self, *, target_time, **kwargs):
Expand Down
5 changes: 5 additions & 0 deletions airflow/sensors/weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ class DayOfWeekSensor(BaseSensorOperator):
If ``False``, uses system's day of the week. Useful when you
don't want to run anything on weekdays on the system.
:param use_task_execution_day: deprecated parameter, same effect as `use_task_logical_date`

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:DayOfWeekSensor`

"""

def __init__(
Expand Down
15 changes: 15 additions & 0 deletions docs/apache-airflow/howto/operator/bash.rst
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,18 @@ Example:
bash_command="test.sh ",
dag=dag,
)


.. _howto/operator:BashSensor:

BashSensor
==========

Use the :class:`~airflow.sensors.bash.BashSensor` to use arbitrary command for sensing. The command
should return 0 when it succeeds, any other value otherwise.

.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
:language: python
:dedent: 4
:start-after: [START example_bash_sensors]
:end-before: [END example_bash_sensors]
33 changes: 33 additions & 0 deletions docs/apache-airflow/howto/operator/file.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.



.. _howto/operator:FileSensor:

FileSensor
==========

Use the :class:`~airflow.sensors.filesystem.FileSensor` to detect files appearing your local
filesystem. You need to have connection defined to use it (pass connection id via ``fs_conn_id``).
Default connection is ``fs_default``.

.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
:language: python
:dedent: 4
:start-after: [START example_file_sensor]
:end-before: [END example_file_sensor]
2 changes: 2 additions & 0 deletions docs/apache-airflow/howto/operator/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ determine what actually executes when your DAG runs.

bash
datetime
file
python
time
weekday
external_task_sensor
15 changes: 14 additions & 1 deletion docs/apache-airflow/howto/operator/python.rst
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ tasks have completed running regardless of status (i.e. the ``TriggerRule.ALL_DO
:end-before: [END howto_operator_short_circuit_trigger_rules]



Passing in arguments
^^^^^^^^^^^^^^^^^^^^

Expand All @@ -192,3 +191,17 @@ Templating
^^^^^^^^^^

Jinja templating can be used in same way as described for the PythonOperator.

.. _howto/operator:PythonSensor:

PythonSensor
============

Use the :class:`~airflow.sensors.python.PythonSensor` to use arbitrary callable for sensing. The callable
should return True when it succeeds, False otherwise.

.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
:language: python
:dedent: 4
:start-after: [START example_python_sensors]
:end-before: [END example_python_sensors]
78 changes: 78 additions & 0 deletions docs/apache-airflow/howto/operator/time.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.



.. _howto/operator:TimeDeltaSensor:

TimeDeltaSensor
===============

Use the :class:`~airflow.sensors.time_delta.TimeDeltaSensor` to end sensing after specific time.


.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
:language: python
:dedent: 4
:start-after: [START example_time_delta_sensor]
:end-before: [END example_time_delta_sensor]


.. _howto/operator:TimeDeltaSensorAsync:

TimeDeltaSensorAsync
====================

Use the :class:`~airflow.sensors.time_delta.TimeDeltaSensorAsync` to end sensing after specific time.
It is an async version of the operator and requires Triggerer to run.


.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
:language: python
:dedent: 4
:start-after: [START example_time_delta_sensor_async]
:end-before: [END example_time_delta_sensor_async]



.. _howto/operator:TimeSensor:

TimeSensor
==========

Use the :class:`~airflow.sensors.time_sensor.TimeSensor` to end sensing after time specified.

.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
:language: python
:dedent: 4
:start-after: [START example_time_sensors]
:end-before: [END example_time_sensors]


.. _howto/operator:TimeSensorAsync:

TimeSensorAsync
===============

Use the :class:`~airflow.sensors.time_sensor.TimeSensorAsync` to end sensing after time specified.
It is an async version of the operator and requires Triggerer to run.

.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
:language: python
:dedent: 4
:start-after: [START example_time_sensors_async]
:end-before: [END example_time_sensors_async]
Loading