Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7735] [pyspark] Raise Exception on non-zero exit from pipe commands #6262

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f552d49
Catch non-zero exit from pipe commands
megatron-me-uk May 19, 2015
5745d85
Remove space to fix style
megatron-me-uk Jun 3, 2015
45f4977
fix line too long style error
megatron-me-uk Jun 4, 2015
0974f98
add space between words in multiline string
megatron-me-uk Jun 4, 2015
1b3dc4e
fix missing space around operator style
megatron-me-uk Jun 4, 2015
8db4073
Add a test for rdd pipe functions
megatron-me-uk Jun 15, 2015
cc1a73d
fix style issues in pipe test
megatron-me-uk Jun 15, 2015
3ab8c7a
remove whitespace for style
megatron-me-uk Jun 18, 2015
3344a21
wrap assertRaises with QuietTest
megatron-me-uk Jun 18, 2015
491d3fc
Pass a function handle to assertRaises
megatron-me-uk Jun 18, 2015
4153b02
fix list.sort returns None
megatron-me-uk Jun 18, 2015
8ed89a6
Chain generators to prevent potential deadlock
megatron-me-uk Jun 19, 2015
0486ae3
style fixes
megatron-me-uk Jun 19, 2015
8a9ef9c
make check_return_code an iterator
megatron-me-uk Jun 24, 2015
a0c0161
fix generator issue
megatron-me-uk Jun 24, 2015
34fcdc3
add optional argument 'mode' for rdd.pipe
megatron-me-uk Jun 30, 2015
a307d13
update rdd tests to test pipe modes
megatron-me-uk Jun 30, 2015
b0ac3a4
Merge pull request #1 from megatron-me-uk/megatron-me-uk-patch-1
megatron-me-uk Jun 30, 2015
eb4801c
fix fail_condition
megatron-me-uk Jun 30, 2015
ab9a2e1
Update rdd pipe tests for checkCode
megatron-me-uk Jul 7, 2015
0c1e762
Update rdd pipe method for checkCode
megatron-me-uk Jul 7, 2015
574b564
Merge pull request #2 from megatron-me-uk/patch-4
megatron-me-uk Jul 7, 2015
98fa101
fix blank line style error
megatron-me-uk Jul 7, 2015
04ae1d5
Remove spurious empty line
megatron-me-uk Jul 8, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,12 +687,14 @@ def groupBy(self, f, numPartitions=None):
return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

@ignore_unicode_prefix
def pipe(self, command, env={}):
def pipe(self, command, env={}, checkCode=False):
"""
Return an RDD created by piping elements to a forked external process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add doc for mode?

>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
[u'1', u'2', u'', u'3']

:param checkCode: whether or not to check the return value of the shell command.
"""
def func(iterator):
pipe = Popen(
Expand All @@ -704,7 +706,17 @@ def pipe_objs(out):
out.write(s.encode('utf-8'))
out.close()
Thread(target=pipe_objs, args=[pipe.stdin]).start()
return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))

def check_return_code():
pipe.wait()
if checkCode and pipe.returncode:
raise Exception("Pipe function `%s' exited "
"with error code %d" % (command, pipe.returncode))
else:
for i in range(0):
yield i
return (x.rstrip(b'\n').decode('utf-8') for x in
chain(iter(pipe.stdout.readline, b''), check_return_code()))
return self.mapPartitions(func)

def foreach(self, f):
Expand Down
12 changes: 12 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,18 @@ def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
for size in sizes:
self.assertGreater(size, 0)

def test_pipe_functions(self):
data = ['1', '2', '3']
rdd = self.sc.parallelize(data)
with QuietTest(self.sc):
self.assertEqual([], rdd.pipe('cc').collect())
self.assertRaises(Py4JJavaError, rdd.pipe('cc', checkCode=True).collect)
result = rdd.pipe('cat').collect()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a test for grep? It may exit with no-zero code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do although I think a non-zero code from grep will raise an Exception in the scala implementation. See (that I just found):
https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala

I see that grep will return 1 if it doesn't match any lines in a partition. Raising an exception in this case may well be annoying but other shell functions can return 1 on errors (for example http://www.postgresql.org/docs/8.3/static/app-psql.html). I wonder what the best solution is here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have an option for this? At least, we should have an option to support grep on a partition without any matching lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue can be worked around using grep target; test $? -le 1 although maybe not the best solution.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NAVER - http://www.naver.com/

sujkh@naver.com 님께 보내신 메일 <Re: [spark] [SPARK-7735] [pyspark] Raise Exception on non-zero exit from pipe commands (#6262)> 이 다음과 같은 이유로 전송 실패했습니다.


받는 사람이 회원님의 메일을 수신차단 하였습니다.


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davies, I guess that your comment about grep is due to the fact that the fix here might break programs that used to work in case they relied on the old behavior? I guess that one option might be to add a configuration flag for controlling the exit-code-handling behavior and to update the documentation to address this. Any thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's something in my mind. Maybe add a option to RDD.pipe() would be friendly, having the default behavior as before.

result.sort()
[self.assertEqual(x, y) for x, y in zip(data, result)]
self.assertRaises(Py4JJavaError, rdd.pipe('grep 4', checkCode=True).collect)
self.assertEqual([], rdd.pipe('grep 4').collect())


class ProfilerTests(PySparkTestCase):

Expand Down