
# 🎧 Live System‑Audio Analysis (Notebook)

This notebook lets you **capture system audio (loopback)** and display **live analysis** (level meter, waveform, spectrum, and basic features) directly in the cell output.

## ✅ What you'll get
- Device discovery + easy selection (works on Windows, macOS, Linux — see notes below)
- Real‑time capture via `sounddevice` (PortAudio), reading from a loopback or virtual input device
- Live plots: waveform and spectrum (FFT)
- Live metrics: RMS / peak (dBFS), spectral centroid/rolloff, and a rough **BPM** estimate over a sliding window
- A big red **Stop** button (or just interrupt the cell)

---

## 🖥️ OS-specific notes

### Windows (recommended)
- Works out of the box via **WASAPI Loopback**. You can usually select a device that contains the word **"Loopback"** in its name.

### macOS
- macOS **does not provide loopback by default**. Install a virtual device such as **BlackHole 2ch** (free, from Existential Audio).
- Create a **Multi-Output Device** (Audio MIDI Setup) if you want to **listen to speakers _and_ capture** simultaneously. Set system output to that device, and select **BlackHole** as the input device for this notebook.
- In the configuration cell, set `DEVICE_NAME = "BlackHole"` (or whatever your loopback device is named).

### Linux (PulseAudio / PipeWire)
- Use the **monitor** of your sink (e.g., `"Monitor of Built-in Audio Analog Stereo"`). The code auto-searches for `"monitor"` devices, but you can set `DEVICE_NAME` explicitly.

---

## 👉 How to use
1. Run the **Install dependencies** cell (just once per environment).
2. **List devices** and note the name of your loopback device.
3. Set `DEVICE_NAME` in the **Configuration** cell (or leave as `None` to auto-pick likely loopback devices).
4. Run the **Start live capture** cell. Click **Stop** to end, or press **Interrupt** in the toolbar.

> Tip: If plots don't update smoothly in JupyterLab, install/enable the `ipympl` widget backend, or run in the classic notebook. This notebook falls back to simple redraws with `plt.pause(...)` which work broadly.


In [None]:

# %% Install dependencies (run once)
# You may skip re-running this if already installed.
import sys
!{sys.executable} -m pip install --quiet sounddevice soundcard numpy scipy matplotlib librosa ipywidgets
# Optional but helpful backends for interactive plotting in JupyterLab:
# !{sys.executable} -m pip install --quiet ipympl
# Then enable widgets (in JupyterLab):
# !{sys.executable} -m jupyter nbextension enable --py widgetsnbextension


In [None]:

# %% Imports
import platform
import time
import math
import queue
import threading
import numpy as np
import sounddevice as sd
import soundcard as sc
from scipy.signal import get_window
from IPython.display import clear_output, display
import ipywidgets as widgets
import matplotlib.pyplot as plt

# Backend preference: interactive 'widget' first, then 'notebook'; static 'inline' is the
# last resort. A failure of the 'inline' magic itself is not caught and will propagate.
for _backend in ('widget', 'notebook'):
    try:
        get_ipython().run_line_magic('matplotlib', _backend)
    except Exception:
        continue
    break
else:
    get_ipython().run_line_magic('matplotlib', 'inline')

# Librosa features (tempo, spectral centroid/rolloff)
import librosa
import librosa.feature
import librosa.beat


In [None]:

# %% List audio devices (inputs/outputs) to help you pick the right one
import pandas as pd

def list_sd_devices():
    """Return a DataFrame describing every device reported by `sounddevice`.

    One row per device with the columns ``index``, ``name``,
    ``max_input_channels``, ``max_output_channels``, ``default_samplerate`` and
    ``hostapi`` (the host API's display name).

    The column list is passed explicitly so the frame keeps the same schema
    even when no devices are reported.
    """
    columns = ['index', 'name', 'max_input_channels', 'max_output_channels',
               'default_samplerate', 'hostapi']
    devs = sd.query_devices()
    # The host API table does not change between devices: query it once, not per row.
    hostapis = sd.query_hostapis()
    rows = [
        {
            'index': idx,
            'name': d['name'],
            'max_input_channels': d['max_input_channels'],
            'max_output_channels': d['max_output_channels'],
            'default_samplerate': d['default_samplerate'],
            'hostapi': hostapis[d['hostapi']]['name'],
        }
        for idx, d in enumerate(devs)
    ]
    return pd.DataFrame(rows, columns=columns)

def list_sc_devices():
    """Return a DataFrame of the devices reported by `soundcard`.

    `soundcard` enumerates microphones and speakers separately; microphones
    are listed first (``type == 'mic'``), then speakers (``type == 'speaker'``).
    Each row carries the device's ``name``, ``id`` and ``channels``.
    """
    def describe(kind, dev):
        # Flatten one soundcard device object into a table row.
        return {'type': kind, 'name': dev.name, 'id': dev.id, 'channels': dev.channels}

    rows = [describe('mic', mic) for mic in sc.all_microphones()]
    rows.extend(describe('speaker', spk) for spk in sc.all_speakers())
    return pd.DataFrame(rows)

print("Platform:", platform.platform())

# Preferred path: the hosting environment's table viewer, if that helper is importable.
# Any failure on that path (missing module, viewer error) drops to plain IPython display.
try:
    from caas_jupyter_tools import display_dataframe_to_user
    df_sd = list_sd_devices()
    display_dataframe_to_user("sounddevice_devices", df_sd)
    df_sc = list_sc_devices()
    display_dataframe_to_user("soundcard_devices", df_sc)
except Exception:
    for heading, build_table in (
        ("sounddevice devices:", list_sd_devices),
        ("\nsoundcard devices:", list_sc_devices),
    ):
        print(heading)
        display(build_table())


In [None]:

# %% Configuration
# DEVICE_NAME is matched as a case-insensitive substring of the device name by
# choose_input_device(). Try something like "Loopback", "BlackHole", "VB-Audio",
# "Soundflower", or "monitor".
DEVICE_NAME = None  # e.g., "BlackHole", "Loopback", "monitor". Leave as None to auto-pick.
SAMPLE_RATE = 48000       # sample rate (Hz) requested from the input stream and used for all analysis
CHANNELS = 2              # mono=1, stereo=2
BLOCKSIZE = 1024          # frames per block; smaller = more updates
RINGBUFFER_SECS = 12      # seconds kept in rolling buffer for metrics/BPM
PRINT_EVERY = 10          # print metrics every N blocks
WINDOW_FOR_FFT = 4096     # window length for spectrum (powers of two are fastest)

# Feature config
# NOTE(review): compute_features() passes BPM_MIN as librosa's `start_bpm` and BPM_MAX as
# `max_tempo`; `start_bpm` looks like an initial guess rather than a hard lower bound — confirm.
BPM_MIN = 60
BPM_MAX = 200


In [None]:

# %% Helper: choose an input device for loopback/system capture
def choose_input_device(preferred_name=None):
    """Pick the `sounddevice` input device to capture system audio from.

    Search order (the first hit wins):
      1. an input device whose name contains ``preferred_name`` (case-insensitive);
      2. an input device whose name contains "loopback";
      3. an input device whose name contains "monitor";
      4. any device offering at least ``CHANNELS`` input channels.

    Args:
        preferred_name: Substring of the desired device name, or None/"" to
            go straight to auto-detection.

    Returns:
        Tuple ``(index, info)`` where ``index`` is the device's position in
        ``sd.query_devices()`` and ``info`` is its device dict.

    Raises:
        RuntimeError: If none of the four searches finds a device.
    """
    devs = sd.query_devices()

    def first_input_named(fragment):
        # First device with input channels whose lowercase name contains `fragment`, else None.
        for idx, d in enumerate(devs):
            if d['max_input_channels'] > 0 and fragment in d['name'].lower():
                return idx, d
        return None

    # 1) Explicit name match first
    if preferred_name:
        match = first_input_named(preferred_name.lower())
        if match is not None:
            return match
        # Make the fall-through visible so a typo in DEVICE_NAME does not go unnoticed.
        print(f"Warning: no input device matching {preferred_name!r}; falling back to auto-detection.")

    # 2) Likely loopback devices, then 3) PulseAudio/PipeWire "monitor" sources.
    # The former extra WASAPI host-API test was AND-ed with the same name check,
    # so it could never select a device the plain name check did not.
    for keyword in ('loopback', 'monitor'):
        match = first_input_named(keyword)
        if match is not None:
            return match

    # 4) Fallback: any input device (user may have routed system-through-virtual)
    for idx, d in enumerate(devs):
        if d['max_input_channels'] >= CHANNELS:
            return idx, d

    raise RuntimeError("No suitable input device found. Please set DEVICE_NAME to a working loopback/virtual device.")
    
# Resolve the capture device once; dev_index is later passed to sd.InputStream,
# and the full device dict is printed so the choice can be checked.
dev_index, dev_info = choose_input_device(DEVICE_NAME)
print("Using input device:", dev_info)


In [None]:

# %% Live capture + analysis
q = queue.Queue()  # unbounded hand-off: audio_callback puts blocks, the capture loop gets them
RUN = True         # the capture loop runs while this is True; cleared by on_stop_clicked / on exit
block_count = 0    # number of blocks the capture loop has consumed

# Rolling buffer to keep last RINGBUFFER_SECS seconds for analysis
ring_len = int(RINGBUFFER_SECS * SAMPLE_RATE)             # buffer length in frames
ring = np.zeros((ring_len, CHANNELS), dtype=np.float32)  # one row per frame, one column per channel
ring_write_pos = 0                                        # row index where the next block is written

# Widgets
# The Stop button and the metrics output area are displayed before the figures are created.
stop_btn = widgets.Button(description="Stop", button_style='danger', icon='stop')
out_metrics = widgets.Output()
controls = widgets.HBox([stop_btn])
display(controls, out_metrics)

def on_stop_clicked(b):
    """Click handler for the Stop button: clear RUN so the capture loop exits.

    `b` is whatever `stop_btn.on_click` passes to its handlers; it is not used.
    """
    # NOTE(review): the capture loop runs synchronously in this same cell. Confirm that
    # the click is actually delivered while that loop is executing; if it is not,
    # interrupting the cell is the way to stop.
    global RUN
    RUN = False
stop_btn.on_click(on_stop_clicked)

# Two independent figures: the rolling waveform and the spectrum of the newest window.
# Both start as flat zero lines; the capture loop only swaps their y-data.
fig_wave, ax_wave = plt.subplots(figsize=(8, 3))
ax_wave.set(title="Waveform (rolling)", xlabel="Time (samples)", ylabel="Amplitude")
line_wave = ax_wave.plot(np.zeros(ring_len))[0]

fig_spec, ax_spec = plt.subplots(figsize=(8, 3))
ax_spec.set(title="Spectrum (last window)", xlabel="Frequency (Hz)", ylabel="Magnitude")
# Bin centre frequencies of a real FFT of length WINDOW_FOR_FFT at SAMPLE_RATE.
freqs = np.fft.rfftfreq(WINDOW_FOR_FFT, 1.0/SAMPLE_RATE)
line_spec = ax_spec.plot(freqs, np.zeros_like(freqs))[0]

# Audio callback pushes blocks into queue
def audio_callback(indata, frames, time_info, status):
    """Stream callback: enqueue a copy of each captured block on ``q``.

    ``frames``, ``time_info`` and ``status`` are accepted because the stream
    supplies them, but they are not used; ``status`` flags are not reported
    (add a ``print(status)`` here when debugging dropouts).
    """
    # A copy is queued rather than `indata` itself — presumably because the
    # stream may reuse that buffer after the callback returns.
    captured = indata.copy()
    q.put(captured)

# Input stream on the selected device. It is only constructed here; the capture
# loop starts it by entering `with stream:`.
stream = sd.InputStream(
    device=dev_index,
    channels=CHANNELS,
    samplerate=SAMPLE_RATE,
    blocksize=BLOCKSIZE,
    dtype='float32',
    callback=audio_callback,
)

def dbfs(x):
    """Return ``(rms_db, peak_db)`` of *x* in dB relative to full scale (|x| == 1.0).

    Both levels share one noise floor: ``1e-12`` is added to the mean square
    and to the squared peak before taking the logarithm. Silence therefore
    reads -120 dBFS for both values, and the peak can never be reported below
    the RMS (previously silence gave RMS -120 dBFS but peak -240 dBFS).

    Args:
        x: Array-like of samples, nominally in [-1, 1]. May be empty.

    Returns:
        Tuple of two floats ``(rms_db, peak_db)``. Empty input is treated as
        silence instead of raising.
    """
    floor_power = 1e-12  # power floor, i.e. -120 dBFS
    samples = np.asarray(x, dtype=np.float64)
    if samples.size == 0:
        mean_sq = 0.0
        peak_sq = 0.0
    else:
        squared = np.square(samples)
        mean_sq = squared.mean()
        peak_sq = squared.max()
    # 10*log10(power) is the same as 20*log10(amplitude).
    rms_db = 10 * np.log10(mean_sq + floor_power)
    peak_db = 10 * np.log10(peak_sq + floor_power)
    return rms_db, peak_db

def compute_features(buf_mono, sr):
    """Compute spectral centroid, spectral rolloff and a rough tempo for a mono buffer.

    Args:
        buf_mono: 1-D float array of audio samples.
        sr: Sample rate of ``buf_mono`` in Hz.

    Returns:
        Tuple ``(centroid, rolloff, bpm)``: the mean spectral centroid (Hz),
        the mean 85 % spectral rolloff (Hz) and the estimated tempo in BPM
        (``nan`` if the estimator returns nothing).
    """
    # Spectral features (centroid, rolloff) from one shared magnitude STFT
    S = np.abs(librosa.stft(buf_mono, n_fft=WINDOW_FOR_FFT, hop_length=WINDOW_FOR_FFT//4, window='hann'))
    centroid = librosa.feature.spectral_centroid(S=S, sr=sr).mean()
    rolloff = librosa.feature.spectral_rolloff(S=S, sr=sr, roll_percent=0.85).mean()

    # Rough BPM (tempo) over the window
    # Note: tempo estimation is noisy on short windows; this is a rough guide.
    # Use onset envelope to stabilize
    onset_env = librosa.onset.onset_strength(y=buf_mono, sr=sr)

    # librosa 0.10 moved the estimator from `librosa.beat.tempo` to
    # `librosa.feature.tempo` and deprecated the old name. Prefer the new
    # location and fall back to the old one on older installations.
    estimate_tempo = getattr(librosa.feature, 'tempo', None)
    if estimate_tempo is None:
        estimate_tempo = librosa.beat.tempo

    tempo = np.atleast_1d(estimate_tempo(
        onset_envelope=onset_env,
        sr=sr,
        aggregate=np.median,
        start_bpm=BPM_MIN,
        max_tempo=BPM_MAX,
    ))
    bpm = float(tempo[0]) if tempo.size else float('nan')
    return centroid, rolloff, bpm

# Live loop: pull captured blocks, update the ring buffer, redraw both figures and
# periodically report metrics.
#
# Axis limits that never change and the FFT window are set up once, outside the loop.
ax_wave.set_ylim(-1.0, 1.0)
ax_wave.set_xlim(0, ring_len)
ax_spec.set_xlim(0, SAMPLE_RATE/2)
fft_window = get_window('hann', WINDOW_FOR_FFT, fftbins=True)

try:
    with stream:
        last_print = 0  # value of block_count at the most recent metrics printout
        while RUN:
            try:
                pending = [q.get(timeout=0.5)]
            except queue.Empty:
                continue

            # Drain everything else that is already queued. Redrawing is much slower
            # than one audio block, so handling a single block per redraw lets the
            # unbounded queue grow and the display fall further and further behind.
            try:
                while True:
                    pending.append(q.get_nowait())
            except queue.Empty:
                pass

            for block in pending:
                n = len(block)
                # Write into ring buffer (wrap-around)
                if ring_write_pos + n <= ring_len:
                    ring[ring_write_pos:ring_write_pos+n] = block
                else:
                    first = ring_len - ring_write_pos
                    ring[ring_write_pos:] = block[:first]
                    ring[:n-first] = block[first:]
                ring_write_pos = (ring_write_pos + n) % ring_len
            block_count += len(pending)

            # Prepare rolling waveform (mono for display), oldest sample first
            ring_view = np.concatenate((ring[ring_write_pos:], ring[:ring_write_pos]), axis=0)
            mono = ring_view.mean(axis=1)

            # Update waveform
            line_wave.set_ydata(mono)
            fig_wave.canvas.draw()
            plt.pause(0.001)

            # Spectrum on the last WINDOW_FOR_FFT samples
            if ring_len >= WINDOW_FOR_FFT:
                spec = np.abs(np.fft.rfft(mono[-WINDOW_FOR_FFT:] * fft_window))
                line_spec.set_ydata(spec)
                ax_spec.set_ylim(0, max(1e-6, spec.max()))
                fig_spec.canvas.draw()
                plt.pause(0.001)

            # Print metrics periodically. Several blocks may be consumed per iteration,
            # so compare against the last printout instead of testing a modulus that
            # block_count might jump over.
            if block_count - last_print >= PRINT_EVERY:
                last_print = block_count

                # Level of the newest block
                rms_db, peak_db = dbfs(block.mean(axis=1))

                # Compute features over the entire ring (mono)
                feature_error = None
                try:
                    centroid, rolloff, bpm = compute_features(mono.astype(np.float32), SAMPLE_RATE)
                except Exception as err:
                    # Best effort: keep the meter running, but say why features are missing.
                    feature_error = err
                    centroid, rolloff, bpm = float('nan'), float('nan'), float('nan')

                with out_metrics:
                    clear_output(wait=True)
                    print(f"Blocks processed: {block_count}")
                    print(f"RMS: {rms_db:+.1f} dBFS | Peak: {peak_db:+.1f} dBFS")
                    print(f"Spectral centroid: {centroid:,.0f} Hz | Rolloff (85%): {rolloff:,.0f} Hz")
                    if math.isfinite(bpm):
                        print(f"Estimated Tempo: ~{bpm:.0f} BPM (rough)")
                    else:
                        print("Estimated Tempo: n/a (insufficient signal)")
                    if feature_error is not None:
                        print(f"Feature extraction failed: {feature_error!r}")

except KeyboardInterrupt:
    print("Stopped by user.")
finally:
    RUN = False
    # Leaving `with stream` already stops and closes the stream. This only acts
    # when the stream never got that far (for example, start-up failed).
    if not stream.closed:
        try:
            stream.close()
        except sd.PortAudioError as err:
            print(f"Could not close stream cleanly: {err}")
    print("Stream closed.")


In [None]:

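# %% Optional: Persist rolling metrics to JSONL for other apps to consume (e.g., DMX/lighting)
# Running this cell only defines the logger. Call log_metrics_loop(...) (example at the bottom of
# this cell) to start logging; it stops after `duration_secs` or when you interrupt the cell.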
import json
from datetime import datetime

LOG_PATH = "live_metrics.jsonl"
LOGGING = False  # True only while log_metrics_loop() is running

def log_metrics_loop(duration_secs=60):
    """Append one JSON line of metrics per second to ``LOG_PATH``.

    Each record is recomputed from the current contents of the global ring
    buffer (``ring`` / ``ring_write_pos``) and holds ``ts`` (UTC, ISO 8601
    with a trailing "Z"), ``rms_db``, ``peak_db``, ``centroid_hz``,
    ``rolloff_hz`` and ``bpm``. Values that are not finite are written as
    JSON ``null``, because bare NaN/Infinity are not valid JSON.

    Every record is flushed immediately so that another process tailing the
    file sees it without waiting for the write buffer to fill.

    NOTE(review): this function blocks the cell while it runs and only reads
    the ring buffer; confirm that something is still filling the buffer while
    it logs, otherwise every record describes the same audio.

    Args:
        duration_secs: Maximum time to keep logging, in seconds. Interrupting
            the cell stops earlier.
    """
    # Local import: the top of this cell only imports `datetime` itself.
    from datetime import timezone

    global LOGGING

    def json_number(value):
        # float() for JSON serialisation; non-finite values become None (null).
        value = float(value)
        return value if math.isfinite(value) else None

    LOGGING = True
    t0 = time.monotonic()
    print(f"Logging to {LOG_PATH} for up to {duration_secs}s (Ctrl+C to stop earlier).")
    try:
        with open(LOG_PATH, "a", encoding="utf-8") as f:
            while LOGGING and (time.monotonic() - t0) < duration_secs:
                time.sleep(1.0)
                # Snapshot of last-known metrics (read from out_metrics display is not trivial)
                # Here we recompute quickly on the ring buffer
                ring_view = np.concatenate((ring[ring_write_pos:], ring[:ring_write_pos]), axis=0)
                mono = ring_view.mean(axis=1).astype(np.float32)
                rms_db, peak_db = dbfs(mono)
                try:
                    centroid, rolloff, bpm = compute_features(mono, SAMPLE_RATE)
                except Exception:
                    # Best effort: still log the levels when feature extraction fails.
                    centroid, rolloff, bpm = float('nan'), float('nan'), float('nan')
                # Timezone-aware "now" (utcnow() is deprecated), rendered in the same
                # naive-ISO-plus-"Z" format as before.
                stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
                rec = {
                    "ts": stamp,
                    "rms_db": json_number(rms_db),
                    "peak_db": json_number(peak_db),
                    "centroid_hz": json_number(centroid),
                    "rolloff_hz": json_number(rolloff),
                    "bpm": json_number(bpm),
                }
                f.write(json.dumps(rec, allow_nan=False) + "\n")
                f.flush()
        print("Done logging.")
    except KeyboardInterrupt:
        print("Stopped logging (user).")
    finally:
        # Reset on every exit path, not only on interrupt.
        LOGGING = False

# To start logging for 5 minutes, run:
# log_metrics_loop(duration_secs=300)


In [None]:

# %% (Optional) Generate a test tone you can route to your output (for sanity check)
import numpy as np
import sounddevice as sd

# Tone parameters: duration in seconds, frequency in Hz, linear amplitude.
DUR, FREQ, AMP = 3.0, 440.0, 0.1

# Sample instants over [0, DUR) at SAMPLE_RATE, then the scaled sine as float32.
t = np.linspace(0, DUR, int(DUR*SAMPLE_RATE), endpoint=False)
omega = 2*np.pi*FREQ
tone = (AMP * np.sin(omega*t)).astype(np.float32)

print("Playing 3s A4 test tone…")
# Playback is started and then waited on, so the cell does not finish before the tone ends.
sd.play(tone, samplerate=SAMPLE_RATE)
sd.wait()
print("Done.")
