-
Notifications
You must be signed in to change notification settings - Fork 14
/
capart.py
121 lines (108 loc) · 4.6 KB
/
capart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from malcolm.core import Part, method_takes, REQUIRED
from malcolm.core.vmetas import StringMeta
from malcolm.controllers.defaultcontroller import DefaultController
from malcolm.parts.ca.cothreadimporter import CothreadImporter
@method_takes(
    "name", StringMeta("Name of the created attribute"), REQUIRED,
    "description", StringMeta("Desc of created attribute"), REQUIRED,
    "pv", StringMeta("Full pv of demand and default for rbv"), "",
    "rbv", StringMeta("Override for rbv"), "",
    "rbvSuff", StringMeta("Set rbv ro pv + rbv_suff"), "",
    "widget", StringMeta("Widget, like 'combo' or 'textinput'"), "",
    "inportType", StringMeta(
        "Flowgraph port Type if it is one (like 'CS' or 'NDArray')"), "")
class CAPart(Part):
    """Base Part publishing a single attribute backed by Channel Access PVs.

    The readback PV (``params.rbv``) is monitored and every update is pushed
    into the published attribute.  When a demand PV (``params.pv``) is given
    the attribute is also writeable, with writes going through `caput`.

    Subclasses must implement `create_meta` and `get_datatype`; both raise
    NotImplementedError here.
    """

    # Camonitor subscription, None while no monitor is active
    monitor = None
    # Attribute instance, set by create_attributes()
    attr = None

    def __init__(self, process, params):
        """Fetch the cothread modules for ``process`` and initialise the Part.

        Args:
            process: Passed to CothreadImporter.get_cothread() and to the
                Part constructor.
            params: Parameters described by the ``method_takes`` decorator on
                this class; passed to the Part constructor.
        """
        # These are set before calling the base constructor.
        # NOTE(review): presumably because Part.__init__ may call back into
        # methods that need self.cothread / self.catools - confirm.
        self.cothread, self.catools = CothreadImporter.get_cothread(process)
        # Format for all caputs
        self.ca_format = self.catools.FORMAT_TIME
        super(CAPart, self).__init__(process, params)

    def create_attributes(self):
        """Create the attribute to publish and yield its registration tuple.

        If ``params.rbv`` is empty it is filled in from ``params.pv`` (with
        ``params.rbvSuff`` appended when that is set).  Note that this updates
        ``self.params`` in place.

        Yields:
            tuple: ``(name, attribute, writeable_func)`` where
            ``writeable_func`` is `caput` when a demand pv was given, and
            None otherwise.

        Raises:
            ValueError: If neither ``pv`` nor ``rbv`` was supplied.
        """
        params = self.params
        if not params.rbv and not params.pv:
            raise ValueError('Must pass pv or rbv')
        if not params.rbv:
            # No explicit readback, so derive it from the demand pv
            if params.rbvSuff:
                params.rbv = params.pv + params.rbvSuff
            else:
                params.rbv = params.pv
        # Find the tags
        tags = self.create_tags(params)
        # The attribute we will be publishing
        self.attr = self.create_meta(params.description, tags).make_attribute()
        # Only attributes with a demand pv can be written to
        if self.params.pv:
            writeable_func = self.caput
        else:
            writeable_func = None
        yield params.name, self.attr, writeable_func

    def create_tags(self, params):
        """Build the list of tags for the attribute meta.

        Args:
            params: Object with ``widget`` and ``inportType`` attributes.

        Returns:
            list: Tag strings; empty if neither ``widget`` nor ``inportType``
            is set.

        Raises:
            AssertionError: If ``widget`` or ``inportType`` contains a ':'.
        """
        tags = []
        if params.widget:
            # Raised explicitly (rather than with an assert statement) so the
            # check is still performed when Python runs with -O
            if ":" in params.widget:
                raise AssertionError(
                    "Widget tag %r should not specify 'widget:' prefix"
                    % params.widget)
            tags.append("widget:%s" % params.widget)
        if params.inportType:
            if ":" in params.inportType:
                raise AssertionError(
                    "Inport tag %r should not specify 'flowgraph:inport:' "
                    "prefix" % params.inportType)
            # NOTE(review): the trailing ':' is kept exactly as it was;
            # presumably it leaves an empty field after the port type -
            # confirm against the consumers of this tag
            tags.append("flowgraph:inport:%s:" % params.inportType)
        return tags

    def create_meta(self, description, tags):
        """Return the meta used to make the attribute.

        Must be implemented by subclasses; the returned object needs a
        ``make_attribute()`` method (see `create_attributes`).

        Args:
            description: Description for the attribute.
            tags: List of tags as produced by `create_tags`.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def get_datatype(self):
        """Return the ``datatype`` argument passed to caget/caput/camonitor.

        Must be implemented by subclasses.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def set_initial_value(self, value):
        """Handle the first value fetched from the rbv during `reset`.

        The default implementation just forwards to `update_value`.

        Args:
            value: The caget result for the rbv.
        """
        self.update_value(value)

    @DefaultController.Reset
    def reset(self, task=None):
        """Reconnect to the PVs and start monitoring the rbv.

        Closes any existing monitor, does a caget of the rbv (and of the pv
        if there is one) to check they connect, passes the rbv result to
        `set_initial_value`, then creates a new camonitor on the rbv that
        calls `update_value`.

        Args:
            task: Not used by this method.

        Raises:
            AssertionError: If any of the caget results is not ok.
        """
        # release old monitor
        self.close_monitor()
        # make the connection in cothread's thread, use caget for initial value
        pvs = [self.params.rbv]
        if self.params.pv:
            pvs.append(self.params.pv)
        ca_values = self.cothread.CallbackResult(
            self.catools.caget, pvs,
            format=self.catools.FORMAT_CTRL, datatype=self.get_datatype())
        # check connection is ok; raised explicitly so that the check is not
        # stripped when Python runs with -O
        for v in ca_values:
            if not v.ok:
                raise AssertionError(
                    "CA connect failed with %s" % v.state_strings[v.state])
        # rbv is always first in pvs, so its result is ca_values[0]
        self.set_initial_value(ca_values[0])
        self.log_debug("ca values connected %s", ca_values)
        # now setup monitor on rbv
        self.monitor = self.cothread.CallbackResult(
            self.catools.camonitor, self.params.rbv, self.update_value,
            format=self.catools.FORMAT_TIME, datatype=self.get_datatype(),
            notify_disconnect=True, all_updates=True)

    @DefaultController.Disable
    def close_monitor(self, task=None):
        """Close the camonitor subscription if there is one.

        Does nothing when no monitor is active, so it is safe to call
        repeatedly.

        Args:
            task: Not used by this method.
        """
        if self.monitor is not None:
            self.cothread.CallbackResult(self.monitor.close)
            self.monitor = None

    def format_caput_value(self, value):
        """Log the caput that is about to be done and return the value to put.

        The default implementation returns ``value`` unchanged.

        Args:
            value: The value requested for the demand pv.

        Returns:
            The value that `caput` will write.
        """
        self.log_info("caput -c -w 1000 %s %s", self.params.pv, value)
        return value

    def caput(self, value):
        """Write ``value`` to the demand pv, then refresh from the rbv.

        The caput is issued with ``wait=True, timeout=None``.  Afterwards a
        caget of the rbv is done and its result passed to `update_value`.

        Args:
            value: The value to write; passed through `format_caput_value`
                first.
        """
        value = self.format_caput_value(value)
        self.cothread.CallbackResult(
            self.catools.caput, self.params.pv, value, wait=True, timeout=None,
            datatype=self.get_datatype())
        # now do a caget
        value = self.cothread.CallbackResult(
            self.catools.caget, self.params.rbv,
            format=self.catools.FORMAT_TIME, datatype=self.get_datatype())
        self.update_value(value)

    def update_value(self, value):
        """Push a Channel Access value into the published attribute.

        Used as the camonitor callback and called directly after a caput.
        The attribute is set to None when ``value.ok`` is false.

        Args:
            value: Channel Access result with an ``ok`` attribute.
        """
        # TODO: make Alarm from value.status and value.severity
        # TODO: make Timestamp from value.timestamp
        if not value.ok:
            # TODO: set disconnect
            self.attr.set_value(None)
        else:
            # update value
            self.attr.set_value(value)