In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from pynq import Overlay, allocate
from scipy import signal
import ipywidgets as widgets
from ipywidgets import interact, Layout
import signal_generator as sg

# --- Configuration ---
FFT_LEN = 4096  # Number of samples per frame (length of the data-path DMA buffers)
FS = 4096       # Sampling rate used for time axes and cutoff normalisation

# Locate Bitstream: prefer the project checkout; if that file is missing,
# fall back to a bitstream in the current working directory.
BITSTREAM_PATH = "/home/alan/Learning/FPGA/fpgachina/PYNQ_Z2_MUSIC_VISUALIZER/Program/PYNQ/design_1_wrapper.bit"
BITSTREAM_PATH = BITSTREAM_PATH if os.path.exists(BITSTREAM_PATH) else "design_1_wrapper.bit"

print(f"Loading Overlay: {BITSTREAM_PATH}")
overlay = Overlay(BITSTREAM_PATH)

# Initialize DMAs:
#   axi_dma_0 -> data path
#   axi_dma_1 -> coefficient reload path
dma_data, dma_coef = overlay.axi_dma_0, overlay.axi_dma_1

print("Overlay loaded and DMAs initialized.")

In [None]:
# --- Hardware Drivers ---

def reload_fir_coefficients(coeffs):
    """
    Reloads the FIR filter coefficients using DMA1 (`dma_coef`).

    Args:
        coeffs: Non-empty 1-D list/array of coefficients. Values are cast to
            int16 and then each one is widened into its own 32-bit DMA word
            (one coefficient per word; nothing is bit-packed).

    Raises:
        ValueError: If `coeffs` is empty or not one-dimensional. This is
            checked before any DMA buffer is allocated.
    """
    # Convert to int16 as expected by the IP
    coeffs_int16 = np.asarray(coeffs, dtype=np.int16)
    if coeffs_int16.ndim != 1 or coeffs_int16.size == 0:
        raise ValueError(
            "coeffs must be a non-empty 1-D sequence, got shape "
            f"{coeffs_int16.shape}"
        )

    # One 32-bit word per coefficient
    coef_buffer = allocate(shape=coeffs_int16.shape, dtype=np.int32)
    try:
        # Copy data (int16 -> int32 widening keeps the sign)
        np.copyto(coef_buffer, coeffs_int16)

        # Transfer
        dma_coef.sendchannel.transfer(coef_buffer)
        dma_coef.sendchannel.wait()
    finally:
        # Release the contiguous memory even when the transfer raises;
        # a plain `del` would be skipped on the error path.
        coef_buffer.freebuffer()

def run_hardware_chain(signal_data):
    """
    Sends signal to hardware chain (FIR -> FFT) and returns the FFT magnitude.

    Args:
        signal_data: 1-D sequence of exactly FFT_LEN real samples. Samples are
            saturated to the int16 range and truncated to integers before
            being sent, one sample per 32-bit DMA word.

    Returns:
        A NumPy float32 array of length FFT_LEN copied out of the DMA receive
        buffer, so it stays valid after the buffer is released.

    Raises:
        ValueError: If `signal_data` does not have shape (FFT_LEN,).
    """
    samples = np.asarray(signal_data)
    if samples.shape != (FFT_LEN,):
        raise ValueError(
            f"signal_data must have shape ({FFT_LEN},), got {samples.shape}"
        )

    # Convert float signal to int16 for hardware.
    # Saturate instead of letting out-of-range values wrap around silently.
    int16_limits = np.iinfo(np.int16)
    sig_int16 = np.clip(samples, int16_limits.min, int16_limits.max).astype(np.int16)

    # Input Buffer Allocation
    in_buffer = allocate(shape=(FFT_LEN,), dtype=np.int32)
    out_buffer = None
    try:
        np.copyto(in_buffer, sig_int16)

        # Output Buffer Allocation (read back as float32)
        out_buffer = allocate(shape=(FFT_LEN,), dtype=np.float32)

        # DMA Transfer: arm the receive channel before starting the send
        dma_data.recvchannel.transfer(out_buffer)
        dma_data.sendchannel.transfer(in_buffer)
        dma_data.sendchannel.wait()
        dma_data.recvchannel.wait()

        result = np.copy(out_buffer)
    finally:
        # Release the contiguous memory on both success and failure paths
        in_buffer.freebuffer()
        if out_buffer is not None:
            out_buffer.freebuffer()

    return result

