# MPR121 Data Recording Notebook

Author: Christopher Parker (parkecp@mail.uc.edu)

I've tried to make this as user-friendly as possible, but feel free to reach out with any questions.

# Imports, Board Configuration and Function Definitions

In [1]:
# Basic libraries
import os
os.environ['BLINKA_MPR121'] = '1'
os.environ['BLINKA_FT232H'] = '1'
import time
import datetime
from collections import deque

# For writing data to file
import h5py

# Libraries for FTDI and I2C
from pyftdi.i2c import I2cController
from pyftdi.usbtools import UsbTools

# Multiprocessing and Asynchronous Execution
from concurrent.futures import ThreadPoolExecutor
import asyncio

# Widgets
import ipywidgets as widgets
from IPython.display import display

## Device Configuration and Constant Declarations

Before we start recording, we need to configure the MPR121s.

Normally, this would be done automatically by the Adafruit Blinka package, but we cannot use that (because it only allows 1 FT232H at a time).
So instead, we have to use the pyftdi package (which is what Adafruit Blinka uses, also) and manually determine which registers need to be written/read.

Before we start recording, we need to configure the MPR121s.

Normally, this would be done automatically by the Adafruit Blinka package, but we cannot use that (because it only allows 1 FT232H at a time). So instead, we have to use the pyftdi package (which is what Adafruit Blinka uses, also) and manually determine which registers need to be written/read.

In [2]:
# Register Addresses
SOFT_RESET = 0x80
CONFIG = 0x5E
DATA = 0x04

# How many sensor samples we want to store before writing
HISTORY_SIZE = 1000

# 12 channels per MPR121, just defining a constant
# for clarity
NUM_CHANNELS = 12

# Filename for saving data
filename = f"raw_data_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.h5"

In [3]:
# Find all devices with the given vendor and product IDs
# These correspond to the FT232H
devices = UsbTools.find_all([(0x0403, 0x6014)])

NUM_BOARDS = len(devices)
NUM_SENSORS = NUM_BOARDS*6

In [4]:
# We will make lists of the I2C controller objects and port objects
# for each FT232H device
i2c_controllers = []
i2c_ports = []
for dev in devices:
    url = f"ftdi://ftdi:232h:{dev[0].bus}:{dev[0].address}/1"
    controller = I2cController()
    controller.configure(url)
    i2c_controllers.append(controller)
    port = controller.get_port(0x5A)
    # I really need to figure out a better way to determine the correct address,
    # because this looks silly
    try: # Try to read a byte to see if that's the right address
        port.read_from(0x04, 1)
    except: # If not, try again
        port = controller.get_port(0x5B)
    try:
        port.read_from(0x04, 1)
    except:
        port = controller.get_port(0x5C)
    try:
        port.read_from(0x04, 1)
    except:
        port = controller.get_port(0x5D)
    try:
        port.read_from(0x04,1)
    except:
        print('None of the MPR121 addresses is readable, is the FT232H in I2C mode?')
    i2c_ports.append(port)

In [5]:
# Loop over the FT232Hs, reset and configure each
for idx, port in enumerate(i2c_ports):
    # Write to Soft Reset Register (0x63 sends reset command)
    port.write_to(SOFT_RESET, b'\x63')
    # 0x5E is the configuration register, setting to 0x8F starts the MPR121
    # with the config used in the Adafruit library (if needed, I can figure
    # out alternative configurations)
    port.write_to(CONFIG, b'\x8F')
    
    # If we don't sleep here, it doesn't have time to start reading properly
    time.sleep(0.1)
    
    # Test that we have started the MPR121 (if it's not properly started,
    # reading from DATA will give an empty bytearray, so we check if that's
    # what we got back)
    cap = port.read_from(DATA, 24)
    if cap != bytearray(24):
        print(f'Board {idx} Configured')

Board 0 Configured


## Recording Function and Loop

