# Interactive filtering explorer (ObsPy)
### Sharp transient + noise + linear trend • Filter type, corners, corner frequencies, and filter family

This notebook lets you *interactively* explore filtering a sharp transient embedded in noise with a linear trend.

**Controls**
- Filter: **lowpass / highpass / bandpass**
- Corner frequency (or band edges)
- Filter order (`corners` in ObsPy)
- Filter family (Butterworth, Chebyshev I/II, Elliptic, Bessel)
- One-way (**causal**) vs two-way (**zero-phase / acausal**) application

**You will see the same filter in different representations**
- **Impulse response** (time-domain)
- **Frequency response** (magnitude + phase)
- **Difference equation** and **z-transform** \(H(z)\)
- **Pole–zero** plot in the z-plane

**And its effect on the signal**
- Raw vs filtered **zoom around the transient**
- Raw vs filtered **spectra overlay**

> Notes  
> - Filtering of the *signal* is done via **ObsPy** `Trace.filter(...)`.  
> - The “representations” (poles/zeros, difference equation, etc.) are computed from the **equivalent IIR design** that ObsPy uses under the hood (SciPy’s IIR filter design).



## 0) Setup

If you don’t have these packages:
```bash
conda install -c conda-forge obspy ipywidgets scipy matplotlib
# or
pip install obspy ipywidgets scipy matplotlib
```

If widgets don’t display, your environment may need ipywidgets enabled.


In [1]:

import numpy as np
import matplotlib.pyplot as plt

from obspy import Trace, UTCDateTime

# For filter “representations” (equivalent design)
from scipy import signal

import ipywidgets as widgets
from IPython.display import display, Markdown, Latex, clear_output


In [2]:

# ---------- Signal generation: sharp transient + noise + linear trend ----------

def ricker(t, t0, f0):
    tau = t - t0
    a = (np.pi * f0 * tau)**2
    return (1 - 2*a) * np.exp(-a)

def make_test_signal(fs=100.0, T=20.0, t_on=8.0, seed=0,
                     trend_slope=0.02, noise_std=0.05,
                     pulse_amp=1.0, pulse_f0=6.0):
    t = np.arange(0, T, 1/fs)
    rng = np.random.default_rng(seed)

    x = np.zeros_like(t)

    # Step-like onset + sharp pulse
    x[t >= t_on] += 0.8
    x += pulse_amp * 0.5 * ricker(t, t_on, pulse_f0)

    # Linear trend
    x += trend_slope * (t - t.mean())

    # Noise
    x += noise_std * rng.standard_normal(len(t))

    return t, x

def make_trace(x, fs):
    tr = Trace(data=np.asarray(x, dtype=np.float64).copy())
    tr.stats.sampling_rate = float(fs)
    tr.stats.starttime = UTCDateTime(0)
    return tr

def amp_spectrum(x, fs, nfft=None):
    x = np.asarray(x)
    if nfft is None:
        nfft = int(2**np.ceil(np.log2(len(x))))
    X = np.fft.rfft(x, n=nfft)
    f = np.fft.rfftfreq(nfft, d=1/fs)
    return f, np.abs(X)



## 1) Filter helpers

ObsPy’s `Trace.filter()` applies an IIR filter (commonly Butterworth) and can optionally run it forward-backward for zero-phase filtering.

We:
- apply the filter to the signal using **ObsPy**
- compute equivalent **(b, a)** coefficients using SciPy so we can show DSP representations

This keeps filtering “in ObsPy” while still teaching the math objects students will see in DSP.


In [3]:

def obspy_apply_filter(tr, filt_kind, corners, zerophase, freq=None, freqmin=None, freqmax=None,
                       ftype="butter", rp=1.0, rs=40.0):
    out = tr.copy()
    # ObsPy forwards extra kwargs (ftype/rp/rs) into obspy.signal.filter.*
    if filt_kind == "lowpass":
        out.filter("lowpass", freq=freq, corners=corners, zerophase=zerophase, ftype=ftype, rp=rp, rs=rs)
    elif filt_kind == "highpass":
        out.filter("highpass", freq=freq, corners=corners, zerophase=zerophase, ftype=ftype, rp=rp, rs=rs)
    elif filt_kind == "bandpass":
        out.filter("bandpass", freqmin=freqmin, freqmax=freqmax, corners=corners, zerophase=zerophase,
                   ftype=ftype, rp=rp, rs=rs)
    else:
        raise ValueError("filt_kind must be lowpass/highpass/bandpass")
    return out

