Skip to content

Commit

Permalink
replace mrjob_tar_gz with mrjob_zip
Browse files Browse the repository at this point in the history
  • Loading branch information
David Marin committed Dec 2, 2016
1 parent 47a33b5 commit 7d05296
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 58 deletions.
8 changes: 4 additions & 4 deletions mrjob/dataproc.py
Expand Up @@ -464,8 +464,8 @@ def _add_bootstrap_files_for_upload(self):
"""
# lazily create mrjob.zip
if self._bootstrap_mrjob():
self._create_mrjob_tar_gz()
self._bootstrap_dir_mgr.add('file', self._mrjob_tar_gz_path)
self._create_mrjob_zip()
self._bootstrap_dir_mgr.add('file', self._mrjob_zip_path)

# all other files needed by the script are already in
# _bootstrap_dir_mgr
Expand Down Expand Up @@ -849,9 +849,9 @@ def _create_master_bootstrap_script_if_needed(self):
# create mrjob.tar.gz if we need it, and add commands to install it
mrjob_bootstrap = []
if self._bootstrap_mrjob():
assert self._mrjob_tar_gz_path
assert self._mrjob_zip_path
path_dict = {
'type': 'file', 'name': None, 'path': self._mrjob_tar_gz_path}
'type': 'file', 'name': None, 'path': self._mrjob_zip_path}
self._bootstrap_dir_mgr.add(**path_dict)

# find out where python keeps its libraries
Expand Down
10 changes: 5 additions & 5 deletions mrjob/emr.py
Expand Up @@ -944,8 +944,8 @@ def _add_bootstrap_files_for_upload(self, persistent=False):
"""
# lazily create mrjob.zip
if self._bootstrap_mrjob():
self._create_mrjob_tar_gz()
self._bootstrap_dir_mgr.add('file', self._mrjob_tar_gz_path)
self._create_mrjob_zip()
self._bootstrap_dir_mgr.add('file', self._mrjob_zip_path)

# all other files needed by the script are already in
# _bootstrap_dir_mgr
Expand Down Expand Up @@ -2407,9 +2407,9 @@ def _create_master_bootstrap_script_if_needed(self):
mrjob_bootstrap = []
if self._bootstrap_mrjob():
# _add_bootstrap_files_for_upload() should have done this
assert self._mrjob_tar_gz_path
assert self._mrjob_zip_path
path_dict = {
'type': 'file', 'name': None, 'path': self._mrjob_tar_gz_path}
'type': 'file', 'name': None, 'path': self._mrjob_zip_path}
self._bootstrap_dir_mgr.add(**path_dict)