In [6]:
def record(i2c_port):
    """
    Reads 24 bytes (2 bytes per channel for 12 channels) from an MPR121 sensor 
    via the given pyftdi I2C port, and returns timestamp and capacitance data.
    """
    local_time_data = deque(maxlen=NUM_CHANNELS)
    local_cap_data = deque(maxlen=NUM_CHANNELS)

    # Read 24 bytes (2 bytes for each of the 12 channels).
    raw_buffer = i2c_port.read_from(DATA, 24)
    for chan in range(NUM_CHANNELS):
        # Combine two bytes (little-endian)
        value = raw_buffer[2 * chan] | (raw_buffer[2 * chan + 1] << 8)
        local_cap_data.append(value)
        local_time_data.append(time.time())
    return local_time_data, local_cap_data


In [7]:
# Register Addresses
SOFT_RESET = 0x80
CONFIG = 0x5E
DATA = 0x04

# How many sensor samples we want to store before writing
HISTORY_SIZE = 1000

# 12 channels per MPR121, just defining a constant
# for clarity
NUM_CHANNELS = 12

In [8]:
# Find all devices with the given vendor and product IDs
# These correspond to the FT232H
devices = UsbTools.find_all([(0x0403, 0x6014)])

NUM_SENSORS = len(devices)

In [9]:
# We will make lists of the I2C controller objects and port objects
# for each FT232H device
i2c_controllers = []
i2c_ports = []
for dev in devices:
    url = f"ftdi://ftdi:232h:{dev[0].bus}:{dev[0].address}/1"
    controller = I2cController()
    controller.configure(url)
    i2c_controllers.append(controller)
    port = controller.get_port(0x5A)
    # I really need to figure out a better way to determine the correct address,
    # because this looks silly
    try: # Try to read a byte to see if that's the right address
        port.read_from(0x04, 1)
    except: # If not, try again
        port = controller.get_port(0x5B)
    try:
        port.read_from(0x04, 1)
    except:
        port = controller.get_port(0x5C)
    try:
        port.read_from(0x04, 1)
    except:
        port = controller.get_port(0x5D)
    try:
        port.read_from(0x04,1)
    except:
        print('None of the MPR121 addresses is readable, is the FT232H in I2C mode?')
    i2c_ports.append(port)

In [10]:
# Loop over the FT232Hs, reset and configure each
for idx, port in enumerate(i2c_ports):
    # Write to Soft Reset Register (0x63 sends reset command)
    port.write_to(SOFT_RESET, b'\x63')
    # 0x5E is the configuration register, setting to 0x8F starts the MPR121
    # with the config used in the Adafruit library (if needed, I can figure
    # out alternative configurations)
    port.write_to(CONFIG, b'\x8F')
    
    # If we don't sleep here, it doesn't have time to start reading properly
    time.sleep(0.1)
    
    # Test that we have started the MPR121 (if it's not properly started,
    # reading from DATA will give an empty bytearray, so we check if that's
    # what we got back)
    cap = port.read_from(DATA, 24)
    if cap != bytearray(24):
        print(f'Sensor {idx} Started')

Sensor 0 Started


