In [None]:
!pip install pyannote.audio

## **IMPORT LIBRARIES**

In [1]:
import numpy as np
import librosa
import soundfile as sf
import matplotlib.pyplot as plt
import numpy as np
import os
from scipy.io import wavfile
import scipy.signal as signal
from google.colab import files
from pyannote.audio import Pipeline
from pyannote.core import Segment


from IPython.display import Audio

ModuleNotFoundError: No module named 'google.colab'

## **AUDIO PLOTTER**

In [2]:
def plot_overlay_waveform(y_input, y_denoised, sr, title="Audio Comparison"):
    """Draw the noisy and the denoised waveform on top of each other.

    Both signals are cut to the length of the shorter one so that they can
    share a single time axis (in seconds, derived from ``sr``).
    """
    n_samples = min(len(y_input), len(y_denoised))
    seconds = np.arange(n_samples) / sr

    # (samples, opacity, legend label, colour) for each trace, in draw order.
    traces = (
        (y_input[:n_samples], 0.6, "Input (Noisy)", "red"),
        (y_denoised[:n_samples], 0.8, "Denoised", "blue"),
    )

    plt.figure(figsize=(12, 4))
    for samples, opacity, legend_label, colour in traces:
        plt.plot(seconds, samples, alpha=opacity, label=legend_label, color=colour)
    plt.xlabel("Time [s]")
    plt.ylabel("Amplitude")
    plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.show()

ALL NEW ADDITIONS GO BELOW HERE

## Noise Profile Extractor

In [None]:
def extract_noise_profile(input_signal, auth_token=None):
    """Collect the non-speech samples of an audio file as a noise profile.

    The pretrained ``pyannote/voice-activity-detection`` pipeline is run on
    the file; every gap between detected speech regions (including a leading
    and a trailing gap) is cut out of the audio and the pieces are
    concatenated.

    Args:
        input_signal: Path of the audio file to analyse.
        auth_token: Optional Hugging Face access token. When omitted, the
            ``HF_TOKEN`` environment variable is used (``None`` if unset).

    Returns:
        Array of noise samples at the sample rate reported by ``sf.read``.
        If no non-speech gap is found, the first 500 ms of the file are
        returned instead.
    """
    # NOTE: the access token used to be hard-coded here. Secrets must not live
    # in the notebook; the previously committed token should be revoked.
    token = auth_token or os.environ.get("HF_TOKEN")
    pipeline = Pipeline.from_pretrained(
        "pyannote/voice-activity-detection",
        use_auth_token=token,
    )

    # Load the audio file and run voice-activity detection on it
    audio, sample_rate = sf.read(input_signal)
    output = pipeline(input_signal)
    audio_duration = len(audio) / sample_rate

    # Every gap between consecutive speech regions is treated as noise
    gaps = []
    last_end = 0.0
    for speech in output.get_timeline().support():
        if speech.start > last_end:
            gaps.append((last_end, speech.start))
        last_end = speech.end

    # Add final gap if there is silence at the end
    if last_end < audio_duration:
        gaps.append((last_end, audio_duration))

    # Extract the samples of the quiet segments
    noise_samples = []
    for gap_start, gap_end in gaps:
        start = int(gap_start * sample_rate)  # seconds -> sample index
        end = int(gap_end * sample_rate)
        if end > start:
            noise_samples.append(audio[start:end])

    if noise_samples:
        noise_profile = np.concatenate(noise_samples)
    else:
        # No usable gap: use the first 500 ms as the noise estimate
        noise_profile = audio[:int(0.5 * sample_rate)]

    print(f"Input length: {len(audio)} samples, Noise profile length: {len(noise_profile)} samples")
    return noise_profile

## Spectral Subtractor

In [None]:
# The spectral-subtraction result is written into the current working directory.
base_path = os.getcwd()
output_path = os.path.join(base_path, "subtracted_signal.wav")

# google.colab is only importable inside Colab. Fall back to None elsewhere so
# that the rest of this cell (the function definitions) still loads on a
# local Jupyter kernel instead of aborting with ModuleNotFoundError.
try:
    from google.colab import files
except ImportError:
    files = None

