diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py
index f04c476f1ab97..5d25866eb86b5 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -181,6 +181,8 @@ def _build_jdbc_application_arguments(self, jdbc_conn: dict[str, Any]) -> Any:
         arguments = []
         arguments += ["-cmdType", self._cmd_type]
         if self._jdbc_connection["url"]:
+            if "?" in jdbc_conn["conn_prefix"]:
+                raise ValueError("The jdbc extra conn_prefix should not contain a '?'")
             arguments += [
                 "-url",
                 f"{jdbc_conn['conn_prefix']}{jdbc_conn['url']}/{jdbc_conn['schema']}",
diff --git a/tests/providers/apache/spark/hooks/test_spark_jdbc.py b/tests/providers/apache/spark/hooks/test_spark_jdbc.py
index 495f7353a0aa8..fbec739c9f046 100644
--- a/tests/providers/apache/spark/hooks/test_spark_jdbc.py
+++ b/tests/providers/apache/spark/hooks/test_spark_jdbc.py
@@ -17,6 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+from unittest.mock import patch
+
 import pytest
 
 from airflow.models import Connection
@@ -108,6 +110,18 @@ def setup_method(self):
                 extra='{"conn_prefix":"jdbc:postgresql://"}',
             )
         )
+        db.merge_conn(
+            Connection(
+                conn_id="jdbc-invalid-extra-conn-prefix",
+                conn_type="postgres",
+                host="localhost",
+                schema="default",
+                port=5432,
+                login="user",
+                password="supersecret",
+                extra='{"conn_prefix":"jdbc:mysql://some_host:8085/test?some_query_param=true#"}',
+            )
+        )
 
     def test_resolve_jdbc_connection(self):
         # Given
@@ -184,3 +198,9 @@ def test_invalid_host(self):
     def test_invalid_schema(self):
         with pytest.raises(ValueError, match="schema should not contain a"):
             SparkJDBCHook(jdbc_conn_id="jdbc-invalid-schema", **self._config)
+
+    @patch("airflow.providers.apache.spark.hooks.spark_submit.SparkSubmitHook.submit")
+    def test_invalid_extra_conn_prefix(self, mock_submit):
+        hook = SparkJDBCHook(jdbc_conn_id="jdbc-invalid-extra-conn-prefix", **self._config)
+        with pytest.raises(ValueError, match="extra conn_prefix should not contain a"):
+            hook.submit_jdbc_job()