# Hardware Performance Testing

Test TinyMPC hardware performance by measuring execution time as a function of `max_iter` for different bitstreams.

## 1. Setup and Imports

In [None]:
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
import time
from pathlib import Path
import pandas as pd
from scipy import stats

# Switch to the project root so the relative paths used in this notebook
# ('driver' below, and the relative bitstream searches) resolve against it.
# NOTE(review): this absolute path is specific to one board/user layout.
os.chdir("/home/xilinx/jupyter_notebooks/zhenyu/tinympc_ip_gen/")

# Add the 'driver' directory to the import path before importing tinympc_hw
sys.path.append('driver')
from tinympc_hw import tinympc_hw

# Import dynamics for test problem setup
from dynamics import LinearizedQuadcopterDynamics, CrazyflieParams, NoiseModel

print("All modules imported successfully")

In [None]:
# Fixed quadrotor problem dimensions
nx, nu = 12, 4  # state dimension, control dimension

# Build the linearized quadcopter model used as the test problem
params = CrazyflieParams()
noise_model = NoiseModel()
dynamics = LinearizedQuadcopterDynamics(params, noise_model)

# Control frequency handed to the model when generating system matrices (Hz)
control_freq = 100.0
A, B = dynamics.generate_system_matrices(control_freq)
Q, R = dynamics.generate_cost_matrices()
constraints = dynamics.generate_constraints()

# Report the configuration in one write (same text as four separate prints)
print("\n".join([
    "Test problem configured:",
    f"  State dimension (nx): {nx}",
    f"  Control dimension (nu): {nu}",
    "  Note: Prediction horizon (N) will be extracted from each bitstream",
]))

In [None]:
# Generate test data
def generate_test_data(nx, nu, N):
    """Generate reproducible test data for the MPC problem.

    Args:
        nx: State dimension (must be at least 3; index 2 is overwritten).
        nu: Control dimension.
        N: Prediction horizon, i.e. number of reference states.

    Returns:
        tuple: ``(x0, xref, uref)`` where ``x0`` has shape ``(nx,)``,
        ``xref`` has shape ``(N, nx)`` and ``uref`` has shape
        ``(max(N - 1, 0), nu)``.
    """
    # A private generator seeded with 42 yields the same draws as
    # np.random.seed(42) followed by np.random.randn, but leaves the global
    # NumPy RNG state untouched for the rest of the notebook.
    rng = np.random.RandomState(42)

    # Initial state: small random deviation, with index 2 forced to 1.0
    x0 = rng.randn(nx) * 0.1
    x0[2] = 1.0  # Set altitude to 1m

    # Reference trajectory: all zeros except index 2 held at 1.0
    xref = np.zeros((N, nx))
    xref[:, 2] = 1.0  # Reference altitude

    # Reference control: one row fewer than the state reference; clamped so a
    # degenerate horizon (N == 0) yields an empty array instead of an error.
    uref = np.zeros((max(N - 1, 0), nu))

    return x0, xref, uref

def extract_N_from_bitstream(bitstream_path):
    """Extract the N parameter from a bitstream filename.

    The filename is expected to contain ``tinympcproj_N<digits>_``.

    Args:
        bitstream_path: Path-like object (anything with a ``.name``
            attribute) or a plain string path.

    Returns:
        int: The parsed N, or 5 (with a printed warning) when the filename
        does not match the expected pattern.
    """
    import os
    import re

    if hasattr(bitstream_path, 'name'):
        filename = bitstream_path.name
    else:
        # For plain strings, match only the final path component so that a
        # directory named like "tinympcproj_N10_..." cannot shadow the N in
        # the file name; this mirrors what Path.name gives for Path inputs.
        filename = os.path.basename(str(bitstream_path))

    match = re.search(r'tinympcproj_N(\d+)_', filename)
    if match:
        return int(match.group(1))

    print(f"Warning: Could not extract N from {filename}, using default N=5")
    return 5

# Note: test data is generated per bitstream, using the N parsed from its filename
print("Test data generation functions defined")

In [None]:
# Find all bitstream files in subdirectories
import os
from pathlib import Path
import glob

