Skip to content

Commit

Permalink
[AIRFLOW-1393][[AIRFLOW-1393] Enable Py3 tests in contrib/spark_submi…
Browse files Browse the repository at this point in the history
…t_hook[

The unit tests in
`tests/contrib/hooks/test_spark_submit_hook.py`
were skiped if run in Python3 because some test
cases loop forever
due to a mismatch/misunderstanding about bytes vs
string that didn't
matter under Py2 (i.e. the mocked data for
subprocess.Popen was
returning a String, but the actual Popen call
would return bytes.)

The fix is to use bytes and `six.ByteIO` so that
the tests work on Py2
and Py3. Alsowe had to patch `subprocess.Popen` in
the right place so
the mocks are picked up.

Closes #2427 from ashb/enable-
spark_submit_hook_tests-py3
  • Loading branch information
ashb authored and bolkedebruin committed Jul 17, 2017
1 parent 0dd0029 commit 751e936
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 14 deletions.
6 changes: 4 additions & 2 deletions airflow/contrib/hooks/spark_submit_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,11 @@ def submit(self, application="", **kwargs):
self._sp = subprocess.Popen(spark_submit_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=-1,
universal_newlines=True,
**kwargs)

self._process_log(iter(self._sp.stdout.readline, b''))
self._process_log(iter(self._sp.stdout.readline, ''))
returncode = self._sp.wait()

if returncode:
Expand All @@ -232,7 +234,7 @@ def _process_log(self, itr):
"""
# Consume the iterator
for line in itr:
line = line.decode('utf-8').strip()
line = line.strip()
# If we run yarn cluster mode, we want to extract the application id from
# the logs so we can kill the application when we stop it unexpectedly
if self._is_yarn and self._connection['deploy_mode'] == 'cluster':
Expand Down
20 changes: 8 additions & 12 deletions tests/contrib/hooks/test_spark_submit_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import six
import sys
import unittest
from io import StringIO

from airflow import configuration, models
from airflow.utils import db
Expand Down Expand Up @@ -63,10 +63,6 @@ def cmd_args_to_dict(list_cmd):

def setUp(self):

if sys.version_info[0] == 3:
raise unittest.SkipTest('TestSparkSubmitHook won\'t work with '
'python3. No need to test anything here')

configuration.load_test_config()
db.merge_conn(
models.Connection(
Expand Down Expand Up @@ -135,19 +131,19 @@ def test_build_command(self):
]
self.assertEquals(expected_build_cmd, cmd)

@patch('subprocess.Popen')
@patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen')
def test_spark_process_runcmd(self, mock_popen):
# Given
mock_popen.return_value.stdout = StringIO(u'stdout')
mock_popen.return_value.stderr = StringIO(u'stderr')
mock_popen.return_value.stdout = six.StringIO('stdout')
mock_popen.return_value.stderr = six.StringIO('stderr')
mock_popen.return_value.wait.return_value = 0

# When
hook = SparkSubmitHook(conn_id='')
hook.submit()

# Then
self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stdout=-1, stderr=-2))
self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stderr=-2, stdout=-1, universal_newlines=True, bufsize=-1))

def test_resolve_connection_yarn_default(self):
# Given
Expand Down Expand Up @@ -309,11 +305,11 @@ def test_process_log(self):

self.assertEqual(hook._yarn_application_id, 'application_1486558679801_1820')

@patch('subprocess.Popen')
@patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen')
def test_spark_process_on_kill(self, mock_popen):
# Given
mock_popen.return_value.stdout = StringIO(u'stdout')
mock_popen.return_value.stderr = StringIO(u'stderr')
mock_popen.return_value.stdout = six.StringIO('stdout')
mock_popen.return_value.stderr = six.StringIO('stderr')
mock_popen.return_value.poll.return_value = None
mock_popen.return_value.wait.return_value = 0
log_lines = [
Expand Down

0 comments on commit 751e936

Please sign in to comment.