Skip to content

Commit

Permalink
Merge 6eca5b1 into 65dffff
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelWedel committed May 12, 2017
2 parents 65dffff + 6eca5b1 commit b194682
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 106 deletions.
8 changes: 6 additions & 2 deletions docs/release_notes/release_1_1_0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ This release is currently in progress.
New features
------------

The control client, lewis-control, now provides a version argument via ``--version`` or ``-v``.
The control client, lewis-control, now provides a version argument via ``--version`` or ``-v``.

::

$ lewis-control -v
$ lewis-control -v

Bug fixes and other improvements
--------------------------------
Expand All @@ -30,6 +30,10 @@ Bug fixes and other improvements
also the SVGs are in the `source repository`_, feel free to include them in presentations,
posters.

- Adapters now run in a different thread than the simulation itself. The consequence of this is
that slow network communication or expensive computations in the device do not influence
one another anymore. Otherwise, communication still works exactly like in previous versions.

Upgrade guide
-------------

Expand Down
53 changes: 30 additions & 23 deletions lewis/adapters/epics.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,11 @@ def _function_has_n_args(self, func, n):

@has_log
class PropertyExposingDriver(Driver):
def __init__(self, interface):
def __init__(self, interface, device_lock):
super(PropertyExposingDriver, self).__init__()

self._interface = interface
self._device_lock = device_lock
self._set_logging_context(interface)

self._timers = {k: 0.0 for k in self._interface.bound_pvs.keys()}
Expand All @@ -410,10 +411,11 @@ def write(self, pv, value):
return False

try:
pv_object.value = value
self.setParam(pv, pv_object.value)
with self._device_lock:
pv_object.value = value
self.setParam(pv, pv_object.value)

return True
return True
except LimitViolationException as e:
self.log.warning('Rejected writing value %s to PV %s due to limit '
'violation. %s', value, pv, e)
Expand All @@ -424,33 +426,37 @@ def write(self, pv, value):
return False

def process_pv_updates(self, force=False):
"""
Update PV values that have changed for PVs that are due to update according to their
respective poll interval timers.
:param force: If True, will force updates to all PVs regardless of timers.
"""
dt = seconds_since(self._last_update_call or datetime.now())
# Updates bound parameters as needed

# Cache details of PVs that need to update
updates = []

for pv, pv_object in iteritems(self._interface.bound_pvs):
if pv not in self._timers:
self._timers = 0.0

self._timers[pv] += dt
if self._timers[pv] >= pv_object.poll_interval or force:
try:
new_value = pv_object.value
self.setParam(pv, new_value)
self.setParamInfo(pv, pv_object.meta)

updates.append((pv, new_value))
except (AttributeError, TypeError):
self.log.exception('An error occurred while updating PV %s.', pv)
finally:
self._timers[pv] = 0.0
with self._device_lock:
for pv, pv_object in iteritems(self._interface.bound_pvs):
self._timers[pv] = self._timers.get(pv, 0.0) + dt
if self._timers[pv] >= pv_object.poll_interval or force:
try:
updates.append((pv, pv_object.value, pv_object.meta))
except (AttributeError, TypeError):
self.log.exception('An error occurred while updating PV %s.', pv)
finally:
self._timers[pv] = 0.0

for pv, value, info in updates:
self.setParam(pv, value)
self.setParamInfo(pv, info)

self.updatePVs()

if updates:
self.log.info('Processed PV updates: %s',
', '.join(('{}={}'.format(pv, val) for pv, val in updates)))
', '.join(('{}={}'.format(pv, val) for pv, val, _ in updates)))

self._last_update_call = datetime.now()

Expand Down Expand Up @@ -507,7 +513,8 @@ def start_server(self):
self._server = SimpleServer()
self._server.createPV(prefix=self._options.prefix,
pvdb={k: v.config for k, v in self.interface.bound_pvs.items()})
self._driver = PropertyExposingDriver(interface=self.interface)
self._driver = PropertyExposingDriver(interface=self.interface,
device_lock=self.device_lock)
self._driver.process_pv_updates(force=True)

