Description
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
8.3.2
Apache Airflow version
2.9.2
Operating System
rhel
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
Type annotations that rely on imported or locally defined names cause an error when the @task.kubernetes task decorator is used.
See below for the specific errors.
What you think should happen instead
A solution could be to adjust the get_python_source function here: https://github.com/apache/airflow/blob/main/airflow/decorators/base.py#L301.
We could add a step that removes the type annotations from the code (for the parameters, but also, if desired, in the function body).
I have a fix ready using ast and astunparse.
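To illustrate the idea, here is a minimal sketch of such an annotation-stripping step. It is not the fix mentioned above: it uses the standard library's ast.unparse (Python 3.9+) instead of astunparse, and the names AnnotationStripper and strip_annotations are hypothetical, not part of Airflow.

```python
import ast
import textwrap


class AnnotationStripper(ast.NodeTransformer):
    """Hypothetical helper: drop type annotations from a function's source."""

    def visit_FunctionDef(self, node):
        # Visit children first so nested functions and arguments are handled.
        self.generic_visit(node)
        node.returns = None  # remove the "-> ..." return annotation
        return node

    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_arg(self, node):
        # Covers positional, keyword-only, *args and **kwargs parameters.
        node.annotation = None
        return node

    def visit_AnnAssign(self, node):
        if node.value is None:
            # A bare declaration such as "x: int" has no value to assign.
            # Replacing it with "pass" drops its scoping effect (edge case).
            return ast.copy_location(ast.Pass(), node)
        plain = ast.Assign(targets=[node.target], value=node.value, type_comment=None)
        return ast.copy_location(plain, node)


def strip_annotations(source: str) -> str:
    """Return the source with annotations removed (assumes valid Python)."""
    tree = ast.parse(textwrap.dedent(source))
    tree = AnnotationStripper().visit(tree)
    ast.fix_missing_locations(tree)
    return ast.unparse(tree)


example = """
def my_task1(var: Any = 'foo') -> None:
    local: Any = var
    print(local)
"""

print(strip_annotations(example))
```

Note that this only removes annotations. A default value such as MyObject() in my_task2 is an ordinary expression and would still be left in the generated script.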
How to reproduce
from typing import Any

from airflow.decorators import dag, task


class MyObject:
    var: str = 'foo'


POD_CONFIG = dict(
    image='apache/airflow:slim-latest-python3.11',
    image_pull_policy="Always",
    namespace=None,
    get_logs=True,
    log_events_on_failure=True,
    do_xcom_push=True,
)


@dag(
    schedule=None,
)
def kpo_bug_test():
    @task.kubernetes(**POD_CONFIG)
    def my_task0(var='foo'):
        pass

    @task.kubernetes(**POD_CONFIG)
    def my_task1(var: Any ='foo'):
        print(var)

    @task.kubernetes(**POD_CONFIG)
    def my_task2(var: MyObject=MyObject()) :
        pass

    @task.kubernetes(**POD_CONFIG)
    def my_task3():
        var: Any = 'foo'
        print(var)

    my_task0()
    my_task1()
    my_task2()
    my_task3()


kpo_bug_test()
I obtain the following results:

- my_task0 is supposed to run, as proof that the basic setup works.
- my_task1 is supposed to fail because `from typing import Any` is not added to the /tmp/script.py file. The interesting log portion:
[2024-07-03, 11:48:09 UTC] {pod_manager.py:472} INFO - [base] + python /tmp/script.py /tmp/script.in /airflow/xcom/return.json
[2024-07-03, 11:48:09 UTC] {pod_manager.py:472} INFO - [base] Traceback (most recent call last):
[2024-07-03, 11:48:09 UTC] {pod_manager.py:472} INFO - [base] File "/tmp/script.py", line 19, in <module>
[2024-07-03, 11:48:09 UTC] {pod_manager.py:472} INFO - [base] def my_task1(var: Any ='foo'):
[2024-07-03, 11:48:09 UTC] {pod_manager.py:472} INFO - [base] ^^^
[2024-07-03, 11:48:09 UTC] {pod_manager.py:490} INFO - [base] NameError: name 'Any' is not defined. Did you mean: 'any'?
- my_task2 is also supposed to fail, because the definition of MyObject is not ported to /tmp/script.py, and may never be, since the created pod might not contain the class definition.
[2024-07-03, 11:48:20 UTC] {pod_manager.py:472} INFO - [base] + python /tmp/script.py /tmp/script.in /airflow/xcom/return.json
[2024-07-03, 11:48:20 UTC] {pod_manager.py:472} INFO - [base] Traceback (most recent call last):
[2024-07-03, 11:48:20 UTC] {pod_manager.py:472} INFO - [base] File "/tmp/script.py", line 19, in <module>
[2024-07-03, 11:48:20 UTC] {pod_manager.py:472} INFO - [base] def my_task2(var: MyObject=MyObject()) :
[2024-07-03, 11:48:20 UTC] {pod_manager.py:472} INFO - [base] ^^^^^^^^
[2024-07-03, 11:48:20 UTC] {pod_manager.py:490} INFO - [base] NameError: name 'MyObject' is not defined. Did you mean: 'object'?
- For my_task3, I was expecting a failure since `from typing import Any` is not in the /tmp/script.py file (as for my_task1). However, it does not raise an error, so there is no problem to solve for this case.
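The difference between these cases comes from when Python evaluates each piece of the function. The sketch below, which is independent of Airflow, executes three small sources in an empty namespace where neither Any nor MyObject is defined. The helper name run and the three source strings are illustrative only. On Python 3.13 and earlier (the issue uses a 3.11 image), parameter annotations are evaluated when the def statement runs. Default values are always evaluated at that point. Annotations on local variables inside a function body are never evaluated.

```python
def run(source: str) -> str:
    """Execute the source in an empty namespace, call f(), report the outcome."""
    namespace = {}
    try:
        exec(source, namespace)  # runs the def statement
        namespace["f"]()         # runs the function body
        return "ok"
    except NameError as exc:
        return f"NameError: {exc}"


# Like my_task1: annotation on a parameter, name not imported.
param_annotation = """
def f(var: Any = 'foo'):
    return var
"""

# Like my_task2 without its annotation: default value uses an undefined name.
default_value = """
def f(var=MyObject()):
    return var
"""

# Like my_task3: annotation on a local variable only.
local_annotation = """
def f():
    var: Any = 'foo'
    return var
"""

for label, src in [
    ("parameter annotation", param_annotation),
    ("default value", default_value),
    ("local annotation", local_annotation),
]:
    print(f"{label}: {run(src)}")
```

This suggests that my_task2 would keep failing even with annotations removed, as long as its default value refers to a class that the generated script does not define.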
Anything else
This issue should also appear for @task.docker, but I have not tested it.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct