Skip to content

Commit

Permalink
Merge pull request #13 from dls-controls/queueing
Browse files Browse the repository at this point in the history
Queueing
  • Loading branch information
T-Nicholls committed Aug 9, 2019
2 parents 73cc13c + 78ae5cc commit 318b11f
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 38 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ name = "pypi"
pytest = "*"
pytest-cov = "*"
coveralls = "*"
testfixtures = "*"
mock = "*"
flake8 = "*"
sphinx = "*"
Expand Down
52 changes: 42 additions & 10 deletions atip/sim_data_sources.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Module containing the pytac data sources for the AT simulator."""
import logging
from functools import partial

import at
import pytac
from pytac.exceptions import FieldException, HandleException
from pytac.exceptions import (FieldException, HandleException,
ControlSystemException)


class ATElementDataSource(pytac.data_source.DataSource):
Expand Down Expand Up @@ -116,24 +118,39 @@ def add_field(self, field):
else:
self._fields.append(field)

def get_value(self, field, handle=None, throw=None):
def get_value(self, field, handle=None, throw=True):
"""Get the value for a field.
Args:
field (str): The requested field.
handle (str, optional): Handle is not needed and is only here to
conform with the structure of the
DataSource base class.
throw (bool, optional): Throw is not needed and is only here to
conform with the structure of the
DataSource base class.
throw (bool, optional): If the check for completion of outstanding
calculations times out, then:
if True, raise a ControlSystemException;
if False, log a warning and return the
potentially out of date data anyway.
Returns:
float: The value of the specified field on this data source.
Raises:
FieldException: if the specified field does not exist.
ControlSystemException: if the calculation completion check fails,
and throw is True.
"""
# Wait for any outstanding calculations to conclude, to ensure they are
# complete before a value is returned; if the wait times out then raise
# an error message or log a warning according to the value of throw.
if not self._atsim.wait_for_calculations():
error_msg = ("Check for completion of outstanding "
"calculations timed out.")
if throw:
raise ControlSystemException(error_msg)
else:
logging.warning("Potentially out of date data returned. " +
error_msg)
# Again we assume that every set field has a corresponding get field.
if field in self._fields:
return self._get_field_funcs[field]()
Expand All @@ -160,7 +177,7 @@ def set_value(self, field, value, throw=None):
if field in self._fields:
if field in self._set_field_funcs.keys():
self._atsim.up_to_date.Reset()
self._atsim.queue.Signal((self._make_change, field, value))
self._atsim.queue_set(self._make_change, field, value)
else:
raise HandleException("Field {0} cannot be set on element data"
" source {1}.".format(field, self))
Expand Down Expand Up @@ -384,24 +401,39 @@ def get_fields(self):
"""
return list(self._field_funcs.keys())