In [11]:
async def record_sensors():
    """
    Asynchronous function for recording from all sensors and periodically
    writing to file. This is done asynchronously so that we are not blocking
    user inputs while recording (so the user can start/stop and add other data
    to the H5 file without stopping everything).
    """
    board_time_data = {
        board: [deque(maxlen=HISTORY_SIZE) for _ in range(NUM_CHANNELS)]
        for board in range(NUM_BOARDS)
    }
    board_cap_data = {
        board: [deque(maxlen=HISTORY_SIZE) for _ in range(NUM_CHANNELS)]
        for board in range(NUM_BOARDS)
    }
    # Track how many reads we've done so that we can write at the appropriate
    # intervals
    loop_ctr = 0
    
    
    with ThreadPoolExecutor(max_workers=NUM_BOARDS) as executor:
        # Create a group for each board and initialize datasets.
        with h5py.File(filename, "w") as h5f:
            board_groups = {}
            for board in range(NUM_BOARDS):
                h5f.create_group(f"board_{board}")
    
        while recording_all:
            # Because the while loop is computing stuff pretty much constantly,
            # it blocks the main execution thread for the notebook unless we
            # await for a moment here. This gives control back to the notebook,
            # and checks if the user has pressed any buttons since the last loop
            await asyncio.sleep(0)
            # Launch parallel sensor reads on all I2C ports.
            futures = [executor.submit(record, port) for port in i2c_ports]
            results = [future.result() for future in futures]  # Each result is (local_time_data, local_cap_data)
            # Append the data from each board to its corresponding deques.
            for board_idx, (local_time, local_cap) in enumerate(results):
                for chan in range(NUM_CHANNELS):
                    board_time_data[board_idx][chan].append(local_time[chan])
                    board_cap_data[board_idx][chan].append(local_cap[chan])
            if loop_ctr == HISTORY_SIZE:
                with h5py.File(filename, "r+") as h5f:
                    for board in range(NUM_BOARDS):
                        group = h5f[f"board_{board}"]
                        group.create_dataset("time_data", data=board_time_data[board], chunks=(NUM_CHANNELS, HISTORY_SIZE), maxshape=(NUM_CHANNELS, None))
                        group.create_dataset("cap_data", data=board_cap_data[board], chunks=(NUM_CHANNELS, HISTORY_SIZE), maxshape=(NUM_CHANNELS, None))
            elif loop_ctr != 0 and loop_ctr%HISTORY_SIZE == 0:
                tmp_ctr = loop_ctr - 1000
                with h5py.File(filename, "r+") as h5f:
                    for i in range(NUM_BOARDS):
                        group = h5f[f"board_{board}"]
                        group["time_data"].resize((NUM_CHANNELS, tmp_ctr + HISTORY_SIZE))
                        group["cap_data"].resize((NUM_CHANNELS, tmp_ctr + HISTORY_SIZE))
                        group["time_data"][:, tmp_ctr:tmp_ctr + HISTORY_SIZE] = board_time_data[board]
                        group["cap_data"][:, tmp_ctr:tmp_ctr + HISTORY_SIZE] = board_cap_data[board]
            loop_ctr += 1

# Start Here

In [24]:
# Define widgets and their functions

# Global flags and variables for async recording.
recording_all = False
recording_task = None

# Counters to check if we are setting multiple start/end times
# This is probably not necessary, but just in case someone makes a mistake and clicks
# the start or stop on the wrong sensor, we can allow saving multiple start and stop times
# instead of forcing the user to start over. It'll be a bit confusing later on, is the
# issue I see, if there are multiple start times and it's not carefully recorded which
# was correct.
start_except_ctr = 0
stop_except_ctr = 0

# --- Create the Output Widget ---
output_area = widgets.Output(layout=widgets.Layout(overflow_y='scroll', height='600px'))

# --- Callback for the Main Start/Stop Button ---
def start_stop_all(button):
    global recording_all, recording_task
    if not recording_all:
        recording_all = True
        button.description = "Stop Recording"
        recording_task = asyncio.create_task(record_sensors())
        with output_area:
            print("Started recording sensors asynchronously.")
    else:
        recording_all = False
        button.description = "Start Recording"
        with output_area:
            print("Stopping recording sensors...")
        if recording_task is not None:
            recording_task.cancel()
        # Make sure all sensor buttons are switched to the stopped state
        # (this will also trigger saving the stop_time for each sensor
        # that was not switched off manually)
        for row in sensor_rows:
            row.children[0].value = False

