Skip to content

Commit

Permalink
TimeSensor should respect DAG timezone (#9882)
Browse files Browse the repository at this point in the history
(cherry picked from commit 9c518fe)
  • Loading branch information
22quinn authored and kaxil committed Nov 20, 2020
1 parent 652b0c1 commit cad03ba
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 4 deletions.
6 changes: 3 additions & 3 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ https://developers.google.com/style/inclusive-documentation
-->
## Airflow 1.10.13

### TimeSensor will consider default_timezone setting.
### TimeSensor is now timezone aware

Previously `TimeSensor` always compared the `target_time` with the current time in UTC.

Now it will compare `target_time` with the current time in the timezone set by `default_timezone` under the `core`
section of the config.
Now it will compare `target_time` with the current time in the timezone of the DAG,
defaulting to the `default_timezone` in the global config.

### Removed Kerberos support for HDFS hook

Expand Down
2 changes: 1 addition & 1 deletion airflow/sensors/time_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ def __init__(self, target_time, *args, **kwargs):

def poke(self, context):
self.log.info('Checking if the time (%s) has come', self.target_time)
return timezone.make_naive(timezone.utcnow()).time() > self.target_time
return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time
52 changes: 52 additions & 0 deletions tests/sensors/test_time_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import datetime, time

import pendulum
from parameterized import parameterized

from airflow.models.dag import DAG
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils import timezone
from tests.compat import patch

DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00
DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1)
DEFAULT_DATE_WITH_TZ = datetime(
2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE)
)


@patch(
"airflow.sensors.time_sensor.timezone.utcnow",
return_value=timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc),
)
class TestTimeSensor:
@parameterized.expand(
[
("UTC", DEFAULT_DATE_WO_TZ, True),
("UTC", DEFAULT_DATE_WITH_TZ, False),
(DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False),
]
)
def test_timezone(self, mock_utcnow, default_timezone, start_date, expected):
with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
dag = DAG("test", default_args={"start_date": start_date})
op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
assert op.poke(None) == expected

0 comments on commit cad03ba

Please sign in to comment.