SPARK-1115: Catch depickling errors
This surrounds the complete worker code in a try/except block so that we catch any error that occurs, for example a failure to depickle the command.

@JoshRosen

Author: Bouke van der Bijl <boukevanderbijl@gmail.com>

Closes #644 from bouk/catch-depickling-errors and squashes the following commits:

f0f67cc [Bouke van der Bijl] Lol indentation
0e4d504 [Bouke van der Bijl] Surround the complete python worker with the try block
bouk authored and JoshRosen committed Feb 26, 2014
1 parent c86eec5 commit 12738c1
Showing 1 changed file with 24 additions and 24 deletions: python/pyspark/worker.py
@@ -45,34 +45,34 @@ def report_times(outfile, boot, init, finish):


 def main(infile, outfile):
-    boot_time = time.time()
-    split_index = read_int(infile)
-    if split_index == -1:  # for unit tests
-        return
+    try:
+        boot_time = time.time()
+        split_index = read_int(infile)
+        if split_index == -1:  # for unit tests
+            return

-    # fetch name of workdir
-    spark_files_dir = utf8_deserializer.loads(infile)
-    SparkFiles._root_directory = spark_files_dir
-    SparkFiles._is_running_on_worker = True
+        # fetch name of workdir
+        spark_files_dir = utf8_deserializer.loads(infile)
+        SparkFiles._root_directory = spark_files_dir
+        SparkFiles._is_running_on_worker = True

-    # fetch names and values of broadcast variables
-    num_broadcast_variables = read_int(infile)
-    for _ in range(num_broadcast_variables):
-        bid = read_long(infile)
-        value = pickleSer._read_with_length(infile)
-        _broadcastRegistry[bid] = Broadcast(bid, value)
+        # fetch names and values of broadcast variables
+        num_broadcast_variables = read_int(infile)
+        for _ in range(num_broadcast_variables):
+            bid = read_long(infile)
+            value = pickleSer._read_with_length(infile)
+            _broadcastRegistry[bid] = Broadcast(bid, value)

-    # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
-    sys.path.append(spark_files_dir)  # *.py files that were added will be copied here
-    num_python_includes = read_int(infile)
-    for _ in range(num_python_includes):
-        filename = utf8_deserializer.loads(infile)
-        sys.path.append(os.path.join(spark_files_dir, filename))
+        # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
+        sys.path.append(spark_files_dir)  # *.py files that were added will be copied here
+        num_python_includes = read_int(infile)
+        for _ in range(num_python_includes):
+            filename = utf8_deserializer.loads(infile)
+            sys.path.append(os.path.join(spark_files_dir, filename))

-    command = pickleSer._read_with_length(infile)
-    (func, deserializer, serializer) = command
-    init_time = time.time()
-    try:
+        command = pickleSer._read_with_length(infile)
+        (func, deserializer, serializer) = command
+        init_time = time.time()
         iterator = deserializer.load_stream(infile)
         serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception as e:
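The idea behind the patch: any bytes the worker reads off its input stream can fail to deserialize, so the deserialization itself has to run inside the error handler. A minimal standalone sketch of the failure mode this fixes (illustrative only, not Spark code; handle_task and payload are hypothetical names):

    import pickle

    def handle_task(payload):
        try:
            # Deserialization happens *inside* the try block, so a corrupt
            # payload is caught and reported instead of killing the worker.
            func = pickle.loads(payload)  # may raise pickle.UnpicklingError
            return func()
        except Exception as e:
            print("worker failed: %r" % (e,))

    handle_task(b"not a pickle")  # reports the error rather than crashing

Before this patch, the equivalent of pickle.loads ran before the try block, so a depickling failure escaped the handler entirely; moving the try to the top of main means every step of the worker setup is covered.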