def find_all_bitstreams(base_path="."):
    """Collect every .bit file under base_path.

    Two passes are combined: a recursive glob over the whole tree, and a
    direct listing of a few conventional output directories located
    immediately under base_path.

    Args:
        base_path: Directory to search from.

    Returns:
        list: Sorted, de-duplicated ``Path`` objects for the files found.
    """
    root = Path(base_path)

    # Pass 1: recursive sweep, keeping only entries that are regular files
    recursive_hits = glob.glob(os.path.join(base_path, "**", "*.bit"), recursive=True)
    unique = {Path(hit) for hit in recursive_hits if Path(hit).is_file()}

    # Pass 2: direct look inside the known output directories
    for dir_name in ("bitstream", "impl", "output", "build"):
        candidate_dir = root / dir_name
        if candidate_dir.is_dir():
            unique.update(candidate_dir.glob("*.bit"))

    return sorted(unique)

def test_bitstream_performance(bitstream_path, test_maxiter_values=None, num_trials=10):
    """
    Test a bitstream with specific max_iter values, with warmup run

    Args:
        bitstream_path: Path to bitstream file
        test_maxiter_values: Iterable of max_iter values to test
            (default: [10, 100, 1000])
        num_trials: Number of trials per max_iter value (excluding warmup)

    Returns:
        dict: Results with keys 'bitstream', 'N', 'maxiter_values',
        'avg_times' (max_iter -> mean time in ms) and 'all_times'
        (max_iter -> list of per-trial times in ms). A max_iter value for
        which no trial succeeded is absent from both time mappings.
    """
    # Use a None sentinel and copy the input: the list is stored in the
    # returned dict, so sharing a default (or the caller's) list would let
    # later mutation of one result leak into other calls.
    if test_maxiter_values is None:
        test_maxiter_values = [10, 100, 1000]
    else:
        test_maxiter_values = list(test_maxiter_values)

    # Extract N from bitstream filename
    N = extract_N_from_bitstream(bitstream_path)

    # Generate test data with correct dimensions
    nx = 12  # State dimension (fixed for quadrotor)
    nu = 4   # Control dimension (fixed for quadrotor)
    x0_test, xref_test, uref_test = generate_test_data(nx, nu, N)

    # Initialize hardware solver
    hw_solver = tinympc_hw(bitstream_path=str(bitstream_path))

    results = {
        'bitstream': str(bitstream_path),
        'N': N,
        'maxiter_values': test_maxiter_values,
        'avg_times': {},
        'all_times': {}
    }

    try:
        for max_iter in test_maxiter_values:
            # check_termination is set equal to max_iter
            hw_solver.setup(max_iter=max_iter, check_termination=max_iter, verbose=0)

            # Warmup run (first run to prepare hardware) - not recorded
            hw_solver.set_x0(x0_test)
            hw_solver.set_x_ref(xref_test)
            hw_solver.set_u_ref(uref_test)
            hw_solver.solve(timeout=1.0)

            # Actual timing runs
            times = []
            for _ in range(num_trials):
                # Problem data is reloaded outside the timed region
                hw_solver.set_x0(x0_test)
                hw_solver.set_x_ref(xref_test)
                hw_solver.set_u_ref(uref_test)

                # Only the solve() call itself is timed
                start_time = time.perf_counter()
                success = hw_solver.solve(timeout=1.0)
                end_time = time.perf_counter()

                # Failed solves are dropped from the statistics
                if success:
                    times.append((end_time - start_time) * 1000)  # Convert to ms

            if times:
                results['avg_times'][max_iter] = np.mean(times)
                results['all_times'][max_iter] = times
    finally:
        # Always release the hardware, even if setup/solve raised
        hw_solver.cleanup()

    return results

# Locate bitstreams relative to the current working directory first
print("Searching for bitstream files...")
bitstreams = find_all_bitstreams()

if not bitstreams:
    # Nothing found relative to the CWD: retry from the absolute project root
    print("No bitstream files found. Looking in current directory and subdirectories...")
    bitstreams = find_all_bitstreams("/home/xilinx/jupyter_notebooks/zhenyu/tinympc_ip_gen/")

# List what was discovered, numbered from 1
print(f"\nFound {len(bitstreams)} bitstream file(s):")
for idx, bitstream in enumerate(bitstreams):
    print(f"  {idx+1}. {bitstream}")

# The first entry (in sorted order) serves as the default selection
if bitstreams:
    selected_bitstream = bitstreams[0]
    print(f"\nDefault selected bitstream: {selected_bitstream}")

In [None]:
# Benchmark every discovered bitstream at max_iter = 10, 100, 1000
all_test_results = []
test_maxiter_values = [10, 100, 1000]

if not bitstreams:
    print("No bitstreams found to test.")
