Skip to content

Commit

Permalink
Merge 7b5fb37 into 65dffff
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelWedel committed May 9, 2017
2 parents 65dffff + 7b5fb37 commit d503b89
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 86 deletions.
24 changes: 14 additions & 10 deletions lewis/adapters/epics.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,11 @@ def _function_has_n_args(self, func, n):

@has_log
class PropertyExposingDriver(Driver):
def __init__(self, interface):
def __init__(self, interface, device_lock):
super(PropertyExposingDriver, self).__init__()

self._interface = interface
self._device_lock = device_lock
self._set_logging_context(interface)

self._timers = {k: 0.0 for k in self._interface.bound_pvs.keys()}
Expand All @@ -410,10 +411,11 @@ def write(self, pv, value):
return False

try:
pv_object.value = value
self.setParam(pv, pv_object.value)
with self._device_lock:
pv_object.value = value
self.setParam(pv, pv_object.value)

return True
return True
except LimitViolationException as e:
self.log.warning('Rejected writing value %s to PV %s due to limit '
'violation. %s', value, pv, e)
Expand All @@ -431,16 +433,18 @@ def process_pv_updates(self, force=False):

for pv, pv_object in iteritems(self._interface.bound_pvs):
if pv not in self._timers:
self._timers = 0.0
self._timers[pv] = 0.0

self._timers[pv] += dt
if self._timers[pv] >= pv_object.poll_interval or force:
try:
new_value = pv_object.value
self.setParam(pv, new_value)
self.setParamInfo(pv, pv_object.meta)
with self._device_lock:
new_value = pv_object.value

updates.append((pv, new_value))
self.setParam(pv, new_value)
self.setParamInfo(pv, pv_object.meta)

updates.append((pv, new_value))
except (AttributeError, TypeError):
self.log.exception('An error occurred while updating PV %s.', pv)
finally:
Expand Down Expand Up @@ -507,7 +511,7 @@ def start_server(self):
self._server = SimpleServer()
self._server.createPV(prefix=self._options.prefix,
pvdb={k: v.config for k, v in self.interface.bound_pvs.items()})
self._driver = PropertyExposingDriver(interface=self.interface)
self._driver = PropertyExposingDriver(interface=self.interface, device_lock=self.lock)
self._driver.process_pv_updates(force=True)

self.log.info('Started serving PVs: %s',
Expand Down
15 changes: 10 additions & 5 deletions lewis/adapters/modbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,15 @@ def __init__(self, sender, datastore):
0x10: self._handle_write_multiple_registers,
}

def process(self, data):
def process(self, data, device_lock):
"""
Process as much of given data as possible.
Any remainder, in case there is an incomplete frame at the end, is stored so that
processing may continue where it left off when more data is provided.
:param data: Incoming byte data. Must be compatible with bytearray.
:param device_lock: threading.Lock instance that is acquired for device interaction.
"""
self._buffer.extend(bytearray(data))

Expand All @@ -307,7 +308,9 @@ def process(self, data):
'Request: %s', str(['{:#04x}'.format(c) for c in request.to_bytearray()]))

handler = self._get_handler(request.fcode)
response = handler(request)

with device_lock:
response = handler(request)

self.log.debug(
'Request: %s', str(['{:#04x}'.format(c) for c in response.to_bytearray()]))
Expand Down Expand Up @@ -536,7 +539,7 @@ def __init__(self, sock, interface, server):

def handle_read(self):
data = self.recv(8192)
self._modbus.process(data)
self._modbus.process(data, self._server.device_lock)

def handle_close(self):
self.log.info('Closing connection to client %s:%s', *self.socket.getpeername())
Expand All @@ -546,8 +549,9 @@ def handle_close(self):

@has_log
class ModbusServer(asyncore.dispatcher):
def __init__(self, host, port, interface=None):
def __init__(self, host, port, interface, lock):
asyncore.dispatcher.__init__(self)
self.device_lock = lock
self.interface = interface
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
Expand Down Expand Up @@ -589,7 +593,8 @@ def __init__(self, options=None):
self._server = None

def start_server(self):
self._server = ModbusServer(self._options.bind_address, self._options.port, self.interface)
self._server = ModbusServer(
self._options.bind_address, self._options.port, self.interface, self.lock)

def stop_server(self):
if self._server is not None:
Expand Down
26 changes: 14 additions & 12 deletions lewis/adapters/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,21 @@ def found_terminator(self):