def get_value(self, field, handle=None, throw=None):
def get_value(self, field, handle=None, throw=True):
"""Get the value for a field on the Pytac lattice.
Args:
field (str): The requested field.
handle (str, optional): Handle is not needed and is only here to
conform with the structure of the
DataSource base class.
throw (bool, optional): Throw is not needed and is only here to
conform with the structure of the
DataSource base class.
throw (bool, optional): If the check for completion of outstanding
calculations times out, then:
if True, raise a ControlSystemException;
if False, log a warning and return the
potentially out of date data anyway.
Returns:
float: The value of the specified field on this data source.
Raises:
FieldException: if the specified field does not exist.
ControlSystemException: if the calculation completion check fails,
and throw is True.
"""
# Wait for any outstanding calculations to conclude, to ensure they are
# complete before a value is returned; if the wait times out then raise
# an error message or log a warning according to the value of throw.
if not self._atsim.wait_for_calculations():
error_msg = ("Check for completion of outstanding "
"calculations timed out.")
if throw:
raise ControlSystemException(error_msg)
else:
logging.warning("Potentially out of date data returned. " +
error_msg)
if field in list(self._field_funcs.keys()):
if field.startswith('phase'):
return self._field_funcs[field]('p' + field[-1])
Expand Down
26 changes: 19 additions & 7 deletions atip/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ class ATSimulator(object):
**Attributes**
Attributes:
queue (cothread.EventQueue): A queue of changes to be made to the
lattice on the next recalculation cycle.
up_to_date (cothread.Event): A flag that indicates if the physics data
is up to date with all the changes made
to the AT lattice.
Expand All @@ -35,6 +33,9 @@ class ATSimulator(object):
ohmi_envelope (see at.lattice.radiation.py).
_lindata (tuple): Linear optics data, the output of the AT physics
function linopt (see at.lattice.linear.py).
_queue (cothread.EventQueue): A queue of changes to be applied to
the centralised lattice on the next
recalculation cycle.
_paused (cothread.Event): A flag used to temporarily pause the
physics calculations.
_calculation_thread (cothread.Thread): A thread to check the queue
Expand Down Expand Up @@ -70,19 +71,30 @@ def __init__(self, at_lattice, callback=None):
self._lindata = self._at_lat.linopt(refpts=self._rp, get_chrom=True,
coupled=False)
# Threading stuff initialisation.
self.queue = cothread.EventQueue()
self.up_to_date = cothread.Event()
self._queue = cothread.EventQueue()
# Explicitly manage the cothread Events, so turn off auto_reset.
self._paused = cothread.Event(auto_reset=False)
self.up_to_date = cothread.Event(auto_reset=False)
self.up_to_date.Signal()
self._paused = cothread.Event()
self._calculation_thread = cothread.Spawn(self._recalculate_phys_data,
callback)

def queue_set(self, func, field, value):
"""Add a change to the queue, to be applied when the queue is emptied.
Args:
func (callable): The function to be called to apply the change.
field (str): The field to be changed.
value (float): The value to be set.
"""
self._queue.Signal((func, field, value))

def _gather_one_sample(self):
"""If the queue is empty Wait() yields until an item is added. When the
queue is not empty the oldest change will be removed and applied to the
AT lattice.
"""
apply_change_method, field, value = self.queue.Wait()
apply_change_method, field, value = self._queue.Wait()
apply_change_method(field, value)