else:
    print(f"\nTesting {len(bitstreams)} bitstream(s) with max_iter values: {test_maxiter_values}")
    print("=" * 80)

    for idx, bitstream in enumerate(bitstreams):
        print(f"\n[{idx+1}/{len(bitstreams)}] Testing: {bitstream.name}")
        print("-" * 40)

        try:
            # One warmup plus 10 timed trials per max_iter value
            results = test_bitstream_performance(
                bitstream,
                test_maxiter_values=test_maxiter_values,
                num_trials=10,
            )
            all_test_results.append(results)

            # Show this bitstream's numbers right away
            print(f"  N parameter: {results['N']}")
            print("  Average execution times (ms):")
            for max_iter in test_maxiter_values:
                # Values with no successful trial have no entry; skip them
                if max_iter not in results['avg_times']:
                    continue
                avg_time = results['avg_times'][max_iter]
                std_time = np.std(results['all_times'][max_iter])
                print(f"    max_iter={max_iter:4d}: {avg_time:8.3f} ± {std_time:.3f} ms")

        except Exception as e:
            # A failing bitstream is reported and the loop moves on
            print(f"  ERROR: Failed to test bitstream - {e}")

    print("\n" + "=" * 80)
    print("TESTING COMPLETE")
    print("=" * 80)

# Tabulate the averages of every successfully tested bitstream
if all_test_results:
    summary_data = []
    for result in all_test_results:
        row = {'Bitstream': Path(result['bitstream']).name, 'N': result['N']}
        # One column per max_iter value; "N/A" where no timing was recorded
        for max_iter in test_maxiter_values:
            column = f'max_iter={max_iter} (ms)'
            if max_iter in result['avg_times']:
                row[column] = f"{result['avg_times'][max_iter]:.3f}"
            else:
                row[column] = "N/A"
        summary_data.append(row)

    df_summary = pd.DataFrame(summary_data)

    rule = "=" * 80
    print("\nSUMMARY TABLE - Average Execution Times")
    print(rule)
    print(df_summary.to_string(index=False))
    print(rule)

    # Persist the same table as CSV in the working directory
    csv_filename = 'bitstream_performance_summary.csv'
    df_summary.to_csv(csv_filename, index=False)
    print(f"\nResults exported to: {csv_filename}")

