diff --git a/airflow/providers/apache/spark/CHANGELOG.rst b/airflow/providers/apache/spark/CHANGELOG.rst index 5c56cd95e3d01..f93f31236496b 100644 --- a/airflow/providers/apache/spark/CHANGELOG.rst +++ b/airflow/providers/apache/spark/CHANGELOG.rst @@ -24,6 +24,16 @@ Changelog --------- +4.0.0 +..... + +The ``spark-binary`` connection extra could be set to any binary, but starting with version 4.0.0 only two values +are allowed for it: ``spark-submit`` and ``spark2-submit``. + +The ``spark-home`` connection extra is no longer allowed - the binary must be available on the +PATH in order to use SparkSubmitHook and SparkSubmitOperator. + + 3.0.0 ..... diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py index 472f1f8cda7df..bfc08eda6402d 100644 --- a/airflow/providers/apache/spark/hooks/spark_submit.py +++ b/airflow/providers/apache/spark/hooks/spark_submit.py @@ -33,12 +33,13 @@ with contextlib.suppress(ImportError, NameError): from airflow.kubernetes import kube_client +ALLOWED_SPARK_BINARIES = ["spark-submit", "spark2-submit"] + class SparkSubmitHook(BaseHook, LoggingMixin): """ This hook is a wrapper around the spark-submit binary to kick off a spark-submit job. - It requires that the "spark-submit" binary is in the PATH or the spark_home to be - supplied. + It requires that the "spark-submit" binary is in the PATH. :param conf: Arbitrary Spark configuration properties :param spark_conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured @@ -150,6 +151,12 @@ def __init__( self._yarn_application_id: str | None = None self._kubernetes_driver_pod: str | None = None self._spark_binary = spark_binary + if self._spark_binary is not None and self._spark_binary not in ALLOWED_SPARK_BINARIES: + raise RuntimeError( + f"The spark-binary extra can be one of {ALLOWED_SPARK_BINARIES} and it" + f" was `{spark_binary}`. Please make sure your spark binary is one of the" + f" allowed ones and that it is available on the PATH" + ) self._connection = self._resolve_connection() self._is_yarn = "yarn" in self._connection["master"] @@ -167,7 +174,7 @@ def __init__( def _resolve_should_track_driver_status(self) -> bool: """ - Determines whether or not this hook should poll the spark driver status through + Determines whether this hook should poll the spark driver status through subsequent spark-submit status requests after the initial spark-submit request :return: if the driver status should be tracked """ @@ -179,7 +186,6 @@ def _resolve_connection(self) -> dict[str, Any]: "master": "yarn", "queue": None, "deploy_mode": None, - "spark_home": None, "spark_binary": self._spark_binary or "spark-submit", "namespace": None, } @@ -197,8 +203,20 @@ def _resolve_connection(self) -> dict[str, Any]: extra = conn.extra_dejson conn_data["queue"] = extra.get("queue") conn_data["deploy_mode"] = extra.get("deploy-mode") - conn_data["spark_home"] = extra.get("spark-home") - conn_data["spark_binary"] = self._spark_binary or extra.get("spark-binary", "spark-submit") + spark_binary = self._spark_binary or extra.get("spark-binary", "spark-submit") + if spark_binary not in ALLOWED_SPARK_BINARIES: + raise RuntimeError( + f"The `spark-binary` extra can be one of {ALLOWED_SPARK_BINARIES} and it" + f" was `{spark_binary}`. Please make sure your spark binary is one of the" + " allowed ones and that it is available on the PATH" + ) + conn_spark_home = extra.get("spark-home") + if conn_spark_home: + raise RuntimeError( + "The `spark-home` extra is not allowed any more.
Please make sure your `spark-submit` or" + " `spark2-submit` are available on the PATH." + ) + conn_data["spark_binary"] = spark_binary conn_data["namespace"] = extra.get("namespace") except AirflowException: self.log.info( @@ -214,17 +232,8 @@ def get_conn(self) -> Any: pass def _get_spark_binary_path(self) -> list[str]: - # If the spark_home is passed then build the spark-submit executable path using - # the spark_home; otherwise assume that spark-submit is present in the path to - # the executing user - if self._connection["spark_home"]: - connection_cmd = [ - os.path.join(self._connection["spark_home"], "bin", self._connection["spark_binary"]) - ] - else: - connection_cmd = [self._connection["spark_binary"]] - - return connection_cmd + # Assume that spark-submit is present in the path to the executing user + return [self._connection["spark_binary"]] def _mask_cmd(self, connection_cmd: str | list[str]) -> str: # Mask any password related fields in application args with key value pair @@ -580,15 +589,8 @@ def _build_spark_driver_kill_command(self) -> list[str]: Construct the spark-submit command to kill a driver. :return: full command to kill a driver """ - # If the spark_home is passed then build the spark-submit executable path using - # the spark_home; otherwise assume that spark-submit is present in the path to - # the executing user - if self._connection["spark_home"]: - connection_cmd = [ - os.path.join(self._connection["spark_home"], "bin", self._connection["spark_binary"]) - ] - else: - connection_cmd = [self._connection["spark_binary"]] + # Assume that spark-submit is present in the path to the executing user + connection_cmd = [self._connection["spark_binary"]] # The url to the spark master connection_cmd += ["--master", self._connection["master"]] diff --git a/airflow/providers/apache/spark/operators/spark_submit.py b/airflow/providers/apache/spark/operators/spark_submit.py index a024038ffc73e..b0b2961c73f4c 100644 --- a/airflow/providers/apache/spark/operators/spark_submit.py +++ b/airflow/providers/apache/spark/operators/spark_submit.py @@ -30,8 +30,7 @@ class SparkSubmitOperator(BaseOperator): """ This hook is a wrapper around the spark-submit binary to kick off a spark-submit job. - It requires that the "spark-submit" binary is in the PATH or the spark-home is set - in the extra on the connection. + It requires that the "spark-submit" binary is in the PATH. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/docs/apache-airflow-providers-apache-spark/connections/spark.rst b/docs/apache-airflow-providers-apache-spark/connections/spark.rst index 330c22ac9f9f9..2d3247757a1f6 100644 --- a/docs/apache-airflow-providers-apache-spark/connections/spark.rst +++ b/docs/apache-airflow-providers-apache-spark/connections/spark.rst @@ -42,8 +42,7 @@ Extra (optional) * ``queue`` - The name of the YARN queue to which the application is submitted. * ``deploy-mode`` - Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client). - * ``spark-home`` - If passed then build the ``spark-binary`` executable path using it (``spark-home``/bin/``spark-binary``); otherwise assume that ``spark-binary`` is present in the PATH of the executing user. - * ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. + * ``spark-binary`` - The command to use for Spark submit. Some distros may use ``spark2-submit``. Default ``spark-submit``. 
Only ``spark-submit`` and ``spark2-submit`` are allowed as value. * ``namespace`` - Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota). When specifying the connection in environment variable you should specify diff --git a/tests/providers/apache/spark/hooks/test_spark_submit.py b/tests/providers/apache/spark/hooks/test_spark_submit.py index 7f550eead5522..f0736c84907e2 100644 --- a/tests/providers/apache/spark/hooks/test_spark_submit.py +++ b/tests/providers/apache/spark/hooks/test_spark_submit.py @@ -89,11 +89,7 @@ def setUp(self): conn_id="spark_k8s_cluster", conn_type="spark", host="k8s://https://k8s-master", - extra=( - '{"spark-home": "/opt/spark", ' - ' "deploy-mode": "cluster", ' - ' "namespace": "mynamespace"}' - ), + extra='{"deploy-mode": "cluster", "namespace": "mynamespace"}', ) ) db.merge_conn( @@ -102,28 +98,26 @@ def setUp(self): db.merge_conn( Connection( - conn_id="spark_home_set", + conn_id="spark_binary_set", conn_type="spark", - host="yarn://yarn-master", - extra='{"spark-home": "/opt/myspark"}', + host="yarn", + extra='{"spark-binary": "spark2-submit"}', ) ) - - db.merge_conn(Connection(conn_id="spark_home_not_set", conn_type="spark", host="yarn://yarn-master")) db.merge_conn( Connection( - conn_id="spark_binary_set", + conn_id="spark_custom_binary_set", conn_type="spark", host="yarn", - extra='{"spark-binary": "custom-spark-submit"}', + extra='{"spark-binary": "spark-other-submit"}', ) ) db.merge_conn( Connection( - conn_id="spark_binary_and_home_set", + conn_id="spark_home_set", conn_type="spark", host="yarn", - extra='{"spark-home": "/path/to/spark_home", "spark-binary": "custom-spark-submit"}', + extra='{"spark-home": "/custom/spark-home/path"}', ) ) db.merge_conn( @@ -131,7 +125,7 @@ def setUp(self): conn_id="spark_standalone_cluster", conn_type="spark", host="spark://spark-standalone-master:6066", - extra='{"spark-home": "/path/to/spark_home", "deploy-mode": "cluster"}', + extra='{"deploy-mode": "cluster"}', ) ) db.merge_conn( @@ -139,7 +133,7 @@ def setUp(self): conn_id="spark_standalone_cluster_client_mode", conn_type="spark", host="spark://spark-standalone-master:6066", - extra='{"spark-home": "/path/to/spark_home", "deploy-mode": "client"}', + extra='{"deploy-mode": "client"}', ) ) @@ -265,10 +259,7 @@ def test_resolve_should_track_driver_status(self): hook_spark_yarn_cluster = SparkSubmitHook(conn_id="spark_yarn_cluster") hook_spark_k8s_cluster = SparkSubmitHook(conn_id="spark_k8s_cluster") hook_spark_default_mesos = SparkSubmitHook(conn_id="spark_default_mesos") - hook_spark_home_set = SparkSubmitHook(conn_id="spark_home_set") - hook_spark_home_not_set = SparkSubmitHook(conn_id="spark_home_not_set") hook_spark_binary_set = SparkSubmitHook(conn_id="spark_binary_set") - hook_spark_binary_and_home_set = SparkSubmitHook(conn_id="spark_binary_and_home_set") hook_spark_standalone_cluster = SparkSubmitHook(conn_id="spark_standalone_cluster") # When @@ -282,16 +273,9 @@ def test_resolve_should_track_driver_status(self): should_track_driver_status_spark_default_mesos = ( hook_spark_default_mesos._resolve_should_track_driver_status() ) - should_track_driver_status_spark_home_set = hook_spark_home_set._resolve_should_track_driver_status() - should_track_driver_status_spark_home_not_set = ( - hook_spark_home_not_set._resolve_should_track_driver_status() - ) should_track_driver_status_spark_binary_set = ( hook_spark_binary_set._resolve_should_track_driver_status() ) - 
should_track_driver_status_spark_binary_and_home_set = ( - hook_spark_binary_and_home_set._resolve_should_track_driver_status() - ) should_track_driver_status_spark_standalone_cluster = ( hook_spark_standalone_cluster._resolve_should_track_driver_status() ) @@ -301,10 +285,7 @@ def test_resolve_should_track_driver_status(self): assert should_track_driver_status_spark_yarn_cluster is False assert should_track_driver_status_spark_k8s_cluster is False assert should_track_driver_status_spark_default_mesos is False - assert should_track_driver_status_spark_home_set is False - assert should_track_driver_status_spark_home_not_set is False assert should_track_driver_status_spark_binary_set is False - assert should_track_driver_status_spark_binary_and_home_set is False assert should_track_driver_status_spark_standalone_cluster is True def test_resolve_connection_yarn_default(self): @@ -322,7 +303,6 @@ def test_resolve_connection_yarn_default(self): "spark_binary": "spark-submit", "deploy_mode": None, "queue": None, - "spark_home": None, "namespace": None, } assert connection == expected_spark_connection @@ -343,7 +323,6 @@ def test_resolve_connection_yarn_default_connection(self): "spark_binary": "spark-submit", "deploy_mode": None, "queue": "root.default", - "spark_home": None, "namespace": None, } assert connection == expected_spark_connection @@ -365,7 +344,6 @@ def test_resolve_connection_mesos_default_connection(self): "spark_binary": "spark-submit", "deploy_mode": None, "queue": None, - "spark_home": None, "namespace": None, } assert connection == expected_spark_connection @@ -386,7 +364,6 @@ def test_resolve_connection_spark_yarn_cluster_connection(self): "spark_binary": "spark-submit", "deploy_mode": "cluster", "queue": "root.etl", - "spark_home": None, "namespace": None, } assert connection == expected_spark_connection @@ -405,7 +382,6 @@ def test_resolve_connection_spark_k8s_cluster_connection(self): # Then dict_cmd = self.cmd_args_to_dict(cmd) expected_spark_connection = { - "spark_home": "/opt/spark", "queue": None, "spark_binary": "spark-submit", "master": "k8s://https://k8s-master", @@ -430,7 +406,6 @@ def test_resolve_connection_spark_k8s_cluster_ns_conf(self): # Then dict_cmd = self.cmd_args_to_dict(cmd) expected_spark_connection = { - "spark_home": "/opt/spark", "queue": None, "spark_binary": "spark-submit", "master": "k8s://https://k8s-master", @@ -442,46 +417,6 @@ def test_resolve_connection_spark_k8s_cluster_ns_conf(self): assert dict_cmd["--deploy-mode"] == "cluster" assert dict_cmd["--conf"] == "spark.kubernetes.namespace=airflow" - def test_resolve_connection_spark_home_set_connection(self): - # Given - hook = SparkSubmitHook(conn_id="spark_home_set") - - # When - connection = hook._resolve_connection() - cmd = hook._build_spark_submit_command(self._spark_job_file) - - # Then - expected_spark_connection = { - "master": "yarn://yarn-master", - "spark_binary": "spark-submit", - "deploy_mode": None, - "queue": None, - "spark_home": "/opt/myspark", - "namespace": None, - } - assert connection == expected_spark_connection - assert cmd[0] == "/opt/myspark/bin/spark-submit" - - def test_resolve_connection_spark_home_not_set_connection(self): - # Given - hook = SparkSubmitHook(conn_id="spark_home_not_set") - - # When - connection = hook._resolve_connection() - cmd = hook._build_spark_submit_command(self._spark_job_file) - - # Then - expected_spark_connection = { - "master": "yarn://yarn-master", - "spark_binary": "spark-submit", - "deploy_mode": None, - "queue": None, - "spark_home": 
None, - "namespace": None, - } - assert connection == expected_spark_connection - assert cmd[0] == "spark-submit" - def test_resolve_connection_spark_binary_set_connection(self): # Given hook = SparkSubmitHook(conn_id="spark_binary_set") @@ -493,18 +428,29 @@ def test_resolve_connection_spark_binary_set_connection(self): # Then expected_spark_connection = { "master": "yarn", - "spark_binary": "custom-spark-submit", + "spark_binary": "spark2-submit", "deploy_mode": None, "queue": None, - "spark_home": None, "namespace": None, } assert connection == expected_spark_connection - assert cmd[0] == "custom-spark-submit" + assert cmd[0] == "spark2-submit" + + def test_resolve_connection_custom_spark_binary_not_allowed_runtime_error(self): + with pytest.raises(RuntimeError): + SparkSubmitHook(conn_id="spark_binary_set", spark_binary="another-custom-spark-submit") + + def test_resolve_connection_spark_binary_extra_not_allowed_runtime_error(self): + with pytest.raises(RuntimeError): + SparkSubmitHook(conn_id="spark_custom_binary_set") + + def test_resolve_connection_spark_home_not_allowed_runtime_error(self): + with pytest.raises(RuntimeError): + SparkSubmitHook(conn_id="spark_home_set") def test_resolve_connection_spark_binary_default_value_override(self): # Given - hook = SparkSubmitHook(conn_id="spark_binary_set", spark_binary="another-custom-spark-submit") + hook = SparkSubmitHook(conn_id="spark_binary_set", spark_binary="spark2-submit") # When connection = hook._resolve_connection() @@ -513,14 +459,13 @@ def test_resolve_connection_spark_binary_default_value_override(self): # Then expected_spark_connection = { "master": "yarn", - "spark_binary": "another-custom-spark-submit", + "spark_binary": "spark2-submit", "deploy_mode": None, "queue": None, - "spark_home": None, "namespace": None, } assert connection == expected_spark_connection - assert cmd[0] == "another-custom-spark-submit" + assert cmd[0] == "spark2-submit" def test_resolve_connection_spark_binary_default_value(self): # Given @@ -536,32 +481,11 @@ def test_resolve_connection_spark_binary_default_value(self): "spark_binary": "spark-submit", "deploy_mode": None, "queue": "root.default", - "spark_home": None, "namespace": None, } assert connection == expected_spark_connection assert cmd[0] == "spark-submit" - def test_resolve_connection_spark_binary_and_home_set_connection(self): - # Given - hook = SparkSubmitHook(conn_id="spark_binary_and_home_set") - - # When - connection = hook._resolve_connection() - cmd = hook._build_spark_submit_command(self._spark_job_file) - - # Then - expected_spark_connection = { - "master": "yarn", - "spark_binary": "custom-spark-submit", - "deploy_mode": None, - "queue": None, - "spark_home": "/path/to/spark_home", - "namespace": None, - } - assert connection == expected_spark_connection - assert cmd[0] == "/path/to/spark_home/bin/custom-spark-submit" - def test_resolve_connection_spark_standalone_cluster_connection(self): # Given hook = SparkSubmitHook(conn_id="spark_standalone_cluster") @@ -576,11 +500,10 @@ def test_resolve_connection_spark_standalone_cluster_connection(self): "spark_binary": "spark-submit", "deploy_mode": "cluster", "queue": None, - "spark_home": "/path/to/spark_home", "namespace": None, } assert connection == expected_spark_connection - assert cmd[0] == "/path/to/spark_home/bin/spark-submit" + assert cmd[0] == "spark-submit" def test_resolve_spark_submit_env_vars_standalone_client_mode(self): # Given @@ -820,7 +743,7 @@ def test_standalone_cluster_process_on_kill(self): kill_cmd = 
hook._build_spark_driver_kill_command() # Then - assert kill_cmd[0] == "/path/to/spark_home/bin/spark-submit" + assert kill_cmd[0] == "spark-submit" assert kill_cmd[1] == "--master" assert kill_cmd[2] == "spark://spark-standalone-master:6066" assert kill_cmd[3] == "--kill"
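
A minimal sketch of how the new validation behaves from the caller's side, assuming an initialized Airflow environment with this provider installed; the ``spark_default`` connection id and the rejected binary name below are illustrative only:

    from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

    # Accepted: "spark-submit" and "spark2-submit" are the only allowed values for
    # spark_binary (or the spark-binary connection extra), and the chosen binary
    # must be discoverable on the PATH when the job is submitted.
    hook = SparkSubmitHook(conn_id="spark_default", spark_binary="spark2-submit")

    # Rejected: any other binary name now fails fast with a RuntimeError raised
    # by the hook constructor, before any spark-submit process is started.
    try:
        SparkSubmitHook(conn_id="spark_default", spark_binary="custom-spark-submit")
    except RuntimeError as err:
        print(err)

The same RuntimeError is raised when a connection's extra still contains ``spark-home`` or a ``spark-binary`` value outside the allowed list, which is what the new tests above assert.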