# Resample input signal, return resampled signal in same format
def resample(input_signal, old_sample_rate, new_sample_rate):
    """Resample ``input_signal`` from ``old_sample_rate`` to ``new_sample_rate``.

    Args:
        input_signal: Array of samples; resampling runs along axis 0.
        old_sample_rate: Sample rate of ``input_signal``.
        new_sample_rate: Target sample rate.

    Returns:
        The resampled samples, cast back to the dtype of ``input_signal``.
        When both rates are equal the input is returned unchanged.
    """
    if old_sample_rate == new_sample_rate:
        # Return the bare array here as well: callers assign the result to a
        # single variable, so the former (signal, rate) tuple was inconsistent
        # with the resampling branch below.
        return input_signal
    resampled_signal = signal.resample_poly(input_signal, new_sample_rate, old_sample_rate)
    return resampled_signal.astype(input_signal.dtype)

# Only perform stft on mono audio
def stft(audio, dimensions=None):
    """Return ``librosa.stft`` of ``audio``, down-mixing to mono first if needed.

    Args:
        audio: 1-D samples, or a multi-dimensional array that is transposed
            and passed through ``librosa.to_mono`` first (assumed to be laid
            out as (samples, channels) - TODO confirm against callers).
        dimensions: Ignored. The rank is always read from ``audio.ndim``; the
            parameter is kept (now optional) so existing calls keep working.

    Returns:
        The complex STFT matrix produced by ``librosa.stft`` with its defaults.
    """
    if audio.ndim != 1:
        # Transpose so the channel axis comes first before mixing down
        audio = librosa.to_mono(audio.T)
    return librosa.stft(audio)

def spectral_subtraction(noise_profile_n, input_signal_n):
    """Subtract the average noise magnitude spectrum from a signal.

    The per-frequency mean magnitude of the noise STFT is subtracted from
    every frame of the input's magnitude STFT, negative results are clipped
    to zero, and the signal is rebuilt using the input's original phase.

    Args:
        noise_profile_n: Array of noise-only samples.
        input_signal_n: Array of samples to denoise.

    Returns:
        The denoised time-domain signal, with as many samples as
        ``input_signal_n`` has along axis 0, or ``None`` (after printing an
        error) when either input is empty.
    """
    # stft() never returns None, so the former "is None" checks could not
    # fire. Guard against the inputs that actually break the transform.
    if np.size(noise_profile_n) == 0:
        print("Error: noise profile is empty")
        return None
    if np.size(input_signal_n) == 0:
        print("Error: input signal is empty")
        return None

    noise_magnitude = np.abs(stft(noise_profile_n, noise_profile_n.ndim))

    Y = stft(input_signal_n, input_signal_n.ndim)
    input_magnitude = np.abs(Y)
    input_phase = np.exp(1j * np.angle(Y))

    # One mean noise magnitude per frequency bin, broadcast over all frames
    noise_mean = np.mean(noise_magnitude, axis=1, dtype="float64", keepdims=True)
    clean_magnitude = np.clip(input_magnitude - noise_mean, a_min=0, a_max=None)

    # length= pads/trims the reconstruction to the input length; without it
    # the inverse transform can come back shorter than the input.
    return librosa.istft(clean_magnitude * input_phase, length=input_signal_n.shape[0])

