Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple MCP2221 I2C buses (improved) #637

Closed
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f86d610
draft: Add i2c bus_id support
fgervais Jul 24, 2021
b428f71
Cleanup
fgervais Jul 24, 2021
32852ce
Add examples
fgervais Jul 24, 2021
877aae7
Manage MCP instances so different modules can share them
fgervais Jul 25, 2021
24a73e2
Remove prints
fgervais Jul 25, 2021
ce4032f
Remove examples/mcps_busio_i2c.py
fgervais Jul 25, 2021
ca9d6ec
Change single example to include multiple mcp2221s
fgervais Jul 25, 2021
a7b6441
Add get_instance() docstring
fgervais Jul 28, 2021
1224623
Change if/else syntax for pylint no-else-return
fgervais Jul 28, 2021
301fa1f
Allow access to protected member
fgervais Jul 28, 2021
5104e5d
Allow pylint unexpected-keyword-arg
fgervais Jul 28, 2021
9447b81
Reorder third-party import in mcp2221 multiple example
fgervais Jul 28, 2021
6e98be4
Merge commit 'd04bc037c3498c442671d9578b3fecbb3fc4e950' into multiple…
ezio-melotti Nov 30, 2022
1fe958f
Merge pull request #1 from ezio-melotti/multiple-mcp2221-updated
fgervais Dec 2, 2022
b2e6c67
Add license to examples
fgervais Dec 3, 2022
db91cce
Add copyright to examples
fgervais Dec 3, 2022
6b89397
List mcp2221 examples in examples.rst
fgervais Dec 3, 2022
d739532
Improve multiple MCP2221 support.
ezio-melotti Dec 5, 2022
bb0ea63
Improve examples/mcp2221_multiple_busio_i2c.py.
ezio-melotti Dec 5, 2022
3477c4f
Make `black` happy.
ezio-melotti Dec 8, 2022
eafe43f
Restore compatibility with Python 3.7.
ezio-melotti Dec 8, 2022
a584d04
Make `pylint` happy.
ezio-melotti Dec 8, 2022
505be77
Make `pylint` happier.
ezio-melotti Dec 8, 2022
c8d9318
Restore import in pin.py.
ezio-melotti Dec 13, 2022
4ea3997
Refactor the MCP2221 implementation.
ezio-melotti Dec 16, 2022
86ac218
Silence Pylint warning.
ezio-melotti Dec 26, 2022
041462e
Remove debug prints.
ezio-melotti Jan 2, 2023
28ae3d3
Make pylint happy.
ezio-melotti Jan 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 10 additions & 2 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,16 @@ See the `CircuitPython docs <https://circuitpython.readthedocs.io/>`_ for extens
:caption: examples/bbb_digitalio.py
:linenos:

.. literalinclude:: ../examples/mcps_busio_i2c.py
:caption: examples/mcps_busio_i2c.py
.. literalinclude:: ../examples/mcp2221_first_busio_i2c.py
:caption: examples/mcp2221_first_busio_i2c.py
:linenos:

.. literalinclude:: ../examples/mcp2221_multiple_busio_i2c.py
:caption: examples/mcp2221_multiple_busio_i2c.py
:linenos:

.. literalinclude:: ../examples/mcp2221_single_busio_i2c.py
:caption: examples/mcp2221_single_busio_i2c.py
:linenos:

.. literalinclude:: ../examples/pb_digitalio.py
Expand Down
13 changes: 13 additions & 0 deletions examples/mcp2221_first_busio_i2c.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# SPDX-FileCopyrightText: 2022, The Blinka Authors.
#
# SPDX-License-Identifier: MIT
import busio


# Usage:
# $ python mcp2221_first_busio_i2c.py
# I2C devices found: ['0x70']


i2c = busio.I2C()
print("I2C devices found: ", [hex(i) for i in i2c.scan()])
30 changes: 30 additions & 0 deletions examples/mcp2221_multiple_busio_i2c.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# SPDX-FileCopyrightText: 2022, The Blinka Authors.
#
# SPDX-License-Identifier: MIT

import busio

from adafruit_blinka.microcontroller.mcp2221.mcp2221 import MCP2221

# Usage:
# $ python mcp2221_multiple_busio_i2c.py
# 2 MCP2221 found: [b'2-3.3:1.2', b'2-3.2:1.2']
# I2C devices found on 2-3.3:1.2: ['0x77']
# I2C devices found on 2-3.2:1.2: ['0x58', '0x61']

# # MCP2221.available_paths() is roughly equivalent to
# import hid
# MCP2221_VID = 0x04D8
# MCP2221_PID = 0x00DD
# addresses = [mcp["path"] for mcp in hid.enumerate(MCP2221_VID, MCP2221_PID)]

addresses = MCP2221.available_paths()
print(f'{len(addresses)} MCP2221(s) found: {addresses}')
ezio-melotti marked this conversation as resolved.
Show resolved Hide resolved

i2c_busses = []
for address in addresses:
i2c_busses.append(busio.I2C(bus_id=address))

