# PID Parameter Optimization using Q-Learning
## Optimizing Kp, Ki, Kd for TF.slx using Reinforcement Learning

## Cell 1: Imports and Setup

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
import time
import warnings
warnings.filterwarnings('ignore')

# Configure plotting
plt.style.use('seaborn-v0_8-darkgrid')
%matplotlib inline

print("Imports successful!")

## Cell 2: MATLAB Engine Connection

In [None]:
import matlab.engine
import os

# Start MATLAB engine
print("Starting MATLAB engine...")
eng = matlab.engine.start_matlab()
print("MATLAB engine started successfully!")

# Set working directory
work_dir = '/Users/franszhafran/code/master/ai/ai-final-project'
eng.cd(work_dir, nargout=0)
print(f"Working directory set to: {work_dir}")

# Load EQ1.mat
print("Loading EQ1.mat...")
eng.load('EQ1.mat', nargout=0)
print("EQ1.mat loaded successfully!")

## Cell 3: Simulink Runner Function

In [None]:
def run_simulink(Kp, Ki, Kd, eng):
    """
    Run Simulink model with given PID parameters.
    
    Args:
        Kp (float): Proportional gain
        Ki (float): Integral gain
        Kd (float): Derivative gain
        eng: MATLAB engine instance
        
    Returns:
        float: RMSE value from simulation, or None if simulation failed
    """
    try:
        # Set PID parameters in MATLAB workspace
        eng.assignin('base', 'Kp', float(Kp), nargout=0)
        eng.assignin('base', 'Ki', float(Ki), nargout=0)
        eng.assignin('base', 'Kd', float(Kd), nargout=0)
        
        # Run simulation
        eng.sim('TF', nargout=0)
        
        # Retrieve RMSE value
        rmse_result = eng.evalin('base', 'rmse(end)')
        rmse = float(rmse_result)
        
        return rmse
    
    except Exception as e:
        print(f"Simulation error with Kp={Kp}, Ki={Ki}, Kd={Kd}: {str(e)}")
        return None

print("Simulink runner function defined successfully!")

## Cell 4: Q-Learning Environment Setup

In [None]:
# Define parameter ranges and discretization
KP_MIN, KP_MAX, KP_STEP = 0.1, 10.0, 0.5
KI_MIN, KI_MAX, KI_STEP = 0.01, 5.0, 0.25
KD_MIN, KD_MAX, KD_STEP = 0.01, 2.0, 0.1

# Create parameter ranges
kp_range = np.arange(KP_MIN, KP_MAX + KP_STEP/2, KP_STEP)
ki_range = np.arange(KI_MIN, KI_MAX + KI_STEP/2, KI_STEP)
kd_range = np.arange(KD_MIN, KD_MAX + KD_STEP/2, KD_STEP)

print(f"Kp range: {len(kp_range)} values from {KP_MIN} to {KP_MAX}")
print(f"Ki range: {len(ki_range)} values from {KI_MIN} to {KI_MAX}")
print(f"Kd range: {len(kd_range)} values from {KD_MIN} to {KD_MAX}")
print(f"Total state space size: {len(kp_range) * len(ki_range) * len(kd_range)}")

# Define action space
# Actions: 0=increase Kp, 1=decrease Kp, 2=increase Ki, 3=decrease Ki, 4=increase Kd, 5=decrease Kd
NUM_ACTIONS = 6

# Q-Learning hyperparameters
ALPHA = 0.1  # Learning rate
GAMMA = 0.95  # Discount factor
EPSILON = 1.0  # Initial exploration rate
EPSILON_DECAY = 0.995  # Exploration decay
EPSILON_MIN = 0.01  # Minimum exploration rate
NUM_EPISODES = 200  # Number of training episodes

# Initialize Q-table as dictionary
Q_table = defaultdict(lambda: np.zeros(NUM_ACTIONS))

print(f"\nQ-Learning Configuration:")
print(f"  Learning rate (alpha): {ALPHA}")
print(f"  Discount factor (gamma): {GAMMA}")
print(f"  Initial exploration rate: {EPSILON}")
print(f"  Epsilon decay: {EPSILON_DECAY}")
print(f"  Training episodes: {NUM_EPISODES}")

