In [None]:
# default_exp core

# CircuitPython_pico_pi_controller

> The `pico_pi_controller` module contains CircuitPython classes to manage multiple Raspberry Pi SBCs from a Raspberry Pi Pico, using i2c.

## About

The classes in this module build on Adafruit's `busio` & `adafruit_bus_device` I2C classes for CircuitPython. `busio` is distributed with [CircuitPython](https://circuitpython.org), and `adafruit_bus_device` is distributed with the [CircuitPython Library Bundle](https://circuitpython.org/libraries).

Broadcom SoC hardware I2C is utilized both for the controlling MCU (the Pico) and the controlled SBCs (Raspberry Pi 4/3+/Zero). The I2C baud rate is limited by the controller (the Pico) to reduce transmit/receive errors.

Controller programs may be written with hard-coded I2C peripheral addresses of connected SBCs (called 'devices' by these classes). Or, autodiscovery may be used so that devices may be added without changing software on the controller.

For example:

<code>
from pico_pi_controller import *
    
mycontroller = PPController()

mycontroller.addDevice('0x13')
mycontroller.listdevices()
</code>

This example will initialize I2C on the controller using the board defaults (I2C bus 1), retrieve host information & statistics from the SBC at I2C address 0x13, then return a list of device information.

## Module

In [None]:
#hide
try:
    from nbdev.showdoc import *
except ModuleNotFoundError:
    pass
    """CircuitPython kernel has no nbdev"""

### Imports

In [None]:
#export
from sys import byteorder, modules
import board
from adafruit_bus_device import i2c_device
try:
    from rtc import RTC
except ModuleNotFoundError:
    pass
try:
    from adafruit_datetime import datetime
except ModuleNotFoundError:
    from datetime import datetime
try:
    from adafruit_itertools.adafruit_itertools import chain
except ModuleNotFoundError:
    from itertools import chain 
    
try:
    import adafruit_logging as logging
    logger = logging.getLogger('PPC')
    logger.setLevel(logging.DEBUG)
except ModuleNotFoundError:
    import logging
    logger = logging.getLogger()
    logging.basicConfig(level = logging.DEBUG)

### Classes

In [None]:
# export
# Identifier string a peripheral must answer with to be treated as a PPDevice.
IDENTITY = bytearray(b'ppcd')

# Number of bytes read back when asking a peripheral to clear its TX buffer.
BUFCLR = 16

# Single-byte request codes written to a peripheral. Each is a mutable
# bytearray of length 1 holding the ASCII code of the command letter.
CLR = bytearray(b'C')  # request clear transmit FIFO
IDF = bytearray(b'I')  # request to send [str] identification
HOS = bytearray(b'H')  # request to send [str] hostname
TIM = bytearray(b'T')  # request to send [int] datetime
BOS = bytearray(b'B')  # request to send [bool] bosmang status
LOD = bytearray(b'L')  # request to send [int] load
TZO = bytearray(b'Z')  # request to send [int] timezone (sec offset from UTC)
PEN = bytearray(b'P')  # request to send [int] MCU pin connected to RPi PEN

class UNDevice():
    """An I2C peripheral that a `PPController` has not (yet) identified.

    Extra positional and keyword arguments are accepted and ignored.
    """
    def __init__(self, controller, device_address, *argv, **kwargs):
        # Owning controller and the peripheral's address on its bus.
        self.controller, self.device_address = controller, device_address

        # The I2CDevice created by a PPController; nothing is attached yet.
        self.i2cdevice = None

        # Failed-identification counter, and the retry count before the
        # I2CDevice is considered 'other', i.e. not a PPC device.
        self.retries, self.retries_max = 0, 4