def process_audio(input_file, noise_profile, desired_FS):
    """Denoise ``input_file`` by spectral subtraction and save the result.

    Args:
        input_file: Path of the audio file to denoise.
        noise_profile: Path of an audio file containing only noise, or
            ``None`` to estimate the noise from ``input_file`` itself via
            ``extract_noise_profile``.
        desired_FS: Sample rate both signals are converted to and the output
            is written with.

    Returns:
        None. The result is written to the module-level ``output_path``
        (multi-channel results are averaged to a single channel first) and,
        when ``google.colab`` is importable, offered as a download.
    """
    y, fs_y = sf.read(input_file)

    if noise_profile is None:
        # Estimated from the input file, so it is at the input's native rate
        n = extract_noise_profile(input_file)
        fs_n = fs_y
    else:
        n, fs_n = sf.read(noise_profile)

    input_channels = 1 if y.ndim == 1 else y.shape[1]
    profile_channels = 1 if n.ndim == 1 else n.shape[1]

    assert profile_channels <= 2, "Only mono and stereo files supported for noise profile."
    assert input_channels <= 2, "Only mono and stereo files supported for input signal."

    # Bring BOTH signals to the target rate. The auto-extracted profile used
    # to be left at the native rate, so its STFT bins covered different
    # frequencies than the resampled input's bins.
    if fs_y != desired_FS:
        y = resample(y, fs_y, desired_FS)
    if fs_n != desired_FS:
        n = resample(n, fs_n, desired_FS)

    num_channels = max(input_channels, profile_channels)

    def _channel(samples, c):
        # A mono signal is shared by every output channel; otherwise take the
        # matching column (the last one if the signal has fewer columns).
        if samples.ndim == 1:
            return samples
        return samples[:, min(c, samples.shape[1] - 1)]

    output_x = None
    for c in range(num_channels):
        single_channel_output = spectral_subtraction(_channel(n, c), _channel(y, c))
        if single_channel_output is None:
            print(f"Error processing channel {c}")
            return
        if output_x is None:
            output_x = np.zeros((len(single_channel_output), num_channels))
        output_x[:, c] = single_channel_output

    if num_channels > 1:
        # Collapse the per-channel results into a single channel
        output_x = np.mean(output_x, axis=1)

    # Save output to the file system
    sf.write(output_path, output_x, desired_FS, format='WAV')
    print(f"Saved output to {output_path}")

    # Trigger a browser download only when running inside Colab
    try:
        from google.colab import files as colab_files
    except ImportError:
        colab_files = None
    if colab_files is not None:
        colab_files.download(output_path)
    return

## **WIENER FILTERING IMPLEMENTATION FOR STATIONARY NOISE**

In [3]:
def wiener_denoise(
    input_path: str,
    output_path: str,
    noise_duration: float = 0.5,
    n_fft: int = 1024,
    hop_length: int = 512,
    alpha: float = 12,
):
    """Denoise an audio file with a Wiener-style spectral gain and save it.

    The noise power spectrum is estimated from the first ``noise_duration``
    seconds of the recording, so that stretch is assumed to contain noise only.

    Args:
        input_path: Path of the noisy audio file (loaded at its native rate).
        output_path: Path the denoised audio is written to.
        noise_duration: Length in seconds of the leading stretch used for the
            noise estimate.
        n_fft: FFT size of the STFT.
        hop_length: Hop size of the STFT in samples.
        alpha: Weight of the noise power in the gain denominator; larger
            values suppress more. Defaults to the previously hard-coded 12.

    Returns:
        Tuple ``(y_denoised, sr)`` of the denoised samples and the sample rate.
    """
    y_noisy, sr = librosa.load(input_path, sr=None)

    D = librosa.stft(y_noisy, n_fft=n_fft, hop_length=hop_length)
    signal_power = np.abs(D) ** 2

    # Use at least one frame: an empty slice would make the mean (and with it
    # the whole gain) NaN when noise_duration is shorter than one hop.
    n_noise_frames = max(int(noise_duration * sr / hop_length), 1)
    noise_power = np.mean(signal_power[:, :n_noise_frames], axis=1, keepdims=True)

    H = signal_power / (signal_power + (alpha * noise_power) + 1e-12)  # avoid division by zero
    D_denoised = H * D

    # length= keeps the output exactly as long as the input
    y_denoised = librosa.istft(D_denoised, hop_length=hop_length, length=len(y_noisy))

    sf.write(output_path, y_denoised, sr)
    print(f"[INFO] Denoised audio saved at: {output_path} with alpha = {alpha}")

    return y_denoised, sr

## **PERFORM WIENER FILTERING**

In [4]:
# Ask for a file upload through the google.colab `files` helper and keep the
# name of the first uploaded file as the input for the cells below.
uploaded = files.upload()
input_path = list(uploaded.keys())[0]

# Denoise the uploaded file with the default Wiener settings; the result is
# written to "denoised_wiener_audio.wav".
y_denoised, sr = wiener_denoise(
    input_path,
    output_path="denoised_wiener_audio.wav"
)

Saving Fan_Noise.wav to Fan_Noise.wav
[INFO] Denoised audio saved at: denoised_wiener_audio.wav with alpha = 12


In [None]:
# Listen to the noisy input. Use the uploaded file's name rather than a
# hard-coded one so the cell works for any upload.
Audio(input_path)

