# Week 12 Live Coding Demo — Fourier Transform & Sampling Theory


In [None]:
# Common helpers for spectra and windows used across exercises.
import numpy as np, matplotlib.pyplot as plt

def one_sided_spectrum(x, fs, window=None, zero_pad_factor=1):
    """Compute a scaled one-sided amplitude spectrum of a real signal.

    Scaling:
    - Magnitudes are divided by the number of signal samples N (not by the
      zero-padded FFT length).
    - Every bin that has a negative-frequency twin is doubled. DC is never
      doubled; the last bin is left alone only when the FFT length is even,
      because only then is it the (unpaired) Nyquist bin.
    - With no window, a pure sine of amplitude A that sits exactly on a bin
      therefore shows A at its peak. A window is NOT gain-compensated here, so
      windowed peaks are additionally scaled by the window's mean value.
    - Zero padding only densifies the frequency grid; it adds no new information.

    Parameters
    ----------
    x : array_like
        Real-valued 1-D samples.
    fs : float
        Sampling rate; the returned frequency axis is in the same units.
    window : array_like or None
        Optional weights multiplied onto ``x`` before the FFT. ``None`` means
        no windowing (rectangular).
    zero_pad_factor : int
        FFT length as a multiple of ``len(x)``. It is truncated to an integer
        and the FFT length is never shorter than ``len(x)``.

    Returns
    -------
    f : ndarray
        Bin frequencies from 0 to fs/2.
    mag : ndarray
        Scaled one-sided magnitudes.
    X_full : ndarray
        The raw, unscaled ``np.fft.rfft`` output of the (windowed) signal.

    Raises
    ------
    ValueError
        If ``x`` is empty.
    """
    x = np.asarray(x)
    N = len(x)
    if N == 0:
        raise ValueError("one_sided_spectrum needs at least one sample")
    xw = x * window if window is not None else x
    # When Nz > N, np.fft.rfft zero-pads the input up to Nz samples.
    Nz = max(N, int(N * int(zero_pad_factor)))
    X_full = np.fft.rfft(xw, n=Nz)
    f = np.fft.rfftfreq(Nz, d=1/fs)
    mag = np.abs(X_full) / N
    # For odd Nz the last rfft bin lies below Nyquist and has a mirror image,
    # so it must be doubled as well; for even Nz it is the Nyquist bin.
    stop = len(mag) if Nz % 2 else len(mag) - 1
    mag[1:stop] *= 2.0
    return f, mag, X_full

def hann(N):
    """Return an N-point Hann window, w[n] = 0.5 * (1 - cos(2*pi*n/N)) for n = 0..N-1."""
    phase = 2*np.pi*np.arange(N)/N
    return 0.5*(1.0 - np.cos(phase))

def blackman(N):
    """Return an N-point Blackman window.

    w[n] = 0.42 - 0.5*cos(2*pi*n/N) + 0.08*cos(4*pi*n/N) for n = 0..N-1.
    """
    coeff_dc, coeff_1, coeff_2 = 0.42, 0.5, 0.08
    theta = 2*np.pi*np.arange(N)/N
    return coeff_dc - coeff_1*np.cos(theta) + coeff_2*np.cos(2*theta)

# Seeded generator: every run of the notebook draws the same noise samples.
rng = np.random.default_rng(seed=10)
# Figure resolution used for all plots below.
plt.rcParams['figure.dpi'] = 130

## Exercise 1 — Diagnose hidden periodic disturbances in a magnetometer log
Physics story: a magnetometer near a pump shows weak periodic disturbances on top of drift + noise. We want to find the disturbance frequencies and amplitudes and clean the data.
- Step 1/4: Simulate slow drift + 2 weak tones + noise, plot full + zoom.
- Step 2/4: Detrend (remove best-fit line) so the FFT isn't dominated by DC.
- Step 3/4: Compute one-sided spectra with and without windowing and read peaks.
- Step 4/4: Zero-pad to refine the frequency grid (no new info, just denser bins).

In [None]:
# ---- Step 1/4: build a realistic time series with drift, two tones, and noise ----
fs = 300.0            # sampling rate [samples/s] (typical DAQ or microcontroller)
T  = 6.0              # record length [s] -> frequency resolution ≈ 1/T ≈ 0.167 Hz
t  = np.arange(0, T, 1/fs)