In [None]:
def action_to_string(action):
    """Convert action index to readable description."""
    action_map = {
        0: "â†‘ Increase Kp",
        1: "â†“ Decrease Kp",
        2: "â†‘ Increase Ki",
        3: "â†“ Decrease Ki",
        4: "â†‘ Increase Kd",
        5: "â†“ Decrease Kd"
    }
    return action_map.get(action, f"Unknown action {action}")

def calculate_q_stats(Q_table):
    """Calculate Q-value statistics across entire table."""
    all_q_values = []
    for state in Q_table:
        all_q_values.extend(Q_table[state])

    if not all_q_values:
        return {'avg': 0, 'max': 0, 'min': 0, 'std': 0}

    all_q_values = np.array(all_q_values)
    return {
        'avg': np.mean(all_q_values),
        'max': np.max(all_q_values),
        'min': np.min(all_q_values),
        'std': np.std(all_q_values)
    }

print("Helper functions defined successfully!")

## Cell 6: Helper Functions for Debugging

In [None]:
# Debugging and Logging Configuration
VERBOSE_LEVEL = 1  # 0=minimal, 1=per-episode+events (DEFAULT), 2=per-step, 3=full debug
LOG_Q_VALUES = True  # Track Q-value evolution (user priority)
TRACK_CONVERGENCE = True  # Monitor parameter convergence (user priority)
CONVERGENCE_WINDOW = 50   # Window size for convergence detection
CONVERGENCE_THRESHOLD = 0.001  # RMSE variance threshold for convergence

print("Debugging configuration loaded:")
print(f"  VERBOSE_LEVEL: {VERBOSE_LEVEL}")
print(f"  LOG_Q_VALUES: {LOG_Q_VALUES}")
print(f"  TRACK_CONVERGENCE: {TRACK_CONVERGENCE}")

## Cell 5: Debugging and Logging Configuration

## Cell 5: State and Action Functions

In [None]:
def params_to_state(kp, ki, kd):
    """
    Convert continuous PID parameters to state representation.
    
    Returns:
        tuple: (kp_idx, ki_idx, kd_idx) - indices in the discretized ranges
    """
    kp_idx = int(np.round((kp - KP_MIN) / KP_STEP))
    ki_idx = int(np.round((ki - KI_MIN) / KI_STEP))
    kd_idx = int(np.round((kd - KD_MIN) / KD_STEP))
    
    # Clamp to valid range
    kp_idx = np.clip(kp_idx, 0, len(kp_range) - 1)
    ki_idx = np.clip(ki_idx, 0, len(ki_range) - 1)
    kd_idx = np.clip(kd_idx, 0, len(kd_range) - 1)
    
    return (kp_idx, ki_idx, kd_idx)

def state_to_params(state):
    """
    Convert state representation to continuous PID parameters.
    
    Args:
        state: tuple (kp_idx, ki_idx, kd_idx)
        
    Returns:
        tuple: (Kp, Ki, Kd) continuous values
    """
    kp_idx, ki_idx, kd_idx = state
    kp = kp_range[kp_idx]
    ki = ki_range[ki_idx]
    kd = kd_range[kd_idx]
    return (kp, ki, kd)

def take_action(state, action):
    """
    Take an action in the state space.
    
    Args:
        state: tuple (kp_idx, ki_idx, kd_idx)
        action: int from 0 to 5
        
    Returns:
        tuple: new_state
    """
    kp_idx, ki_idx, kd_idx = state
    
    if action == 0:  # Increase Kp
        kp_idx = min(kp_idx + 1, len(kp_range) - 1)
    elif action == 1:  # Decrease Kp
        kp_idx = max(kp_idx - 1, 0)
    elif action == 2:  # Increase Ki
        ki_idx = min(ki_idx + 1, len(ki_range) - 1)
    elif action == 3:  # Decrease Ki
        ki_idx = max(ki_idx - 1, 0)
    elif action == 4:  # Increase Kd
        kd_idx = min(kd_idx + 1, len(kd_range) - 1)
    elif action == 5:  # Decrease Kd
        kd_idx = max(kd_idx - 1, 0)
    
    return (kp_idx, ki_idx, kd_idx)