In [None]:
# Visualize linear relationships
# For every entry of `linear_results` this draws a 2x2 figure (fit, residuals,
# measurement distribution, summary table), shows it and saves it as a PNG.
# With more than one entry, a comparison table is also printed and exported.
# NOTE(review): `linear_results` is assigned by the analysis cell that appears
# further down in this notebook; that cell has to be executed before this one,
# otherwise this cell raises NameError.
if len(linear_results) > 0:
    # Create comprehensive visualization for each bitstream
    for result in linear_results:
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        fig.suptitle(f'Linear Analysis: {result["bitstream"]} (N={result["N"]})', 
                     fontsize=16, fontweight='bold')
        
        # Plot 1: Linear fit with data points
        ax1 = axes[0, 0]
        x = np.array(result['max_iter_values'])
        y = np.array(result['mean_times'])
        yerr = np.array(result['std_times'])
        
        # Plot data points with error bars
        ax1.errorbar(x, y, yerr=yerr, fmt='o', capsize=5, markersize=8, 
                    label='Measured', color='blue', alpha=0.7)
        
        # Plot fitted line (drawn from 0 to 10% past the largest max_iter)
        x_fit = np.linspace(0, max(x) * 1.1, 100)
        y_fit = result['slope'] * x_fit + result['intercept']
        ax1.plot(x_fit, y_fit, 'r--', linewidth=2, 
                label=f'Fit: y = {result["slope"]:.4f}x + {result["intercept"]:.2f}')
        
        # Add equation and R² to plot
        equation_text = f'time = {result["slope"]:.4f} × max_iter + {result["intercept"]:.2f}\n'
        equation_text += f'R² = {result["r_squared"]:.6f}'
        ax1.text(0.05, 0.95, equation_text, transform=ax1.transAxes, 
                fontsize=11, verticalalignment='top',
                bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
        
        ax1.set_xlabel('max_iter', fontsize=12)
        ax1.set_ylabel('Execution Time (ms)', fontsize=12)
        ax1.set_title('Linear Regression Fit', fontsize=14, fontweight='bold')
        ax1.grid(True, alpha=0.3)
        ax1.legend(fontsize=10, loc='lower right')
        
        # Plot 2: Residuals (measured mean minus fitted value at each max_iter)
        ax2 = axes[0, 1]
        residuals = y - (result['slope'] * x + result['intercept'])
        ax2.scatter(x, residuals, s=50, alpha=0.7, color='green')
        ax2.axhline(y=0, color='r', linestyle='--', linewidth=1.5)
        ax2.set_xlabel('max_iter', fontsize=12)
        ax2.set_ylabel('Residual (ms)', fontsize=12)
        ax2.set_title('Residual Plot', fontsize=14, fontweight='bold')
        ax2.grid(True, alpha=0.3)
        
        # Add residual statistics
        residual_text = f'Mean: {np.mean(residuals):.4f} ms\n'
        residual_text += f'Std: {np.std(residuals):.4f} ms'
        ax2.text(0.05, 0.95, residual_text, transform=ax2.transAxes,
                fontsize=10, verticalalignment='top',
                bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.5))
        
        # Plot 3: Distribution of measurements
        # Each box is placed at its max_iter value on the x axis.
        ax3 = axes[1, 0]
        positions = result['max_iter_values'][:5]  # Show first 5 for clarity
        data_to_plot = result['all_measurements'][:5]
        bp = ax3.boxplot(data_to_plot, positions=positions, widths=5,
                         patch_artist=True, showmeans=True)
        for patch in bp['boxes']:
            patch.set_facecolor('lightblue')
        ax3.set_xlabel('max_iter', fontsize=12)
        ax3.set_ylabel('Execution Time (ms)', fontsize=12)
        ax3.set_title('Measurement Distribution (First 5 Points)', fontsize=14, fontweight='bold')
        ax3.grid(True, alpha=0.3, axis='y')
        
        # Plot 4: Summary table
        ax4 = axes[1, 1]
        ax4.axis('off')
        
        # Create summary table
        # 'Hardware Startup' repeats the intercept and 'Per-iteration Cost'
        # repeats the slope, under the names used by the time = a*max_iter + b model.
        table_data = [
            ['Parameter', 'Value', 'Unit'],
            ['Slope (a)', f'{result["slope"]:.6f}', 'ms/iter'],
            ['Intercept (b)', f'{result["intercept"]:.3f}', 'ms'],
            ['Hardware Startup', f'{result["intercept"]:.3f}', 'ms'],
            ['Per-iteration Cost', f'{result["slope"]:.6f}', 'ms'],
            ['R-squared', f'{result["r_squared"]:.6f}', '-'],
            ['P-value', f'{result["p_value"]:.2e}', '-'],
            ['Data Points', f'{len(result["max_iter_values"])}', '-'],
            ['Max iter tested', f'{max(result["max_iter_values"])}', '-']
        ]
        
        table = ax4.table(cellText=table_data,
                         colWidths=[0.4, 0.3, 0.3],
                         cellLoc='left',
                         loc='center')
        table.auto_set_font_size(False)
        table.set_fontsize(10)
        table.scale(1.2, 1.8)
        
        # Style the header row
        for i in range(3):
            table[(0, i)].set_facecolor('#40466e')
            table[(0, i)].set_text_props(weight='bold', color='white')
        
        # Highlight hardware startup time row (row index 3 of table_data)
        table[(3, 0)].set_facecolor('#ffe6e6')
        table[(3, 1)].set_facecolor('#ffe6e6')
        table[(3, 2)].set_facecolor('#ffe6e6')
        
        ax4.set_title('Linear Model Summary', fontsize=14, fontweight='bold', pad=20)
        
        plt.tight_layout()
        plt.show()
        
        # Save figure
        # Saved through the Figure object itself, after it has been shown.
        # NOTE(review): str.replace removes every ".bit" occurrence in the
        # name, not only a trailing extension.
        fig_filename = f'linear_analysis_{result["bitstream"].replace(".bit", "")}.png'
        fig.savefig(fig_filename, dpi=150, bbox_inches='tight')
        print(f"Figure saved as: {fig_filename}")
    
    # Summary comparison if multiple bitstreams
    if len(linear_results) > 1:
        print("\n" + "=" * 80)
        print("COMPARISON ACROSS ALL BITSTREAMS")
        print("=" * 80)
        
        # Numeric values are pre-formatted as strings, so the CSV stores the
        # rounded text rather than full-precision floats.
        comparison_data = []
        for result in linear_results:
            comparison_data.append({
                'Bitstream': result['bitstream'],
                'N': result['N'],
                'Slope (ms/iter)': f"{result['slope']:.6f}",
                'Intercept/Startup (ms)': f"{result['intercept']:.3f}",
                'R²': f"{result['r_squared']:.6f}"
            })
        
        df_comparison = pd.DataFrame(comparison_data)
        print(df_comparison.to_string(index=False))
        
        # Save comparison to CSV
        csv_filename = 'linear_analysis_comparison.csv'
        df_comparison.to_csv(csv_filename, index=False)
        print(f"\nComparison saved to: {csv_filename}")
        print("=" * 80)

