all runners now use Working/UploadDirManagers

commit 74a4ba564a976c9f64c8a00afb82721cb00a5cf1 1 parent 08df384
authored August 22, 2012
1  .gitignore
@@ -1,4 +1,5 @@
1 1
 .#*
  2
+*.egg
2 3
 *.pyc
3 4
 *~
4 5
 *.swp
120  mrjob/cmd.py
@@ -18,7 +18,7 @@
18 18
 files.
19 19
 
20 20
 This module provides two utility classes for keeping track of uploads.
21  
-Typically, you'll want to use ScratchDirManager to manage uploading files from
  21
+Typically, you'll want to use UploadDirManager to manage uploading files from
22 22
 a local machine to a place where Hadoop can see them (HDFS or S3) and
23 23
 WorkingDirManager to manage placing these files within your tasks' working
24 24
 directories (i.e. to help interface with Hadoop's DistributedCache system).
@@ -30,65 +30,89 @@
30 30
 import os.path
31 31
 import posixpath
32 32
 
  33
+from mrjob.parse import is_uri
  34
+
33 35
 
34 36
 log = logging.getLogger(__name__)
35 37
 
36 38
 
37  
-def parse_hash_path(hash_path, type=None, name_if_no_hash=False):
  39
+def parse_hash_path(hash_path):
38 40
     """Parse Hadoop Distributed Cache-style paths into a dictionary.
39 41
 
40 42
     For example:
41 43
     >>> parse_hash_path('/foo/bar.py#baz.py')
42 44
     {'path': '/foo/bar.py', 'name': 'baz.py', 'type': 'file'}
43 45
 
44  
-    *path* is the actual path of the file, *name* is the name of the symlink
45  
-    to it in the task's working directory, and *type* is either ``'file'``
46  
-    (an ordinary file) or ``'archive'`` (a file that Hadoop automatically
47  
-    unarchives into a directory).
48  
-
49 46
     A slash at the end of the name indicates an archive:
50 47
     >>> parse_hash_path('/foo/bar.tar.gz#baz/')
51 48
     {'path': '/foo/bar.tar.gz', 'name': 'baz', 'type': 'archive'}
52 49
 
53  
-    You can also explicitly specify *type*, to support options which
54  
-    always take files or archives (e.g. ``mr_job_script``, ``python_archive``).
55  
-
56 50
     An empty name (e.g. ``'/foo/bar.py#'``) indicates any name is acceptable.
  51
+    """
  52
+    if '#' not in hash_path:
  53
+        raise ValueError('Bad hash path %r, must contain #' % (hash_path,))
  54
+
  55
+    path, name = hash_path.split('#', 1)
  56
+
  57
+    if name.endswith('/'):
  58
+        type = 'archive'
  59
+        name = name[:-1]
  60
+    else:
  61
+        type = 'file'
  62
+
  63
+    if '/' in name or '#' in name:
  64
+        raise ValueError('Bad path %r; name must not contain # or /' % (path,))
57 65
 
58  
-    If there's no hash at all (e.g. ``'/foo/bar.py'``) we also assume any
59  
-    name is acceptable unless *name_if_no_hash* is set, in which case
60  
-    we attempt to match the path's filename.
  66
+    # use None for no name
  67
+    if not name:
  68
+        name = None
  69
+
  70
+    return {'path': path, 'name': name, 'type': type}
  71
+
  72
+
  73
+def parse_legacy_hash_path(type, path, must_name=None):
  74
+    """Parse hash paths from old setup/bootstrap options.
  75
+
  76
+    This is similar to :py:func:`parse_hash_path` except that we pass in
  77
+    path type explicitly, and we don't always require the hash.
  78
+
  79
+    :param type: Type of the path (``'archive'`` or ``'file'``)
  80
+    :param path: Path to parse, possibly with a ``#``
  81
+    :param must_name: If set, use *path*'s filename as its name if there
  82
+                      is no ``'#'`` in *path*, and raise an exception
  83
+                      if there is just a ``'#'`` with no name. Set *must_name*
  84
+                      to the name of the relevant option so we can print
  85
+                      a useful error message. This is intended for options
  86
+                      like ``upload_files`` that merely upload a file
  87
+                      without tracking it.
61 88
     """
62  
-    inferred_type = 'file'
63  
-    if '#' in hash_path:
64  
-        path, name = hash_path.split('#', 1)
  89
+    if '#' in path:
  90
+        path, name = path.split('#', 1)
65 91
 
66  
-        if name.endswith('/'):
67  
-            inferred_type = 'archive'
  92
+        # allow a slash after the name of an archive because that's
  93
+        # the new way of specifying archive paths
  94
+        if name.endswith('/') and type == 'archive':
68 95
             name = name[:-1]
69 96
 
70 97
         if '/' in name or '#' in name:
71  
-            raise ValueError('Bad name %r; must not contain # or /' % name)
  98
+            raise ValueError(
  99
+                'Bad path %r; name must not contain # or /' % (path,))
72 100
 
73  
-        # empty names are okay
74 101
         if not name:
  102
+            if must_name:
  103
+                raise ValueError(
  104
+                    'Empty name makes no sense for %s: %r' % (must_name, path))
  105
+        else:
75 106
             name = None
76 107
     else:
77  
-        path = hash_path
78  
-        name = name_uniquely(hash_path)
79  
-
80  
-    if type:
81  
-        if type == 'file' and inferred_type == 'archive':
82  
-            log.warn("names for files shouldn't end with '/': %r" % hash_path)
83  
-        # don't require archives to have '/' at the end of their names;
84  
-        # this is new in v0.4.0
85  
-    else:
86  
-        type = inferred_type
  108
+        if must_name:
  109
+            name = os.path.basename(path)
  110
+        else:
  111
+            name = None
87 112
 
88 113
     return {'path': path, 'name': name, 'type': type}
89 114
 
90 115
 
91  
-
92 116
 def name_uniquely(path, names_taken=(), proposed_name=None):
93 117
     """Come up with a unique name for *path*.
94 118
 
@@ -120,16 +144,19 @@ def name_uniquely(path, names_taken=(), proposed_name=None):
120 144
             return name
121 145
 
122 146
 
123  
-class ScratchDirManager(object):
124  
-    """Represents a directory on HDFS or S3 where we want to park files
125  
-    for consumption by Hadoop.
  147
+class UploadDirManager(object):
  148
+    """Represents a directory on HDFS or S3 where we want to upload
  149
+    local files for consumption by Hadoop.
126 150
 
127  
-    :py:class:`ScratchDirManager` tries to give files the same name as their
  151
+    :py:class:`UploadDirManager` tries to give files the same name as their
128 152
     filename in the path (for ease of debugging), but handles collisions
129 153
     gracefully.
  154
+
  155
+    :py:class:`UploadDirManager` assumes URIs do not need to be uploaded
  156
+    and thus does not store them. :py:meth:`uri` maps URIs to themselves.
130 157
     """
131 158
     def __init__(self, prefix):
132  
-        """Make an :py:class`ScratchDirManager`.
  159
+        """Make an :py:class`UploadDirManager`.
133 160
 
134 161
         :param string prefix: The URI for the directory (e.g.
135 162
                               `s3://bucket/dir/`). It doesn't matter if
@@ -143,8 +170,12 @@ def __init__(self, prefix):
143 170
 
144 171
     def add(self, path):
145 172
         """Add a path. If *path* hasn't been added before, assign it a name.
  173
+                       If *path* is a URI don't add it; just return the URI.
146 174
 
147 175
         :return: the URI assigned to the path"""
  176
+        if is_uri(path):
  177
+            return path
  178
+
148 179
         if path not in self._path_to_name:
149 180
             name = name_uniquely(path, names_taken=self._names_taken)
150 181
             self._names_taken.add(name)
@@ -153,15 +184,15 @@ def add(self, path):
153 184
         return self.uri(path)
154 185
 
155 186
     def uri(self, path):
156  
-        """Get the URI for the given path. If it's a path we don't know
157  
-        about, just return *path*.
158  
-
159  
-        (This makes it simpler to skip uploading URIs.)
  187
+        """Get the URI for the given path. If *path* is a URI, just return it.
160 188
         """
  189
+        if is_uri(path):
  190
+            return path
  191
+
161 192
         if path in self._path_to_name:
162 193
             return posixpath.join(self.prefix, self._path_to_name[path])
163 194
         else:
164  
-            return path
  195
+            raise ValueError('%r is not a URI or a known local file' % (path,))
165 196
 
166 197
     def path_to_uri(self):
167 198
         """Get a map from path to URI for all paths that were added,
@@ -307,3 +338,10 @@ def _desc(self, type, path, name=''):
307 338
         """
308 339
         # this will almost certainly get broken out into another function
309 340
         return '%s#%s%s' % (path, name, '/' if type == 'archive' else '')
  341
+
  342
+
  343
+class BootstrapWorkingDirManager(WorkingDirManager):
  344
+    """Manage the working dir for the master bootstrap script. Identical
  345
+    to :py:class:`WorkingDirManager` except that it doesn't support archives.
  346
+    """
  347
+    _SUPPORTED_TYPES = ('file',)
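
Aside for readers of this diff: a minimal usage sketch of the new helpers in mrjob/cmd.py, based only on the behavior shown in the hunks above; the bucket, directory, and file names are invented, and this code is not part of the commit.

    from mrjob.cmd import UploadDirManager, parse_hash_path, parse_legacy_hash_path

    upload_mgr = UploadDirManager('s3://walrus/tmp/files/')

    # local paths get parked under the prefix, keeping their filename when possible
    upload_mgr.add('/home/me/wordcount.py')  # -> 's3://walrus/tmp/files/wordcount.py'

    # URIs are assumed to already be visible to Hadoop; they pass through unchanged
    upload_mgr.add('s3://walrus/data/stopwords.txt')  # -> 's3://walrus/data/stopwords.txt'

    # note: uri() on a local path that was never add()ed now raises ValueError
    # (the old ScratchDirManager silently returned the path instead)

    # hash paths describe how a file should appear in the task's working directory
    parse_hash_path('/home/me/lib.tar.gz#lib/')
    # == {'path': '/home/me/lib.tar.gz', 'name': 'lib', 'type': 'archive'}

    # legacy options pass the type explicitly and may omit the '#'
    parse_legacy_hash_path('file', '/home/me/wordcount.py', must_name='upload_files')
    # == {'path': '/home/me/wordcount.py', 'name': 'wordcount.py', 'type': 'file'}
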
301  mrjob/emr.py
@@ -55,7 +55,10 @@
55 55
     boto = None
56 56
 
57 57
 import mrjob
58  
-from mrjob import compat
  58
+from mrjob.cmd import BootstrapWorkingDirManager
  59
+from mrjob.cmd import UploadDirManager
  60
+from mrjob.cmd import parse_legacy_hash_path
  61
+from mrjob.compat import supports_new_distributed_cache_options
59 62
 from mrjob.conf import combine_cmds
60 63
 from mrjob.conf import combine_dicts
61 64
 from mrjob.conf import combine_lists
@@ -72,6 +75,7 @@
72 75
 from mrjob.logparsers import NODE_LOG_URI_RE
73 76
 from mrjob.logparsers import scan_for_counters_in_files
74 77
 from mrjob.logparsers import scan_logs_in_order
  78
+from mrjob.parse import is_uri
75 79
 from mrjob.parse import is_s3_uri
76 80
 from mrjob.parse import parse_s3_uri
77 81
 from mrjob.pool import est_time_to_hour
@@ -576,6 +580,14 @@ def __init__(self, **kwargs):
576 580
         else:
577 581
             self._output_dir = self._s3_tmp_uri + 'output/'
578 582
 
  583
+        # manage working dir for bootstrap script
  584
+        self._b_mgr = BootstrapWorkingDirManager()
  585
+
  586
+        # manage local files that we want to upload to S3. We'll add them
  587
+        # to this manager just before we need them.
  588
+        s3_files_dir = self._s3_tmp_uri + 'files/'
  589
+        self._upload_mgr = UploadDirManager(s3_files_dir)
  590
+
579 591
         # add the bootstrap files to a list of files to upload
580 592
         self._bootstrap_actions = []
581 593
         for action in self._opts['bootstrap_actions']:
@@ -583,42 +595,36 @@ def __init__(self, **kwargs):
583 595
             if not args:
584 596
                 raise ValueError('bad bootstrap action: %r' % (action,))
585 597
             # don't use _add_bootstrap_file() because this is a raw bootstrap
586  
-            # action, not part of mrjob's bootstrap utilities
587  
-            file_dict = self._add_file(args[0])
588  
-            file_dict['args'] = args[1:]
589  
-            self._bootstrap_actions.append(file_dict)
  598
+            self._bootstrap_actions.append({
  599
+                'path': args[0],
  600
+                'args': args[1:],
  601
+            })
590 602
 
591 603
         for path in self._opts['bootstrap_files']:
592  
-            self._add_bootstrap_file(path)
  604
+            self._b_mgr.add(**parse_legacy_hash_path(
  605
+                'file', path, must_name='bootstrap_files'))
593 606
 
594 607
         self._bootstrap_scripts = []
595 608
         for path in self._opts['bootstrap_scripts']:
596  
-            file_dict = self._add_bootstrap_file(path)
597  
-            self._bootstrap_scripts.append(file_dict)
  609
+            bootstrap_script = parse_legacy_hash_path('file', path)
  610
+            self._bootstrap_scripts.append(bootstrap_script)
  611
+            self._b_mgr.add(**bootstrap_script)
598 612
 
599 613
         self._bootstrap_python_packages = []
600 614
         for path in self._opts['bootstrap_python_packages']:
601  
-            name, path = self._split_path(path)
602  
-            if not path.endswith('.tar.gz'):
  615
+            bpp = parse_legacy_hash_path('file', path)
  616
+
  617
+            if not bpp['path'].endswith('.tar.gz'):
603 618
                 raise ValueError(
604 619
                     'bootstrap_python_packages only accepts .tar.gz files!')
605  
-            file_dict = self._add_bootstrap_file(path)
606  
-            self._bootstrap_python_packages.append(file_dict)
607  
-
608  
-        self._streaming_jar = None
609  
-        if self._opts.get('hadoop_streaming_jar'):
610  
-            self._streaming_jar = self._add_file_for_upload(
611  
-                self._opts['hadoop_streaming_jar'])
  620
+            self._bootstrap_python_packages.append(bpp)
  621
+            self._b_mgr.add(**bpp)
612 622
 
613 623
         if not (isinstance(self._opts['additional_emr_info'], basestring)
614 624
                 or self._opts['additional_emr_info'] is None):
615 625
             self._opts['additional_emr_info'] = json.dumps(
616 626
                 self._opts['additional_emr_info'])
617 627
 
618  
-        # if we're bootstrapping mrjob, keep track of the file_dict
619  
-        # for mrjob.tar.gz
620  
-        self._mrjob_tar_gz_file = None
621  
-
622 628
         # where our own logs ended up (we'll find this out once we run the job)
623 629
         self._s3_job_log_uri = None
624 630
 
@@ -627,7 +633,7 @@ def __init__(self, **kwargs):
627 633
         self._s3_input_uris = None
628 634
 
629 635
         # we'll create the script later
630  
-        self._master_bootstrap_script = None
  636
+        self._master_bootstrap_script_path = None
631 637
 
632 638
         # the ID assigned by EMR to this job (might be None)
633 639
         self._emr_job_flow_id = self._opts['emr_job_flow_id']
@@ -820,89 +826,76 @@ def _run(self):
820 826
         self._wait_for_job_to_complete()
821 827
 
822 828
     def _prepare_for_launch(self):
823  
-        self._setup_input()
  829
+        self._check_input_exists()
824 830
         self._create_wrapper_script()
825  
-        self._create_master_bootstrap_script()
826  
-        self._upload_non_input_files()
827  
-
828  
-    def _setup_input(self):
829  
-        """Copy local input files (if any) to a special directory on S3.
  831
+        self._add_bootstrap_files_for_upload()
  832
+        self._add_job_files_for_upload()
  833
+        self._upload_local_files_to_s3()
830 834
 
831  
-        Set self._s3_input_uris
832  
-        Helper for _run
  835
+    def _check_input_exists(self):
  836
+        """Make sure all input exists before continuing with our job.
833 837
         """
834  
-        self._create_s3_temp_bucket_if_needed()
835  
-        # winnow out s3 files from local ones
836  
-        self._s3_input_uris = []
837  
-        local_input_paths = []
838 838
         for path in self._input_paths:
839  
-            if is_s3_uri(path):
840  
-                # Don't even bother running the job if the input isn't there,
841  
-                # since it's costly to spin up instances.
842  
-                if not self.path_exists(path):
843  
-                    raise AssertionError(
844  
-                        'Input path %s does not exist!' % (path,))
845  
-                self._s3_input_uris.append(path)
846  
-            else:
847  
-                local_input_paths.append(path)
  839
+            if path == '-':
  840
+                continue  # STDIN always exists
848 841
 
849  
-        # copy local files into an input directory, with names like
850  
-        # 00000-actual_name.ext
851  
-        if local_input_paths:
852  
-            s3_input_dir = self._s3_tmp_uri + 'input/'
853  
-            log.info('Uploading input to %s' % s3_input_dir)
  842
+            if is_uri(path) and not is_s3_uri(path):
  843
+                continue  # can't check non-S3 URIs, hope for the best
854 844
 
855  
-            s3_conn = self.make_s3_conn()
856  
-            for file_num, path in enumerate(local_input_paths):
857  
-                if path == '-':
858  
-                    path = self._dump_stdin_to_local_file()
  845
+            if not self.path_exists(path):
  846
+                raise AssertionError(
  847
+                    'Input path %s does not exist!' % (path,))
859 848
 
860  
-                target = '%s%05d-%s' % (
861  
-                    s3_input_dir, file_num, os.path.basename(path))
862  
-                log.debug('uploading %s -> %s' % (path, target))
863  
-                s3_key = self.make_s3_key(target, s3_conn)
864  
-                s3_key.set_contents_from_filename(path)
  849
+    def _add_bootstrap_files_for_upload(self):
  850
+        """Add files needed by the bootstrap script to self._upload_mgr.
865 851
 
866  
-            self._s3_input_uris.append(s3_input_dir)
  852
+        Tar up mrjob if bootstrap_mrjob is True.
867 853
 
868  
-    def _add_bootstrap_file(self, path):
869  
-        name, path = self._split_path(path)
870  
-        file_dict = {'path': path, 'name': name, 'bootstrap': 'file'}
871  
-        self._files.append(file_dict)
872  
-        return file_dict
  854
+        Create the master bootstrap script if necessary.
  855
+        """
  856
+        # lazily create mrjob.tar.gz
  857
+        if self._opts['bootstrap_mrjob']:
  858
+            self._create_mrjob_tar_gz()
  859
+            self._b_mgr.add('file', self._mrjob_tar_gz_path)
873 860
 
874  
-    def _pick_s3_uris_for_files(self):
875  
-        """Decide where each file will be uploaded on S3.
  861
+        # all other files needed by the script are already in _b_mgr
  862
+        for path in self._b_mgr.paths():
  863
+            self._upload_mgr.add(path)
876 864
 
877  
-        Okay to call this multiple times.
878  
-        """
879  
-        self._assign_unique_names_to_files(
880  
-            's3_uri', prefix=self._s3_tmp_uri + 'files/',
881  
-            match=is_s3_uri)
  865
+        # now that we know where the above files live, we can create
  866
+        # the master bootstrap script
  867
+        self._create_master_bootstrap_script_if_needed()
  868
+        if self._master_bootstrap_script_path:
  869
+            self._upload_mgr.add(self._master_bootstrap_script_path)
882 870
 
883  
-    def _upload_non_input_files(self):
884  
-        """Copy files to S3
  871
+        # finally, make sure bootstrap action scripts are on S3
  872
+        for bootstrap_action in self._bootstrap_actions:
  873
+            self._upload_mgr.add(bootstrap_action['path'])
885 874
 
886  
-        Pick S3 URIs for them if we haven't already."""
887  
-        self._create_s3_temp_bucket_if_needed()
888  
-        self._pick_s3_uris_for_files()
  875
+    def _add_job_files_for_upload(self):
  876
+        """Add files needed for running the job (setup and input)
  877
+        to self._upload_mgr."""
  878
+        for path in self._get_input_paths():
  879
+            self._upload_mgr.add(path)
889 880
 
890  
-        s3_files_dir = self._s3_tmp_uri + 'files/'
891  
-        log.info('Copying non-input files into %s' % s3_files_dir)
  881
+        for path in self._wd_mgr.paths():
  882
+            self._upload_mgr.add(path)
892 883
 
893  
-        s3_conn = self.make_s3_conn()
894  
-        for file_dict in self._files:
895  
-            path = file_dict['path']
  884
+        if self._opts['hadoop_streaming_jar']:
  885
+            self._upload_mgr.add(self._opts['hadoop_streaming_jar'])
896 886
 
897  
-            # don't bother with files that are already on s3
898  
-            if is_s3_uri(path):
899  
-                continue
  887
+    def _upload_local_files_to_s3(self):
  888
+        """Copy local files tracked by self._upload_mgr to S3."""
  889
+        self._create_s3_temp_bucket_if_needed()
900 890
 
901  
-            s3_uri = file_dict['s3_uri']
  891
+        log.info('Copying non-input files into %s' % self._upload_mgr.prefix)
902 892
 
  893
+        s3_conn = self.make_s3_conn()
  894
+
  895
+        for path, s3_uri in self._upload_mgr.path_to_uri().iteritems():
903 896
             log.debug('uploading %s -> %s' % (path, s3_uri))
904 897
             s3_key = self.make_s3_key(s3_uri, s3_conn)
905  
-            s3_key.set_contents_from_filename(file_dict['path'])
  898
+            s3_key.set_contents_from_filename(path)
906 899
 
907 900
     def setup_ssh_tunnel_to_job_tracker(self, host):
908 901
         """setup the ssh tunnel to the job tracker, if it's not currently
@@ -1164,10 +1157,6 @@ def _create_job_flow(self, persistent=False, steps=None):
1164 1157
         # make sure we can see the files we copied to S3
1165 1158
         self._wait_for_s3_eventual_consistency()
1166 1159
 
1167  
-        # figure out local names and S3 URIs for our bootstrap actions, if any
1168  
-        self._name_files()
1169  
-        self._pick_s3_uris_for_files()
1170  
-
1171 1160
         log.info('Creating Elastic MapReduce job flow')
1172 1161
         args = self._job_flow_args(persistent, steps)
1173 1162
 
@@ -1238,15 +1227,13 @@ def _job_flow_args(self, persistent=False, steps=None):
1238 1227
         # bootstrap actions
1239 1228
         bootstrap_action_args = []
1240 1229
 
1241  
-        for file_dict in self._bootstrap_actions:
1242  
-            # file_dict is not populated the same way by tools and real job
1243  
-            # runs, so use s3_uri or path as appropriate
1244  
-            s3_uri = file_dict.get('s3_uri', None) or file_dict['path']
  1230
+        for i, bootstrap_action in enumerate(self._bootstrap_actions):
  1231
+            s3_uri = self._upload_mgr.uri(bootstrap_action['path'])
1245 1232
             bootstrap_action_args.append(
1246 1233
                 boto.emr.BootstrapAction(
1247  
-                file_dict['name'], s3_uri, file_dict['args']))
  1234
+                'action %d' % i, s3_uri, bootstrap_action['args']))
1248 1235
 
1249  
-        if self._master_bootstrap_script:
  1236
+        if self._master_bootstrap_script_path:
1250 1237
             master_bootstrap_script_args = []
1251 1238
             if self._opts['pool_emr_job_flows']:
1252 1239
                 master_bootstrap_script_args = [
@@ -1281,18 +1268,6 @@ def _job_flow_args(self, persistent=False, steps=None):
1281 1268
     def _build_steps(self):
1282 1269
         """Return a list of boto Step objects corresponding to the
1283 1270
         steps we want to run."""
1284  
-        assert self._script  # can't build steps if no script!
1285  
-
1286  
-        # figure out local names for our files
1287  
-        self._name_files()
1288  
-        self._pick_s3_uris_for_files()
1289  
-
1290  
-        # we're going to instruct EMR to upload the MR script and the
1291  
-        # wrapper script (if any) to the job's local directory
1292  
-        self._script['upload'] = 'file'
1293  
-        if self._wrapper_script:
1294  
-            self._wrapper_script['upload'] = 'file'
1295  
-
1296 1271
         # quick, add the other steps before the job spins up and
1297 1272
         # then shuts itself down (in practice this takes several minutes)
1298 1273
         steps = self._get_steps()
@@ -1352,6 +1327,12 @@ def _build_jar_step(self, step):
1352 1327
             step_args=step['step_args'],
1353 1328
             action_on_failure=self._action_on_failure)
1354 1329
 
  1330
+
  1331
+    def _upload_hash_paths(self, type):
  1332
+        for name, path in self._wd_mgr.name_to_path(type).iteritems():
  1333
+            uri = self._upload_mgr.uri(path)
  1334
+            yield '%s#%s' % (uri, name)
  1335
+
1355 1336
     def _cache_kwargs(self):
1356 1337
         """Returns
1357 1338
         ``{'step_args': [..], 'cache_files': [..], 'cache_archives': [..])``,
@@ -1373,36 +1354,16 @@ def _cache_kwargs(self):
1373 1354
         cache_files = []
1374 1355
         cache_archives = []
1375 1356
 
1376  
-        if compat.supports_new_distributed_cache_options(version):
  1357
+        if supports_new_distributed_cache_options(version):
1377 1358
             # boto doesn't support non-deprecated 0.20 options, so insert
1378 1359
             # them ourselves
1379  
-
1380  
-            def escaped_paths(file_dicts):
1381  
-                # return list of strings to join with commas and pass to the
1382  
-                # hadoop binary
1383  
-                return ["%s#%s" % (fd['s3_uri'], fd['name'])
1384  
-                        for fd in file_dicts]
1385  
-
1386  
-            # index by type
1387  
-            all_files = {}
1388  
-            for fd in self._files:
1389  
-                all_files.setdefault(fd.get('upload'), []).append(fd)
1390  
-
1391  
-            if 'file' in all_files:
1392  
-                step_args.append('-files')
1393  
-                step_args.append(','.join(escaped_paths(all_files['file'])))
1394  
-
1395  
-            if 'archive' in all_files:
1396  
-                step_args.append('-archives')
1397  
-                step_args.append(','.join(escaped_paths(all_files['archive'])))
  1360
+            step_args.extend(
  1361
+                self._new_distributed_cache_args(self._upload_mgr))
1398 1362
         else:
1399  
-            for file_dict in self._files:
1400  
-                if file_dict.get('upload') == 'file':
1401  
-                    cache_files.append(
1402  
-                        '%s#%s' % (file_dict['s3_uri'], file_dict['name']))
1403  
-                elif file_dict.get('upload') == 'archive':
1404  
-                    cache_archives.append(
1405  
-                        '%s#%s' % (file_dict['s3_uri'], file_dict['name']))
  1363
+            cache_files.extend(
  1364
+                self._arg_hash_paths('file', self._upload_mgr))
  1365
+            cache_archives.extend(
  1366
+                self._arg_hash_paths('archive', self._upload_mgr))
1406 1367
 
1407 1368
         return {
1408 1369
             'step_args': step_args,
@@ -1411,11 +1372,8 @@ def escaped_paths(file_dicts):
1411 1372
         }
1412 1373
 
1413 1374
     def _get_jar(self):
1414  
-        self._name_files()
1415  
-        self._pick_s3_uris_for_files()
1416  
-
1417  
-        if self._streaming_jar:
1418  
-            return self._streaming_jar['s3_uri']
  1375
+        if self._opts['hadoop_streaming_jar']:
  1376
+            return self._upload_mgr.uri(self._opts['hadoop_streaming_jar'])
1419 1377
         else:
1420 1378
             return self._opts['hadoop_streaming_jar_on_emr']
1421 1379
 
@@ -1827,15 +1785,19 @@ def _find_probable_cause_of_failure_s3(self, step_nums):
1827 1785
 
1828 1786
     ### Bootstrapping ###
1829 1787
 
1830  
-    def _create_master_bootstrap_script(self, dest='b.py'):
  1788
+    def _create_master_bootstrap_script_if_needed(self, dest='b.py'):
1831 1789
         """Create the master bootstrap script and write it into our local
1832  
-        temp directory.
  1790
+        temp directory. Set self._master_bootstrap_script_path.
1833 1791
 
1834 1792
         This will do nothing if there are no bootstrap scripts or commands,
1835  
-        or if _create_master_bootstrap_script() has already been called."""
  1793
+        or if it has already been called."""
  1794
+
1836 1795
         # we call the script b.py because there's a character limit on
1837 1796
         # bootstrap script names (or there was at one time, anyway)
1838 1797
 
  1798
+        if self._master_bootstrap_script_path:
  1799
+            return
  1800
+
1839 1801
         if not any(key.startswith('bootstrap_') and value
1840 1802
                    for (key, value) in self._opts.iteritems()):
1841 1803
             return
@@ -1854,15 +1816,6 @@ def _create_master_bootstrap_script(self, dest='b.py'):
1854 1816
                 for (key, value) in self._opts.iteritems())):
1855 1817
             return
1856 1818
 
1857  
-        if self._opts['bootstrap_mrjob']:
1858  
-            if self._mrjob_tar_gz_file is None:
1859  
-                self._mrjob_tar_gz_file = self._add_bootstrap_file(
1860  
-                    self._create_mrjob_tar_gz() + '#')
1861  
-
1862  
-        # need to know what files are called
1863  
-        self._name_files()
1864  
-        self._pick_s3_uris_for_files()
1865  
-
1866 1819
         path = os.path.join(self._get_local_tmp_dir(), dest)
1867 1820
         log.info('writing master bootstrap script to %s' % path)
1868 1821
 
@@ -1874,21 +1827,15 @@ def _create_master_bootstrap_script(self, dest='b.py'):
1874 1827
         f.write(contents)
1875 1828
         f.close()
1876 1829
 
1877  
-        name, _ = self._split_path(path)
1878  
-        self._master_bootstrap_script = {'path': path, 'name': name}
1879  
-        self._files.append(self._master_bootstrap_script)
  1830
+        self._master_bootstrap_script_path = path
1880 1831
 
1881 1832
     def _master_bootstrap_script_content(self):
1882 1833
         """Create the contents of the master bootstrap script.
1883  
-
1884  
-        This will give names and S3 URIs to files that don't already have them.
1885  
-
1886  
-        This function does NOT pick S3 URIs for files or anything like
1887  
-        that; _create_master_bootstrap_script() is responsible for that.
1888 1834
         """
1889 1835
         out = StringIO()
1890 1836
 
1891  
-        python_bin_in_list = ', '.join(repr(opt) for opt in self._opts['python_bin'])
  1837
+        python_bin_in_list = ', '.join(
  1838
+            repr(arg) for arg in self._opts['python_bin'])
1892 1839
 
1893 1840
         def writeln(line=''):
1894 1841
             out.write(line + '\n')
@@ -1910,28 +1857,29 @@ def writeln(line=''):
1910 1857
 
1911 1858
         # download files using hadoop fs
1912 1859
         writeln('# download files using hadoop fs -copyToLocal')
1913  
-        for file_dict in self._files:
1914  
-            if file_dict.get('bootstrap'):
1915  
-                writeln(
1916  
-                    "check_call(['hadoop', 'fs', '-copyToLocal', %r, %r])" %
1917  
-                    (file_dict['s3_uri'], file_dict['name']))
  1860
+        for name, path in self._b_mgr.name_to_path('file').iteritems():
  1861
+            s3_uri = self._upload_mgr.uri(path)
  1862
+            writeln(
  1863
+                "check_call(['hadoop', 'fs', '-copyToLocal', %r, %r])" %
  1864
+                (s3_uri, name))
1918 1865
         writeln()
1919 1866
 
1920 1867
         # make scripts executable
1921 1868
         if self._bootstrap_scripts:
1922 1869
             writeln('# make bootstrap scripts executable')
1923  
-            for file_dict in self._bootstrap_scripts:
  1870
+            for path_dict in self._bootstrap_scripts:
1924 1871
                 writeln("check_call(['chmod', 'a+rx', %r])" %
1925  
-                        file_dict['name'])
  1872
+                        path_dict['name'])
1926 1873
             writeln()
1927 1874
 
1928 1875
         # bootstrap mrjob
1929 1876
         if self._opts['bootstrap_mrjob']:
  1877
+            name = self._b_mgr.name('file', self._mrjob_tar_gz_path)
1930 1878
             writeln('# bootstrap mrjob')
1931 1879
             writeln("site_packages = distutils.sysconfig.get_python_lib()")
1932 1880
             writeln(
1933 1881
                 "check_call(['sudo', 'tar', 'xfz', %r, '-C', site_packages])" %
1934  
-                self._mrjob_tar_gz_file['name'])
  1882
+                name)
1935 1883
             # re-compile pyc files now, since mappers/reducers can't
1936 1884
             # write to this directory. Don't fail if there is extra
1937 1885
             # un-compileable crud in the tarball.
@@ -1944,12 +1892,11 @@ def writeln(line=''):
1944 1892
         # install our python modules
1945 1893
         if self._bootstrap_python_packages:
1946 1894
             writeln('# install python modules:')
1947  
-            for file_dict in self._bootstrap_python_packages:
1948  
-                writeln("check_call(['tar', 'xfz', %r])" %
1949  
-                        file_dict['name'])
  1895
+            for path_dict in self._bootstrap_python_packages:
  1896
+                writeln("check_call(['tar', 'xfz', %r])" % path_dict['name'])
1950 1897
                 # figure out name of dir to CD into
1951  
-                assert file_dict['path'].endswith('.tar.gz')
1952  
-                cd_into = extract_dir_for_tar(file_dict['path'])
  1898
+                assert path_dict['path'].endswith('.tar.gz')
  1899
+                cd_into = extract_dir_for_tar(path_dict['path'])
1953 1900
                 # install the module
1954 1901
                 writeln("check_call(["
1955 1902
                         "'sudo', %s, 'setup.py', 'install'], cwd=%r)" %
@@ -1968,9 +1915,9 @@ def writeln(line=''):
1968 1915
         # run our scripts
1969 1916
         if self._bootstrap_scripts:
1970 1917
             writeln('# run bootstrap scripts:')
1971  
-            for file_dict in self._bootstrap_scripts:
  1918
+            for path_dict in self._bootstrap_scripts:
1972 1919
                 writeln('check_call(%r)' % (
1973  
-                    ['./' + file_dict['name']],))
  1920
+                    ['./' + path_dict['name']],))
1974 1921
             writeln()
1975 1922
 
1976 1923
         return out.getvalue()
@@ -1990,8 +1937,8 @@ def make_persistent_job_flow(self):
1990 1937
 
1991 1938
         log.info('Creating persistent job flow to run several jobs in...')
1992 1939
 
1993  
-        self._create_master_bootstrap_script()
1994  
-        self._upload_non_input_files()
  1940
+        self._add_bootstrap_files_for_upload()
  1941
+        self._upload_local_files_to_s3()
1995 1942
 
1996 1943
         # don't allow user to call run()
1997 1944
         self._ran_job = True
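
Aside (not part of the commit): a rough sketch of how the bootstrap working-dir manager and the upload manager cooperate when the master bootstrap script is generated, mirroring _add_bootstrap_files_for_upload() and _master_bootstrap_script_content() above; the S3 prefix and file name are invented.

    from mrjob.cmd import BootstrapWorkingDirManager, UploadDirManager

    b_mgr = BootstrapWorkingDirManager()
    upload_mgr = UploadDirManager('s3://walrus/tmp/files/')

    # track a file the bootstrap script needs, then make sure it gets to S3
    b_mgr.add('file', '/home/me/mrjob.tar.gz')
    for path in b_mgr.paths():
        upload_mgr.add(path)

    # the generated b.py then fetches each file by its working-dir name
    for name, path in b_mgr.name_to_path('file').iteritems():
        print("check_call(['hadoop', 'fs', '-copyToLocal', %r, %r])"
              % (upload_mgr.uri(path), name))
    # prints something like:
    # check_call(['hadoop', 'fs', '-copyToLocal', 's3://walrus/tmp/files/mrjob.tar.gz', 'mrjob.tar.gz'])
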
79  mrjob/hadoop.py
@@ -21,7 +21,7 @@
21 21
 from subprocess import PIPE
22 22
 from subprocess import CalledProcessError
23 23
 
24  
-from mrjob.cmd import ScratchDirManager
  24
+from mrjob.cmd import UploadDirManager
25 25
 from mrjob.compat import supports_new_distributed_cache_options
26 26
 from mrjob.conf import combine_cmds
27 27
 from mrjob.conf import combine_dicts
@@ -35,7 +35,6 @@
35 35
 from mrjob.logparsers import scan_for_counters_in_files
36 36
 from mrjob.logparsers import scan_logs_in_order
37 37
 from mrjob.parse import HADOOP_STREAMING_JAR_RE
38  
-from mrjob.parse import is_uri
39 38
 from mrjob.runner import MRJobRunner
40 39
 from mrjob.runner import RunnerOptionStore
41 40
 from mrjob.util import cmd_line
@@ -174,9 +173,10 @@ def __init__(self, **kwargs):
174 173
             posixpath.join(
175 174
             self._opts['hdfs_scratch_dir'], self._job_name))
176 175
 
177  
-        # Keep track of local files uploaded to Hadoop. This is filled
178  
-        # by _upload_local_files_to_hdfs()
179  
-        self._upload_mgr = None
  176
+        # Keep track of local files to upload to HDFS. We'll add them
  177
+        # to this manager just before we need them.
  178
+        hdfs_files_dir = posixpath.join(self._hdfs_scratch_dir, 'files', '')
  179
+        self._upload_mgr = UploadDirManager(hdfs_files_dir)
180 180
 
181 181
         # Set output dir if it wasn't set explicitly
182 182
         self._output_dir = fully_qualify_hdfs_path(
@@ -225,6 +225,7 @@ def _run(self):
225 225
             self._add_python_archive(self._create_mrjob_tar_gz())
226 226
 
227 227
         self._check_input_exists()
  228
+        self._create_wrapper_script()
228 229
         self._upload_local_files_to_hdfs()
229 230
         self._run_job_in_hadoop()
230 231
 
@@ -233,28 +234,22 @@ def _check_input_exists(self):
233 234
         """
234 235
         for path in self._input_paths:
235 236
             if path == '-':
236  
-                continue
  237
+                continue  # STDIN always exists
237 238
 
238  
-            # Don't even bother running the job if the input isn't there.
239  
-            if not self.ls(path):
  239
+            if not self.path_exists(path):
240 240
                 raise AssertionError(
241 241
                     'Input path %s does not exist!' % (path,))
242 242
 
243 243
     def _upload_local_files_to_hdfs(self):
244 244
         """Copy files to HDFS, and set the 'hdfs_uri' field for each file.
245 245
         """
246  
-        assert not self._upload_mgr
247  
-
248  
-        hdfs_files_dir = posixpath.join(self._hdfs_scratch_dir, 'files', '')
249  
-        self._mkdir_on_hdfs(hdfs_files_dir)
250  
-        self._upload_mgr = ScratchDirManager(hdfs_files_dir)
  246
+        # choose paths to upload
  247
+        for path in self._get_input_paths() + list(self._wd_mgr.paths()):
  248
+            self._upload_mgr.add(path)
251 249
 
252  
-        for path_dict in self._get_input_paths() + list(self._wd_mgr.paths()):
253  
-            path = path_dict['path']
254  
-            if not is_uri(path):
255  
-                self._upload_mgr.add(path)
  250
+        self._mkdir_on_hdfs(self._upload_mgr.prefix)
256 251
 
257  
-        log.info('Copying local files into %s' % hdfs_files_dir)
  252
+        log.info('Copying local files into %s' % self._upload_mgr.prefix)
258 253
         for path, uri in self._upload_mgr.path_to_uri().iteritems():
259 254
             self._upload_to_hdfs(path, uri)
260 255
 
@@ -357,7 +352,8 @@ def _streaming_args(self, step, step_num, num_steps):
357 352
         # -files/-archives (generic options, new-style)
358 353
         if supports_new_distributed_cache_options(version):
359 354
             # set up uploading from HDFS to the working dir
360  
-            streaming_args.extend(self._new_upload_args())
  355
+            streaming_args.extend(
  356
+                self._new_distributed_cache_args(self._upload_mgr))
361 357
 
362 358
         # Add extra hadoop args first as hadoop args could be a hadoop
363 359
         # specific argument (e.g. -libjar) which must come before job
@@ -375,7 +371,8 @@ def _streaming_args(self, step, step_num, num_steps):
375 371
         # -cacheFile/-cacheArchive (streaming options, old-style)
376 372
         if not supports_new_distributed_cache_options(version):
377 373
             # set up uploading from HDFS to the working dir
378  
-            streaming_args.extend(self._old_upload_args())
  374
+            streaming_args.extend(
  375
+                self._old_distributed_cache_args(self._upload_mgr))
379 376
 
380 377
         mapper, combiner, reducer = (
381 378
             self._hadoop_streaming_commands(step, step_num))
@@ -411,48 +408,6 @@ def _hdfs_step_output_dir(self, step_num):
411 408
             return posixpath.join(
412 409
                 self._hdfs_scratch_dir, 'step-output', str(step_num + 1))
413 410
 
414  
-    def _upload_hash_paths(self, type):
415  
-        for name, path in self._wd_mgr.name_to_path(type).iteritems():
416  
-            uri = self._upload_mgr.uri(path)
417  
-            yield '%s#%s' % (uri, name)
418  
-
419  
-    def _new_upload_args(self):
420  
-        """Args to upload files from HDFS to the hadoop nodes using new
421  
-        distributed cache options.
422  
-        """
423  
-        args = []
424  
-
425  
-        # TODO: does Hadoop have a way of coping with paths that have
426  
-        # commas in their names?
427  
-
428  
-        file_hash_paths = list(self._upload_hash_paths('file'))
429  
-        if file_hash_paths:
430  
-            args.append('-files')
431  
-            args.append(','.join(file_hash_paths))
432  
-
433  
-        archive_hash_paths = list(self._upload_hash_paths('archive'))
434  
-        if archive_hash_paths:
435  
-            args.append('-archives')
436  
-            args.append(','.join(archive_hash_paths))
437  
-
438  
-        return args
439  
-
440  
-    def _old_upload_args(self):
441  
-        """Args to upload files from HDFS to the hadoop nodes using old cache
442  
-        options.
443  
-        """
444  
-        args = []
445  
-
446  
-        for file_hash in self._upload_hash_paths('file'):
447  
-            args.append('-cacheFile')
448  
-            args.append(file_hash)
449  
-
450  
-        for archive_hash in self._upload_hash_paths('archive'):
451  
-            args.append('-cacheArchive')
452  
-            args.append(archive_hash)
453  
-
454  
-        return args
455  
-
456 411
     def _cleanup_local_scratch(self):
457 412
         super(HadoopJobRunner, self)._cleanup_local_scratch()
458 413
 
2  mrjob/inline.py
@@ -125,8 +125,6 @@ def _check_step_is_mrjob_only(self, step_dict):
125 125
     def _run(self):
126 126
         self._setup_output_dir()
127 127
 
128  
-        assert self._script  # shouldn't be able to run if no script
129  
-
130 128
         for ignored_opt in self.IGNORED_HADOOP_OPTS:
131 129
             if ((not self._opts.is_default(ignored_opt)) and
132 130
                 self._opts[ignored_opt]):
4  mrjob/local.py
@@ -517,9 +517,9 @@ def _filter_if_any(self, substep_dict):
517 517
     def _executable(self, steps=False):
518 518
         # detect executable files so we can discard the explicit interpreter if
519 519
         # possible
520  
-        if os.access(self._script['path'], os.X_OK):
  520
+        if os.access(self._script_path, os.X_OK):
521 521
             return [os.path.join(self._working_dir,
522  
-                                 self._wd_mgr.name(**self._script))]
  522
+                                 self._wd_mgr.name('file', self._script_path))]
523 523
         else:
524 524
             return super(LocalMRJobRunner, self)._executable(steps)
525 525
 
110  mrjob/runner.py
@@ -42,7 +42,7 @@
42 42
     import json
43 43
 
44 44
 from mrjob.cmd import WorkingDirManager
45  
-from mrjob.cmd import parse_hash_path
  45
+from mrjob.cmd import parse_legacy_hash_path
46 46
 from mrjob.compat import supports_combiners_in_hadoop_streaming
47 47
 from mrjob.compat import uses_generic_jobconf
48 48
 from mrjob.conf import combine_cmds
@@ -325,21 +325,19 @@ def __init__(self, mr_job_script=None, conf_path=None,
325 325
 
326 326
         self._wd_mgr = WorkingDirManager()
327 327
 
328  
-        if mr_job_script:
329  
-            self._script = parse_hash_path(mr_job_script, type='file')
330  
-            self._wd_mgr.add(**self._script)
331  
-        else:
332  
-            self._script = None
  328
+        self._script_path = mr_job_script
  329
+        if self._script_path:
  330
+            self._wd_mgr.add('file', self._script_path)
333 331
 
334 332
         # setup cmds and wrapper script
335 333
         self._setup_scripts = []
336  
-        for hash_path in self._opts['setup_scripts']:
337  
-            setup_script = parse_hash_path(hash_path, type='file')
  334
+        for path in self._opts['setup_scripts']:
  335
+            setup_script = parse_legacy_hash_path('file', path)
338 336
             self._setup_scripts.append(setup_script)
339 337
             self._wd_mgr.add(**setup_script)
340 338
 
341 339
         # we'll create the wrapper script later
342  
-        self._wrapper_script = None
  340
+        self._wrapper_script_path = None
343 341
 
344 342
         # extra args to our job
345 343
         self._extra_args = list(extra_args) if extra_args else []
@@ -348,22 +346,22 @@ def __init__(self, mr_job_script=None, conf_path=None,
348 346
         self._file_upload_args = []
349 347
         if file_upload_args:
350 348
             for arg, path in file_upload_args:
351  
-                arg_file = parse_hash_path(path, type='file')
  349
+                arg_file = parse_legacy_hash_path('file', path)
352 350
                 self._wd_mgr.add(**arg_file)
353 351
                 self._file_upload_args.append((arg, arg_file))
354 352
 
355 353
         # set up uploading
356  
-        for hash_path in self._opts['upload_archives']:
357  
-            self._wd_mgr.add(**parse_hash_path(
358  
-                path, type='archive', name_if_no_hash=True))
359  
-        for hash_path in self._opts['upload_files']:
360  
-            self._wd_mgr.add(**parse_hash_path(
361  
-                path, type='file', name_if_no_hash=True))
  354
+        for path in self._opts['upload_files']:
  355
+            self._wd_mgr.add(**parse_legacy_hash_path(
  356
+                'file', path, must_name='upload_files'))
  357
+        for path in self._opts['upload_archives']:
  358
+            self._wd_mgr.add(**parse_legacy_hash_path(
  359
+                'archive', path, must_name='upload_archives'))
362 360
 
363 361
         # set up python archives
364 362
         self._python_archives = []
365  
-        for hash_path in self._opts['python_archives']:
366  
-            self._add_python_archive(hash_path)
  363
+        for path in self._opts['python_archives']:
  364
+            self._add_python_archive(path)
367 365
 
368 366
         # Where to read input from (log files, etc.)
369 367
         self._input_paths = input_paths or ['-']  # by default read from stdin
@@ -425,7 +423,7 @@ def run(self):
425 423
 
426 424
         Raise an exception if there are any problems.
427 425
         """
428  
-        if not self._script:
  426
+        if not self._script_path:
429 427
             raise AssertionError("No script to run!")
430 428
 
431 429
         if self._ran_job:
@@ -438,7 +436,8 @@ def stream_output(self):
438 436
         """Stream raw lines from the job's output. You can parse these
439 437
         using the read() method of the appropriate HadoopStreamingProtocol
440 438
         class."""
441  
-        assert self._ran_job
  439
+        if not self._ran_job:
  440
+            raise AssertionError('Run the job before streaming output')
442 441
 
443 442
         output_dir = self.get_output_dir()
444 443
         log.info('Streaming final output from %s' % output_dir)
@@ -624,8 +623,8 @@ def _run(self):
624 623
 
625 624
     ### internal utilities for implementing MRJobRunners ###
626 625
 
627  
-    def _add_python_archive(self, hash_path):
628  
-        python_archive = parse_hash_path(hash_path, type='archive')
  626
+    def _add_python_archive(self, path):
  627
+        python_archive = parse_legacy_hash_path('archive', path)
629 628
         self._wd_mgr.add(**python_archive)
630 629
         self._python_archives.append(python_archive)
631 630
 
@@ -663,9 +662,8 @@ def _make_unique_job_name(self, label=None, owner=None):
663 662
         # use the name of the script if one wasn't explicitly
664 663
         # specified
665 664
         if not label:
666  
-            if self._script:
667  
-                label = os.path.basename(
668  
-                    self._script['path']).split('.')[0]
  665
+            if self._script_path:
  666
+                label = os.path.basename(self._script_path).split('.')[0]
669 667
             else:
670 668
                 label = 'no_script'
671 669
 
@@ -686,7 +684,7 @@ def _get_steps(self):
686 684
         cached to avoid round trips to a subprocess.
687 685
         """
688 686
         if self._steps is None:
689  
-            if not self._script:
  687
+            if not self._script_path:
690 688
                 self._steps = []
691 689
             else:
692 690
                 args = (self._executable(True) + ['--steps'] +
@@ -725,22 +723,22 @@ def _executable(self, steps=False):
725 723
         # hadoop runners check for executable script paths and prepend the
726 724
         # working_dir, discarding the interpreter if possible.
727 725
         if steps:
728  
-            return self._opts['steps_interpreter'] + [self._script['path']]
  726
+            return self._opts['steps_interpreter'] + [self._script_path]
729 727
         else:
730 728
             return (self._opts['interpreter'] +
731  
-                    [self._wd_mgr.name(**self._script)])
  729
+                    [self._wd_mgr.name('file', self._script_path)])
732 730
 
733 731
     def _script_args_for_step(self, step_num, mrc):
734  
-        assert self._script
  732
+        assert self._script_path
735 733
 
736 734
         args = self._executable() + [
737 735
             '--step-num=%d' % step_num,
738 736
             '--%s' % mrc,
739 737
         ] + self._mr_job_extra_args()
740  
-        if self._wrapper_script:
  738
+        if self._wrapper_script_path:
741 739
             return (
742 740
                 self._opts['python_bin'] +
743  
-                [self._wd_mgr.name(**self._wrapper_script)] +
  741
+                [self._wd_mgr.name('file', self._wrapper_script_path)] +
744 742
                 args)
745 743
         else:
746 744
             return args
@@ -826,12 +824,12 @@ def _get_file_upload_args(self, local=False):
826 824
             the path they'll have inside Hadoop streaming
827 825
         """
828 826
         args = []
829  
-        for arg, arg_file in self._file_upload_args:
  827
+        for arg, path_dict in self._file_upload_args:
830 828
             args.append(arg)
831 829
             if local:
832  
-                args.append(arg_file['path'])
  830
+                args.append(path_dict['path'])
833 831
             else:
834  
-                args.append(self._wd_mgr.name(**arg_file))
  832
+                args.append(self._wd_mgr.name(**path_dict))
835 833
         return args
836 834
 
837 835
     def _wrapper_script_content(self):
@@ -889,7 +887,7 @@ def _create_wrapper_script(self, dest='wrapper.py'):
889 887
         """Create the wrapper script, and write it into our local temp
890 888
         directory (by default, to a file named wrapper.py).
891 889
 
892  
-        This will set self._wrapper_script, and add it to self._wd_mgr
  890
+        This will set self._wrapper_script_path, and add it to self._wd_mgr
893 891
 
894 892
         This will do nothing if setup_cmds and setup_scripts are
895 893
         empty, or _create_wrapper_script() has already been called.
@@ -897,7 +895,7 @@ def _create_wrapper_script(self, dest='wrapper.py'):
897 895
         if not (self._opts['setup_cmds'] or self._setup_scripts):
898 896
             return
899 897
 
900  
-        if self._wrapper_script:
  898
+        if self._wrapper_script_path:
901 899
             return
902 900
 
903 901
         path = os.path.join(self._get_local_tmp_dir(), dest)
@@ -911,8 +909,8 @@ def _create_wrapper_script(self, dest='wrapper.py'):
911 909
         f.write(contents)
912 910
         f.close()
913 911
 
914  
-        self._wrapper_script = parse_hash_path(path, type='file')
915  
-        self._wd_mgr.add(**self._wrapper_script)
  912
+        self._wrapper_script_path = path
  913
+        self._wd_mgr.add('file', self._wrapper_script_path)
916 914
 
917 915
     def _get_input_paths(self):
918 916
         """Get the paths to input files, dumping STDIN to a local
@@ -1020,6 +1018,42 @@ def _hadoop_conf_args(self, step_num, num_steps):
1020 1018
                 args.extend(['-jobconf', '%s=%s' % (key, value)])
1021 1019
 
1022 1020
         return args
  1021
+
+    def _arg_hash_paths(self, type, upload_mgr):
  1022
+        """Helper function for the *distributed_cache_args methods."""
  1023
+        for name, path in self._wd_mgr.name_to_path(type).iteritems():
  1024
+            uri = upload_mgr.uri(path)
  1025
+            yield '%s#%s' % (uri, name)
  1026
+
  1027
+    def _new_distributed_cache_args(self, upload_mgr):
  1028
+        args = []
  1029
+
  1030
+        # TODO: does Hadoop have a way of coping with paths that have
  1031
+        # commas in their names?
  1032
+
  1033
+        file_hash_paths = list(self._arg_hash_paths('file', upload_mgr))
  1034
+        if file_hash_paths:
  1035
+            args.append('-files')
  1036
+            args.append(','.join(file_hash_paths))
  1037
+
  1038
+        archive_hash_paths = list(self._arg_hash_paths('archive', upload_mgr))
  1039
+        if archive_hash_paths:
  1040
+            args.append('-archives')
  1041
+            args.append(','.join(archive_hash_paths))
  1042
+
  1043
+        return args
  1044
+
  1045
+    def _old_distributed_cache_args(self, upload_mgr):
  1046
+        args = []
  1047
+
  1048
+        for file_hash in self._arg_hash_paths('file', upload_mgr):
  1049
+            args.append('-cacheFile')
  1050
+            args.append(file_hash)
  1051
+
  1052
+        for archive_hash in self._arg_hash_paths('archive', upload_mgr):
  1053
+            args.append('-cacheArchive')
  1054
+            args.append(archive_hash)
  1055
+
  1056
+        return args
1023 1057
 
1024 1058
     def _invoke_sort(self, input_paths, output_path):
1025 1059
         """Use the local sort command to sort one or more input files. Raise
18  tests/test_cmd.py
@@ -20,7 +20,7 @@
20 20
     import unittest
21 21
 
22 22
 from mrjob.cmd import name_uniquely
23  
-from mrjob.cmd import ScratchDirManager
  23
+from mrjob.cmd import UploadDirManager
24 24
 from mrjob.cmd import WorkingDirManager
25 25
 
26 26
 
@@ -93,19 +93,19 @@ def test_initial_dot_isnt_extension(self):
93 93
             '.mrjob-1.conf')  # not '-1.mrjob.conf'
94 94
 
95 95
 
96  
-class ScratchDirManagerTestCase(unittest.TestCase):