In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import polars as pl

In [None]:
n_pdm_samples = 128

d = pl.read_csv("../frontend/parameter_sweep.csv")
d

In [None]:
grouped = d.group_by("pdm_frequency", "sampling_frequency", maintain_order=True)

In [None]:
# Measure phases via correlation. Use a sliding window so we can get many measurements of the phase and thus a sense of the "noise"
def measure_phases(samples, sampling_frequency, signal_frequency, window_size=1024):
    num_windows = len(samples) - window_size + 1
    phases = np.zeros(num_windows)
    t = np.arange(len(samples)) / sampling_frequency
    for i in range(num_windows):
        window = samples[i:i+window_size]
        
        sine_wave = np.sin(2 * np.pi * signal_frequency * t[i:i+window_size])
        cosine_wave = np.cos(2 * np.pi * signal_frequency * t[i:i+window_size])
        correlation_sine = np.sum(window * sine_wave)
        correlation_cosine = np.sum(window * cosine_wave)
        phases[i] = np.arctan2(correlation_sine, correlation_cosine)

    return phases

In [None]:

(pdm_frequency, sampling_frequency), g = list(grouped)[50]
plt.figure()
plt.plot(g["sample"])
plt.xlabel("Sample Index")

signal_frequency = g[0, "pdm_frequency"] / n_pdm_samples
sampling_frequency = g[0, "sampling_frequency"]

phases = measure_phases(g["sample"].to_numpy(), sampling_frequency, signal_frequency)
plt.figure()
plt.plot(phases)
plt.xlabel("Sample Index")
plt.ylabel("Phase (radians)")
plt.title("Phase Values for First PDM Frequency")


In [None]:
def calculate_phases(df):
    signal_frequency = df[0, "pdm_frequency"] / n_pdm_samples
    sampling_frequency = df[0, "sampling_frequency"]
    window_sizes = [128, 256, 512, 1024, 2048]
    df = pl.DataFrame({
        "pdm_frequency": df[0, "pdm_frequency"],
        "sampling_frequency": df[0, "sampling_frequency"],
        "window_size": window_sizes,
        "phase": [pl.Series(measure_phases(df["sample"].to_numpy(), sampling_frequency, signal_frequency, window_size)) for window_size in window_sizes]
    })
    return df

with_phases = d.group_by("pdm_frequency", "sampling_frequency", maintain_order=True).map_groups(calculate_phases)


In [None]:
# Calculate std dev of phases for each group
phase_stats = with_phases.select(
    "pdm_frequency",
    "sampling_frequency", 
    "window_size",
    pl.col("phase").list.std().alias("phase_std")
)

# Create figure with subplots stacked vertically
window_sizes = sorted(phase_stats["window_size"].unique().to_list())
fig, axes = plt.subplots(len(window_sizes), 1, figsize=(12, 4*len(window_sizes)))

# Get global min and max for y-axis scaling
y_min = phase_stats["phase_std"].min()
y_max = phase_stats["phase_std"].max()

# Create a plot for each window size
for i, window_size in enumerate(window_sizes):
    window_data = phase_stats.filter(pl.col("window_size") == window_size)
    
    sns.lineplot(
        data=window_data.with_columns(
            pdm_frequency=pl.col("pdm_frequency") / 1000,
            sampling_frequency=(pl.col("sampling_frequency") / 1000).round(1)
        ),
        x="pdm_frequency",
        y="phase_std",
        hue="sampling_frequency",
        palette="viridis",
        legend="full" if i == 0 else False,  # Only show legend on first plot
        ax=axes[i]
    )
    
    # Modify legend title and align labels for first plot
    if i == 0:
        legend = axes[i].get_legend()
        legend.set_title("Sampling Frequency (kHz)")
    
    axes[i].set_xlabel("PDM Frequency (kHz)")
    axes[i].set_ylabel("Phase Standard Deviation (radians)")
    axes[i].set_title(f"Phase Variation vs PDM Frequency (Window Size: {window_size})")
    axes[i].set_ylim(y_min, y_max)  # Set same y-axis limits for all subplots

# Adjust spacing between subplots
plt.tight_layout()

In [None]:
# Calculate std dev of phases for each group
phase_stats = with_phases.select(
    "pdm_frequency",
    "sampling_frequency", 
    "window_size",
    pl.col("phase").list.std().alias("phase_std")
)

# Create figure with subplots stacked vertically
window_sizes = sorted(phase_stats["window_size"].unique().to_list())
fig, axes = plt.subplots(len(window_sizes), 1, figsize=(12, 4*len(window_sizes)))

# Get global min and max for y-axis scaling
y_min = phase_stats["phase_std"].min()
y_max = phase_stats["phase_std"].max()

# Create a plot for each window size
for i, window_size in enumerate(window_sizes):
    window_data = phase_stats.filter(pl.col("window_size") == window_size)
    
    # Create scatter plot with lines
    plot = sns.lineplot(
        data=window_data.with_columns(
            pdm_frequency=pl.col("pdm_frequency") / 1000,
            sampling_frequency=(pl.col("sampling_frequency") / 1000).round(1)
        ),
        x="pdm_frequency",
        y="phase_std",
        hue="sampling_frequency",
        palette="viridis",
        legend="full" if i == 0 else False,  # Only show legend on first plot
        ax=axes[i]
    )
    
    # Add hover annotations
    annot = axes[i].annotate("", xy=(0,0), xytext=(10,10),
                            textcoords="offset points",
                            bbox=dict(boxstyle="round", fc="w", ec="0.5", alpha=0.9),
                            arrowprops=dict(arrowstyle="->"))
    annot.set_visible(False)

    def hover(event):
        if event.inaxes == axes[i]:
            cont, ind = plot.lines[0].contains(event)
            if cont:
                x = event.xdata
                y = event.ydata
                annot.xy = (x, y)
                text = f'PDM Freq: {x:.1f} kHz\nPhase StdDev: {y:.3f} rad'
                annot.set_text(text)
                annot.set_visible(True)
                fig.canvas.draw_idle()
            else:
                annot.set_visible(False)
                fig.canvas.draw_idle()
    
    # Modify legend title and align labels for first plot
    if i == 0:
        legend = axes[i].get_legend()
        legend.set_title("Sampling Frequency (kHz)")
    
    axes[i].set_xlabel("PDM Frequency (kHz)")
    axes[i].set_ylabel("Phase Standard Deviation (radians)")
    axes[i].set_title(f"Phase Variation vs PDM Frequency (Window Size: {window_size})")
    axes[i].set_ylim(y_min, y_max)  # Set same y-axis limits for all subplots

# Connect the hover event
fig.canvas.mpl_connect("motion_notify_event", hover)

# Adjust spacing between subplots
plt.tight_layout()