In [None]:
Audio("denoised_wiener_audio.wav") # Listen to the denoised audio written by the Wiener filtering cell

In [None]:
# Compare the uploaded noisy input with the Wiener-filtered result. The input
# is loaded through input_path instead of a hard-coded file name so the plot
# always shows the file that was actually processed.
y_input, sr = librosa.load(input_path, sr=None)
y_denoised, _ = librosa.load("denoised_wiener_audio.wav", sr=None)

plot_overlay_waveform(y_input, y_denoised, sr)

## Perform Spectral Subtraction

In [None]:

# Run spectral subtraction on the uploaded file at 16 kHz. With
# noise_profile=None the noise estimate is extracted from the input itself.
process_audio(input_path, noise_profile=None, desired_FS=16000)

In [None]:
# Listen to the file that was passed to process_audio. The former hard-coded
# name ("p232_074.wav") matched neither input_path nor the file loaded by the
# comparison plot in this section.
Audio(input_path)

In [None]:
# Listen to the spectral-subtraction result ("subtracted_signal.wav" is the
# file name used for output_path).
Audio("subtracted_signal.wav")

In [None]:
# Compare the processed input with the spectral-subtraction result, both at
# 16 kHz. The input is loaded through input_path: the former hard-coded name
# ("p232_014.wav") was not the file handed to process_audio.
y_input, sr = librosa.load(input_path, sr=16000)
y_denoised, _ = librosa.load("subtracted_signal.wav", sr=16000)

plot_overlay_waveform(y_input, y_denoised, sr)

## **Audio Effects**

In [None]:
class SimpleAudioFilters:
    """Simple audio effects for a 1-D signal sampled at ``sample_rate`` Hz.

    Every effect returns an array with as many samples as its input.
    """

    def __init__(self, sample_rate):
        self.sample_rate = sample_rate

    def _butterworth(self, audio_data, cutoff_freq, btype):
        """Apply a zero-phase 4th-order Butterworth filter of type ``btype``.

        Raises:
            ValueError: If ``cutoff_freq`` is not strictly between 0 and the
                Nyquist frequency (half the sample rate).
        """
        nyquist = self.sample_rate / 2
        if not 0 < cutoff_freq < nyquist:
            raise ValueError(
                f"cutoff_freq must be between 0 and the Nyquist frequency "
                f"({nyquist} Hz), got {cutoff_freq}"
            )
        b, a = signal.butter(4, cutoff_freq / nyquist, btype=btype)
        return signal.filtfilt(b, a, audio_data)

    def low_pass_filter(self, audio_data, cutoff_freq=1000):
        """Attenuate content above ``cutoff_freq`` (Hz)."""
        return self._butterworth(audio_data, cutoff_freq, 'low')

    def high_pass_filter(self, audio_data, cutoff_freq=300):
        """Attenuate content below ``cutoff_freq`` (Hz)."""
        return self._butterworth(audio_data, cutoff_freq, 'high')

    def echo_effect(self, audio_data, delay_ms=500, decay=0.5):
        """Add one delayed copy of the signal, scaled by ``decay``.

        The echo tail that would extend past the end of the input is dropped,
        so the output keeps the input length.

        Raises:
            ValueError: If ``delay_ms`` is negative.
        """
        delay_samples = int(delay_ms * self.sample_rate / 1000)
        if delay_samples < 0:
            raise ValueError(f"delay_ms must be non-negative, got {delay_ms}")
        dry = np.asarray(audio_data, dtype=float)
        output = dry.copy()
        if delay_samples < len(dry):
            # Only the part of the echo that still fits is mixed in
            output[delay_samples:] += dry[:len(dry) - delay_samples] * decay
        return output

    def pitch_shift(self, audio_data, pitch_factor=1.2):
        """Shift pitch by resampling to ``len / pitch_factor`` samples.

        The resampled signal is trimmed or zero-padded back to the input
        length, so its duration changes together with its pitch.

        Raises:
            ValueError: If ``pitch_factor`` is not positive.
        """
        if pitch_factor <= 0:
            raise ValueError(f"pitch_factor must be positive, got {pitch_factor}")
        original_length = len(audio_data)
        new_length = int(original_length / pitch_factor)
        if new_length == 0:
            # Nothing left to resample (empty input or a very large factor)
            return np.zeros(original_length)
        resampled = signal.resample(audio_data, new_length)

        if len(resampled) >= original_length:
            return resampled[:original_length]
        padded = np.zeros(original_length)
        padded[:len(resampled)] = resampled
        return padded

    def tremolo_effect(self, audio_data, rate=5, depth=0.5):
        """Modulate the amplitude with a sine of ``rate`` Hz and given ``depth``."""
        t = np.arange(len(audio_data)) / self.sample_rate
        modulation = 1 + depth * np.sin(2 * np.pi * rate * t)
        return audio_data * modulation

    def ring_modulation(self, audio_data, carrier_freq=440):
        """Multiply the signal with a sine carrier of ``carrier_freq`` Hz."""
        t = np.arange(len(audio_data)) / self.sample_rate
        carrier = np.sin(2 * np.pi * carrier_freq * t)
        return audio_data * carrier

    def hard_clipping(self, audio_data, threshold=0.1):
        """Clip the signal to the range [-threshold, threshold]."""
        return np.clip(audio_data, -threshold, threshold)

    def telephone_filter(self, audio_data, rng=None):
        """Band-limit to 300-3400 Hz, add tanh saturation and a little noise.

        Args:
            audio_data: 1-D signal to process.
            rng: Optional ``numpy.random.Generator`` used for the added noise,
                which makes the result reproducible. When omitted, NumPy's
                global random state is used as before.
        """
        filtered = self.low_pass_filter(
            self.high_pass_filter(audio_data, 300), 3400)

        distorted = np.tanh(filtered * 2) * 0.7

        if rng is None:
            noise = np.random.normal(0, 0.01, len(distorted))
        else:
            noise = rng.normal(0, 0.01, len(distorted))

        return distorted + noise

