-
Notifications
You must be signed in to change notification settings - Fork 12
/
wand_server.py
451 lines (357 loc) · 15.6 KB
/
wand_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
""" Wavelength Analysis 'nd Display server
The server exposes two interfaces to clients:
1. The "control" interface: an RPC interface for tasks such as:
frequency/OSA measurements; querying server parameters, such as the laser
database; setting WLM exposure times; etc
2. The "notify" interface: an asynchronous "sync_struct" interface that keeps
clients notified about new measurement data, changes to laser parameters,
etc.
"""
import argparse
import asyncio
import atexit
import time
import logging
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from sipyco import pyon
from sipyco.pc_rpc import Server as RPCServer
from sipyco.sync_struct import Publisher, Notifier
from sipyco.common_args import (simple_network_args, bind_address_from_args,
init_logger_from_args, verbosity_args)
from sipyco.asyncio_tools import atexit_register_coroutine
from wand.drivers.leoni_switch import LeoniSwitch
from wand.drivers.high_finesse import WLM
from wand.drivers.ni_osa import NiOSA
from wand.tools import (load_config, backup_config, regular_config_backup,
get_config_path, WLMMeasurementStatus)
from wand.server import ControlInterface
from wand.drivers.dl_pro import DLPro
logger = logging.getLogger(__name__)
def task_id_generator():
""" Yields unique, sequential task ids beginning with 0 """
task_id = 0
while True:
yield task_id
task_id += 1
def get_argparser():
parser = argparse.ArgumentParser(description="WAnD server")
simple_network_args(parser, [
("notify", "notifications", 3250),
("control", "control", 3251),
])
verbosity_args(parser)
parser.add_argument("-n", "--name",
default="test",
help="server name, used to locate configuration file")
parser.add_argument("--simulation",
action='store_true',
help="run in simulation mode")
parser.add_argument("--fast-mode-timeout",
default=1800,
type=int,
help="fast mode timeout (s) (default: '%(default)s')")
parser.add_argument("-b", "--backup-dir",
default="",
type=str,
help="directory containing backup copies of "
"configuration files")
return parser
class WandServer:
def __init__(self):
self.args = args = get_argparser().parse_args()
init_logger_from_args(args)
self.config = load_config(args, "_server")
self.lasers = self.config["lasers"].keys()
for laser in self.lasers:
self.config["lasers"][laser]["lock_ready"] = False
# connect to hardware
self.wlm = WLM(args.simulation)
if self.config.get("osas", "wlm") != "wlm":
self.osas = NiOSA(self.config["osas"], args.simulation)
self.exp_min = self.wlm.get_exposure_min()
self.exp_max = self.wlm.get_exposure_max()
self.num_ccds = self.wlm.get_num_ccds()
if self.config["switch"]["type"] == "internal":
self.switch = self.wlm.get_switch()
elif self.config["switch"]["type"] == "leoni":
self.switch = LeoniSwitch(
self.config["switch"]["ip"], args.simulation)
else:
raise ValueError("Unrecognised switch type: {}".format(
self.config["switch"]["type"]))
# measurement queue, processed by self.measurement_task
self.measurement_ids = task_id_generator()
self.measurements_queued = asyncio.Event()
self.queue = []
self.wake_locks = {laser: asyncio.Event() for laser in self.lasers}
# schedule initial frequency/osa readings all lasers
self.measurements_queued.set()
for laser in self.lasers:
self.queue.append({
"laser": laser,
"priority": 0,
"expiry": time.time(),
"id": next(self.measurement_ids),
"get_osa_trace": True,
"done": asyncio.Event()
})
# "notify" interface
self.laser_db = Notifier(self.config["lasers"])
self.freq_db = Notifier({laser: {
"freq": None,
"status": WLMMeasurementStatus.ERROR,
"timestamp": 0
} for laser in self.lasers})
self.osa_db = Notifier({laser: {
"trace": None,
"timestamp": 0
} for laser in self.lasers})
self.server_notify = Publisher({
"laser_db": self.laser_db, # laser settings
"freq_db": self.freq_db, # most recent frequency measurements
"osa_db": self.osa_db # most recent osa traces
})
# "control" interface
self.control_interface = ControlInterface(self)
self.server_control = RPCServer({"control": self.control_interface},
allow_parallel=True)
self.running = False
def start(self):
""" Start the server """
self.executor = ThreadPoolExecutor(max_workers=2)
self.loop = loop = asyncio.get_event_loop()
atexit.register(loop.close)
# start control server
bind = bind_address_from_args(self.args)
loop.run_until_complete(
self.server_control.start(bind, self.args.port_control))
atexit_register_coroutine(self.server_control.stop)
# start notify server
loop.run_until_complete(
self.server_notify.start(bind, self.args.port_notify))
atexit_register_coroutine(self.server_notify.stop)
asyncio.ensure_future(self.measurement_task())
for laser in self.lasers:
asyncio.ensure_future(self.lock_task(laser))
# backup of configuration file
backup_config(self.args, "_server")
asyncio.ensure_future(regular_config_backup(self.args, "_server"))
atexit.register(backup_config, self.args, "_server")
logger.info("server started")
self.running = True
loop.run_forever()
async def lock_task(self, laser):
conf = self.laser_db.raw_view[laser]
# only try to lock lasers with a controller specified
if not conf.get("host") or self.args.simulation:
return
while self.running:
conf["lock_ready"] = False
try:
iface = DLPro(conf["host"],
target=conf.get("target", "laser1"))
except OSError:
logger.warning(
"could not connect to laser '{}', retrying in 60s"
.format(laser))
if conf["locked"]:
self.control_interface.unlock(laser, conf["lock_owner"])
await asyncio.sleep(60)
continue
self.wake_locks[laser].set()
conf["lock_ready"] = True
while self.running:
if not conf["locked"]:
await self.wake_locks[laser].wait()
self.wake_locks[laser].clear()
continue
poll_time = conf["lock_poll_time"]
locked_at = conf["locked_at"]
timeout = conf["lock_timeout"]
set_point = conf["lock_set_point"]
gain = conf["lock_gain"] * poll_time
capture_range = conf["lock_capture_range"]
await asyncio.wait({self.wake_locks[laser].wait()},
timeout=poll_time)
self.wake_locks[laser].clear()
if timeout is not None and time.time() > (locked_at + timeout):
logger.info("'{}'' lock timed out".format(laser))
self.control_interface.unlock(laser, conf["lock_owner"])
await asyncio.sleep(0)
continue
status, delta, _ = await self.control_interface.get_freq(
laser, age=0, priority=5, get_osa_trace=False,
blocking=True, mute=False, offset_mode=True)
if status != WLMMeasurementStatus.OKAY:
continue
f_error = delta - set_point
V_error = f_error * gain
if abs(f_error) > capture_range:
logger.warning("'{}'' outside capture range".format(laser))
self.control_interface.unlock(laser, conf["lock_owner"])
await asyncio.sleep(0)
continue
# don't drive the PZT too far in a single step
V_error = min(V_error, 0.25)
V_error = max(V_error, -0.25)
try:
v_pzt = iface.get_pzt_voltage()
v_pzt -= V_error
if v_pzt > 100 or v_pzt < 25:
logger.warning("'{}'' lock railed".format(laser))
self.control_interface.unlock(laser,
conf["lock_owner"])
await asyncio.sleep(0)
continue
iface.set_pzt_voltage(v_pzt)
except OSError:
logger.warning("Connection to laser '{}' lost"
.format(laser))
self.control_interface.unlock(laser, conf["lock_owner"])
await asyncio.sleep(0)
break
try:
iface.close()
except Exception:
pass
finally:
conf["lock_ready"] = False
async def measurement_task(self):
""" Process queued measurements """
active_laser = ""
while True:
if self.queue == []:
self.measurements_queued.clear()
await self.measurements_queued.wait()
# process in order of priority, followed by submission time
priorities = [meas["priority"] for meas in self.queue]
meas = self.queue[priorities.index(max(priorities))]
laser = meas["laser"]
laser_conf = self.laser_db.raw_view[laser]
if laser != active_laser:
self.switch.set_active_channel(laser_conf["channel"])
# Switching is slow so we might as well take an OSA trace!
meas["get_osa_trace"] = True
active_laser = meas["laser"]
await asyncio.sleep(self.config["switch"]["dead_time"])
exposure = laser_conf["exposure"]
for ccd, exp in enumerate(exposure):
self.wlm.set_exposure(exposure[ccd], ccd)
if laser_conf.get("osa", "wlm") == "wlm":
freq_osa_measurement = self.loop.run_in_executor(
self.executor,
self.take_freq_osa_measurement,
laser,
laser_conf["f_ref"],
meas["get_osa_trace"])
wlm_data, osa = await freq_osa_measurement
else:
freq_measurement = self.loop.run_in_executor(
self.executor,
self.take_freq_measurement,
laser,
laser_conf["f_ref"])
osa_measurement = self.loop.run_in_executor(
self.executor,
self.take_osa_measurement,
laser,
laser_conf.get("osa"),
meas["get_osa_trace"])
wlm_data, osa = (await asyncio.gather(freq_measurement,
osa_measurement))[:]
freq, peaks = wlm_data
self.freq_db[laser] = freq
if meas["get_osa_trace"]:
self.osa_db[laser] = osa
# fast mode timeout
if laser_conf["fast_mode"]:
t_en = laser_conf["fast_mode_set_at"]
if time.time() > (t_en + self.args.fast_mode_timeout):
self.laser_db[laser]["fast_mode"] = False
self.save_config_file()
logger.info("{} fast mode timeout".format(laser))
# auto-exposure
if laser_conf["auto_exposure"]:
new_exp = laser_conf["exposure"]
for ccd, peak in enumerate(peaks):
# don't try to find a suitable exposure for lasers that
# aren't on!
if peak < 0.05:
break
if not (0.4 < peak < 0.6):
exp = laser_conf["exposure"][ccd]
new_exp[ccd] = exp + 1 if peak < 0.4 else exp - 1
new_exp[ccd] = min(new_exp[ccd], self.exp_max[ccd])
new_exp[ccd] = max(new_exp[ccd], self.exp_min[ccd])
if new_exp != exp:
self.laser_db[laser]["exposure"] = new_exp
self.save_config_file()
# check which other measurements wanted this data
for task in self.queue:
if task["laser"] == laser \
and (meas["get_osa_trace"] or not task["get_osa_trace"]):
task["done"].set()
self.queue.remove(task)
logger.info("task {} complete".format(task["id"]))
def take_freq_measurement(self, laser, f0):
""" Preform a single frequency measurement """
logger.info("Taking new frequency measurement for {}".format(laser))
status, freq = self.wlm.get_frequency()
freq = {
"freq": freq,
"status": int(status),
"timestamp": time.time()
}
# make simulation data more interesting!
if self.args.simulation:
freq["freq"] = f0 + np.random.normal(loc=0, scale=10e6)
peaks = [self.wlm.get_fringe_peak(ccd) for ccd in range(self.num_ccds)]
return freq, peaks
def take_osa_measurement(self, laser, osa, get_osa_trace):
""" Capture an osa trace """
if not get_osa_trace:
return {
"trace": None,
"timestamp": time.time()
}
osa = {"trace": self.osas.get_trace(osa).tolist(),
"timestamp": time.time()
}
return osa
def take_freq_osa_measurement(self, laser, f0, get_osa_trace):
""" Get frequency and spectral data from the wlm """
logger.info("Taking new frequency + spectral measurement for {}"
.format(laser))
status, freq = self.wlm.get_frequency()
freq = {
"freq": freq,
"status": int(status),
"timestamp": time.time()
}
# make simulation data more interesting!
if self.args.simulation:
freq["freq"] = f0 + np.random.normal(loc=0, scale=10e6)
peaks = [self.wlm.get_fringe_peak(ccd) for ccd in range(self.num_ccds)]
if not get_osa_trace:
osa = {
"trace": None,
"timestamp": time.time()
}
else:
osa = {"trace": self.wlm.get_pattern().tolist(),
"timestamp": time.time()
}
return (freq, peaks), osa
def save_config_file(self):
try:
self.config["lasers"] = self.laser_db.raw_view
config_path, _ = get_config_path(self.args, "_server")
pyon.store_file(config_path, self.config)
except Exception:
log.warning("error when trying to save config data")
def main():
server = WandServer()
server.start()
if __name__ == "__main__":
main()