print("State and action functions defined successfully!")

## Cell 6: Reward Function

In [None]:
def calculate_reward(rmse):
    """
    Convert RMSE to reward signal.
    Lower RMSE = higher reward (less error = more positive reward).
    
    Args:
        rmse (float): Root Mean Square Error from simulation
        
    Returns:
        float: Reward value
    """
    if rmse is None:
        return -100  # Penalty for simulation failure
    return -rmse  # Negative RMSE as reward (minimizing RMSE)

print("Reward function defined successfully!")

## Cell 7: Q-Learning Training Loop

In [None]:
# Initialize training variables
epsilon = EPSILON
training_history = {
    'episode': [],
    'rmse': [],
    'best_rmse': [],
    'kp': [],
    'ki': [],
    'kd': [],
    'epsilon': []
}

# Q-Learning tracking (user priority: Q-learning internals)
q_value_history = {
    'episode': [],
    'avg_q_value': [],
    'max_q_value': [],
    'min_q_value': [],
    'q_value_std': []
}

# Convergence tracking (user priority: parameter convergence)
convergence_status = {
    'is_converged': False,
    'convergence_episode': None,
    'steps_without_improvement': 0
}

simulation_stats = {
    'total_sims': 0,
    'successful_sims': 0,
    'failed_sims': 0
}

best_rmse = float('inf')
best_params = None

# Initial state: use middle values of parameter ranges
initial_kp_idx = len(kp_range) // 2
initial_ki_idx = len(ki_range) // 2
initial_kd_idx = len(kd_range) // 2
initial_state = (initial_kp_idx, initial_ki_idx, initial_kd_idx)

print(f"Starting Q-Learning Training...")
print(f"Initial state parameters: {state_to_params(initial_state)}")
print("\n" + "="*70)

start_time = time.time()

