SparkKubernetesOperator fails after upgrade from 2.8.1 to 2.8.2 #38017

Closed
1 of 2 tasks
gbloisi-openaire opened this issue Mar 10, 2024 · 11 comments · Fixed by #38035
Labels
area:providers, kind:bug (This is a clearly a bug), needs-triage (label for new issues that we didn't triage yet), provider:cncf-kubernetes (Kubernetes provider related issues)

Comments

gbloisi-openaire commented Mar 10, 2024

Apache Airflow version

2.8.2

If "Other Airflow 2 version" selected, which one?

2.8.3rc1

What happened?

I'm running a spark-pi example using the SparkKubernetesOperator:

SparkKubernetesOperator(
    task_id='spark_pi_submit',
    namespace='lot1-spark-jobs',
    application_file="/example_spark_kubernetes_operator_pi.yaml",
    kubernetes_conn_id="kubernetes_default",
    do_xcom_push=True,
    in_cluster=True,
    delete_on_termination=True,
    dag=dag
)

It was running fine on 2.8.1. After upgrading to Airflow 2.8.2, I got the following error:

    kube_client=self.client,
                ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 1001, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 250, in client
    return self.hook.core_v1_client
           ^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 1001, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 242, in hook
    or self.template_body.get("kubernetes", {}).get("kube_config_file", None),
       ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 198, in template_body
    return self.manage_template_specs()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 127, in manage_template_specs
    template_body = _load_body_to_dict(open(self.application_file))
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory: 'apiVersion: "sparkoperator.k8s.io/v1beta2"\nkind: SparkApplication\nmetadata:\n  name: spark-pi\n  namespace: lot1-spark-jobs\ns
[2024-03-10T10:29:15.613+0000] {taskinstance.py:1149} INFO - Marking task as UP_FOR_RETRY. dag_id=spark_pi, task_id=spark_pi_submit, execution_date=20240310T102910, start_date=20240310T

It looks like self.application_file ends up containing the content of the file it points to, rather than the path.

I suspect this was caused by the changes introduced in PR-22253. I'm quite new to Airflow and Python, but my guess is that the application_file property should no longer be treated as a templated property, since template rendering was moved to template_body.
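
To illustrate my understanding, here is a minimal sketch of the failure mode (simplified names, not the provider's actual code; just the sequence I think happens because .yaml is one of the operator's template extensions):

from pathlib import Path

application_file = "/example_spark_kubernetes_operator_pi.yaml"
# Step 1: because the value ends with .yaml, Jinja template rendering replaces
# the attribute with the rendered file content instead of keeping the path.
application_file = Path(application_file).read_text()
# Step 2: manage_template_specs() then effectively calls open(self.application_file),
# i.e. open() on the YAML text, which fails because the "filename" is now the whole
# manifest body (the FileNotFoundError shown in my traceback).
open(application_file)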

What do you think should happen instead?

No response

How to reproduce

Given my understanding of the issue, a very simple example of SparkKubernetesOperator using the application_file property should reproduce it.
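
For example, a minimal DAG along these lines (a sketch based on my setup above; the manifest path, namespace, and dates are placeholders from my environment) should trigger the error on 2.8.2:

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

with DAG(
    dag_id="spark_pi",
    start_date=datetime(2024, 3, 1),
    schedule=None,
    catchup=False,
) as dag:
    SparkKubernetesOperator(
        task_id="spark_pi_submit",
        # any existing SparkApplication manifest readable by the worker
        application_file="/example_spark_kubernetes_operator_pi.yaml",
        namespace="lot1-spark-jobs",
        kubernetes_conn_id="kubernetes_default",
        in_cluster=True,
    )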

Operating System

kind kubernetes

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


gbloisi-openaire added the area:core, kind:bug, and needs-triage labels on Mar 10, 2024

boring-cyborg bot commented Mar 10, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

potiuk (Member) commented Mar 10, 2024

cc: @hamedhsn ? Can you please take a look? It seems related to your change

As a workaround, @gbloisi-openaire, you can easily downgrade the cncf.kubernetes provider to the previous version and continue using Airflow 2.8.2. In Airflow, providers can be upgraded and downgraded separately from Airflow itself.

If you are using a container image, you can build your own image and downgrade the providers there: https://airflow.apache.org/docs/docker-stack/build.html#example-of-customizing-airflow-provider-packages

hamedhsn (Contributor) commented:

@gbloisi-openaire can you please post the content of your example_spark_kubernetes_operator_pi.yaml file?
I assume the file exists at /example_spark_kubernetes_operator_pi.yaml.

gbloisi-openaire (Author) commented:

Please find it below. It is an adaptation of tests/system/providers/cncf/kubernetes/example_spark_kubernetes_spark_pi.yaml. I removed the license comments after a few tries, to confirm that the error was indeed reporting the content of this file.

My DAG is likewise an adaptation of the provided tests/system/providers/cncf/kubernetes/example_spark_kubernetes.py.
So far I have gotten it working successfully by rewriting the file content as a template_spec object and by removing do_xcom_push=True and the SparkKubernetesSensor task.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: lot1-spark-jobs
spec:
  type: Scala
  mode: cluster
  image: "apache/spark:v3.1.3"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.3.jar"
  sparkVersion: "3.1.3"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.3
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.3
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

Taragolis (Contributor) commented:

This is caused by multiple changes, some of which are mutually exclusive or conflict with Airflow core:

  1. The operator expects that application_file is always a file path.
  2. application_file is a templated field, which can hold either a literal value or a template file. If it is a file, template rendering returns the content of the file rather than the path.
  3. Operations around templated fields were moved into the execute method: Fix rendering SparkKubernetesOperator.template_body #37271, which fixed SparkKubernetesOperator not rendering template correctly #37261.

If the value is expected to be a non-templated file path, I would recommend using the LiteralValue wrapper instead (see #35017; this is not documented well).
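
Something along these lines (an untested sketch; I am assuming LiteralValue can be imported from airflow.template.templater in recent Airflow versions, and the other arguments are taken from the DAG posted above):

from airflow.template.templater import LiteralValue
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

SparkKubernetesOperator(
    task_id="spark_pi_submit",
    namespace="lot1-spark-jobs",
    # wrapping the path keeps it out of template rendering, so the operator
    # receives the file path itself rather than the rendered file content
    application_file=LiteralValue("/example_spark_kubernetes_operator_pi.yaml"),
    kubernetes_conn_id="kubernetes_default",
    in_cluster=True,
)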

renanxx1 commented:

I had the same issue described in the post after upgrading from 2.8.1 to 2.8.2. Will it be fixed in the 2.8.3 version?

potiuk (Member) commented Mar 12, 2024

I had the same issue described in the post after upgrading from 2.8.1 to 2.8.2. Will it be fixed in the 2.8.3 version?

Well, the fix was merged 6 hours ago and 2.8.3 was released yesterday, so obviously no.

But this is a provider fix, so you can do both: downgrade the provider to the older version that had no problems, and upgrade it again once the fixed provider is released. You do not have to wait: you can downgrade the provider now, and when we release an RC candidate for the new providers in a few days, you can take the RC for a spin and test it.

Can we count on your help testing it when the RC is out, @renanxx1?

treyhendon commented:

If anyone finds this, I was able to run Airflow 2.8.3 with apache-airflow-providers-cncf-kubernetes 7.13.0.

Our container does custom pip installs, and I set the requirement to apache-airflow-providers-cncf-kubernetes == 7.13.0.

lyabomyr commented:

I still have the same issue on Airflow 2.9.0

thispejo commented:

The error still persists in v2.8.3.
Is there any workaround?

lyabomyr commented May 24, 2024

The error still persists in v2.8.3. Is there any workaround?

I created my own operator based on the previous version, and now I use it without fear that a new update will change or break the operator.

import datetime
from functools import cached_property
from typing import TYPE_CHECKING, Sequence
from kubernetes.client import ApiException
from kubernetes.watch import Watch
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook, _load_body_to_dict
import json

if TYPE_CHECKING:
    from kubernetes.client.models import CoreV1EventList

    from airflow.utils.context import Context

import uuid


class SparkKubernetesOperator(BaseOperator):
    def __init__(
            self,
            *,
            spec_job_template_file: str,
            namespace: str,
            kubernetes_conn_id: str = "kubernetes_default",
            api_group: str = "sparkoperator.k8s.io",
            api_version: str = "v1beta2",
            in_cluster: bool | None = None,
            cluster_context: str | None = None,
            watch: bool = True,
            image: str,
            main_application_path: str,
            **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        try:
            path_to_templates = '/opt/airflow/dags/repo/plugins/custom_operators/spark/templates/'
            json_file_path = path_to_templates + spec_job_template_file
            with open(json_file_path, 'r') as json_file:
                self.application_file = json.load(json_file)
                self.application_file['metadata']['namespace'] = namespace
                self.application_file['metadata'][
                    'name'] = f'{self.task_id[:13]}-{str(uuid.uuid4())}-task'.lower().replace('_', '-')
                self.application_file['spec']['mainApplicationFile'] = main_application_path
                self.application_file['spec']['image'] = image
        except FileNotFoundError:
            raise FileNotFoundError(f"Config file '{spec_job_template_file}' not found.")
        except json.JSONDecodeError:
            raise ValueError(f"Invalid JSON format in '{spec_job_template_file}'.")

        self.namespace = namespace
        self.kubernetes_conn_id = kubernetes_conn_id
        self.api_group = api_group
        self.api_version = api_version
        self.plural = "sparkapplications"
        self.in_cluster = in_cluster
        self.cluster_context = cluster_context
        self.watch = watch

    @cached_property
    def hook(self) -> KubernetesHook:
        return KubernetesHook(
            conn_id=self.kubernetes_conn_id,
            in_cluster=self.in_cluster,
            config_file=None,
            cluster_context=self.cluster_context,
        )

    def _get_namespace_event_stream(self, namespace, query_kwargs=None):
        try:
            return Watch().stream(
                self.hook.core_v1_client.list_namespaced_event,
                namespace=namespace,
                watch=True,
                **(query_kwargs or {}),
            )
        except ApiException as e:
            if e.status == 410:  # Resource version is too old
                events: CoreV1EventList = self.hook.core_v1_client.list_namespaced_event(
                    namespace=namespace, watch=False
                )
                resource_version = events.metadata.resource_version
                query_kwargs["resource_version"] = resource_version
                return self._get_namespace_event_stream(namespace, query_kwargs)
            else:
                raise

    def execute(self, context: Context):
        if isinstance(self.application_file, str):
            body = _load_body_to_dict(self.application_file)
        else:
            body = self.application_file
        name = body["metadata"]["name"]
        namespace = self.namespace or self.hook.get_namespace()

        response = None
        is_job_created = False
        if self.watch:
            try:
                namespace_event_stream = self._get_namespace_event_stream(
                    namespace=namespace,
                    query_kwargs={
                        "field_selector": f"involvedObject.kind=SparkApplication,involvedObject.name={name}"
                    },
                )

                response = self.hook.create_custom_object(
                    group=self.api_group,
                    version=self.api_version,
                    plural=self.plural,
                    body=body,
                    namespace=namespace,
                )
                is_job_created = True
                for event in namespace_event_stream:
                    obj = event["object"]
                    if event["object"].last_timestamp >= datetime.datetime.strptime(
                            response["metadata"]["creationTimestamp"], "%Y-%m-%dT%H:%M:%S%z"
                    ):
                        self.log.info(obj.message)
                        if obj.reason == "SparkDriverRunning":
                            pod_log_stream = Watch().stream(
                                self.hook.core_v1_client.read_namespaced_pod_log,
                                name=f"{name}-driver",
                                namespace=namespace,
                                timestamps=True,
                            )
                            for line in pod_log_stream:
                                self.log.info(line)
                        elif obj.reason in [
                            "SparkApplicationSubmissionFailed",
                            "SparkApplicationFailed",
                            "SparkApplicationDeleted",
                        ]:
                            is_job_created = False
                            raise AirflowException(obj.message)
                        elif obj.reason == "SparkApplicationCompleted":
                            break
                        else:
                            continue
            except Exception:
                if is_job_created:
                    self.on_kill()
                raise

        else:
            response = self.hook.create_custom_object(
                group=self.api_group,
                version=self.api_version,
                plural=self.plural,
                body=body,
                namespace=namespace,
            )

        return response

    def on_kill(self) -> None:
        if isinstance(self.application_file, str):
            body = _load_body_to_dict(self.application_file)
        else:
            body = self.application_file
        name = body["metadata"]["name"]
        namespace = self.namespace or self.hook.get_namespace()
        self.hook.delete_custom_object(
            group=self.api_group,
            version=self.api_version,
            plural=self.plural,
            namespace=namespace,
            name=name,
        )

Don't forget to change path_to_templates = '/opt/airflow/dags/repo/plugins/custom_operators/spark/templates/' to your own location.
I store different templates in the templates directory and choose the necessary template in the operator.
Template example:

  "apiVersion": "sparkoperator.k8s.io/v1beta2",
  "kind": "SparkApplication",
  "metadata": {
    "name": null,
    "namespace": null
  },
  "spec": {
    "type": "Python",
    "pythonVersion": "3",
    "mode": "cluster",
    "sparkConf": {
      "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4",
      "spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Divy.home=/tmp",
      "spark.kubernetes.allocation.batch.size": "10",
      "hadoop.security.uid.cache.secs": "10s"
    },
    "hadoopConf": {
      "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    },
    "image": null,
    "imagePullPolicy": "Always",
    "mainApplicationFile": null,
    "sparkVersion": "3.1.1",
    "restartPolicy": {
      "type": "Never"
    },
    "nodeSelector": {
      "application": "spark"
    },
    "driver": {
      "cores": 1,
      "coreLimit": "1024m",
      "memory": "4g",
      "labels": {
        "version": "3.5.1"
      }
    },
    "executor": {
      "cores": 3,
      "instances": 3,
      "memory": "6g",
      "labels": {
        "version": "3.5.1"
      },
      "podSecurityContext": {
        "fsGroup": 185
      },
      "env": [
        {
          "name": "SPARK_USER",
          "value": "root"  # use ur own
        }
      ]
    },
    "deps": {
      "jars": [
        "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar",
        "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.641/aws-java-sdk-bundle-1.12.641.jar"
      ]
    }
  }
}

Use the operator as follows:

submit = SparkKubernetesOperator(
        task_id='spark-job-template1',
        trigger_rule="all_success",
        spec_job_template_file='spark_default_config_job.json', # your template file
        image=SPARK_IMAGE, # put your Spark image
        main_application_path="local:///opt/spark/work-dir/test_job2.py", # put the path to your Spark file
        in_cluster=True,
        namespace=K8S_NAMESPACE, # usually "airflow" or "spark"
        do_xcom_push=False,
        dag=dag
    )

Or you can use the new SparkKubernetesOperator from the provider, as in the next example:

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
submit = SparkKubernetesOperator(
        task_id='spark-job-template',
        in_cluster=True,
        image=SPARK_IMAGE, # put your Spark image
        code_path="local:///opt/spark/work-dir/test_job2.py", # put your Spark execution file
        template_spec={"spark": {
            "apiVersion": "sparkoperator.k8s.io/v1beta2",
            "version": "v1beta2",
            "kind": "SparkApplication",
            "apiGroup": "sparkoperator.k8s.io",
            "metadata": {
                "namespace":"airflow"
            },
            "spec": {
                "type": "Python",
                "pythonVersion": "3",
                "mode": "cluster",
                "sparkVersion": "3.1.1",
                "successfulRunHistoryLimit": 1,
                "restartPolicy": {
                    "type": "Never"
                },
                "imagePullPolicy": "Always",
                "hadoopConf": {},
                "imagePullSecrets": [],
                "dynamicAllocation": {
                    "enabled": False,
                    "initialExecutors": 1,
                    "minExecutors": 1,
                    "maxExecutors": 4
                },
                "labels": {
                    "version": "3.1.1"
                },
                "driver": {
                    "serviceAccount": "spark",
                    "cores": 1,
                    "coreLimit": "1200m",
                    "instances": 1,
                    "container_resources": {
                        "gpu": {
                            "name": None,
                            "quantity": 2
                        },
                        "cpu": {
                            "request": 1,
                            "limit": 2
                        },
                        "memory": {
                            "request": "2Gi",
                            "limit": "10Gi"
                        }
                    }
                },
                "executor": {
                    "cores": 1,
                    "instances": 1,
                    "container_resources": {
                        "gpu": {
                            "name": None,
                            "quantity": 1
                        },
                        "cpu": {
                            "request": 2,
                            "limit": 5
                        },
                        "memory": {
                            "request": "2Gi",
                            "limit": "10Gi"
                        }
                    }
                }
            }
        },
            "kubernetes": {
                "env_vars": [],
                "env_from": [],
                "image_pull_secrets": [],
                "node_selector": {},
                "affinity": {
                    "nodeAffinity": {},
                    "podAffinity": {},
                    "podAntiAffinity": {}
                },
                "tolerations": [
                    {"key": "node-role.kubernetes.io/control-plane",
                     "operator": "Exists",
                     "effect": "NoSchedule"
                     }
                ],
                "config_map_mounts": {},
                "volume_mounts": [
                    {
                        "name": "config",
                        "mountPath": "/tmp/logs/"
                    }
                ],
                "volumes": [
                    {
                        "name": "config",
                        "persistentVolumeClaim": {
                            "claimName": "airflow-logs"
                        }
                    }
                ],
                "from_env_config_map": [],
                "from_env_secret": [],
                "in_cluster": True,
                "conn_id": "kubernetes_default",
                "kube_config_file": None,
                "cluster_context": None
            }
        },
        namespace="airflow",
        get_logs=True,
        dag=dag
)

You can build the Spark image using the following Dockerfile:

FROM spark:latest
USER root

WORKDIR /opt/spark/
ENV SPARK_HOME=/opt/spark
RUN chown -R spark:spark ${SPARK_HOME} && \
    chmod -R 0700 ${SPARK_HOME}

RUN mkdir /home/spark && chown spark:spark /home/spark

COPY requirements.txt ./
RUN pip install -r requirements.txt
ENV PYTHONPATH="/opt/spark/$PWD"

# copy your local files
COPY src/ /opt/spark/