for address, i2c in zip(addresses, i2c_busses):
print(f'I2C devices found on {address.decode()}:',
[hex(i) for i in i2c.scan()])
29 changes: 29 additions & 0 deletions examples/mcp2221_single_busio_i2c.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# SPDX-FileCopyrightText: 2022, The Blinka Authors.
#
# SPDX-License-Identifier: MIT
import argparse
import busio


# Usage:
# $ lsusb | grep Microchip
# Bus 001 Device 082: ID 04d8:00dd Microchip Technology, Inc.
# Bus 001 Device 083: ID 04d8:00dd Microchip Technology, Inc.
# $ python mcp2221_single_busio_i2c.py -b 1 -d 83
# I2C devices found: ['0x70']


# Bus and Device number can be gathered from `lsusb`
parser = argparse.ArgumentParser()
parser.add_argument(
"-b", "--bus", type=int, required=True, help="USB bus number (base10)"
)
parser.add_argument(
"-d", "--device", type=int, required=True, help="USB device number (base10)"
)
args = parser.parse_args()

# Need to be bytes, always function 2
device_path = "{:04x}:{:04x}:02".format(args.bus, args.device).encode()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This script doesn't work for me because of this line. The paths to my devices look like 2-3.3:1.2, whereas this will produce something like 0002:0003:02. Perhaps the example could be updated to print a list of available addresses, and prompt the user to select one.

i2c = busio.I2C(bus_id=device_path)
print("I2C devices found: ", [hex(i) for i in i2c.scan()])
60 changes: 0 additions & 60 deletions examples/mcps_busio_i2c.py

This file was deleted.

6 changes: 3 additions & 3 deletions src/adafruit_blinka/microcontroller/mcp2221/i2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
#
# SPDX-License-Identifier: MIT
"""I2C Class for MCP2221"""
from .mcp2221 import mcp2221
from .mcp2221 import MCP2221


class I2C:
"""Custom I2C Class for MCP2221"""

def __init__(self, *, frequency=100000):
self._mcp2221 = mcp2221
def __init__(self, *, frequency=100000, bus_id=None):
self._mcp2221 = MCP2221(bus_id)
self._mcp2221._i2c_configure(frequency)

def scan(self):
Expand Down
94 changes: 86 additions & 8 deletions src/adafruit_blinka/microcontroller/mcp2221/mcp2221.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,39 @@ class MCP2221:
GP_ALT1 = 0b011
GP_ALT2 = 0b100

def __init__(self):
self._hid = hid.device()
self._hid.open(MCP2221.VID, MCP2221.PID)
# attempting to reopen the same hid device results in an error,
# so we cache and return already opened devices
instances = {}

def __new__(cls, bus_id=None):
"""Return a (possibly cached) MCP2221 instance."""
print(f'MCP2221.__new__; {bus_id=}; instances={list(cls.instances)}')
# check for an existing cached instance
if bus_id in cls.instances:
# return the corresponding cached instance
print(f'Returning cached {bus_id}...')
return cls.instances[bus_id]
if bus_id is None and cls.instances:
# return the instance for the first bus_id, if already cached
first_bus_id = cls.available_paths(require_mcps=True)[0]
if first_bus_id in cls.instances:
print(f'Returning cached {first_bus_id}...')
return cls.instances[first_bus_id]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I tried to emulate the original behavior: if the user calls MCP2221() without the bus, it will check if the first device is already opened/cached and return that, if not it will try to open a different one below.

Other possible behaviors are:

  • return one of the cached MCP (if any), even if it's not the first
  • if no bus is specified, only try to open the first device

Note that simply importing the module will automatically create an instance of the first device, so any subsequent MCP2221 call (without the bus) will return that instance. This might not happen if another process already opened the first device -- in that case the class will try to open a different one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now changed this so that:

  • MCP2221() (with no bus_id specified) opens and returns the first available MCP (and raises an error if there are no MCP2221 available).
  • MCP2221(bus_id) opens and returns the corresponding MCP2221, or return the cached instance if it's already open.


# create a new instance
if bus_id is not None:
# use the given bus_id
self = super().__new__(cls)
self._hid = hid.device()
print(f'Opening {bus_id}.')
self._hid.open_path(bus_id)
self._bus_id = bus_id
# cache the new instance
cls.instances[bus_id] = self
else:
# find the first available MCP
self = cls.new_instance()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no bus is specified and the first device is not cached (e.g. because some other process already opened it), then it will try to find and open another one. This behavior is non-deterministic, since it depends on the order the devices have been connected, and on whether other processes have opened the device.

If we want MCP2221() (without the bus) to only ever access the first device, the call to new_instance can be removed and the code changed to just try to open the first device (and possibly failing and raising an error without falling back on the next available device).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now removed the new_instance method and updated the code so that MCP2221 will try to find and open the first available MCP.


# make sure the device gets closed before exit
atexit.register(self.close)
if MCP2221_RESET_DELAY >= 0:
Expand All @@ -64,6 +94,46 @@ def __init__(self):
for pin in range(4):
self.gp_set_mode(pin, self.GP_GPIO) # set to GPIO mode
self.gpio_set_direction(pin, 1) # set to INPUT
return self

