In [5]:
import os
from pathlib import Path
import scipy.io as sio

def load_drivecycle_mat_files(base_folder: str):
    """
    Scans the 'drivecycle' subfolder under base_folder, finds all .mat files,
    and loads each one using scipy.io.loadmat.

    Returns:
        A dict mapping each .mat filename (without extension) to the loaded dict.
    """
    drivecycle_dir = Path(base_folder) / "drivecycle"
    if not drivecycle_dir.is_dir():
        raise FileNotFoundError(f"Directory not found: {drivecycle_dir}")

    mat_data = {}
    for mat_file in drivecycle_dir.glob("*.mat"):
        # loadmat returns a dict: variable names → arrays/objects
        try:
            data_dict = sio.loadmat(mat_file)
        except NotImplementedError:
            # If it's a v7.3 MAT-file (HDF5-based), scipy.loadmat will fail.
            # You can either skip it or handle with h5py (see below).
            print(f"Warning: {mat_file.name} might be v7.3. Skipping scipy.loadmat.")
            continue

        # Strip off the path and extension, e.g. "CYC_WLTC_Low_3"
        key = mat_file.stem  
        mat_data[key] = data_dict

        print(f"Loaded '{mat_file.name}' → variables: {[k for k in data_dict.keys() if not k.startswith('__')]}")
    return mat_data

# Load the reference speed
base_folder = ''
all_cycles = load_drivecycle_mat_files(base_folder)

# Example: inspect one of the loaded cycles
example_key = next(iter(all_cycles))
example_data = all_cycles[example_key]

print("\nExample contents of", example_key)
for var_name, arr in example_data.items():
    if var_name.startswith("__"):
        continue
    print(f"  • {var_name}: shape = {getattr(arr, 'shape', 'scalar')}  dtype = {getattr(arr, 'dtype', '')}")


Loaded 'CYC_WLTC_Low_3.mat' → variables: ['cyc_mph']
Loaded 'CYC_WLTP.mat' → variables: ['cyc_mph']
Loaded 'CYC_WLTC_High_3_2.mat' → variables: ['cyc_mph']
Loaded 'CYC_WLTC_Medium_3_2.mat' → variables: ['cyc_mph']
Loaded 'CYC_WLTC_ExtraHigh_3.mat' → variables: ['cyc_mph']

Example contents of CYC_WLTC_Low_3
  • cyc_mph: shape = (590, 2)  dtype = float64


In [None]:
import Jetson.GPIO as GPIO
GPIO.cleanup()
import time
import board
import busio
from adafruit_pca9685 import PCA9685
import time
import cantools
import can

def controller(lastest_speed, speed_history, reference_speed):

    

    return pwm_output


In [None]:
#!/usr/bin/env python3
import Jetson.GPIO as GPIO
GPIO.cleanup()
import time
import board
import busio
from adafruit_pca9685 import PCA9685
import time
import cantools
import can

# Load the reference speed
base_folder = os.path.dirname(os.path.abspath(__file__))
all_cycles = load_drivecycle_mat_files(base_folder)

# Example: inspect one of the loaded cycles
example_key = next(iter(all_cycles))
example_data = all_cycles[example_key]

print("\nExample contents of", example_key)
for var_name, arr in example_data.items():
    if var_name.startswith("__"):
        continue
    print(f"  • {var_name}: shape = {getattr(arr, 'shape', 'scalar')}  dtype = {getattr(arr, 'dtype', '')}")

# PWM signal generation setups
i2c = busio.I2C(board.SCL, board.SDA)
pca = PCA9685(i2c, address=0x40)
pca.frequency = 1000 #can go up to ~1.6 kHz if you want faster switching
# convert duty cycle percentage → 16-bit value
def set_duty(channel, percent):
    """
    channel: 0–15
    percent: 0.0 to 100.0 (percentage of full on)
    """
    if not (0 <= percent <= 100):
        raise ValueError("percent must be between 0.0 and 100.0")
    # PCA9685 uses 12-bit internally but busdevice scales it to 16-bit.
    # 0 = always off, 0xFFFF = always on. Linear interpolation for duty.
    duty_16bit = int(percent * 65535 / 100)
    pca.channels[channel].duty_cycle = duty_16bit

