Template container_resources of KubernetesPod #29973

@Atlaoui

Description

What do you see as an issue?

I have a DAG that uses a KubernetesPodOperator to run a containerized task. I would like to parameterize the resources (memory and CPU) allocated to the container, so that I can change them for each DAG run.

Solving the problem

I tried using Airflow's params to define the container_resources argument in the KubernetesPodOperator, but I couldn't find a way to reference the parameters in the dictionary. Here's a simplified version of my code:

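from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator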

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 7),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

with DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    params={
        "request_memory": Param("2000", type="string"),
        "request_cpu": Param("4000", type="string"),
        "limit_memory": Param("4G", type="string"),
        "limit_cpu": Param("5000", type="string"),
    }
) as dag:

    task = KubernetesPodOperator(
        task_id='my_task',
        name='my_task',
        image='my_image:latest',
        namespace='my_namespace',
        image_pull_policy='Always',
        is_delete_operator_pod=True,
        container_resources={
            'request_memory': '{{ params.request_memory }}',  # also tried dag.params["request_memory"]
            'limit_memory': '{{ params.limit_memory }}',
            'request_cpu': '{{ params.request_cpu }}',
            'limit_cpu': '{{ params.limit_cpu }}',
        },
    )
    
# I also tried to use k8s.V1ResourceRequirements:
from kubernetes.client import models as k8s

resources = k8s.V1ResourceRequirements(
    limits={
        'cpu': '{{ params.limit_cpu }}',
        'memory': '{{ params.limit_memory }}'
    },
    requests={
        'cpu': '{{ params.request_cpu }}',
        'memory': '{{ params.request_memory }}'
    }
)

task = KubernetesPodOperator(
    # ... same arguments as in the first attempt ...
    container_resources=resources,
)

Anything else

The documentation states:

container_resources (k8s.V1ResourceRequirements | None) – resources for the launched pod. (templated)
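To make the expected behaviour concrete, here is a minimal sketch of what "templated" would need to mean for a nested resources structure: every string inside the nested dictionaries gets its `{{ params.* }}` placeholders substituted. This is not Airflow's implementation (Airflow renders templates with Jinja2); `render_nested` and the regex are hypothetical stand-ins that use only the standard library, and the parameter values are the ones from the DAG above.

```python
import re

# Matches placeholders of the form {{ params.name }}.
# Illustrative only: Airflow uses Jinja2, not this regex.
_PLACEHOLDER = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render_nested(value, params):
    """Recursively substitute {{ params.x }} in strings inside dicts, lists and tuples."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), value)
    if isinstance(value, dict):
        return {key: render_nested(item, params) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render_nested(item, params) for item in value)
    # Non-string leaves (numbers, None, ...) are returned unchanged.
    return value


# Parameter values taken from the DAG definition in this issue.
params = {
    "request_memory": "2000",
    "request_cpu": "4000",
    "limit_memory": "4G",
    "limit_cpu": "5000",
}

# Same shape as the limits/requests passed to k8s.V1ResourceRequirements.
resources_template = {
    "limits": {
        "cpu": "{{ params.limit_cpu }}",
        "memory": "{{ params.limit_memory }}",
    },
    "requests": {
        "cpu": "{{ params.request_cpu }}",
        "memory": "{{ params.request_memory }}",
    },
}

rendered = render_nested(resources_template, params)
print(rendered)
```

The point of the sketch is that rendering has to descend into `limits` and `requests`; substituting only top-level strings would leave the placeholders in the nested values untouched.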

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
