Airflow can't terminate tasks created by a DAG that uses the DAG decorator #35179

@ImmortalLotus

Description

Apache Airflow version

2.7.2

What happened

Airflow throws the error below when terminating a task that comes from a DAG created with the DAG decorator, and it then cannot terminate the pod when using the Kubernetes Executor.

The task itself runs correctly; however, when shutting down the pod Airflow throws this error and does not delete the pod.

Exit log:

[2023-10-25T13:23:21.856+0000] {local_task_job_runner.py:228} INFO - Task exited with return code 0
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 60, in main
args.func(args)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 113, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 430, in task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 208, in _run_task_by_selected_method
return _run_task_by_local_task_job(args, ti)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 270, in _run_task_by_local_task_job
ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airf
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/job.py", line 318, in execute_job
ret = execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job_runner.py", line 192, in _execute
self.handle_task_exit(return_code)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job_runner.py", line 232, in handle_task_exit
self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2754, in schedule_downstream_tasks
partial_dag = task.dag.partial_subset(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2393, in partial_subset
dag.task_dict = {
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2394, in <dictcomp>
t.task_id: _deepcopy_task(t)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2391, in _deepcopy_task
return copy.deepcopy(t, memo)
File "/usr/local/lib/python3.8/copy.py", line 153, in deepcopy
y = copier(memo)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1214, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 161, in deepcopy
rv = reductor(4)
TypeError: cannot pickle 'module' object
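
The bottom of the trace is the generic deepcopy machinery giving up on a module object: partial_subset deep-copies every task, the copy walks into nested dicts and object state, and eventually reaches something that holds a reference to a Python module. My guess (an assumption on my part, not verified against the Airflow internals) is that the sql_dag instance passed through op_kwargs in the class below drags the hooks and the SQLAlchemy engine, and through them a DBAPI module, into the copy. The error itself is easy to show in isolation:

import copy
import types


class Holder:
    def __init__(self):
        # hypothetical stand-in for a hook/engine that keeps a reference to a module
        self.dbapi = types


# deep-copying a dict whose values eventually reach a module fails exactly like the log above
copy.deepcopy({"self": Holder()})
# TypeError: cannot pickle 'module' object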

What you think should happen instead

It should terminate correctly and delete the pod.

How to reproduce

Use the following class (which is still not perfect) to generate a DAG like the one below. The purpose of this class is to encapsulate some common usage we have when working on some specific EL jobs:

from __future__ import annotations

import pendulum
from sqlalchemy import create_engine

from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.task_group import TaskGroup


class sql_dag():
    
    def __init__(self, nome_servidor_destino, servidor_origem_dict: dict, nome_banco_destino, nome_banco_origem, schema):
        conexao = MsSqlHook.get_connection(nome_servidor_destino)
        self.hook_banco_destino = MsSqlHook(mssql_conn_id=nome_servidor_destino)
        self.engine_destino = create_engine(f'mssql+pyodbc://{conexao.login}:{conexao.password}@{conexao.host}/{nome_banco_destino}?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=YES')
        for chave, valor in servidor_origem_dict.items():
            if valor == 'Postgres':
                self.hook_banco_origem = PostgresHook(postgres_conn_id=chave)
            elif valor == 'MSSQL':
                self.hook_banco_origem = MsSqlHook(mssql_conn_id=chave)
            else:
                raise Exception("unsupported database")
        self.schema = schema
        self.sql_template_padrao = f'use [{nome_banco_origem}] select * from '
        self.truncate_sql = f"""use [{nome_banco_destino}] truncate table """
        
    def configurar_dag(self, horario: str, nome: str, origem, acao, destino):
        tabela_dict = Variable.get(nome, default_var={"erro": ["erro"]}, deserialize_json=True)

        # the callables receive the sql_dag instance itself through op_kwargs so they can reuse its hooks and engine
        def truncate_tabelas(**kwargs):
            sql3 = kwargs['self'].truncate_sql + kwargs['tabela']
            kwargs['self'].hook_banco_destino.run(sql3)

        def insert_tabelas(**kwargs):
            sql4 = kwargs['self'].sql_template_padrao + kwargs['tabela']
            df = kwargs['self'].hook_banco_origem.get_pandas_df(sql4)
            df.to_sql(kwargs['tabela'], schema=kwargs['self'].schema, con=kwargs['self'].engine_destino, if_exists='append', index=False)
            
        @dag(
            schedule=horario,
            dag_id=nome,
            start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
            catchup=False,
            tags=[origem,acao,destino]
        )        
        def criar():
            start = DummyOperator(task_id='comecaDag')
            end = DummyOperator(task_id='endDag')
            with TaskGroup(group_id="insert_tabelas") as insert:
                # placeholder task so the group is never empty
                se_estiver_vazio = DummyOperator(task_id='pra_evitar_erros')
                for chave, valor in tabela_dict.items():
                    if valor == 'padraoSemDtInsert':
                        task_insert_tabelas = PythonOperator(
                            task_id=f'insert_{chave}',
                            python_callable=insert_tabelas,
                            op_kwargs={"tabela": chave, "self": self},
                        )
            with TaskGroup(group_id="truncate_tabelas") as truncate:
                se_estiver_vazio2 = DummyOperator(task_id='pra_evitar_erros2')
                for chave, valor in tabela_dict.items():
                    task_truncate_tabelas = PythonOperator(
                        task_id=f'truncate_{chave}',
                        python_callable=truncate_tabelas,
                        op_kwargs={"tabela": chave, "self": self},
                    )
            start >> truncate >> insert >> end

        # return the instantiated DAG so the caller keeps a reference to it
        dag_teste = criar()
        return dag_teste

Code to use the above class; it works with an MSSQL destination and a PostgreSQL/MSSQL source:


from cagd_libs.BANCOS_SQL import sql_dag
from airflow.models import Variable

tabela_dict = {"YourTable": "padraoSemDtInsert"}

nome_variavel="ID_Of_the_DAG_YOULL_CREATE"

Variable.set(key=nome_variavel, value=tabela_dict, serialize_json=True)
dag_sql=sql_dag(nome_servidor_destino='ID_OF_THE_DESTINATION_DATABASE_IN_AIRFLOW',
                servidor_origem_dict={'ID_OF_THE_SOURCE_DATABASE_IN_AIRFLOW':'Postgres'},
                nome_banco_destino='destination_database_name',
                nome_banco_origem='source_database_name',
                schema='database_schema')
teste_dag = dag_sql.configurar_dag('airflow_schedule', nome_variavel, "tags_that_we_use", "tags_that_we_use", "tags_that_we_use")
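
To check whether the sql_dag instance itself is the part that cannot be deep-copied (an assumption on my part), this standalone snippet can be run in the same environment with the same connection ids as above; if the object graph behind the hooks/engine reaches a module, deepcopy should fail with a TypeError similar to the one in the log:

import copy

from cagd_libs.BANCOS_SQL import sql_dag

dag_sql = sql_dag(nome_servidor_destino='ID_OF_THE_DESTINATION_DATABASE_IN_AIRFLOW',
                  servidor_origem_dict={'ID_OF_THE_SOURCE_DATABASE_IN_AIRFLOW': 'Postgres'},
                  nome_banco_destino='destination_database_name',
                  nome_banco_origem='source_database_name',
                  schema='database_schema')

# expected to raise a TypeError from copy.deepcopy if the instance holds module references
copy.deepcopy(dag_sql)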

Operating System

Red Hat OpenShift, Airflow Helm chart

Versions of Apache Airflow Providers

pip install apache-airflow-providers-odbc
&& pip install apache-airflow-providers-microsoft-mssql

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

This error occurs on every DAG that is created using the above class; regardless of whether the task runs correctly or not, the pod throws the log cited above and then is not deleted.
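
A workaround I am considering (an assumption on my part, not a confirmed fix): keep op_kwargs down to plain strings and build the hooks and the engine inside the callable, so that nothing reachable from the task's attributes references a module. The function below is only a sketch; insert_tabela and its parameter names are illustrative and not part of the class above:

from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook


def insert_tabela(tabela, origem_conn_id, destino_conn_id, schema, **_):
    # build the hooks inside the task, so only strings travel through op_kwargs
    hook_origem = PostgresHook(postgres_conn_id=origem_conn_id)
    hook_destino = MsSqlHook(mssql_conn_id=destino_conn_id)
    df = hook_origem.get_pandas_df(f"select * from {tabela}")
    df.to_sql(tabela, schema=schema, con=hook_destino.get_sqlalchemy_engine(), if_exists="append", index=False)

The PythonOperator would then get op_kwargs={"tabela": chave, "origem_conn_id": ..., "destino_conn_id": ..., "schema": ...} instead of the whole sql_dag instance.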

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

area:core, kind:bug, needs-triage
