In [2]:
%pip install matplotlib numpy python-msp430-tools

Collecting matplotlib
  Obtaining dependency information for matplotlib from https://files.pythonhosted.org/packages/57/68/c2feb4667adbf882ffa4b3e0ac9967f848980d9f8b5bebd86644aa67ce6a/matplotlib-3.9.4-cp39-cp39-win_amd64.whl.metadata
  Using cached matplotlib-3.9.4-cp39-cp39-win_amd64.whl.metadata (11 kB)
Collecting numpy
  Obtaining dependency information for numpy from https://files.pythonhosted.org/packages/ea/2b/7fc9f4e7ae5b507c1a3a21f0f15ed03e794c1242ea8a242ac158beb56034/numpy-2.0.2-cp39-cp39-win_amd64.whl.metadata
  Using cached numpy-2.0.2-cp39-cp39-win_amd64.whl.metadata (59 kB)
Collecting python-msp430-tools
  Obtaining dependency information for python-msp430-tools from https://files.pythonhosted.org/packages/55/c6/3bea894c524cf0c188a4ac6b8ad0c6f2743dbf1726a96e8f430f2748de02/python_msp430_tools-0.9.2-py2.py3-none-any.whl.metadata
  Downloading python_msp430_tools-0.9.2-py2.py3-none-any.whl.metadata (4.5 kB)
Collecting contourpy>=1.0.1 (from matplotlib)
  Obtaining dependency 


[notice] A new release of pip is available: 23.2.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [10]:
# Import necessary libraries
import time
import serial
import sys
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import ipywidgets as widgets
import serial.tools.list_ports  # For listing available ports

class TUSS44x0Controller:
    """
    Controller class for communicating with MSP430 boards connected to TUSS44x0 ultrasonic sensors.
    Enables bidirectional communication between two boards where either can be transmitter or receiver.

    Each board is driven over its own serial port with newline-terminated text
    commands ("RESET", "CONFIG_SPI", "WRITE <addr> <value>", "READ <addr>").
    Successful measurements are accumulated in ``measurement_history``.
    """

    # MSP430 register addresses for TUSS44x0 control
    TUSS_CONFIG_REG = 0x1000
    TUSS_MODE_REG = 0x1002
    TUSS_TRIGGER_REG = 0x1004
    TUSS_STATUS_REG = 0x1006
    TUSS_DATA_REG = 0x1008

    # Operation modes
    MODE_TRANSMITTER = 0x01
    MODE_RECEIVER = 0x02

    def __init__(self, port1, port2, baud_rate=115200):
        """Open both serial ports and initialize both MSP430 boards.

        Args:
            port1: Serial port name of the first board (initial transmitter).
            port2: Serial port name of the second board (initial receiver).
            baud_rate: Baud rate used for both ports.

        Raises:
            serial.SerialException: If a port cannot be opened. Any port that
                was already opened is closed again before the error propagates.
        """
        # Set every attribute before doing I/O so close() and the other methods
        # are safe to call on a partially constructed instance.
        self.msp1 = None
        self.msp2 = None
        self.transmitter = None
        self.receiver = None
        self.measurement_history = []

        try:
            self.msp1 = serial.Serial(port1, baud_rate, timeout=1)
            self.msp2 = serial.Serial(port2, baud_rate, timeout=1)
            print(f"Connected to MSP430 boards on {port1} and {port2}")

            # Default assignment (can be switched later)
            self.transmitter = self.msp1
            self.receiver = self.msp2

            # Initialize both boards
            self._initialize_board(self.msp1)
            self._initialize_board(self.msp2)

        except serial.SerialException as e:
            print(f"Error connecting to MSP430 boards: {e}")
            # Do not leave the first port open when the second one fails;
            # otherwise the next connection attempt cannot reopen it.
            self._release_ports()
            raise
        except Exception:
            self._release_ports()
            raise

    def _initialize_board(self, board):
        """Reset one board, configure its SPI link and write/verify a basic configuration."""
        # Reset the board
        self._send_command(board, "RESET")
        time.sleep(0.5)

        # Configure SPI for communication with TUSS44x0
        self._send_command(board, "CONFIG_SPI")

        # Initial configuration of the TUSS44x0 sensor
        self._write_register(board, self.TUSS_CONFIG_REG, 0x0010)  # Basic configuration
        time.sleep(0.1)

        # Verify communication by reading back configuration
        # (a failed read yields None, which also triggers the warning)
        config = self._read_register(board, self.TUSS_CONFIG_REG)
        if config != 0x0010:
            print(f"Warning: Configuration verification failed for board on {board.port}")

    def switch_roles(self):
        """Switch transmitter and receiver roles between the two MSP430 boards"""
        self.transmitter, self.receiver = self.receiver, self.transmitter

        # Update modes on both boards
        self._set_mode(self.transmitter, self.MODE_TRANSMITTER)
        self._set_mode(self.receiver, self.MODE_RECEIVER)

        print(f"Roles switched: Transmitter is now on {self.transmitter.port}, Receiver is now on {self.receiver.port}")

    def _set_mode(self, board, mode):
        """Write ``mode`` to the board's mode register and verify it by reading it back.

        Returns:
            True if the read-back value equals ``mode``, otherwise False
            (including when the read-back could not be parsed).
        """
        self._write_register(board, self.TUSS_MODE_REG, mode)
        time.sleep(0.1)
        current_mode = self._read_register(board, self.TUSS_MODE_REG)

        if current_mode != mode:
            print(f"Warning: Mode setting failed for board on {board.port}")
            return False
        return True

    def configure_transmitter(self, frequency=40000, pulse_count=8, gain=3):
        """Pack the transmitter parameters into one word and write it to the config register.

        Args:
            frequency: Drive frequency in Hz; stored in kHz in bits 8-15.
            pulse_count: Number of pulses; stored in bits 4-7 (4 bits).
            gain: Gain level; stored in bits 0-2 (3 bits).
        """
        # Combine parameters into a single configuration word. The kHz field is
        # masked to 8 bits so the word always fits the 4-hex-digit WRITE format.
        config = (((frequency // 1000) & 0xFF) << 8) | ((pulse_count & 0x0F) << 4) | (gain & 0x07)
        self._write_register(self.transmitter, self.TUSS_CONFIG_REG, config)
        print(f"Transmitter configured: {frequency}Hz, {pulse_count} pulses, gain level {gain}")

    def configure_receiver(self, gain=5, filter_bandwidth=2000, detection_threshold=500):
        """Pack the receiver parameters into two words and write them to the receiver board.

        Args:
            gain: Gain level; stored in bits 8-10 of the first word.
            filter_bandwidth: Bandwidth in Hz; stored in units of 100 Hz in bits 0-7.
            detection_threshold: Threshold; written as the second 16-bit word.
        """
        # Combine parameters into configuration words
        config1 = ((gain & 0x07) << 8) | ((filter_bandwidth // 100) & 0xFF)
        config2 = detection_threshold & 0xFFFF

        self._write_register(self.receiver, self.TUSS_CONFIG_REG, config1)
        # NOTE(review): TUSS_CONFIG_REG + 2 is 0x1002, the same address as
        # TUSS_MODE_REG, so this write and _set_mode() overwrite each other.
        # Confirm the intended threshold register against the firmware.
        self._write_register(self.receiver, self.TUSS_CONFIG_REG + 2, config2)
        print(f"Receiver configured: gain level {gain}, filter bandwidth {filter_bandwidth}Hz, threshold {detection_threshold}")

    def send_pulse(self):
        """Put both boards into their roles and trigger the transmitter.

        Returns:
            True if both mode changes were verified and the trigger was written,
            False if either mode change failed (no trigger is sent then).
        """
        # Make sure transmitter is in the right mode
        if not self._set_mode(self.transmitter, self.MODE_TRANSMITTER):
            return False

        # Make sure receiver is ready
        if not self._set_mode(self.receiver, self.MODE_RECEIVER):
            return False

        # Trigger the pulse
        self._write_register(self.transmitter, self.TUSS_TRIGGER_REG, 0x0001)
        return True

    def read_measurement(self, timeout=1.0):
        """Poll the receiver until data is ready, then read and record one measurement.

        Args:
            timeout: Maximum time in seconds to keep polling the status register.

        Returns:
            A dict with keys 'distance', 'time_of_flight', 'signal_strength',
            'raw_data' and 'timestamp', or None when no complete measurement
            was obtained. Only complete measurements are appended to
            ``measurement_history``.
        """
        # monotonic() is immune to wall-clock adjustments while waiting
        deadline = time.monotonic() + timeout

        # Wait for data ready flag in status register
        while time.monotonic() < deadline:
            status = self._read_register(self.receiver, self.TUSS_STATUS_REG)
            # An unparseable status reply (None) counts as "not ready yet"
            if status is not None and status & 0x0001:  # Data ready bit
                # Read measurement data
                data = self._read_register(self.receiver, self.TUSS_DATA_REG)
                time_of_flight = self._read_register(self.receiver, self.TUSS_DATA_REG + 2)
                signal_strength = self._read_register(self.receiver, self.TUSS_DATA_REG + 4)

                if data is None or time_of_flight is None or signal_strength is None:
                    # Never store partial data: plotting and analysis assume numbers
                    print("Error: incomplete measurement data received")
                    return None

                # Calculate distance (assuming speed of sound is 343 m/s)
                # time_of_flight is in microseconds, so distance in meters is:
                # NOTE(review): the division by 2 assumes a round-trip (echo)
                # path; confirm this matches the transmitter/receiver geometry.
                distance = (time_of_flight * 343.0) / 2000000.0

                result = {
                    'distance': distance,
                    'time_of_flight': time_of_flight,
                    'signal_strength': signal_strength,
                    'raw_data': data,
                    'timestamp': time.time()
                }

                self.measurement_history.append(result)
                return result
            time.sleep(0.01)

        print("Timeout: No measurement data received")
        return None

    def run_continuous_measurement(self, count=10, interval=0.5):
        """Run ``count`` pulse/read cycles, sleeping ``interval`` seconds after each.

        Returns:
            The list of successful measurement dicts (failed cycles are skipped).
        """
        results = []

        for i in range(count):
            print(f"Measurement {i+1}/{count}", end="\r")
            if self.send_pulse():
                result = self.read_measurement()
                if result:
                    results.append(result)
            time.sleep(interval)

        print(f"Completed {len(results)} measurements")
        return results

    def _transact(self, board, payload):
        """Send ``payload`` to ``board`` and return its decoded, stripped reply.

        The reply is read with readline(), which returns as soon as a newline
        arrives (or the port timeout expires). A fixed-size read(100) would
        instead block for the whole port timeout on every short reply.
        Assumes the firmware terminates replies with a newline; if it does not,
        readline() still returns whatever arrived once the timeout expires.
        """
        # Drop stale bytes so an earlier late reply cannot be mistaken for this one
        board.reset_input_buffer()
        board.write(payload)
        raw = board.readline()
        # Collect any further lines that are already buffered
        pending = board.in_waiting
        if pending:
            raw += board.read(pending)
        # Line noise must not raise UnicodeDecodeError
        return raw.decode(errors="replace").strip()

    def _send_command(self, board, command):
        """Send a command string to the MSP430 and return its reply text."""
        return self._transact(board, f"{command}\n".encode())

    def _write_register(self, board, address, value):
        """Write a value to a register on the MSP430.

        Returns:
            True if the reply contains "OK", otherwise False.
        """
        response = self._transact(board, f"WRITE {address:04X} {value:04X}\n".encode())
        return "OK" in response

    def _read_register(self, board, address):
        """Read a value from a register on the MSP430.

        Returns:
            The register value as an int, or None if the reply could not be parsed.
        """
        response = self._transact(board, f"READ {address:04X}\n".encode())

        # Parse response, expecting format like "READ 1000: XXXX".
        # Only the first token after the last colon is used, so trailing
        # text after the value does not break parsing.
        try:
            return int(response.split(":")[-1].split()[0], 16)
        except (ValueError, IndexError):
            print(f"Error parsing register read response: {response}")
            return None

    def plot_measurement_history(self):
        """Plot distance and signal strength of all recorded measurements over time."""
        if not self.measurement_history:
            print("No measurement data available")
            return

        # Extract data from measurement history (time axis relative to the first sample)
        first_timestamp = self.measurement_history[0]['timestamp']
        timestamps = [m['timestamp'] - first_timestamp for m in self.measurement_history]
        distances = [m['distance'] for m in self.measurement_history]
        signal_strengths = [m['signal_strength'] for m in self.measurement_history]

        # Create plot
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))

        # Distance plot
        ax1.plot(timestamps, distances, 'b-o')
        ax1.set_xlabel('Time (s)')
        ax1.set_ylabel('Distance (m)')
        ax1.set_title('Distance Measurements')
        ax1.grid(True)

        # Signal strength plot
        ax2.plot(timestamps, signal_strengths, 'r-o')
        ax2.set_xlabel('Time (s)')
        ax2.set_ylabel('Signal Strength')
        ax2.set_title('Signal Strength')
        ax2.grid(True)

        plt.tight_layout()
        plt.show()

    def analyze_signal(self, raw_data=None):
        """Plot a simulated signal scaled by ``raw_data`` (default: the latest measurement)."""
        if raw_data is None:
            if not self.measurement_history:
                print("No measurement data available")
                return
            raw_data = self.measurement_history[-1]['raw_data']

        # Here we would normally unpack the raw signal data from the MSP430
        # For demonstration, we'll generate a simulated signal based on the raw_data value
        # NOTE(review): 1000 samples over 1 s cannot resolve a 40 kHz carrier,
        # so the plotted curve is an aliased waveform, not the real carrier.
        t = np.linspace(0, 1, 1000)
        signal = np.sin(2 * np.pi * 40000 * t) * np.exp(-5 * t) * (raw_data / 1000)

        plt.figure(figsize=(10, 6))
        plt.plot(t * 1000, signal)  # Convert time to milliseconds
        plt.xlabel('Time (ms)')
        plt.ylabel('Amplitude')
        plt.title('Ultrasonic Signal Analysis')
        plt.grid(True)
        plt.show()

    def clear_history(self):
        """Clear the measurement history"""
        self.measurement_history = []
        print("Measurement history cleared")

    def _release_ports(self):
        """Close whichever serial ports are currently open, without raising."""
        for attr in ('msp1', 'msp2'):
            board = getattr(self, attr, None)
            if board is None or not board.is_open:
                continue
            try:
                board.close()
            except Exception as exc:
                # Best effort: one failing port must not keep the other open
                print(f"Warning: could not close {board.port}: {exc}")

    def close(self):
        """Close connections to both MSP430 boards"""
        self._release_ports()
        print("Connections to MSP430 boards closed")

# Function to list available COM ports
def list_available_ports():
    """Print every detected serial port and return the device names.

    Returns:
        list: The ``device`` attribute of each detected port, in the order
        reported by ``serial.tools.list_ports.comports()``.
    """
    detected = list(serial.tools.list_ports.comports())

    print("Available COM ports:")
    for entry in detected:
        print(f"- {entry.device}: {entry.description}")

    return [entry.device for entry in detected]

# Jupyter notebook interactive interface with Windows port selection
def create_interactive_interface():
    """Create an interactive interface using ipywidgets.

    Builds the connection, configuration, measurement and analysis controls,
    wires their callbacks to the module-level ``controller`` and returns the
    assembled layout. Continuous measurements run in a background thread so
    that the kernel stays free to process the "Stop" click.

    Returns:
        widgets.VBox: The complete UI; pass it to ``display()`` to render it.
    """
    import threading  # only this UI needs a background worker

    # Define controller variable in global scope to access it from widget callbacks
    global controller

    # Serializes all access to the serial ports between the UI callbacks
    # (kernel thread) and the continuous-measurement worker thread.
    board_lock = threading.Lock()
    stop_requested = threading.Event()
    worker = {"thread": None}

    def _release_controller():
        """Close and forget the current controller, if there is one."""
        global controller
        existing = globals().get("controller")
        if existing is not None:
            try:
                existing.close()
            except Exception as exc:
                print(f"Error closing previous connection: {exc}")
        controller = None

    def _worker_active():
        """Return True while a continuous measurement thread is running."""
        thread = worker["thread"]
        return thread is not None and thread.is_alive()

    # Start from a clean state; ports held by an earlier UI are released first
    _release_controller()

    # Get available ports
    available_ports = list_available_ports()
    port_options = available_ports if available_ports else ["COM1", "COM2", "COM3", "COM4"]

    # Port selection widgets with dropdown for Windows COM ports
    port1_dropdown = widgets.Dropdown(
        options=port_options,
        description='Board 1 Port:',
        value=port_options[0]
    )

    # A Dropdown value must be one of its options, so with a single detected
    # port fall back to that port instead of a hard-coded name.
    port2_dropdown = widgets.Dropdown(
        options=port_options,
        description='Board 2 Port:',
        value=port_options[1] if len(port_options) > 1 else port_options[0]
    )

    # Refresh ports button
    refresh_ports_button = widgets.Button(description='Refresh Ports')

    baud_dropdown = widgets.Dropdown(
        options=[9600, 19200, 38400, 57600, 115200, 230400],
        description='Baud Rate:',
        value=115200
    )

    # Connect button
    connect_button = widgets.Button(description='Connect')
    connection_status = widgets.Output()

    # Transmitter configuration widgets.
    # continuous_update=False fires the callback once on release instead of
    # sending a serial write for every intermediate slider position.
    tx_freq = widgets.IntSlider(description='Freq (kHz):', min=30, max=50, value=40, continuous_update=False)
    tx_pulses = widgets.IntSlider(description='Pulses:', min=1, max=15, value=8, continuous_update=False)
    tx_gain = widgets.IntSlider(description='Gain:', min=0, max=7, value=3, continuous_update=False)

    # Receiver configuration widgets
    rx_gain = widgets.IntSlider(description='Gain:', min=0, max=7, value=5, continuous_update=False)
    rx_bw = widgets.IntSlider(description='BW (Hz):', min=500, max=5000, step=100, value=2000, continuous_update=False)
    rx_threshold = widgets.IntSlider(description='Threshold:', min=0, max=1000, step=10, value=500, continuous_update=False)

    # Measurement widgets
    measure_button = widgets.Button(description='Single Measure')
    continuous_button = widgets.Button(description='Continuous')
    stop_button = widgets.Button(description='Stop')
    count_input = widgets.IntText(description='Count:', value=10)
    interval_input = widgets.FloatText(description='Interval (s):', value=0.5)

    # Display widgets
    plot_button = widgets.Button(description='Plot Results')
    analyze_button = widgets.Button(description='Analyze Signal')
    clear_button = widgets.Button(description='Clear History')
    switch_button = widgets.Button(description='Switch Tx/Rx')

    # Results display
    results_output = widgets.Output()
    plot_output = widgets.Output()

    # Refresh ports callback
    def on_refresh_ports_button_clicked(b):
        refreshed_ports = list_available_ports()
        if refreshed_ports:
            port1_dropdown.options = refreshed_ports
            port2_dropdown.options = refreshed_ports
            port1_dropdown.value = refreshed_ports[0]
            if len(refreshed_ports) > 1:
                port2_dropdown.value = refreshed_ports[1]

    # Connect button callback
    def on_connect_button_clicked(b):
        global controller
        with connection_status:
            clear_output(wait=True)
            if _worker_active():
                print("Connection failed: stop the continuous measurement first")
                return
            if port1_dropdown.value == port2_dropdown.value:
                print("Connection failed: select two different ports")
                return
            with board_lock:
                # The old controller still holds the ports; release them before
                # reopening, otherwise the new connection is refused.
                _release_controller()
                try:
                    controller = TUSS44x0Controller(port1_dropdown.value, port2_dropdown.value, baud_dropdown.value)
                    print("Connection successful")
                except Exception as e:
                    print(f"Connection failed: {str(e)}")

    # Configure transmitter callback
    def on_tx_config_change(change):
        with board_lock:
            if controller is not None:
                controller.configure_transmitter(tx_freq.value * 1000, tx_pulses.value, tx_gain.value)

    # Configure receiver callback
    def on_rx_config_change(change):
        with board_lock:
            if controller is not None:
                controller.configure_receiver(rx_gain.value, rx_bw.value, rx_threshold.value)

    def _format_result(result):
        """Render one measurement dict as the three-line text shown to the user."""
        return (
            f"Distance: {result['distance']:.3f} m\n"
            f"Time of flight: {result['time_of_flight']} µs\n"
            f"Signal strength: {result['signal_strength']}\n"
        )

    def _measure_once():
        """Send one pulse and read the result; returns the result dict or None."""
        with board_lock:
            if controller is None:
                return None
            # Without a pulse there is nothing to wait for
            if not controller.send_pulse():
                return None
            return controller.read_measurement()

    # Measurement callbacks
    def on_measure_button_clicked(b):
        if controller is None:
            return
        with results_output:
            clear_output(wait=True)
            if _worker_active():
                print("Continuous measurement in progress; press Stop first")
                return
            result = _measure_once()
            if result:
                print(_format_result(result), end="")
            else:
                print("No measurement received")

    def _continuous_worker(total, interval):
        """Background loop: measure ``total`` times, pausing ``interval`` seconds."""

        def show(text):
            # The Output context manager is unreliable off the main thread,
            # so replace the widget's content through its own methods.
            results_output.outputs = ()
            results_output.append_stdout(text)

        try:
            for i in range(total):
                if stop_requested.is_set():
                    show("Measurement stopped\n")
                    return

                result = _measure_once()
                body = _format_result(result) if result else "No measurement received\n"
                show(f"Measurement {i+1}/{total}\n" + body)

                # Event.wait doubles as a sleep that "Stop" can interrupt
                if stop_requested.wait(interval):
                    show("Measurement stopped\n")
                    return

            results_output.append_stdout("Continuous measurement completed\n")
        except Exception as e:
            results_output.append_stdout(f"Error during measurement: {str(e)}\n")
        finally:
            continuous_button.disabled = False

    def on_continuous_button_clicked(b):
        if controller is None or _worker_active():
            return

        stop_requested.clear()
        with results_output:
            clear_output(wait=True)
            print("Starting continuous measurement...")

        # Running the loop inside this callback would block the kernel and
        # the "Stop" click would only be handled after the loop had finished.
        continuous_button.disabled = True
        thread = threading.Thread(
            target=_continuous_worker,
            args=(count_input.value, interval_input.value),
            daemon=True,
        )
        worker["thread"] = thread
        thread.start()

    def on_stop_button_clicked(b):
        stop_requested.set()

    # Display callbacks
    def on_plot_button_clicked(b):
        if controller is not None:
            with plot_output:
                clear_output(wait=True)
                controller.plot_measurement_history()

    def on_analyze_button_clicked(b):
        if controller is not None:
            with plot_output:
                clear_output(wait=True)
                controller.analyze_signal()

    def on_clear_button_clicked(b):
        if controller is not None:
            with results_output:
                clear_output(wait=True)
                # clear_history() prints its own confirmation message
                controller.clear_history()

    def on_switch_button_clicked(b):
        if controller is not None:
            with results_output:
                clear_output(wait=True)
                with board_lock:
                    controller.switch_roles()

    # Connect callbacks to widgets
    refresh_ports_button.on_click(on_refresh_ports_button_clicked)
    connect_button.on_click(on_connect_button_clicked)
    measure_button.on_click(on_measure_button_clicked)
    continuous_button.on_click(on_continuous_button_clicked)
    stop_button.on_click(on_stop_button_clicked)
    plot_button.on_click(on_plot_button_clicked)
    analyze_button.on_click(on_analyze_button_clicked)
    clear_button.on_click(on_clear_button_clicked)
    switch_button.on_click(on_switch_button_clicked)

    # Observe transmitter configuration changes
    tx_freq.observe(on_tx_config_change, names='value')
    tx_pulses.observe(on_tx_config_change, names='value')
    tx_gain.observe(on_tx_config_change, names='value')

    # Observe receiver configuration changes
    rx_gain.observe(on_rx_config_change, names='value')
    rx_bw.observe(on_rx_config_change, names='value')
    rx_threshold.observe(on_rx_config_change, names='value')

    # Create UI layout
    connection_box = widgets.VBox([
        widgets.HTML(value="<b>Connection Settings</b>"),
        widgets.HBox([port1_dropdown, port2_dropdown]),
        widgets.HBox([baud_dropdown, refresh_ports_button]),
        widgets.HBox([connect_button]),
        connection_status
    ])

    tx_config_box = widgets.VBox([
        widgets.HTML(value="<b>Transmitter Configuration</b>"),
        tx_freq, tx_pulses, tx_gain
    ])

    rx_config_box = widgets.VBox([
        widgets.HTML(value="<b>Receiver Configuration</b>"),
        rx_gain, rx_bw, rx_threshold
    ])

    config_box = widgets.HBox([tx_config_box, rx_config_box])

    measurement_box = widgets.VBox([
        widgets.HTML(value="<b>Measurement Control</b>"),
        widgets.HBox([measure_button, continuous_button, stop_button]),
        widgets.HBox([count_input, interval_input]),
        results_output
    ])

    display_box = widgets.VBox([
        widgets.HTML(value="<b>Data Analysis</b>"),
        widgets.HBox([plot_button, analyze_button, clear_button, switch_button]),
        plot_output
    ])

    # Main UI
    ui = widgets.VBox([
        connection_box,
        widgets.HTML(value="<hr>"),
        config_box,
        widgets.HTML(value="<hr>"),
        measurement_box,
        widgets.HTML(value="<hr>"),
        display_box
    ])

    return ui

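# Usage in a Jupyter Notebook
# Requires pyserial, matplotlib, numpy and ipywidgets in the kernel's environment:
#   %pip install pyserial matplotlib numpy ipywidgets
# Then build and display the interface from a separate cell (see below).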



In [11]:
# Build the widget panel (this also prints the detected serial ports) and
# render it below this cell. `ui` is kept as a notebook variable.
ui = create_interactive_interface()
display(ui)

Available COM ports:
- COM4: MSP Application UART1 (COM4)
- COM5: MSP Debug Interface (COM5)
- COM8: Serieel USB-apparaat (COM8)
- COM7: MSP Debug Interface (COM7)
- COM11: Serieel USB-apparaat (COM11)
- COM3: Intel(R) Active Management Technology - SOL (COM3)
- COM9: Standaard seriële verbinding via Bluetooth (COM9)
- COM10: Standaard seriële verbinding via Bluetooth (COM10)
- COM6: MSP Application UART1 (COM6)
- COM1: Communicatiepoort (COM1)


VBox(children=(VBox(children=(HTML(value='<b>Connection Settings</b>'), HBox(children=(Dropdown(description='B…