
TestLocalTaskJob.test_mark_success_no_kill fails consistently on MSSQL #17326

@jedcunningham

Description

Apache Airflow version: main

Environment: CI

What happened:

The TestLocalTaskJob.test_mark_success_no_kill test no longer passes on MSSQL. I initially thought it was a race condition, but even after 5 minutes the TI still wasn't running. The wait loop in the test looks like this:

for _ in range(0, 50):
    if ti.state == State.RUNNING:
        break
    time.sleep(0.1)
    ti.refresh_from_db()
assert State.RUNNING == ti.state
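
For completeness, I also stretched the same loop out to roughly five minutes and the TI never reached RUNNING. A minimal sketch of that longer wait (the iteration count is illustrative; ti is the TaskInstance created earlier in the test):

import time

from airflow.utils.state import State

# Same polling loop as above, stretched to ~5 minutes (3000 x 0.1s)
# to rule out a slow-to-start task rather than a real failure.
for _ in range(0, 3000):
    if ti.state == State.RUNNING:
        break
    time.sleep(0.1)
    ti.refresh_from_db()
assert State.RUNNING == ti.state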

I've tracked the issue down to #16301 (cc @ephraimbuddy), but I haven't really dug into why yet.

How to reproduce it:

./breeze --backend mssql tests tests/jobs/test_local_task_job.py

_____________________________________________________________________________________ TestLocalTaskJob.test_mark_success_no_kill _____________________________________________________________________________________
                                                                                                                                                                                                                      
self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at 0x7f54652abf10>                          
                                                                                                                                                                                                                      
    def test_mark_success_no_kill(self):                                                                   
        """                                                                                                                                                                                                           
        Test that ensures that mark_success in the UI doesn't cause                                                                                                                                                   
        the task to fail, and that the task exits                                                                                                                                                                     
        """                                                                                                                                                                                                           
        dagbag = DagBag(                                                                                                                                                                                              
            dag_folder=TEST_DAG_FOLDER,                                                                                                                                                                               
            include_examples=False,                                                                                                                                                                                   
        )                                                                                                  
        dag = dagbag.dags.get('test_mark_success')                                                                                                                                                                    
        task = dag.get_task('task1')                                                                                                                                                                                  
                                                                                                                                                                                                                      
        session = settings.Session()                                                                                                                                                                                  
                                                                                                           
        dag.clear()                                                                                                                                                                                                   
        dag.create_dagrun(                                                                                                                                                                                            
            run_id="test",                                                                                                                                                                                            
            state=State.RUNNING,                                                                                                                                                                                      
            execution_date=DEFAULT_DATE,                                                                                                                                                                              
            start_date=DEFAULT_DATE,                                                                                                                                                                                  
            session=session,                                                                                                                                                                                          
        )                                                                                                  
        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)                                                                                                                                                     
        ti.refresh_from_db()                                                                                                                                                                                          
        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True)                                                                                                                                                   
        process = multiprocessing.Process(target=job1.run)                                                                                                                                                            
        process.start()                                                                                                                                                                                               
        for _ in range(0, 50):                                                                                                                                                                                        
            if ti.state == State.RUNNING:                                                                                                                                                                             
                break                                                                                      
            time.sleep(0.1)                                                                                                                                                                                           
            ti.refresh_from_db()                                                                                                                                                                                      
>       assert State.RUNNING == ti.state                                                                                                                                                                              
E       AssertionError: assert <TaskInstanceState.RUNNING: 'running'> == None                                                                                                                                         
E        +  where <TaskInstanceState.RUNNING: 'running'> = State.RUNNING                                   
E        +  and   None = <TaskInstance: test_mark_success.task1 2016-01-01 00:00:00+00:00 [None]>.state                                                                                                               
                                                                                                                                                                                                                      
tests/jobs/test_local_task_job.py:306: AssertionError                                                                                                                                                                 
------------------------------------------------------------------------------------------------ Captured stderr call ------------------------------------------------------------------------------------------------
INFO  [airflow.models.dagbag.DagBag] Filling up the DagBag from /opt/airflow/tests/dags                                                                                                                               
INFO  [root] class_instance type: <class 'unusual_prefix_5d280a9b385120fec3c40cfe5be04e2f41b6b5e8_test_task_view_type_check.CallableClass'>
INFO  [airflow.models.dagbag.DagBag] File /opt/airflow/tests/dags/test_zip.zip:file_no_airflow_dag.py assumed to contain no DAGs. Skipping.                                                                           
Process Process-1:                                                                                         
Traceback (most recent call last):                                                                                                                                                                                    
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect 
    return fn()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 364, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 809, in _checkout
    result = pool._dialect.do_ping(fairy.connection)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 575, in do_ping
    cursor.execute(self._dialect_specific_select_one) 
pyodbc.ProgrammingError: ('42000', '[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]The server failed to resume the transaction. Desc:3400000012. (3971) (SQLExecDirectW)')

(truncated)
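
The pyodbc error is raised in the LocalTaskJob child process (Process-1) while SQLAlchemy pings a pooled connection, which makes me suspect the forked process is reusing a connection the parent already holds. I haven't confirmed that this is the cause, but one thing worth trying is disposing of the inherited pool in the child before running the job. A rough sketch against the test above (run_job is a hypothetical wrapper, not something the test does today):

import multiprocessing

from airflow import settings

def run_job(job):
    # Hypothetical wrapper: drop any SQLAlchemy connections inherited from the
    # parent process so the child checks out fresh ones instead of trying to
    # resume a transaction the parent also has open.
    settings.engine.dispose()
    job.run()

# job1 is the LocalTaskJob created in the test
process = multiprocessing.Process(target=run_job, args=(job1,))
process.start()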
