Remove custom spark home and custom binaries for spark #27646

Merged
10 changes: 10 additions & 0 deletions airflow/providers/apache/spark/CHANGELOG.rst
@@ -24,6 +24,16 @@
Changelog
---------

4.0.0
.....

The ``spark-binary`` connection extra could previously be set to any binary, but as of version 4.0.0 only two
values are allowed for it: ``spark-submit`` and ``spark2-submit``.

The ``spark-home`` connection extra is no longer allowed - the binary should be available on the
PATH in order to use SparkSubmitHook and SparkSubmitOperator.


3.0.0
.....

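To make the upgrade concrete, here is a minimal sketch (not part of this PR; the connection id, host, and extra values are hypothetical) of a Spark connection that satisfies the 4.0.0 rules: ``spark-binary`` limited to the two allowed values and no ``spark-home`` extra at all.

```python
import json

from airflow.models.connection import Connection

# Hypothetical connection definition that complies with provider 4.0.0:
# no "spark-home" extra, and "spark-binary" restricted to the allowed values.
spark_conn = Connection(
    conn_id="spark_default",
    conn_type="spark",
    host="yarn",
    extra=json.dumps(
        {
            "deploy-mode": "cluster",
            "spark-binary": "spark-submit",  # only "spark-submit" or "spark2-submit"
            # "spark-home": "/opt/spark",    # removed in 4.0.0 - the binary must be on PATH
        }
    ),
)
print(spark_conn.get_uri())
```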
54 changes: 28 additions & 26 deletions airflow/providers/apache/spark/hooks/spark_submit.py
@@ -33,12 +33,13 @@
with contextlib.suppress(ImportError, NameError):
from airflow.kubernetes import kube_client

ALLOWED_SPARK_BINARIES = ["spark-submit", "spark2-submit"]


class SparkSubmitHook(BaseHook, LoggingMixin):
"""
This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
It requires that the "spark-submit" binary is in the PATH or the spark_home to be
supplied.
It requires that the "spark-submit" binary is in the PATH.

:param conf: Arbitrary Spark configuration properties
:param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
@@ -150,6 +151,12 @@ def __init__(
self._yarn_application_id: str | None = None
self._kubernetes_driver_pod: str | None = None
self._spark_binary = spark_binary
if self._spark_binary is not None and self._spark_binary not in ALLOWED_SPARK_BINARIES:
raise RuntimeError(
f"The spark-binary extra can be on of {ALLOWED_SPARK_BINARIES} and it"
f" was `{spark_binary}`. Please make sure your spark binary is one of the"
f" allowed ones and that it is available on the PATH"
)

self._connection = self._resolve_connection()
self._is_yarn = "yarn" in self._connection["master"]
@@ -167,7 +174,7 @@ def __init__(

def _resolve_should_track_driver_status(self) -> bool:
"""
Determines whether or not this hook should poll the spark driver status through
Determines whether this hook should poll the spark driver status through
subsequent spark-submit status requests after the initial spark-submit request
:return: if the driver status should be tracked
"""
@@ -179,7 +186,6 @@ def _resolve_connection(self) -> dict[str, Any]:
"master": "yarn",
"queue": None,
"deploy_mode": None,
"spark_home": None,
"spark_binary": self._spark_binary or "spark-submit",
"namespace": None,
}
@@ -197,8 +203,20 @@ def _resolve_connection(self) -> dict[str, Any]:
extra = conn.extra_dejson
conn_data["queue"] = extra.get("queue")
conn_data["deploy_mode"] = extra.get("deploy-mode")
conn_data["spark_home"] = extra.get("spark-home")
conn_data["spark_binary"] = self._spark_binary or extra.get("spark-binary", "spark-submit")
spark_binary = self._spark_binary or extra.get("spark-binary", "spark-submit")
if spark_binary not in ALLOWED_SPARK_BINARIES:
raise RuntimeError(
f"The `spark-binary` extra can be on of {ALLOWED_SPARK_BINARIES} and it"
f" was `{spark_binary}`. Please make sure your spark binary is one of the"
" allowed ones and that it is available on the PATH"
)
conn_spark_home = extra.get("spark-home")
if conn_spark_home:
raise RuntimeError(
"The `spark-home` extra is not allowed any more. Please make sure your `spark-submit` or"
" `spark2-submit` are available on the PATH."
)
conn_data["spark_binary"] = spark_binary
conn_data["namespace"] = extra.get("namespace")
except AirflowException:
self.log.info(
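A sketch of how the extra-level validation surfaces at runtime (the connection id and URI below are hypothetical, supplied through an environment variable purely for illustration): a connection that still carries the removed ``spark-home`` extra now fails as soon as the hook resolves it.

```python
import os

from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

# Hypothetical legacy connection whose extras still contain "spark-home".
os.environ["AIRFLOW_CONN_SPARK_LEGACY"] = "spark://yarn?spark-home=%2Fopt%2Fspark"

try:
    # Resolving the connection triggers the new check and raises.
    SparkSubmitHook(conn_id="spark_legacy")
except RuntimeError as err:
    print(err)  # "The `spark-home` extra is not allowed any more. ..."
```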
@@ -214,17 +232,8 @@ def get_conn(self) -> Any:
pass

def _get_spark_binary_path(self) -> list[str]:
# If the spark_home is passed then build the spark-submit executable path using
# the spark_home; otherwise assume that spark-submit is present in the path to
# the executing user
if self._connection["spark_home"]:
connection_cmd = [
os.path.join(self._connection["spark_home"], "bin", self._connection["spark_binary"])
]
else:
connection_cmd = [self._connection["spark_binary"]]

return connection_cmd
# Assume that spark-submit is present on the PATH of the executing user
return [self._connection["spark_binary"]]

def _mask_cmd(self, connection_cmd: str | list[str]) -> str:
# Mask any password related fields in application args with key value pair
@@ -580,15 +589,8 @@ def _build_spark_driver_kill_command(self) -> list[str]:
Construct the spark-submit command to kill a driver.
:return: full command to kill a driver
"""
# If the spark_home is passed then build the spark-submit executable path using
# the spark_home; otherwise assume that spark-submit is present in the path to
# the executing user
if self._connection["spark_home"]:
connection_cmd = [
os.path.join(self._connection["spark_home"], "bin", self._connection["spark_binary"])
]
else:
connection_cmd = [self._connection["spark_binary"]]
# Assume that spark-submit is present on the PATH of the executing user
connection_cmd = [self._connection["spark_binary"]]

# The url to the spark master
connection_cmd += ["--master", self._connection["master"]]
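Both command builders now start from the bare binary name. A quick sketch of what the helper resolves to (a private method, shown only to illustrate the simplification; the connection id is hypothetical and falls back to defaults):

```python
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

hook = SparkSubmitHook(conn_id="spark_default")
# With "spark-home" gone there is no path joining: the command starts with the
# configured binary name and PATH resolution does the rest.
print(hook._get_spark_binary_path())  # ["spark-submit"]
```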
3 changes: 1 addition & 2 deletions airflow/providers/apache/spark/operators/spark_submit.py
@@ -30,8 +30,7 @@
class SparkSubmitOperator(BaseOperator):
"""
This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
It requires that the "spark-submit" binary is in the PATH or the spark-home is set
in the extra on the connection.
It requires that the "spark-submit" binary is in the PATH.

.. seealso::
For more information on how to use this operator, take a look at the guide:
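For reference, a minimal usage sketch of the operator (assuming an Airflow 2.4+ DAG API; the DAG id, task id, and application path are hypothetical). The only external requirement left is that the chosen binary resolves via PATH on the worker:

```python
import pendulum

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="example_spark_submit",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    SparkSubmitOperator(
        task_id="submit_pi_job",
        conn_id="spark_default",
        application="/opt/jobs/pi.py",  # hypothetical application
        spark_binary="spark-submit",    # optional; must be spark-submit or spark2-submit
    )
```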
@@ -42,8 +42,7 @@ Extra (optional)

* ``queue`` - The name of the YARN queue to which the application is submitted.
* ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client).
* ``spark-home`` - If passed then build the ``spark-binary`` executable path using it (``spark-home``/bin/``spark-binary``); otherwise assume that ``spark-binary`` is present in the PATH of the executing user.
* ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``.
* ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. Only ``spark-submit`` and ``spark2-submit`` are allowed as values.
* ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).

When specifying the connection in environment variable you should specify
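Tying the extras above together, a hedged sketch of a connection supplied through an environment variable (the master host and exact URI encoding are illustrative; only the extras that remain valid for this provider version are included):

```python
import os

# Hypothetical connection; the query parameters map to the connection extras listed above.
os.environ["AIRFLOW_CONN_SPARK_DEFAULT"] = (
    "spark://spark-master:7077?deploy-mode=cluster&spark-binary=spark-submit&namespace=default"
)
```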