Catch non-zero exit from pipe commands
This will allow problems with piped commands to be detected.
This will also allow tasks to be retried where errors are rare (such as network problems in piped commands).
megatron-me-uk committed May 19, 2015
1 parent df34793 commit f552d49
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion python/pyspark/rdd.py
@@ -704,7 +704,11 @@ def pipe_objs(out):
                     out.write(s.encode('utf-8'))
                 out.close()
             Thread(target=pipe_objs, args=[pipe.stdin]).start()
-            return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
+            result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
+            pipe.wait()
+            if pipe.returncode:
+                raise Exception("Pipe function `%s' exited with error code %d" %(command, pipe.returncode) )
+            return result
         return self.mapPartitions(func)

     def foreach(self, f):
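
One caveat with the change as committed: `result` is a lazy generator, so `pipe.wait()` runs before any of the command's output has been read. If the command writes more than the OS pipe buffer can hold, the wait can block indefinitely. The sketch below shows one possible way to defer the exit-code check until stdout has been read to the end. It is a standalone illustration, not the code from this commit; `pipe_lines` and `check_return_code` are hypothetical names, and the example assumes a POSIX shell environment.

```python
import shlex
from itertools import chain
from subprocess import PIPE, Popen
from threading import Thread


def pipe_lines(command, items):
    """Hypothetical helper: pipe items through a command, yielding output lazily."""
    pipe = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE)

    def pipe_objs(out):
        # Feed the input on a separate thread so the caller can read stdout
        # concurrently.
        for obj in items:
            s = str(obj).rstrip('\n') + '\n'
            out.write(s.encode('utf-8'))
        out.close()

    Thread(target=pipe_objs, args=[pipe.stdin]).start()

    def check_return_code():
        # Chained after the output lines, so this body runs only once
        # stdout has reached EOF.
        pipe.wait()
        if pipe.returncode:
            raise Exception("Pipe function `%s' exited with error code %d"
                            % (command, pipe.returncode))
        # The unreachable yield makes this function a generator that
        # produces no items on success.
        return
        yield

    lines = (x.rstrip(b'\n').decode('utf-8')
             for x in iter(pipe.stdout.readline, b''))
    return chain(lines, check_return_code())
```

With this shape, iterating over `pipe_lines("cat", ["a", "b"])` yields the output lines first, and the exception for a non-zero exit code is raised only when the consumer reaches the end of the output.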