Skip to content

Commit

Permalink
[SPARK-2627] keep up with the PEP 8 fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nchammas committed Aug 4, 2014
1 parent 9da347f commit bfb9f9f
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 10 deletions.
3 changes: 1 addition & 2 deletions python/pyspark/daemon.py
Expand Up @@ -138,8 +138,7 @@ def handle_sigchld(*args):
try:
os.kill(worker_pid, signal.SIGKILL)
except OSError:
pass # process already died

pass # process already died

if listen_sock in ready_fds:
sock, addr = listen_sock.accept()
Expand Down
11 changes: 6 additions & 5 deletions python/pyspark/serializers.py
Expand Up @@ -293,6 +293,7 @@ def _hack_namedtuple(cls):
""" Make class generated by namedtuple picklable """
name = cls.__name__
fields = cls._fields

def __reduce__(self):
return (_restore, (name, fields, tuple(self)))
cls.__reduce__ = __reduce__
Expand All @@ -301,11 +302,11 @@ def __reduce__(self):

def _hijack_namedtuple():
""" Hack namedtuple() to make it picklable """
global _old_namedtuple # or it will put in closure
global _old_namedtuple # or it will put in closure

def _copy_func(f):
return types.FunctionType(f.func_code, f.func_globals, f.func_name,
f.func_defaults, f.func_closure)
f.func_defaults, f.func_closure)

_old_namedtuple = _copy_func(collections.namedtuple)

Expand All @@ -323,9 +324,9 @@ def namedtuple(name, fields, verbose=False, rename=False):
# so only hack those in __main__ module
for n, o in sys.modules["__main__"].__dict__.iteritems():
if (type(o) is type and o.__base__ is tuple
and hasattr(o, "_fields")
and "__reduce__" not in o.__dict__):
_hack_namedtuple(o) # hack inplace
and hasattr(o, "_fields")
and "__reduce__" not in o.__dict__):
_hack_namedtuple(o) # hack inplace


_hijack_namedtuple()
Expand Down
9 changes: 6 additions & 3 deletions python/pyspark/tests.py
Expand Up @@ -840,12 +840,15 @@ def test_termination_sigterm(self):


class TestWorker(PySparkTestCase):

def test_cancel_task(self):
temp = tempfile.NamedTemporaryFile(delete=True)
temp.close()
path = temp.name

def sleep(x):
import os, time
import os
import time
with open(path, 'w') as f:
f.write("%d %d" % (os.getppid(), os.getpid()))
time.sleep(100)
Expand Down Expand Up @@ -875,7 +878,7 @@ def run():
os.kill(worker_pid, 0)
time.sleep(0.1)
except OSError:
break # worker was killed
break # worker was killed
else:
self.fail("worker has not been killed after 5 seconds")

Expand All @@ -885,7 +888,7 @@ def run():
self.fail("daemon had been killed")

def test_fd_leak(self):
N = 1100 # fd limit is 1024 by default
N = 1100 # fd limit is 1024 by default
rdd = self.sc.parallelize(range(N), N)
self.assertEquals(N, rdd.count())

Expand Down

0 comments on commit bfb9f9f

Please sign in to comment.