self.log.info('Started serving PVs: %s',
Expand Down
28 changes: 16 additions & 12 deletions lewis/adapters/modbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,28 +291,30 @@ def __init__(self, sender, datastore):
0x10: self._handle_write_multiple_registers,
}

def process(self, data):
def process(self, data, device_lock):
"""
Process as much of given data as possible.
Any remainder, in case there is an incomplete frame at the end, is stored so that
processing may continue where it left off when more data is provided.
:param data: Incoming byte data. Must be compatible with bytearray.
:param device_lock: threading.Lock instance that is acquired for device interaction.
"""
self._buffer.extend(bytearray(data))

for request in self._buffered_requests():
self.log.debug(
'Request: %s', str(['{:#04x}'.format(c) for c in request.to_bytearray()]))
with device_lock:
for request in self._buffered_requests():
self.log.debug(
'Request: %s', str(['{:#04x}'.format(c) for c in request.to_bytearray()]))

handler = self._get_handler(request.fcode)
response = handler(request)
handler = self._get_handler(request.fcode)
response = handler(request)

self.log.debug(
'Request: %s', str(['{:#04x}'.format(c) for c in response.to_bytearray()]))
self.log.debug(
'Response: %s', str(['{:#04x}'.format(c) for c in response.to_bytearray()]))

self._send(response)
self._send(response)

def _buffered_requests(self):
"""Generator to yield all complete modbus requests in the internal buffer"""
Expand Down Expand Up @@ -536,7 +538,7 @@ def __init__(self, sock, interface, server):

def handle_read(self):
data = self.recv(8192)
self._modbus.process(data)
self._modbus.process(data, self._server.device_lock)

def handle_close(self):
self.log.info('Closing connection to client %s:%s', *self.socket.getpeername())
Expand All @@ -546,8 +548,9 @@ def handle_close(self):

@has_log
class ModbusServer(asyncore.dispatcher):
def __init__(self, host, port, interface=None):
def __init__(self, host, port, interface, device_lock):
asyncore.dispatcher.__init__(self)
self.device_lock = device_lock
self.interface = interface
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
Expand Down Expand Up @@ -589,7 +592,8 @@ def __init__(self, options=None):
self._server = None

def start_server(self):
self._server = ModbusServer(self._options.bind_address, self._options.port, self.interface)
self._server = ModbusServer(
self._options.bind_address, self._options.port, self.interface, self.device_lock)

def stop_server(self):
if self._server is not None:
Expand Down
26 changes: 14 additions & 12 deletions lewis/adapters/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,21 @@ def found_terminator(self):

self.log.debug('Got request %s', request)

try:
cmd = next((cmd for cmd in self.target.bound_commands if cmd.can_process(request)),
None)
with self._stream_server.device_lock:
try:
cmd = next((cmd for cmd in self.target.bound_commands if cmd.can_process(request)),
None)

if cmd is None:
raise RuntimeError('None of the device\'s commands matched.')
if cmd is None:
raise RuntimeError('None of the device\'s commands matched.')

self.log.info('Processing request %s using command %s', request, cmd.raw_pattern)
self.log.info('Processing request %s using command %s', request, cmd.raw_pattern)

reply = cmd.process_request(request)
reply = cmd.process_request(request)

except Exception as error:
reply = self.target.handle_error(request, error)
self.log.debug('Error while processing request', exc_info=error)
except Exception as error:
reply = self.target.handle_error(request, error)
self.log.debug('Error while processing request', exc_info=error)

if reply is not None:
self.log.debug('Sending reply %s', reply)
Expand All @@ -80,9 +81,10 @@ def handle_close(self):

@has_log
class StreamServer(asyncore.dispatcher):
def __init__(self, host, port, target):
def __init__(self, host, port, target, device_lock):
asyncore.dispatcher.__init__(self)
self.target = target
self.device_lock = device_lock
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
Expand Down Expand Up @@ -475,7 +477,7 @@ def start_server(self):
self.interface.out_terminator = '\r\n'

self._server = StreamServer(self._options.bind_address, self._options.port,
self.interface)
self.interface, self.device_lock)

def stop_server(self):
if self._server is not None:
Expand Down

0 comments on commit b194682

Please sign in to comment.