# find out where python keeps its libraries
Expand Down Expand Up @@ -3128,7 +3128,7 @@ def _pool_hash(self):
sorted(
(name, self.fs.md5sum(path)) for name, path
in self._bootstrap_dir_mgr.name_to_path('file').items()
if not path == self._mrjob_tar_gz_path),
if not path == self._mrjob_zip_path),
self._opts['additional_emr_info'],
self._bootstrap,
self._bootstrap_actions(),
Expand Down
34 changes: 16 additions & 18 deletions mrjob/runner.py
Expand Up @@ -55,7 +55,7 @@
from mrjob.step import _is_spark_step_type
from mrjob.util import bash_wrap
from mrjob.util import cmd_line
from mrjob.util import tar_and_gzip
from mrjob.util import zip_dir


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -327,8 +327,8 @@ def __init__(self, mr_job_script=None, conf_paths=None,
self._stdin = stdin or sys.stdin.buffer
self._stdin_path = None # temp file containing dump from stdin

# where a tarball of the mrjob library is stored locally
self._mrjob_tar_gz_path = None
# where a zip file of the mrjob library is stored locally
self._mrjob_zip_path = None

# store output_dir
self._output_dir = output_dir
Expand Down Expand Up @@ -887,8 +887,8 @@ def _create_setup_wrapper_script(

if self._bootstrap_mrjob() and self.BOOTSTRAP_MRJOB_IN_SETUP:
# patch setup to add mrjob.zip to PYTHONPATH
mrjob_tar_gz = self._create_mrjob_tar_gz()
path_dict = {'type': 'archive', 'name': None, 'path': mrjob_tar_gz}
mrjob_zip = self._create_mrjob_zip()
path_dict = {'type': 'archive', 'name': None, 'path': mrjob_zip}
self._working_dir_mgr.add(**path_dict)
setup = [['export PYTHONPATH=', path_dict, ':$PYTHONPATH']] + setup

Expand Down Expand Up @@ -974,7 +974,7 @@ def _parse_setup(self):

return setup

def _setup_wrapper_script_content(self, setup, mrjob_tar_gz_name=None):
def _setup_wrapper_script_content(self, setup, mrjob_zip_name=None):
"""Return a (Bourne) shell script that runs the setup commands and then
executes whatever is passed to it (this will be our mapper/reducer),
as a list of strings (one for each line, including newlines).
Expand Down Expand Up @@ -1111,17 +1111,17 @@ def interpolate(arg):

return [interpolate(arg) for arg in args]

def _create_mrjob_tar_gz(self):
"""Make a tarball of the mrjob library, without .pyc or .pyo files,
This will also set ``self._mrjob_tar_gz_path`` and return it.
def _create_mrjob_zip(self):
"""Make a zip of the mrjob library, without .pyc or .pyo files.
This will also set ``self._mrjob_zip_path`` and return it.
Typically called from
:py:meth:`_create_setup_wrapper_script`.
It's safe to call this method multiple times (we'll only create
the tarball once.)
the zip file once.)
"""
if not self._mrjob_tar_gz_path:
if not self._mrjob_zip_path:
# find mrjob library
import mrjob

Expand All @@ -1132,8 +1132,7 @@ def _create_mrjob_tar_gz(self):

mrjob_dir = os.path.dirname(mrjob.__file__) or '.'

tar_gz_path = os.path.join(self._get_local_tmp_dir(),
'mrjob.tar.gz')
zip_path = os.path.join(self._get_local_tmp_dir(), 'mrjob.zip')

def filter_path(path):
filename = os.path.basename(path)
Expand All @@ -1147,13 +1146,12 @@ def filter_path(path):
filename.startswith('._'))

log.debug('archiving %s -> %s as %s' % (
mrjob_dir, tar_gz_path, os.path.join('mrjob', '')))
tar_and_gzip(
mrjob_dir, tar_gz_path, filter=filter_path, prefix='mrjob')
mrjob_dir, zip_path, os.path.join('mrjob', '')))
zip_dir(mrjob_dir, zip_path, filter=filter_path, prefix='mrjob')

self._mrjob_tar_gz_path = tar_gz_path
self._mrjob_zip_path = zip_path

return self._mrjob_tar_gz_path
return self._mrjob_zip_path

