In [2]:
import serial
import time
import math
import csv
from gekko import GEKKO
import numpy as np

ser = serial.Serial(
    port='COM6',
    baudrate=115200,
    timeout=1
)

# List to store results
results = []

def send_command(command):
    ser.reset_input_buffer()  # Clear any pending input
    ser.write(f"{command}\r\n".encode())
    time.sleep(0.2)  # Wait for processing
    
    while ser.in_waiting:
        response = ser.readline().decode().strip()
        if response:  # Only print non-empty responses
            print(f"Pico response: {response}")
            # Check if response contains MPU6050 angles
            if response.startswith("MPU:"):
                return [float(x) for x in response[4:].split()]
    return None

def send_angles(pitch, yaw):
    command = f"ANGLE {pitch} {yaw}"
    return send_command(command)

def setup_mpc():
    m = GEKKO()
    
    # Define time points
    m.time = np.linspace(0, 2, 21)  # 2 seconds, 21 points
    
    # Define variables
    pitch_measured = m.CV(value=0)    # Controlled Variable (measured pitch)
    servo_angle = m.MV(value=90)      # Manipulated Variable (servo angle)
    
    # MV properties
    servo_angle.LOWER = 0
    servo_angle.UPPER = 180
    servo_angle.DMAX = 10             # Maximum change per time step
    servo_angle.STATUS = 1            # Enable optimization
    
    # CV properties
    pitch_measured.STATUS = 1         # Include in objective function
    
    # Simple first-order dynamic model
    # dpitch/dt = K * (servo_angle - pitch)
    K = 0.8  # Process gain
    tau = 0.5  # Time constant
    m.Equation(tau * pitch_measured.dt() == -pitch_measured + K * servo_angle)
    
    # Configure MPC
    m.options.IMODE = 6    # MPC control
    m.options.CV_TYPE = 2  # Squared error objective
    m.options.NODES = 3    # Collocation nodes
    
    return m, pitch_measured, servo_angle

def correct_pitch_mpc(target_pitch, actual_pitch, current_servo_pitch):
    # Initialize or get existing MPC
    if not hasattr(correct_pitch_mpc, 'mpc'):
        correct_pitch_mpc.mpc, correct_pitch_mpc.pitch_cv, correct_pitch_mpc.servo_mv = setup_mpc()
    
    m = correct_pitch_mpc.mpc
    pitch_cv = correct_pitch_mpc.pitch_cv
    servo_mv = correct_pitch_mpc.servo_mv
    
    # Update current state and setpoint
    pitch_cv.MEAS = actual_pitch
    pitch_cv.SP = target_pitch
    
    # Solve MPC
    try:
        m.solve(disp=False)
        # Get first control move
        new_servo_pitch = float(servo_mv.NEWVAL)
        # Ensure angles stay within 0-180 range
        new_servo_pitch = max(0, min(180, new_servo_pitch))
        return new_servo_pitch
    except:
        print("MPC solution failed, using current angle")
        return current_servo_pitch

try:
    # Clear initial buffer
    time.sleep(2)
    ser.reset_input_buffer()
    
    # Use sample.csv file
    csv_path = "sample.csv"
    
    # Read angles from CSV
    with open(csv_path, 'r') as file:
        csv_reader = csv.DictReader(file)
        
        for row in csv_reader:
            # Get target angles from CSV
            target_pitch = float(row['pitch'])
            target_yaw = float(row['yaw'])
            
            # Ensure angles stay within 0-180 range
            target_pitch = max(0, min(180, target_pitch))
            target_yaw = max(0, min(180, target_yaw))
            
            # Initialize current servo pitch
            current_servo_pitch = target_pitch
            
            # Send angles and get actual readings
            actual_angles = send_angles(current_servo_pitch, target_yaw)
            
            if actual_angles:
                actual_pitch, actual_yaw = actual_angles
                pitch_error = abs(target_pitch - actual_pitch)
                yaw_error = abs(target_yaw - actual_yaw)
                
                # Store results
                results.append({
                    'Target Pitch': target_pitch,
                    'Actual Pitch': actual_pitch,
                    'Error': pitch_error
                })
                
                print(f"Target angles - Pitch: {target_pitch:.2f}, Yaw: {target_yaw:.2f}")
                print(f"Actual angles - Pitch: {actual_pitch:.2f}, Yaw: {actual_yaw:.2f}")
                print(f"Errors - Pitch: {pitch_error:.2f}, Yaw: {yaw_error:.2f}")
                
                # Calculate corrected pitch angle using MPC
                current_servo_pitch = correct_pitch_mpc(target_pitch, actual_pitch, current_servo_pitch)
                print(f"Corrected servo pitch: {current_servo_pitch:.2f}")
            
            # Small delay between movements
            time.sleep(0.05)

