-
Notifications
You must be signed in to change notification settings - Fork 6
/
server.py
171 lines (134 loc) · 5.47 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import atexit
import contextlib
import os
import shutil
import subprocess
import time
import requests
from devpi_plumber.client import DevpiClient
from six import iteritems
from twitter.common.contextutil import temporary_dir
@contextlib.contextmanager
def TestServer(users=None, indices=None, config=None, fail_on_output=None):
    """
    Starts a devpi server to be used within tests.

    The server is started on a directory obtained from ``temporary_dir()``
    and the context yields a ``DevpiClient`` created with the user ``root``
    and an empty password.

    :param users: mapping of user name to the keyword arguments handed to
        ``client.create_user``. Defaults to no users.
    :param indices: mapping of index name to the keyword arguments handed to
        ``client.create_index``. Defaults to no indices.
    :param config: extra devpi-server options. They are merged over the
        default ``host``, ``port`` and ``serverdir`` options and therefore
        override them.
    :param fail_on_output: messages that must not show up in the server log.
        Defaults to ``['Traceback']``.
    :raises RuntimeError: if the server log contains one of the
        ``fail_on_output`` messages. The log is only inspected when the
        body of the ``with`` statement finished without raising.
    """
    # Resolve the defaults per call instead of sharing mutable default
    # objects between all invocations of this function.
    if users is None:
        users = {}
    if indices is None:
        indices = {}
    if config is None:
        config = {}
    if fail_on_output is None:
        fail_on_output = ['Traceback']

    with temporary_dir() as server_dir:
        server_options = {
            'host': os.getenv('DEVPI_PLUMBER_SERVER_HOST', 'localhost'),
            'port': os.getenv('DEVPI_PLUMBER_SERVER_PORT', 2414),
            'serverdir': server_dir,
        }
        server_options.update(config)

        initialize_serverdir(server_options)

        with DevpiServer(server_options) as url:
            with DevpiClient(url, 'root', '') as client:
                for user, kwargs in iteritems(users):
                    client.create_user(user, **kwargs)

                for index, kwargs in iteritems(indices):
                    client.create_index(index, **kwargs)

                yield client

        # Inspect the log only after the server context has been left, but
        # while the server directory is still available.
        _assert_no_logged_errors(fail_on_output, server_options['serverdir'] + '/server.log')
def import_state(serverdir, importdir):
    """
    Initialise ``serverdir`` and import the state found in ``importdir``.

    Runs ``devpi-server`` twice: first with ``--init``, then with
    ``--import=<importdir>`` and ``--no-events``.
    """
    init_options = {'serverdir': serverdir, 'init': None}
    devpi_server_command(**init_options)

    import_options = {'serverdir': serverdir, 'import': importdir, 'no-events': None}
    devpi_server_command(**import_options)
def export_state(serverdir, exportdir):
    """
    Run ``devpi-server`` with ``--export=<exportdir>`` on ``serverdir``.
    """
    export_options = {'serverdir': serverdir, 'export': exportdir}
    devpi_server_command(**export_options)
def _assert_no_logged_errors(fail_on_output, logfile):
    """
    Raise if the log file contains any of the forbidden messages.

    :param fail_on_output: iterable of substrings that must not occur in the log.
    :param logfile: path of the log file to inspect.
    :raises RuntimeError: carrying the complete log content, as soon as one
        of the messages is found and not classified as a false positive.
    """
    with open(logfile) as handle:
        contents = handle.read()

    closed_file_error = 'ValueError: I/O operation on closed file'

    for needle in fail_on_output:
        if needle in contents:
            # Heuristic to ignore false positives on the shutdown of replicas:
            # the master might still be busy serving root/pypi/simple for a
            # stopping replica. Tracebacks are tolerated when every one of
            # them is matched by a closed-file error.
            benign = (
                needle == 'Traceback'
                and contents.count(needle) == contents.count(closed_file_error)
            )
            if not benign:
                raise RuntimeError(contents)
def _dump_log(filename):
    """
    Print the complete content of the file ``filename``.
    """
    with open(filename) as handle:
        content = handle.read()
    print(content)
@contextlib.contextmanager
def DevpiServer(options):
    """
    Run a devpi-server process for the duration of the context.

    :param options: devpi-server options as accepted by
        ``build_devpi_server_command``. ``port`` is required. If
        ``serverdir`` is present, the server output is written to
        ``<serverdir>/server.log``, otherwise it is discarded.

    Yields the URL ``http://localhost:<port>`` once the server answers
    requests. On exit a still running server is terminated, and killed if
    it does not shut down in time.
    """
    url = 'http://localhost:{}'.format(options['port'])

    if 'serverdir' in options:
        logfile = options['serverdir'] + '/server.log'
    else:
        logfile = os.devnull

    server = None
    with open(logfile, 'wb', buffering=0) as stdout:
        try:
            server = subprocess.Popen(
                build_devpi_server_command(**options),
                stderr=subprocess.STDOUT,
                stdout=stdout,
                close_fds=True,
            )
            wait_for_startup(server, url)
        except BaseException:
            # Startup failed: show what the server logged, then propagate.
            _dump_log(logfile)
            raise
        else:
            yield url
        finally:
            still_running = server is not None and server.poll() is None
            if still_running:
                server.terminate()
                try:
                    wait_for_shutdown(server)
                except TimeoutError:
                    # Graceful termination timed out; force the process down.
                    server.kill()
                    wait_for_shutdown(server)
