Skip to content

Commit

Permalink
Merge pull request #2 from megatron-me-uk/patch-4
Browse files Browse the repository at this point in the history
Use boolean checkCode optional parameter
  • Loading branch information
megatron-me-uk committed Jul 7, 2015
2 parents eb4801c + 0c1e762 commit 574b564
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 16 deletions.
17 changes: 4 additions & 13 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,24 +687,15 @@ def groupBy(self, f, numPartitions=None):
return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

@ignore_unicode_prefix
def pipe(self, command, env={}, mode='permissive'):
def pipe(self, command, env={}, checkCode=False):
"""
Return an RDD created by piping elements to a forked external process.
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
[u'1', u'2', u'', u'3']
:param checkCode: whether or not to check the return value of the shell command.
"""
if mode == 'permissive':
def fail_condition(x):
return False
elif mode == 'strict':
def fail_condition(x):
return x != 0
elif mode == 'grep':
def fail_condition(x):
return x != 0 and x != 1
else:
raise ValueError("mode must be one of 'permissive', 'strict' or 'grep'.")

def func(iterator):
pipe = Popen(
Expand All @@ -719,7 +710,7 @@ def pipe_objs(out):

def check_return_code():
pipe.wait()
if fail_condition(pipe.returncode):
if checkCode and pipe.returncode:
raise Exception("Pipe function `%s' exited "
"with error code %d" % (command, pipe.returncode))
else:
Expand Down
5 changes: 2 additions & 3 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -879,13 +879,12 @@ def test_pipe_functions(self):
rdd = self.sc.parallelize(data)
with QuietTest(self.sc):
self.assertEqual([], rdd.pipe('cc').collect())
self.assertRaises(Py4JJavaError, rdd.pipe('cc', mode='strict').collect)
self.assertRaises(Py4JJavaError, rdd.pipe('cc', checkCode=True).collect)
result = rdd.pipe('cat').collect()
result.sort()
[self.assertEqual(x, y) for x, y in zip(data, result)]
self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', mode='strict').collect)
self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', checkCode=True).collect)
self.assertEqual([], rdd.pipe('grep 4').collect())
self.assertEqual([], rdd.pipe('grep 4', mode='grep').collect())


class ProfilerTests(PySparkTestCase):
Expand Down

0 comments on commit 574b564

Please sign in to comment.