
Spark SQL Hook not using connections #8713

@boittega

Description


Apache Airflow version: 1.10.10

What happened:

SparkSqlHook does not actually use any connection. The default conn_id is spark_sql_default, and if this connection doesn't exist, the hook raises an error:

Traceback (most recent call last):
  File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/contrib/operators/spark_sql_operator.py", line 109, in execute
    yarn_queue=self._yarn_queue
  File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/contrib/hooks/spark_sql_hook.py", line 75, in __init__
    self._conn = self.get_connection(conn_id)
  File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/hooks/base_hook.py", line 84, in get_connection
    conn = random.choice(list(cls.get_connections(conn_id)))
  File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/hooks/base_hook.py", line 80, in get_connections
    return secrets.get_connections(conn_id)
  File "/Users/rbottega/Documents/airflow_latest/env/lib/python3.7/site-packages/airflow/secrets/__init__.py", line 56, in get_connections
    raise AirflowException("The conn_id `{0}` isn't defined".format(conn_id))
airflow.exceptions.AirflowException: The conn_id `spark_sql_default` isn't defined

If a valid connection is specified, the hook does nothing with it: the self._conn variable is never used and the get_conn method is empty.

    def get_conn(self):
        pass
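As a stop-gap (not a fix), pre-creating the spark_sql_default connection makes the get_connection() call succeed even though the hook currently ignores it. A minimal sketch, assuming Airflow 1.10.x and access to the metadata database:

    # Stop-gap sketch only: create the missing spark_sql_default connection so
    # that get_connection() succeeds, even though the hook never uses it.
    from airflow import settings
    from airflow.models import Connection

    session = settings.Session()
    exists = (
        session.query(Connection)
        .filter(Connection.conn_id == "spark_sql_default")
        .first()
    )
    if not exists:
        session.add(Connection(conn_id="spark_sql_default", conn_type="spark", host="local"))
        session.commit()
    session.close()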

What you expected to happen:

It should either follow the same behaviour as SparkSubmitHook and read the master host and extra parameters from the connection, or it should not require a connection ID at all.
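For illustration, here is a minimal sketch of the first option. The helper resolve_spark_sql_master() is hypothetical (not part of Airflow); it only mirrors the fallback behaviour of SparkSubmitHook, which derives the master from the connection's host/port and falls back gracefully when the connection is missing:

    # Sketch only: resolve_spark_sql_master() is a hypothetical helper, not Airflow code.
    from airflow.exceptions import AirflowException
    from airflow.hooks.base_hook import BaseHook


    def resolve_spark_sql_master(conn_id="spark_sql_default", default_master="yarn"):
        try:
            conn = BaseHook.get_connection(conn_id)
        except AirflowException:
            # Connection not defined: fall back instead of failing the task.
            return default_master
        if not conn.host:
            return default_master
        if conn.port:
            return "{}:{}".format(conn.host, conn.port)
        return conn.host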

How to reproduce it:

Create a DAG that uses a SparkSqlOperator without having created the spark_sql_default connection.

    from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator

    sql_job = SparkSqlOperator(
        sql="SELECT * FROM test",
        master="local",
        task_id="sql_job"
    )

Anything else we need to know:

I am happy to implement any of these solutions.