except KeyboardInterrupt:
    # Save results to CSV on keyboard interrupt
    with open('results.csv', 'w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['Target Pitch', 'Actual Pitch', 'Error'])
        writer.writeheader()
        writer.writerows(results)
        print("\nResults saved to results.csv")

finally:
    ser.close()
    print("Serial connection closed")

Pico response: ANGLE 90.0 45.0
Pico response: Debug: Received 'ANGLE 90.0 45.0'
Pico response: MPU: 76.62434 42.99139
Target angles - Pitch: 90.00, Yaw: 45.00
Actual angles - Pitch: 76.62, Yaw: 42.99
Errors - Pitch: 13.38, Yaw: 2.01
Corrected servo pitch: 100.00
Pico response: ANGLE 88.42951 45.02741
Pico response: Debug: Received 'ANGLE 88.42951 45.02741'
Pico response: MPU: 70.1647 34.37241
Target angles - Pitch: 88.43, Yaw: 45.03
Actual angles - Pitch: 70.16, Yaw: 34.37
Errors - Pitch: 18.26, Yaw: 10.66
Corrected servo pitch: 100.00
Pico response: ANGLE 86.86096 45.10961
Pico response: Debug: Received 'ANGLE 86.86096 45.10961'
Pico response: MPU: 81.67088 31.74997
Target angles - Pitch: 86.86, Yaw: 45.11
Actual angles - Pitch: 81.67, Yaw: 31.75
Errors - Pitch: 5.19, Yaw: 13.36
Corrected servo pitch: 100.00
Pico response: ANGLE 85.29621 45.24651
Pico response: Debug: Received 'ANGLE 85.29621 45.24651'
Pico response: MPU: 81.40479 28.93801
Target angles - Pitch: 85.30, Yaw: 45.25
Actu

In [None]:
 Include a sensitivity analysis to show the steady state and dynamic relationships between the manipulated variables and the controlled variables. Show simulated control and optimization results that achieve a best objective. For this third phase of the project, it is not necessary to show the estimator and controller working together. Uncorrupted data and full state feedback (all states assumed to be measured) are acceptable for this progress report. The progress report should be a draft section of the final report that includes control and optimization results and discussion.

In [1]:
import serial
import time
import math
import csv
from gekko import GEKKO
import numpy as np
import matplotlib.pyplot as plt

ser = serial.Serial(
    port='COM6',
    baudrate=115200,
    timeout=1
)

# List to store results for each run
results = {i: [] for i in range(1, 55)}  # Initialize results dict for 54 runs

def send_command(command):
    ser.reset_input_buffer()
    ser.write(f"{command}\r\n".encode())
    time.sleep(0.01)
    
    while ser.in_waiting:
        response = ser.readline().decode().strip()
        if response:
            print(f"Pico response: {response}")
            if response.startswith("MPU:"):
                return [float(x) for x in response[4:].split()]
    return None

def send_angles(pitch, yaw):
    command = f"ANGLE {pitch} {yaw}"
    return send_command(command)

def setup_mpc():
    m = GEKKO()
    m.time = np.linspace(0, 2, 21)
    
    yaw_measured = m.CV(value=0)
    servo_angle = m.MV(value=90)
    
    servo_angle.LOWER = 0
    servo_angle.UPPER = 180
    servo_angle.DMAX = 10
    servo_angle.STATUS = 1
    
    yaw_measured.STATUS = 1
    
    K = 0.8
    tau = 0.5
    m.Equation(tau * yaw_measured.dt() == -yaw_measured + K * servo_angle)
    
    m.options.IMODE = 6
    m.options.CV_TYPE = 2
    m.options.NODES = 3
    
    return m, yaw_measured, servo_angle

def correct_yaw_mpc(target_yaw, actual_yaw, current_servo_yaw):
    if not hasattr(correct_yaw_mpc, 'mpc'):
        correct_yaw_mpc.mpc, correct_yaw_mpc.yaw_cv, correct_yaw_mpc.servo_mv = setup_mpc()
    
    m = correct_yaw_mpc.mpc
    yaw_cv = correct_yaw_mpc.yaw_cv
    servo_mv = correct_yaw_mpc.servo_mv
    
    yaw_cv.MEAS = actual_yaw
    yaw_cv.SP = target_yaw
    
    try:
        m.solve(disp=False)
        new_servo_yaw = float(servo_mv.NEWVAL)
        new_servo_yaw = max(0, min(180, new_servo_yaw))
        return new_servo_yaw
    except:
        print("MPC solution failed, using current angle")
        return current_servo_yaw

try:
    time.sleep(0.01)
    ser.reset_input_buffer()
    
    # Read from angle_tracking_all_hr.csv
    with open('angle_tracking_all_hr.csv', 'r') as file:
        csv_reader = csv.DictReader(file)
        
        # Process each run (1-54)
        for run in range(1, 55):
            command_count = 0
            pitch_col = f'pitch_{run}'
            yaw_col = f'yaw_{run}'
            
            # Reset file pointer for each run
            file.seek(0)
            next(csv_reader)  # Skip header
            
            for row in csv_reader:
                # Check if we've hit command limit or NaN
                if command_count >= 113 or row[pitch_col] == 'NaN' or row[yaw_col] == 'NaN':
                    break
                    
                target_pitch = float(row[pitch_col])
                target_yaw = float(row[yaw_col])
                
                # Ensure angles stay within bounds
                target_pitch = max(0, min(180, target_pitch))
                target_yaw = max(0, min(180, target_yaw))
                
                current_servo_yaw = target_yaw
                actual_angles = send_angles(target_pitch, current_servo_yaw)
                
                if actual_angles:
                    actual_pitch, actual_yaw = actual_angles
                    pitch_error = abs(target_pitch - actual_pitch)
                    yaw_error = abs(target_yaw - actual_yaw)
                    
                    # Store results for this run
                    results[run].append({
                        f'pitch_{run}': target_pitch,
                        f'measured_pitch_{run}': actual_pitch,
                        f'err_{run}': pitch_error,
                        f'yaw_{run}': target_yaw,
                        f'measured_yaw_{run}': actual_yaw,
                        f'yaw_err_{run}': yaw_error
                    })
                    
                    print(f"Run {run} - Command {command_count + 1}")
                    print(f"Target angles - Pitch: {target_pitch:.2f}, Yaw: {target_yaw:.2f}")
                    print(f"Actual angles - Pitch: {actual_pitch:.2f}, Yaw: {actual_yaw:.2f}")
                    print(f"Errors - Pitch: {pitch_error:.2f}, Yaw: {yaw_error:.2f}")
                    
                    current_servo_yaw = correct_yaw_mpc(target_yaw, actual_yaw, current_servo_yaw)
                    print(f"Corrected servo yaw: {current_servo_yaw:.2f}\n")
                
                command_count += 1
                time.sleep(0.01)

except KeyboardInterrupt:
    print("\nOperation interrupted by user")

finally:
    # Save all results to a single CSV
    all_data = []
    max_rows = 0
    
    # First find the maximum number of rows across all runs
    for run in range(1, 55):
        if results[run]:
            max_rows = max(max_rows, len(results[run]))
    
    # Create the combined data
    for i in range(max_rows):
        row_data = {}
        for run in range(1, 55):
            if results[run] and i < len(results[run]):
                row_data.update(results[run][i])
        all_data.append(row_data)
    
    if all_data:
        filename = 'combined_results.csv'
        fieldnames = []
        for run in range(1, 55):
            if results[run]:
                fieldnames.extend(list(results[run][0].keys()))
        
        with open(filename, 'w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(all_data)
            print(f"All results saved to {filename}")
    
    ser.close()
    print("Serial connection closed")

Pico response: ANGLE 20.15213678 10.98429288
Pico response: Debug: Received 'ANGLE 20.15213678 10.98429288'
Pico response: MPU: 14.34869 154.7639
Run 1 - Command 1
Target angles - Pitch: 20.15, Yaw: 10.98
Actual angles - Pitch: 14.35, Yaw: 154.76
Errors - Pitch: 5.80, Yaw: 143.78
Corrected servo yaw: 80.00

Pico response: ANGLE 21.00817339 18.46096319
Pico response: Debug: Received 'ANGLE 21.00817339 18.46096319'
Pico response: MPU: 22.50475 156.3179
Run 1 - Command 2
Target angles - Pitch: 21.01, Yaw: 18.46
Actual angles - Pitch: 22.50, Yaw: 156.32
Errors - Pitch: 1.50, Yaw: 137.86
Corrected servo yaw: 80.00

Pico response: ANGLE 22.57933061 22.0543713
Pico response: Debug: Received 'ANGLE 22.57933061 22.0543713'
Pico response: MPU: 22.40098 155.544
Run 1 - Command 3
Target angles - Pitch: 22.58, Yaw: 22.05
Actual angles - Pitch: 22.40, Yaw: 155.54
Errors - Pitch: 0.18, Yaw: 133.49
Corrected servo yaw: 80.00

Pico response: ANGLE 24.25072307 24.21719581
Pico response: Debug: Received 

In [3]:
# for second set of data
import serial
import time
import math
import csv
from gekko import GEKKO
import numpy as np
import matplotlib.pyplot as plt

ser = serial.Serial(
    port='COM6',
    baudrate=115200,
    timeout=1
)

# List to store results for each run
results = {i: [] for i in range(1, 55)}  # Initialize results dict for 54 runs

def send_command(command):
    ser.reset_input_buffer()
    ser.write(f"{command}\r\n".encode())
    time.sleep(0.01)
    
    while ser.in_waiting:
        response = ser.readline().decode().strip()
        if response:
            print(f"Pico response: {response}")
            if response.startswith("MPU:"):
                return [float(x) for x in response[4:].split()]
    return None

def send_angles(pitch, yaw):
    command = f"ANGLE {pitch} {yaw}"
    return send_command(command)

def setup_mpc():
    m = GEKKO()
    m.time = np.linspace(0, 2, 21)
    
    yaw_measured = m.CV(value=0)
    servo_angle = m.MV(value=90)
    
    servo_angle.LOWER = 0
    servo_angle.UPPER = 180
    servo_angle.DMAX = 10
    servo_angle.STATUS = 1
    
    yaw_measured.STATUS = 1
    
    K = 0.8
    tau = 0.5
    m.Equation(tau * yaw_measured.dt() == -yaw_measured + K * servo_angle)
    
    m.options.IMODE = 6
    m.options.CV_TYPE = 2
    m.options.NODES = 3
    
    return m, yaw_measured, servo_angle

def correct_yaw_mpc(target_yaw, actual_yaw, current_servo_yaw):
    if not hasattr(correct_yaw_mpc, 'mpc'):
        correct_yaw_mpc.mpc, correct_yaw_mpc.yaw_cv, correct_yaw_mpc.servo_mv = setup_mpc()
    
    m = correct_yaw_mpc.mpc
    yaw_cv = correct_yaw_mpc.yaw_cv
    servo_mv = correct_yaw_mpc.servo_mv
    
    yaw_cv.MEAS = actual_yaw
    yaw_cv.SP = target_yaw
    
    try:
        m.solve(disp=False)
        new_servo_yaw = float(servo_mv.NEWVAL)
        new_servo_yaw = max(0, min(180, new_servo_yaw))
        return new_servo_yaw
    except:
        print("MPC solution failed, using current angle")
        return current_servo_yaw

try:
    time.sleep(0.01)
    ser.reset_input_buffer()
    
    # Read from angle_tracking_all_hr.csv
    ########
    # INPUT FILE HERE
    ########
    with open('angle_tracking_all_hr.csv', 'r') as file:
        csv_reader = csv.DictReader(file)
        
        # Process each run (1-54)
        for run in range(1, 55):
            # Reset pitch to 25 degrees before each run
            _ = send_angles(25, 90)  # Reset to 25 degrees pitch, neutral yaw
            time.sleep(0.5)  # Give time for servo to reach position
            
            command_count = 0
            pitch_col = f'pitch_{run}'
            yaw_col = f'yaw_{run}'
            
            # Reset file pointer for each run
            file.seek(0)
            next(csv_reader)  # Skip header
            
                
                current_servo_yaw = target_yaw
                actual_angles = send_angles(target_pitch, current_servo_yaw)
                
                    
                target_pitch = float(row[pitch_col])
                target_yaw = float(row[yaw_col])
                
                # Ensure angles stay within bounds
                target_pitch = max(0, min(180, target_pitch))
                target_yaw = max(0, min(180, target_yaw))
                
                current_servo_yaw = target_yaw
                actual_angles = send_angles(target_pitch, current_servo_yaw)
                
                if actual_angles:
                    actual_pitch, actual_yaw = actual_angles
                    pitch_error = abs(target_pitch - actual_pitch)
                    yaw_error = abs(target_yaw - actual_yaw)
                    
                    # Store results for this run
                    results[run].append({
                        f'pitch_{run}': target_pitch,
                        f'measured_pitch_{run}': actual_pitch,
                        f'err_{run}': pitch_error,
                        f'yaw_{run}': target_yaw,
                        f'measured_yaw_{run}': actual_yaw,
                        f'yaw_err_{run}': yaw_error
                    })
                    
                    print(f"Run {run} - Command {command_count + 1}")
                    print(f"Target angles - Pitch: {target_pitch:.2f}, Yaw: {target_yaw:.2f}")
                    print(f"Actual angles - Pitch: {actual_pitch:.2f}, Yaw: {actual_yaw:.2f}")
                    print(f"Errors - Pitch: {pitch_error:.2f}, Yaw: {yaw_error:.2f}")
                    
                    current_servo_yaw = correct_yaw_mpc(target_yaw, actual_yaw, current_servo_yaw)
                    print(f"Corrected servo yaw: {current_servo_yaw:.2f}\n")
                
                command_count += 1
                time.sleep(0.01)

except KeyboardInterrupt:
    print("\nOperation interrupted by user")

finally:
    # Save all results to a single CSV
    all_data = []
    max_rows = 0
    
    # First find the maximum number of rows across all runs
    for run in range(1, 55):
        if results[run]:
            max_rows = max(max_rows, len(results[run]))
    
    # Create the combined data
    for i in range(max_rows):
        row_data = {}
        for run in range(1, 55):
            if results[run] and i < len(results[run]):
                row_data.update(results[run][i])
        all_data.append(row_data)
    
    if all_data:
        #########
        # OUTPUT FILE NAME HERE
        ########
        filename = 'combined_results2.csv'  # Changed filename
        fieldnames = []
        for run in range(1, 55):
            if results[run]:
                fieldnames.extend(list(results[run][0].keys()))
        
        with open(filename, 'w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(all_data)
            print(f"All results saved to {filename}")
    
    ser.close()
    print("Serial connection closed")


IndentationError: unexpected indent (1530206307.py, line 106)

In [2]:
import serial
import time
import math
import csv
from gekko import GEKKO
import numpy as np
import matplotlib.pyplot as plt

# Configure serial connection
ser = serial.Serial(
    port='COM6',
    baudrate=115200,
    timeout=1
)

def perform_yaw_sensitivity_analysis():
    # Define test parameters
    K_values = [0.5, 0.8, 1.0]  # Process gains 
    tau_values = [0.3, 0.5, 0.7]  # Time constants
    results = {}

    # Run simulations for different parameter combinations
    for K in K_values:
        for tau in tau_values:
            m = GEKKO()
            m.time = np.linspace(0, 5, 51)  # 5 second simulation
            
            # System variables
            yaw = m.Var(value=90)  # Start at center position
            servo_input = m.MV(value=90)
            servo_input.STATUS = 0  # Not used for optimization
            
            # System dynamics
            m.Equation(tau * yaw.dt() == -yaw + K * servo_input)
            
            # Apply step input
            servo_input.VALUE = 135  # 45 degree step change
            
            # Simulation settings
            m.options.IMODE = 4  # Dynamic simulation mode
            m.solve(disp=False)
            
            # Store results
            results[f'K={K}, τ={tau}'] = {
                'time': m.time,
                'yaw': yaw.VALUE,
                'input': servo_input.VALUE
            }

    return results

# Run sensitivity analysis
sensitivity_results = perform_yaw_sensitivity_analysis()

# Plot results
plt.figure(figsize=(12, 6))
for label, data in sensitivity_results.items():
    plt.plot(data['time'], data['yaw'], label=label)
plt.xlabel('Time (s)')
plt.ylabel('Yaw Angle (degrees)')
plt.title('Yaw Sensitivity Analysis: Response to 45° Step Input')
plt.legend()
plt.grid(True)
plt.savefig('yaw_sensitivity.png')
plt.close()

# Close serial connection
ser.close()
print("Analysis complete. Results saved to yaw_sensitivity.png")

SerialException: could not open port 'COM6': FileNotFoundError(2, 'The system cannot find the file specified.', None, 2)

In [2]:
if True:
    # Perform sensitivity analysis
    sensitivity_results = perform_sensitivity_analysis()
    
    # Plot sensitivity analysis results
    plt.figure(figsize=(12, 6))
    for label, data in sensitivity_results.items():
        plt.plot(data['time'], data['pitch'], label=label)
    plt.xlabel('Time (s)')
    plt.ylabel('Pitch Angle')
    plt.title('Sensitivity Analysis: System Response to Step Change')
    plt.legend()
    plt.grid(True)
    plt.savefig('sensitivity_analysis3.png')
    plt.close()
    
    # Simulate control optimization
    optimization_results = simulate_control_optimization()
    
    # Plot optimization results
    plt.figure(figsize=(12, 6))
    plt.subplot(2, 1, 1)
    plt.plot(optimization_results['time'], optimization_results['pitch'], 'b-', label='Pitch')
    plt.plot(optimization_results['time'], optimization_results['setpoint'], 'r--', label='Setpoint')
    plt.ylabel('Pitch Angle')
    plt.legend()
    plt.grid(True)
    
    plt.subplot(2, 1, 2)
    plt.plot(optimization_results['time'], optimization_results['servo'], 'g-', label='Servo Command')
    plt.xlabel('Time (s)')
    plt.ylabel('Servo Angle')
    plt.legend()
    plt.grid(True)
    plt.savefig('optimization_results3.png')


NameError: name 'perform_sensitivity_analysis' is not defined

In [6]:
with open('results.csv', 'w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=['Target Pitch', 'Actual Pitch', 'Error'])
    writer.writeheader()
    writer.writerows(results)
    print("\nResults saved to results.csv")


Results saved to results.csv