# Read CAN signal setups
# ─── STEP 1: Load the DBC file ────────────────────────────────────────────────
DBC_PATH = '/home/guiliang/Desktop/DriveRobot/KAVL_V3.dbc'

try:
    db = cantools.database.load_file(DBC_PATH)
except FileNotFoundError:
    print(f"ERROR: Cannot find DBC at '{DBC_PATH}'.")
    raise
# Grab the message definition for "Speed_and_Force" (ID = 0x009)
try:
    speed_force_msg = db.get_message_by_name('Speed_and_Force')
except KeyError:
    print("ERROR: 'Speed_and_Force' not found in the DBC.")
    raise
# ─── STEP 2: Prepare history container ───────────────────────────────────────
force_history = [] 
latest_force = None
speed_history = []
lastest_speed = None
CAN_INTERFACE = 'can0'

# ─── STEP 3: Open the SocketCAN interface ─────────────────────────────────────
# Make sure you've already run (or something similar) on the Jetson:
#   sudo ip link set can0 up type can bitrate 500000 dbitrate 1000000 fd on
try:
    bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')
except OSError:
    print(f"ERROR: Cannot open CAN interface '{CAN_INTERFACE}'.")
    print("Make sure you have run something like:\n"
          "    sudo ip link set can0 up type can bitrate 500000 dbitrate 1000000 fd on")
    raise
print(f"Listening on {CAN_INTERFACE} for ID=0x{speed_force_msg.frame_id:03X} ('Speed_and_Force')…")
print("Press Ctrl+C to exit.\n")

# ─── STEP 4: Loop forever, read Force_N, print & store ────────────────────────
try:
    while True:
        # Wait up to 1 second for a CAN frame; msg will be a can.Message or None.
        msg = bus.recv(timeout=1.0)
        if msg is None:
            continue
        # Only decode if the ID matches 0x009:
        if msg.arbitration_id != speed_force_msg.frame_id:
            continue
        # Decode raw payload into a dict {signal_name: value}
        try:
            decoded = db.decode_message(msg.arbitration_id, msg.data)
        except KeyError:
            # ID wasn’t in the DBC (unlikely here), so skip.
            continue
        # Pull out the Force_N field (in Newtons)
        force_value = decoded.get('Force_N')
        speed_value = decoded.get('Speed_kph')
        if force_value is None:
            continue
        if speed_value is None:
            continue
        # Update our “live” variable:
        latest_force = force_value
        lastest_speed = speed_value

        # Append to history list:
        force_history.append(force_value)
        speed_history.append(speed_value)

        # Print live to console
        timestamp = time.time()
        print(f"[{timestamp:.3f}] Force_N = {latest_force:.3f} N; Speed_kph = {lastest_speed:.3f} kph")

        pwm_output = controller(lastest_speed, speed_history, reference_speed)

        if pwm_output >= 0:
            set_duty(0, pwm_output) # channel 0 for accleration pedal
        else:
            set_duty(4, -pwm_output) # channel 4 for break pedal
        print(f"Channel 0 Acceleration dutyCycle: {pwm_output}%; Channel 4 Decceleration dutyCycle: {pwm_output}%;")
        


except KeyboardInterrupt:
    print("\nInterrupted by user. Exiting…")
finally:
    bus.shutdown()

for ch in range(16):
    pca.channels[ch].duty_cycle = 0
pca.deinit()

  bus = can.interface.Bus(channel=CAN_INTERFACE, bustype='socketcan')


Listening on can0 for ID=0x009 ('Speed_and_Force')…
Press Ctrl+C to exit.


Interrupted by user. Exiting…
