-
Notifications
You must be signed in to change notification settings - Fork 1
/
test_timing_connections.py
executable file
·86 lines (69 loc) · 2.66 KB
/
test_timing_connections.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python-sirius
import time
from epics import PV
from siriuspy.search import PSSearch, LLTimeSearch
cycle = PV('AS-RaMO:TI-EVG:CycleExtTrig-Cmd')
def put_pv(sp, rb, value):
sp.value = value
while rb.value != value:
time.sleep(0.1)
def test_trigger(trig_sp, trig_rb, pvs_ps, trigs, trig):
print('{}'.format(trig_sp.pvname.replace('State-Sel', '')))
vals0 = [pv.get(use_monitor=False) for pv in pvs_ps]
put_pv(trig_sp, trig_rb, 1)
cycle.value = 1
time.sleep(0.5)
put_pv(trig_sp, trig_rb, 0)
time.sleep(1)
vals1 = [pv.get(use_monitor=False) for pv in pvs_ps]
pss = set()
for v0, v1, pv in zip(vals0, vals1, pvs_ps):
if v0 != v1:
pss.add(pv.pvname.replace(':WfmSyncPulseCount-Mon', ''))
for ps in pss:
print(' {}'.format(ps))
amais = pss - trigs[trig]
for name in amais:
print(' a mais: {}'.format(name))
amenos = trigs[trig] - pss
for name in amenos:
print(' a menos: {}'.format(name))
if __name__ == '__main__':
pss = PSSearch.get_psnames(
{'sec': 'BO', 'dis': 'PS', 'sub': '[0-9]{2}.*',})
pss.extend(PSSearch.get_psnames(
{'sec': 'SI', 'dis': 'PS', 'sub': '[0-9]{2}.*', 'dev': '[QC]'}))
trigs = dict()
for i, ps in enumerate(pss):
chan = ps.substitute(propty_name='BCKPLN')
trig = LLTimeSearch.get_trigger_name(chan)
if trig in trigs:
trigs[trig].add(ps)
else:
trigs[trig] = {ps, }
pvs_ps = list()
for ps in sorted(pss):
pvs_ps.append(PV(ps.substitute(
propty_name='WfmSyncPulseCount', propty_suffix='Mon')))
pvs_trig_sp = dict()
pvs_trig_rb = dict()
for trig in trigs:
pvn = LLTimeSearch.get_channel_output_port_pvname(trig)
pvs_trig_sp[trig] = PV(pvn.substitute(
propty_name=pvn.propty+'State', propty_suffix='Sel'))
pvs_trig_rb[trig] = PV(pvn.substitute(
propty_name=pvn.propty+'State', propty_suffix='Sts'))
ps_conn = all(map(lambda x: x.wait_for_connection(2), pvs_ps))
tr_conn_sp = all(map(
lambda x: x.wait_for_connection(2), pvs_trig_sp.values()))
tr_conn_rb = all(map(
lambda x: x.wait_for_connection(2), pvs_trig_rb.values()))
print(ps_conn, tr_conn_sp, tr_conn_rb, cycle.connected)
for trig in sorted(trigs):
if not trig.startswith('IA-10RaBPM:TI-AMCFPGAEVR'):
continue
put_pv(pvs_trig_sp[trig], pvs_trig_sp[trig], 0)
for trig in sorted(trigs):
if not trig.startswith('IA-10RaBPM:TI-AMCFPGAEVR'):
continue
test_trigger(pvs_trig_sp[trig], pvs_trig_sp[trig], pvs_ps, trigs, trig)