Skip to content

Commit

Permalink
Add CAPart
Browse files Browse the repository at this point in the history
  • Loading branch information
coretl committed Jul 15, 2016
1 parent d02595f commit 7b4d7c1
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 2 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
Expand Down
Empty file added malcolm/parts/__init__.py
Empty file.
88 changes: 88 additions & 0 deletions malcolm/parts/capart.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import cothread
from cothread import catools

from malcolm.core.part import Part
from malcolm.core.controller import Controller
from malcolm.core.attribute import Attribute


class CAPart(Part):

def __init__(self, name, process, block, meta, pv, rbv=None, rbv_suff=None,
long_string=False):
super(CAPart, self).__init__(name=name, process=process, block=block)
if rbv is None:
if rbv_suff is None:
rbv = pv
else:
rbv = pv + rbv_suff
# Meta instance
self.meta = meta
# Pv strings
self.pv = pv
self.rbv = rbv
# should we put as a long string
self.long_string = long_string
# The attribute we will be publishing
self.attr = Attribute(self.name, self.meta)
self.attr.set_put_function(self.caput)
self.block.add_attribute(self.attr)
# camonitor subscription
self.monitor = None

def get_datatype(self):
# TODO: use meta to infer datatype
if self.long_string:
datatype = catools.DBR_CHAR_STR
else:
datatype = None
return datatype

@Controller.Resetting
def connect_pvs(self):
# release old monitor
self.close_monitor()
# need to make the connection in cothread's thread
pvs = [self.pv, self.rbv]
cainfo = cothread.CallbackResult(catools.connect, pvs, cainfo=True)
# check connection is ok
assert cainfo.ok, \
"CA connect failed with %s" % cainfo.state_strings[cainfo.state]
# now setup monitor on rbv
self.monitor = catools.camonitor(
self.rbv, on_update=self.on_update, format=catools.FORMAT_TIME,
datatype=self.get_datatype(), notify_disconnect=True)

def close_monitor(self):
if self.monitor is not None:
cothread.CallbackResult(self.monitor.close)
self.monitor = None

def caput(self, value):
cothread.CallbackResult(
catools.caput, self.pv, value, wait=True, timeout=None)
# now do a caget
value = cothread.CallbackResult(
catools.caget, self.rbv)
self.update_value(value)

def on_update(self, value):
# Called on cothread's queue, so don't block
self.process.spawn(self.update_value, value)

def update_value(self, value):
self.log_debug("Camonitor update %r", value)
# TODO: make Alarm from value.status and value.severity
# TODO: make Timestamp from value.timestamp
with self.block.lock:
if not value.ok:
# disconnect
self.block.state.set_value(Controller.stateMachine.FAULT,
notify=False)
self.block.status.set_value("CA disconnect on %s" % value.name,
notify=False)
self.block.busy.set_value(False)
self.close_monitor()
else:
# update value
self.attr.set_value(value)
1 change: 1 addition & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ nose>=1.3.0
coverage>=3.7.1
tornado>=4.1
git+https://github.com/dls-controls/scanpointgenerator.git
cothread
2 changes: 1 addition & 1 deletion tests/setup_malcolm_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", "scanpointgenerator"))

from pkg_resources import require
require("mock", "numpy", "tornado")
require("mock", "numpy", "tornado", "cothread")
Empty file added tests/test_parts/__init__.py
Empty file.
114 changes: 114 additions & 0 deletions tests/test_parts/test_capart.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
import setup_malcolm_paths

import unittest
from mock import MagicMock

try:
import cothread
except:
# cothread doesn't work on python3 at the moment
cothread = MagicMock()
def callback_result(f, *args, **kwargs):
return f(*args, **kwargs)
cothread.CallbackResult.side_effect = callback_result
sys.modules["cothread"] = cothread
catools = MagicMock()
cothread.catools = catools

# logging
# import logging
# logging.basicConfig(level=logging.DEBUG)

# module imports
from malcolm.core.block import DummyLock


class TestCAPart(unittest.TestCase):

def test_init(self):
p = self.create_part()
self.assertEqual(p.rbv, "pv2")
p.block.add_attribute.assert_called_once_with(p.attr)

def test_init_rbv(self):
from malcolm.parts.capart import CAPart
meta = MagicMock()
meta.name = "meta"
p = CAPart("me", MagicMock(), MagicMock(), meta, "pv", "pv3")
self.assertEqual(p.rbv, "pv3")
self.assertEqual(p.pv, "pv")

def create_part(self):
from malcolm.parts.capart import CAPart
process = MagicMock()
block = MagicMock()
meta = MagicMock()
meta.name = "meta"
p = CAPart("me", process, block, meta, "pv", rbv_suff="2")
return p

def test_reset(self):
p = self.create_part()
catools.connect.return_value = MagicMock(ok=True)
p.connect_pvs()
catools.connect.assert_called_with(["pv", "pv2"], cainfo=True)
catools.camonitor.assert_called_once_with(
"pv2", on_update=p.on_update, format=catools.FORMAT_TIME,
datatype=None, notify_disconnect=True)
self.assertEqual(p.monitor, catools.camonitor())

def test_caput(self):
class caint(int):
ok = True
catools.caget.return_value = caint(3)
p = self.create_part()
p.attr.put(32)
catools.caput.assert_called_once_with(
"pv", 32, wait=True, timeout=None)
catools.caget.assert_called_once_with(
"pv2")
p.meta.validate.assert_called_once_with(catools.caget.return_value)
self.assertEqual(p.attr.value, p.meta.validate())

def test_monitor_update(self):
p = self.create_part()
p.on_update("value")
p.process.spawn.assert_called_once_with(p.update_value, "value")

def test_close_monitor(self):
p = self.create_part()
m = MagicMock()
p.monitor = m
p.close_monitor()
m.close.assert_called_once_with()
self.assertEqual(p.monitor, None)

def test_get_datatype(self):
p = self.create_part()
p.long_string = True
self.assertEqual(p.get_datatype(), catools.DBR_CHAR_STR)

def test_update_value_good(self):
p = self.create_part()
p.block.lock = DummyLock()
value = MagicMock(ok=True)
p.update_value(value)
p.meta.validate.assert_called_once_with(value)
self.assertEqual(p.attr.value, p.meta.validate())

def test_update_value_bad(self):
p = self.create_part()
p.block.lock = DummyLock()
value = MagicMock(ok=False)
p.update_value(value)
p.block.state.set_value.assert_called_once_with(
"Fault", notify=False)
p.block.status.set_value.assert_called_once_with(
"CA disconnect on %s" % value.name, notify=False)
p.block.busy.set_value.assert_called_once_with(False)

if __name__ == "__main__":
unittest.main(verbosity=2)

0 comments on commit 7b4d7c1

Please sign in to comment.