Skip to content

Commit

Permalink
Chain generators to prevent potential deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
megatron-me-uk committed Jun 19, 2015
1 parent 4153b02 commit 8ed89a6
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,12 +704,15 @@ def pipe_objs(out):
out.write(s.encode('utf-8'))
out.close()
Thread(target=pipe_objs, args=[pipe.stdin]).start()
result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
pipe.wait()
if pipe.returncode:
raise Exception("Pipe function `%s' exited "
"with error code %d" % (command, pipe.returncode))
return result
def check_return_code():
pipe.wait()
if pipe.returncode:
raise Exception("Pipe function `%s' exited "
"with error code %d" % (command, pipe.returncode))
else:
return None
return (x.rstrip(b'\n').decode('utf-8') for x in
chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None)))
return self.mapPartitions(func)

def foreach(self, f):
Expand Down

0 comments on commit 8ed89a6

Please sign in to comment.