Remove custom spark home and custom binaries for spark (#27646)
potiuk committed Nov 13, 2022
1 parent fc7b95d commit 9358928
Showing 5 changed files with 69 additions and 136 deletions.
10 changes: 10 additions & 0 deletions airflow/providers/apache/spark/CHANGELOG.rst
@@ -24,6 +24,16 @@
Changelog
---------

4.0.0
.....

The ``spark-binary`` connection extra could previously be set to any binary, but as of version 4.0.0
only two values are allowed for it: ``spark-submit`` and ``spark2-submit``.

The ``spark-home`` connection extra is no longer allowed; the binary must be available on the
PATH in order to use ``SparkSubmitHook`` and ``SparkSubmitOperator``.
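In practice, the new validation behaves as in the following minimal sketch (assuming an environment with this provider installed; the stock ``spark_default`` connection id is used purely for illustration):

.. code-block:: python

    from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

    # Allowed values: "spark-submit" (the default) and "spark2-submit".
    hook = SparkSubmitHook(conn_id="spark_default", spark_binary="spark2-submit")

    # Any other value, e.g. a custom path, now fails fast with a RuntimeError.
    try:
        SparkSubmitHook(conn_id="spark_default", spark_binary="/opt/spark/bin/spark-submit")
    except RuntimeError as err:
        print(err)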


3.0.0
.....

54 changes: 28 additions & 26 deletions airflow/providers/apache/spark/hooks/spark_submit.py
@@ -33,12 +33,13 @@
with contextlib.suppress(ImportError, NameError):
from airflow.kubernetes import kube_client

ALLOWED_SPARK_BINARIES = ["spark-submit", "spark2-submit"]


class SparkSubmitHook(BaseHook, LoggingMixin):
"""
This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
It requires that the "spark-submit" binary is in the PATH or the spark_home to be
supplied.
It requires that the "spark-submit" binary is in the PATH.
:param conf: Arbitrary Spark configuration properties
:param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
@@ -150,6 +151,12 @@ def __init__(
self._yarn_application_id: str | None = None
self._kubernetes_driver_pod: str | None = None
self._spark_binary = spark_binary
if self._spark_binary is not None and self._spark_binary not in ALLOWED_SPARK_BINARIES:
raise RuntimeError(
f"The spark-binary extra can be on of {ALLOWED_SPARK_BINARIES} and it"
f" was `{spark_binary}`. Please make sure your spark binary is one of the"
f" allowed ones and that it is available on the PATH"
)

self._connection = self._resolve_connection()
self._is_yarn = "yarn" in self._connection["master"]
@@ -167,7 +174,7 @@ def __init__(

def _resolve_should_track_driver_status(self) -> bool:
"""
Determines whether or not this hook should poll the spark driver status through
Determines whether this hook should poll the spark driver status through
subsequent spark-submit status requests after the initial spark-submit request
:return: if the driver status should be tracked
"""
@@ -179,7 +186,6 @@ def _resolve_connection(self) -> dict[str, Any]:
"master": "yarn",
"queue": None,
"deploy_mode": None,
"spark_home": None,
"spark_binary": self._spark_binary or "spark-submit",
"namespace": None,
}
@@ -197,8 +203,20 @@ def _resolve_connection(self) -> dict[str, Any]:
extra = conn.extra_dejson
conn_data["queue"] = extra.get("queue")
conn_data["deploy_mode"] = extra.get("deploy-mode")
conn_data["spark_home"] = extra.get("spark-home")
conn_data["spark_binary"] = self._spark_binary or extra.get("spark-binary", "spark-submit")
spark_binary = self._spark_binary or extra.get("spark-binary", "spark-submit")
if spark_binary not in ALLOWED_SPARK_BINARIES:
raise RuntimeError(
f"The `spark-binary` extra can be on of {ALLOWED_SPARK_BINARIES} and it"
f" was `{spark_binary}`. Please make sure your spark binary is one of the"
" allowed ones and that it is available on the PATH"
)
conn_spark_home = extra.get("spark-home")
if conn_spark_home:
raise RuntimeError(
"The `spark-home` extra is not allowed any more. Please make sure your `spark-submit` or"
" `spark2-submit` are available on the PATH."
)
conn_data["spark_binary"] = spark_binary
conn_data["namespace"] = extra.get("namespace")
except AirflowException:
self.log.info(
@@ -214,17 +232,8 @@ def get_conn(self) -> Any:
pass

def _get_spark_binary_path(self) -> list[str]:
# If the spark_home is passed then build the spark-submit executable path using
# the spark_home; otherwise assume that spark-submit is present in the path to
# the executing user
if self._connection["spark_home"]:
connection_cmd = [
os.path.join(self._connection["spark_home"], "bin", self._connection["spark_binary"])
]
else:
connection_cmd = [self._connection["spark_binary"]]

return connection_cmd
# Assume that spark-submit is present in the PATH of the executing user
return [self._connection["spark_binary"]]

def _mask_cmd(self, connection_cmd: str | list[str]) -> str:
# Mask any password related fields in application args with key value pair
@@ -580,15 +589,8 @@ def _build_spark_driver_kill_command(self) -> list[str]:
Construct the spark-submit command to kill a driver.
:return: full command to kill a driver
"""
# If the spark_home is passed then build the spark-submit executable path using
# the spark_home; otherwise assume that spark-submit is present in the path to
# the executing user
if self._connection["spark_home"]:
connection_cmd = [
os.path.join(self._connection["spark_home"], "bin", self._connection["spark_binary"])
]
else:
connection_cmd = [self._connection["spark_binary"]]
# Assume that spark-submit is present in the PATH of the executing user
connection_cmd = [self._connection["spark_binary"]]

# The url to the spark master
connection_cmd += ["--master", self._connection["master"]]
3 changes: 1 addition & 2 deletions airflow/providers/apache/spark/operators/spark_submit.py
@@ -30,8 +30,7 @@
class SparkSubmitOperator(BaseOperator):
"""
This operator is a wrapper around the spark-submit binary to kick off a spark-submit job.
It requires that the "spark-submit" binary is in the PATH or the spark-home is set
in the extra on the connection.
It requires that the "spark-submit" binary is in the PATH.
.. seealso::
For more information on how to use this operator, take a look at the guide:
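As a usage illustration (not part of this commit; the task id, application path, and connection id below are assumptions), an operator invocation that satisfies the new constraints might look like:

.. code-block:: python

    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

    # Hypothetical task definition; only "spark-submit" and "spark2-submit"
    # are accepted for spark_binary, and the binary must be on the PATH.
    submit_job = SparkSubmitOperator(
        task_id="submit_job",
        application="/opt/jobs/pi.py",
        conn_id="spark_default",
        spark_binary="spark2-submit",
    )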
@@ -42,8 +42,7 @@ Extra (optional)

* ``queue`` - The name of the YARN queue to which the application is submitted.
* ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
* ``spark-home`` - If passed then build the ``spark-binary`` executable path using it (``spark-home``/bin/``spark-binary``); otherwise assume that ``spark-binary`` is present in the PATH of the executing user.
* ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``.
* ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit`` and ``spark2-submit`` are allowed as values.
* ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).

When specifying the connection in an environment variable you should specify it using URI syntax, as sketched below.
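A sketch under stated assumptions (the connection id ``spark_yarn``, master host, and extra values are illustrative, not mandated by this page):

.. code-block:: python

    import os

    from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

    # Hypothetical URI-style connection; query parameters become connection extras.
    os.environ["AIRFLOW_CONN_SPARK_YARN"] = "spark://yarn?deploy-mode=cluster&spark-binary=spark2-submit"

    # Resolving the connection validates the extras: a disallowed spark-binary
    # or any spark-home extra raises a RuntimeError here.
    hook = SparkSubmitHook(conn_id="spark_yarn")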