## EE 102 Week 12 Lecture 2: Virtual Manipulator on Sampling

### Define all methods as functions

In [None]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio
pio.renderers.default = "notebook_connected"


# create the cosine signal as an example here:
f0 = 5.0 # frequency of the cosine signal we used in class (in Hz)
T = 1.0 # we choose the total time duration to be 1 second (arbitrarily)
dt_cont = 1e-3 # to simulate "continuous" time, we use a fine time step that is very small
# time vector for "continuous" time
t_cont = np.arange(0, T, dt_cont)
# continuous-time signal x(t) (note: this is not _really_ continuous, just sampled finely enough)
x_cont = np.cos(2 * np.pi * f0 * t_cont)


def zoh_reconstruct(t_cont, t_s, x_s):
    """
    Zero-Order Hold (piecewise constant) reconstruction --- discussed in class.
    """
    # For each t in t_cont, find index of most recent sample time
    indices = np.searchsorted(t_s, t_cont, side='right') - 1
    indices = np.clip(indices, 0, len(x_s) - 1) # the clip lets us create the "hold"
    return x_s[indices]


def compute_fft(signal, dt):
    """
    Compute centered FFT magnitude and corresponding frequency axis (Hz).
    """
    N = len(signal)
    X = np.fft.fftshift(np.fft.fft(signal))
    freqs = np.fft.fftshift(np.fft.fftfreq(N, dt))
    return freqs, np.abs(X)


def lowpass_reconstruct(t_cont, t_s, x_s, Fs, dt_cont, fc_factor=0.45, num_taps=101):

    # the original signal's sampling rate
    F_cont = 1.0 / dt_cont

    # First we place the discrete-time samples on the fine grid as impulses (zeros elsewhere)
    x_up = np.zeros_like(t_cont)
    # find the indices (by rounding to the nearest "t_cont") on the continuous-time axis that is closest to each sample
    idx = np.round(t_s / dt_cont).astype(int)
    idx = idx[(idx >= 0) & (idx < len(t_cont))] # choose only indices that are positive and within bounds
    x_up[idx] = x_s[:len(idx)] # remember x_s is our sampled signal

    # Design a simple windowed-sinc low-pass filter in discrete time
    # The low-pass filter cutoff frequency in Hz is set to be below Fs/2 (Nyquist) 
    # to avoid aliasing from the copies. 
    fc = fc_factor * Fs # default fc_factor=0.45 means cutoff is 0.45*Fs or 2.22 times f0
    # normalized cutoff [0, 1], where 1 corresponds to F_cont/2
    fc_norm = fc / (F_cont / 2.0)    

    # time indices for the filter
    n = np.arange(num_taps)
    mid = (num_taps - 1) / 2.0

    # ideal low-pass in time domain is h(t) = sinc() [RECALL Week 11 derivation of h(t) for LPF] 
    # in discrete time, normalized
    h = fc_norm * np.sinc(fc_norm * (n - mid))

    # Hamming window to reduce side-lobes: h[n] = h[n] * w[n]: THIS STEP IS OPTIONAL!
    h *= np.hamming(num_taps)

    # normalize for unity gain at DC: sum of h[n] = 1 (to avoid gain change in reconstruction)
    h /= np.sum(h)

    # finally, convolve the "impulse train" of sampled signal with LPF to reconstruct
    x_rec = np.convolve(x_up, h, mode='same')
    L = F_cont / Fs
    x_rec *= L
    return x_rec

