[AIRFLOW-6212] SparkSubmitHook resolve connection #7075
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! In case of doubts contact the developers at:
The Spark hook works for me and the tests pass without this PR. Can you send the add-connection CLI command you used to hit the issue? I don't agree with removing the spark check on line 177.
@tooptoop4 if we don't remove the spark check on line 177, how can we use this hook to track the driver status of jobs deployed on YARN, Mesos, or Kubernetes? Or is this hook intended only for standalone mode?
@tooptoop4 the tests pass only for the case where the connection information (host, port, conn_type, etc.) is stored in the database. I tried this hook while storing the connection info as an environment variable. This failed because the URI parser returned irrelevant results for all types of cluster-mode deployment. For instance:
Yes, there is no concept of an async driver status poll for the other modes; read https://spark.apache.org/docs/latest/running-on-yarn.html. In the other modes the submit-to-launch is synchronous. I think you can cancel this, @albertusk95.
I couldn't find any info in the provided link stating that there's no async driver polling for YARN, anyway.
@tooptoop4 I think you might want to try this sample DAG to reproduce the issue. a) Create an environment variable for the Spark connection, for example as sketched below.
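A hypothetical sketch of step (a), assuming the standard `AIRFLOW_CONN_<CONN_ID>` convention; the host and port are made-up values:

```python
import os

# Airflow resolves connections from environment variables named
# AIRFLOW_CONN_<CONN_ID> (the conn_id upper-cased). Host and port here are
# made-up values for illustration.
os.environ['AIRFLOW_CONN_SPARK_DEFAULT'] = 'spark://spark-master:7077'
```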
b) Create a DAG file to run:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

job_file = 'path/to/job/file'

default_args = {
    'depends_on_past': False,
    'start_date': <fill_start_date>,
    'retries': <fill_retries>,
    'retry_delay': <fill_retry_delay>
}

dag = DAG('spark-submit-hook', default_args=default_args,
          schedule_interval=<fill_interval>)

avg = SparkSubmitOperator(task_id=<fill_task_id>, dag=dag,
                          application=job_file,
                          spark_binary='path/to/spark-submit')
```
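Running this DAG with the environment-variable connection from step (a) reproduces the failure described below: the resolved master is handed to spark-submit without the `spark://` scheme.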
There isn't async driver polling in YARN; I know Spark on YARN well.
```diff
@@ -174,8 +174,7 @@ def _resolve_should_track_driver_status(self):
         subsequent spark-submit status requests after the initial spark-submit request
         :return: if the driver status should be tracked
         """
-        return ('spark://' in self._connection['master'] and
-                self._connection['deploy_mode'] == 'cluster')
+        return self._connection['deploy_mode'] == 'cluster'
```
Suggested alternative:

```python
# i.e. also treat the connection as standalone Spark when the conn_type starts
# with "spark" (this would require conn_type to be carried into self._connection)
return (('spark://' in self._connection['master'] or
         self._connection['conn_type'].startswith('spark')) and
        self._connection['deploy_mode'] == 'cluster')
```
```diff
@@ -190,17 +189,22 @@ def _resolve_connection(self):
         # Master can be local, yarn, spark://HOST:PORT, mesos://HOST:PORT and
         # k8s://https://<HOST>:<PORT>
         conn = self.get_connection(self._conn_id)
-        if conn.port:
-            conn_data['master'] = "{}:{}".format(conn.host, conn.port)
+        if conn.conn_type in ['spark', 'mesos']:
```
Revert this section (lines 192-200); the existing tests for connections added via DB/CLI need to keep working.
Well, I guess the current tests don't cover connections added via the CLI, right?
How about using Livy to interact with the YARN cluster? I believe it supports both sync and async result retrieval.
There is another active PR on Livy that I saw.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Problem
I tried to use `SparkSubmitOperator` with a standalone cluster first. Unfortunately, the spark-submit task failed. The following exception occurred.

The first thing that came to mind was why the master address excluded the `spark://` prefix; it should be `--master spark://host:port`. A quick check of the source code showed that this (scheme addition) hadn't been handled.

After reviewing the subsequent method calls, it turned out that the driver status tracking feature is never used because of the above bug. Look at the following code snippet.
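For reference, here is the method in question as it stands before this PR (reconstructed from the diff shown above):

```python
def _resolve_should_track_driver_status(self):
    """
    Determines whether this hook should poll the spark driver status through
    subsequent spark-submit status requests after the initial spark-submit request
    :return: if the driver status should be tracked
    """
    return ('spark://' in self._connection['master'] and
            self._connection['deploy_mode'] == 'cluster')
```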
The above method will always return `False` because the Spark master's address doesn't start with the scheme, such as `spark://`.

Later on, I investigated the `Connection` module (airflow.models.connection) further and found that if we provide a URI (e.g. spark://host:port), the attributes of the `Connection` object are derived via URI parsing. When parsing the host, the resulting value is only the hostname, without the scheme. This, too, is a critical bug.
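A minimal sketch of the parsing behaviour (host and port are made-up values):

```python
# The URI scheme is consumed as conn_type, so conn.host keeps only the bare
# hostname and the 'spark://' prefix never reaches the resolved master address.
from airflow.models.connection import Connection

conn = Connection(uri='spark://spark-master:7077')
print(conn.conn_type)  # 'spark'
print(conn.host)       # 'spark-master' -- the scheme is gone
print(conn.port)       # 7077
```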
Proposed Solution
I think we don't really need the whole URI. When we store the connection data as an environment variable, we could just specify the URI parts in the form of JSON. This approach mainly tackles the URI-parsing problem, and the `conn_id` is still preserved.

Take a look at the following example (`conn_id` = "spark_default"). For simplicity, let's presume that `extra` is in JSON form.
is in JSON form.Even though this solution could reduce the false result returned by URI parsing, one need to strictly ensure that each attribute (host, port, scheme, etc.) should store the relevant value. I think it's much easier than creating a correct URI parser. Moreover, applying such a technique makes the whole connection data builder for both database & environment variable mode have the same pattern (both use a structured data specification).
Link to JIRA issue: https://issues.apache.org/jira/browse/AIRFLOW-6212
- Commit message starts with `[AIRFLOW-NNNN]`, where AIRFLOW-NNNN = JIRA ID. (*) For document-only changes, no JIRA issue is needed; the commit message starts with `[AIRFLOW-XXXX]`.
- In case of a fundamental code change, an Airflow Improvement Proposal (AIP) is needed.
- In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
- In case of backwards-incompatible changes, please leave a note in UPDATING.md.
- Read the Pull Request Guidelines for more information.