@staticmethod
def available_paths(*, require_mcps=False):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this method I considered a few different options:

  • a read-only @property, but in order to work in __new__ it must work without instance (and @property requires the instance);
  • a stand-alone function, but it would need to be imported separately, and having it as a method is handy.

Regarding the name:

  • I used paths because that's the name used by hid.enumerate. Perhaps addresses or devices might be better.
  • I used required_mcps, but required_mcp2221s might be better even if longer. Other options (like raise_if_no_mcp2221s) feel too verbose.

"""Return a list of paths of the currently available MCP2221s.

Raises a RuntimeError if *require_mcps* is True and there
are no MCP2221 connected.
"""
mcps = hid.enumerate(MCP2221.VID, MCP2221.PID)
if require_mcps and not mcps:
raise RuntimeError('No MCP2221 detected.')
return [mcp["path"] for mcp in mcps]

@classmethod
def new_instance(cls):
ezio-melotti marked this conversation as resolved.
Show resolved Hide resolved
"""Find a new MCP2221, create an instance, and return it."""
self = super().__new__(cls)
self._hid = hid.device()
bus_ids = cls.available_paths(require_mcps=True)
print(f'{bus_ids=}')
# loop through all the available MCPs
for bus_id in bus_ids:
print(f'Opening {bus_id}...', end=' ')
try:
self._hid.open_path(bus_id)
self._bus_id = bus_id
# cache the new instance
cls.instances[bus_id] = self
print('[OK]')
return self
except (OSError, RuntimeError) as e:
print(f'[ERR]: {e}')
if len(bus_ids) == 1:
raise # we failed to open the only MCP
continue # try opening the next one
else:
print('Failed to open all hid devices')
raise RuntimeError(f'Can not open any of the {len(bus_ids)} '
f'connected MCP2221 devices.')
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To maintain backward compatibility, if there's only one MCP, the error is re-raised as is. When there are multiple MCPs, a generic error is returned if none of them can be opened. This might end up hiding some information on the cause of the errors though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is now integrated in __new__. The same comment applies.


def close(self):
"""Close the hid device. Does nothing if the device is not open."""
Expand Down Expand Up @@ -132,18 +202,26 @@ def _sram_dump(self):
self._pretty_report(b"\x61")

def _reset(self):
dev_list_before_reset = MCP2221.available_paths()
dev_list_before_reset.remove(self._bus_id)
self._hid_xfer(b"\x70\xAB\xCD\xEF", response=False)
self._hid.close()
time.sleep(MCP2221_RESET_DELAY)

start = time.monotonic()
while time.monotonic() - start < 5:
try:
self._hid.open(MCP2221.VID, MCP2221.PID)
except OSError:
# try again
dev_list_after_reset = MCP2221.available_paths()
# The new device is the one present after reset but not before
device_new_path = set(dev_list_before_reset) ^ set(dev_list_after_reset)
if not device_new_path:
time.sleep(0.1)
continue
# The set should have only a single value, get it out of the set
device_new_path = next(iter(device_new_path))
self._hid.open_path(device_new_path)
self._bus_id = device_new_path
return

raise OSError("open failed")

# ----------------------------------------------------------------
Expand Down Expand Up @@ -396,7 +474,7 @@ def dac_write(self, pin, value):
report[4] = 1 << 7 | (value & 0b11111)
self._hid_xfer(report)

# pylint: enable=unused-argument
# pylint: enable=unused-argument
ezio-melotti marked this conversation as resolved.
Show resolved Hide resolved


mcp2221 = MCP2221()
ezio-melotti marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 4 additions & 1 deletion src/adafruit_blinka/microcontroller/mcp2221/pin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
#
# SPDX-License-Identifier: MIT
"""MCP2221 pin names"""
from .mcp2221 import mcp2221
from .mcp2221 import MCP2221


mcp2221 = MCP2221()
ezio-melotti marked this conversation as resolved.
Show resolved Hide resolved


class Pin:
Expand Down
10 changes: 6 additions & 4 deletions src/busio.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ class I2C(Lockable):
for both MicroPython and Linux.
"""

def __init__(self, scl, sda, frequency=100000):
self.init(scl, sda, frequency)
def __init__(self, scl=None, sda=None, frequency=100000, *, bus_id=None):
self.init(scl, sda, frequency, bus_id=bus_id)

def init(self, scl, sda, frequency):
def init(self, scl, sda, frequency, *, bus_id):
"""Initialization"""
self.deinit()
if detector.board.ftdi_ft232h:
Expand All @@ -50,7 +50,9 @@ def init(self, scl, sda, frequency):
if detector.board.microchip_mcp2221:
from adafruit_blinka.microcontroller.mcp2221.i2c import I2C as _I2C

self._i2c = _I2C(frequency=frequency)
# pylint: disable=unexpected-keyword-arg
self._i2c = _I2C(frequency=frequency, bus_id=bus_id)
# pylint: enable=unexpected-keyword-arg
return
if detector.board.greatfet_one:
from adafruit_blinka.microcontroller.nxp_lpc4330.i2c import I2C as _I2C
Expand Down