# Ingredients of the synthetic log: two weak tones at 1.8 Hz and 8.5 Hz (the
# "unknowns" the analysis has to recover), a slow linear ramp, and Gaussian noise.
tone_1p8 = 2e-9*np.sin(2*np.pi*1.8*t + np.deg2rad(15))
tone_8p5 = 1e-9*np.cos(2*np.pi*8.5*t + np.deg2rad(-40))
x_true = tone_1p8 + tone_8p5
drift  = 4e-9*(t/T)   # linear ramp (e.g., temperature or sensor bias)
noise  = 0.8e-9*rng.normal(size=len(t))
signal = x_true + drift + noise

# Top panel: the whole record. Bottom panel: the first 600 samples
# (2 s at 300 samples/s) so the noise texture is visible.
fig, ax = plt.subplots(2, 1, figsize=(7,4.6), sharex=False)
ax_full, ax_zoom = ax

ax_full.plot(t, signal, label='measured B-field')
ax_full.set_title('Magnetometer: slow drift + weak periodic disturbances + noise')
ax_full.set_xlabel('time [s]')
ax_full.set_ylabel('B [Tesla]')
ax_full.legend()

ax_zoom.plot(t[:600], signal[:600], label='first 2 s zoom')
ax_zoom.set_xlabel('time [s]')
ax_zoom.set_ylabel('B [Tesla]')
ax_zoom.legend()

plt.tight_layout()
plt.show()

In [None]:
# ---- Step 2/4: detrend (remove constant + linear trend) before FFT ----
N = len(t)

# Least-squares straight-line fit, signal ≈ a0 + a1*t.
# The design matrix has one row per sample and the columns [1, t].
A = np.vstack([np.ones_like(t), t]).T
coef = np.linalg.lstsq(A, signal, rcond=None)[0]
trend = A @ coef
detrended = signal - trend

fig_trend, ax_trend = plt.subplots(figsize=(7,3))
ax_trend.plot(t, signal, alpha=.5, label='raw')
ax_trend.plot(t, trend, 'k', lw=2, label='fitted trend')
ax_trend.plot(t, detrended, label='detrended')
ax_trend.set_title('Detrending removes DC and slow drift before FFT')
ax_trend.set_xlabel('time [s]')
ax_trend.set_ylabel('B [Tesla]')
ax_trend.legend()
plt.tight_layout()
plt.show()

In [None]:
# ---- Step 3/4: spectra with and without a window (Hann reduces leakage) ----
# Same detrended record, once unwindowed and once Hann-weighted.
f_rect, mag_rect, _ = one_sided_spectrum(detrended, fs, window=None, zero_pad_factor=1)
f_hann, mag_hann, _ = one_sided_spectrum(detrended, fs, window=hann(N), zero_pad_factor=1)

fig_win, ax_win = plt.subplots(figsize=(7,3.6))
ax_win.semilogy(f_rect, mag_rect, alpha=.5, label='rectangular')
ax_win.semilogy(f_hann, mag_hann, label='Hann window')
ax_win.set_xlim(0, 30)
ax_win.set_ylim(1e-12, 1e-8)
ax_win.set_xlabel('frequency [Hz]')
ax_win.set_ylabel('scaled magnitude [Tesla]')
ax_win.set_title('Spectrum with and without windowing (note leakage difference)')
ax_win.legend()
plt.tight_layout()
plt.show()

# List the five largest bins inside 0.5–15 Hz, strongest first.
# These are individual bins, so neighbours of one peak can appear more than once.
idx_band = (f_hann>0.5) & (f_hann<15)
f_in_band = f_hann[idx_band]
mag_in_band = mag_hann[idx_band]
top = np.argsort(mag_in_band)[-5:]
print('Top 5 magnitudes in 0.5–15 Hz (Hann):')
for k in top[::-1]:
    print(f'  f ≈ {f_in_band[k]:6.2f} Hz,  |X| ≈ {mag_in_band[k]:.3e} Tesla')

