Skip to content

Commit

Permalink
First implementation of V4 monitors (server side).
Browse files Browse the repository at this point in the history
  • Loading branch information
ajgdls committed Oct 13, 2016
1 parent 22aeefe commit 9e7f6f1
Showing 1 changed file with 52 additions and 17 deletions.
69 changes: 52 additions & 17 deletions malcolm/comms/pva/pvaservercomms.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(self, process, _=None):

self._rpcs = {}
self._puts = {}
self._monitors = {}
self._dead_rpcs = []

# Create the V4 PVA server object
Expand Down Expand Up @@ -77,6 +78,14 @@ def _update_cache(self, response):
self._cache.apply_changes(*response.changes)
# Update the block list to create new PVA channels if required
self._update_block_list()
# Now check for any monitors that might need updating
for monitor in self._monitors:
for change in response.changes:
path = change[0]
block = path[0]
if self._monitors[monitor].get_block() == block:
self.log_debug("Updating monitor %s", monitor)
self._monitors[monitor].update(change)

def send_to_client(self, response):
"""Abstract method to dispatch response to a client
Expand Down Expand Up @@ -129,6 +138,11 @@ def register_rpc(self, id, rpc):
self.log_debug("Registering RPC object with ID %d", id)
self._rpcs[id] = rpc

def register_monitor(self, id, monitor):
with self._lock:
self.log_debug("Registering monitor object with ID %d", id)
self._monitors[id] = monitor

def register_put(self, id, put):
with self._lock:
self.log_debug("Registering Put object with ID %d", id)
Expand Down Expand Up @@ -275,14 +289,16 @@ def __init__(self, name, block, pva_server, server):
self._endpoint.registerEndpointGet(self.get_callback)
self._endpoint.registerEndpointPut(self.put_callback)
self._endpoint.registerEndpointRPC(self.rpc_callback)
#self._endpoint.registerEndpointMonitor(self.monitor_callback)
self._endpoint.registerEndpointMonitor(self.monitor_callback)
self._pva_server.registerEndpoint(self._block, self._endpoint)

# def monitor_callback(self, request):
# self.log_debug("Monitor callback called for: %s", self._block)
# self.log_debug("Request structure: %s", request.toDict())
# pva_impl = PvaMonitorImplementation()
# return pva_impl
def monitor_callback(self, request):
self.log_debug("Monitor callback called for: %s", self._block)
self.log_debug("Request structure: %s", request.toDict())
mon_id = self._server._get_unique_id()
pva_impl = PvaMonitorImplementation(mon_id, request, self._block, self._server)
self._server.register_monitor(mon_id, pva_impl)
return pva_impl

def get_callback(self, request):
self.log_debug("Get callback called for: %s", self._block)
Expand Down Expand Up @@ -472,14 +488,33 @@ def execute(self, args):
#self._server.register_dead_rpc(self._id)
return pv_object

#class PvaMonitorImplementation(Loggable):
# def __init__(self):
# self.set_logger_name("PvaMonitorImplementation")
#
# def getPVStructure(self):
# self.log_debug("getPVStructure called")
# pv_object = pvaccess.PvObject(OrderedDict({'value': pvaccess.INT}))
# return pv_object
#
# def set_event_queue(self, arg1):
# self.log_debug("setEventQueue called with argument: %s", arg1)

class PvaMonitorImplementation(Loggable):
def __init__(self, id, request, block, server):
self.set_logger_name("PvaMonitorImplementation")
self._id = id
self._block = block
self._server = server
self._request = request
self.log_debug("_server %s", self._server)
self._pv_structure = self._server.get_request(block, request)
self.mu = pvaccess.MonitorServiceUpdater()

def get_block(self):
return self._block

def getPVStructure(self):
self.log_debug("getPVStructure called")
return self._pv_structure

def getUpdater(self):
self.log_debug("getUpdater called")
return self.mu

def update(self, changes):
self.log_debug("update called")
self.log_debug("Changes: %s", changes)
path = ".".join(changes[0][1:])
if self._pv_structure.hasField(path):
self._pv_structure[path] = changes[1]
self.mu.update()

0 comments on commit 9e7f6f1

Please sign in to comment.