for episode in range(NUM_EPISODES):
    state = initial_state
    episode_rmse = []
    episode_td_errors = []
    episode_q_updates = []
    
    # Episode loop
    for step in range(10):  # 10 steps per episode
        # Epsilon-greedy action selection
        is_explore = np.random.random() < epsilon
        if is_explore:
            # Explore: random action
            action = np.random.randint(0, NUM_ACTIONS)
        else:
            # Exploit: best known action
            action = np.argmax(Q_table[state])
        
        # Take action and get new state
        new_state = take_action(state, action)
        
        # Run Simulink and get reward
        kp, ki, kd = state_to_params(new_state)
        rmse = run_simulink(kp, ki, kd, eng)
        reward = calculate_reward(rmse)
        
        # Track simulation statistics
        simulation_stats['total_sims'] += 1
        if rmse is not None:
            simulation_stats['successful_sims'] += 1
        else:
            simulation_stats['failed_sims'] += 1
            if VERBOSE_LEVEL >= 1:
                print(f"  âš  Simulation failed: Kp={kp:.4f}, Ki={ki:.4f}, Kd={kd:.4f}")
        
        if rmse is not None:
            episode_rmse.append(rmse)
            
            # Enhanced logging when best parameters update (KEY EVENT)
            if rmse < best_rmse:
                old_best = best_rmse
                best_rmse = rmse
                best_params = (kp, ki, kd)
                convergence_status['steps_without_improvement'] = 0

                if VERBOSE_LEVEL >= 1:
                    improvement = ((old_best - best_rmse) / old_best) * 100
                    print(f"  âœ“ NEW BEST! RMSE: {old_best:.6f} â†’ {best_rmse:.6f} ({improvement:.2f}% better)")
                    print(f"    Parameters: Kp={kp:.4f}, Ki={ki:.4f}, Kd={kd:.4f}")
            else:
                convergence_status['steps_without_improvement'] += 1
        
        # Store Q-value before update for tracking (user priority)
        if LOG_Q_VALUES:
            q_before = Q_table[state][action]
        
        # Q-Learning update
        max_next_q = np.max(Q_table[new_state])
        Q_table[state][action] = Q_table[state][action] + ALPHA * (reward + GAMMA * max_next_q - Q_table[state][action])
        
        # Track Q-value change (user priority: Q-learning internals)
        if LOG_Q_VALUES:
            q_after = Q_table[state][action]
            td_error = reward + GAMMA * max_next_q - q_before
            episode_td_errors.append(abs(td_error))
            episode_q_updates.append(abs(q_after - q_before))
        
        state = new_state
    
    # Decay exploration rate
    epsilon = max(EPSILON_MIN, epsilon * EPSILON_DECAY)
    
    # Calculate Q-value statistics for this episode (user priority)
    if LOG_Q_VALUES:
        all_q_values = []
        for s in Q_table:
            all_q_values.extend(Q_table[s])

        q_value_history['episode'].append(episode + 1)
        q_value_history['avg_q_value'].append(np.mean(all_q_values))
        q_value_history['max_q_value'].append(np.max(all_q_values))
        q_value_history['min_q_value'].append(np.min(all_q_values))
        q_value_history['q_value_std'].append(np.std(all_q_values))

        avg_td_error = np.mean(episode_td_errors) if episode_td_errors else 0
        avg_q_update = np.mean(episode_q_updates) if episode_q_updates else 0
    
    # Record episode statistics
    avg_rmse = np.mean(episode_rmse) if episode_rmse else None
    if avg_rmse is not None:
        training_history['episode'].append(episode + 1)
        training_history['rmse'].append(avg_rmse)
        training_history['best_rmse'].append(best_rmse)
        training_history['epsilon'].append(epsilon)
        
        if best_params:
            training_history['kp'].append(best_params[0])
            training_history['ki'].append(best_params[1])
            training_history['kd'].append(best_params[2])
    
    # Enhanced per-episode logging (VERBOSE_LEVEL >= 1, default)
    if VERBOSE_LEVEL >= 1 and avg_rmse is not None:
        success_rate = (simulation_stats['successful_sims'] /
                       simulation_stats['total_sims'] * 100)

        print(f"Ep {episode + 1:3d}/{NUM_EPISODES} | "
              f"RMSE: {avg_rmse:.6f} | "
              f"Best: {best_rmse:.6f} | "
              f"Îµ: {epsilon:.4f} | "
              f"Success: {success_rate:.1f}%", end="")

        if LOG_Q_VALUES:
            print(f" | Q-avg: {q_value_history['avg_q_value'][-1]:.4f} | "
                  f"TD-err: {avg_td_error:.4f}")
        else:
            print()
    
    # Check convergence (user priority: parameter convergence)
    if TRACK_CONVERGENCE and len(training_history['rmse']) >= CONVERGENCE_WINDOW:
        recent_rmse = training_history['rmse'][-CONVERGENCE_WINDOW:]
        rmse_variance = np.var(recent_rmse)

        if rmse_variance < CONVERGENCE_THRESHOLD and not convergence_status['is_converged']:
            convergence_status['is_converged'] = True
            convergence_status['convergence_episode'] = episode + 1

            if VERBOSE_LEVEL >= 1:
                print(f"  ðŸŽ¯ CONVERGENCE DETECTED at Episode {episode + 1}")
                print(f"    RMSE variance over last {CONVERGENCE_WINDOW} episodes: {rmse_variance:.6f}")
                print(f"    Steps without improvement: {convergence_status['steps_without_improvement']}")

elapsed_total = time.time() - start_time
print("="*70)
print(f"\nTraining completed in {elapsed_total:.1f} seconds")
print(f"\nBest Parameters Found:")
print(f"  Kp = {best_params[0]:.4f}")
print(f"  Ki = {best_params[1]:.4f}")
print(f"  Kd = {best_params[2]:.4f}")
print(f"  RMSE = {best_rmse:.6f}")