def equivalent_iir_ba(fs, filt_kind, corners, freq=None, freqmin=None, freqmax=None,
                      ftype="butter", rp=1.0, rs=40.0):
    """Build an IIR filter design (b, a) for displaying representations."""
    nyq = 0.5 * fs

    if filt_kind == "lowpass":
        Wn = freq / nyq
        btype = "lowpass"
    elif filt_kind == "highpass":
        Wn = freq / nyq
        btype = "highpass"
    elif filt_kind == "bandpass":
        Wn = [freqmin / nyq, freqmax / nyq]
        btype = "bandpass"
    else:
        raise ValueError("bad filt_kind")

    b, a = signal.iirfilter(
        N=int(corners),
        Wn=Wn,
        btype=btype,
        ftype=ftype,
        rp=float(rp),
        rs=float(rs),
        output="ba"
    )

    # Normalize so a[0] = 1
    b = b / a[0]
    a = a / a[0]
    meta = dict(Wn=Wn, btype=btype, ftype=ftype, rp=rp, rs=rs)
    return b, a, meta

def format_difference_equation(b, a, max_terms=10):
    b = np.asarray(b); a = np.asarray(a)
    nb = min(len(b), max_terms)
    na = min(len(a), max_terms)

    bx_terms = []
    for k in range(nb):
        coef = b[k]
        if np.isclose(coef, 0): 
            continue
        bx_terms.append(f"{coef:+.4g}\\,x[n-{k}]")
    ay_terms = []
    for k in range(1, na):
        coef = a[k]
        if np.isclose(coef, 0):
            continue
        ay_terms.append(f"{coef:+.4g}\\,y[n-{k}]")
    bx = " ".join(bx_terms) if bx_terms else "0"
    ay = " ".join(ay_terms) if ay_terms else "0"
    return f"y[n] = ({bx}) - ({ay})"

def format_Hz(b, a, max_terms=10):
    b = np.asarray(b); a = np.asarray(a)
    nb = min(len(b), max_terms)
    na = min(len(a), max_terms)

    def poly(coeffs, nterms):
        terms = []
        for k in range(nterms):
            c = coeffs[k]
            if np.isclose(c, 0):
                continue
            if k == 0:
                terms.append(f"{c:.4g}")
            else:
                terms.append(f"{c:+.4g} z^{{-{k}}}")
        return " ".join(terms) if terms else "0"

    num = poly(b, nb)
    den = poly(a, na)
    return f"H(z)=\\frac{{{num}}}{{{den}}}"

def impulse_response_from_ba(b, a, n=256):
    x = np.zeros(n); x[0] = 1.0
    return signal.lfilter(b, a, x)

def plot_pz_from_ba(b, a, title="Pole–zero plot (z-plane)"):
    z, p, _k = signal.tf2zpk(b, a)

    fig, ax = plt.subplots(figsize=(5, 5))
    th = np.linspace(0, 2*np.pi, 800)
    ax.plot(np.cos(th), np.sin(th))
    ax.axhline(0); ax.axvline(0)

    if len(z):
        ax.scatter(np.real(z), np.imag(z), facecolors='none', edgecolors='C0', s=60, label='zeros')
    if len(p):
        ax.scatter(np.real(p), np.imag(p), marker='x', color='C1', s=60, label='poles')

    ax.set_aspect('equal', 'box')
    ax.set_title(title)
    ax.set_xlabel("Re{z}"); ax.set_ylabel("Im{z}")
    ax.set_xlim(-1.5, 1.5); ax.set_ylim(-1.5, 1.5)
    ax.grid(True, alpha=0.25)
    ax.legend()
    plt.show()

def plot_freq_response_from_ba(b, a, fs, title="Frequency response"):
    w, h = signal.freqz(b, a, worN=4096, fs=fs)

    fig, ax = plt.subplots(figsize=(10, 3))
    ax.plot(w, 20*np.log10(np.maximum(np.abs(h), 1e-12)))
    ax.set_title(title + " (magnitude)")
    ax.set_xlabel("Frequency (Hz)")
    ax.set_ylabel("Magnitude (dB)")
    ax.set_xlim(0, fs/2)
    ax.grid(True, alpha=0.3)
    plt.show()

    fig, ax = plt.subplots(figsize=(10, 3))
    ax.plot(w, np.unwrap(np.angle(h)))
    ax.set_title(title + " (phase)")
    ax.set_xlabel("Frequency (Hz)")
    ax.set_ylabel("Phase (rad)")
    ax.set_xlim(0, fs/2)
    ax.grid(True, alpha=0.3)
    plt.show()



## 2) Interactive explorer

**Top:** choose the filter  
**Middle:** multiple representations of the same filter  
**Bottom:** raw vs filtered around the transient (and spectra overlay)


In [4]:

# -------------------- Widgets --------------------

w_kind = widgets.Dropdown(
    options=[("Lowpass", "lowpass"), ("Highpass", "highpass"), ("Bandpass", "bandpass")],
    value="bandpass",
    description="Filter:"
)

w_apply = widgets.ToggleButtons(
    options=[("One-way (causal)", False), ("Two-way (zero-phase)", True)],
    value=False,
    description="Apply:"
)

w_ftype = widgets.Dropdown(
    options=[("Butterworth", "butter"),
             ("Chebyshev I", "cheby1"),
             ("Chebyshev II", "cheby2"),
             ("Elliptic", "ellip"),
             ("Bessel", "bessel")],
    value="butter",
    description="Family:"
)

w_corners = widgets.IntSlider(value=4, min=1, max=12, step=1, description="Corners:")
w_freq = widgets.FloatSlider(value=2.0, min=0.1, max=20.0, step=0.1, description="Freq (Hz):")
w_fmin = widgets.FloatSlider(value=0.8, min=0.05, max=20.0, step=0.05, description="Fmin (Hz):")
w_fmax = widgets.FloatSlider(value=6.0, min=0.1, max=40.0, step=0.1, description="Fmax (Hz):")

w_rp = widgets.FloatSlider(value=1.0, min=0.1, max=5.0, step=0.1, description="rp (dB):")
w_rs = widgets.FloatSlider(value=40.0, min=10.0, max=100.0, step=1.0, description="rs (dB):")

# Signal controls
w_fs = widgets.Dropdown(options=[50.0, 100.0, 200.0], value=100.0, description="Fs (Hz):")
w_seed = widgets.IntSlider(value=0, min=0, max=50, step=1, description="Noise seed:")
w_trend = widgets.FloatSlider(value=0.02, min=-0.1, max=0.1, step=0.005, description="Trend:")
w_noise = widgets.FloatSlider(value=0.05, min=0.0, max=0.3, step=0.005, description="Noise σ:")

w_zoom = widgets.FloatRangeSlider(value=[7.2, 9.0], min=0.0, max=20.0, step=0.05, description="Zoom (s):")

ui = widgets.VBox([
    widgets.HBox([w_kind, w_apply]),
    widgets.HBox([w_ftype, w_corners]),
    widgets.HBox([w_freq]),
    widgets.HBox([w_fmin, w_fmax]),
    widgets.HBox([w_rp, w_rs]),
    widgets.HBox([w_fs, w_seed, w_trend, w_noise]),
    w_zoom
])

out = widgets.Output()
display(ui, out)

def _update_visibility(*args):
    # low/high use freq; band uses fmin/fmax
    if w_kind.value in ("lowpass", "highpass"):
        w_freq.layout.display = "block"
        w_fmin.layout.display = "none"
        w_fmax.layout.display = "none"
    else:
        w_freq.layout.display = "none"
        w_fmin.layout.display = "block"
        w_fmax.layout.display = "block"

    # only show rp/rs when relevant
    if w_ftype.value == "cheby1":
        w_rp.layout.display = "block"
        w_rs.layout.display = "none"
    elif w_ftype.value == "cheby2":
        w_rp.layout.display = "none"
        w_rs.layout.display = "block"
    elif w_ftype.value == "ellip":
        w_rp.layout.display = "block"
        w_rs.layout.display = "block"
    else:
        w_rp.layout.display = "none"
        w_rs.layout.display = "none"

_update_visibility()
w_kind.observe(_update_visibility, "value")
w_ftype.observe(_update_visibility, "value")