In [None]:
# ---- Step 4/4: zero-padding to refine the frequency grid (no new information) ----
# Same Hann-windowed record as before, but evaluated on an 8x longer FFT.
f_hann_zp, mag_hann_zp, _ = one_sided_spectrum(
    detrended, fs, window=hann(N), zero_pad_factor=8
)

fig_zp, ax_zp = plt.subplots(figsize=(7,3.6))
ax_zp.plot(f_hann_zp, mag_hann_zp, label='Hann + 8× zero-pad')
ax_zp.set_xlim(0, 20)
ax_zp.set_xlabel('frequency [Hz]')
ax_zp.set_ylabel('scaled magnitude [Tesla]')
ax_zp.set_title('Zero-padding refines the frequency axis (bin spacing smaller)')
ax_zp.grid(True, alpha=0.3)
ax_zp.legend()
plt.tight_layout()
plt.show()

## Exercise 2 — Window choice: leakage versus amplitude bias
Physics story: a rotation stage encoder emits a tone at frequency f0, but the capture window cuts mid-cycle.
- Step 1/2: Compare rectangular, Hann, Blackman windows for leakage.
- Step 2/2: Estimate amplitude near the true tone and see the bias trade-off.

In [None]:
# ---- Step 1/2: compare leakage across windows ----
fs = 256.0
T = 1.5
t = np.arange(0, T, 1/fs)
f0 = 14.2     # 14.2 Hz * 1.5 s = 21.3 cycles: not a whole number -> leakage expected
x = 1.0*np.sin(2*np.pi*f0*t + np.deg2rad(20))

N = len(t)
w_rect = np.ones(N)
w_hann = hann(N)
w_blk = blackman(N)

# One (frequency, magnitude) pair per window, keyed by the window's name.
specs = {}
for name, w in (('rect', w_rect), ('hann', w_hann), ('blackman', w_blk)):
    f, mag, _ = one_sided_spectrum(x, fs, window=w, zero_pad_factor=1)
    specs[name] = (f, mag)

fig_leak, ax_leak = plt.subplots(figsize=(7,3.6))
for name, (f, mag) in specs.items():
    ax_leak.plot(f, mag, label=name)
ax_leak.set_xlim(0, 40)
ax_leak.set_xlabel('frequency [Hz]')
ax_leak.set_ylabel('scaled magnitude')
ax_leak.set_title('Leakage vs window choice (same signal, different windows)')
ax_leak.legend()
plt.tight_layout()
plt.show()

In [None]:
# ---- Step 2/2: estimate amplitude near the true frequency ----
def estimate_amplitude_near(f, mag, f0, band=2.0):
    """Locate the largest spectral bin strictly within ``band`` of ``f0``.

    Returns a ``(frequency, magnitude)`` pair for that bin. ``f`` and ``mag``
    are arrays of equal length; bins with |f - f0| >= band are ignored.
    """
    nearby = (f > f0-band) & (f < f0+band)
    f_near = f[nearby]
    mag_near = mag[nearby]
    peak = np.argmax(mag_near)
    return f_near[peak], mag_near[peak]

# Report, per window, where the peak near f0 sits and how tall it is.
for name, (f, mag) in specs.items():
    fpeak, apeak = estimate_amplitude_near(f, mag, f0, band=2.0)
    print(f"{name:8s}  peak @ {fpeak:6.2f} Hz   est amplitude ~ {apeak:.3f}")

# Notes: Hann and Blackman reduce leakage (good for revealing weak neighbors),
# but they widen the main lobe and change the coherent gain, which biases the
# amplitude read from the peak. Proper calibration can correct the bias if needed.

## Exercise 3 — Remove line noise with a frequency-domain notch filter
Physics story: a photodiode monitoring an LED shows the desired 21 Hz square-wave flicker plus line noise at 60 Hz and 120 Hz.
- Step 1/3: Inspect the spectrum and identify the line noise peaks.
- Step 2/3: Build a smooth notch (cosine taper) around 60/120 Hz, filter in frequency, inverse FFT.
- Step 3/3: Compare narrow vs wide notch (trade-off: more suppression vs more distortion).

In [None]:
# ---- Step 1/3: build signal and look at spectrum ----
fs = 500.0
T = 3.0
t = np.arange(0, T, 1/fs)

