Skip to content

Commit

Permalink
Merge pull request #929 from gpiozero/nativepin_pi4
Browse files Browse the repository at this point in the history
Fix NativePin for Pi 4
  • Loading branch information
bennuttall committed Mar 5, 2021
2 parents be7f898 + 67b7d4e commit 7403b1b
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 51 deletions.
4 changes: 4 additions & 0 deletions docs/api_pins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ Native

.. autoclass:: gpiozero.pins.native.NativePin

.. autoclass:: gpiozero.pins.native.Native2835Pin

.. autoclass:: gpiozero.pins.native.Native2711Pin


Mock
====
Expand Down
271 changes: 221 additions & 50 deletions gpiozero/pins/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@
print_function,
division,
)
try:
range = xrange
except NameError:
pass
nstr = str
str = type('')

import io
import os
import sys
import mmap
import errno
import struct
Expand All @@ -62,13 +67,110 @@
)


class GPIOMemory(object):
def dt_resolve_alias(alias, root='/proc/device-tree'):
"""
Returns the full path of a device-tree alias. For example:
>>> dt_resolve_alias('gpio')
'/proc/device-tree/soc/gpio@7e200000'
>>> dt_resolve_alias('ethernet0', root='/proc/device-tree')
'/proc/device-tree/scb/ethernet@7d580000'
"""
# XXX Change this return a pathlib.Path when we drop 2.x
filename = os.path.join(root, 'aliases', alias)
with io.open(filename, 'rb') as f:
node, tail = f.read().split(b'\0', 1)
fs_encoding = sys.getfilesystemencoding()
return os.path.join(root, node.decode(fs_encoding).lstrip('/'))

def dt_peripheral_reg(node, root='/proc/device-tree'):
"""
Returns the :class:`range` covering the registers of the specified *node*
of the device-tree, mapped to the CPU's address space. For example:
>>> reg = dt_peripheral_reg(dt_resolve_alias('gpio'))
>>> '%#x..%#x' % (reg.start, reg.stop)
'0xfe200000..0xfe2000b4'
>>> hex(dt_peripheral_reg(dt_resolve_alias('ethernet0')).start)
'0xfd580000'
"""
# Returns a tuple of (address-cells, size-cells) for *node*
def _cells(node):
with io.open(os.path.join(node, '#address-cells'), 'rb') as f:
address_cells = struct.unpack(nstr('>L'), f.read())[0]
with io.open(os.path.join(node, '#size-cells'), 'rb') as f:
size_cells = struct.unpack(nstr('>L'), f.read())[0]
return (address_cells, size_cells)

# Returns a generator function which, given a file-like object *source*
# iteratively decodes it, yielding a tuple of values from it. Each tuple
# contains one integer for each specified *length*, which is the number of
# 32-bit device-tree cells that make up that value.
def _reader(*lengths):
structs = [struct.Struct(nstr('>{cells}L'.format(cells=cells)))
for cells in lengths]
offsets = [sum(s.size for s in structs[:i])
for i in range(len(structs))]
buf_len = sum(s.size for s in structs)

def fn(source):
while True:
buf = source.read(buf_len)
if not buf:
break
elif len(buf) < buf_len:
raise IOError('failed to read {buf_len} bytes'.format(
buf_len=buf_len))
row = ()
for offset, s in zip(offsets, structs):
cells = s.unpack_from(buf, offset)
value = 0
for cell in cells:
value = (value << 32) | cell
row += (value,)
yield row
return fn

# Returns a list of (child-range, parent-range) tuples for *node*
def _ranges(node):
child_cells, size_cells = _cells(node)
parent = os.path.dirname(node)
parent_cells, _ = _cells(parent)
ranges_reader = _reader(child_cells, parent_cells, size_cells)
with io.open(os.path.join(node, 'ranges'), 'rb') as f:
return [
(range(child_base, child_base + size),
range(parent_base, parent_base + size))
for child_base, parent_base, size in ranges_reader(f)
]

# XXX Replace all this gubbins with pathlib.Path stuff once we drop 2.x
node = os.path.join(root, node)
parent = os.path.dirname(node)
child_cells, size_cells = _cells(parent)
reg_reader = _reader(child_cells, size_cells)
with io.open(os.path.join(node, 'reg'), 'rb') as f:
base, size = list(reg_reader(f))[0]
while parent != root:
# Iterate up the hierarchy, resolving the base address as we go
if os.path.exists(os.path.join(parent, 'ranges')):
for child_range, parent_range in _ranges(parent):
if base in child_range:
# XXX Can't use .start here as python2's crappy xrange
# lacks it; change this when we drop 2.x!
base += parent_range[0] - child_range[0]
break
parent = os.path.dirname(parent)
return range(base, base + size)


class GPIOMemory(object):
GPIO_BASE_OFFSET = 0x200000
PERI_BASE_OFFSET = {
'BCM2835': 0x20000000,
'BCM2836': 0x3f000000,
'BCM2837': 0x3f000000,
'BCM2711': 0xfe000000,
}