## **Plot Filter Response**

In [None]:
def plot_filter_response(original, filtered, sample_rate, title="Filter Response"):
    """Plot two signals side by side in the time and frequency domain.

    The top row shows the first 1000 samples of each signal, the bottom row
    the magnitude spectrum in dB over the positive frequencies.

    Args:
        original: 1-D signal shown in the left column.
        filtered: 1-D signal shown in the right column.
        sample_rate: Sample rate in Hz shared by both signals.
        title: Figure title.
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    preview_samples = 1000

    for col, (label, samples) in enumerate((("Original", original), ("Filtered", filtered))):
        # Time domain (first 1000 samples)
        time = np.arange(len(samples)) / sample_rate
        time_ax = axes[0, col]
        time_ax.plot(time[:preview_samples], samples[:preview_samples])
        time_ax.set_title(f"{label} Signal (Time)")
        time_ax.set_xlabel("Time (s)")
        time_ax.set_ylabel("Amplitude")

        # Frequency domain. The axis is derived from this signal's own length;
        # the filtered spectrum used to be drawn against the original's
        # frequency axis, which is wrong whenever the lengths differ.
        spectrum = np.fft.fft(samples)
        freqs = np.fft.fftfreq(len(samples), 1 / sample_rate)
        half = len(freqs) // 2  # positive freqs only
        freq_ax = axes[1, col]
        freq_ax.plot(freqs[:half], 20 * np.log10(np.abs(spectrum[:half]) + 1e-10))
        freq_ax.set_title(f"{label} Spectrum")
        freq_ax.set_xlabel("Frequency (Hz)")
        freq_ax.set_ylabel("Magnitude (dB)")
        freq_ax.grid(True)

    plt.suptitle(title)
    plt.tight_layout()
    plt.show()

## **Helper Functions**

In [None]:
def _to_float32(x):
    """Convert PCM or float to float32 in [-1, 1]."""
    if x.dtype == np.int16:
        return x.astype(np.float32) / 32768.0
    if x.dtype == np.int32:
        return x.astype(np.float32) / 2147483648.0
    if x.dtype == np.uint8:
        return (x.astype(np.float32) - 128.0) / 128.0
    return x.astype(np.float32)

def _from_float32(x_float, dtype=np.int16):
    """Convert float32 in [-1, 1] to desired PCM dtype (default int16)."""
    x = np.clip(x_float, -1.0, 1.0)
    if dtype == np.int16:
        return (x * 32767.0).astype(np.int16)
    if dtype == np.int32:
        return (x * 2147483647.0).astype(np.int32)
    if dtype == np.uint8:
        return (np.round((x * 127.0) + 128.0)).astype(np.uint8)
    return x.astype(dtype)

def _apply_channelwise(effect_fn, audio_float):
    """Apply effect to mono or each channel of a stereo/multi-channel signal."""
    if audio_float.ndim == 1:
        return effect_fn(audio_float)
    # For multi-channel: process each channel independently
    processed = np.empty_like(audio_float)
    for c in range(audio_float.shape[1]):
        processed[:, c] = effect_fn(audio_float[:, c])
    return processed

## **Perform Audio Effects**

In [None]:
# Interactive driver: read "Audio_Effects.wav" from the working directory, ask
# which effect to apply (and its parameters), write the result to
# "effected_output.wav" and plot the original against the processed signal.
if __name__ == "__main__":
    SCRIPT_DIR = os.getcwd()
    INPUT_WAV = os.path.join(SCRIPT_DIR, "Audio_Effects.wav")
    OUTPUT_WAV = os.path.join(SCRIPT_DIR, "effected_output.wav")

    sample_rate, audio = wavfile.read(INPUT_WAV)
    # Recorded for reference only: it is not used again in this cell, and the
    # output below is always written as int16.
    original_dtype = audio.dtype

    # All effects operate on float samples
    audio_f32 = _to_float32(audio)
    fx = SimpleAudioFilters(sample_rate)

    print("What type of filter do you want?\n" \
    "Echo Effect (0)\n" \
    "Pitch Shift (1)\n" \
    "Tremolo Effect (2)\n" \
    "Ring Modulation (3)\n" \
    "Hard Clipping Distortion (4)\n" \
    "Telephone Filter (5)\n")

    # Non-numeric input makes int()/float() raise ValueError
    answer = int(input("Enter a number (0-5): ").strip())
    # Each branch asks for the effect's parameters and wraps the effect in a
    # one-argument callable so it can be applied per channel below.
    chosen_effect = None
    if answer == 0:
        delay = int(input("What should be the delay in millisecons?"))
        chosen_effect = lambda x: fx.echo_effect(x, delay_ms=delay)
    elif answer == 1:
        factor = float(input("What should be the pitch factor?"))
        chosen_effect = lambda x: fx.pitch_shift(x, pitch_factor=factor)
    elif answer == 2:
        rate = int(input("What should be the rate?"))
        depth = float(input("What should be the depth?"))
        chosen_effect = lambda x: fx.tremolo_effect(x, rate=rate, depth=depth)
    elif answer == 3:
        freq = int(input("What should be the carrier frequency?"))
        chosen_effect = lambda x: fx.ring_modulation(x, carrier_freq=freq)
    elif answer == 4:
        threshold = float(input("What should be the distortion threshold?"))
        chosen_effect = lambda x: fx.hard_clipping(x, threshold=threshold)
    elif answer == 5:
        chosen_effect = lambda x: fx.telephone_filter(x)

    # Any number outside 0-5 leaves chosen_effect unset
    if chosen_effect is None:
        raise ValueError("Invalid choice. Please run again and enter a number 0–5.")

    processed = _apply_channelwise(chosen_effect, audio_f32)

    # Scale down only if the effect pushed the signal past full scale, so the
    # conversion to int16 below does not have to clip it.
    peak = np.max(np.abs(processed))
    if peak > 1.0:
        processed = processed / peak * 0.99

    wavfile.write(OUTPUT_WAV, sample_rate, _from_float32(processed, dtype=np.int16))
    print(f"Done. Wrote: {OUTPUT_WAV}")
    # For multi-channel audio only the first channel is plotted
    plot_filter_response(audio_f32 if audio_f32.ndim == 1 else audio_f32[:,0],
                     processed if processed.ndim == 1 else processed[:,0],
                     sample_rate,
                     title=f"Effect {answer} Response")

In [None]:
# Listen to the unprocessed input of the audio-effects cell
Audio("Audio_Effects.wav")

In [None]:
# Listen to the file written by the audio-effects cell
Audio("effected_output.wav")