Description
Apache Airflow version
2.2.0 (latest released)
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-postgres==2.3.0
apache-airflow-providers-sqlite==2.0.1
Deployment
Other Docker-based deployment
Deployment details
The host machine runs Windows 10 Enterprise build 19043.1237 with Docker Desktop 4.1.1 (69879). The Docker image is based on python:3.8-slim, and it is a single-machine deployment.
What happened
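If a Python script is run through BashOperator and any "env" argument is supplied to the operator, sys.executable in the spawned Python session is not populated (reported here as None; the reproduction log below prints a blank line where the interpreter path should be).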
What you expected to happen
sys.executable should hold the path of the Python executable when BashOperator is used with the "env" argument.
How to reproduce
I was running the following DAG
import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='testdag',
    start_date=datetime.datetime(2021, 10, 1),
    schedule_interval='0 11 * * *',  # daily 11 AM UTC
    catchup=False,
) as dag:
    test_task = BashOperator(
        task_id='test_task',
        bash_command='echo $(python3 -c "import sys;print(sys.executable)")',
        env={'test': 'test'},
    )
And the log showed:
[2021-10-22, 12:02:09 EDT] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: testdag.test_task manual__2021-10-22T16:02:07.768715+00:00 [queued]>
[2021-10-22, 12:02:09 EDT] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: testdag.test_task manual__2021-10-22T16:02:07.768715+00:00 [queued]>
[2021-10-22, 12:02:09 EDT] {taskinstance.py:1241} INFO -
--------------------------------------------------------------------------------
[2021-10-22, 12:02:09 EDT] {taskinstance.py:1242} INFO - Starting attempt 1 of 1
[2021-10-22, 12:02:09 EDT] {taskinstance.py:1243} INFO -
--------------------------------------------------------------------------------
[2021-10-22, 12:02:09 EDT] {taskinstance.py:1262} INFO - Executing <Task(BashOperator): test_task> on 2021-10-22 16:02:07.768715+00:00
[2021-10-22, 12:02:09 EDT] {standard_task_runner.py:52} INFO - Started process 15435 to run task
[2021-10-22, 12:02:09 EDT] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'testdag', 'test_task', 'manual__2021-10-22T16:02:07.768715+00:00', '--job-id', '31', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpwzxrybgg', '--error-file', '/tmp/tmpqqsq91p4']
[2021-10-22, 12:02:09 EDT] {standard_task_runner.py:77} INFO - Job 31: Subtask test_task
[2021-10-22, 12:02:09 EDT] {logging_mixin.py:109} INFO - Running <TaskInstance: testdag.test_task manual__2021-10-22T16:02:07.768715+00:00 [running]> on host 8f4f12aea4ed
[2021-10-22, 12:02:10 EDT] {taskinstance.py:1412} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=testdag
AIRFLOW_CTX_TASK_ID=test_task
AIRFLOW_CTX_EXECUTION_DATE=2021-10-22T16:02:07.768715+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-10-22T16:02:07.768715+00:00
[2021-10-22, 12:02:10 EDT] {subprocess.py:62} INFO - Tmp dir root location:
/tmp
[2021-10-22, 12:02:10 EDT] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'echo $(python3 -c "import sys;print(sys.executable)")']
[2021-10-22, 12:02:10 EDT] {subprocess.py:85} INFO - Output:
[2021-10-22, 12:02:10 EDT] {subprocess.py:89} INFO -
[2021-10-22, 12:02:10 EDT] {subprocess.py:93} INFO - Command exited with return code 0
[2021-10-22, 12:02:10 EDT] {taskinstance.py:1270} INFO - Marking task as SUCCESS. dag_id=testdag, task_id=test_task, execution_date=20211022T160207, start_date=20211022T160209, end_date=20211022T160210
[2021-10-22, 12:02:10 EDT] {local_task_job.py:154} INFO - Task exited with return code 0
[2021-10-22, 12:02:10 EDT] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
In my tests, whenever anything is passed through the "env" argument of a BashOperator that runs a Python script, the spawned Python session has no usable sys.executable value (None or empty).
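A possible explanation, not confirmed in this report: Python's subprocess module documents that a non-None env replaces the inherited environment instead of extending it, so a task started with env={'test': 'test'} may run without PATH, which the interpreter can need to locate itself. The sketch below shows one way to keep the parent environment while adding task-specific variables. The helper name build_task_env and its base parameter are hypothetical and not part of Airflow.

```python
import os
from typing import Dict, Mapping, Optional


def build_task_env(
    extra: Mapping[str, str],
    base: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Return a copy of the base environment with `extra` layered on top.

    `base` defaults to the current process environment (os.environ), so
    variables such as PATH survive. Hypothetical helper, not an Airflow API.
    """
    merged = dict(os.environ if base is None else base)
    merged.update(extra)  # task-specific values win on key collisions
    return merged
```

Under that assumption, passing env=build_task_env({'test': 'test'}) to the BashOperator instead of env={'test': 'test'} would hand the bash subprocess the inherited variables plus the extra one.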
Anything else
The actual Python script I tried to run uses the snowflake.connector library. On import, the library identifies the underlying OS/platform by calling platform.platform(), which in turn calls libc_ver(sys.executable) (see the traceback in the following log). Strangely, in this case the path that gets opened in place of sys.executable is the temporary execution directory (such as /tmp/airflowtmpyes4ub8l in the following log). Not sure if this info helps.
DAG
import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='testdag',
    start_date=datetime.datetime(2021, 10, 1),
    schedule_interval='0 11 * * *',  # daily 11 AM UTC
    catchup=False,
) as dag:
    test_task = BashOperator(
        task_id='test_task',
        bash_command='python3 -c "import snowflake.connector"',
        env={'test': 'test'},
    )
The log
[2021-10-22, 12:34:52 EDT] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: testdag.test_task manual__2021-10-22T16:34:48.077920+00:00 [queued]>
[2021-10-22, 12:34:52 EDT] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: testdag.test_task manual__2021-10-22T16:34:48.077920+00:00 [queued]>
[2021-10-22, 12:34:52 EDT] {taskinstance.py:1241} INFO -
--------------------------------------------------------------------------------
[2021-10-22, 12:34:52 EDT] {taskinstance.py:1242} INFO - Starting attempt 1 of 1
[2021-10-22, 12:34:52 EDT] {taskinstance.py:1243} INFO -
--------------------------------------------------------------------------------
[2021-10-22, 12:34:52 EDT] {taskinstance.py:1262} INFO - Executing <Task(BashOperator): test_task> on 2021-10-22 16:34:48.077920+00:00
[2021-10-22, 12:34:52 EDT] {standard_task_runner.py:52} INFO - Started process 17378 to run task
[2021-10-22, 12:34:52 EDT] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'testdag', 'test_task', 'manual__2021-10-22T16:34:48.077920+00:00', '--job-id', '37', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmp0smgg6b6', '--error-file', '/tmp/tmpa9trmhpd']
[2021-10-22, 12:34:52 EDT] {standard_task_runner.py:77} INFO - Job 37: Subtask test_task
[2021-10-22, 12:34:52 EDT] {logging_mixin.py:109} INFO - Running <TaskInstance: testdag.test_task manual__2021-10-22T16:34:48.077920+00:00 [running]> on host 8f4f12aea4ed
[2021-10-22, 12:34:52 EDT] {taskinstance.py:1412} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=testdag
AIRFLOW_CTX_TASK_ID=test_task
AIRFLOW_CTX_EXECUTION_DATE=2021-10-22T16:34:48.077920+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-10-22T16:34:48.077920+00:00
[2021-10-22, 12:34:52 EDT] {subprocess.py:62} INFO - Tmp dir root location:
/tmp
[2021-10-22, 12:34:52 EDT] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'python3 -c "import snowflake.connector"']
[2021-10-22, 12:34:52 EDT] {subprocess.py:85} INFO - Output:
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - Traceback (most recent call last):
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - File "<string>", line 1, in <module>
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - File "/usr/local/lib/python3.8/site-packages/snowflake/connector/__init__.py", line 15, in <module>
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - from .connection import SnowflakeConnection
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - File "/usr/local/lib/python3.8/site-packages/snowflake/connector/connection.py", line 31, in <module>
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - from . import errors, proxy
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - File "/usr/local/lib/python3.8/site-packages/snowflake/connector/errors.py", line 14, in <module>
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - from .description import CLIENT_NAME, SNOWFLAKE_CONNECTOR_VERSION
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - File "/usr/local/lib/python3.8/site-packages/snowflake/connector/description.py", line 17, in <module>
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - PLATFORM = platform.platform()
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - File "/usr/local/lib/python3.8/platform.py", line 1206, in platform
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - libcname, libcversion = libc_ver(sys.executable)
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - File "/usr/local/lib/python3.8/platform.py", line 193, in libc_ver
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - with open(executable, 'rb') as f:
[2021-10-22, 12:34:52 EDT] {subprocess.py:89} INFO - IsADirectoryError: [Errno 21] Is a directory: '/tmp/airflowtmpyes4ub8l'
[2021-10-22, 12:34:52 EDT] {subprocess.py:93} INFO - Command exited with return code 1
[2021-10-22, 12:34:52 EDT] {taskinstance.py:1686} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1324, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1443, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1499, in _execute_task
result = execute_callable(context=context)
File "/usr/local/lib/python3.8/site-packages/airflow/operators/bash.py", line 187, in execute
raise AirflowException(
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code 1.
[2021-10-22, 12:34:52 EDT] {taskinstance.py:1270} INFO - Marking task as FAILED. dag_id=testdag, task_id=test_task, execution_date=20211022T163448, start_date=20211022T163452, end_date=20211022T163452
[2021-10-22, 12:34:52 EDT] {standard_task_runner.py:88} ERROR - Failed to execute job 37 for task test_task
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
args.func(args, dag=self.dag)
File "/usr/local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/usr/local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
_run_raw_task(args, ti)
File "/usr/local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
ti._run_raw_task(
File "/usr/local/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1324, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1443, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1499, in _execute_task
result = execute_callable(context=context)
File "/usr/local/lib/python3.8/site-packages/airflow/operators/bash.py", line 187, in execute
raise AirflowException(
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code 1.
[2021-10-22, 12:34:52 EDT] {local_task_job.py:154} INFO - Task exited with return code 1
[2021-10-22, 12:34:52 EDT] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
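The IsADirectoryError in the traceback above is consistent with an empty sys.executable. As far as I can tell, platform.libc_ver() in Python 3.8 passes the executable path through os.path.realpath() before opening it, and os.path.realpath('') resolves to the current working directory, which for this task would be the temporary execution directory. The sketch below only demonstrates the realpath behaviour; the function name resolve_executable is hypothetical, and it assumes a POSIX system.

```python
import os


def resolve_executable(executable: str) -> str:
    """Resolve a path the way os.path.realpath does.

    An empty string resolves to the current working directory, so code
    that then calls open() on the result ends up opening a directory.
    """
    return os.path.realpath(executable)


if __name__ == "__main__":
    # With an empty "executable", the resolved path is the working directory.
    print(resolve_executable(""))
```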
Thank you so much for the help!
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct