KubernetesPodOperator uses filepath's *contents* rather than generated filepath for config_file parameter #18025

@ZackingIt

Apache Airflow version

2.1.3 (latest released)

Operating System

Ubuntu VERSION="16.04.6 LTS (Xenial Xerus)"

Versions of Apache Airflow Providers

2.1.3

Deployment

Docker-Compose

Deployment details

My docker-compose.yaml is identical to the default dev box example except I mount a few additional volumes:

  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./kubernetes_configs:/opt/airflow/kubernetes_configs
    - ./airflow.cfg:/opt/airflow/airflow.cfg

What happened

KubernetesPodOperator appears to open the file at the config_file location and then pass the file's contents along as the "file path", rather than passing the generated path string, which is the template_searchpath joined with the config_file value.

AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-03T20:22:20.691296+00:00
[2021-09-03 20:22:23,803] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 336, in execute
    config_file=self.config_file,
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/kube_client.py", line 145, in get_kube_client
    client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/kube_client.py", line 46, in _get_kube_config
    load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/refresh_config.py", line 123, in load_kube_config
    loader = _get_kube_config_loader_for_yaml_file(config_file, active_context=context, config_persister=None)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/refresh_config.py", line 105, in _get_kube_config_loader_for_yaml_file
    with open(filename) as f:
FileNotFoundError: [Errno 2] No such file or directory: 'apiVersion: batch/v1beta1\nkind: CronJob\nmetadata:\n  name: zek-01\nspec:\n  schedule: "0 */1 * * *"\n  concurrencyPolicy: Forbid\n  jobTemplate:\n    spec:\n      template:\n        metadata:\n          labels:\n            networkpolicy.xandr.com/unrestrictedEgress: "true"\n        spec:\n          containers:\n          - name: zek-01\n            image: docker.artifactory.prod.adnxs.net/ssp-object-sync:0.87\n            env:\n            - name: GROUP\n              value: "1"\n            - name: ENV\n              value: "TEST"\n            - name: SCRIPT\n              value: "ADX-CREATIVE-REGISTER"\n            - name: "PARAM_api_user"\n              valueFrom:\n                secretKeyRef:\n                  name: ssp-object-sync-secrets-production\n                  key: "PARAM_api_user"\n            - name: "PARAM_api_passwd"\n              valueFrom:\n                secretKeyRef:\n                  name: ssp-object-sync-secrets-production\n                  key: "PARAM_api_passwd"\n            - name: "PARAM_cron_db_user"\n              valueFrom:\n                secretKeyRef:\n                  name: ssp-object-sync-secrets-production\n                  key: "PARAM_cron_db_user"\n            - name: "PARAM_cron_db_passwd"\n              valueFrom:\n                secretKeyRef:\n                  name: ssp-object-sync-secrets-production\n                  key: "PARAM_cron_db_passwd"\n            - name: "PARAM_int_db_user"\n              valueFrom:\n                secretKeyRef:\n                  name: ssp-object-sync-secrets-production\n                  key: "PARAM_int_db_user"\n            - name: "PARAM_int_db_passwd"\n              valueFrom:\n                secretKeyRef:\n                  name: ssp-object-sync-secrets-production\n                  key: "PARAM_int_db_passwd"\n            resources:\n              limits:\n                cpu: "1"\n                memory: "2.5Gi"\n              requests:\n                cpu: "0.1"\n                memory: "1Gi"\n            volumeMounts:\n            - mountPath: /var/log/adnexus\n              name: log-volume\n          volumes:\n          - name: log-volume\n            emptyDir: {}\n          restartPolicy: Never'
[2021-09-03 20:22:23,804] {taskinstance.py:1512} INFO - Marking task as FAILED. dag_id=aaa_sample_dag_2, task_id=noop_zek_1, execution_date=20210903T202220, start_date=20210903T202223, end_date=20210903T202223
[2021-09-03 20:22:23,854] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-09-03 20:22:23,875] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check
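
The traceback above is consistent with extension-based template loading: if config_file is a templated field and the operator treats values ending in .yaml as template files, the renderer would look the name up under template_searchpath and substitute the file's contents for the value. That is an assumption about the cause, not something this report confirms. The sketch below imitates that mechanism in plain Python. TEMPLATE_EXT, render_field and demo are hypothetical names made up for illustration and are not Airflow APIs.

```python
import os
import tempfile

# Hypothetical stand-in for an operator's template extensions (assumption).
TEMPLATE_EXT = (".yaml", ".yml", ".json")


def render_field(value: str, searchpath: str) -> str:
    """Imitate extension-based template loading for one string field."""
    if value.endswith(TEMPLATE_EXT):
        # The value is taken to be a template file name, so the file's
        # contents replace the original path string.
        with open(os.path.join(searchpath, value)) as f:
            return f.read()
    # Any other value passes through unchanged.
    return value


def demo() -> tuple:
    """Build a throwaway search path and render two config_file-like values."""
    with tempfile.TemporaryDirectory() as root:
        os.makedirs(os.path.join(root, "kubernetes_configs"))
        path = os.path.join(root, "kubernetes_configs", "foo.yaml")
        with open(path, "w") as f:
            f.write("apiVersion: batch/v1beta1\nkind: CronJob\n")
        rendered = render_field("kubernetes_configs/foo.yaml", root)
        untouched = render_field("kubernetes_configs/foo.conf", root)
        return rendered, untouched


if __name__ == "__main__":
    rendered, untouched = demo()
    print(repr(rendered))   # file contents, not a path
    print(repr(untouched))  # still the original path string
```

Under this assumption, the value that reaches open(filename) in refresh_config.py would be the rendered YAML text, which matches the FileNotFoundError message in the log.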

What you expected to happen

This is the code that's running. I would expect config_file='kubernetes_configs/foo.yaml' to resolve to the correct file path, and KubernetesPodOperator to consume that path, not the contents of the file it points to.


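from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'sup_team',
    'start_date': days_ago(1)
}

# template_searchpath is where relative template file names are looked up
dag = DAG(dag_id='aaa_sample_dag_2', default_args=args, schedule_interval=None, template_searchpath="/opt/airflow/")

with dag:
    kube_operator_1 = KubernetesPodOperator(
        task_id='noop_zek_1',
        name='zekk',
        namespace='default',
        cmds=['echo'],
        in_cluster=False,
        do_xcom_push=False,
        # relative to template_searchpath; expected to be passed on as a path
        config_file='kubernetes_configs/foo.yaml'
    )
    kube_operator_1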

How to reproduce

You can reproduce this with a simple docker-compose up using the following docker-compose.yaml file. Just make sure the working directory contains a kubernetes_configs folder with a foo.yaml file inside:
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./kubernetes_configs:/opt/airflow/kubernetes_configs
    - ./airflow.cfg:/opt/airflow/airflow.cfg
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    deploy:
      resources:
        limits:
          cpus: 2
          memory: 2096M
        reservations:
          cpus: 1
          memory: 1048M
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    deploy:
      resources:
        limits:
          cpus: 2
          memory: 2096M
        reservations:
          cpus: 1
          memory: 1048M
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    deploy:
      resources:
        limits:
          cpus: 2
          memory: 2096M
        reservations:
          cpus: 1
          memory: 1048M
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.1.0
        min_airlfow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airlfow_version_comparable )); then
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo -e "\033[1;31mERROR!!!: AIRFLOW_UID not set!\e[0m"
          echo "Please follow these instructions to set AIRFLOW_UID and AIRFLOW_GID environment variables:
              https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#initializing-environment"
          exit 1
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:${AIRFLOW_GID}" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:${AIRFLOW_GID:-0}"
    volumes:
      - .:/sources

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

volumes:
  postgres-db-volume:

The airflow.cfg file is the same as the default.

Anything else

Problem occurs every time.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