# Wanted part: a ±2 square wave at 21 Hz. Unwanted parts: 60 Hz and 120 Hz
# sinusoids plus Gaussian noise.
square_phase = 2 * np.pi * 21 * t + np.deg2rad(-10)
signal = 2 * np.sign(np.sin(square_phase))
line   = 0.9*np.sin(2*np.pi*60*t) + 0.25*np.sin(2*np.pi*120*t)
noise  = 0.3*rng.normal(size=len(t))
x = signal + line + noise

# Time-domain view of the first 600 samples (1.2 s at 500 samples/s).
fig_raw, ax_raw = plt.subplots(figsize=(7,3.2))
ax_raw.plot(t[:600], x[:600], label='raw noisy data')
ax_raw.set_xlabel('time [s]')
ax_raw.set_ylabel('signal')
ax_raw.set_title('Raw noisy data: square wave + line noise + noise')
ax_raw.legend()
plt.tight_layout()
plt.show()

# Hann-windowed, 4x zero-padded spectrum of the raw record (f0 is the frequency axis).
f0, mag0, _ = one_sided_spectrum(x, fs, window=hann(len(x)), zero_pad_factor=4)
fig_spec, ax_spec = plt.subplots(figsize=(7,3.6))
ax_spec.plot(f0, mag0)
ax_spec.set_xlim(0, 200)
ax_spec.set_xlabel('frequency [Hz]')
ax_spec.set_ylabel('scaled magnitude')
ax_spec.set_title('Raw spectrum shows 60 Hz and 120 Hz')
plt.tight_layout()
plt.show()

In [None]:
# ---- Step 2/3: design a smooth notch (cosine taper) and filter in frequency ----
# Transform the raw record as-is: no window is applied here, because this
# spectrum is filtered and then inverted back to the time domain.
N = len(t)
f = np.fft.rfftfreq(N, d=1/fs)
X_full = np.fft.rfft(x)

def cosine_taper(f, f0, width):
    """Raised-cosine notch gain centered at ``f0`` with full width ``width``.

    The gain is 1 outside the notch, dips smoothly to 0 at ``f0``, and returns
    to 1 at ``f0 ± width/2``.

    Parameters
    ----------
    f : array_like
        Frequencies at which to evaluate the gain. Integer input is converted
        to float so the smooth taper is not truncated to 0/1.
    f0 : float
        Notch center, in the same units as ``f``.
    width : float
        Full notch width, in the same units as ``f``. A width of zero or less
        means "no notch" and yields a gain of 1 everywhere.

    Returns
    -------
    ndarray
        Gain values in [0, 1], same shape as ``f``.
    """
    f = np.asarray(f)
    if not np.issubdtype(f.dtype, np.floating):
        # An integer gain array would silently round the taper to a hard gate.
        f = f.astype(float)
    g = np.ones_like(f)                                        # default = pass
    half = width / 2.0                                         # half width
    if half <= 0:
        # Nothing to carve out; also avoids 0/0 at f == f0.
        return g
    band = np.abs(f - f0) <= half                              # notch region
    s = (f[band] - f0) / half                                  # map to [-1, 1]
    g[band] = 0.5 * (1.0 - np.cos(np.pi * s))                  # 0 at s=0, 1 at s=±1
    return g

# Combined gain: an 8 Hz wide notch at 60 Hz times a 6 Hz wide notch at 120 Hz.
G = cosine_taper(f, 60.0, 8.0) * cosine_taper(f, 120.0, 6.0)

plt.figure(figsize=(7,3.2))
plt.plot(f, G)
plt.xlim(0, 200); plt.xlabel('frequency [Hz]'); plt.ylabel('filter gain')
plt.title('Notch filter frequency response (60 Hz and 120 Hz notches)')
plt.grid(True, alpha=0.3); plt.tight_layout(); plt.show()

# Apply the gain bin by bin, then go back to the time domain. Passing n=N
# makes the output length match the input record.
Y = X_full * G
y = np.fft.irfft(Y, n=N)

# Time domain: short segment for clarity
plt.figure(figsize=(7,3.2))
plt.plot(t[:600], x[:600], alpha=.5, label='raw')
plt.plot(t[:600], y[:600], label='after notch')
plt.xlabel('time [s]'); plt.ylabel('signal'); plt.title('Time domain: removing 60/120 Hz hum')
# Show the first 0.4 s, left to right (limits given as left, right).
plt.xlim(0, 0.4)
plt.legend(); plt.tight_layout(); plt.show()