In [None]:
def analyze_linear_relationship(bitstream_path, max_iter_range=None, num_trials=10):
    """
    Analyze linear relationship between max_iter and execution time

    For each max_iter value the solver is configured, run once as warmup and
    then timed ``num_trials`` times. The per-value mean times are fitted with
    ``time = slope * max_iter + intercept``.

    Args:
        bitstream_path: Path object pointing to the bitstream file (its
            ``.name`` is used for reporting)
        max_iter_range: Iterable of max_iter values to test
            (default: 10 to 200 in steps of 10)
        num_trials: Number of timed trials per max_iter value

    Returns:
        dict: Linear regression results and statistics, or None when fewer
        than two max_iter values produced at least one successful solve.
        The 95% confidence intervals are (nan, nan) when only two points are
        available, since the fit then has zero degrees of freedom.
    """
    if max_iter_range is None:
        max_iter_range = list(range(10, 210, 10))  # 10, 20, 30, ..., 200
    else:
        max_iter_range = list(max_iter_range)

    # An empty range cannot be fitted; bail out before touching the hardware
    if not max_iter_range:
        print("Insufficient data for linear regression")
        return None

    print(f"Testing linear relationship for: {bitstream_path.name}")
    print(f"Max_iter range: {min(max_iter_range)} to {max(max_iter_range)}")
    print("-" * 60)

    # Extract N from bitstream
    N = extract_N_from_bitstream(bitstream_path)

    # Generate test data
    nx = 12
    nu = 4
    x0_test, xref_test, uref_test = generate_test_data(nx, nu, N)

    # Initialize hardware solver
    hw_solver = tinympc_hw(bitstream_path=str(bitstream_path))

    # Collect data points
    max_iter_values = []
    mean_times = []
    std_times = []
    all_measurements = []

    try:
        for max_iter in max_iter_range:
            hw_solver.setup(max_iter=max_iter, check_termination=max_iter, verbose=0)

            # Warmup run (not recorded)
            hw_solver.set_x0(x0_test)
            hw_solver.set_x_ref(xref_test)
            hw_solver.set_u_ref(uref_test)
            hw_solver.solve(timeout=1.0)

            # Collect timing data; only the solve() call is inside the timer
            times = []
            for _ in range(num_trials):
                hw_solver.set_x0(x0_test)
                hw_solver.set_x_ref(xref_test)
                hw_solver.set_u_ref(uref_test)

                start_time = time.perf_counter()
                success = hw_solver.solve(timeout=1.0)
                end_time = time.perf_counter()

                # Failed solves are excluded from the statistics
                if success:
                    times.append((end_time - start_time) * 1000)  # ms

            if times:
                max_iter_values.append(max_iter)
                mean_times.append(np.mean(times))
                std_times.append(np.std(times))
                all_measurements.append(times)
                print(f"  max_iter={max_iter:3d}: {np.mean(times):.3f} ± {np.std(times):.3f} ms")
    finally:
        # Always release the hardware, even if setup/solve raised
        hw_solver.cleanup()

    # A line needs at least two points
    if len(max_iter_values) < 2:
        print("Insufficient data for linear regression")
        return None

    # Perform linear regression; std_err is the standard error of the slope
    x = np.asarray(max_iter_values, dtype=float)
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, mean_times)

    n = len(x)
    se_slope = std_err
    # SE(intercept) = s * sqrt(sum(x^2) / (n * ss_x)) and SE(slope) = s / sqrt(ss_x),
    # hence SE(intercept) = SE(slope) * sqrt(sum(x^2) / n). Dividing by ss_x
    # again would understate the intercept uncertainty.
    se_intercept = se_slope * np.sqrt(np.sum(x ** 2) / n)

    # Confidence intervals (95%, two-sided, n - 2 degrees of freedom)
    if n > 2:
        t_val = stats.t.ppf(0.975, n - 2)
        slope_ci = (slope - t_val * se_slope, slope + t_val * se_slope)
        intercept_ci = (intercept - t_val * se_intercept, intercept + t_val * se_intercept)
    else:
        # Two points give an exact fit with zero degrees of freedom
        slope_ci = (float('nan'), float('nan'))
        intercept_ci = (float('nan'), float('nan'))

    results = {
        'bitstream': bitstream_path.name,
        'N': N,
        'max_iter_values': max_iter_values,
        'mean_times': mean_times,
        'std_times': std_times,
        'all_measurements': all_measurements,
        'slope': slope,  # a in ax+b
        'intercept': intercept,  # b in ax+b
        'r_squared': r_value**2,
        'p_value': p_value,
        'std_err': std_err,
        'slope_ci': slope_ci,
        'intercept_ci': intercept_ci
    }

    print("\n" + "=" * 60)
    print("LINEAR REGRESSION RESULTS")
    print("=" * 60)
    print(f"Model: time(ms) = {slope:.6f} × max_iter + {intercept:.3f}")
    print(f"\nCoefficients:")
    print(f"  a (slope): {slope:.6f} ms/iteration")
    print(f"    95% CI: [{slope_ci[0]:.6f}, {slope_ci[1]:.6f}]")
    print(f"  b (intercept): {intercept:.3f} ms (hardware startup time)")
    print(f"    95% CI: [{intercept_ci[0]:.3f}, {intercept_ci[1]:.3f}]")
    print(f"\nStatistics:")
    print(f"  R-squared: {r_value**2:.6f}")
    print(f"  P-value: {p_value:.2e}")
    print(f"  Standard error: {std_err:.6f}")
    print("=" * 60)

    return results

