
Spark Operator not working in Airflow Standalone version 2.2.1 #19602

@gauravguptabits

Description


Apache Airflow version

2.2.1 (latest released)

Operating System

Ubuntu

Versions of Apache Airflow Providers

The Airflow web UI is unable to load the DAG file.

Logs:

Traceback (most recent call last):
  File "/home/gaurav.gupta/miniconda3/envs/venv_pyspark/lib/python3.7/site-packages/airflow/models/dagbag.py", line 331, in _load_modules_from_file
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/gaurav.gupta/airflow/dags/prepare_complaint_raw_dataset_dag.py", line 4, in <module>
    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
  File "/home/gaurav.gupta/miniconda3/envs/venv_pyspark/lib/python3.7/site-packages/airflow/contrib/operators/spark_submit_operator.py", line 22, in <module>
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator  # noqa
ModuleNotFoundError: No module named 'airflow.providers.apache'
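
The last two frames show the cause: in Airflow 2.x, airflow.contrib.operators.spark_submit_operator is only a deprecated compatibility shim that re-imports SparkSubmitOperator from the Spark provider package, so the ModuleNotFoundError means apache-airflow-providers-apache-spark is not installed in this environment. A minimal sketch of the provider-native import, assuming the provider has been installed with pip:

# Assumes the provider was installed into the same environment that runs Airflow:
#   pip install apache-airflow-providers-apache-spark
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Sanity check: the operator now resolves from the provider package directly.
print(SparkSubmitOperator.__module__)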

DAG file

from datetime import datetime, timedelta
import pendulum
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable

local_tz = pendulum.timezone("Asia/Kolkata")

default_args = {
    'owner': 'Impressico',
    'depends_on_past': False,
    'start_date': datetime(2021, 11, 15, tzinfo=local_tz),
    'email': ['gaurav.gupta@impressico.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(dag_id='prepare_complaint_raw_dataset',
          default_args=default_args,
          catchup=False,
          schedule_interval="0 * * * *")
pyspark_app_home = Variable.get("PYSPARK_APP_HOME")

prepare_complaint_raw_dataset = SparkSubmitOperator(task_id='prepare_complaint_raw_dataset',
                                                    conn_id='Spark_Miniconda_Venv_pyspark',
                                                    py_files='jobs.zip,libs.zip',
                                                    application=f'{pyspark_app_home}/dist/main.py',
                                                    total_executor_cores=4,
                                                    name='prepare_complaint_raw_dataset',
                                                    dag=dag)

Deployment

Virtualenv installation

Deployment details

Airflow was installed via pip inside an Anaconda (miniconda) environment.
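
Because Airflow runs inside a conda virtualenv (venv_pyspark, per the paths in the traceback), the Spark provider has to be installed into that same environment rather than the base interpreter. A quick check, as a sketch:

# Run with the interpreter that launches Airflow (venv_pyspark per the traceback).
try:
    import airflow.providers.apache.spark  # supplied by apache-airflow-providers-apache-spark
    print("Spark provider is installed")
except ModuleNotFoundError:
    print("Spark provider missing: pip install apache-airflow-providers-apache-spark")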

What happened

After adding the DAG file, the Airflow web UI reports an error for it. Following the stack trace, it comes down to the import error attached in the description above.

What you expected to happen

I expected the file to at least be accepted by the web UI, since the definition looks fine. Being a newbie to Airflow, I suspect there may be a configuration issue in the SparkSubmitOperator parameters.

How to reproduce

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