# Frequency domain before/after, using the same Hann + 4x zero-pad analysis
# for the filtered record as was used for the raw one.
f1, mag1, _ = one_sided_spectrum(y, fs, window=hann(len(y)), zero_pad_factor=4)
plt.figure(figsize=(7,3.6))
plt.plot(f0, mag0, alpha=.5, label='before')
plt.plot(f1, mag1, label='after notch')
plt.xlim(0, 200); plt.xlabel('frequency [Hz]'); plt.ylabel('scaled magnitude')
plt.title('Spectrum before/after notch filtering'); plt.legend(); plt.tight_layout(); plt.show()

In [None]:
# ---- Step 3/3: narrow vs wide notch trade-off ----
def apply_notch(x, fs, f0=60.0, w0=6.0, f1=120.0, w1=6.0):
    """Filter ``x`` with two raised-cosine notches applied in the frequency domain.

    ``f0``/``w0`` and ``f1``/``w1`` are the center and full width of the first
    and second notch; ``fs`` is the sampling rate of ``x``. Returns the
    filtered signal with the same number of samples as ``x``.
    """
    n_samples = len(x)
    freqs = np.fft.rfftfreq(n_samples, 1/fs)
    gain = cosine_taper(freqs, f0, w0) * cosine_taper(freqs, f1, w1)
    spectrum = np.fft.rfft(x)
    return np.fft.irfft(spectrum * gain, n=n_samples)

# Two extremes: 1 Hz wide notches versus very wide ones (54 Hz around 60 Hz,
# 30 Hz around 120 Hz).
y_narrow = apply_notch(x, fs, f0=60.0, w0=1.0, f1=120.0, w1=1.0)
y_wide   = apply_notch(x, fs, f0=60.0, w0=54.0, f1=120.0, w1=30.0)

# Same Hann + 4x zero-pad analysis for both filtered records.
fN, magN, _ = one_sided_spectrum(y_narrow, fs, window=hann(len(x)), zero_pad_factor=4)
fW, magW, _ = one_sided_spectrum(y_wide,   fs, window=hann(len(x)), zero_pad_factor=4)

# Frequency-domain comparison on a log scale, with the unfiltered spectrum as reference.
fig_nw, ax_nw = plt.subplots(figsize=(7,3.6))
ax_nw.semilogy(fN, magN, alpha=.6, label='narrow notch')
ax_nw.semilogy(fW, magW, alpha=.6, label='wide notch')
ax_nw.semilogy(f0, mag0, 'k--', lw=1, alpha=.3, label='before')
ax_nw.set_xlim(0, 200)
ax_nw.set_xlabel('frequency [Hz]')
ax_nw.set_ylabel('scaled magnitude')
ax_nw.set_title('Notch width trade-off in frequency')
ax_nw.legend()
plt.tight_layout()
plt.show()

# Time-domain comparison over the first 0.4 s.
fig_nwt, ax_nwt = plt.subplots(figsize=(7,3.2))
ax_nwt.plot(t[:600], y_narrow[:600], label='narrow', alpha=0.6)
ax_nwt.plot(t[:600], y_wide[:600], label='wide', alpha=0.6)
ax_nwt.set_xlim(0, 0.4)
ax_nwt.set_xlabel('time [s]')
ax_nwt.set_ylabel('signal')
#plt.title('Time ringing grows with sharper frequency cuts')
ax_nwt.legend()
plt.tight_layout()
plt.show()

## Exercise 4 — Aliasing in practice and how anti-alias filters help
Physics story: a rotating wheel produces a 130 Hz optical signal. Sampling too slowly makes it **look** slower (aliasing).
- Step 1/3: Compare safe vs unsafe sampling rates in time domain; compute alias frequency.
- Step 2/3: FFT of unsafe sequence shows peak below Nyquist.
- Step 3/3: Anti-alias: low-pass before downsampling, then decimate to a lower rate safely.