class PPDevice():
    """Represents an I2C peripheral device identified as a `PPDevice`
    and stores data from those hosts.

    Every `get_*` method that talks to the bus returns `None` when the I2C
    transaction fails, so callers can tell "no answer" from a real value.
    """
    def __init__(self, controller, device_address, *argv, **kwargs):
        self.controller     = controller
        self.device_address = device_address
        # The I2CDevice created by a PPController; must be assigned before
        # any of the bus-querying get_* methods are called.
        self.i2cdevice      = None

        self.retries     = 0
        # Retry count before I2CDevice is considered 'other', i.e. not a PPC device.
        self.retries_max = 4
        # Controller timestamp updated with each successful receive. Reports &
        # bosmang can decide what to do with this info.
        self.lastonline  = None

        # --- All data below are received *from* the PPC device ---

        # Declaration that device will send datetime & control instructions to
        # the controller. Only one bosmang per controller is expected.
        self.bosmang    = None
        # MCU gpio rx for passthru from bosmang console tx
        self.uart_rx    = None
        # MCU gpio tx for passthru from bosmang console rx
        self.uart_tx    = None
        # MCU gpio connected to RPi pen pin
        self.pen        = None

        self.hostname   = None
        # datetime object converted from timestamp, used to send datetime as
        # bosmang & to check for datetime skew on other devices.
        self.datetime   = None
        self.utcoffset  = None
        self.loadavg    = None

        # Short log prefix, e.g. "PPD: 0x13" (device_address must be an int).
        self.id_str = type(self).__name__[0:3]+": "+str(hex(self.device_address))

    def log_txn(self, fname, message, msg=None):
        """Wrapper for logger.

        `msg` is accepted so callers can pass the received value, but it is
        not currently part of the emitted log line.
        """
        logger.info('%-8s %-28s %-13s %s' % (self.id_str, message, fname, self.controller.i2c_str))

    def _clear_tx_fifo(self, i2cdevice):
        """Ask the peripheral to clear its transmit FIFO (best effort).

        A failure here is deliberately ignored: the real request that follows
        reports its own failure by returning `None`.
        """
        try:
            i2cdevice.write_then_readinto(CLR, bytearray(BUFCLR))
        except OSError:
            pass

    def get_hostname(self):
        """Ask PPD for its hostname.

        The peripheral first answers with a 1-byte length, then the hostname
        bytes are read in a second transfer.

        Returns the hostname as `str`, or `None` if the transaction failed or
        the reply could not be decoded.
        """
        fname='get_hostname'
        with self.i2cdevice as i2cdevice:
            self._clear_tx_fifo(i2cdevice)
            length = bytearray(1)
            try:
                # Get the length in bytes of the hostname
                i2cdevice.write_then_readinto(HOS, length)
                name = bytearray(int.from_bytes(bytes(length), byteorder))
                i2cdevice.readinto(name)
                self.lastonline = datetime.now()
                hostname = name.decode()
                self.log_txn(fname, "recvd hostname", hostname)
                return hostname
            except (OSError, UnicodeError):
                # OSError: bus failure. UnicodeError: garbled reply. Neither
                # should yield a bogus hostname or abort the caller's poll.
                pass
        return None

    def get_datetime(self):
        """Ask PPD for its datetime.

        Reads 4 bytes, interprets them as an unsigned integer timestamp and
        returns `datetime.fromtimestamp(...)` of it, or `None` on I2C failure.
        """
        fname='get_datetime'
        with self.i2cdevice as i2cdevice:
            self._clear_tx_fifo(i2cdevice)
            msg = bytearray(4)
            try:
                i2cdevice.write_then_readinto(TIM, msg)
                self.lastonline = datetime.now()
                timestamp = int.from_bytes(bytes(msg), byteorder)
                self.log_txn(fname, "recvd datetime", timestamp)
                return datetime.fromtimestamp(timestamp)
            except OSError:
                pass
        return None

    def get_bosmang(self):
        """Ask PPD for its bosmang status (bool).

        When the device reports a true status, its address is recorded as the
        controller's `bosmang`. A false status leaves the controller's current
        selection untouched.

        Returns `True`/`False`, or `None` on I2C failure.
        """
        fname='get_bosmang'
        with self.i2cdevice as i2cdevice:
            self._clear_tx_fifo(i2cdevice)
            msg = bytearray(1)
            try:
                i2cdevice.write_then_readinto(BOS, msg)
                self.lastonline = datetime.now()
                # Interpret the byte numerically; a decoded 1-char string
                # would be truthy even for a zero byte.
                status = bool(int.from_bytes(bytes(msg), byteorder))
                if status:
                    self.controller.bosmang = self.device_address
                self.log_txn(fname, "recvd bosmang status:", status)
                return status
            except OSError:
                pass
        return None

    def get_timezone(self):
        """Ask PPD for its timezone (in seconds offset from utc).

        Reads 3 bytes and returns them as an unsigned integer, or `None` on
        I2C failure.

        NOTE(review): the value is decoded unsigned, so a negative UTC offset
        cannot be represented unless the peripheral encodes it some other
        way -- confirm against the device-side protocol.
        """
        fname='get_timezone'
        with self.i2cdevice as i2cdevice:
            self._clear_tx_fifo(i2cdevice)
            msg = bytearray(3)
            try:
                i2cdevice.write_then_readinto(TZO, msg)
                self.lastonline = datetime.now()
                offset = int.from_bytes(bytes(msg), byteorder)
                self.log_txn(fname, "recvd utcoffset", offset)
                return offset
            except OSError:
                pass
        return None

    def get_loadavg(self):
        """Ask PPD for its load average.

        Reads 4 bytes and returns them decoded as `str`, or `None` if the
        transaction failed or the reply could not be decoded.
        """
        fname='get_loadavg'
        with self.i2cdevice as i2cdevice:
            self._clear_tx_fifo(i2cdevice)
            msg = bytearray(4)
            try:
                i2cdevice.write_then_readinto(LOD, msg)
                self.lastonline = datetime.now()
                loadavg = msg.decode()
                self.log_txn(fname, "recvd loadavg:", loadavg)
                return loadavg
            except (OSError, UnicodeError):
                pass
        return None

    def get_uart_rx(self):
        """Return the stored MCU board pin where the PPD's UART RX is connected.

        No bus traffic is generated; this only reads `self.uart_rx`.
        """
        return self.uart_rx

    def get_uart_tx(self):
        """Return the stored MCU board pin where the PPD's UART TX is connected.

        No bus traffic is generated; this only reads `self.uart_tx`.
        """
        return self.uart_tx

    def get_pen(self):
        """Return the stored MCU board pin where the PPD's PEN is connected.

        No bus traffic is generated; this only reads `self.pen`.
        """
        return self.pen

