Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace use_task_execution_date with use_task_logical_date #23983

Merged
merged 3 commits into from
May 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions airflow/operators/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import datetime
import warnings
from typing import Iterable, Union

from airflow.exceptions import AirflowException
Expand All @@ -39,7 +40,7 @@ class BranchDateTimeOperator(BaseBranchOperator):
``datetime.datetime.now()`` falls below target_lower or above ``target_upper``.
:param target_lower: target lower bound.
:param target_upper: target upper bound.
:param use_task_execution_date: If ``True``, uses task's execution day to compare with targets.
:param use_task_logical_date: If ``True``, uses task's logical date to compare with targets.
Execution date is useful for backfilling. If ``False``, uses system's date.
"""

Expand All @@ -50,6 +51,7 @@ def __init__(
follow_task_ids_if_false: Union[str, Iterable[str]],
target_lower: Union[datetime.datetime, datetime.time, None],
target_upper: Union[datetime.datetime, datetime.time, None],
use_task_logical_date: bool = False,
use_task_execution_date: bool = False,
**kwargs,
) -> None:
Expand All @@ -64,10 +66,17 @@ def __init__(
self.target_upper = target_upper
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
self.use_task_execution_date = use_task_execution_date
self.use_task_logical_date = use_task_logical_date
if use_task_execution_date:
eladkal marked this conversation as resolved.
Show resolved Hide resolved
self.use_task_logical_date = use_task_execution_date
warnings.warn(
"Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``.",
DeprecationWarning,
stacklevel=2,
)

def choose_branch(self, context: Context) -> Union[str, Iterable[str]]:
if self.use_task_execution_date is True:
if self.use_task_logical_date:
now = timezone.make_naive(context["logical_date"], self.dag.timezone)
else:
now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
Expand Down
16 changes: 12 additions & 4 deletions airflow/operators/weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import warnings
from typing import Iterable, Union

from airflow.operators.branch import BaseBranchOperator
Expand All @@ -41,7 +41,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
* ``{WeekDay.TUESDAY}``
* ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``

:param use_task_execution_day: If ``True``, uses task's execution day to compare
:param use_task_logical_date: If ``True``, uses task's logical date to compare
with is_today. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week.
"""
Expand All @@ -52,18 +52,26 @@ def __init__(
follow_task_ids_if_true: Union[str, Iterable[str]],
follow_task_ids_if_false: Union[str, Iterable[str]],
week_day: Union[str, Iterable[str]],
use_task_logical_date: bool = False,
use_task_execution_day: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
self.week_day = week_day
self.use_task_execution_day = use_task_execution_day
self.use_task_logical_date = use_task_logical_date
if use_task_execution_day:
self.use_task_logical_date = use_task_execution_day
warnings.warn(
"Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
DeprecationWarning,
stacklevel=2,
)
self._week_day_num = WeekDay.validate_week_day(week_day)

def choose_branch(self, context: Context) -> Union[str, Iterable[str]]:
if self.use_task_execution_day:
if self.use_task_logical_date:
now = context["logical_date"]
else:
now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
Expand Down
22 changes: 15 additions & 7 deletions airflow/sensors/weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import warnings

from airflow.sensors.base import BaseSensorOperator
from airflow.utils import timezone
Expand All @@ -33,15 +34,15 @@ class DayOfWeekSensor(BaseSensorOperator):
weekend_check = DayOfWeekSensor(
task_id='weekend_check',
week_day='Saturday',
use_task_execution_day=True,
use_task_logical_date=True,
dag=dag)

**Example** (with multiple day using set): ::

weekend_check = DayOfWeekSensor(
task_id='weekend_check',
week_day={'Saturday', 'Sunday'},
use_task_execution_day=True,
use_task_logical_date=True,
dag=dag)

**Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): ::
Expand All @@ -52,7 +53,7 @@ class DayOfWeekSensor(BaseSensorOperator):
weekend_check = DayOfWeekSensor(
task_id='weekend_check',
week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
use_task_execution_day=True,
use_task_logical_date=True,
dag=dag)

:param week_day: Day of the week to check (full name). Optionally, a set
Expand All @@ -64,16 +65,23 @@ class DayOfWeekSensor(BaseSensorOperator):
* ``{WeekDay.TUESDAY}``
* ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``