In [None]:
# ---- Step 1/3: safe vs unsafe sampling in time ----
f_true = 130.0      # main tone [Hz]
f_true_2 = 40       # weaker second tone [Hz]
fs_safe = 500.0     # above 2*f_true = 260 Hz
fs_low  = 150.0     # below 2*f_true, so the 130 Hz tone will alias
T = 2.0
t_safe = np.arange(0, T, 1/fs_safe)
t_low  = np.arange(0, T, 1/fs_low)

# The same two-tone signal evaluated on each time grid.
x_safe = np.sin(2*np.pi*f_true*t_safe)
x_safe += 0.4 * np.sin(2*np.pi*f_true_2*t_safe)
x_low = np.sin(2*np.pi*f_true*t_low)
x_low += 0.4 * np.sin(2*np.pi*f_true_2*t_low)

# Overlay the first 0.1 s of both: 51 samples at 500 Hz, 16 samples at 150 Hz.
fig_samp, ax_samp = plt.subplots(figsize=(7,3.2))
ax_samp.plot(t_safe[:51], x_safe[:51], label='500 Hz sampling')
ax_samp.stem(t_low[:16], x_low[:16], basefmt='--', linefmt='C1-', markerfmt='C1o',
             label='150 Hz sampling')
ax_samp.set_xlabel('time [s]')
ax_samp.set_ylabel('amplitude')
ax_samp.set_title('130 Hz signal sampled at different rate')
ax_samp.legend(loc='lower right')
plt.tight_layout()
plt.show()

def aliased_frequency(f, fs):
    """Return the apparent frequency of a tone at ``f`` when sampled at rate ``fs``.

    The tone is folded about the multiple of ``fs`` closest to it, so the
    result is the distance from ``f`` to that multiple.
    """
    nearest_multiple = int(np.round(f / fs))
    folded = f - nearest_multiple*fs
    return abs(folded)

# Report where the f_true tone appears once it is sampled at fs_low.
f_alias = aliased_frequency(f_true, fs_low)
print('Aliased frequency at 150 Hz sampling:', f_alias, f'Hz instead of {f_true} Hz')

In [None]:
# ---- Step 2/3: unsafe spectrum shows aliased peak below Nyquist ----
# Same Hann + 4x zero-pad analysis for the under-sampled and the safely sampled record.
f_low, mag_low, _ = one_sided_spectrum(x_low, fs_low, window=hann(len(x_low)), zero_pad_factor=4)
f_safe, mag_safe, _ = one_sided_spectrum(x_safe, fs_safe, window=hann(len(x_safe)), zero_pad_factor=4)

plt.figure(figsize=(7,3.2))
plt.plot(f_safe, mag_safe, '-', alpha=0.7, label='500 Hz sampling (true)')
# The aliased curve is lifted by 0.5 so the two spectra do not overlap; the
# legend says so because the y-axis no longer reads its true magnitude.
plt.plot(f_low, mag_low+0.5, label='150 Hz sampling (aliased, offset +0.5)')
# Nyquist frequency of the slow sampling is fs_low/2, i.e. half the sampling rate.
plt.axvline(fs_low/2, color='r', linestyle='--', label=f'Nyquist ({fs_low/2:g} Hz)')
plt.xlabel('frequency [Hz]')
plt.ylabel('scaled magnitude')
plt.title('Aliased spectrum peak vs true spectrum')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# ---- Step 3/3: anti-alias low-pass before downsampling ----
def raised_cosine_lowpass(f, cutoff, trans):
    """Smooth low-pass gain with a raised-cosine transition band.

    The gain is 1 below ``cutoff - trans/2``, 0 above ``cutoff + trans/2``,
    and follows half a cosine cycle in between (0.5 at ``cutoff``).

    Parameters
    ----------
    f : array_like
        Frequencies at which to evaluate the gain. Integer input is converted
        to float so the transition is not truncated to 0/1.
    cutoff : float
        Center of the transition band, in the same units as ``f``.
    trans : float
        Full width of the transition band. With ``trans == 0`` the filter is a
        hard step that passes ``f <= cutoff``.

    Returns
    -------
    ndarray
        Gain values in [0, 1], same shape as ``f``.
    """
    f = np.asarray(f)
    if not np.issubdtype(f.dtype, np.floating):
        # An integer gain array would silently round the transition to a hard step.
        f = f.astype(float)
    H = np.ones_like(f)
    f1, f2 = cutoff - trans/2, cutoff + trans/2
    H[f > f2] = 0.0
    if trans > 0:
        # Only evaluate the cosine when there is a band; this avoids 0/0 at f == cutoff.
        band = (f >= f1) & (f <= f2)
        phi = (f[band] - f1)/trans * np.pi      # 0 at f1, pi at f2
        H[band] = 0.5*(1 + np.cos(phi))
    return H