In [None]:
# Visualize Q-value evolution (user priority)
if LOG_Q_VALUES and len(q_value_history['episode']) > 0:
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

    # Plot 1: Q-value evolution over episodes
    ax1.plot(q_value_history['episode'], q_value_history['avg_q_value'],
             label='Avg Q-value', linewidth=2, color='blue')
    ax1.fill_between(q_value_history['episode'],
                     q_value_history['min_q_value'],
                     q_value_history['max_q_value'],
                     alpha=0.2, color='blue', label='Min-Max Range')
    ax1.set_xlabel('Episode')
    ax1.set_ylabel('Q-Value')
    ax1.set_title('Q-Value Evolution During Training')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Plot 2: RMSE vs Q-value correlation
    ax2.scatter(q_value_history['avg_q_value'][:len(training_history['best_rmse'])], 
                training_history['best_rmse'][:len(q_value_history['avg_q_value'])],
                alpha=0.6, s=30, color='red')
    ax2.set_xlabel('Average Q-Value')
    ax2.set_ylabel('Best RMSE')
    ax2.set_title('Q-Value vs Performance Correlation')
    ax2.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('q_value_analysis.png', dpi=150, bbox_inches='tight')
    plt.show()

    print("\nQ-value analysis saved as 'q_value_analysis.png'")

## Cell 7b: Q-Value Visualization

In [None]:
# Print focused training statistics
print("\n" + "="*70)
print("TRAINING SUMMARY - CONVERGENCE & Q-LEARNING ANALYSIS")
print("="*70)

# Convergence Analysis
print(f"\nConvergence Status:")
if convergence_status['is_converged']:
    print(f"  âœ“ Converged at episode {convergence_status['convergence_episode']}")
    print(f"  Remaining episodes after convergence: "
          f"{NUM_EPISODES - convergence_status['convergence_episode']}")
else:
    print(f"  âš  Did not converge within {NUM_EPISODES} episodes")
print(f"  Final steps without improvement: {convergence_status['steps_without_improvement']}")

# Q-Learning Internals
if LOG_Q_VALUES and len(q_value_history['episode']) > 0:
    print(f"\nQ-Value Evolution:")
    print(f"  Initial avg Q-value: {q_value_history['avg_q_value'][0]:.6f}")
    print(f"  Final avg Q-value: {q_value_history['avg_q_value'][-1]:.6f}")
    print(f"  Q-value range: [{q_value_history['min_q_value'][-1]:.6f}, "
          f"{q_value_history['max_q_value'][-1]:.6f}]")
    print(f"  Final Q-value std dev: {q_value_history['q_value_std'][-1]:.6f}")

    # Detect if Q-values are converging
    if len(q_value_history['avg_q_value']) >= 20:
        recent_change = abs(q_value_history['avg_q_value'][-1] -
                          q_value_history['avg_q_value'][-20])
        print(f"  Q-value change (last 20 episodes): {recent_change:.6f}")

print(f"\nState Space Exploration:")
print(f"  Unique states visited: {len(Q_table)}")
total_states = len(kp_range) * len(ki_range) * len(kd_range)
print(f"  State space coverage: {len(Q_table)/total_states*100:.2f}% "
      f"({len(Q_table)}/{total_states})")

print(f"\nSimulation Statistics:")
success_rate = (simulation_stats['successful_sims']/simulation_stats['total_sims']*100
                if simulation_stats['total_sims'] > 0 else 0)
print(f"  Total simulations: {simulation_stats['total_sims']}")
print(f"  Success rate: {success_rate:.2f}% "
      f"({simulation_stats['successful_sims']}/{simulation_stats['total_sims']})")
if simulation_stats['failed_sims'] > 0:
    print(f"  âš  Failed simulations: {simulation_stats['failed_sims']}")

## Cell 7a: Post-Training Debug Summary

## Cell 8: Results Visualization

In [None]:
# Create comprehensive visualization
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Plot 1: RMSE over episodes
ax1 = axes[0, 0]
ax1.plot(training_history['episode'], training_history['rmse'], label='Episode Avg RMSE', alpha=0.7)
ax1.plot(training_history['episode'], training_history['best_rmse'], label='Best RMSE', linewidth=2)
ax1.set_xlabel('Episode')
ax1.set_ylabel('RMSE')
ax1.set_title('Training Progress: RMSE over Episodes')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Plot 2: Parameter evolution - Kp
ax2 = axes[0, 1]
ax2.plot(training_history['episode'], training_history['kp'], color='red', label='Kp', linewidth=2)
ax2.set_xlabel('Episode')
ax2.set_ylabel('Kp Value')
ax2.set_title('Proportional Gain (Kp) Evolution')
ax2.grid(True, alpha=0.3)
ax2.axhline(y=best_params[0], color='r', linestyle='--', alpha=0.5)