def build_devpi_server_command(**options):
    """
    Translate keyword options into a ``devpi-server`` argument list.

    An option with a value becomes ``--name=value``; an option whose value
    is ``None`` becomes the bare flag ``--name``. All valued options come
    before the flags.

    :returns: list of strings starting with ``'devpi-server'``.
    """
    valued = []
    bare = []
    for name, value in options.items():
        if value is None:
            bare.append('--{}'.format(name))
        else:
            valued.append('--{}={}'.format(name, value))
    return ['devpi-server'] + valued + bare
def devpi_server_command(**options):
    """
    Run ``devpi-server`` with the given options and wait for it to finish.

    stderr is merged into stdout and the captured output is discarded.
    ``subprocess.check_output`` raises ``CalledProcessError`` when the
    command exits with a non-zero status.
    """
    command = build_devpi_server_command(**options)
    subprocess.check_output(command, stderr=subprocess.STDOUT)
def wait_for_startup(server, url):
    """
    Wait up to 30s until the server answers HTTP requests on ``url``.

    :param server: process object offering ``poll()``.
    :param url: address to probe.
    :raises Exception: if the process exits while waiting or does not
        answer within 30 seconds.
    """
    give_up_at = time.time() + 30
    while time.time() < give_up_at:
        exit_status = server.poll()
        if exit_status is not None:
            raise Exception('Server failed to start up.')

        try:
            requests.get(url, timeout=1)
        except requests.RequestException:
            # Request failed, try again after a short pause.
            time.sleep(0.1)
            continue

        # Server came up.
        return

    raise Exception('Server failed to start up within 30 seconds.')
def wait_for_shutdown(server):
    """
    Wait 30s for the server to shut-down.

    Raises a TimeoutError if the server fails to shut down.
    """
    def still_running():
        return server.poll() is None

    give_up_at = time.time() + 30
    while time.time() < give_up_at and still_running():
        time.sleep(0.1)

    if still_running():
        raise TimeoutError('Server failed to shut down within 30 seconds.')
# Location of the cached serverdir that initialize_serverdir() fills after a
# fresh initialization and copies from on later calls.
# NOTE(review): the path is fixed, so it is presumably shared by every process
# using this module on the machine - confirm that this is intended.
serverdir_cache = '/tmp/devpi-plumber-cache'
# Delete the cache when the interpreter exits; removal errors are ignored.
atexit.register(shutil.rmtree, serverdir_cache, ignore_errors=True)
def initialize_serverdir(server_options):
    """
    Prepare the directory ``server_options['serverdir']`` for a server start.

    Starting a new devpi-server is costly due to its initial sync with pypi.python.org.
    We can speed up this process by using the content of a cached serverdir.

    :param server_options: devpi-server options; must contain ``serverdir``.
        All of them are passed on to ``devpi-server --init`` whenever a
        fresh initialization is performed.

    Behaviour:

    * an already populated serverdir is left untouched;
    * with ``no-root-pypi`` or ``master-url`` among the options the
      serverdir is always freshly initialized and the cache is not used;
    * otherwise the serverdir is copied from ``serverdir_cache`` if that
      is populated, else it is initialized and then copied into the cache.
    """
    def init_serverdir():
        devpi_server_command(init=None, **server_options)

    serverdir_new = server_options['serverdir']

    if os.path.exists(serverdir_new) and os.listdir(serverdir_new):
        # Don't touch already populated directory.
        return

    if 'no-root-pypi' in server_options:
        # Always run servers called with `--no-root-pypi` in a freshly initialized serverdir.
        init_serverdir()
        return

    if 'master-url' in server_options:
        # Running as replica. Always has to be a fresh sync.
        init_serverdir()
        return

    # Running as master.
    if os.path.exists(serverdir_cache) and os.listdir(serverdir_cache):
        # copytree() creates the destination itself, so an existing (empty)
        # serverdir has to go first. It may also not exist at all yet, in
        # which case there is nothing to remove.
        if os.path.exists(serverdir_new):
            shutil.rmtree(serverdir_new)
        shutil.copytree(serverdir_cache, serverdir_new)
    else:
        init_serverdir()
        # Replace whatever is left of an old cache with the fresh serverdir.
        shutil.rmtree(serverdir_cache, ignore_errors=True)
        shutil.copytree(serverdir_new, serverdir_cache)