This repository has been archived by the owner on Nov 9, 2023. It is now read-only.
/
test_cluster_util.py
211 lines (166 loc) · 6.98 KB
/
test_cluster_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/usr/bin/env python
"""Tests for functions in utils."""
__author__ = "Jens Reeder"
__copyright__ = "Copyright 2011, The QIIME Project"
# remember to add yourself if you make changes
__credits__ = ["Jens Reeder", "Rob Knight"]
__license__ = "GPL"
__version__ = "1.9.1-dev"
__maintainer__ = "Jens Reeder"
__email__ = "jens.reeder@gmail.com"
import signal
import os
from os import remove, rmdir, environ, close
from time import sleep, time
from os.path import exists
from StringIO import StringIO
from socket import error
from tempfile import mkstemp, mkdtemp
from unittest import TestCase, main
from bfillings.denoiser import Flowgram
from skbio.parse.sequences import parse_fasta
from burrito.util import ApplicationNotFoundError
from skbio.util import remove_files
from qiime.util import load_qiime_config
from qiime.denoiser.cluster_utils import submit_jobs, setup_server,\
setup_workers, adjust_workers, stop_workers, check_workers
# Per-test watchdog, adapted from test_workflow.py: installed as the SIGALRM
# handler so a hung test raises instead of blocking the whole suite.


class TimeExceededError(Exception):
    """Raised by the SIGALRM handler when a test overruns its time budget."""


allowed_seconds_per_test = 60


def timeout(signum, frame):
    """SIGALRM handler: abort the running test with TimeExceededError.

    ``signum`` and ``frame`` are supplied by the ``signal`` module and are
    not used.
    """
    message = ("Test failed to run in allowed time (%d seconds)."
               % (allowed_seconds_per_test,))
    raise TimeExceededError(message)
