-
Notifications
You must be signed in to change notification settings - Fork 303
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #927 from gpiozero/lg
Add lgpio pin implementation
- Loading branch information
Showing
6 changed files
with
366 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,348 @@ | ||
from __future__ import ( | ||
unicode_literals, | ||
absolute_import, | ||
print_function, | ||
division, | ||
) | ||
str = type('') | ||
|
||
import os | ||
|
||
import lgpio | ||
|
||
from . import SPI | ||
from .pi import spi_port_device | ||
from .local import LocalPiFactory, LocalPiPin | ||
from ..mixins import SharedMixin | ||
from ..exc import ( | ||
PinInvalidFunction, | ||
PinSetInput, | ||
PinFixedPull, | ||
PinInvalidPull, | ||
PinInvalidBounce, | ||
PinInvalidState, | ||
SPIBadArgs, | ||
SPIInvalidClockMode, | ||
PinPWMFixedValue, | ||
DeviceClosed | ||
) | ||
|
||
|
||
class LGPIOFactory(LocalPiFactory): | ||
""" | ||
Extends :class:`~gpiozero.pins.local.LocalPiFactory`. Uses the `lgpio`_ | ||
library to interface to the local computer's GPIO pins. The lgpio library | ||
simply talks to Linux gpiochip devices; it is not specific to the Raspberry | ||
Pi although this class is currently constructed under the assumption that | ||
it is running on a Raspberry Pi. | ||
You can construct lgpio pins manually like so:: | ||
from gpiozero.pins.lgpio import LGPIOFactory | ||
from gpiozero import LED | ||
factory = LGPIOFactory(chip=0) | ||
led = LED(12, pin_factory=factory) | ||
The *chip* parameter to the factory constructor specifies which gpiochip | ||
device to attempt to open. It defaults to 0 and thus doesn't normally need | ||
to be specified (the example above only includes it for completeness). | ||
The lgpio library relies on access to the :file:`/dev/gpiochip*` devices. | ||
If you run into issues, please check that your user has read/write access | ||
to the specific gpiochip device you are attempting to open (0 by default). | ||
.. _lgpio: http://abyz.me.uk/lg/py_lgpio.html | ||
""" | ||
def __init__(self, chip=0): | ||
super(LGPIOFactory, self).__init__() | ||
self._handle = lgpio.gpiochip_open(chip) | ||
self._chip = chip | ||
self.pin_class = LGPIOPin | ||
|
||
def close(self): | ||
super(LGPIOFactory, self).close() | ||
if self._handle is not None: | ||
lgpio.gpiochip_close(self._handle) | ||
self._handle = None | ||
|
||
@property | ||
def chip(self): | ||
return self._chip | ||
|
||
def _get_spi_class(self, shared, hardware): | ||
# support via lgpio instead of spidev | ||
if hardware: | ||
return [LGPIOHardwareSPI, LGPIOHardwareSPIShared][shared] | ||
return super(LGPIOFactory, self)._get_spi_class(shared, hardware=False) | ||
|
||
|
||
class LGPIOPin(LocalPiPin): | ||
""" | ||
Extends :class:`~gpiozero.pins.local.LocalPiPin`. Pin implementation for | ||
the `lgpio`_ library. See :class:`LGPIOFactory` for more information. | ||
.. _lgpio: http://abyz.me.uk/lg/py_lgpio.html | ||
""" | ||
GPIO_IS_KERNEL = 1 << 0 | ||
GPIO_IS_OUT = 1 << 1 | ||
GPIO_IS_ACTIVE_LOW = 1 << 2 | ||
GPIO_IS_OPEN_DRAIN = 1 << 3 | ||
GPIO_IS_OPEN_SOURCE = 1 << 4 | ||
GPIO_IS_BIAS_PULL_UP = 1 << 5 | ||
GPIO_IS_BIAS_PULL_DOWN = 1 << 6 | ||
GPIO_IS_BIAS_DISABLE = 1 << 7 | ||
GPIO_IS_LG_INPUT = 1 << 8 | ||
GPIO_IS_LG_OUTPUT = 1 << 9 | ||
GPIO_IS_LG_ALERT = 1 << 10 | ||
GPIO_IS_LG_GROUP = 1 << 11 | ||
|
||
GPIO_LINE_FLAGS_MASK = ( | ||
GPIO_IS_ACTIVE_LOW | GPIO_IS_OPEN_DRAIN | GPIO_IS_OPEN_SOURCE | | ||
GPIO_IS_BIAS_PULL_UP | GPIO_IS_BIAS_PULL_DOWN | GPIO_IS_BIAS_DISABLE) | ||
|
||
GPIO_EDGES = { | ||
'both': lgpio.BOTH_EDGES, | ||
'rising': lgpio.RISING_EDGE, | ||
'falling': lgpio.FALLING_EDGE, | ||
} | ||
|
||
GPIO_EDGES_NAMES = {v: k for (k, v) in GPIO_EDGES.items()} | ||
|
||
def __init__(self, factory, number): | ||
super(LGPIOPin, self).__init__(factory, number) | ||
self._pwm = None | ||
self._bounce = None | ||
self._callback = None | ||
self._edges = lgpio.BOTH_EDGES | ||
lgpio.gpio_claim_input( | ||
self.factory._handle, self.number, lgpio.SET_BIAS_DISABLE) | ||
|
||
def close(self): | ||
if self.factory._handle is not None: | ||
# Closing is really just "resetting" the function of the pin; | ||
# we let the factory close deal with actually freeing stuff | ||
lgpio.gpio_claim_input( | ||
self.factory._handle, self.number, lgpio.SET_BIAS_DISABLE) | ||
|
||
def _get_function(self): | ||
mode = lgpio.gpio_get_mode(self.factory._handle, self.number) | ||
return ['input', 'output'][bool(mode & self.GPIO_IS_OUT)] | ||
|
||
def _set_function(self, value): | ||
# XXX What about existing callbacks? | ||
try: | ||
{ | ||
'input': lgpio.gpio_claim_input, | ||
'output': lgpio.gpio_claim_output, | ||
}[value](self.factory._handle, self.number) | ||
except KeyError: | ||
raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self)) | ||
|
||
def _get_state(self): | ||
if self._pwm: | ||
return self._pwm[1] / 100 | ||
else: | ||
return bool(lgpio.gpio_read(self.factory._handle, self.number)) | ||
|
||
def _set_state(self, value): | ||
if self._pwm: | ||
freq, duty = self._pwm | ||
self._pwm = (freq, min(100, max(0, int(value * 100)))) | ||
lgpio.tx_pwm(self.factory._handle, self.number, *self._pwm) | ||
elif self.function == 'input': | ||
raise PinSetInput('cannot set state of pin %r' % self) | ||
else: | ||
lgpio.gpio_write(self.factory._handle, self.number, bool(value)) | ||
|
||
def _get_pull(self): | ||
mode = lgpio.gpio_get_mode(self.factory._handle, self.number) | ||
if mode & self.GPIO_IS_BIAS_PULL_UP: | ||
return 'up' | ||
elif mode & self.GPIO_IS_BIAS_PULL_DOWN: | ||
return 'down' | ||
else: | ||
return 'floating' | ||
|
||
def _set_pull(self, value): | ||
if self.function != 'input': | ||
raise PinFixedPull('cannot set pull on non-input pin %r' % self) | ||
if value != 'up' and self.factory.pi_info.pulled_up(repr(self)): | ||
raise PinFixedPull('%r has a physical pull-up resistor' % self) | ||
try: | ||
flags = { | ||
'up': lgpio.SET_BIAS_PULL_UP, | ||
'down': lgpio.SET_BIAS_PULL_DOWN, | ||
'floating': lgpio.SET_BIAS_DISABLE, | ||
}[value] | ||
except KeyError: | ||
raise PinInvalidPull('invalid pull "%s" for pin %r' % (value, self)) | ||
else: | ||
# Simply calling gpio_claim_input is insufficient to change the | ||
# line flags on a pin; it needs to be freed and re-claimed | ||
lgpio.gpio_free(self.factory._handle, self.number) | ||
lgpio.gpio_claim_input(self.factory._handle, self.number, flags) | ||
|
||
def _get_frequency(self): | ||
if self._pwm: | ||
freq, duty = self._pwm | ||
return freq | ||
else: | ||
return None | ||
|
||
def _set_frequency(self, value): | ||
if not self._pwm and value is not None and value > 0: | ||
if self.function != 'output': | ||
raise PinPWMFixedValue('cannot start PWM on pin %r' % self) | ||
lgpio.tx_pwm(self.factory._handle, self.number, value, 0) | ||
self._pwm = (value, 0) | ||
elif self._pwm and value is not None and value > 0: | ||
freq, duty = self._pwm | ||
lgpio.tx_pwm(self.factory._handle, self.number, value, duty) | ||
self._pwm = (value, duty) | ||
elif self._pwm and (value is None or value == 0): | ||
lgpio.tx_pwm(self.factory._handle, self.number, 0, 0) | ||
self._pwm = None | ||
|
||
def _get_bounce(self): | ||
return None if not self._bounce else self._bounce / 1000000 | ||
|
||
def _set_bounce(self, value): | ||
if value is None: | ||
value = 0 | ||
elif value < 0: | ||
raise PinInvalidBounce('bounce must be 0 or greater') | ||
value = int(value * 1000000) | ||
lgpio.gpio_set_debounce_micros(self.factory._handle, self.number, value) | ||
self._bounce = value | ||
|
||
def _get_edges(self): | ||
return self.GPIO_EDGES_NAMES[self._edges] | ||
|
||
def _set_edges(self, value): | ||
f = self.when_changed | ||
self.when_changed = None | ||
try: | ||
self._edges = self.GPIO_EDGES[value] | ||
finally: | ||
self.when_changed = f | ||
|
||
def _call_when_changed(self, chip, gpio, level, ticks): | ||
super(LGPIOPin, self)._call_when_changed(ticks / 1000000000, level) | ||
|
||
def _enable_event_detect(self): | ||
lgpio.gpio_claim_alert( | ||
self.factory._handle, self.number, self._edges, | ||
lgpio.gpio_get_mode(self.factory._handle, self.number) & | ||
self.GPIO_LINE_FLAGS_MASK) | ||
self._callback = lgpio.callback( | ||
self.factory._handle, self.number, self._edges, | ||
self._call_when_changed) | ||
|
||
def _disable_event_detect(self): | ||
if self._callback is not None: | ||
self._callback.cancel() | ||
self._callback = None | ||
lgpio.gpio_claim_input( | ||
self.factory._handle, self.number, | ||
lgpio.gpio_get_mode(self.factory._handle, self.number) & | ||
self.GPIO_LINE_FLAGS_MASK) | ||
|
||
|
||
class LGPIOHardwareSPI(SPI): | ||
""" | ||
Hardware SPI implementation for the `lgpio`_ library. Uses the ``spi_*`` | ||
functions from the lgpio API. | ||
.. _lgpio: http://abyz.me.uk/lg/py_lgpio.html | ||
""" | ||
def __init__(self, clock_pin, mosi_pin, miso_pin, select_pin, pin_factory): | ||
port, device = spi_port_device( | ||
clock_pin, mosi_pin, miso_pin, select_pin) | ||
self._port = port | ||
self._device = device | ||
self._baud = 500000 | ||
self._spi_flags = 0 | ||
self._handle = None | ||
super(LGPIOHardwareSPI, self).__init__(pin_factory=pin_factory) | ||
to_reserve = {clock_pin, select_pin} | ||
if mosi_pin is not None: | ||
to_reserve.add(mosi_pin) | ||
if miso_pin is not None: | ||
to_reserve.add(miso_pin) | ||
self.pin_factory.reserve_pins(self, *to_reserve) | ||
self._handle = lgpio.spi_open(port, device, self._baud, self._spi_flags) | ||
|
||
def _conflicts_with(self, other): | ||
return not ( | ||
isinstance(other, LGPIOHardwareSPI) and | ||
(self._port, self._device) != (other._port, other._device) | ||
) | ||
|
||
def close(self): | ||
if not self.closed: | ||
lgpio.spi_close(self._handle) | ||
self._handle = None | ||
self.pin_factory.release_all(self) | ||
super(LGPIOHardwareSPI, self).close() | ||
|
||
@property | ||
def closed(self): | ||
return self._handle is None | ||
|
||
def __repr__(self): | ||
try: | ||
self._check_open() | ||
return 'SPI(port=%d, device=%d)' % (self._port, self._device) | ||
except DeviceClosed: | ||
return 'SPI(closed)' | ||
|
||
def _get_clock_mode(self): | ||
return self._spi_flags | ||
|
||
def _set_clock_mode(self, value): | ||
self._check_open() | ||
if not 0 <= value < 4: | ||
raise SPIInvalidClockMode("%d is not a valid SPI clock mode" % value) | ||
lgpio.spi_close(self._handle) | ||
self._spi_flags = value | ||
self._handle = lgpio.spi_open( | ||
self._port, self._device, self._baud, self._spi_flags) | ||
|
||
def _get_rate(self): | ||
return self._baud | ||
|
||
def _set_rate(self, value): | ||
self._check_open() | ||
value = int(value) | ||
lgpio.spi_close(self._handle) | ||
self._baud = value | ||
self._handle = lgpio.spi_open( | ||
self._port, self._device, self._baud, self._spi_flags) | ||
|
||
def read(self, n): | ||
self._check_open() | ||
count, data = lgpio.spi_read(self._handle, n) | ||
if count < 0: | ||
raise IOError('SPI transfer error %d' % count) | ||
return [int(b) for b in data] | ||
|
||
def write(self, data): | ||
self._check_open() | ||
count = lgpio.spi_write(self._handle, data) | ||
if count < 0: | ||
raise IOError('SPI transfer error %d' % count) | ||
return len(data) | ||
|
||
def transfer(self, data): | ||
self._check_open() | ||
count, data = lgpio.spi_xfer(self._handle, data) | ||
if count < 0: | ||
raise IOError('SPI transfer error %d' % count) | ||
return [int(b) for b in data] | ||
|
||
|
||
class LGPIOHardwareSPIShared(SharedMixin, LGPIOHardwareSPI): | ||
@classmethod | ||
def _shared_key(cls, clock_pin, mosi_pin, miso_pin, select_pin, pin_factory): | ||
return (clock_pin, select_pin) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.