self.log.debug('Got request %s', request)

try:
cmd = next((cmd for cmd in self.target.bound_commands if cmd.can_process(request)),
None)
with self._stream_server.device_lock:
try:
cmd = next((cmd for cmd in self.target.bound_commands if cmd.can_process(request)),
None)

if cmd is None:
raise RuntimeError('None of the device\'s commands matched.')
if cmd is None:
raise RuntimeError('None of the device\'s commands matched.')

self.log.info('Processing request %s using command %s', request, cmd.raw_pattern)
self.log.info('Processing request %s using command %s', request, cmd.raw_pattern)

reply = cmd.process_request(request)
reply = cmd.process_request(request)

except Exception as error:
reply = self.target.handle_error(request, error)
self.log.debug('Error while processing request', exc_info=error)
except Exception as error:
reply = self.target.handle_error(request, error)
self.log.debug('Error while processing request', exc_info=error)

if reply is not None:
self.log.debug('Sending reply %s', reply)
Expand All @@ -80,9 +81,10 @@ def handle_close(self):

@has_log
class StreamServer(asyncore.dispatcher):
def __init__(self, host, port, target):
def __init__(self, host, port, target, device_lock):
asyncore.dispatcher.__init__(self)
self.target = target
self.device_lock = device_lock
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
Expand Down Expand Up @@ -475,7 +477,7 @@ def start_server(self):
self.interface.out_terminator = '\r\n'

self._server = StreamServer(self._options.bind_address, self._options.port,
self.interface)
self.interface, self.lock)

def stop_server(self):
if self._server is not None:
Expand Down
101 changes: 75 additions & 26 deletions lewis/core/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,30 @@
be used to store multiple adapters and manage them together.
"""
import inspect
import threading
from collections import namedtuple
from time import sleep

from lewis.core.exceptions import LewisException
from lewis.core.logging import has_log
from lewis.core.utils import dict_strict_update


class NoLock(object):
"""
A dummy context manager that raises a RuntimeError when it's used. This makes it easier to
detect cases where an :class:`Adapter` has not received the proper lock-object to make sure
that device/interface access is synchronous.
"""

def __enter__(self):
raise RuntimeError(
'The attempted action requires a proper threading.Lock-object, '
'but none was available.')

def __exit__(self, exc_type, exc_val, exc_tb):
pass


@has_log
class Adapter(object):
"""
Expand Down Expand Up @@ -62,6 +78,11 @@ class Adapter(object):
in the ``default_options`` member of the class are accepted. Inheriting classes must override
``default_options`` to be a dictionary with the possible options for the adapter.
Each adapter has a ``lock`` member, which contains a :class:`NoLock` by default. To make
device access thread-safe, any adapter should acquire this lock before interacting with
the device (or interface). This means that before starting the server component of an Adapter,
a proper Lock-object needs to be assigned to ``lock``.
:param options: Configuration options for the adapter.
"""
default_options = {}
Expand All @@ -70,8 +91,9 @@ def __init__(self, options=None):
super(Adapter, self).__init__()
self._interface = None

options = options or {}
self.lock = NoLock()

options = options or {}
combined_options = dict(self.default_options)

try:
Expand Down Expand Up @@ -189,18 +211,31 @@ class AdapterCollection(object):
names at all will start/stop all adapters. These semantics also apply for :meth:`is_connected`
and `documentation`.
The :meth:`handle` implementation will call all the stored adapters' ``handle`` methods if they
are running, otherwise ``time.sleep`` is called.
This class also makes sure that all adapters use the same Lock for device interaction.
:param args: List of adapters to add to the container
"""

def __init__(self, *args):
self._adapters = {}

self._threads = {}
self._stop = {}
self._lock = threading.Lock()

for adapter in args:
self.add_adapter(adapter)

@property
def device_lock(self):
"""
This lock is passed to each adapter when it's started. It's supposed to be used to ensure
that the device is only accessed from one thread at a time, for example during network IO.
:class:`~lewis.core.simulation.Simulation` uses this lock to block the device during the
simulation cycle calculations.
"""
return self._lock

