From e2345ffca9013de8dedaa6c75dbecb48c073353f Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sun, 5 Dec 2021 20:06:26 +0100 Subject: [PATCH] Fix flaky on_kill (#20054) The previous fix in #20040 improved forked tests but also caused instability in the "on_kill" test for standard task runner. This PR fixes the instability by signalling when the task started rather than waiting for fixed amount of time and it adds better diagnostics for the test. --- tests/dags/test_on_kill.py | 10 ++++- .../task_runner/test_standard_task_runner.py | 44 ++++++++++++------- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/tests/dags/test_on_kill.py b/tests/dags/test_on_kill.py index 3e53df69a65a1..5b6a4e340a0cf 100644 --- a/tests/dags/test_on_kill.py +++ b/tests/dags/test_on_kill.py @@ -26,6 +26,12 @@ class DummyWithOnKill(DummyOperator): def execute(self, context): import os + self.log.info("Signalling that I am running") + # signal to the test that we've started + with open("/tmp/airflow_on_kill_running", "w") as f: + f.write("ON_KILL_RUNNING") + self.log.info("Signalled") + # This runs extra processes, so that we can be sure that we correctly # tidy up all processes launched by a task when killing if not os.fork(): @@ -34,11 +40,13 @@ def execute(self, context): def on_kill(self): self.log.info("Executing on_kill") - with open("/tmp/airflow_on_kill", "w") as f: + with open("/tmp/airflow_on_kill_killed", "w") as f: f.write("ON_KILL_TEST") + self.log.info("Executed on_kill") # DAG tests backfill with pooled tasks # Previously backfill would queue the task but never run it dag1 = DAG(dag_id='test_on_kill', start_date=datetime(2015, 1, 1)) + dag1_task1 = DummyWithOnKill(task_id='task1', dag=dag1, owner='airflow') diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py index c2e90695dca54..34054106f68e8 100644 --- a/tests/task/task_runner/test_standard_task_runner.py +++ b/tests/task/task_runner/test_standard_task_runner.py @@ -179,9 +179,14 @@ def test_on_kill(self): Test that ensures that clearing in the UI SIGTERMS the task """ - path = "/tmp/airflow_on_kill" + path_on_kill_running = "/tmp/airflow_on_kill_running" + path_on_kill_killed = "/tmp/airflow_on_kill_killed" try: - os.unlink(path) + os.unlink(path_on_kill_running) + except OSError: + pass + try: + os.unlink(path_on_kill_killed) except OSError: pass @@ -207,27 +212,36 @@ def test_on_kill(self): runner = StandardTaskRunner(job1) runner.start() - # give the task some time to startup + with timeout(seconds=3): + while True: + runner_pgid = os.getpgid(runner.process.pid) + if runner_pgid == runner.process.pid: + break + time.sleep(0.01) + + processes = list(self._procs_in_pgroup(runner_pgid)) + + logging.info("Waiting for the task to start") + with timeout(seconds=4): + while True: + if os.path.exists(path_on_kill_running): + break + time.sleep(0.01) + logging.info("Task started. Give the task some time to settle") time.sleep(3) - - pgid = os.getpgid(runner.process.pid) - assert pgid > 0 - assert pgid != os.getpgid(0), "Task should be in a different process group to us" - - processes = list(self._procs_in_pgroup(pgid)) - + logging.info(f"Terminating processes {processes} belonging to {runner_pgid} group") runner.terminate() - session.close() # explicitly close as `create_session`s commit will blow up otherwise - # Wait some time for the result - with timeout(seconds=40): + logging.info("Waiting for the on kill killed file to appear") + with timeout(seconds=4): while True: - if os.path.exists(path): + if os.path.exists(path_on_kill_killed): break time.sleep(0.01) + logging.info("The file appeared") - with open(path) as f: + with open(path_on_kill_killed) as f: assert "ON_KILL_TEST" == f.readline() for process in processes: