Skip to content

Commit

Permalink
stop passing upload_mgr to _upload_args() and _arg_hash_paths()
Browse files: browse the repository at this point in the history
recognize self._upload_mgr in MRJobRunner constructor
  • Loading branch information
David Marin committed Oct 15, 2016
1 parent 203090c commit e636bc4
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
2 changes: 1 addition & 1 deletion mrjob/dataproc.py
Expand Up @@ -631,7 +631,7 @@ def _build_dataproc_hadoop_job(self, step_num):
# TODO - dmarin @ mtai - Probably a little safer to do the API's way,
# assuming the API supports distributed cache syntax (so we can pick
# the names of the uploaded files).
- args.extend(self._upload_args(self._upload_mgr))
+ args.extend(self._upload_args())

args.extend(self._hadoop_args_for_step(step_num))

Expand Down
2 changes: 1 addition & 1 deletion mrjob/emr.py
Expand Up @@ -1535,7 +1535,7 @@ def _build_streaming_step(self, step_num):

step_args = []
step_args.extend(step_arg_prefix) # add 'hadoop-streaming' for 4.x
- step_args.extend(self._upload_args(self._upload_mgr))
+ step_args.extend(self._upload_args())
step_args.extend(self._libjar_step_args())
step_args.extend(self._hadoop_args_for_step(step_num))

Expand Down
3 changes: 1 addition & 2 deletions mrjob/hadoop.py
Expand Up @@ -463,8 +463,7 @@ def _args_for_streaming_step(self, step_num):
args = self.get_hadoop_bin() + ['jar', hadoop_streaming_jar]

# set up uploading from HDFS to the working dir
- args.extend(
- self._upload_args(self._upload_mgr))
+ args.extend(self._upload_args())

# if no reducer, shut off reducer tasks. This has to come before
# extra hadoop args, which could contain jar-specific args
Expand Down
15 changes: 9 additions & 6 deletions mrjob/runner.py
Expand Up @@ -265,6 +265,7 @@ def __init__(self, mr_job_script=None, conf_paths=None,
self._fs = None

self._working_dir_mgr = WorkingDirManager()
+ self._upload_mgr = None # define in subclasses that use this

self._script_path = mr_job_script
if self._script_path:
Expand Down Expand Up @@ -1155,26 +1156,28 @@ def _hadoop_args_for_step(self, step_num):

return args

- def _arg_hash_paths(self, type, upload_mgr):
+ def _arg_hash_paths(self, type):
"""Helper function for the *upload_args methods."""
for name, path in self._working_dir_mgr.name_to_path(type).items():
uri = self._upload_mgr.uri(path)
yield '%s#%s' % (uri, name)

- # TODO: upload_mgr is always self._upload_mgr, and _arg_hash_paths()
- # hard-codes it anyway. Do we really want to pass it in?
- def _upload_args(self, upload_mgr):
+ def _upload_args(self):
args = []

# TODO: does Hadoop have a way of coping with paths that have
# commas in their names?

- file_hash_paths = list(self._arg_hash_paths('file', upload_mgr))
+ file_hash_paths = list(
+ self._arg_hash_paths('file', self._upload_mgr))

if file_hash_paths:
args.append('-files')
args.append(','.join(file_hash_paths))

- archive_hash_paths = list(self._arg_hash_paths('archive', upload_mgr))
+ archive_hash_paths = list(
+ self._arg_hash_paths('archive', self._upload_mgr))

if archive_hash_paths:
args.append('-archives')
args.append(','.join(archive_hash_paths))
Expand Down

0 comments on commit e636bc4

Please sign in to comment.