# 12: Digital Filter Design

This notebook explores the R4W filter architecture for signal processing applications.

## Learning Objectives
- Understand FIR vs IIR filter characteristics
- Design filters using windowed sinc, Kaiser, and Remez methods
- Analyze frequency response and group delay
- Implement sample rate conversion with polyphase filters
- Apply pulse shaping for communications

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
import subprocess
import json

%matplotlib inline
plt.style.use('seaborn-v0_8-whitegrid')

def run_r4w(*args):
    """Run r4w CLI command and return output."""
    result = subprocess.run(['cargo', 'run', '--bin', 'r4w', '--'] + list(args),
                          capture_output=True, text=True, cwd='..')
    return result.stdout

## FIR vs IIR Filters

| Property | FIR | IIR |
|----------|-----|-----|
| **Impulse Response** | Finite | Infinite |
| **Stability** | Always stable | Can be unstable |
| **Phase** | Linear if taps are symmetric (constant delay) | Nonlinear |
| **Order** | Higher for sharp cutoff | Lower |
| **Latency** | (N-1)/2 samples for N symmetric taps | Varies with frequency |
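
As a quick numerical check of the Phase and Latency rows, the sketch below compares the group delay of a symmetric FIR filter with that of an IIR filter. The tap count, cutoff, and Butterworth order are illustrative choices, not values taken from R4W.

```python
import numpy as np
from scipy import signal

num_taps = 31                       # illustrative FIR length (odd, symmetric taps)
h_fir = signal.firwin(num_taps, 0.3)
b_iir, a_iir = signal.butter(4, 0.3)

# Evaluate inside the passband only, away from the FIR stopband zeros
w = np.linspace(0.01, 0.25, 64) * np.pi   # rad/sample
_, gd_fir = signal.group_delay((h_fir, [1.0]), w=w)
_, gd_iir = signal.group_delay((b_iir, a_iir), w=w)

print(f"FIR group delay: {gd_fir.min():.2f} to {gd_fir.max():.2f} samples "
      f"(expected (N-1)/2 = {(num_taps - 1) / 2:.1f})")
print(f"IIR group delay: {gd_iir.min():.2f} to {gd_iir.max():.2f} samples")
```

The FIR delay should be flat at (N-1)/2, while the IIR delay changes across the passband.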

## Windowed FIR Design

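The ideal lowpass filter has a rectangular (brick-wall) frequency response, but its impulse response is an infinitely long sinc function. To get a realizable FIR filter we truncate the sinc to a finite number of taps and multiply it by a window, which trades transition width against sidelobe level.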
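
The truncate-and-window step can be written out by hand. The following is a minimal sketch, assuming a Hamming window and unity DC gain; it compares the hand-built taps with `signal.firwin` for the same settings. The names `h_ideal` and `h_manual` are introduced here for illustration.

```python
import numpy as np
from scipy import signal

num_taps = 63
cutoff = 0.2  # fraction of Nyquist (1.0 = fs/2)

# Sample index centered on the middle tap
m = np.arange(num_taps) - (num_taps - 1) / 2

# Truncated ideal lowpass impulse response (np.sinc is sin(pi*x)/(pi*x))
h_ideal = cutoff * np.sinc(cutoff * m)

# Apply the window, then normalize to unity gain at DC
h_manual = h_ideal * np.hamming(num_taps)
h_manual /= h_manual.sum()

h_scipy = signal.firwin(num_taps, cutoff, window='hamming')
print("Max difference from firwin:", np.max(np.abs(h_manual - h_scipy)))
```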

In [None]:
# Design lowpass filters with different windows
num_taps = 63
cutoff = 0.2  # Normalized to Nyquist (1.0 = fs/2)

# Different windows
windows = {
    'Rectangular': signal.firwin(num_taps, cutoff, window='boxcar'),
    'Hamming': signal.firwin(num_taps, cutoff, window='hamming'),
    'Blackman': signal.firwin(num_taps, cutoff, window='blackman'),
    'Kaiser (β=8)': signal.firwin(num_taps, cutoff, window=('kaiser', 8)),
}

# Plot frequency responses
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

for name, h in windows.items():
    w, H = signal.freqz(h, worN=2048)
    axes[0].plot(w/np.pi, 20*np.log10(np.abs(H)+1e-10), label=name)
    axes[1].plot(w/np.pi, 20*np.log10(np.abs(H)+1e-10), label=name)

axes[0].set_xlim([0, 1])
axes[0].set_ylim([-100, 5])
axes[0].set_xlabel('Normalized Frequency')
axes[0].set_ylabel('Magnitude (dB)')
axes[0].set_title('Full Response')
axes[0].legend()
axes[0].grid(True)

axes[1].set_xlim([0, 0.4])
axes[1].set_ylim([-8, 1])  # firwin's cutoff is roughly the half-amplitude (-6 dB) point
axes[1].set_xlabel('Normalized Frequency')
axes[1].set_ylabel('Magnitude (dB)')
axes[1].set_title('Passband Detail')
axes[1].axhline(-3, color='r', linestyle='--', label='-3 dB')
axes[1].axvline(cutoff, color='g', linestyle='--', label='Cutoff')
axes[1].legend()
axes[1].grid(True)

plt.tight_layout()
plt.show()

print("Window Comparison:")
print("- Rectangular: Narrowest main lobe, highest sidelobes (-13 dB)")
print("- Hamming: Good balance (-43 dB sidelobes)")
print("- Blackman: Wide main lobe, low sidelobes (-58 dB)")
print("- Kaiser: Adjustable tradeoff via β parameter")
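
The sidelobe figures quoted above refer to the spectrum of the window itself. A rough way to check them is to zero-pad each window, take its FFT, and find the highest peak past the first null. The helper `peak_sidelobe_db` below is a hypothetical name written for this sketch, and the measured values depend slightly on window length.

```python
import numpy as np
from scipy import signal

def peak_sidelobe_db(window, nfft=8192):
    """Highest sidelobe of a window's spectrum, in dB relative to the main-lobe peak."""
    spectrum = np.abs(np.fft.rfft(window, nfft))
    spectrum /= spectrum[0]
    # The main lobe ends where the spectrum first starts rising again
    first_null = np.nonzero(np.diff(spectrum) > 0)[0][0]
    return 20 * np.log10(spectrum[first_null:].max())

levels = {}
for name in ['boxcar', 'hamming', 'blackman']:
    win = signal.get_window(name, 63, fftbins=False)  # symmetric window, as firwin uses
    levels[name] = peak_sidelobe_db(win)
    print(f"{name:>9}: peak sidelobe {levels[name]:.1f} dB")
```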

## Kaiser Window Parameter Selection

The Kaiser window provides a near-optimal, continuously adjustable trade-off between main lobe width and sidelobe level, controlled by a single parameter β:

- β = 0: Rectangular
- β ≈ 5: Similar to Hamming
- β ≈ 8.6: Similar to Blackman

Given desired attenuation A (dB):
```
β = 0.1102(A - 8.7)        if A > 50
β = 0.5842(A-21)^0.4 + 0.07886(A-21)  if 21 ≤ A ≤ 50
β = 0                      if A < 21
```

In [None]:
# Kaiser parameter vs attenuation
def kaiser_beta(attenuation_db):
    if attenuation_db > 50:
        return 0.1102 * (attenuation_db - 8.7)
    elif attenuation_db >= 21:
        return 0.5842 * (attenuation_db - 21)**0.4 + 0.07886 * (attenuation_db - 21)
    else:
        return 0.0

attenuations = np.linspace(20, 100, 100)
betas = [kaiser_beta(a) for a in attenuations]

plt.figure(figsize=(10, 5))
plt.plot(attenuations, betas, 'b-', linewidth=2)
plt.xlabel('Stopband Attenuation (dB)')
plt.ylabel('Kaiser β')
plt.title('Kaiser β vs Desired Attenuation')
plt.grid(True)

# Mark common values
for atten in [40, 60, 80]:
    beta = kaiser_beta(atten)
    plt.plot(atten, beta, 'ro', markersize=10)
    plt.annotate(f'{atten} dB → β={beta:.2f}', (atten+2, beta), fontsize=10)

plt.show()
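
The β formula can be cross-checked against SciPy, which ships the same empirical rule as `signal.kaiser_beta`. SciPy's `signal.kaiserord` additionally estimates how many taps are needed for a given transition width. The sketch below assumes a 60 dB target and a transition width of 0.05 (fraction of Nyquist); both are example values.

```python
import numpy as np
from scipy import signal

def kaiser_beta_manual(a_db):
    """Kaiser's empirical formula for beta, as given above."""
    if a_db > 50:
        return 0.1102 * (a_db - 8.7)
    if a_db >= 21:
        return 0.5842 * (a_db - 21) ** 0.4 + 0.07886 * (a_db - 21)
    return 0.0

for a_db in [40, 60, 80]:
    print(a_db, kaiser_beta_manual(a_db), signal.kaiser_beta(a_db))

# Estimate filter length and beta from a spec (example values)
atten_db = 60
width = 0.05          # transition width, fraction of Nyquist
cutoff = 0.2
numtaps, beta = signal.kaiserord(atten_db, width)
h = signal.firwin(numtaps, cutoff, window=('kaiser', beta))

# Measure the worst-case stopband level beyond the transition band
w, H = signal.freqz(h, worN=8192)
stop_db = 20 * np.log10(np.max(np.abs(H[w / np.pi >= cutoff + width / 2])))
print(f"numtaps={numtaps}, beta={beta:.3f}, measured stopband {stop_db:.1f} dB")
```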

## IIR Filter Design

IIR filters feed past outputs back into the computation (a recursive structure), which lets a low-order filter achieve a sharp response. Common types:

| Type | Passband | Stopband | Group Delay |
|------|----------|----------|-------------|
| **Butterworth** | Maximally flat | Monotonic | Varies |
| **Chebyshev I** | Equiripple | Monotonic | Varies |
| **Chebyshev II** | Flat | Equiripple | Varies |
| **Elliptic** | Equiripple | Equiripple | Varies |
| **Bessel** | Gradual | Gradual | Maximally flat |
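
To make the feedback structure concrete, the sketch below evaluates the IIR difference equation directly and compares it with `signal.lfilter`. It also checks stability by confirming that all poles lie inside the unit circle. The second-order Butterworth used here is just an example.

```python
import numpy as np
from scipy import signal

b, a = signal.butter(2, 0.2)        # example filter

x = np.zeros(32)
x[0] = 1.0                          # unit impulse input
y = np.zeros_like(x)

# y[n] = (sum_k b[k] x[n-k] - sum_{k>=1} a[k] y[n-k]) / a[0]
for n in range(len(x)):
    acc = 0.0
    for k in range(len(b)):
        if n - k >= 0:
            acc += b[k] * x[n - k]  # feedforward terms
    for k in range(1, len(a)):
        if n - k >= 0:
            acc -= a[k] * y[n - k]  # feedback terms (past outputs)
    y[n] = acc / a[0]

y_ref = signal.lfilter(b, a, x)
poles = np.roots(a)
print("Matches lfilter:", np.allclose(y, y_ref))
print("Pole magnitudes:", np.abs(poles))
```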

In [None]:
# Compare IIR filter types
order = 4
cutoff = 0.2

filters = {
    'Butterworth': signal.butter(order, cutoff, btype='low', analog=False, output='ba'),
    'Chebyshev I (1dB)': signal.cheby1(order, 1, cutoff, btype='low', analog=False, output='ba'),
    # Note: for cheby2 the critical frequency is the stopband edge (where -40 dB is first reached)
    'Chebyshev II (40dB)': signal.cheby2(order, 40, cutoff, btype='low', analog=False, output='ba'),
    'Bessel': signal.bessel(order, cutoff, btype='low', analog=False, output='ba', norm='phase'),
}

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Magnitude response
for name, (b, a) in filters.items():
    w, H = signal.freqz(b, a, worN=2048)
    axes[0, 0].plot(w/np.pi, 20*np.log10(np.abs(H)+1e-10), label=name)

axes[0, 0].set_xlim([0, 1])
axes[0, 0].set_ylim([-80, 5])
axes[0, 0].set_xlabel('Normalized Frequency')
axes[0, 0].set_ylabel('Magnitude (dB)')
axes[0, 0].set_title('Magnitude Response')
axes[0, 0].legend()
axes[0, 0].grid(True)

# Passband detail
for name, (b, a) in filters.items():
    w, H = signal.freqz(b, a, worN=2048)
    axes[0, 1].plot(w/np.pi, 20*np.log10(np.abs(H)+1e-10), label=name)

axes[0, 1].set_xlim([0, 0.3])
axes[0, 1].set_ylim([-5, 2])
axes[0, 1].set_xlabel('Normalized Frequency')
axes[0, 1].set_ylabel('Magnitude (dB)')
axes[0, 1].set_title('Passband Detail')
axes[0, 1].legend()
axes[0, 1].grid(True)

# Phase response
for name, (b, a) in filters.items():
    w, H = signal.freqz(b, a, worN=2048)
    axes[1, 0].plot(w/np.pi, np.unwrap(np.angle(H)), label=name)

axes[1, 0].set_xlim([0, 0.5])
axes[1, 0].set_xlabel('Normalized Frequency')
axes[1, 0].set_ylabel('Phase (radians)')
axes[1, 0].set_title('Phase Response')
axes[1, 0].legend()
axes[1, 0].grid(True)

# Group delay
for name, (b, a) in filters.items():
    w, gd = signal.group_delay((b, a), w=2048)
    axes[1, 1].plot(w/np.pi, gd, label=name)

axes[1, 1].set_xlim([0, 0.5])
axes[1, 1].set_ylim([0, 20])
axes[1, 1].set_xlabel('Normalized Frequency')
axes[1, 1].set_ylabel('Group Delay (samples)')
axes[1, 1].set_title('Group Delay')
axes[1, 1].legend()
axes[1, 1].grid(True)

plt.tight_layout()
plt.show()

print("Key Observations:")
print("- Butterworth: Maximally flat passband, moderate rolloff")
print("- Chebyshev I: Steeper rolloff than Butterworth, at the cost of passband ripple")
print("- Chebyshev II: Flat passband with ripple in stopband")
print("- Bessel: Nearly constant group delay (best for pulse preservation), most gradual rolloff")

## Parks-McClellan (Remez) Optimal Design

The Remez algorithm designs FIR filters that minimize the maximum error (Chebyshev criterion), producing equiripple behavior in both passband and stopband.

In [None]:
# Compare windowed vs Remez design
num_taps = 63
pass_edge = 0.2
stop_edge = 0.25

# Windowed design (cutoff placed mid-transition so both designs target the same band edges)
h_kaiser = signal.firwin(num_taps, (pass_edge + stop_edge) / 2, window=('kaiser', 8))

# Remez design
bands = [0, pass_edge, stop_edge, 1]  # Band edges
desired = [1, 0]  # Desired response in each band
weights = [1, 1]  # Equal weight
h_remez = signal.remez(num_taps, bands, desired, weight=weights, fs=2)

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Full response
for h, name in [(h_kaiser, 'Kaiser Window'), (h_remez, 'Remez')]:
    w, H = signal.freqz(h, worN=2048)
    axes[0].plot(w/np.pi, 20*np.log10(np.abs(H)+1e-10), label=name, linewidth=2)

axes[0].axvline(pass_edge, color='g', linestyle='--', alpha=0.7)
axes[0].axvline(stop_edge, color='r', linestyle='--', alpha=0.7)
axes[0].set_xlim([0, 0.5])
axes[0].set_ylim([-80, 5])
axes[0].set_xlabel('Normalized Frequency')
axes[0].set_ylabel('Magnitude (dB)')
axes[0].set_title('Kaiser vs Remez Design')
axes[0].legend()
axes[0].grid(True)

# Impulse response
n = np.arange(num_taps)
axes[1].stem(n, h_kaiser, linefmt='b-', markerfmt='bo', basefmt='k-', label='Kaiser')
axes[1].stem(n, h_remez, linefmt='r-', markerfmt='ro', basefmt='k-', label='Remez')
axes[1].set_xlabel('Sample')
axes[1].set_ylabel('Amplitude')
axes[1].set_title('Impulse Response')
axes[1].legend()
axes[1].grid(True)

plt.tight_layout()
plt.show()

print("Remez advantages:")
print("- Equiripple error distribution (optimal for given length)")
print("- Can specify exact transition band edges")
print("- Configurable passband/stopband weights")
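
One way to see the equiripple property numerically is to measure the peak deviation in each band. With equal weights, the passband and stopband deviations of a converged Remez design should come out nearly the same. This is a small sketch using the same band edges as the cell above; `pass_dev` and `stop_dev` are names introduced here.

```python
import numpy as np
from scipy import signal

num_taps = 63
pass_edge, stop_edge = 0.2, 0.25

h = signal.remez(num_taps, [0, pass_edge, stop_edge, 1], [1, 0],
                 weight=[1, 1], fs=2)

w, H = signal.freqz(h, worN=8192)
f = w / np.pi                      # normalized frequency, 1.0 = Nyquist
mag = np.abs(H)

pass_dev = np.max(np.abs(mag[f <= pass_edge] - 1))   # peak passband error
stop_dev = np.max(mag[f >= stop_edge])               # peak stopband error

print(f"Passband deviation: {pass_dev:.5f}")
print(f"Stopband deviation: {stop_dev:.5f} ({20 * np.log10(stop_dev):.1f} dB)")
```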

## Polyphase Sample Rate Conversion

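Polyphase decomposition makes multirate filtering efficient by splitting the filter into sub-filters that run at the lower sample rate, so no work is spent on samples that are discarded or on inserted zeros. The underlying operations are: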

- **Decimation (↓M)**: Filter at high rate, keep every M-th sample
- **Interpolation (↑L)**: Insert zeros, filter to remove images
- **Rational (L/M)**: Interpolate by L, decimate by M
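
The decimation case can be sketched as follows. This is an illustrative NumPy version, not the R4W `PolyphaseDecimator`; the function name `polyphase_decimate` is hypothetical. It splits the taps into M branches, feeds each branch a delayed and downsampled copy of the input, and sums the branch outputs, which should match filtering at the full rate and then discarding samples.

```python
import numpy as np
from scipy import signal

def polyphase_decimate(x, h, M):
    """Decimate x by M using the polyphase branches of FIR filter h."""
    h = np.concatenate([h, np.zeros(-len(h) % M)])   # pad taps to a multiple of M
    n_out = -(-len(x) // M)                          # ceil(len(x) / M), same as len(x[::M])
    y = np.zeros(n_out)
    for p in range(M):
        e_p = h[p::M]                                # branch p: h[p], h[p+M], h[p+2M], ...
        if p == 0:
            x_p = x[::M]                             # x[0], x[M], x[2M], ...
        else:
            # x[m*M - p] for m = 0, 1, ...; the m = 0 sample precedes the signal, so it is zero
            x_p = np.concatenate([[0.0], x[M - p::M]])
        y += signal.lfilter(e_p, 1.0, x_p[:n_out])   # each branch runs at the low rate
    return y

rng = np.random.default_rng(0)
x = rng.standard_normal(1000)
M = 4
h = signal.firwin(64, 1 / M)

y_ref = signal.lfilter(h, 1.0, x)[::M]               # filter at high rate, then discard
y_poly = polyphase_decimate(x, h, M)
print("Max difference:", np.max(np.abs(y_ref - y_poly)))
```

Both paths produce the same output, but the polyphase version performs roughly 1/M of the multiplies because it never computes the samples that would be thrown away.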

In [None]:
# Demonstrate decimation
fs = 48000  # Original sample rate
M = 4       # Decimation factor
fs_new = fs // M

# Create test signal: 1 kHz + 5 kHz
t = np.arange(0, 0.01, 1/fs)
x = np.cos(2*np.pi*1000*t) + 0.5*np.cos(2*np.pi*5000*t)

# Anti-aliasing filter
h = signal.firwin(64, 1/(2*M))  # Cutoff at fs/(4*M) = 3 kHz, i.e. half the new Nyquist (6 kHz)

# Filter and decimate
x_filtered = signal.lfilter(h, 1, x)
x_decimated = x_filtered[::M]
t_decimated = t[::M]

fig, axes = plt.subplots(2, 2, figsize=(14, 8))

# Time domain
axes[0, 0].plot(t*1000, x, 'b-', alpha=0.7, label='Original')
axes[0, 0].plot(t*1000, x_filtered, 'g-', label='Filtered')
axes[0, 0].set_xlabel('Time (ms)')
axes[0, 0].set_ylabel('Amplitude')
axes[0, 0].set_title(f'Original Signal ({fs} Hz)')
axes[0, 0].legend()
axes[0, 0].grid(True)

axes[0, 1].plot(t_decimated*1000, x_decimated, 'r.-')
axes[0, 1].set_xlabel('Time (ms)')
axes[0, 1].set_ylabel('Amplitude')
axes[0, 1].set_title(f'Decimated Signal ({fs_new} Hz)')
axes[0, 1].grid(True)

# Spectrum
f_orig = np.fft.fftfreq(len(x), 1/fs)[:len(x)//2]
X_orig = np.abs(np.fft.fft(x))[:len(x)//2]
axes[1, 0].plot(f_orig/1000, 20*np.log10(X_orig+1e-10))
axes[1, 0].axvline(fs_new/2/1000, color='r', linestyle='--', label=f'New Nyquist ({fs_new//2} Hz)')
axes[1, 0].set_xlabel('Frequency (kHz)')
axes[1, 0].set_ylabel('Magnitude (dB)')
axes[1, 0].set_title('Original Spectrum')
axes[1, 0].legend()
axes[1, 0].grid(True)

f_dec = np.fft.fftfreq(len(x_decimated), 1/fs_new)[:len(x_decimated)//2]
X_dec = np.abs(np.fft.fft(x_decimated))[:len(x_decimated)//2]
axes[1, 1].plot(f_dec/1000, 20*np.log10(X_dec+1e-10))
axes[1, 1].set_xlabel('Frequency (kHz)')
axes[1, 1].set_ylabel('Magnitude (dB)')
axes[1, 1].set_title('Decimated Spectrum')
axes[1, 1].grid(True)

plt.tight_layout()
plt.show()

print(f"Original: {len(x)} samples at {fs} Hz")
print(f"Decimated: {len(x_decimated)} samples at {fs_new} Hz")
print(f"5 kHz component removed by the lowpass filter (cutoff {fs/(4*M):.0f} Hz, below the new Nyquist of {fs_new//2} Hz)")
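
For the rational L/M case, SciPy provides `signal.resample_poly`, which upsamples, applies a lowpass FIR, and downsamples using a polyphase implementation. The sketch below converts a 1 kHz tone from 48 kHz to 32 kHz (L = 2, M = 3); the rates and tone frequency are example values.

```python
import numpy as np
from scipy import signal

fs_in = 48000
up, down = 2, 3
fs_out = fs_in * up // down            # 32000 Hz

t = np.arange(480) / fs_in             # 10 ms of signal
x = np.cos(2 * np.pi * 1000 * t)

y = signal.resample_poly(x, up, down)  # interpolate by 2, decimate by 3

# Locate the spectral peak of the resampled signal
spectrum = np.abs(np.fft.rfft(y * np.hanning(len(y))))
freqs = np.fft.rfftfreq(len(y), 1 / fs_out)
peak_hz = freqs[np.argmax(spectrum)]

print(f"{len(x)} samples at {fs_in} Hz -> {len(y)} samples at {fs_out} Hz")
print(f"Tone peak after resampling: {peak_hz:.0f} Hz")
```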

## Pulse Shaping for Communications

Raw digital pulses (rectangular) have infinite bandwidth. Pulse shaping:
- Limits bandwidth (spectral efficiency)
- Reduces intersymbol interference (ISI)
- Improves adjacent channel performance
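
Both effects can be checked numerically. The sketch below builds a raised cosine pulse from its closed form and compares it with a rectangular pulse of one symbol. It verifies that the raised cosine is zero at every other symbol instant (no ISI) and that it leaks far less energy outside its nominal band. The roll-off of 0.35 and the 8-symbol span are assumptions chosen for illustration, and `out_of_band_fraction` is a helper written for this example.

```python
import numpy as np
from scipy import signal

sps = 8          # samples per symbol
span = 8         # pulse length in symbols
alpha = 0.35     # chosen so no sample lands on the singularity at |t| = 1/(2*alpha)

half = span * sps // 2
t = np.arange(-half, half + 1) / sps   # time in symbol periods

# Raised cosine pulse (closed form) and a one-symbol rectangular pulse
rc = np.sinc(t) * np.cos(np.pi * alpha * t) / (1 - (2 * alpha * t) ** 2)
rect = np.ones(sps)

# Samples at the symbol instants t = -4, ..., 4
symbol_instants = rc[::sps]
center = len(symbol_instants) // 2
isi = np.delete(symbol_instants, center)
print("Peak ISI at symbol instants:", np.max(np.abs(isi)))

def out_of_band_fraction(h, edge):
    """Fraction of pulse energy above `edge` (normalized, 1.0 = Nyquist)."""
    w, H = signal.freqz(h, worN=4096)
    power = np.abs(H) ** 2
    return power[w / np.pi > edge].sum() / power.sum()

edge = (1 + alpha) / sps               # nominal band edge of the raised cosine
oob_rc = out_of_band_fraction(rc, edge)
oob_rect = out_of_band_fraction(rect, edge)
print(f"Out-of-band energy: raised cosine {oob_rc:.2e}, rectangular {oob_rect:.2e}")
```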

In [None]:
# Root Raised Cosine pulse shaping
def rrc_filter(num_taps, rolloff, sps):
    """Generate Root Raised Cosine filter coefficients."""
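    # Time axis in symbol periods, symmetric about t = 0 with exactly num_taps samples
    # (the original -num_taps//2 floors toward -inf and yields an extra, off-center tap)
    t = (np.arange(num_taps) - (num_taps - 1) / 2) / sps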
    h = np.zeros_like(t)
    
    for i, ti in enumerate(t):
        if abs(ti) < 1e-10:
            h[i] = 1 - rolloff + 4*rolloff/np.pi
        elif abs(abs(ti) - 1/(4*rolloff)) < 1e-10:
            h[i] = rolloff/np.sqrt(2) * ((1+2/np.pi)*np.sin(np.pi/(4*rolloff)) + 
                                          (1-2/np.pi)*np.cos(np.pi/(4*rolloff)))
        else:
            num = np.sin(np.pi*ti*(1-rolloff)) + 4*rolloff*ti*np.cos(np.pi*ti*(1+rolloff))
            den = np.pi*ti*(1-(4*rolloff*ti)**2)
            h[i] = num / den if abs(den) > 1e-10 else 0
    
    return h / np.sqrt(np.sum(h**2))

# Compare different roll-off factors
sps = 8  # Samples per symbol
num_taps = 8 * sps + 1

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

for alpha in [0.1, 0.25, 0.5, 1.0]:
    h = rrc_filter(num_taps, alpha, sps)
    w, H = signal.freqz(h, worN=2048)
    
    axes[0].plot(np.arange(len(h))/sps - len(h)//2/sps, h, label=f'α={alpha}')
    axes[1].plot(w/np.pi, 20*np.log10(np.abs(H)+1e-10), label=f'α={alpha}')

axes[0].set_xlabel('Symbol Periods')
axes[0].set_ylabel('Amplitude')
axes[0].set_title('RRC Impulse Response')
axes[0].legend()
axes[0].grid(True)
axes[0].axhline(0, color='k', linewidth=0.5)

axes[1].set_xlim([0, 0.25])
axes[1].set_ylim([-60, 5])
axes[1].set_xlabel('Normalized Frequency')
axes[1].set_ylabel('Magnitude (dB)')
axes[1].set_title('RRC Frequency Response')
axes[1].legend()
axes[1].grid(True)

plt.tight_layout()
plt.show()

print("Roll-off factor α:")
print("- α = 0: Sinc pulse (minimum bandwidth, slowly decaying tails, most sensitive to timing error)")
print("- α = 0.25-0.35: Common in practice (good tradeoff)")
print("- α = 1: Twice the minimum bandwidth, fastest-decaying tails, most tolerant of timing error")

## R4W Filter API Example

R4W provides a comprehensive filter API in `r4w_core::filters`:

```rust
use r4w_core::filters::{
    FirFilter, IirFilter, Filter,
    PolyphaseDecimator, PolyphaseInterpolator,
    RemezSpec, Window,
};

// FIR lowpass
let mut lpf = FirFilter::lowpass(1000.0, 8000.0, 63);
let output = lpf.process_block(&input);

// IIR Butterworth
let mut butter = IirFilter::butterworth_lowpass(4, 1000.0, 8000.0);
let output = butter.process_block(&input);

// Polyphase decimation
let mut dec = PolyphaseDecimator::new(4, 64);
let downsampled = dec.process(&input);

// Remez optimal design
let coeffs = RemezSpec::lowpass(0.2, 0.3)
    .with_num_taps(63)
    .design();
```

## Exercises

1. **Filter Order**: Design lowpass filters with orders 4, 8, 16 and compare transition width.

2. **Weight Tradeoff**: Use Remez to design a filter with 10x weight on stopband. How does it affect passband ripple?

3. **Multirate System**: Design a 48 kHz → 8 kHz decimation chain (overall factor of 6). Compare a single-stage design with a cascaded design (e.g., ÷2 followed by ÷3).

4. **Matched Filter**: Implement TX with RRC filter, add noise, RX with matched RRC. Verify zero ISI at symbol centers.

In [None]:
# Your code here
