-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Closed
Labels
Quarantine (Issues that are occasionally failing and are quarantined); kind:bug (This is clearly a bug)
Description
This test is flaky and fails intermittently. Example failure output:
____________ TestLocalTaskJob.test_task_sigkill_works_with_retries _____________
self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at 0x7f3a8ec6c6a0>
dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7f3aa1490080>
def test_task_sigkill_works_with_retries(self, dag_maker):
"""
Test that ensures that tasks are retried when they receive sigkill
"""
# use shared memory value so we can properly track value change even if
# it's been updated across processes.
retry_callback_called = Value('i', 0)
task_terminated_externally = Value('i', 1)
shared_mem_lock = Lock()
def retry_callback(context):
with shared_mem_lock:
retry_callback_called.value += 1
assert context['dag_run'].dag_id == 'test_mark_failure_2'
def task_function(ti):
os.kill(os.getpid(), signal.SIGKILL)
# This should not happen -- the state change should be noticed and the task should get killed
with shared_mem_lock:
task_terminated_externally.value = 0
with dag_maker(
dag_id='test_mark_failure_2', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'}
):
task = PythonOperator(
task_id='test_on_failure',
python_callable=task_function,
retries=1,
retry_delay=timedelta(seconds=2),
on_retry_callback=retry_callback,
)
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()
job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
job1.task_runner = StandardTaskRunner(job1)
job1.task_runner.start()
settings.engine.dispose()
process = multiprocessing.Process(target=job1.run)
process.start()
time.sleep(0.4)
process.join(timeout=10)
ti.refresh_from_db()
> assert ti.state == State.UP_FOR_RETRY
E AssertionError: assert None == <TaskInstanceState.UP_FOR_RETRY: 'up_for_retry'>
E + where None = <TaskInstance: test_mark_failure_2.test_on_failure 2016-01-01 00:00:00+00:00 [None]>.state
E + and <TaskInstanceState.UP_FOR_RETRY: 'up_for_retry'> = State.UP_FOR_RETRY
tests/jobs/test_local_task_job.py:778: AssertionError
----------------------------- Captured stdout call -----------------------------
Running <TaskInstance: test_mark_failure_2.test_on_failure 2016-01-01T00:00:00+00:00 [None]> on host 9a517fc8fb98
----------------------------- Captured stderr call -----------------------------
Process Process-116:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 259, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry 'test_on_failure-test_mark_failure_2-2016-01-01 00:00:00.000000' for key 'PRIMARY'")
Metadata
Metadata
Assignees
Labels
Quarantine (Issues that are occasionally failing and are quarantined); kind:bug (This is clearly a bug)