# Adaptive Noise Cancellation Using LMS and RLS Filters

This notebook implements adaptive noise cancellation algorithms to remove unwanted noise from speech signals using:
- **LMS (Least Mean Square)** adaptive filter
- **RLS (Recursive Least Squares)** adaptive filter

Two implementations are provided:
1. Custom implementation (pure Python)
2. Padasip library implementation


In [None]:
# Install required packages
!pip install padasip numpy matplotlib soundfile -q


In [None]:
# Import libraries
import numpy as np
import matplotlib.pyplot as plt
import wave
import struct
import math
from pathlib import Path
from typing import List, Tuple
import os

try:
    import padasip as pa
    PADASIP_AVAILABLE = True
except ImportError:
    PADASIP_AVAILABLE = False
    print("Warning: padasip not available. Only custom implementation will work.")


### Alternative: Load from Google Drive

If you prefer to use files from Google Drive instead of uploading, uncomment and run the cell below:


In [None]:
# Alternative: Load from Google Drive (uncomment to use)
# from google.colab import drive
# drive.mount('/content/drive')
# 
# # Update these paths to your files in Google Drive
# noisy_filename = '/content/drive/MyDrive/path/to/your/noisy_audio.wav'
# noise_filename = '/content/drive/MyDrive/path/to/your/noise_reference.wav'
# 
# print(f"✓ Files loaded from Google Drive: {noisy_filename}, {noise_filename}")


## Upload Audio Files

Upload your audio files:
- **Noisy speech**: The audio signal with noise
- **Noise reference**: The reference noise signal


In [None]:
# Upload audio files (Colab-specific)
from google.colab import files
import io

print("Upload noisy speech file:")
uploaded_noisy = files.upload()
noisy_filename = list(uploaded_noisy.keys())[0]

print("\nUpload noise reference file:")
uploaded_noise = files.upload()
noise_filename = list(uploaded_noise.keys())[0]

print(f"\n✓ Files uploaded: {noisy_filename}, {noise_filename}")


## Helper Functions