def software_fft(signal_data):
    """Reference software implementation: magnitude of the FFT of `signal_data`."""
    spectrum = np.fft.fft(signal_data)
    return np.abs(spectrum)

In [None]:
# --- Filter Design Helper ---

def get_fir_coeffs(filter_type, cutoff_hz, num_taps=21, gain=100):
    """
    Generates FIR coefficients based on selected type.

    Args:
        filter_type: "Pass-Through", "Low-Pass", "High-Pass" or "Band-Pass".
            Any other value is treated like "Pass-Through".
        cutoff_hz: Cutoff (or band centre for "Band-Pass") in the same units
            as FS. It is normalised by FS / 2 and clamped to [0.01, 0.99].
        num_taps: Number of filter taps.
        gain: Scale factor applied before quantisation to integers.

    Returns:
        int16 NumPy array of `num_taps` coefficients: the float design scaled
        by `gain`, rounded to the nearest integer and saturated to the int16
        range.
    """
    nyq = 0.5 * FS

    # Safety clamp for cutoff frequency
    norm_cutoff = np.clip(cutoff_hz / nyq, 0.01, 0.99)

    if filter_type == "Low-Pass":
        coeffs = signal.firwin(num_taps, norm_cutoff)

    elif filter_type == "High-Pass":
        coeffs = signal.firwin(num_taps, norm_cutoff, pass_zero=False)

    elif filter_type == "Band-Pass":
        # Create a band around the cutoff
        width = 0.1
        low = max(0.01, norm_cutoff - width)
        high = min(0.99, norm_cutoff + width)
        coeffs = signal.firwin(num_taps, [low, high], pass_zero=False)

    else:
        # "Pass-Through" and any unrecognised type:
        # impulse response [0, ... 1, ... 0]
        coeffs = np.zeros(num_taps)
        coeffs[num_taps // 2] = 1.0

    # Scale to integer range. Round to nearest (plain casting truncates toward
    # zero and shrinks every tap) and saturate so a large gain cannot wrap.
    int16_limits = np.iinfo(np.int16)
    scaled = np.rint(coeffs * gain)
    return np.clip(scaled, int16_limits.min, int16_limits.max).astype(np.int16)

In [None]:
import time

def benchmark_performance():
    """
    Times the hardware FIR + FFT chain against a NumPy/SciPy software chain.

    Steps:
      1. Build a white-noise test signal and low-pass coefficients, and load
         the coefficients into the hardware filter once.
      2. Time `iterations` DMA round trips using pre-allocated buffers, so
         buffer allocation is excluded from the hardware figure.
      3. Time `iterations` runs of `lfilter` + `fft` in software.
      4. Show a bar chart with the speedup and print the averages.

    Returns:
        None. Results are reported through the plot and printed text.
    """
    print("--- Starting Performance Benchmark (FIR + FFT) ---")

    # 1. Setup Test Data
    # Use a random noise signal for benchmarking
    test_signal = sg.generate_white_noise(FFT_LEN)

    # Setup Filter Coefficients (Low-Pass) for both HW and SW
    coeffs = get_fir_coeffs("Low-Pass", 500, gain=100)

    # Reload HW coefficients once before test to ensure consistent state
    reload_fir_coefficients(coeffs)

    # 2. Define Software Chain Function (FIR + FFT)
    def run_software_chain(sig, b):
        # Software FIR using scipy.signal.lfilter with the same coefficient
        # vector as the hardware; for timing, the operation count is what matters.
        filtered = signal.lfilter(b, [1.0], sig)
        # Software FFT
        return np.abs(np.fft.fft(filtered))

    # 3. Measure Execution Time
    iterations = 100  # Run enough times to get stable average

    print(f"Running {iterations} iterations for each method...")

    # --- Hardware Measurement (Optimized) ---
    # Pre-allocate buffers to exclude memory allocation overhead from the benchmark
    in_buffer = allocate(shape=(FFT_LEN,), dtype=np.int32)
    out_buffer = None
    try:
        out_buffer = allocate(shape=(FFT_LEN,), dtype=np.float32)

        # Prepare input data
        np.copyto(in_buffer, np.int16(test_signal))

        def run_hardware_transfer():
            # One full round trip: arm receive, start send, wait for both
            dma_data.recvchannel.transfer(out_buffer)
            dma_data.sendchannel.transfer(in_buffer)
            dma_data.sendchannel.wait()
            dma_data.recvchannel.wait()

        # Warmup
        run_hardware_transfer()

        # Measure Hardware
        start_time = time.perf_counter()
        for _ in range(iterations):
            run_hardware_transfer()
        end_time = time.perf_counter()
        avg_hw_time = (end_time - start_time) / iterations
    finally:
        # Cleanup: release the contiguous buffers even if a transfer failed
        in_buffer.freebuffer()
        if out_buffer is not None:
            out_buffer.freebuffer()

    # --- Software Measurement ---
    # Warmup
    run_software_chain(test_signal, coeffs)

    # Measure Software
    start_time = time.perf_counter()
    for _ in range(iterations):
        run_software_chain(test_signal, coeffs)
    end_time = time.perf_counter()
    avg_sw_time = (end_time - start_time) / iterations

    # 4. Visualize Results
    labels = ['Hardware (FPGA)', 'Software (ARM CPU)']
    times = [avg_hw_time * 1000, avg_sw_time * 1000]  # Convert to ms
    colors = ['#ff7f0e', '#2ca02c']

    fig, ax = plt.subplots(figsize=(10, 6))
    bars = ax.bar(labels, times, color=colors, width=0.5)

    ax.set_ylabel('Execution Time (ms)')
    ax.set_title(f'Performance Comparison: FIR Filter + FFT (N={FFT_LEN})')
    ax.grid(axis='y', linestyle='--', alpha=0.7)

    # Add value labels
    for bar in bars:
        height = bar.get_height()
        ax.text(bar.get_x() + bar.get_width()/2., height,
                f'{height:.4f} ms',
                ha='center', va='bottom', fontsize=12, fontweight='bold')

    # Add Speedup Annotation (placed between the two bars)
    speedup = avg_sw_time / avg_hw_time
    ax.text(0.5, max(times)*0.6, f"Speedup: {speedup:.2f}x",
            ha='center', fontsize=16, bbox=dict(facecolor='white', edgecolor='black', boxstyle='round,pad=0.5'))

    fig.tight_layout()
    plt.show()

    print(f"Average Hardware Time: {avg_hw_time*1000:.4f} ms")
    print(f"Average Software Time: {avg_sw_time*1000:.4f} ms")
    print(f"Hardware Acceleration Speedup: {speedup:.2f}x")

# Run the benchmark: drives the data-path DMA of the loaded overlay, then
# prints the average timings and shows the comparison bar chart.
benchmark_performance()

In [None]:
# --- Interactive Demo ---

def update_demo(signal_name, filter_type, cutoff, gain):
    """
    Interactive-demo callback: generate a test signal, reload the FIR
    coefficients, run the hardware chain and plot the result next to a
    software FFT of the unfiltered signal.

    Args:
        signal_name: Label of the test signal to generate; an unknown label
            produces an all-zero signal.
        filter_type: Filter type passed to `get_fir_coeffs`.
        cutoff: Cutoff frequency passed to `get_fir_coeffs`.
        gain: Coefficient scale factor passed to `get_fir_coeffs`.
    """
    # 1. Generate Signal
    time_axis = np.arange(FFT_LEN) / FS

    # Builders are wrapped in lambdas so only the selected one is evaluated.
    signal_builders = {
        "Sine (50Hz)": lambda: sg.generate_sine_wave(time_axis, freq=50, amplitude=500),
        # Mix of 50Hz, 200Hz, and 1000Hz
        "Multi-Sine (Low+High)": lambda: sg.generate_multi_sine_wave(
            time_axis, freqs=[50, 200, 1000], amplitudes=[0.5, 0.3, 0.2], base_amplitude=500
        ),
        "Square (50Hz)": lambda: sg.generate_square_wave(time_axis, freq=50, amplitude=500),
        "Chirp (10-1000Hz)": lambda: sg.generate_chirp_signal(
            time_axis, start_freq=10, end_freq=1000, amplitude=500
        ),
        "AM (Carrier 500Hz)": lambda: sg.generate_am_signal(
            time_axis, carrier_freq=500, signal_freq=20, amplitude=500
        ),
    }
    build_signal = signal_builders.get(signal_name)
    sig = build_signal() if build_signal is not None else np.zeros_like(time_axis)

    # 2. Prepare & Reload Coefficients
    # A failed reload is reported in the plot title; the demo still runs.
    coeffs = get_fir_coeffs(filter_type, cutoff, gain=gain)
    try:
        reload_fir_coefficients(coeffs)
    except Exception as e:
        reload_msg = f"Failed ({str(e)})"
    else:
        reload_msg = "Success"

    # 3. Run Hardware Chain
    hw_fft = run_hardware_chain(sig)

    # 4. Run Software FFT (Reference on Original Signal)
    sw_fft = software_fft(sig)

    # 5. Visualization
    def finish_axes(ax, title, xlabel, ylabel):
        # Shared decoration applied to every panel after its data is drawn
        ax.set_title(title)
        ax.set_xlabel(xlabel)
        ax.set_ylabel(ylabel)
        ax.grid(True, alpha=0.3)
        ax.legend()

    fig, axs = plt.subplots(3, 1, figsize=(10, 12))
    fig.subplots_adjust(hspace=0.4)
    time_ax, hw_ax, sw_ax = axs

    # Plot 1: Time Domain
    time_ax.plot(time_axis[:500], sig[:500], label='Input Signal')
    finish_axes(
        time_ax,
        f"Time Domain Signal (First 500 samples) - {signal_name}",
        "Time (s)",
        "Amplitude",
    )

    # Frequency axis shared by both spectra, shifted so 0 Hz is centred
    freqs_shifted = np.fft.fftshift(np.fft.fftfreq(FFT_LEN, 1/FS))

    # Plot 2: Hardware FFT (Filtered)
    hw_ax.plot(freqs_shifted, np.fft.fftshift(hw_fft), color='#ff7f0e', label='Hardware Output')
    finish_axes(
        hw_ax,
        f"Hardware FFT Output (After {filter_type} Filter) - Coeff Reload: {reload_msg}",
        "Frequency (Hz)",
        "Magnitude",
    )

    # Plot 3: Software FFT (Original)
    # Absolute values are kept (no normalisation) so gain effects stay visible.
    sw_ax.plot(freqs_shifted, np.fft.fftshift(sw_fft), color='#2ca02c', label='Software Reference')
    finish_axes(
        sw_ax,
        "Software FFT Output (Original Unfiltered Signal)",
        "Frequency (Hz)",
        "Magnitude",
    )

    plt.show()

# --- Launch Interface ---
# Shared look for all controls: full-length description labels, half-width widgets.
style = dict(description_width='initial')
layout = Layout(width='50%')

# Each keyword below is bound to the update_demo parameter of the same name.
interact(
    update_demo,
    signal_name=widgets.Dropdown(
        options=[
            "Sine (50Hz)",
            "Multi-Sine (Low+High)",
            "Square (50Hz)",
            "Chirp (10-1000Hz)",
            "AM (Carrier 500Hz)",
        ],
        value="Multi-Sine (Low+High)",
        description="Signal Type",
        style=style,
        layout=layout,
    ),
    filter_type=widgets.Dropdown(
        options=[
            "Pass-Through",
            "Low-Pass",
            "High-Pass",
            "Band-Pass",
        ],
        value="Pass-Through",
        description="Filter Type",
        style=style,
        layout=layout,
    ),
    cutoff=widgets.FloatSlider(
        min=100,
        max=1500,
        step=50,
        value=500,
        description="Cutoff Freq (Hz)",
        style=style,
        layout=layout,
    ),
    gain=widgets.IntSlider(
        min=1,
        max=500,
        value=100,
        description="Filter Gain",
        style=style,
        layout=layout,
    ),
)