-
Notifications
You must be signed in to change notification settings - Fork 28k
/
test_daemon.py
89 lines (71 loc) · 3.02 KB
/
test_daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import time
import unittest
from pyspark.serializers import read_int
class DaemonTests(unittest.TestCase):
    """Termination tests for the ``daemon.py`` script that sits next to this package.

    Each test starts the daemon as a subprocess, checks that it accepts a
    connection on the port it reports, asks it to shut down, and then checks
    that the same port refuses connections.
    """

    def connect(self, port):
        """Connect to ``port`` on the loopback interface and send a shutdown request.

        IPv6 loopback (``::1``) is used when the ``SPARK_PREFER_IPV6``
        environment variable is ``"true"`` (case-insensitive); otherwise IPv4
        loopback (``127.0.0.1``) is used.

        Returns ``True`` once the request has been sent. Any socket error
        (for example a refused connection) propagates to the caller.
        """
        from socket import socket, AF_INET, AF_INET6, SOCK_STREAM

        family, host = AF_INET, "127.0.0.1"
        if os.environ.get("SPARK_PREFER_IPV6", "false").lower() == "true":
            family, host = AF_INET6, "::1"
        # The context manager closes the socket even when connect() raises,
        # which is the expected outcome once the daemon has shut down.
        with socket(family, SOCK_STREAM) as sock:
            sock.connect((host, port))
            # send a split index of -1 to shutdown the worker;
            # sendall() guarantees all four bytes are written
            sock.sendall(b"\xFF\xFF\xFF\xFF")
        return True

    @staticmethod
    def _reap_daemon(daemon):
        """Close the daemon's pipes and make sure the process has exited and been reaped."""
        for pipe in (daemon.stdin, daemon.stdout):
            # close() is a no-op on a pipe that a terminator already closed
            pipe.close()
        if daemon.poll() is None:
            # The daemon did not honour the shutdown request (or the test
            # failed before requesting it); do not leave it running.
            daemon.kill()
        daemon.wait()

    def do_termination_test(self, terminator):
        """Start the daemon, shut it down with ``terminator``, and verify it stopped listening.

        ``terminator`` is called with the daemon's ``subprocess.Popen`` object
        and must trigger the daemon's shutdown.

        The test fails if a connection attempt made after the shutdown request
        succeeds, or fails with an errno other than ``ECONNREFUSED``.
        """
        from subprocess import Popen, PIPE
        from errno import ECONNREFUSED

        # start daemon
        daemon_path = os.path.join(os.path.dirname(__file__), "..", "daemon.py")
        python_exec = sys.executable or os.environ.get("PYSPARK_PYTHON")
        daemon = Popen([python_exec, daemon_path], stdin=PIPE, stdout=PIPE)
        try:
            # read the port number
            port = read_int(daemon.stdout)

            # daemon should accept connections
            self.assertTrue(self.connect(port))

            # wait for the worker process spawned by the daemon to exit
            time.sleep(1)

            # request shutdown
            terminator(daemon)
            # give the daemon time to act on the request
            time.sleep(1)

            # daemon should no longer accept connections
            try:
                self.connect(port)
            except EnvironmentError as exception:
                self.assertEqual(exception.errno, ECONNREFUSED)
            else:
                self.fail("Expected EnvironmentError to be raised")
        finally:
            # Runs on success and on failure, so neither the pipes nor the
            # subprocess outlive the test.
            self._reap_daemon(daemon)

    def test_termination_stdin(self):
        """Ensure that daemon and workers terminate when stdin is closed."""
        self.do_termination_test(lambda daemon: daemon.stdin.close())

    def test_termination_sigterm(self):
        """Ensure that daemon and workers terminate on SIGTERM."""
        from signal import SIGTERM

        self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))


if __name__ == "__main__":
    # Script entry point: run this module's tests, writing XML reports when
    # the optional ``xmlrunner`` package is installed.
    from pyspark.tests.test_daemon import *  # noqa: F401

    # Default to unittest's built-in runner; override only if xmlrunner loads.
    testRunner = None
    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=testRunner, verbosity=2)