def plot_case(Fs, recon_method="zoh", title_suffix=""):
    """
    This function makes a 3x2 plot:
      Row 1: original x(t)
      Row 2: samples
      Row 3: reconstruction
    Columns: time domain (left), frequency domain (right).
    """

    Ts = 1.0 / Fs
    t_s = np.arange(0, T, Ts)
    x_s = np.cos(2 * np.pi * f0 * t_s)

    # Reconstruction
    if recon_method == "lpf":
        x_rec = lowpass_reconstruct(t_cont, t_s, x_s, Fs, dt_cont)
    elif recon_method == "zoh":
        x_rec = zoh_reconstruct(t_cont, t_s, x_s)
    else:
        raise ValueError("Unknown recon_method")

    # FFTs
    f_cont, X_cont = compute_fft(x_cont, dt_cont)
    f_s, X_s = compute_fft(x_s, Ts)
    f_rec, X_rec = compute_fft(x_rec, dt_cont)

    # Limit frequency range for plotting (around baseband)
    f_min, f_max = -30, 30

    fig = make_subplots(
        rows=3, cols=2,
        subplot_titles=("Original (time)", "Original (frequency)",
                       f"Samples (Fs = {Fs:.1f} Hz)", "Sampled spectrum",
                       "Reconstruction (time)", "Reconstruction (frequency)"),
        vertical_spacing=0.1,
        horizontal_spacing=0.1
    )

    # Row 1, Col 1: Original (time)
    fig.add_trace(go.Scatter(x=t_cont, y=x_cont, mode='lines', name='x(t)', showlegend=False), row=1, col=1)
    fig.update_xaxes(range=[0, T], row=1, col=1)
    fig.update_yaxes(title_text="x(t)", row=1, col=1)

    # Row 1, Col 2: Original (frequency)
    fig.add_trace(go.Scatter(x=f_cont, y=X_cont, mode='lines', name='|X(f)|', showlegend=False), row=1, col=2)
    fig.update_xaxes(range=[f_min, f_max], row=1, col=2)
    fig.update_yaxes(title_text="|X(f)|", row=1, col=2)

    # Row 2, Col 1: Samples (stem plot)
    fig.add_trace(go.Scatter(x=t_s, y=x_s, mode='markers', name='samples',
                            marker=dict(size=8), showlegend=False), row=2, col=1)
    for i in range(len(t_s)):
        fig.add_trace(go.Scatter(x=[t_s[i], t_s[i]], y=[0, x_s[i]], mode='lines',
                                line=dict(color='blue', width=1), showlegend=False), row=2, col=1)
    fig.update_xaxes(range=[0, T], row=2, col=1)
    fig.update_yaxes(title_text="samples", row=2, col=1)

    # Row 2, Col 2: Sampled spectrum
    fig.add_trace(go.Scatter(x=f_s, y=X_s, mode='lines', name='|X_s(f)|', showlegend=False), row=2, col=2)
    fig.update_xaxes(range=[f_min, f_max], row=2, col=2)
    fig.update_yaxes(title_text="|X_s(f)|", row=2, col=2)

    # Row 3, Col 1: Reconstruction (time)
    fig.add_trace(go.Scatter(x=t_cont, y=x_cont, mode='lines', name='original',
                            line=dict(dash='dash')), row=3, col=1)
    fig.add_trace(go.Scatter(x=t_cont, y=x_rec, mode='lines', name='recon'), row=3, col=1)
    fig.add_trace(go.Scatter(x=t_s, y=x_s, mode='markers', name='samples',
                            marker=dict(size=6, color='green')), row=3, col=1)
    for i in range(len(t_s)):
        fig.add_trace(go.Scatter(x=[t_s[i], t_s[i]], y=[0, x_s[i]], mode='lines',
                                line=dict(color='green', width=1), showlegend=False), row=3, col=1)
    fig.update_xaxes(title_text="t (s)", range=[0, T], row=3, col=1)
    fig.update_yaxes(title_text="x(t)", row=3, col=1)

    # Row 3, Col 2: Reconstruction (frequency)
    fig.add_trace(go.Scatter(x=f_cont, y=X_cont, mode='lines', name='original',
                            line=dict(dash='dash'), showlegend=False), row=3, col=2)
    fig.add_trace(go.Scatter(x=f_rec, y=X_rec, mode='lines', name='recon', showlegend=False), row=3, col=2)
    fig.update_xaxes(title_text="f (Hz)", range=[f_min, f_max], row=3, col=2)
    fig.update_yaxes(title_text="|X(f)|", row=3, col=2)

    fig.update_layout(
        title_text=title_suffix,
        height=800,
        width=800,
        showlegend=True,
        legend=dict(font=dict(size=8))
    )
    fig.show()


Click on legend on the plotly plots to select/unselect to see each component separately:

In [None]:
# Case 1: Ideal. No aliasing
# f0 = 5 Hz, Nyquist = 2*f0 = 10 Hz.