# Anti-alias recipe: low-pass the safely sampled record first (80 Hz cutoff with
# a 20 Hz transition, i.e. the gain rolls off between 70 and 90 Hz), and only
# then keep every M-th sample. With fs_safe = 500 and fs_low = 150,
# M = round(3.33) = 3, so the new rate is about 166.7 Hz.
# NOTE(review): the roll-off ends at 90 Hz, slightly above the new Nyquist
# (about 83.3 Hz) — confirm this margin is acceptable for other inputs.
n_safe = len(x_safe)
f = np.fft.rfftfreq(n_safe, 1/fs_safe)
X = np.fft.rfft(x_safe)
H = raised_cosine_lowpass(f, cutoff=80.0, trans=20.0)
y_safe = np.fft.irfft(X*H, n=n_safe)

M = int(round(fs_safe/fs_low))             # integer decimation factor
fs_ds = fs_safe / M
y_ds = y_safe[::M]
print('Downsample factor M =', M, ' -> new fs ≈', fs_ds, 'Hz')

# Compare the directly under-sampled spectrum with the filtered-then-decimated one.
f_ds, mag_ds, _ = one_sided_spectrum(y_ds, fs_ds, window=hann(len(y_ds)), zero_pad_factor=1)
fig_aa, ax_aa = plt.subplots(figsize=(7,3.2))
ax_aa.plot(f_low, mag_low, alpha=.5, label=f'unsafe {fs_low} Hz (aliased)')
ax_aa.plot(f_ds, mag_ds, alpha=0.5, label='LPF then downsample (anti-aliased)')
ax_aa.axvline(fs_ds/2, color='k', linestyle='--', label='Nyquist (downsampled)')
ax_aa.set_xlabel('frequency [Hz]')
ax_aa.set_ylabel('scaled magnitude')
ax_aa.set_title('Anti-alias low-pass prevents folding')
ax_aa.legend()
plt.tight_layout()
plt.show()

## Exercise 5 (Bonus) — Short-time spectrum (spectrogram) of an accelerating cart
Physics story: a cart with a reflective pattern passes a photodiode; the tick rate increases as it accelerates.
- Step 1/2: Build a noisy signal whose instantaneous frequency increases quadratically in time (5 Hz → 25 Hz) and visualize the time series.
- Step 2/2: Compute a short-time spectrum (overlapping Hann windows) to track frequency vs time; compare window sizes.

In [None]:
# ---- Step 1/2: accelerating cart signal ----
fs = 400.0
T = 4.0
t = np.arange(0, T, 1/fs)

# Instantaneous frequency grows quadratically in time (like a chirp in LIGO
# detection): 5 Hz at t = 0, rising to 25 Hz at t = T.
f_inst = 5 + 20*(t/T)**2
# Phase(t) = 2π ∫ f_inst dt, approximated by a cumulative sum divided by fs.
phase = 2*np.pi * np.cumsum(f_inst)/fs
x = 0.9*np.sin(phase) + 0.25*rng.normal(size=len(t))

# Top: first 800 samples (2 s at 400 samples/s). Bottom: the whole record.
fig, ax = plt.subplots(2, 1, figsize=(7,4.4))
ax_head, ax_all = ax

ax_head.plot(t[:800], x[:800])
ax_head.set_title('Accelerating cart signal (first 2 s)')
ax_head.set_xlabel('t [s]')
ax_head.set_ylabel('signal')

ax_all.plot(t, x)
ax_all.set_title('Full record with noise')
ax_all.set_xlabel('t [s]')
ax_all.set_ylabel('signal')

plt.tight_layout()
plt.show()

