attempted to replace StringIO in tests, not entirely successful
David Marin committed Apr 12, 2015
1 parent f3fa37d commit f54da15
Showing 19 changed files with 152 additions and 132 deletions.
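The theme of this commit is Python 3's split between text and binary I/O: io.BytesIO only accepts bytes, while logging and print() emit str. A minimal illustration of the distinction driving these swaps (an editorial sketch, not part of the commit):

    from io import BytesIO, StringIO

    BytesIO().write(b'data\n')      # file-like mocks want bytes
    StringIO().write('log line\n')  # logging and print() want str

    try:
        BytesIO().write('log line\n')  # mixing the two fails on Python 3
    except TypeError as err:
        print(err)  # "a bytes-like object is required, not 'str'"

On Python 2 the two rarely clash, because str is already a byte string; that is why the old StringIO-based tests passed there.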
2 changes: 1 addition & 1 deletion mrjob/inline.py
@@ -154,7 +154,7 @@ def _run_step(self, step_num, step_type, input_path, output_path,
 
         if has_combiner:
             sorted_lines = sorted(child_stdout.getvalue().splitlines())
-            combiner_stdin = BytesIO('\n'.join(sorted_lines))
+            combiner_stdin = BytesIO(b'\n'.join(sorted_lines))
         else:
             child_stdout.flush()
 
4 changes: 2 additions & 2 deletions mrjob/launch.py
@@ -662,11 +662,11 @@ def sandbox(self, stdin=None, stdout=None, stderr=None):
         ``stdin`` is empty by default. You can set it to anything that yields
         lines::
 
-            mr_job.sandbox(stdin=BytesIO('some_data\\n'))
+            mr_job.sandbox(stdin=BytesIO(b'some_data\\n'))
 
         or, equivalently::
 
-            mr_job.sandbox(stdin=['some_data\\n'])
+            mr_job.sandbox(stdin=[b'some_data\\n'])
 
         For convenience, this sandbox() returns self, so you can do::
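For context, a full sandboxed run might look like this sketch (it assumes mrjob's bundled MRWordFreqCount example; any MRJob subclass would do):

    from io import BytesIO

    from mrjob.examples.mr_word_freq_count import MRWordFreqCount

    mr_job = MRWordFreqCount(['--no-conf'])
    mr_job.sandbox(stdin=BytesIO(b'one fish\ntwo fish\n'))
    with mr_job.make_runner() as runner:
        runner.run()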
4 changes: 2 additions & 2 deletions tests/compress.py
@@ -13,14 +13,14 @@
 # limitations under the License.
 """Utilities to compress data in memory."""
 import gzip
-from StringIO import StringIO
+from io import BytesIO
 
 
 # use bz2.compress() to compress bz2 data
 
 def gzip_compress(data):
     """return the gzip-compressed version of the given bytes."""
-    s = StringIO()
+    s = BytesIO()
     g = gzip.GzipFile(fileobj=s, mode='wb')
     g.write(data)
     g.close()
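A round-trip check of the helper above, assuming gzip_compress ends by returning the buffer's contents (its return statement falls outside the displayed hunk):

    import gzip
    from io import BytesIO

    from tests.compress import gzip_compress

    data = b'foo\nbar\n'
    # decompress with the stdlib to confirm the helper emits valid gzip bytes
    with gzip.GzipFile(fileobj=BytesIO(gzip_compress(data))) as f:
        assert f.read() == data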
14 changes: 8 additions & 6 deletions tests/fs/__init__.py
@@ -11,7 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from StringIO import StringIO
+from io import BytesIO
+
+from mrjob.py2 import stdin
 
 from tests.sandbox import SandboxedTestCase

@@ -38,11 +40,11 @@ class MockPopen(object):
 
     def __init__(self, args, stdin=None, stdout=None, stderr=None):
         self.args = args
-        self.stdin = stdin if stdin is not None else StringIO()
+        self.stdin = stdin if stdin is not None else BytesIO()
 
         # discard incoming stdout/stderr objects
-        self.stdout = StringIO()
-        self.stderr = StringIO()
+        self.stdout = BytesIO()
+        self.stderr = BytesIO()
 
         if stdin is None:
             self._run()
@@ -62,8 +64,8 @@ def _run(self):
         outer.io_log.append((self.stdout_result, self.stderr_result))
 
         # expose the results as readable file objects
-        self.stdout = StringIO(self.stdout_result)
-        self.stderr = StringIO(self.stderr_result)
+        self.stdout = BytesIO(self.stdout_result)
+        self.stderr = BytesIO(self.stderr_result)
 
     def communicate(self, stdin=None):
         if stdin is not None:
2 changes: 1 addition & 1 deletion tests/mockhadoop.py
@@ -64,7 +64,7 @@ def add_mock_hadoop_output(parts):
     Args:
     parts -- a list of the contents of parts files, which should be iterables
-             that return lines (e.g. lists, StringIOs).
+             that return lines (e.g. lists, BytesIOs).
 
     The environment variable MOCK_HADOOP_OUTPUT must be set.
     """
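A hypothetical call matching that docstring (the temp-dir setup is an assumption; the helper's internals are not shown here):

    import os
    import tempfile

    from tests.mockhadoop import add_mock_hadoop_output

    os.environ['MOCK_HADOOP_OUTPUT'] = tempfile.mkdtemp()
    # one inner iterable per part file; per this commit, lines are bytes
    add_mock_hadoop_output([[b'1\tfoo\n'], [b'1\tbar\n']])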
14 changes: 14 additions & 0 deletions tests/py2.py
@@ -0,0 +1,14 @@
"""More Python 2/3 compatibility stuff that is only used for testing."""
from mrjob.py2 import IN_PY2

# a StringIO that you can safely set sys.stdout or sys.stderr to
# (for logging or printing)
#
# Don't use this for mocking out files or subprocess stdout/stderr;
# use io.BytesIO instead
#
# TODO: maybe move this to tests.py2?
if IN_PY2:
from StringIO import StringIO
else:
from io import StringIO
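A usage sketch for this shim (editorial, not from the diff), capturing log output that arrives as str on Python 3:

    import logging

    from tests.py2 import StringIO

    stderr = StringIO()  # text buffer under both major versions
    logging.getLogger('mrjob.emr').addHandler(logging.StreamHandler(stderr))

    logging.getLogger('mrjob.emr').warning('watch out')
    assert 'watch out' in stderr.getvalue()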
52 changes: 26 additions & 26 deletions tests/test_emr.py
@@ -13,21 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """Tests for EMRJobRunner"""
-from contextlib import contextmanager
-from contextlib import nested
 import copy
-from datetime import datetime
-from datetime import timedelta
 import getpass
 import itertools
 import logging
 import os
 import os.path
 import posixpath
 import shutil
-from StringIO import StringIO
 import tempfile
 import time
+from contextlib import contextmanager
+from contextlib import nested
+from datetime import datetime
+from datetime import timedelta
+from io import BytesIO
 
 from mock import patch
 from mock import Mock
@@ -224,7 +224,7 @@ def teardown_ssh(self, master_ssh_root):
     def run_and_get_job_flow(self, *args):
         # set up a job flow without caring about what the job is or what its
         # inputs are.
-        stdin = StringIO('foo\nbar\n')
+        stdin = BytesIO(b'foo\nbar\n')
         mr_job = MRTwoStepJob(
             ['-r', 'emr', '-v'] + list(args))
         mr_job.sandbox(stdin=stdin)
@@ -245,7 +245,7 @@ class EMRJobRunnerEndToEndTestCase(MockEMRAndS3TestCase):
 
     def test_end_to_end(self):
         # read from STDIN, a local file, and a remote file
-        stdin = StringIO('foo\nbar\n')
+        stdin = BytesIO(b'foo\nbar\n')
 
         local_input_path = os.path.join(self.tmp_dir, 'input')
         with open(local_input_path, 'w') as local_input_file:
@@ -347,7 +347,7 @@ def test_failed_job(self):
         self.mock_emr_failures = {('j-MOCKJOBFLOW0', 0): None}
 
         with no_handlers_for_logger('mrjob.emr'):
-            stderr = StringIO()
+            stderr = BytesIO()
             log_to_stream('mrjob.emr', stderr)
 
             with mr_job.make_runner() as runner:
@@ -377,7 +377,7 @@ def test_failed_job(self):
 
     def _test_remote_scratch_cleanup(self, mode, scratch_len, log_len):
         self.add_mock_s3_data({'walrus': {'logs/j-MOCKJOBFLOW0/1': '1\n'}})
-        stdin = StringIO('foo\nbar\n')
+        stdin = BytesIO(b'foo\nbar\n')
 
         mr_job = MRTwoStepJob(['-r', 'emr', '-v',
                                '--s3-log-uri', 's3://walrus/logs',
@@ -432,7 +432,7 @@ def test_cleanup_error(self):
     def test_args_version_018(self):
         self.add_mock_s3_data({'walrus': {'logs/j-MOCKJOBFLOW0/1': '1\n'}})
         # read from STDIN, a local file, and a remote file
-        stdin = StringIO('foo\nbar\n')
+        stdin = BytesIO(b'foo\nbar\n')
 
         mr_job = MRTwoStepJob(['-r', 'emr', '-v',
                                '--hadoop-version=0.18', '--ami-version=1.0'])
@@ -449,7 +449,7 @@ def test_args_version_020_205(self):
     def test_args_version_020_205(self):
         self.add_mock_s3_data({'walrus': {'logs/j-MOCKJOBFLOW0/1': '1\n'}})
         # read from STDIN, a local file, and a remote file
-        stdin = StringIO('foo\nbar\n')
+        stdin = BytesIO(b'foo\nbar\n')
 
         mr_job = MRTwoStepJob(['-r', 'emr', '-v', '--ami-version=2.0'])
         mr_job.sandbox(stdin=stdin)
@@ -522,7 +522,7 @@ def test_attach_to_existing_job_flow(self):
             name='Development Job Flow', log_uri=None,
             keep_alive=True)
 
-        stdin = StringIO('foo\nbar\n')
+        stdin = BytesIO(b'foo\nbar\n')
         self.mock_emr_output = {(emr_job_flow_id, 1): [
             '1\t"bar"\n1\t"foo"\n2\tnull\n']}
 
@@ -598,7 +598,7 @@ def test_visible(self):
 class IAMJobFlowRoleTestCase(MockEMRAndS3TestCase):
 
     def run_and_get_job_flow(self, *args):
-        stdin = StringIO('foo\nbar\n')
+        stdin = BytesIO(b'foo\nbar\n')
         mr_job = MRTwoStepJob(
             ['-r', 'emr', '-v'] + list(args))
         mr_job.sandbox(stdin=stdin)
@@ -881,7 +881,7 @@ def test_region_bucket_match(self):
     def test_region_bucket_does_not_match(self):
         # aws_region specified, bucket specified with incorrect location
         with no_handlers_for_logger():
-            stderr = StringIO()
+            stderr = BytesIO()
             log = logging.getLogger('mrjob.emr')
             log.addHandler(logging.StreamHandler(stderr))
             log.setLevel(logging.WARNING)
@@ -1256,7 +1256,7 @@ def test_task_type_defaults_to_core_type(self):
                 task=(20, 'c1.medium', None))
 
     def test_mixing_instance_number_opts_on_cmd_line(self):
-        stderr = StringIO()
+        stderr = BytesIO()
         with no_handlers_for_logger():
             log_to_stream('mrjob.emr', stderr)
             self._test_instance_groups(
@@ -1272,7 +1272,7 @@ def test_mixing_instance_number_opts_in_mrjob_conf(self):
             num_ec2_core_instances=5,
             num_ec2_task_instances=9)
 
-        stderr = StringIO()
+        stderr = BytesIO()
         with no_handlers_for_logger():
             log_to_stream('mrjob.emr', stderr)
             self._test_instance_groups(
@@ -1287,7 +1287,7 @@ def test_cmd_line_instance_numbers_beat_mrjob_conf(self):
         self.set_in_mrjob_conf(num_ec2_core_instances=5,
                                num_ec2_task_instances=9)
 
-        stderr = StringIO()
+        stderr = BytesIO()
         with no_handlers_for_logger():
             log_to_stream('mrjob.emr', stderr)
             self._test_instance_groups(
@@ -1382,7 +1382,7 @@ def test_python_exception(self):
             }})
         self.assertEqual(
             self.runner._find_probable_cause_of_failure([1]),
-            {'lines': list(StringIO(TRACEBACK_START + PY_EXCEPTION)),
+            {'lines': list(BytesIO(TRACEBACK_START + PY_EXCEPTION)),
              'log_file_uri': BUCKET_URI + ATTEMPT_0_DIR + 'stderr',
             'input_uri': BUCKET_URI + 'input.gz'})
 
@@ -1393,7 +1393,7 @@ def test_python_exception_without_input_uri(self):
             }})
         self.assertEqual(
             self.runner._find_probable_cause_of_failure([1]),
-            {'lines': list(StringIO(TRACEBACK_START + PY_EXCEPTION)),
+            {'lines': list(BytesIO(TRACEBACK_START + PY_EXCEPTION)),
             'log_file_uri': BUCKET_URI + ATTEMPT_0_DIR + 'stderr',
             'input_uri': None})
 
@@ -1409,7 +1409,7 @@ def test_java_exception(self):
             }})
         self.assertEqual(
             self.runner._find_probable_cause_of_failure([1]),
-            {'lines': list(StringIO(JAVA_STACK_TRACE)),
+            {'lines': list(BytesIO(JAVA_STACK_TRACE)),
             'log_file_uri': BUCKET_URI + ATTEMPT_0_DIR + 'syslog',
             'input_uri': BUCKET_URI + 'input.gz'})
 
@@ -1422,7 +1422,7 @@ def test_java_exception_without_input_uri(self):
             }})
         self.assertEqual(
             self.runner._find_probable_cause_of_failure([1]),
-            {'lines': list(StringIO(JAVA_STACK_TRACE)),
+            {'lines': list(BytesIO(JAVA_STACK_TRACE)),
             'log_file_uri': BUCKET_URI + ATTEMPT_0_DIR + 'syslog',
             'input_uri': None})
 
@@ -1576,7 +1576,7 @@ def tearDown(self):
     def test_empty_counters_running_job(self):
         self.runner._describe_jobflow().state = 'RUNNING'
         with no_handlers_for_logger():
-            stderr = StringIO()
+            stderr = BytesIO()
             log_to_stream('mrjob.emr', stderr)
             self.runner._fetch_counters([1], skip_s3_wait=True)
             self.assertIn('5 minutes', stderr.getvalue())
@@ -2172,7 +2172,7 @@ def rm_tmp_dir(self):
 
     def test_no_mapper(self):
         # read from STDIN, a local file, and a remote file
-        stdin = StringIO('foo\nbar\n')
+        stdin = BytesIO(b'foo\nbar\n')
 
         local_input_path = os.path.join(self.tmp_dir, 'input')
         with open(local_input_path, 'w') as local_input_file:
@@ -3061,7 +3061,7 @@ def die_ssh(*args, **kwargs):
 
         with no_handlers_for_logger('mrjob.emr'):
             r = self._quick_runner()
-            stderr = StringIO()
+            stderr = BytesIO()
             log_to_stream('mrjob.emr', stderr)
             with patch.object(mrjob.emr, 'ssh_terminate_single_job',
                               side_effect=die_ssh):
@@ -3076,7 +3076,7 @@ def die_io(*args, **kwargs):
             r = self._quick_runner()
             with patch.object(mrjob.emr, 'ssh_terminate_single_job',
                               side_effect=die_io):
-                stderr = StringIO()
+                stderr = BytesIO()
                 log_to_stream('mrjob.emr', stderr)
                 r._cleanup_job()
         self.assertIn('Unable to kill job', stderr.getvalue())
@@ -3225,7 +3225,7 @@ def setUp(self):
         super(BuildStreamingStepTestCase, self).setUp()
         with patch_fs_s3():
             self.runner = EMRJobRunner(
-                mr_job_script='my_job.py', conf_paths=[], stdin=StringIO())
+                mr_job_script='my_job.py', conf_paths=[], stdin=BytesIO())
             self.runner._steps = []  # don't actually run `my_job.py --steps`
             self.runner._add_job_files_for_upload()
 
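Note that many hunks above point log_to_stream() at a BytesIO. That is fine on Python 2, where str is bytes, but logging handlers write str on Python 3, so these spots are likely the "not entirely successful" part of the commit message. The text-mode shim from tests/py2.py reads as the intended follow-up (a guess, not stated in the diff):

    from tests.py2 import StringIO

    stderr = StringIO()  # safe target for logging on both 2 and 3
    log_to_stream('mrjob.emr', stderr)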
6 changes: 3 additions & 3 deletions tests/test_hadoop.py
@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """Test the hadoop job runner."""
-from StringIO import StringIO
 import getpass
 import os
 import pty
+from io import BytesIO
 from subprocess import CalledProcessError
 from subprocess import check_call
 
@@ -146,7 +146,7 @@ class HadoopJobRunnerEndToEndTestCase(MockHadoopTestCase):
 
     def _test_end_to_end(self, args=()):
         # read from STDIN, a local file, and a remote file
-        stdin = StringIO('foo\nbar\n')
+        stdin = BytesIO(b'foo\nbar\n')
 
         local_input_path = os.path.join(self.tmp_dir, 'input')
         with open(local_input_path, 'w') as local_input_file:
@@ -274,7 +274,7 @@ def setUp(self):
         super(StreamingArgsTestCase, self).setUp()
         self.runner = HadoopJobRunner(
             hadoop_bin='hadoop', hadoop_streaming_jar='streaming.jar',
-            mr_job_script='my_job.py', stdin=StringIO())
+            mr_job_script='my_job.py', stdin=BytesIO())
         self.runner._add_job_files_for_upload()
 
         self.runner._hadoop_version='0.20.204'
