Permalink
Browse files

Merge from master

  • Loading branch information...
2 parents 7c47761 + 67d1236 commit 680b5d931705a80b68541fd2ef73224f33159e8f Steve Johnson committed Jun 15, 2012
Showing with 38 additions and 4 deletions.
  1. +4 −0 CHANGES.txt
  2. +11 −4 mrjob/local.py
  3. +23 −0 tests/test_local.py
View
@@ -12,6 +12,10 @@ v0.4, 2012-06-?? -- Slouching toward nirvana
* IF_SUCCESSFUL
* DEFAULT_CLEANUP
+v0.3.4.1, 2012-06-12 -- The test suite doesn't catch everything...
+ * Local mode no longer tries to send multiple mappers to the same output file
+ when using multiple compressed files as input
+
v0.3.4, 2012-06-11 -- We are friendly people.
* Experimental support for IronPython in the local and inline runners
* set_status() and increment_counter() will encode messages/names of type
View
@@ -309,14 +309,20 @@ def _get_file_splits(self, input_paths, num_splits, keep_sorted=False):
file_names = {}
input_paths_to_split = []
+ # Each file is assigned a 'task number' as if it were the output of some
+ # previous task. The task number is used to choose the split file name, and
+ # sometimes the file name of the sorted split. Because the resulting file
+ # names sort lexicographically, the output files are already in sorted
+ # order when they are combined after the final step.
+
for input_path in input_paths:
for path in self.ls(input_path):
if path.endswith('.gz'):
# do not split compressed files
file_names[path] = {
'orig_name': path,
'start': 0,
- 'task_num': 0,
+ 'task_num': len(file_names),
'length': os.stat(path)[stat.ST_SIZE],
}
# this counts as "one split"
@@ -455,9 +461,10 @@ def _invoke_step(self, args, outfile_name, step_num=0, num_tasks=1,
self._prev_outfiles = []
# The correctly-ordered list of task_num, file_name pairs
- file_tasks = sorted([(t.get('task_num', 0), file_name) for file_name, t in
- file_splits.items()], key=lambda t: t[0])
-
+ file_tasks = sorted([
+ (t['task_num'], file_name) for file_name, t
+ in file_splits.items()], key=lambda t: t[0])
+
for task_num, file_name in file_tasks:
# setup environment variables
View
@@ -260,6 +260,29 @@ def test_multi_step_counters(self):
{'group': {'counter_name': 2}},
{'group': {'counter_name': 2}}])
+ def test_gz_split_regression(self):
+ gz_path_1 = os.path.join(self.tmp_dir, '1.gz')
+ gz_path_2 = os.path.join(self.tmp_dir, '2.gz')
+ path_3 = os.path.join(self.tmp_dir, '3')
+
+ input_gz_1 = gzip.GzipFile(gz_path_1, 'w')
+ input_gz_1.write('x\n')
+ input_gz_1.close()
+
+ input_gz_2 = gzip.GzipFile(gz_path_2, 'w')
+ input_gz_2.write('y\n')
+ input_gz_2.close()
+
+ with open(path_3, 'w') as f:
+ f.write('z')
+
+ mr_job = MRCountingJob(['--no-conf', '-r', 'local', gz_path_1,
+ gz_path_2, path_3])
+ with mr_job.make_runner() as r:
+ splits = r._get_file_splits([gz_path_1, gz_path_2, path_3], 1)
+ self.assertEqual(
+ len(set(s['task_num'] for s in splits.values())), 3)
+
class LocalMRJobRunnerNoSymlinksTestCase(LocalMRJobRunnerEndToEndTestCase):
"""Test systems without os.symlink (e.g. Windows). See Issue #46"""

0 comments on commit 680b5d9

Please sign in to comment.