class TestUtils(TestCase):
    """Tests for the denoiser cluster helpers (qiime.denoiser.cluster_utils).

    setUp arms a SIGALRM watchdog (handler: module-level ``timeout``) so a
    hung worker or cluster job raises TimeExceededError after
    ``allowed_seconds_per_test`` seconds instead of blocking the suite.
    """

    def setUp(self):
        """Create scratch files under $HOME and arm the per-test alarm."""
        # NOTE(review): scratch files go to $HOME rather than the system tmp
        # dir, presumably so cluster jobs / workers can see them -- confirm.
        self.home = environ['HOME']
        self.server_socket = None

        fd, self.tmp_result_file = mkstemp(dir=self.home,
                                           prefix="test_hello_",
                                           suffix=".txt")
        close(fd)
        self.tmp_dir = mkdtemp(dir=self.home,
                               prefix="test_cluster_util",
                               suffix="/")

        self.files_to_remove = [self.tmp_result_file]
        self.command = "echo hello > %s\n" % self.tmp_result_file

        # Keep the previously installed handler so tearDown can restore it
        # rather than leaving this module's handler active for other tests.
        self._previous_alarm_handler = signal.signal(signal.SIGALRM, timeout)
        # set the 'alarm' to go off in allowed_seconds_per_test seconds
        signal.alarm(allowed_seconds_per_test)

    def tearDown(self):
        """Disarm the alarm, close the server socket and clean up tmp files."""
        # Turn off the alarm, then put back whatever handler was there before.
        # signal.signal() returns None when the old handler was not installed
        # from Python, and None cannot be re-installed, hence the check.
        signal.alarm(0)
        if self._previous_alarm_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_alarm_handler)

        remove_files(self.files_to_remove, False)
        if self.server_socket:
            self.server_socket.close()
        # give clients time to clean up
        sleep(1)
        if exists(self.tmp_dir):
            try:
                rmdir(self.tmp_dir)
            except OSError:
                # give clients some more time, fail if still error
                sleep(5)
                rmdir(self.tmp_dir)

    def _setup_server_and_clients(self):
        """Start a server socket and 4 workers in self.tmp_dir.

        The server socket is stored on ``self`` so tearDown closes it.

        Returns:
            (workers, sockets): the workers returned by setup_workers and
            the list of client sockets (addresses dropped).
        """
        self.server_socket = setup_server()
        workers, client_sockets = setup_workers(4, self.tmp_dir,
                                                self.server_socket,
                                                verbose=False)
        return workers, [sock for sock, addr in client_sockets]

    def test_submit_jobs(self):
        """submit_jobs executes commands on the cluster"""
        submit_jobs([self.command], prefix="test_job")

        # setUp's mkstemp already created the (empty) result file, so its
        # existence alone proves nothing: wait until the job has written to
        # it. Give up a little before the SIGALRM armed in setUp would fire,
        # so a stuck job reaches the descriptive failure below.
        deadline = time() + max(allowed_seconds_per_test - 10, 1)
        while time() < deadline:
            if exists(self.tmp_result_file):
                with open(self.tmp_result_file) as result_fh:
                    observed_text = result_fh.read()
                if observed_text:
                    self.assertEqual(observed_text, "hello\n")
                    return
            sleep(1)

        # if we get here we failed
        self.fail("The test job apparently never finished.\n"
                  "Check the job's error log and the queue status.\n")

    def test_setup_workers(self):
        """setup_workers starts clients"""
        workers, client_sockets = self._setup_server_and_clients()
        self.assertEqual(len(workers), 4)
        # Try sending some data; send() should return the message length.
        for client_sock in client_sockets:
            self.assertEqual(client_sock.send("Hello"), 5)
        # workers die once server_socket is closed in tearDown()

    def test_adjust_workers(self):
        """adjust_workers stops clients"""
        workers, client_sockets = self._setup_server_and_clients()
        last_sock = client_sockets[-1]

        qiime_config = load_qiime_config()
        min_per_core = int(qiime_config['denoiser_min_per_core'])

        # no sockets get stopped
        self.assertEqual(
            adjust_workers(4 * min_per_core - 1, 4, client_sockets), 4)
        # if we can send something the socket is still alive
        self.assertEqual(last_sock.send("Hello"), 5)

        # now, kill one client
        self.assertEqual(
            adjust_workers(3 * min_per_core - 1, 4, client_sockets), 3)
        # socket should be closed
        self.assertRaises(error, last_sock.send, "Hello")

    def test_stop_workers(self):
        """stop_workers terminates all clients"""
        workers, client_sockets = self._setup_server_and_clients()
        stop_workers(client_sockets)
        for client_socket in client_sockets:
            self.assertRaises(error, client_socket.send, "hello")

    def test_stop_workers_on_closed_socket(self):
        """stop_workers reports clients whose socket is already closed"""
        # Same as test_stop_workers, but close one socket early to
        # simulate a crashed client.
        workers, client_sockets = self._setup_server_and_clients()
        client_sockets[-1].close()

        fake_fh = StringIO()
        stop_workers(client_sockets, fake_fh)
        self.assertEqual(
            fake_fh.getvalue(),
            "Worker 3 seems to be dead already. Check for runaways!\n")
        for client_socket in client_sockets:
            self.assertRaises(error, client_socket.send, "hello")

    def test_check_workers(self):
        """check_workers checks for dead workers"""
        workers, client_sockets = self._setup_server_and_clients()
        self.assertTrue(check_workers(workers, client_sockets))

        # Close one client socket and the server socket, give the workers a
        # moment, then check again.
        client_sockets[0].close()
        self.server_socket.close()
        sleep(1)
        self.assertFalse(check_workers(workers, client_sockets))

    def test_setup_server(self):
        """setup_server opens a port and listens"""
        self.server_socket = setup_server()
        # Index rather than unpack: IPv6 sockets return a 4-tuple here.
        port = self.server_socket.getsockname()[1]
        # Not much to test here, if we get something back it should work
        self.assertGreater(port, 1023)
        # Binding to a known (privileged) port should fail.
        # NOTE(review): this assumes the suite does not run as root.
        self.assertRaises(error, setup_server, 80)

    def test_save_send(self):
        """save_send reliably sends data to a socket"""
        # Testing this effectively would require simulating a blocking
        # socket on the recipient side; report it as skipped instead of
        # as a vacuously passing test.
        self.skipTest("save_send needs a simulated blocking recipient socket")
if __name__ == "__main__":
    # Script entry point: hand control to unittest's command-line runner,
    # which discovers and runs the TestCase classes in this module.
    main()