def _recalculate_phys_data(self, callback):
Expand All @@ -106,7 +118,7 @@ def _recalculate_phys_data(self, callback):
"""
while True:
self._gather_one_sample()
for i in range(len(self.queue)):
while self._queue:
self._gather_one_sample()
if bool(self._paused) is False:
try:
Expand Down
1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ numpy
scipy
pytest
pytest-cov
testfixtures
coveralls
mock
flake8
Expand Down
18 changes: 9 additions & 9 deletions test/test_at_simulator_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ def test_ATSimulator_creation(atsim, initial_emit, initial_lin):
# Check initial state of flags.
assert bool(atsim._paused) is False
assert bool(atsim.up_to_date) is True
assert len(atsim.queue) == 0
assert len(atsim._queue) == 0
# Check emittance and lindata are initially calculated correctly.
assert _initial_phys_data(atsim, initial_emit, initial_lin) is True


def test_recalculate_phys_data_queue(atsim):
elem_ds = mock.Mock()
atsim.up_to_date.Reset()
atsim.queue.Signal((elem_ds._make_change, 'a_field', 12))
assert len(atsim.queue) == 1
atsim.queue_set(elem_ds._make_change, 'a_field', 12)
assert len(atsim._queue) == 1
atsim.wait_for_calculations()
assert len(atsim.queue) == 0
assert len(atsim._queue) == 0
elem_ds._make_change.assert_called_once_with('a_field', 12)


Expand All @@ -71,7 +71,7 @@ def test_recalculate_phys_data(atsim, initial_emit, initial_lin):
# Check that errors raised inside thread are converted to warnings.
atsim._at_lat[5].PolynomB[0] = 1.e10
atsim.up_to_date.Reset()
atsim.queue.Signal((mock.Mock(), 'f', 0))
atsim.queue_set(mock.Mock(), 'f', 0)
with pytest.warns(at.AtWarning):
atsim.wait_for_calculations()
atsim._at_lat[5].PolynomB[0] = 0.0
Expand All @@ -87,7 +87,7 @@ def test_recalculate_phys_data(atsim, initial_emit, initial_lin):
atsim._at_lat[21].PolynomB[2] = -75
# Clear the flag and then wait for the calculations
atsim.up_to_date.Reset()
atsim.queue.Signal((mock.Mock(), 'f', 0))
atsim.queue_set(mock.Mock(), 'f', 0)
atsim.wait_for_calculations()
# Get the applicable physics data
orbit = [atsim.get_orbit('x')[0], atsim.get_orbit('y')[0]]
Expand Down Expand Up @@ -116,11 +116,11 @@ def test_toggle_calculations_and_wait_for_calculations(atsim, initial_lin,
atsim.toggle_calculations()
atsim._at_lat[5].PolynomB[1] = 2.5
atsim.up_to_date.Reset()
atsim.queue.Signal((mock.Mock(), 'f', 0))
atsim.queue_set(mock.Mock(), 'f', 0)
assert atsim.wait_for_calculations(2) is False
assert _initial_phys_data(atsim, initial_emit, initial_lin) is True
atsim.toggle_calculations()
atsim.queue.Signal((mock.Mock(), 'f', 0))
atsim.queue_set(mock.Mock(), 'f', 0)
assert atsim.wait_for_calculations() is True
assert _initial_phys_data(atsim, initial_emit, initial_lin) is False

Expand All @@ -134,7 +134,7 @@ def test_recalculate_phys_data_callback(at_lattice):
callback_func = mock.Mock()
atsim = atip.simulator.ATSimulator(at_lattice, callback_func)
atsim.up_to_date.Reset()
atsim.queue.Signal((mock.Mock(), 'f', 0))
atsim.queue_set(mock.Mock(), 'f', 0)
atsim.wait_for_calculations()
callback_func.assert_called_once_with()

Expand Down
34 changes: 28 additions & 6 deletions test/test_element_data_source.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import at
import mock
import pytest
from pytac.exceptions import FieldException, HandleException
from pytac.exceptions import (FieldException, HandleException,
ControlSystemException)
from testfixtures import LogCapture

import atip

Expand Down Expand Up @@ -82,6 +84,24 @@ def test_elem_add_field_raises_FieldExceptions_correctly(at_elem, field):
ateds.add_field(field)


def test_elem_get_value_handles_calculation_check_time_out_correctly(at_elem):
atsim = mock.Mock()
ateds = atip.sim_data_sources.ATElementDataSource(at_elem, 1, atsim, ['f'])
atsim.wait_for_calculations.return_value = False
# Check fails, throw is True, so exception is raised.
with pytest.raises(ControlSystemException):
ateds.get_value('f', throw=True)
# Check fails, throw is False, so warning is logged and value is returned.
with LogCapture() as log:
assert ateds.get_value('f', throw=False) == 0
log.check(('root', 'WARNING', 'Potentially out of date data returned. '
'Check for completion of outstanding calculations timed out.'))
atsim.wait_for_calculations.return_value = True
# Check doesn't fail, so doesn't raise error or warn and data is returned.
assert ateds.get_value('f', throw=True) == 0
assert ateds.get_value('f', throw=False) == 0


@pytest.mark.parametrize('field', ['not_a_field', 1, [], 'a1', 'X_KICK'])
def test_elem_get_value_raises_FieldException_if_nonexistent_field(at_elem,
field):
Expand Down Expand Up @@ -151,12 +171,14 @@ def test_elem_set_value_sets_up_to_date_flag(at_elem, field):
'f'])
def test_elem_set_value_adds_changes_to_queue(at_elem, field):
atsim = mock.Mock()
ateds = atip.sim_data_sources.ATElementDataSource(at_elem, 1, atsim,
[field])
ateds = atip.sim_data_sources.ATElementDataSource(
at_elem, 1, atsim, [field]
)
ateds.set_value(field, 1)
assert len(atsim.queue.mock_calls) == 1
assert atsim.queue.mock_calls[0] == mock.call.Signal((ateds._make_change,
field, 1))
assert len(atsim.queue_set.mock_calls) == 1
assert atsim.queue_set.mock_calls[0] == mock.call(
ateds._make_change, field, 1
)


@pytest.mark.parametrize('field,attr_str', [('x_kick', 'at_elem.KickAngle[0]'),
Expand Down
27 changes: 24 additions & 3 deletions test/test_lattice_data_source.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import mock
import pytac
import pytest
from pytac.exceptions import (FieldException, HandleException,
ControlSystemException)
from testfixtures import LogCapture

import atip

Expand Down Expand Up @@ -36,10 +38,29 @@ def test_lat_get_fields(atlds):
@pytest.mark.parametrize('field', ['not_a_field', 1, [], 'BETA', ['x', 'y']])
def test_lat_get_value_raises_FieldException_if_nonexistent_field(atlds,
field):
with pytest.raises(pytac.exceptions.FieldException):
with pytest.raises(FieldException):
atlds.get_value(field)


def test_lat_get_value_handles_calculation_check_time_out_correctly():
atsim = mock.Mock()
atsim.get_disp.return_value = 2.5
atlds = atip.sim_data_sources.ATLatticeDataSource(atsim)
atsim.wait_for_calculations.return_value = False
# Check fails, throw is True, so exception is raised.
with pytest.raises(ControlSystemException):
atlds.get_value('dispersion', throw=True)
# Check fails, throw is False, so warning is logged and value is returned.
with LogCapture() as log:
assert atlds.get_value('dispersion', throw=False) == 2.5
log.check(('root', 'WARNING', 'Potentially out of date data returned. '
'Check for completion of outstanding calculations timed out.'))
atsim.wait_for_calculations.return_value = True
# Check doesn't fail, so doesn't raise error or warn and data is returned.
assert atlds.get_value('dispersion', throw=True) == 2.5
assert atlds.get_value('dispersion', throw=False) == 2.5


def test_lat_get_value():
"""We don't need to test every value for get_value() as _field_funcs which
it relys on has alreadly been tested for all fields."""
Expand All @@ -60,5 +81,5 @@ def test_lat_get_value():
@pytest.mark.parametrize('field', ['not_a_field', 1, [], 'BETA', ['x', 'y'],
'chromaticity_x', 'dispersion'])
def test_lat_set_value_always_raises_HandleException(atlds, field):
with pytest.raises(pytac.exceptions.HandleException):
with pytest.raises(HandleException):
atlds.set_value(field, 0)
8 changes: 5 additions & 3 deletions test/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ def test_load_from_filepath(pytac_lattice, mat_filepath):
atip.load_sim.load_from_filepath(pytac_lattice, mat_filepath)


def test_load_with_callback(pytac_lattice, at_diad_lattice):
# Check load with non-callable raises TypeError
def test_load_with_non_callable_callback_raises_TypeError(pytac_lattice,
at_diad_lattice):
with pytest.raises(TypeError):
atip.load_sim.load(pytac_lattice, at_diad_lattice, '')
# Check load with callable


def test_load_with_callback(pytac_lattice, at_diad_lattice):
callback_func = mock.Mock()
lat = atip.load_sim.load(pytac_lattice, at_diad_lattice, callback_func)
atsim = lat._data_source_manager._data_sources[pytac.SIM]._atsim
Expand Down

0 comments on commit 318b11f

Please sign in to comment.