Fs1 = 40.0
# the sinc reconstruction approximates the impulses at each sample time
plot_case(Fs1, recon_method="zoh",
          title_suffix="Case 1: No aliasing (Fs = 40 Hz): zero-order hold reconstruction.")


In [None]:
# Case 2: Aliasing
# f0 = 5 Hz, Nyquist = 10 Hz
# Pick Fs < 10 Hz, e.g., 8 Hz

Fs2 = 5.0
plot_case(Fs2, recon_method="zoh",
          title_suffix="Case 2: Aliasing (Fs = 5 Hz < 2*f0, zero-order hold reconstruction)")

In [None]:
# Case 3: No aliasing, but distortion from non-ideal reconstruction
# f0 = 5 Hz, Nyquist = 10 Hz
# Pick Fs > 10 Hz, e.g., 12 Hz (no aliasing)
# Reconstruct with Zero-Order Hold (ZOH), not ideal low-pass

Fs3 = 25
plot_case(Fs3, recon_method="zoh",
          title_suffix="Case 3: No aliasing, but ZOH distortion (Fs = 12 Hz, ZOH recon)")


## Sample an audio signal: `guitar_clean.wav`
Download the file from this [URL](https://github.com/jvierine/signal_processing/blob/master/003_guitar/guitar_clean.wav)

In [1]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.io import wavfile
from scipy import signal
from fractions import Fraction

Fs_orig, data = wavfile.read("guitar_clean.wav")

# If stereo, take left channel
if data.ndim > 1:
    data = data[:, 0]

# Convert to float in [-1, 1]
if data.dtype.kind in "iu":  # int types
    data = data.astype(np.float32) / np.iinfo(data.dtype).max
else:
    data = data.astype(np.float32)

duration_plot = 0.1
N3 = int(duration_plot * Fs_orig)
x_orig = data[:N3]
t_orig = np.arange(len(x_orig)) / Fs_orig



def resample_to_Fs(x, Fs_in, Fs_out):
    """
    Bandlimited resample from Fs_in to Fs_out using resample_poly.
    Returns (y, Fs_out).
    THIS IS IMPORTANT!! 
    Otherwise your frequency axis will be wrong and the audio playback will be at wrong speed.
    """
    ratio = Fraction(Fs_out, Fs_in).limit_denominator()
    up, down = ratio.numerator, ratio.denominator
    y = signal.resample_poly(x, up, down)
    return y, Fs_out


def compute_fft_mag(x, Fs):
    """Centered FFT magnitude and frequency axis (Hz)."""
    N = len(x)
    X = np.fft.fftshift(np.fft.fft(x))
    freqs = np.fft.fftshift(np.fft.fftfreq(N, d=1.0/Fs))
    return freqs, np.abs(X)


def write_wav(filename, Fs, x):
    """Write float audio in [-1,1] to 16-bit WAV."""
    x_clip = np.clip(x, -1.0, 1.0)
    x_int16 = (x_clip * 32767).astype(np.int16)
    wavfile.write(filename, Fs, x_int16)


def plot_sampling_case(Fs_target, out_wav, title_suffix=""):
    x_s, Fs_s = resample_to_Fs(x_orig, Fs_orig, Fs_target)
    t_s = np.arange(len(x_s)) / Fs_s

    write_wav(out_wav, Fs_s, x_s)

    x_rec, _ = resample_to_Fs(x_s, Fs_s, Fs_orig)
    # align lengths for plotting / FFT
    N = min(len(x_orig), len(x_rec))
    x_rec = x_rec[:N]
    x_orig_aligned = x_orig[:N]
    t_rec = np.arange(N) / Fs_orig

    f_orig, X_orig = compute_fft_mag(x_orig_aligned, Fs_orig)
    f_s, X_s = compute_fft_mag(x_s, Fs_s)
    f_rec, X_rec = compute_fft_mag(x_rec, Fs_orig)

    f_min, f_max = 0, Fs_orig / 2  # one-sided (audio intuition)

    fig = make_subplots(
        rows=3, cols=2,
        subplot_titles=("Original (time)", "Original (magnitude spectrum)",
                       f"Samples (Fs = {Fs_s:.0f} Hz)", "Sampled spectrum",
                       "Reconstruction (time, back at Fs_orig)", "Reconstruction (spectrum)"),
        vertical_spacing=0.1,
        horizontal_spacing=0.1
    )

    fig.add_trace(go.Scatter(x=t_orig, y=x_orig, mode='lines', name='x(t)', showlegend=False), row=1, col=1)
    fig.update_xaxes(range=[0, duration_plot], row=1, col=1)
    fig.update_yaxes(title_text="x(t)", row=1, col=1)

    fig.add_trace(go.Scatter(x=f_orig, y=X_orig, mode='lines', name='|X(f)|', showlegend=False), row=1, col=2)
    fig.update_xaxes(range=[f_min, f_max], row=1, col=2)
    fig.update_yaxes(title_text="|X(f)|", row=1, col=2)

    fig.add_trace(go.Scatter(x=t_s, y=x_s, mode='markers', name='samples',
                            marker=dict(size=2), showlegend=False), row=2, col=1)
    step = max(1, len(t_s) // 100)
    for i in range(0, len(t_s), step):
        fig.add_trace(go.Scatter(x=[t_s[i], t_s[i]], y=[0, x_s[i]], mode='lines',
                                line=dict(color='blue', width=1), showlegend=False), row=2, col=1)
    fig.update_xaxes(range=[0, duration_plot], row=2, col=1)
    fig.update_yaxes(title_text="samples", row=2, col=1)

    fig.add_trace(go.Scatter(x=f_s, y=X_s, mode='lines', name='|X_s(f)|', showlegend=False), row=2, col=2)
    fig.update_xaxes(range=[f_min, min(f_max, Fs_s/2)], row=2, col=2)
    fig.update_yaxes(title_text="|X_s(f)|", row=2, col=2)

    fig.add_trace(go.Scatter(x=t_rec, y=x_orig_aligned, mode='lines', name='original',
                            line=dict(dash='dash')), row=3, col=1)
    fig.add_trace(go.Scatter(x=t_rec, y=x_rec, mode='lines', name='reconstructed'), row=3, col=1)
    fig.update_xaxes(title_text="t (s)", range=[0, duration_plot], row=3, col=1)
    fig.update_yaxes(title_text="x(t)", row=3, col=1)

    fig.add_trace(go.Scatter(x=f_orig, y=X_orig, mode='lines', name='original',
                            line=dict(dash='dash'), showlegend=False), row=3, col=2)
    fig.add_trace(go.Scatter(x=f_rec, y=X_rec, mode='lines', name='reconstructed', showlegend=False), row=3, col=2)
    fig.update_xaxes(title_text="f (Hz)", range=[f_min, f_max], row=3, col=2)
    fig.update_yaxes(title_text="|X(f)|", row=3, col=2)

    fig.update_layout(
        title_text=title_suffix,
        height=800,
        width=1000,
        showlegend=True,
        legend=dict(font=dict(size=8))
    )

    fig.show()


### With plotly, you can select/unselect legends and also zoom into the plot! 
If it is slow on your computer, try changing the plotting function above to only plot 0.1 seconds of data.

In [2]:
# Case 1: moderately high rate (no obvious degradation)
Fs_target_1 = min(Fs_orig // 2, Fs_orig)  # e.g. 22050 if Fs_orig=44100

plot_sampling_case(
    Fs_target_1,
    out_wav="guitar_down_22k.wav",
    title_suffix=f"Case 1: Fs = {Fs_target_1} Hz (high rate, small change)"
)



### Sample like a telephone call

In [3]:
Fs_target_2 = 5000
if Fs_target_2 >= Fs_orig:
    Fs_target_2 = Fs_orig // 4  # fallback if original is already low-rate

plot_sampling_case(
    Fs_target_2,
    out_wav="guitar_down_5k.wav",
    title_suffix=f"Case 2: Fs = {Fs_target_2} Hz (telephone-like quality)"
)


In [4]:
# Case 3: very low sampling rate (strong loss of detail)
Fs_target_3 = 500
if Fs_target_3 >= Fs_orig:
    Fs_target_3 = max(1000, Fs_orig // 20)  # fallback

plot_sampling_case(
    Fs_target_3,
    out_wav="guitar_down_2k.wav",
    title_suffix=f"Case 3: Fs = {Fs_target_3} Hz (very low, strong degradation)"
)


In [None]:
# END