In [None]:
def read_wav(path: str) -> Tuple[int, List[float]]:
    """Return sample_rate and mono float samples in [-1, 1]."""
    with wave.open(path, "rb") as wf:
        channels = wf.getnchannels()
        sampwidth = wf.getsampwidth()
        rate = wf.getframerate()
        frames = wf.getnframes()
        raw = wf.readframes(frames)

    if sampwidth != 2:
        raise ValueError(f"Only 16‑bit PCM supported, got {sampwidth * 8} bits")

    samples = struct.unpack("<" + "h" * (len(raw) // 2), raw)
    if channels == 2:
        # Average the two channels to mono.
        mono = [(samples[i] + samples[i + 1]) / 2 for i in range(0, len(samples), 2)]
    else:
        mono = list(samples)

    norm = 32768.0
    return rate, [s / norm for s in mono]


def read_wav_numpy(path: str) -> Tuple[int, np.ndarray]:
    """Return sample_rate and mono float samples in [-1, 1] using numpy."""
    with wave.open(path, "rb") as wf:
        channels = wf.getnchannels()
        sampwidth = wf.getsampwidth()
        rate = wf.getframerate()
        frames = wf.getnframes()
        raw = wf.readframes(frames)

    if sampwidth != 2:
        raise ValueError(f"Only 16-bit PCM supported, got {sampwidth * 8} bits")

    samples = np.frombuffer(raw, dtype="<i2").astype(np.float64)
    if channels == 2:
        samples = samples.reshape(-1, 2).mean(axis=1)

    norm = 32768.0
    return rate, samples / norm


def write_wav(path: str, rate: int, samples: List[float]) -> None:
    """Write mono 16-bit PCM wav from normalized floats."""
    # Avoid clipping.
    peak = max(max((abs(s) for s in samples), default=0.0), 1e-12)
    if peak > 1.0:
        scale = 1.0 / peak
        samples = [s * scale for s in samples]
    int_samples = [int(max(-1.0, min(1.0, s)) * 32767) for s in samples]
    raw = struct.pack("<" + "h" * len(int_samples), *int_samples)
    with wave.open(path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(rate)
        wf.writeframes(raw)


def write_wav_numpy(path: str, rate: int, samples: np.ndarray) -> None:
    """Write mono 16-bit PCM wav from normalized floats using numpy."""
    if samples.size == 0:
        raise ValueError("No samples to write.")
    peak = np.max(np.abs(samples))
    if peak > 1.0:
        samples = samples / peak
    int_samples = np.clip(samples, -1.0, 1.0)
    int_samples = (int_samples * 32767).astype("<i2")
    with wave.open(path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(rate)
        wf.writeframes(int_samples.tobytes())


In [None]:
def remove_dc(signal):
    """Remove DC component by subtracting mean."""
    if isinstance(signal, np.ndarray):
        return signal - np.mean(signal)
    if not signal:
        return signal
    mean = sum(signal) / len(signal)
    return [s - mean for s in signal]


def normalize(signal, peak_target: float = 0.99):
    """Scale to a target peak magnitude."""
    if isinstance(signal, np.ndarray):
        if signal.size == 0:
            return signal
        peak = np.max(np.abs(signal))
        if peak == 0:
            return signal
        scale = peak_target / peak
        return signal * scale
    
    if not signal:
        return signal
    peak = max(abs(s) for s in signal)
    if peak == 0:
        return signal
    scale = peak_target / peak
    return [s * scale for s in signal]


def rms(signal):
    """Root-mean-square level."""
    if isinstance(signal, np.ndarray):
        if signal.size == 0:
            return 0.0
        return math.sqrt(float(np.mean(signal * signal)))
    
    if not signal:
        return 0.0
    return math.sqrt(sum(s * s for s in signal) / len(signal))


## Custom Implementation (Pure Python)


In [None]:
def lms_filter(
    noise: List[float], target: List[float], order: int = 12, mu: float = 0.005
) -> List[float]:
    """Least-mean-square adaptive filter returning the error (cleaned output)."""
    w = [0.0] * order
    out: List[float] = []
    for n in range(len(target)):
        x = [noise[n - k] if n - k >= 0 else 0.0 for k in range(order)]
        y = sum(w[i] * x[i] for i in range(order))
        e = target[n] - y
        for i in range(order):
            w[i] += 2 * mu * e * x[i]
        out.append(e)
    return out


def rls_filter(
    noise: List[float],
    target: List[float],
    order: int = 12,
    lam: float = 0.95,
    delta: float = 0.1,
) -> List[float]:
    """Recursive-least-square adaptive filter returning the error output."""
    w = [0.0] * order
    # Initialize inverse correlation matrix.
    P = [[0.0] * order for _ in range(order)]
    for i in range(order):
        P[i][i] = 1.0 / delta

    out: List[float] = []
    for n in range(len(target)):
        x = [noise[n - k] if n - k >= 0 else 0.0 for k in range(order)]
        # k = P x / (lambda + x^T P x)
        Px = [sum(P[i][j] * x[j] for j in range(order)) for i in range(order)]
        xPx = sum(x[i] * Px[i] for i in range(order))
        denom = lam + xPx
        k = [Px[i] / denom for i in range(order)]

        y = sum(w[i] * x[i] for i in range(order))
        e = target[n] - y
        for i in range(order):
            w[i] += k[i] * e

        # P = (P - k x^T P) / lambda
        xTP = [sum(x[j] * P[j][i] for j in range(order)) for i in range(order)]
        for i in range(order):
            for j in range(order):
                P[i][j] = (P[i][j] - k[i] * xTP[j]) / lam

        out.append(e)
    return out


In [None]:
# Load audio files
rate_noisy, noisy = read_wav(noisy_filename)
rate_noise, noise = read_wav(noise_filename)

if rate_noisy != rate_noise:
    raise ValueError("Sample rates must match between noisy and noise files.")

length = min(len(noisy), len(noise))
noisy = noisy[:length]
noise = noise[:length]

# Preprocess to improve convergence
noisy = normalize(remove_dc(noisy))
noise = normalize(remove_dc(noise))

print(f"✓ Loaded audio files")
print(f"  Sample rate: {rate_noisy} Hz")
print(f"  Duration: {length/rate_noisy:.2f} seconds")
print(f"  Input RMS: {rms(noisy):.4f}")


In [None]:
# Set filter parameters
LMS_ORDER = 12
LMS_MU = 0.0025
RLS_ORDER = 10
RLS_LAM = 0.999
RLS_DELTA = 0.1

print("Running custom implementation filters...")
lms_clean = lms_filter(noise, noisy, order=LMS_ORDER, mu=LMS_MU)
rls_clean = rls_filter(noise, noisy, order=RLS_ORDER, lam=RLS_LAM, delta=RLS_DELTA)

print(f"✓ LMS filter complete (order={LMS_ORDER}, mu={LMS_MU})")
print(f"  Output RMS: {rms(lms_clean):.4f}")
print(f"✓ RLS filter complete (order={RLS_ORDER}, lam={RLS_LAM}, delta={RLS_DELTA})")
print(f"  Output RMS: {rms(rls_clean):.4f}")


In [None]:
# Save output files
os.makedirs("outputs_basic", exist_ok=True)
write_wav("outputs_basic/audio_lms.wav", rate_noisy, lms_clean)
write_wav("outputs_basic/audio_rls.wav", rate_noisy, rls_clean)
print("✓ Saved output files:")
print("  - outputs_basic/audio_lms.wav")
print("  - outputs_basic/audio_rls.wav")


## Padasip Implementation (if available)


In [None]:
if PADASIP_AVAILABLE:
    def lms_padasip(noisy: np.ndarray, noise: np.ndarray, order: int, mu: float) -> np.ndarray:
        X = pa.preprocess.input_from_history(noise, order)
        d = noisy[order - 1 :]
        f = pa.filters.FilterLMS(n=order, mu=mu, w="random")
        y, e, _ = f.run(d, X)
        return e  # error is the cleaned speech

    def rls_padasip(noisy: np.ndarray, noise: np.ndarray, order: int, lam: float, delta: float) -> np.ndarray:
        X = pa.preprocess.input_from_history(noise, order)
        d = noisy[order - 1 :]
        # padasip FilterRLS uses mu as forgetting factor and eps as initial delta.
        f = pa.filters.FilterRLS(n=order, mu=lam, eps=delta, w="random")
        y, e, _ = f.run(d, X)
        return e
    
    # Load audio files for padasip (numpy arrays)
    rate_noisy_pada, noisy_pada = read_wav_numpy(noisy_filename)
    rate_noise_pada, noise_pada = read_wav_numpy(noise_filename)
    
    length_pada = min(len(noisy_pada), len(noise_pada))
    noisy_pada = noisy_pada[:length_pada]
    noise_pada = noise_pada[:length_pada]
    
    # Preprocess
    noisy_pada = normalize(remove_dc(noisy_pada))
    noise_pada = normalize(remove_dc(noise_pada))
    
    # Set parameters
    LMS_ORDER_PADA = 32
    LMS_MU_PADA = 0.01
    RLS_ORDER_PADA = 16
    RLS_LAM_PADA = 0.995
    RLS_DELTA_PADA = 0.01
    
    print("Running padasip filters...")
    lms_clean_pada = lms_padasip(noisy_pada, noise_pada, order=LMS_ORDER_PADA, mu=LMS_MU_PADA)
    rls_clean_pada = rls_padasip(noisy_pada, noise_pada, order=RLS_ORDER_PADA, lam=RLS_LAM_PADA, delta=RLS_DELTA_PADA)
    
    print(f"✓ LMS (padasip) complete (order={LMS_ORDER_PADA}, mu={LMS_MU_PADA})")
    print(f"  Output RMS: {rms(lms_clean_pada):.4f}")
    print(f"✓ RLS (padasip) complete (order={RLS_ORDER_PADA}, lam={RLS_LAM_PADA}, delta={RLS_DELTA_PADA})")
    print(f"  Output RMS: {rms(rls_clean_pada):.4f}")
    
    # Save outputs
    os.makedirs("outputs_padasip", exist_ok=True)
    write_wav_numpy("outputs_padasip/audio_lms_pada.wav", rate_noisy_pada, lms_clean_pada)
    write_wav_numpy("outputs_padasip/audio_rls_pada.wav", rate_noisy_pada, rls_clean_pada)
    print("✓ Saved padasip output files:")
    print("  - outputs_padasip/audio_lms_pada.wav")
    print("  - outputs_padasip/audio_rls_pada.wav")
else:
    print("⚠ Padasip not available. Skipping padasip implementation.")


## Visualization


In [None]:
def running_rms(signal, window: int):
    """Running RMS with a sliding window."""
    if isinstance(signal, np.ndarray):
        if window <= 0:
            raise ValueError("window must be positive")
        if signal.size == 0:
            return np.array([], dtype=float)
        sq = signal * signal
        cumsum = np.cumsum(sq)
        out = np.empty_like(signal)
        for i in range(len(signal)):
            start = i - window + 1
            if start <= 0:
                total = cumsum[i]
                count = i + 1
            else:
                total = cumsum[i] - cumsum[start - 1]
                count = window
            out[i] = math.sqrt(total / count)
        return out
    else:
        if window <= 0:
            raise ValueError("window must be positive")
        if not signal:
            return []
        sq = [s * s for s in signal]
        cumsum = []
        total = 0.0
        for v in sq:
            total += v
            cumsum.append(total)
        out: List[float] = []
        for i in range(len(signal)):
            start = i - window + 1
            if start <= 0:
                total_window = cumsum[i]
                count = i + 1
            else:
                total_window = cumsum[i] - cumsum[start - 1]
                count = window
            out.append(math.sqrt(total_window / count))
        return out


In [None]:
# Plot signals comparison
segment = min(len(noisy), rate_noisy * 2)  # first 2 seconds
t = [i / rate_noisy for i in range(segment)]

plt.figure(figsize=(12, 8))

plt.subplot(3, 1, 1)
plt.plot(t, noisy[:segment], linewidth=0.7, color='blue')
plt.title("Original Noisy Speech", fontsize=12, fontweight='bold')
plt.ylabel("Amplitude")
plt.grid(True, alpha=0.3)

plt.subplot(3, 1, 2)
plt.plot(t[:len(lms_clean[:segment])], lms_clean[:segment], color="green", linewidth=0.7)
plt.title("LMS Cleaned Output (Custom)", fontsize=12, fontweight='bold')
plt.ylabel("Amplitude")
plt.grid(True, alpha=0.3)

plt.subplot(3, 1, 3)
plt.plot(t[:len(rls_clean[:segment])], rls_clean[:segment], color="orange", linewidth=0.7)
plt.title("RLS Cleaned Output (Custom)", fontsize=12, fontweight='bold')
plt.xlabel("Time [s]")
plt.ylabel("Amplitude")
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig("outputs_basic/signals.png", dpi=150, bbox_inches='tight')
plt.show()
print("✓ Saved plot to outputs_basic/signals.png")


In [None]:
# Plot convergence curves
window = max(1, int(rate_noisy * 0.05))  # 50ms window
lms_curve = running_rms(lms_clean, window)
rls_curve = running_rms(rls_clean, window)
length = min(len(lms_curve), len(rls_curve))
times = [i / rate_noisy for i in range(length)]

plt.figure(figsize=(10, 5))
plt.semilogy(times, lms_curve[:length], label="LMS", linewidth=1.5, color='green')
plt.semilogy(times, rls_curve[:length], label="RLS", linewidth=1.5, color='orange')
plt.xlabel("Time [s]", fontsize=11)
plt.ylabel("Running RMS (log scale)", fontsize=11)
plt.title("Convergence Comparison - Custom Implementation", fontsize=12, fontweight='bold')
plt.legend(fontsize=11)
plt.grid(True, which="both", linestyle="--", alpha=0.5)
plt.tight_layout()
plt.savefig("outputs_basic/convergence.png", dpi=150, bbox_inches='tight')
plt.show()
print("✓ Saved convergence plot to outputs_basic/convergence.png")

# Save convergence data
import csv
with open("outputs_basic/convergence.csv", "w", encoding="utf-8", newline='') as f:
    writer = csv.writer(f)
    writer.writerow(["time_sec", "lms_running_rms", "rls_running_rms"])
    for t, a, b in zip(times, lms_curve[:length], rls_curve[:length]):
        writer.writerow([t, a, b])
print("✓ Saved convergence data to outputs_basic/convergence.csv")


In [None]:
if PADASIP_AVAILABLE:
    # Plot padasip signals
    segment_pada = min(len(noisy_pada), rate_noisy_pada * 2)
    t_pada = np.arange(segment_pada) / rate_noisy_pada
    
    plt.figure(figsize=(12, 8))
    
    plt.subplot(3, 1, 1)
    plt.plot(t_pada, noisy_pada[:segment_pada], linewidth=0.7, color='blue')
    plt.title("Original Noisy Speech", fontsize=12, fontweight='bold')
    plt.ylabel("Amplitude")
    plt.grid(True, alpha=0.3)
    
    plt.subplot(3, 1, 2)
    plt.plot(t_pada[:len(lms_clean_pada[:segment_pada])], lms_clean_pada[:segment_pada], 
             color="green", linewidth=0.7)
    plt.title("LMS Cleaned Output (Padasip)", fontsize=12, fontweight='bold')
    plt.ylabel("Amplitude")
    plt.grid(True, alpha=0.3)
    
    plt.subplot(3, 1, 3)
    plt.plot(t_pada[:len(rls_clean_pada[:segment_pada])], rls_clean_pada[:segment_pada], 
             color="orange", linewidth=0.7)
    plt.title("RLS Cleaned Output (Padasip)", fontsize=12, fontweight='bold')
    plt.xlabel("Time [s]")
    plt.ylabel("Amplitude")
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig("outputs_padasip/signals_pada.png", dpi=150, bbox_inches='tight')
    plt.show()
    print("✓ Saved padasip plot to outputs_padasip/signals_pada.png")
    
    # Plot padasip convergence
    window_pada = max(1, int(rate_noisy_pada * 0.05))
    lms_curve_pada = running_rms(lms_clean_pada, window_pada)
    rls_curve_pada = running_rms(rls_clean_pada, window_pada)
    length_pada = min(len(lms_curve_pada), len(rls_curve_pada))
    times_pada = np.arange(length_pada) / rate_noisy_pada
    
    plt.figure(figsize=(10, 5))
    plt.semilogy(times_pada, lms_curve_pada[:length_pada], label="LMS", 
                 linewidth=1.5, color='green')
    plt.semilogy(times_pada, rls_curve_pada[:length_pada], label="RLS", 
                 linewidth=1.5, color='orange')
    plt.xlabel("Time [s]", fontsize=11)
    plt.ylabel("Running RMS (log scale)", fontsize=11)
    plt.title("Convergence Comparison - Padasip Implementation", fontsize=12, fontweight='bold')
    plt.legend(fontsize=11)
    plt.grid(True, which="both", linestyle="--", alpha=0.5)
    plt.tight_layout()
    plt.savefig("outputs_padasip/convergence_pada.png", dpi=150, bbox_inches='tight')
    plt.show()
    print("✓ Saved padasip convergence plot to outputs_padasip/convergence_pada.png")
    
    # Save padasip convergence data
    with open("outputs_padasip/convergence_pada.csv", "w", encoding="utf-8", newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["time_sec", "lms_running_rms", "rls_running_rms"])
        for t, a, b in zip(times_pada, lms_curve_pada[:length_pada], rls_curve_pada[:length_pada]):
            writer.writerow([t, a, b])
    print("✓ Saved padasip convergence data to outputs_padasip/convergence_pada.csv")


## Download Results

Download the processed audio files and plots:


In [None]:
# Download output files
from google.colab import files

print("Downloading custom implementation outputs...")
files.download("outputs_basic/audio_lms.wav")
files.download("outputs_basic/audio_rls.wav")
files.download("outputs_basic/signals.png")
files.download("outputs_basic/convergence.png")
files.download("outputs_basic/convergence.csv")

if PADASIP_AVAILABLE:
    print("\nDownloading padasip implementation outputs...")
    files.download("outputs_padasip/audio_lms_pada.wav")
    files.download("outputs_padasip/audio_rls_pada.wav")
    files.download("outputs_padasip/signals_pada.png")
    files.download("outputs_padasip/convergence_pada.png")
    files.download("outputs_padasip/convergence_pada.csv")

print("\n✓ All files downloaded!")


# Summary
print("=" * 60)
print("SUMMARY")
print("=" * 60)
print("\n### Custom Implementation Results:")
print(f"  Input RMS: {rms(noisy):.4f}")
print(f"  LMS Output RMS: {rms(lms_clean):.4f} (order={LMS_ORDER}, μ={LMS_MU})")
print(f"  RLS Output RMS: {rms(rls_clean):.4f} (order={RLS_ORDER}, λ={RLS_LAM}, δ={RLS_DELTA})")

print("\n### Parameters Used:")
print(f"  LMS: Order = {LMS_ORDER}, Step size (μ) = {LMS_MU}")
print(f"  RLS: Order = {RLS_ORDER}, Forgetting factor (λ) = {RLS_LAM}, Initialization (δ) = {RLS_DELTA}")

if PADASIP_AVAILABLE:
    print("\n### Padasip Implementation Results:")
    print(f"  LMS Output RMS: {rms(lms_clean_pada):.4f} (order={LMS_ORDER_PADA}, μ={LMS_MU_PADA})")
    print(f"  RLS Output RMS: {rms(rls_clean_pada):.4f} (order={RLS_ORDER_PADA}, λ={RLS_LAM_PADA}, δ={RLS_DELTA_PADA})")

print("\n" + "=" * 60)
print("You can adjust the filter parameters in the code cells above to experiment with different settings.")
