Skip to content

Commit

Permalink
[AIRFLOW-6949] Respect explicit spark.kubernetes.namespace conf to SparkSubmitOperator (#7575)
Browse files Browse the repository at this point in the history

This means the value from the Operator/dag file takes precedence over
the connection

The previous behaviour was to emit one --conf line from the conf argument
and then a second, conflicting one from the connection:

```
--conf spark.kubernetes.namespace=airflow \
--conf spark.kubernetes.namespace=default \
```
  • Loading branch information
ashb committed Feb 28, 2020
1 parent 6d905fd commit b59042b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
10 changes: 6 additions & 4 deletions airflow/providers/apache/spark/hooks/spark_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def __init__(self,
env_vars=None,
verbose=False,
spark_binary=None):
self._conf = conf
self._conf = conf or {}
self._conn_id = conn_id
self._files = files
self._py_files = py_files
Expand Down Expand Up @@ -208,6 +208,9 @@ def _resolve_connection(self):
self._conn_id, conn_data['master']
)

if 'spark.kubernetes.namespace' in self._conf:
conn_data['namespace'] = self._conf['spark.kubernetes.namespace']

return conn_data

def get_conn(self):
Expand Down Expand Up @@ -247,9 +250,8 @@ def _build_spark_submit_command(self, application):
# The url of the spark master
connection_cmd += ["--master", self._connection['master']]

if self._conf:
for key in self._conf:
connection_cmd += ["--conf", "{}={}".format(key, str(self._conf[key]))]
for key in self._conf:
connection_cmd += ["--conf", "{}={}".format(key, str(self._conf[key]))]
if self._env_vars and (self._is_kubernetes or self._is_yarn):
if self._is_yarn:
tmpl = "spark.yarn.appMasterEnv.{}={}"
Expand Down
24 changes: 24 additions & 0 deletions tests/providers/apache/spark/hooks/test_spark_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,30 @@ def test_resolve_connection_spark_k8s_cluster_connection(self):
self.assertEqual(dict_cmd["--master"], "k8s://https://k8s-master")
self.assertEqual(dict_cmd["--deploy-mode"], "cluster")

def test_resolve_connection_spark_k8s_cluster_ns_conf(self):
    """An explicit spark.kubernetes.namespace conf must win over the connection's namespace."""
    # Given: the namespace is supplied directly via the conf argument
    hook = SparkSubmitHook(
        conn_id='spark_k8s_cluster',
        conf={'spark.kubernetes.namespace': 'airflow'},
    )

    # When: the connection is resolved and the submit command is built
    resolved = hook._resolve_connection()
    built_cmd = hook._build_spark_submit_command(self._spark_job_file)
    cmd_args = self.cmd_args_to_dict(built_cmd)

    # Then: the resolved connection carries the conf-supplied namespace,
    # and exactly that namespace appears in the generated --conf flag
    self.assertEqual(resolved, {"spark_home": "/opt/spark",
                                "queue": None,
                                "spark_binary": "spark-submit",
                                "master": "k8s://https://k8s-master",
                                "deploy_mode": "cluster",
                                "namespace": "airflow"})
    self.assertEqual(cmd_args["--master"], "k8s://https://k8s-master")
    self.assertEqual(cmd_args["--deploy-mode"], "cluster")
    self.assertEqual(cmd_args["--conf"], "spark.kubernetes.namespace=airflow")

def test_resolve_connection_spark_home_set_connection(self):
# Given
hook = SparkSubmitHook(conn_id='spark_home_set')
Expand Down

0 comments on commit b59042b

Please sign in to comment.