VBox(children=(HBox(children=(Dropdown(description='Filter:', index=2, options=(('Lowpass', 'lowpass'), ('High…

Output()

In [5]:

def render():
    with out:
        clear_output(wait=True)

        fs = float(w_fs.value)
        nyq = 0.5 * fs

        # Build signal
        t, x = make_test_signal(fs=fs, T=20.0, t_on=8.0, seed=int(w_seed.value),
                                trend_slope=float(w_trend.value), noise_std=float(w_noise.value),
                                pulse_amp=1.0, pulse_f0=6.0)
        tr = make_trace(x, fs)

        kind = w_kind.value
        corners = int(w_corners.value)
        zerophase = bool(w_apply.value)
        ftype = w_ftype.value
        rp = float(w_rp.value)
        rs = float(w_rs.value)

        # Validate frequencies
        if kind == "bandpass":
            fmin = float(w_fmin.value)
            fmax = float(w_fmax.value)
            if fmax <= fmin:
                display(Markdown("**Error:** Bandpass requires Fmax > Fmin.")); return
            if fmax >= nyq:
                display(Markdown("**Error:** Fmax must be < Nyquist (Fs/2).")); return
            if fmin <= 0:
                display(Markdown("**Error:** Fmin must be > 0.")); return
        else:
            freq = float(w_freq.value)
            if freq >= nyq:
                display(Markdown("**Error:** Freq must be < Nyquist (Fs/2).")); return
            if freq <= 0:
                display(Markdown("**Error:** Freq must be > 0.")); return

        # Apply filter using ObsPy
        if kind == "bandpass":
            tr_f = obspy_apply_filter(tr, kind, corners, zerophase, freqmin=fmin, freqmax=fmax,
                                      ftype=ftype, rp=rp, rs=rs)
            b, a, _meta = equivalent_iir_ba(fs, kind, corners, freqmin=fmin, freqmax=fmax,
                                           ftype=ftype, rp=rp, rs=rs)
            filt_desc = f"{kind} {fmin:.3g}–{fmax:.3g} Hz"
        else:
            tr_f = obspy_apply_filter(tr, kind, corners, zerophase, freq=freq,
                                      ftype=ftype, rp=rp, rs=rs)
            b, a, _meta = equivalent_iir_ba(fs, kind, corners, freq=freq,
                                           ftype=ftype, rp=rp, rs=rs)
            filt_desc = f"{kind} {freq:.3g} Hz"

        mode = "two-way (zero-phase)" if zerophase else "one-way (causal)"
        title = f"{filt_desc} | corners={corners} | family={ftype} | apply={mode}"
        display(Markdown(f"# {title}"))

        # ---------------- Middle: filter representations ----------------
        display(Markdown("## Filter representations"))

        # Impulse response
        imp = impulse_response_from_ba(b, a, n=256)
        t_imp = np.arange(len(imp)) / fs
        fig, ax = plt.subplots(figsize=(10, 3))
        ax.plot(t_imp, imp)
        ax.set_title("Impulse response h[n]")
        ax.set_xlabel("Time (s)")
        ax.set_ylabel("h[n]")
        ax.grid(True, alpha=0.3)
        plt.show()

        # Frequency response
        plot_freq_response_from_ba(b, a, fs, title="Frequency response of H(z)")

        # Difference equation + H(z)
        display(Markdown("### Difference equation")); display(Latex(format_difference_equation(b, a)))
        display(Markdown("### z-transform / transfer function")); display(Latex(format_Hz(b, a)))

        # Pole–zero
        plot_pz_from_ba(b, a, title="Pole–zero plot (z-plane)")

        # ---------------- Bottom: effect on signal ----------------
        display(Markdown("## Effect on the signal"))

        z0, z1 = w_zoom.value
        mask = (t >= z0) & (t <= z1)

        fig, ax = plt.subplots(figsize=(10, 3))
        ax.plot(t[mask], tr.data[mask], label="raw")
        ax.plot(t[mask], tr_f.data[mask], label="filtered")
        ax.set_title("Zoom around transient: raw vs filtered")
        ax.set_xlabel("Time (s)")
        ax.set_ylabel("Amplitude")
        ax.grid(True, alpha=0.3)
        ax.legend()
        plt.show()

        # Spectra overlay
        f_raw, mag_raw = amp_spectrum(tr.data, fs)
        f_fil, mag_fil = amp_spectrum(tr_f.data, fs)

        fig, ax = plt.subplots(figsize=(10, 3))
        ax.plot(f_raw, mag_raw, label="raw")
        ax.plot(f_fil, mag_fil, label="filtered")
        ax.set_title("Spectra overlay: raw vs filtered")
        ax.set_xlabel("Frequency (Hz)")
        ax.set_ylabel("|X(f)|")
        ax.set_xlim(0, fs/2)
        ax.grid(True, alpha=0.3)
        ax.legend()
        plt.show()

# Hook widget changes
for w in [w_kind, w_apply, w_ftype, w_corners, w_freq, w_fmin, w_fmax, w_rp, w_rs, w_fs, w_seed, w_trend, w_noise, w_zoom]:
    w.observe(lambda change: render(), "value")

render()



## 3) Student prompts

- Set **bandpass**, then increase `corners` and compare:
  - one-way vs two-way
  - ringing and **pre-ringing** (energy before onset) in the zoom view

- Try **highpass** to remove the trend without destroying the transient.

- Compare filter families:
  - Butterworth (no ripple)
  - Chebyshev / Elliptic (ripple or sharper transitions)
  - Bessel (gentler magnitude, better phase behavior)

- Change Fs and discuss:
  - why corners near Nyquist can misbehave
  - how the same corner frequency means a different normalized frequency as Fs changes
