From 6c565010111ab59cd2672d53d20d5700c48a4564 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 16 Jul 2024 13:18:47 +0200 Subject: [PATCH] Fix flaky async sensor test I think this one should finally fix the flaky test. The issue with it was that in Pytest parsing the test might occur a long time before executing it - depending how many tests are run between and how slow the test run. The original test calculated the reference time at parsing time and it assumed on hour ahead (when minutes where 0-ed were enough) for the "deferral" to happen. But if parsing was executed (thus reference time calculation happened) before end of hour and execution after end hour, then the +1 hour from beginning of the reference hour already passed. The fix is to move the reference time calculation to inside the test, so that it is not calculated a long time before the test is run. The test runs on public runners are generally slower "per container", that's why it happened more frequently on public runners. --- tests/sensors/test_time_delta.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/sensors/test_time_delta.py b/tests/sensors/test_time_delta.py index 91a768b5b75c2..78ea7af49476b 100644 --- a/tests/sensors/test_time_delta.py +++ b/tests/sensors/test_time_delta.py @@ -34,7 +34,6 @@ DEFAULT_DATE = datetime(2015, 1, 1) DEV_NULL = "/dev/null" TEST_DAG_ID = "unit_tests" -REFERENCE_TIME = pendulum.now("UTC").replace(microsecond=0, second=0, minute=0) class TestTimedeltaSensor: @@ -55,15 +54,17 @@ def setup_method(self): self.dag = DAG(TEST_DAG_ID, default_args=self.args) @pytest.mark.parametrize( - "data_interval_end, delta, should_deffer", - [ - (REFERENCE_TIME.add(hours=-1, minutes=-1), timedelta(hours=1), False), - (REFERENCE_TIME, timedelta(hours=1), True), - ], + "should_deffer", + [False, True], ) @mock.patch("airflow.models.baseoperator.BaseOperator.defer") - def test_timedelta_sensor(self, defer_mock, data_interval_end, delta, should_deffer): + def test_timedelta_sensor(self, defer_mock, should_deffer): + delta = timedelta(hours=1) op = TimeDeltaSensorAsync(task_id="timedelta_sensor_check", delta=delta, dag=self.dag) + if should_deffer: + data_interval_end = pendulum.now("UTC").add(hours=1) + else: + data_interval_end = pendulum.now("UTC").replace(microsecond=0, second=0, minute=0).add(hours=-1) op.execute({"data_interval_end": data_interval_end}) if should_deffer: defer_mock.assert_called_once()