# --- Callback for Individual Sensor Start/Stop ---
def sensor_recording(sensor_id, starting):
    global start_except_ctr, stop_except_ctr
    with output_area:
        if starting:
            print(f"Sensor {sensor_id}: Recording start triggered.")
            # Set the board ID number based on the sensor ID,
            # sensors 1-6 are on board 0, etc.
            board_id = (sensor_id - 1) // 6
            with h5py.File(filename, "r+") as h5f:
                try:
                    grp = h5f[f"board_{board_id}"].create_group(f"sensor_{sensor_id%6+1}")
                    grp.create_dataset("start_time", data=time.time())
                except ValueError as e:
                    start_except_ctr += 1
                    print("Attempted to re-record the start time after it has been set")
                    h5f[f"board_{board_id}"][f"sensor_{sensor_id%6+1}"].create_dataset(f"start_time{start_except_ctr}", data=time.time())
        else:
            print(f"Sensor {sensor_id}: Recording stop triggered.")
            board_id = (sensor_id - 1) // 6
            with h5py.File(filename, "r+") as h5f:
                try:
                    h5f[f"board_{board_id}"][f"sensor_{sensor_id%6+1}"].create_dataset("stop_time", data=time.time())
                except ValueError as e:
                    stop_except_ctr += 1
                    print("Attempted to re-record the stop time after it has been set")
                    h5f[f"board_{board_id}"][f"sensor_{sensor_id%6+1}"].create_dataset(f"stop_time{stop_except_ctr}", data=time.time())

# --- Callback for Individual Sensor Test Buttons ---
def sensor_test(button):
    with output_area:
        print(f"Sensor {button.sensor_id}: Test triggered - outputting raw data for confirmation.")

# --- Callback for the Clear Output Button ---
def clear_output_callback(button):
    output_area.clear_output()

# --- Create the Main Start/Stop Button ---
all_devices_button = widgets.Button(
    description="Start Recording",
    button_style='primary',
    layout=widgets.Layout(width='400px', height='60px')
)
all_devices_button.on_click(start_stop_all)

# --- Create Sensor Rows: Each row includes a toggle button and a test button ---
sensor_rows = []
for i in range(1, 19):
    # Create the sensor toggle button (starts as "Start").
    toggle_btn = widgets.ToggleButton(
        value=False,
        description=f"Sensor {i}: Start",
        layout=widgets.Layout(width='200px')
    )
    toggle_btn.sensor_id = i

    # Create the sensor-specific test button.
    test_btn = widgets.Button(
        description="Test",
        button_style='info',
        layout=widgets.Layout(width='100px')
    )
    test_btn.sensor_id = i
    test_btn.on_click(sensor_test)

    # Define the observer for the toggle button.
    def on_toggle(change, btn=toggle_btn):
        if change['new']:
            btn.description = f"Sensor {btn.sensor_id}: Stop"
            sensor_recording(btn.sensor_id, starting=True)
        else:
            btn.description = f"Sensor {btn.sensor_id}: Start"
            sensor_recording(btn.sensor_id, starting=False)
    toggle_btn.observe(on_toggle, names='value')

    # Combine the toggle and test buttons into a horizontal box.
    sensor_row = widgets.HBox([toggle_btn, test_btn])
    sensor_rows.append(sensor_row)

# --- Arrange the Sensor Rows into 3 Columns (6 rows per column) ---
columns = []
for j in range(3):
    sensors_subset = sensor_rows[j * 6:(j + 1) * 6]
    column = widgets.VBox(sensors_subset)
    columns.append(column)
sensors_columns = widgets.HBox(columns)

# --- Create the Clear Output Button ---
clear_output_button = widgets.Button(
    description="Clear Output",
    button_style='warning'
)
# Set the layout properties after creation to avoid JSON serialization issues.
clear_output_button.layout.width = '150px'
clear_output_button.layout.height = '40px'
clear_output_button.on_click(clear_output_callback)

# --- Combine Everything into the Final Layout ---
final_ui = widgets.VBox([
    all_devices_button,  # Top large start/stop-all button.
    sensors_columns,     # Sensor grid arranged in 3 columns.
    clear_output_button, # Button to clear the output widget.
    output_area          # Output display area.
])

# Display the complete UI.
display(final_ui)

VBox(children=(Button(button_style='primary', description='Start Recording', layout=Layout(height='60px', widtâ€¦