In [None]:
# ---- Step 2/2: simple STFT (short-time Fourier transform) with overlapping Hann windows ----
def hann(N):
    """Return an N-point Hann window, w[n] = 0.5 * (1 - cos(2*pi*n/N)) for n = 0..N-1."""
    angle = 2*np.pi*np.arange(N)/N
    return 0.5*(1.0 - np.cos(angle))

def stft_mag(x, fs, win_len=256, hop=64, window="hann"):
    """Magnitude short-time Fourier transform with overlapping windows.

    The record is cut into segments of ``win_len`` samples starting every
    ``hop`` samples; each segment is weighted by the window and transformed
    with ``np.fft.rfft``. Trailing samples that do not fill a whole segment
    are ignored.

    Parameters
    ----------
    x : array_like
        Real-valued 1-D samples.
    fs : float
        Sampling rate; ``freqs`` is in the same units and ``times`` in its inverse.
    win_len : int
        Segment length in samples.
    hop : int
        Step between segment starts in samples.
    window : {"hann"}, None, or array_like
        ``"hann"`` (default) applies a Hann window, ``None`` applies no
        weighting (rectangular), and an array of length ``win_len`` is used
        as the weights directly.

    Returns
    -------
    freqs : ndarray, shape (win_len//2 + 1,)
        Bin frequencies.
    times : ndarray, shape (n_segments,)
        Center time of each segment.
    spec : ndarray, shape (win_len//2 + 1, n_segments)
        Unscaled magnitudes, one column per segment. ``n_segments`` is 0 when
        the record is shorter than ``win_len``.

    Raises
    ------
    ValueError
        If ``window`` is an unknown name or an array of the wrong length.
    """
    x = np.asarray(x)
    N = len(x)

    if window is None:
        w = np.ones(win_len)
    elif isinstance(window, str):
        if window != "hann":
            raise ValueError(f"unknown window name: {window!r}")
        # Same formula as the notebook's hann() helper, inlined so this
        # function does not depend on cell execution order.
        w = 0.5 - 0.5*np.cos(2*np.pi*np.arange(win_len)/win_len)
    else:
        w = np.asarray(window, dtype=float)
        if w.shape != (win_len,):
            raise ValueError("window must have exactly win_len samples")

    # A record shorter than one segment yields zero columns, not a negative count.
    cols = max(0, 1 + (N - win_len) // hop)
    starts = hop * np.arange(cols)

    spec = np.empty((win_len//2 + 1, cols))
    for c, i0 in enumerate(starts):
        spec[:, c] = np.abs(np.fft.rfft(x[i0:i0+win_len] * w))

    freqs = np.fft.rfftfreq(win_len, 1/fs)
    times = (starts + win_len/2) / fs          # center time of each window
    return freqs, times, spec

# Baseline spectrogram: 256-sample segments advanced by 64 samples.
freqs, times, S = stft_mag(x, fs, win_len=256, hop=64)
fig_stft, ax_stft = plt.subplots(figsize=(7.2,3.8))
img = ax_stft.imshow(
    S, origin='lower', aspect='auto',
    extent=[times[0], times[-1], freqs[0], freqs[-1]],
)
fig_stft.colorbar(img, ax=ax_stft, label='magnitude')
ax_stft.set_xlabel('time [s]')
ax_stft.set_ylabel('frequency [Hz]')
ax_stft.set_title('Short-time spectrum tracks increasing frequency')
ax_stft.set_ylim(0, 40)
plt.tight_layout()
plt.show()

# Repeat with a shorter and a longer segment (hop = a quarter of the segment)
# to show the time vs frequency resolution trade-off.
for win in (128, 512):
    f2, t2, S2 = stft_mag(x, fs, win_len=win, hop=win//4)
    fig_cmp, ax_cmp = plt.subplots(figsize=(7.2,3.0))
    img = ax_cmp.imshow(
        S2, origin='lower', aspect='auto',
        extent=[t2[0], t2[-1], f2[0], f2[-1]],
    )
    fig_cmp.colorbar(img, ax=ax_cmp, label='magnitude')
    ax_cmp.set_xlabel('time [s]')
    ax_cmp.set_ylabel('frequency [Hz]')
    ax_cmp.set_title(f'Window {win}: time vs frequency resolution trade-off')
    ax_cmp.set_ylim(0, 40)
    plt.tight_layout()
    plt.show()