def _hadoop_generic_args_for_step(self, step_num):
"""Arguments like -D and -libjars that apply to every Hadoop
Expand Down
8 changes: 4 additions & 4 deletions tests/mockboto.py
Expand Up @@ -175,13 +175,13 @@ def setUp(self):
super(MockBotoTestCase, self).setUp()

# patch slow things
def fake_create_mrjob_tar_gz(mocked_self, *args, **kwargs):
mocked_self._mrjob_tar_gz_path = self.fake_mrjob_tgz_path
def fake_create_mrjob_zip(mocked_self, *args, **kwargs):
mocked_self._mrjob_zip_path = self.fake_mrjob_tgz_path
return self.fake_mrjob_tgz_path

self.start(patch.object(
EMRJobRunner, '_create_mrjob_tar_gz',
fake_create_mrjob_tar_gz))
EMRJobRunner, '_create_mrjob_zip',
fake_create_mrjob_zip))

self.start(patch.object(time, 'sleep'))

Expand Down
8 changes: 4 additions & 4 deletions tests/mockgoogleapiclient.py
Expand Up @@ -198,13 +198,13 @@ def setUp(self):
super(MockGoogleAPITestCase, self).setUp()

# patch slow things
def fake_create_mrjob_tar_gz(mocked_self, *args, **kwargs):
mocked_self._mrjob_tar_gz_path = self.fake_mrjob_tgz_path
def fake_create_mrjob_zip(mocked_self, *args, **kwargs):
mocked_self._mrjob_zip_path = self.fake_mrjob_tgz_path
return self.fake_mrjob_tgz_path

self.start(patch.object(
DataprocJobRunner, '_create_mrjob_tar_gz',
fake_create_mrjob_tar_gz))
DataprocJobRunner, '_create_mrjob_zip',
fake_create_mrjob_zip))

self.start(patch.object(time, 'sleep'))

Expand Down
14 changes: 7 additions & 7 deletions tests/test_dataproc.py
Expand Up @@ -161,10 +161,10 @@ def test_end_to_end(self):

# make sure mrjob.tar.gz is created and uploaded as
# a bootstrap file
self.assertTrue(os.path.exists(runner._mrjob_tar_gz_path))
self.assertIn(runner._mrjob_tar_gz_path,
self.assertTrue(os.path.exists(runner._mrjob_zip_path))
self.assertIn(runner._mrjob_zip_path,
runner._upload_mgr.path_to_uri())
self.assertIn(runner._mrjob_tar_gz_path,
self.assertIn(runner._mrjob_zip_path,
runner._bootstrap_dir_mgr.paths())

cluster_id = runner.get_cluster_id()
Expand Down Expand Up @@ -734,7 +734,7 @@ def assertScriptDownloads(path, name=None):
# check files get downloaded
assertScriptDownloads(foo_py_path, 'bar.py')
assertScriptDownloads('gs://walrus/scripts/ohnoes.sh')
assertScriptDownloads(runner._mrjob_tar_gz_path)
assertScriptDownloads(runner._mrjob_zip_path)

# check scripts get run

Expand All @@ -750,12 +750,12 @@ def assertScriptDownloads(path, name=None):
self.assertIn('/tmp/s.sh', lines)

# bootstrap_mrjob
mrjob_tar_gz_name = runner._bootstrap_dir_mgr.name(
'file', runner._mrjob_tar_gz_path)
mrjob_zip_name = runner._bootstrap_dir_mgr.name(
'file', runner._mrjob_zip_path)
self.assertIn("__mrjob_PYTHON_LIB=$(" + PYTHON_BIN + " -c 'from"
" distutils.sysconfig import get_python_lib;"
" print(get_python_lib())')", lines)
self.assertIn('sudo tar xfz $__mrjob_PWD/' + mrjob_tar_gz_name +
self.assertIn('sudo tar xfz $__mrjob_PWD/' + mrjob_zip_name +
' -C $__mrjob_PYTHON_LIB', lines)
self.assertIn('sudo ' + PYTHON_BIN + ' -m compileall -f'
' $__mrjob_PYTHON_LIB/mrjob && true', lines)
Expand Down
14 changes: 7 additions & 7 deletions tests/test_emr.py
Expand Up @@ -234,10 +234,10 @@ def test_end_to_end(self):

# make sure mrjob.tar.gz is created and uploaded as
# a bootstrap file
self.assertTrue(os.path.exists(runner._mrjob_tar_gz_path))
self.assertIn(runner._mrjob_tar_gz_path,
self.assertTrue(os.path.exists(runner._mrjob_zip_path))
self.assertIn(runner._mrjob_zip_path,
runner._upload_mgr.path_to_uri())
self.assertIn(runner._mrjob_tar_gz_path,
self.assertIn(runner._mrjob_zip_path,
runner._bootstrap_dir_mgr.paths())

self.assertEqual(sorted(results),
Expand Down Expand Up @@ -1495,7 +1495,7 @@ def assertScriptDownloads(path, name=None):
assertScriptDownloads(foo_py_path, 'bar.py')
assertScriptDownloads('s3://walrus/scripts/ohnoes.sh')
assertScriptDownloads('/tmp/quz', 'quz')
assertScriptDownloads(runner._mrjob_tar_gz_path)
assertScriptDownloads(runner._mrjob_zip_path)
assertScriptDownloads('speedups.sh')
assertScriptDownloads('/tmp/s.sh')
if PY2:
Expand All @@ -1512,12 +1512,12 @@ def assertScriptDownloads(path, name=None):
self.assertIn(' true', lines)
self.assertIn(' ls', lines)
# bootstrap_mrjob
mrjob_tar_gz_name = runner._bootstrap_dir_mgr.name(
'file', runner._mrjob_tar_gz_path)
mrjob_zip_name = runner._bootstrap_dir_mgr.name(
'file', runner._mrjob_zip_path)
self.assertIn(" __mrjob_PYTHON_LIB=$(" + expected_python_bin +
" -c 'from distutils.sysconfig import get_python_lib;"
" print(get_python_lib())')", lines)
self.assertIn(' sudo tar xfz $__mrjob_PWD/' + mrjob_tar_gz_name +
self.assertIn(' sudo tar xfz $__mrjob_PWD/' + mrjob_zip_name +
' -C $__mrjob_PYTHON_LIB', lines)
self.assertIn(' sudo ' + expected_python_bin + ' -m compileall -f'
' $__mrjob_PYTHON_LIB/mrjob && true', lines)
Expand Down
10 changes: 5 additions & 5 deletions tests/test_hadoop.py
Expand Up @@ -671,20 +671,20 @@ def _test_end_to_end(self, args=()):
['-verbose'])

# make sure mrjob.zip was uploaded
self.assertTrue(os.path.exists(runner._mrjob_tar_gz_path))
self.assertIn(runner._mrjob_tar_gz_path,
self.assertTrue(os.path.exists(runner._mrjob_zip_path))
self.assertIn(runner._mrjob_zip_path,
runner._upload_mgr.path_to_uri())

# make sure setup script exists, and mrjob.tar.gz is added
# to PYTHONPATH in it
self.assertTrue(os.path.exists(runner._setup_wrapper_script_path))
self.assertIn(runner._setup_wrapper_script_path,
runner._upload_mgr.path_to_uri())
mrjob_tar_gz_name = runner._working_dir_mgr.name(
'archive', runner._mrjob_tar_gz_path)
mrjob_zip_name = runner._working_dir_mgr.name(
'archive', runner._mrjob_zip_path)
with open(runner._setup_wrapper_script_path) as wrapper:
self.assertTrue(any(
('export PYTHONPATH' in line and mrjob_tar_gz_name in line)
('export PYTHONPATH' in line and mrjob_zip_name in line)
for line in wrapper))

self.assertEqual(runner.counters(),
Expand Down
8 changes: 4 additions & 4 deletions tests/test_runner.py
Expand Up @@ -201,12 +201,12 @@ def test_owner_and_label_kwargs(self):

class CreateMrjobTarGzTestCase(TestCase):

def test_create_mrjob_tar_gz(self):
def test_create_mrjob_zip(self):
with no_handlers_for_logger('mrjob.runner'):
with InlineMRJobRunner(conf_paths=[]) as runner:
mrjob_tar_gz_path = runner._create_mrjob_tar_gz()
mrjob_tar_gz = tarfile.open(mrjob_tar_gz_path)
contents = mrjob_tar_gz.getnames()
mrjob_zip_path = runner._create_mrjob_zip()
mrjob_zip = tarfile.open(mrjob_zip_path)
contents = mrjob_zip.getnames()

for path in contents:
self.assertEqual(path[:6], 'mrjob/')
Expand Down

0 comments on commit 7d05296

Please sign in to comment.