class PPController():
    """Represents one of the system's I2C busses and tracks which I2C
    peripherals are `PPDevice`s.

    Keyword arguments (all optional):
        scl, sda  -- board pins for the bus; default `board.SCL` / `board.SDA`.
                     The board defaults are only looked up when a pin is not
                     supplied.
        frequency -- stored as `self.frequency` (default 4800).
        timeout   -- stored as `self.timeout` (default 10000).
        bosmang   -- device_address preselected as bosmang (default `None`).
        i2c       -- an already-initialised I2C bus object to use. This class
                     does not create a bus itself, so `self.i2c` must be set
                     (here or by assignment) before `i2c_scan`/`add_ppds`.
    """
    def __init__(self, **kwargs):
        self.i2c       = kwargs.pop('i2c', None)
        self.scl       = kwargs.pop('scl') if 'scl' in kwargs else board.SCL
        self.sda       = kwargs.pop('sda') if 'sda' in kwargs else board.SDA
        self.frequency = kwargs.pop('frequency', 4800)
        self.timeout   = kwargs.pop('timeout', 10000)
        # PPDevice device_address selected to receive datetime & control
        # instructions from, have UART connected for passthru, etc.
        self.bosmang   = kwargs.pop('bosmang', None)
        # To receive datetime from bosmang & to check for datetime skew on other devices.
        self.datetime  = None
        self.utcoffset = None
        # The `rtc` import at the top of the file is optional; without it
        # there is no clock to set and `set_datetime` only logs.
        try:
            self.clock = RTC()
        except NameError:
            self.clock = None

        # PPDevice objects belonging to PPController object.
        self.PPDs      = []
        # UNDevice objects belonging to PPController object.
        self.noident   = []
        # UNDevice objects without I2CDevices (address record only)
        # recognized as 'other' peripherals.
        self.othrdev   = []

        self.i2c_str = self._pin_name(self.scl)+"/"+self._pin_name(self.sda)

    @staticmethod
    def _pin_name(pin):
        """Return `str(pin)` without a leading 'board.' prefix."""
        name = str(pin)
        prefix = 'board.'
        # str.strip() would remove any of the characters b/o/a/r/d/. from
        # both ends, so the prefix is removed explicitly.
        if name.startswith(prefix):
            return name[len(prefix):]
        return name

    def log_txn(self, fname, message, hexaddr=None, msg=None):
        """Wrapper for logger.

        `msg` is accepted so callers can pass an extra value, but it is not
        currently part of the emitted log line.
        """
        id_str = type(self).__name__[0:3]+": "+str(hexaddr or '    ')+" "
        logger.info('%-8s %-28s %-13s %s' % (id_str, message, fname, self.i2c_str))

    def i2c_scan(self):
        """Scan the I2C bus and create I2CDevice objects for each new peripheral.

        Addresses already tracked in `PPDs`, `noident` or `othrdev` are
        skipped. Busy-waits for the bus lock and always releases it, even if
        the scan raises. Returns `True`.
        """
        fname='i2c_scan'
        while not self.i2c.try_lock():
            pass
        try:
            self.log_txn(fname,">>> SCANNING I2C bus <<<")
            for addr in self.i2c.scan():
                known = chain(self.PPDs, self.noident, self.othrdev)
                if any(d.device_address == addr for d in known):
                    continue
                undevice = UNDevice(controller=self, device_address=addr)
                undevice.i2cdevice = i2c_device.I2CDevice(self.i2c, device_address=addr, probe=False)
                self.noident.append(undevice)
                self.log_txn(fname,"added I2C peripheral",hex(addr))
        finally:
            self.i2c.unlock()
        return True

    def identify_ppds(self):
        """Identify PPDs from among all unidentified I2CDevices.

        Each device in `noident` is asked once for its identification. A
        device answering with `IDENTITY` is moved to `PPDs`; otherwise its
        retry counter is incremented and, once `retries_max` is reached, it is
        moved to `othrdev` with its I2CDevice released.
        """
        fname='identify_ppds'
        i = 0
        while i < len(self.noident):
            candidate = self.noident[i]
            addr = candidate.device_address
            self.log_txn(fname,"querying I2C peripheral",hex(addr))
            with candidate.i2cdevice as unident:
                try:
                    # Clear the i2c peripheral's TX FIFO (best effort)
                    unident.write_then_readinto(CLR, bytearray(BUFCLR))
                except OSError:
                    pass
                msg = bytearray(len(IDENTITY))
                try:
                    unident.write_then_readinto(IDF,msg)
                except OSError:
                    # msg stays zero-filled, so the comparison below fails
                    # and the attempt counts as a retry.
                    self.log_txn(fname,"WRITE FAILED",hex(addr))
                if msg == IDENTITY:
                    ppd = PPDevice(controller=self,device_address=addr)
                    ppd.i2cdevice = unident
                    ppd.lastonline = datetime.now()
                    self.PPDs.append(ppd)
                    # Removing the entry shifts the next candidate into slot i.
                    del self.noident[i]
                    self.log_txn(fname,">>>  added PPDevice <<<",hex(addr))
                else:
                    candidate.retries += 1
                    self.log_txn(fname,"ID FAILED on try",hex(addr),candidate.retries)
                    if candidate.retries >= candidate.retries_max:
                        self.log_txn(fname,"max retries; releasing",hex(addr))
                        self.othrdev.append(self.noident.pop(i))
                        # Keep the attribute (address record only) but drop
                        # the I2CDevice reference.
                        candidate.i2cdevice = None
                    else:
                        i += 1

    def add_ppds(self):
        """Wrapper for `i2c_scan` + `identify_ppds`."""
        fname='add_ppds'
        self.log_txn(fname,'    function called')
        self.i2c_scan()
        if self.noident:
            self.log_txn(fname,"found new peripherals:",'',len(self.noident))
            self.identify_ppds()

    def query_ppds(self):
        """Ask all PPDs for all of their metadata & stats.

        Results are stored on each PPDevice. If a device reports bosmang
        status and supplied a datetime, the controller clock is set from it.
        """
        fname='query_ppds'
        self.log_txn(fname,'    function called')
        for ppd in self.PPDs:
            ppd.datetime  = ppd.get_datetime()
            ppd.utcoffset = ppd.get_timezone()
            ppd.bosmang   = ppd.get_bosmang()
            # get_datetime() returns None on I2C failure; skip the clock
            # update rather than dereferencing None.
            if ppd.bosmang and ppd.datetime is not None:
                self.set_datetime(ppd.datetime.timetuple())
            ppd.hostname  = ppd.get_hostname()
            ppd.loadavg   = ppd.get_loadavg()
            # Store into the attributes PPDevice declares (lowercase).
            ppd.uart_rx   = ppd.get_uart_rx()
            ppd.uart_tx   = ppd.get_uart_tx()
            ppd.pen       = ppd.get_pen()

    def set_datetime(self,timetuple):
        """Set the MCU's realtime clock from `timetuple`, then log the time.

        When no RTC is available (`self.clock` is `None`) only the log line
        is emitted.
        """
        fname='set_datetime'
        if self.clock is not None:
            self.clock.datetime = timetuple
        self.log_txn(fname,str(datetime.now()))

    def get_ppd(self, device_address=None, hostname=None):
        """Return the first known PPDevice matching the given criteria.

        `device_address` is tried first; if it is not given or matches
        nothing, `hostname` is tried. Returns `None` when nothing matches.
        """
        if device_address is not None:
            for ppd in self.PPDs:
                if ppd.device_address == device_address:
                    return ppd
        if hostname:
            for ppd in self.PPDs:
                if ppd.hostname == hostname:
                    return ppd
        return None

## nbdev & circuitpython

In [None]:
#hide
# Notebook housekeeping: checkpoint the notebook, then run nbdev's
# notebook2script(). Skipped entirely when the imports are unavailable.
try:
    from IPython.display import display, Javascript
    # Ask the classic notebook front end to save a checkpoint.
    display(Javascript('IPython.notebook.save_checkpoint();'))
    from time import sleep
    # Short pause before exporting -- presumably to let the save complete.
    sleep(0.2)
    from nbdev.export import notebook2script
    notebook2script()
except ModuleNotFoundError:
    # IPython/nbdev are not importable under the CircuitPython kernel.
    pass
    """CircuitPython kernel has no nbdev"""

!!echo -e "\x02\x04" | tee /dev/ttyACM0

%cp -v CircuitPython_pico_pi_controller/core.py /CIRCUITPY/lib/CircuitPython_pico_pi_controller.py