# From BCM2835 data-sheet, p.91
Expand All @@ -85,6 +187,8 @@ class GPIOMemory(object):
GPAFEN_OFFSET = 0x88 >> 2
GPPUD_OFFSET = 0x94 >> 2
GPPUDCLK_OFFSET = 0x98 >> 2
# pull-control registers for BCM2711
GPPUPPDN_OFFSET = 0xe4 >> 2

def __init__(self, soc):
try:
Expand All @@ -97,37 +201,45 @@ def __init__(self, soc):
'unable to open /dev/gpiomem or /dev/mem; '
'upgrade your kernel or run as root')
else:
offset = self.peripheral_base(soc) + self.GPIO_BASE_OFFSET
offset = self.gpio_base(soc)
else:
offset = 0
self.mem = mmap.mmap(self.fd, 4096, offset=offset)
# Register reads and writes must be in native format (otherwise
# struct resorts to individual byte reads/writes and you can't hit
# half a register :). For arm64 compat we have to figure out what the
# native unsigned 32-bit type is...
try:
self.reg_fmt = {
struct.calcsize(fmt): fmt
for fmt in (nstr('@I'), nstr('@L'))
}[4]
except KeyError:
raise RuntimeError('unable to find native unsigned 32-bit type')

def close(self):
self.mem.close()
os.close(self.fd)

def peripheral_base(self, soc):
def gpio_base(self, soc):
try:
with io.open('/proc/device-tree/soc/ranges', 'rb') as f:
f.seek(4)
# This is deliberately a big-endian read
return struct.unpack(nstr('>L'), f.read(4))[0]
# XXX Replace this with .start when 2.x is dropped
return dt_peripheral_reg(dt_resolve_alias('gpio'))[0]
except IOError:
try:
return self.PERI_BASE_OFFSET[soc]
return self.PERI_BASE_OFFSET[soc] + self.GPIO_BASE_OFFSET
except KeyError:
pass
raise IOError('unable to determine peripheral base')
raise IOError('unable to determine gpio base')

def __getitem__(self, index):
return struct.unpack_from(nstr('<L'), self.mem, index * 4)[0]
return struct.unpack_from(self.reg_fmt, self.mem, index * 4)[0]

def __setitem__(self, index, value):
struct.pack_into(nstr('<L'), self.mem, index * 4, value)
struct.pack_into(self.reg_fmt, self.mem, index * 4, value)


class GPIOFS(object):

GPIO_PATH = '/sys/class/gpio'

def __init__(self, factory, queue):
Expand Down Expand Up @@ -177,7 +289,7 @@ def export(self, pin):
os.O_RDONLY | os.O_NONBLOCK)
except IOError as e:
if e.errno == errno.ENOENT:
with io.open(self.path('export'), 'w+b') as f:
with io.open(self.path('export'), 'wb') as f:
f.write(str(pin).encode('ascii'))
elif e.errno == errno.EACCES:
sleep(i / 100)
Expand Down Expand Up @@ -210,7 +322,7 @@ def unexport(self, pin):
pass
else:
try:
with io.open(self.path('unexport'), 'w+b') as f:
with io.open(self.path('unexport'), 'wb') as f:
f.write(str(pin).encode('ascii'))
except IOError as e:
if e.errno == errno.EINVAL:
Expand Down Expand Up @@ -324,7 +436,10 @@ def __init__(self):
self.mem = GPIOMemory(self.pi_info.soc)
self.fs = GPIOFS(self, queue)
self.dispatch = NativeDispatchThread(self, queue)
self.pin_class = NativePin
if self.pi_info.soc == 'BCM2711':
self.pin_class = Native2711Pin
else:
self.pin_class = Native2835Pin

def close(self):
if self.dispatch is not None:
Expand Down Expand Up @@ -355,18 +470,21 @@ class NativePin(LocalPiPin):
'alt5': 0b010,
}

GPIO_PULL_UPS = {
'up': 0b10,
'down': 0b01,
'floating': 0b00,
'reserved': 0b11,
}

GPIO_FUNCTION_NAMES = {v: k for (k, v) in GPIO_FUNCTIONS.items()}
GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}

def __init__(self, factory, number):
super(NativePin, self).__init__(factory, number)
self._reg_init(factory, number)
self._last_call = None
self._when_changed = None
self._change_thread = None
self._change_event = Event()
self.function = 'input'
self.pull = 'up' if self.factory.pi_info.pulled_up(repr(self)) else 'floating'
self.bounce = None
self.edges = 'none'

