Skip to content

Commit

Permalink
Validate conn_prefix in extra field for Spark JDBC hook (#32946)
Browse files Browse the repository at this point in the history
The `conn_prefix` in `extras` should not contain a `?` as that is
usually meant for appending query params to the URL
and `query_params` are not the intended to be included in the
`conn_prefix` section of the `extras`.
  • Loading branch information
pankajkoti committed Jul 31, 2023
1 parent ddcd30e commit 4f83e83
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airflow/providers/apache/spark/hooks/spark_jdbc.py
Expand Up @@ -181,6 +181,8 @@ def _build_jdbc_application_arguments(self, jdbc_conn: dict[str, Any]) -> Any:
arguments = []
arguments += ["-cmdType", self._cmd_type]
if self._jdbc_connection["url"]:
if "?" in jdbc_conn["conn_prefix"]:
raise ValueError("The jdbc extra conn_prefix should not contain a '?'")
arguments += [
"-url",
f"{jdbc_conn['conn_prefix']}{jdbc_conn['url']}/{jdbc_conn['schema']}",
Expand Down
20 changes: 20 additions & 0 deletions tests/providers/apache/spark/hooks/test_spark_jdbc.py
Expand Up @@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations

from unittest.mock import patch

import pytest

from airflow.models import Connection
Expand Down Expand Up @@ -108,6 +110,18 @@ def setup_method(self):
extra='{"conn_prefix":"jdbc:postgresql://"}',
)
)
db.merge_conn(
Connection(
conn_id="jdbc-invalid-extra-conn-prefix",
conn_type="postgres",
host="localhost",
schema="default",
port=5432,
login="user",
password="supersecret",
extra='{"conn_prefix":"jdbc:mysql://some_host:8085/test?some_query_param=true#"}',
)
)

def test_resolve_jdbc_connection(self):
# Given
Expand Down Expand Up @@ -184,3 +198,9 @@ def test_invalid_host(self):
def test_invalid_schema(self):
with pytest.raises(ValueError, match="schema should not contain a"):
SparkJDBCHook(jdbc_conn_id="jdbc-invalid-schema", **self._config)

@patch("airflow.providers.apache.spark.hooks.spark_submit.SparkSubmitHook.submit")
def test_invalid_extra_conn_prefix(self, mock_submit):
hook = SparkJDBCHook(jdbc_conn_id="jdbc-invalid-extra-conn-prefix", **self._config)
with pytest.raises(ValueError, match="extra conn_prefix should not contain a"):
hook.submit_jdbc_job()

0 comments on commit 4f83e83

Please sign in to comment.