# Plot 3: Parameter evolution - Ki
ax3 = axes[1, 0]
ax3.plot(training_history['episode'], training_history['ki'], color='green', label='Ki', linewidth=2)
ax3.set_xlabel('Episode')
ax3.set_ylabel('Ki Value')
ax3.set_title('Integral Gain (Ki) Evolution')
ax3.grid(True, alpha=0.3)
ax3.axhline(y=best_params[1], color='g', linestyle='--', alpha=0.5)

# Plot 4: Parameter evolution - Kd
ax4 = axes[1, 1]
ax4.plot(training_history['episode'], training_history['kd'], color='blue', label='Kd', linewidth=2)
ax4.set_xlabel('Episode')
ax4.set_ylabel('Kd Value')
ax4.set_title('Derivative Gain (Kd) Evolution')
ax4.grid(True, alpha=0.3)
ax4.axhline(y=best_params[2], color='b', linestyle='--', alpha=0.5)

plt.tight_layout()
plt.savefig('pid_training_results.png', dpi=150, bbox_inches='tight')
plt.show()

print("\nTraining visualization saved as 'pid_training_results.png'")

## Cell 9: Q-Table Statistics

In [None]:
# Analyze Q-table
q_values = []
for state in Q_table:
    q_values.extend(Q_table[state])

q_values = np.array(q_values)

print("Q-Table Statistics:")
print(f"  Total states explored: {len(Q_table)}")
print(f"  Total Q-values: {len(q_values)}")
print(f"  Mean Q-value: {np.mean(q_values):.6f}")
print(f"  Min Q-value: {np.min(q_values):.6f}")
print(f"  Max Q-value: {np.max(q_values):.6f}")
print(f"  Std Q-value: {np.std(q_values):.6f}")

## Cell 10: Validation Run with Best Parameters

In [None]:
print("\nValidation Run with Best Parameters Found")
print("="*70)

# Run simulation with best parameters
final_rmse = run_simulink(best_params[0], best_params[1], best_params[2], eng)

print(f"\nFinal Validation Results:")
print(f"  Kp = {best_params[0]:.6f}")
print(f"  Ki = {best_params[1]:.6f}")
print(f"  Kd = {best_params[2]:.6f}")
print(f"  RMSE = {final_rmse:.6f}")
print("="*70)

# Compare with initial parameters
initial_kp, initial_ki, initial_kd = state_to_params(initial_state)
initial_rmse = run_simulink(initial_kp, initial_ki, initial_kd, eng)

print(f"\nComparison with Initial Parameters:")
print(f"  Initial: Kp={initial_kp:.6f}, Ki={initial_ki:.6f}, Kd={initial_kd:.6f}, RMSE={initial_rmse:.6f}")
print(f"  Optimized: Kp={best_params[0]:.6f}, Ki={best_params[1]:.6f}, Kd={best_params[2]:.6f}, RMSE={final_rmse:.6f}")
improvement = ((initial_rmse - final_rmse) / initial_rmse) * 100
print(f"  Improvement: {improvement:.2f}%")

## Cell 11: Export Results

In [None]:
# Save results to file
import json

results = {
    'best_parameters': {
        'Kp': float(best_params[0]),
        'Ki': float(best_params[1]),
        'Kd': float(best_params[2]),
    },
    'best_rmse': float(best_rmse),
    'training_episodes': NUM_EPISODES,
    'training_time_seconds': elapsed_total,
    'improvement_percent': improvement
}

with open('pid_optimization_results.json', 'w') as f:
    json.dump(results, f, indent=4)

print("\nResults exported to 'pid_optimization_results.json'")
print("\nFinal Results Summary:")
print(json.dumps(results, indent=2))

## Cell 12: Cleanup

In [None]:
# Close MATLAB engine
print("Closing MATLAB engine...")
eng.quit()
print("MATLAB engine closed successfully!")
print("\nOptimization workflow completed!")