:param use_task_execution_day: If ``True``, uses task's execution day to compare
:param use_task_logical_date: If ``True``, uses task's logical date to compare
with week_day. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week. Useful when you
don't want to run anything on weekdays on the system.
"""

def __init__(self, *, week_day, use_task_execution_day=False, **kwargs):
def __init__(self, *, week_day, use_task_logical_date=False, use_task_execution_day=False, **kwargs):
super().__init__(**kwargs)
self.week_day = week_day
self.use_task_execution_day = use_task_execution_day
self.use_task_logical_date = use_task_logical_date
if use_task_execution_day:
self.use_task_logical_date = use_task_execution_day
warnings.warn(
"Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
DeprecationWarning,
stacklevel=2,
)
self._week_day_num = WeekDay.validate_week_day(week_day)

def poke(self, context: Context):
Expand All @@ -82,7 +90,7 @@ def poke(self, context: Context):
self.week_day,
WeekDay(timezone.utcnow().isoweekday()).name,
)
if self.use_task_execution_day:
if self.use_task_logical_date:
return context['logical_date'].isoweekday() in self._week_day_num
else:
return timezone.utcnow().isoweekday() in self._week_day_num
21 changes: 19 additions & 2 deletions tests/operators/test_datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import unittest

import freezegun
import pytest

from airflow.exceptions import AirflowException
from airflow.models import DAG, DagRun, TaskInstance as TI
Expand Down Expand Up @@ -225,10 +226,10 @@ def test_branch_datetime_operator_lower_comparison_outside_range(self):
)

@freezegun.freeze_time("2020-12-01 09:00:00")
def test_branch_datetime_operator_use_task_execution_date(self):
def test_branch_datetime_operator_use_task_logical_date(self):
"""Check if BranchDateTimeOperator uses task execution date"""
in_between_date = timezone.datetime(2020, 7, 7, 10, 30, 0)
self.branch_op.use_task_execution_date = True
self.branch_op.use_task_logical_date = True
self.dr = self.dag.create_dagrun(
run_id='manual_exec_date__',
start_date=in_between_date,
Expand All @@ -249,3 +250,19 @@ def test_branch_datetime_operator_use_task_execution_date(self):
'branch_2': State.SKIPPED,
}
)

def test_deprecation_warning(self):
warning_message = (
"""Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
BranchDateTimeOperator(
task_id='warning',
follow_task_ids_if_true='branch_1',
follow_task_ids_if_false='branch_2',
target_upper=timezone.datetime(2020, 7, 7, 10, 30, 0),
target_lower=timezone.datetime(2020, 7, 7, 10, 30, 0),
use_task_execution_date=True,
dag=self.dag,
)
assert warning_message == str(warnings[0].message)
19 changes: 17 additions & 2 deletions tests/operators/test_weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,14 @@ def test_branch_follow_true(self, _, weekday):

@freeze_time("2021-01-25") # Monday
def test_branch_follow_true_with_execution_date(self):
"""Checks if BranchDayOfWeekOperator follows true branch when set use_task_execution_day"""
"""Checks if BranchDayOfWeekOperator follows true branch when set use_task_logical_date"""

branch_op = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="branch_1",
follow_task_ids_if_false="branch_2",
week_day="Wednesday",
use_task_execution_day=True, # We compare to DEFAULT_DATE which is Wednesday
use_task_logical_date=True, # We compare to DEFAULT_DATE which is Wednesday
dag=self.dag,
)

Expand Down Expand Up @@ -274,3 +274,18 @@ def test_branch_xcom_push_true_branch(self):
for ti in tis:
if ti.task_id == 'make_choice':
assert ti.xcom_pull(task_ids='make_choice') == 'branch_1'

def test_deprecation_warning(self):
warning_message = (
"""Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
BranchDayOfWeekOperator(
task_id="week_day_warn",
follow_task_ids_if_true="branch_1",
follow_task_ids_if_false="branch_2",
week_day="Monday",
use_task_execution_day=True,
dag=self.dag,
)
assert warning_message == str(warnings[0].message)
25 changes: 20 additions & 5 deletions tests/sensors/test_weekday_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def tearDown(self):
)
def test_weekday_sensor_true(self, _, week_day):
op = DayOfWeekSensor(
task_id='weekday_sensor_check_true', week_day=week_day, use_task_execution_day=True, dag=self.dag
task_id='weekday_sensor_check_true', week_day=week_day, use_task_logical_date=True, dag=self.dag
)
op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)
assert op.week_day == week_day
Expand All @@ -83,7 +83,7 @@ def test_weekday_sensor_false(self):
poke_interval=1,
timeout=2,
week_day='Tuesday',
use_task_execution_day=True,
use_task_logical_date=True,
dag=self.dag,
)
with pytest.raises(AirflowSensorTimeout):
Expand All @@ -95,7 +95,7 @@ def test_invalid_weekday_number(self):
DayOfWeekSensor(
task_id='weekday_sensor_invalid_weekday_num',
week_day=invalid_week_day,
use_task_execution_day=True,
use_task_logical_date=True,
dag=self.dag,
)

Expand All @@ -110,7 +110,7 @@ def test_weekday_sensor_with_invalid_type(self):
DayOfWeekSensor(
task_id='weekday_sensor_check_true',
week_day=invalid_week_day,
use_task_execution_day=True,
use_task_logical_date=True,
dag=self.dag,
)

Expand All @@ -120,8 +120,23 @@ def test_weekday_sensor_timeout_with_set(self):
poke_interval=1,
timeout=2,
week_day={WeekDay.MONDAY, WeekDay.TUESDAY},
use_task_execution_day=True,
use_task_logical_date=True,
dag=self.dag,
)
with pytest.raises(AirflowSensorTimeout):
op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)

def test_deprecation_warning(self):
warning_message = (
"""Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
DayOfWeekSensor(
task_id='week_day_warn',
poke_interval=1,
timeout=2,
week_day='Tuesday',
use_task_execution_day=True,
dag=self.dag,
)
assert warning_message == str(warnings[0].message)