Skip to content

Commit

Permalink
Fix flaky on_kill (#20054)
Browse files Browse the repository at this point in the history
The previous fix in #20040 improved forked tests but also caused
instability in the "on_kill" test for standard task runner.

This PR fixes the instability by signalling when the task started
rather than waiting for fixed amount of time and it adds better
diagnostics for the test.
  • Loading branch information
potiuk committed Dec 5, 2021
1 parent 2391b19 commit e2345ff
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 16 deletions.
10 changes: 9 additions & 1 deletion tests/dags/test_on_kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ class DummyWithOnKill(DummyOperator):
def execute(self, context):
import os

self.log.info("Signalling that I am running")
# signal to the test that we've started
with open("/tmp/airflow_on_kill_running", "w") as f:
f.write("ON_KILL_RUNNING")
self.log.info("Signalled")

# This runs extra processes, so that we can be sure that we correctly
# tidy up all processes launched by a task when killing
if not os.fork():
Expand All @@ -34,11 +40,13 @@ def execute(self, context):

def on_kill(self):
self.log.info("Executing on_kill")
with open("/tmp/airflow_on_kill", "w") as f:
with open("/tmp/airflow_on_kill_killed", "w") as f:
f.write("ON_KILL_TEST")
self.log.info("Executed on_kill")


# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
dag1 = DAG(dag_id='test_on_kill', start_date=datetime(2015, 1, 1))

dag1_task1 = DummyWithOnKill(task_id='task1', dag=dag1, owner='airflow')
44 changes: 29 additions & 15 deletions tests/task/task_runner/test_standard_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,14 @@ def test_on_kill(self):
Test that ensures that clearing in the UI SIGTERMS
the task
"""
path = "/tmp/airflow_on_kill"
path_on_kill_running = "/tmp/airflow_on_kill_running"
path_on_kill_killed = "/tmp/airflow_on_kill_killed"
try:
os.unlink(path)
os.unlink(path_on_kill_running)
except OSError:
pass
try:
os.unlink(path_on_kill_killed)
except OSError:
pass

Expand All @@ -207,27 +212,36 @@ def test_on_kill(self):
runner = StandardTaskRunner(job1)
runner.start()

# give the task some time to startup
with timeout(seconds=3):
while True:
runner_pgid = os.getpgid(runner.process.pid)
if runner_pgid == runner.process.pid:
break
time.sleep(0.01)

processes = list(self._procs_in_pgroup(runner_pgid))

logging.info("Waiting for the task to start")
with timeout(seconds=4):
while True:
if os.path.exists(path_on_kill_running):
break
time.sleep(0.01)
logging.info("Task started. Give the task some time to settle")
time.sleep(3)

pgid = os.getpgid(runner.process.pid)
assert pgid > 0
assert pgid != os.getpgid(0), "Task should be in a different process group to us"

processes = list(self._procs_in_pgroup(pgid))

logging.info(f"Terminating processes {processes} belonging to {runner_pgid} group")
runner.terminate()

session.close() # explicitly close as `create_session`s commit will blow up otherwise

# Wait some time for the result
with timeout(seconds=40):
logging.info("Waiting for the on kill killed file to appear")
with timeout(seconds=4):
while True:
if os.path.exists(path):
if os.path.exists(path_on_kill_killed):
break
time.sleep(0.01)
logging.info("The file appeared")

with open(path) as f:
with open(path_on_kill_killed) as f:
assert "ON_KILL_TEST" == f.readline()

for process in processes:
Expand Down

0 comments on commit e2345ff

Please sign in to comment.