def add_adapter(self, adapter):
"""
Adds the supplied adapter to the container but raises a ``RuntimeError`` if there's
Expand All @@ -227,47 +262,61 @@ def remove_adapter(self, protocol):

del self._adapters[protocol]

def handle(self, cycle_delay):
"""
Calls all stored and running adapters' ``handle``-methods or sleeps for the specified
amount in the rest of the cases.
:param cycle_delay: Approximate time to spend processing adapters.
"""
delay_per_adapter = cycle_delay / len(self._adapters)

for adapter in self._adapters.values():
if adapter.is_running:
adapter.handle(delay_per_adapter)
else:
sleep(delay_per_adapter)

@property
def protocols(self):
"""List of protocols for which adapters are registered."""
return list(self._adapters.keys())

def connect(self, *args):
"""
Calls :meth:`~Adapter.start_server` on each adapter that correspond to the supplied
protocols. If no arguments are supplied, all adapters are started.
This method starts an adapter for each specified protocol in a separate thread, if the
adapter is not already running.
:param args: List of protocols for which to start adapters or empty for all.
"""
for adapter in self._get_adapters(args):
self._start_server(adapter)

def _start_server(self, adapter):
if adapter.protocol not in self._threads:
self.log.info('Connecting device interface for protocol \'%s\'', adapter.protocol)
adapter.start_server()

adapter_thread = threading.Thread(target=self._adapter_loop, args=(adapter, 0.1))
adapter_thread.daemon = True

self._threads[adapter.protocol] = adapter_thread

adapter_thread.start()

def _adapter_loop(self, adapter, dt):
adapter.lock = self._lock # This ensures that the adapter is using the correct lock.
adapter.start_server()

self.log.debug('Starting adapter loop.')
while not self._stop.get(adapter.protocol, False):
adapter.handle(dt)

adapter.stop_server()

del self._stop[adapter.protocol]

def disconnect(self, *args):
"""
Calls :meth:`~Adapter.stop_server` on each adapter that correspond to the supplied
protocols. If no arguments are supplied, all adapters are stopped.
Stops all adapters for the specified protocols. The method waits for each adapter thread
to join, so it might hang if the thread is not terminating correctly.
:param args: List of protocols for which to stop adapters or empty for all.
"""
for adapter in self._get_adapters(args):
self.log.info('Disonnecting device interface for protocol \'%s\'', adapter.protocol)
adapter.stop_server()
self._stop_server(adapter)

def _stop_server(self, adapter):
if adapter.protocol in self._threads:
self.log.info('Disconnecting device interface for protocol \'%s\'', adapter.protocol)

self._stop[adapter.protocol] = True
self._threads[adapter.protocol].join()
del self._threads[adapter.protocol]

def is_connected(self, *args):
"""
Expand Down
17 changes: 10 additions & 7 deletions lewis/core/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""

from datetime import datetime
from time import sleep

from lewis.core.adapters import AdapterCollection
from lewis.core.control_server import ControlServer, ExposedObject
Expand All @@ -47,7 +48,7 @@ class Simulation(object):
In the simplest case, the actual time-delta between two cycles
is passed to the simulated device so that it can update its internal
state according to the elapsed time. It is however possible to set
a simulation speed, which serves as a multiplier for this time.
a simulation speed, which serv es as a multiplier for this time.
If the speed is set to 2 and 0.1 seconds pass between two cycles,
the simulation is asked to simulate 0.2 seconds, and so on. Speed 0
effectively stops all time dependent calculations in the
Expand Down Expand Up @@ -125,7 +126,7 @@ def _create_control_server(self, control_server):
'interface': ExposedObject(
self._adapters,
exclude=('add_adapter', 'remove_adapter', 'handle', 'log'),
exclude_inherited=True
exclude_inherited=False
)},
control_server)

Expand Down Expand Up @@ -196,9 +197,6 @@ def _process_cycle(self, delta):

self._process_simulation_cycle(delta)

if self._control_server:
self._control_server.process()

delta = seconds_since(start)

return delta
Expand All @@ -215,11 +213,16 @@ def _process_simulation_cycle(self, delta):
"""
self.log.debug('Cycle, dt=%s', delta)

self._adapters.handle(self._cycle_delay)
sleep(self._cycle_delay)

if self._running:
delta_simulation = delta * self._speed
self._device.process(delta_simulation)

with self._adapters.device_lock:
self._device.process(delta_simulation)

if self._control_server:
self._control_server.process()

self._cycles += 1
self._runtime += delta_simulation
Expand Down

0 comments on commit d503b89

Please sign in to comment.