# Run the max_iter sweep on every discovered bitstream and keep the fits
linear_results = []

if bitstreams:
    print("\n" + "=" * 80)
    print("ANALYZING LINEAR RELATIONSHIP FOR ALL BITSTREAMS")
    print("=" * 80)

    for idx, bitstream in enumerate(bitstreams):
        print(f"\n[{idx+1}/{len(bitstreams)}] Analyzing: {bitstream.name}")

        try:
            # Sweep max_iter over 10, 20, ..., 200 with 10 timed trials each
            result = analyze_linear_relationship(
                bitstream,
                max_iter_range=list(range(10, 210, 10)),
                num_trials=10,
            )
        except Exception as e:
            # A failing bitstream is reported and the loop moves on
            print(f"  ERROR: Failed to analyze - {e}")
        else:
            # None (or an empty result) means no regression was produced
            if result:
                linear_results.append(result)

    print("\n" + "=" * 80)
    print("ANALYSIS COMPLETE")
    print("=" * 80)

## 3. Linear Relationship Analysis: max_iter vs Execution Time

Explore the linear relationship between max_iter and execution time using the model: **time = a × max_iter + b**
- **a**: Time per iteration (ms/iteration)
- **b**: Hardware startup/overhead time (ms)

## Bitstream Switch Time Test

In [None]:
import time

# Sorted so the switching order is reproducible (glob order is arbitrary)
all_bitstreams = sorted(Path("bitstream").glob("*.bit"))

# Test bitstream switch time: how long constructing tinympc_hw with a given
# bitstream takes, repeated while rotating through the available bitstreams.
print("\n" + "="*60)
print("BITSTREAM SWITCH TIME TEST")
print("="*60)

if len(all_bitstreams) < 2:
    print("Need at least 2 bitstreams to test switching time")
else:
    # Initialize results storage
    switch_times = []

    # Number of switches to test
    num_tests = 10

    print(f"Testing {num_tests} switches between bitstreams...")

    # Cycle through all available bitstreams so consecutive loads differ
    for i in range(num_tests):
        bitstream = all_bitstreams[i % len(all_bitstreams)]

        # perf_counter is monotonic and high-resolution, matching the timing
        # used by the other benchmarks in this notebook
        start_time = time.perf_counter()
        new_solver = tinympc_hw(bitstream_path=str(bitstream))
        switch_time = (time.perf_counter() - start_time) * 1000  # Convert to ms

        # Release the solver outside the timed region so instances do not
        # pile up across iterations (same cleanup() the benchmarks call).
        # NOTE(review): assumes cleanup() is safe right after construction.
        new_solver.cleanup()

        switch_times.append(switch_time)
        print(f"Switch {i+1}: {switch_time:.2f} ms")

    # Calculate statistics
    avg_switch = np.mean(switch_times)
    std_switch = np.std(switch_times)

    print("\nResults:")
    print(f"Average switch time: {avg_switch:.2f} ms")
    print(f"Standard deviation: {std_switch:.2f} ms")
    print("="*60)