def _reg_init(self, factory, number):
self._func_offset = self.factory.mem.GPFSEL_OFFSET + (number // 10)
self._func_shift = (number % 10) * 3
self._set_offset = self.factory.mem.GPSET_OFFSET + (number // 32)
Expand All @@ -375,23 +493,12 @@ def __init__(self, factory, number):
self._clear_shift = number % 32
self._level_offset = self.factory.mem.GPLEV_OFFSET + (number // 32)
self._level_shift = number % 32
self._pull_offset = self.factory.mem.GPPUDCLK_OFFSET + (number // 32)
self._pull_shift = number % 32
self._edge_offset = self.factory.mem.GPEDS_OFFSET + (number // 32)
self._edge_shift = number % 32
self._rising_offset = self.factory.mem.GPREN_OFFSET + (number // 32)
self._rising_shift = number % 32
self._falling_offset = self.factory.mem.GPFEN_OFFSET + (number // 32)
self._falling_shift = number % 32
self._last_call = None
self._when_changed = None
self._change_thread = None
self._change_event = Event()
self.function = 'input'
self._pull = 'floating'
self.pull = 'up' if self.factory.pi_info.pulled_up(repr(self)) else 'floating'
self.bounce = None
self.edges = 'none'

def close(self):
self.edges = 'none'
Expand Down Expand Up @@ -426,24 +533,10 @@ def _set_state(self, value):
self.factory.mem[self._clear_offset] = 1 << self._clear_shift

def _get_pull(self):
return self.GPIO_PULL_UP_NAMES[self._pull]
raise NotImplementedError

def _set_pull(self, value):
if self.function != 'input':
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
if value != 'up' and self.factory.pi_info.pulled_up(repr(self)):
raise PinFixedPull('%r has a physical pull-up resistor' % self)
try:
value = self.GPIO_PULL_UPS[value]
except KeyError:
raise PinInvalidPull('invalid pull direction "%s" for pin %r' % (value, self))
self.factory.mem[self.factory.mem.GPPUD_OFFSET] = value
sleep(0.000000214)
self.factory.mem[self._pull_offset] = 1 << self._pull_shift
sleep(0.000000214)
self.factory.mem[self.factory.mem.GPPUD_OFFSET] = 0
self.factory.mem[self._pull_offset] = 0
self._pull = value
raise NotImplementedError

def _get_bounce(self):
return self._bounce
Expand Down Expand Up @@ -481,3 +574,81 @@ def _enable_event_detect(self):

def _disable_event_detect(self):
self.factory.fs.unwatch(self.number)


class Native2835Pin(NativePin):
"""
Extends :class:`NativePin` for Pi hardware prior to the Pi 4 (Pi 0, 1, 2,
3, and 3+).
"""
GPIO_PULL_UPS = {
'up': 0b10,
'down': 0b01,
'floating': 0b00,
}

GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}

def _reg_init(self, factory, number):
super(Native2835Pin, self)._reg_init(factory, number)
self._pull_offset = self.factory.mem.GPPUDCLK_OFFSET + (number // 32)
self._pull_shift = number % 32
self._pull = 'floating'

def _get_pull(self):
return self.GPIO_PULL_UP_NAMES[self._pull]

def _set_pull(self, value):
if self.function != 'input':
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
if value != 'up' and self.factory.pi_info.pulled_up(repr(self)):
raise PinFixedPull('%r has a physical pull-up resistor' % self)
try:
value = self.GPIO_PULL_UPS[value]
except KeyError:
raise PinInvalidPull('invalid pull direction "%s" for pin %r' % (value, self))
self.factory.mem[self.factory.mem.GPPUD_OFFSET] = value
sleep(0.000000214)
self.factory.mem[self._pull_offset] = 1 << self._pull_shift
sleep(0.000000214)
self.factory.mem[self.factory.mem.GPPUD_OFFSET] = 0
self.factory.mem[self._pull_offset] = 0
self._pull = value


class Native2711Pin(NativePin):
"""
Extends :class:`NativePin` for Pi 4 hardware (Pi 4, CM4, Pi 400 at the time
of writing).
"""
GPIO_PULL_UPS = {
'up': 0b01,
'down': 0b10,
'floating': 0b00,
}

GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}

def _reg_init(self, factory, number):
super(Native2711Pin, self)._reg_init(factory, number)
self._pull_offset = self.factory.mem.GPPUPPDN_OFFSET + (number // 16)
self._pull_shift = (number % 16) * 2

def _get_pull(self):
pull = (self.factory.mem[self._pull_offset] >> self._pull_shift) & 3
return self.GPIO_PULL_UP_NAMES[pull]

def _set_pull(self, value):
if self.function != 'input':
raise PinFixedPull('cannot set pull on non-input pin %r' % self)
if value != 'up' and self.factory.pi_info.pulled_up(repr(self)):
raise PinFixedPull('%r has a physical pull-up resistor' % self)
try:
value = self.GPIO_PULL_UPS[value]
except KeyError:
raise PinInvalidPull('invalid pull direction "%s" for pin %r' % (value, self))
self.factory.mem[self._pull_offset] = (
self.factory.mem[self._pull_offset]
& ~(3 << self._pull_shift)
| (value << self._pull_shift)
)

0 comments on commit 7403b1b

Please sign in to comment.