Skip to content

Commit

Permalink
Merge pull request #946 from gpiozero/mock-independence
Browse files Browse the repository at this point in the history
Mock independence
  • Loading branch information
waveform80 committed Apr 4, 2021
2 parents 39368c1 + 7c8c0b6 commit f203f4a
Show file tree
Hide file tree
Showing 15 changed files with 656 additions and 560 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# External utilities
PYTHON=python
PIP=pip
PYTEST=py.test
PYTEST=pytest
TWINE=twine
PYFLAGS=
DEST_DIR=/
Expand Down Expand Up @@ -73,7 +73,7 @@ develop:
$(PIP) install -e .[doc,test]

test:
$(PYTEST) tests -v
$(PYTEST)

clean:
rm -fr dist/ build/ .pytest_cache/ .mypy_cache/ $(NAME).egg-info/ tags .coverage
Expand Down
2 changes: 2 additions & 0 deletions docs/api_pins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,5 @@ Mock
.. autoclass:: gpiozero.pins.mock.MockChargingPin

.. autoclass:: gpiozero.pins.mock.MockTriggerPin

.. autoclass:: gpiozero.pins.mock.MockSPIDevice
139 changes: 25 additions & 114 deletions gpiozero/pins/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@

from . import SPI
from .pi import PiFactory, PiPin, SPI_HARDWARE_PINS, spi_port_device
from .spi import SPISoftwareBus
from ..devices import Device, SharedMixin
from .spi import SPISoftware
from ..devices import Device
from ..mixins import SharedMixin
from ..output_devices import OutputDevice
from ..exc import DeviceClosed, PinUnknownPi, SPIInvalidClockMode

Expand Down Expand Up @@ -88,12 +89,7 @@ def ticks():

@staticmethod
def ticks_diff(later, earlier):
# NOTE: technically the guarantee to always return a positive result
# cannot be maintained in versions where monotonic() is not available
# and we fall back to time(). However, in that situation we've no
# access to a true monotonic source, and no idea how far the clock has
# skipped back so this is the best we can do anyway.
return max(0, later - earlier)
return later - earlier


class LocalPiPin(PiPin):
Expand Down Expand Up @@ -121,7 +117,7 @@ class LocalPiHardwareSPI(SPI):
def __init__(self, clock_pin, mosi_pin, miso_pin, select_pin, pin_factory):
self._port, self._device = spi_port_device(
clock_pin, mosi_pin, miso_pin, select_pin)
self._interface = None
self._bus = None
if SpiDev is None:
raise ImportError('failed to import spidev')
super().__init__(pin_factory=pin_factory)
Expand All @@ -131,20 +127,20 @@ def __init__(self, clock_pin, mosi_pin, miso_pin, select_pin, pin_factory):
if miso_pin is not None:
to_reserve.add(miso_pin)
self.pin_factory.reserve_pins(self, *to_reserve)
self._interface = SpiDev()
self._interface.open(self._port, self._device)
self._interface.max_speed_hz = 500000
self._bus = SpiDev()
self._bus.open(self._port, self._device)
self._bus.max_speed_hz = 500000

def close(self):
if self._interface is not None:
self._interface.close()
self._interface = None
if self._bus is not None:
self._bus.close()
self._bus = None
self.pin_factory.release_all(self)
super().close()

@property
def closed(self):
return self._interface is None
return self._bus is None

def __repr__(self):
try:
Expand All @@ -159,126 +155,41 @@ def transfer(self, data):
:attr:`bits_per_word` bits or less) to the SPI interface, and reads an
equivalent number of words, returning them as a list of integers.
"""
return self._interface.xfer2(data)
return self._bus.xfer2(data)

def _get_clock_mode(self):
return self._interface.mode
return self._bus.mode

def _set_clock_mode(self, value):
self._interface.mode = value
self._bus.mode = value

def _get_lsb_first(self):
return self._interface.lsbfirst
return self._bus.lsbfirst

def _set_lsb_first(self, value):
self._interface.lsbfirst = bool(value)
self._bus.lsbfirst = bool(value)

def _get_select_high(self):
return self._interface.cshigh
return self._bus.cshigh

def _set_select_high(self, value):
self._interface.cshigh = bool(value)
self._bus.cshigh = bool(value)

def _get_bits_per_word(self):
return self._interface.bits_per_word
return self._bus.bits_per_word

def _set_bits_per_word(self, value):
self._interface.bits_per_word = value
self._bus.bits_per_word = value

def _get_rate(self):
return self._interface.max_speed_hz
return self._bus.max_speed_hz

def _set_rate(self, value):
self._interface.max_speed_hz = int(value)


class LocalPiSoftwareSPI(SPI):
def __init__(self, clock_pin, mosi_pin, miso_pin, select_pin, pin_factory):
self._bus = None
self._select = None
super().__init__(pin_factory=pin_factory)
try:
self._clock_phase = False
self._lsb_first = False
self._bits_per_word = 8
self._bus = SPISoftwareBus(clock_pin, mosi_pin, miso_pin)
self._select = OutputDevice(
select_pin, active_high=False, pin_factory=pin_factory)
except:
self.close()
raise

def _conflicts_with(self, other):
return not (
isinstance(other, LocalPiSoftwareSPI) and
(self._select.pin.number != other._select.pin.number)
)
self._bus.max_speed_hz = int(value)

def close(self):
if self._select:
self._select.close()
self._select = None
if self._bus is not None:
self._bus.close()
self._bus = None
super().close()

@property
def closed(self):
return self._bus is None

def __repr__(self):
try:
self._check_open()
return 'SPI(clock_pin=%d, mosi_pin=%d, miso_pin=%d, select_pin=%d)' % (
self._bus.clock.pin.number,
self._bus.mosi.pin.number,
self._bus.miso.pin.number,
self._select.pin.number)
except DeviceClosed:
return 'SPI(closed)'

def transfer(self, data):
with self._bus.lock:
self._select.on()
try:
return self._bus.transfer(
data, self._clock_phase, self._lsb_first, self._bits_per_word)
finally:
self._select.off()

def _get_clock_mode(self):
with self._bus.lock:
return (not self._bus.clock.active_high) << 1 | self._clock_phase

def _set_clock_mode(self, value):
if not (0 <= value < 4):
raise SPIInvalidClockMode("%d is not a valid clock mode" % value)
with self._bus.lock:
self._bus.clock.active_high = not (value & 2)
self._clock_phase = bool(value & 1)

def _get_lsb_first(self):
return self._lsb_first

def _set_lsb_first(self, value):
self._lsb_first = bool(value)

def _get_bits_per_word(self):
return self._bits_per_word

def _set_bits_per_word(self, value):
if value < 1:
raise ValueError('bits_per_word must be positive')
self._bits_per_word = int(value)

def _get_select_high(self):
return self._select.active_high

def _set_select_high(self, value):
with self._bus.lock:
self._select.active_high = value
self._select.off()
class LocalPiSoftwareSPI(SPISoftware):
pass


class LocalPiHardwareSPIShared(SharedMixin, LocalPiHardwareSPI):
Expand Down

0 comments on commit f203f4a

Please sign in to comment.