In [119]:
# Check for the availability of GPU
import torch
torch.cuda.is_available()

True

In [123]:
# !pip install torch torchaudio librosa soundfile numpy pesq sounddevice scipy

In [2]:
# !pip install pesq

In [None]:
import os
import librosa
import numpy as np
import soundfile as sf
import random
from glob import glob
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from pesq import pesq
from scipy.io import wavfile

In [120]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
clips_path = os.path.join('/content/drive/MyDrive/DNS', 'clips')
noise_path = os.path.join('/content/drive/MyDrive/DNS', 'noise')
output_path = os.path.join('/content/drive/MyDrive/DNS', 'output')

In [122]:
clean_files = [os.path.join(clips_path,f) for f in os.listdir(clips_path) if os.path.isfile(os.path.join(clips_path, f))]
clean_files.sort()
noisy_files = [os.path.join(noise_path,f) for f in os.listdir(noise_path) if os.path.isfile(os.path.join(noise_path, f))]
noisy_files.sort()

# Ensure clean_files and noisy_files have the same length
min_len = min(len(clean_files), len(noisy_files))
clean_files = clean_files[:min_len]
noisy_files = noisy_files[:min_len]

print(len(clean_files), len(noisy_files))

915 915


In [126]:
# pre process the data, combining clear_voice and noise for training
def load_audio(file_path, sr=16000):
  try:
    audio, _ = librosa.load(file_path, sr=sr, mono=True)
    return audio
  except Exception as e:
    print(f"Error loading audio file {file_path}: {e}")
    return None

def normalize_audio(y):
  if y is None or len(y) == 0:
    return y
  return y / np.max(np.abs(y))

def mix_audio(speech, noise, snr_db):
  """
  Mixes speech with noise at given SNR (Signal-to-Noise Ratio)
  """
  if speech is None or noise is None:
    return None
  speech_power = np.mean(speech ** 2)
  noise_power = np.mean(noise ** 2)

  # Avoid division by zero
  if noise_power == 0:
    noise_power = 1e-10

  target_noise_power = speech_power / (10 ** (snr_db / 10))
  noise = noise * np.sqrt(target_noise_power / noise_power)
  mixed = speech + noise
  return normalize_audio(mixed)

def extract_mfcc(y, sr=16000, n_mfcc=40):
  if y is None:
    return None
  mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
  return mfcc.T

def process_pair(speech_path, noise_path, output_dir, snr_db=5):
  speech = load_audio(speech_path)
  noise = load_audio(noise_path)

  if speech is None or noise is None:
    print(f"Skipping pair due to audio loading error: {speech_path}, {noise_path}")
    return None, None

  # Trim noise to speech length or pad speech with silence
  if len(noise) < len(speech):
      noise = np.tile(noise, int(np.ceil(len(speech) / len(noise))))
  elif len(noise) > len(speech):
      speech = np.pad(speech, (0, len(noise) - len(speech)), 'constant')

  noise = noise[:len(speech)]
  mixed = mix_audio(speech, noise, snr_db)

  base_name = os.path.basename(speech_path).replace('.mp3', '.wav')
  try:
      sf.write(os.path.join(output_dir, f'clean_{base_name}'), speech, 16000)
      sf.write(os.path.join(output_dir, f'noisy_{base_name}'), mixed, 16000)
  except Exception as e:
      print(f"Error writing audio files: {e}")
      return None, None

  return speech, mixed

def preprocess_dataset(speech_folder, noise_folder, output_dir, sample_count=1000):
  speech_files = glob(os.path.join(speech_folder, '*.mp3'))
  noise_files = glob(os.path.join(noise_folder, '**/*.wav'), recursive=True)

  os.makedirs(output_dir, exist_ok=True)

  for i in range(sample_count):
      if not speech_files or not noise_files:
          print("No speech or noise files found. Exiting.")
          break

      s_path = random.choice(speech_files)
      n_path = random.choice(noise_files)
      process_pair(s_path, n_path, output_dir)

if __name__ == "__main__":
  preprocess_dataset(
      speech_folder=clips_path,
      noise_folder=noise_path,
      output_dir=output_path,
      sample_count=10
  )

In [127]:
class AudioDataset(Dataset):
    def __init__(self, clean_files, noisy_files, sr=16000, n_mfcc=40, seq_len=256):
        self.clean_files = clean_files
        self.noisy_files = noisy_files
        self.sr = sr
        self.n_mfcc = n_mfcc
        self.seq_len = seq_len

    def __len__(self):
        return min(len(self.clean_files), len(self.noisy_files))

    def __getitem__(self, idx):
        try:
            clean_path = self.clean_files[idx]
            noisy_path = self.noisy_files[idx]

            clean, _ = librosa.load(clean_path, sr=self.sr)
            noisy, _ = librosa.load(noisy_path, sr=self.sr)

            # Ensure both clean and noisy audio have the same length
            min_len = min(len(clean), len(noisy))
            clean = clean[:min_len]
            noisy = noisy[:min_len]

            clean_mfcc = librosa.feature.mfcc(y=clean, sr=self.sr, n_mfcc=self.n_mfcc)
            noisy_mfcc = librosa.feature.mfcc(y=noisy, sr=self.sr, n_mfcc=self.n_mfcc)

             # Pad or truncate to the fixed sequence length
            if clean_mfcc.shape[1] < self.seq_len:
                clean_mfcc = np.pad(clean_mfcc, ((0, 0), (0, self.seq_len - clean_mfcc.shape[1])), mode='constant')
            else:
                clean_mfcc = clean_mfcc[:, :self.seq_len]

            if noisy_mfcc.shape[1] < self.seq_len:
                noisy_mfcc = np.pad(noisy_mfcc, ((0, 0), (0, self.seq_len - noisy_mfcc.shape[1])), mode='constant')
            else:
                noisy_mfcc = noisy_mfcc[:, :self.seq_len]

            return torch.tensor(noisy_mfcc).unsqueeze(0).float(), torch.tensor(clean_mfcc).unsqueeze(0).float()

        except Exception as e:
            print(f"Error processing item {idx}: {e}")
            # Return empty tensors with the expected shape in case of error
            return torch.empty(1, self.n_mfcc, self.seq_len), torch.empty(1, self.n_mfcc, self.seq_len)

In [128]:
import torch.nn as nn

class CNNDenoiser(nn.Module):
    def __init__(self):
        super(CNNDenoiser, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 1, kernel_size=3, stride=1, padding=1)
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [129]:
# --- Config ---
EPOCHS = 10
BATCH_SIZE = 8
DATA_DIR = output_path  # Assuming output_path is defined elsewhere
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Dataset & Loader ---
# Assuming clean_files and noisy_files are defined elsewhere
dataset = AudioDataset(clean_files, noisy_files)
loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)

# --- Model, Loss, Optimizer ---
model = CNNDenoiser().to(DEVICE)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# --- Training Loop ---
for epoch in range(EPOCHS):
    total_loss = 0
    for i, (noisy, clean) in enumerate(loader):
        noisy, clean = noisy.to(DEVICE), clean.to(DEVICE)

        optimizer.zero_grad()
        output = model(noisy)

        # Ensure shapes match before calculating loss
        if output.shape != clean.shape:
            min_time_dim = min(output.shape[-1], clean.shape[-1])  # Assuming time dimension is the last
            output = output[:, :, :, :min_time_dim]
            clean = clean[:, :, :, :min_time_dim]

        loss = criterion(output, clean)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"[Epoch {epoch + 1}/{EPOCHS}] Loss: {total_loss / (i + 1):.4f}")  # Average loss per batch

# --- Save the model ---
torch.save(model.state_dict(), "cnn_denoiser.pth")

[Epoch 1/10] Loss: 1315.7945
[Epoch 2/10] Loss: 914.5612
[Epoch 3/10] Loss: 868.0090
[Epoch 4/10] Loss: 831.2142
[Epoch 5/10] Loss: 813.3644
[Epoch 6/10] Loss: 795.4296
[Epoch 7/10] Loss: 798.7734
[Epoch 8/10] Loss: 776.3153
[Epoch 9/10] Loss: 757.7644
[Epoch 10/10] Loss: 777.4769


In [130]:
#compute SNR, PESQ

def compute_snr(clean, noisy):
  noise = noisy - clean
  snr = 10 * np.log10(np.sum(clean ** 2) / np.sum(noise ** 2) + 1e-10)
  return snr

def compute_pesq(ref_path, deg_path):
  sr_ref, ref = wavfile.read(ref_path)
  sr_deg, deg = wavfile.read(deg_path)

  assert sr_ref == sr_deg
  ref = ref.astype(np.float32)
  deg = deg.astype(np.float32)

  try:
      score = pesq(sr_ref, ref, deg, 'wb')  # Wideband mode
      return score
  except Exception as e:
      print("PESQ error:", e)
      return -1.0

In [131]:
def denoise_audio_file(model_path, noisy_path, output_path, sr=16000, n_mfcc=40, seq_len=256):  # Added seq_len
    # Load model
    model = CNNDenoiser()
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))  # Explicitly specify device
    model.eval()

    # Load audio
    noisy, _ = librosa.load(noisy_path, sr=sr)

    # Extract MFCC with padding/truncation
    noisy_mfcc = librosa.feature.mfcc(y=noisy, sr=sr, n_mfcc=n_mfcc)

    # Pad or truncate to the fixed sequence length
    if noisy_mfcc.shape[1] < seq_len:
        noisy_mfcc = np.pad(noisy_mfcc, ((0, 0), (0, seq_len - noisy_mfcc.shape[1])), mode='constant')
    else:
        noisy_mfcc = noisy_mfcc[:, :seq_len]

    noisy_tensor = torch.tensor(noisy_mfcc).unsqueeze(0).unsqueeze(0).float()

    # Predict clean MFCC
    with torch.no_grad():
        clean_tensor = model(noisy_tensor)
    clean_mfcc = clean_tensor.squeeze().numpy()

    # Convert MFCC to waveform
    clean_audio = librosa.feature.inverse.mfcc_to_audio(clean_mfcc, sr=sr)

    # Ensure clean_audio and noisy_audio have the same length
    min_len = min(len(clean_audio), len(noisy))
    clean_audio = clean_audio[:min_len]
    noisy = noisy[:min_len]

    # Save result
    sf.write(output_path, clean_audio, sr)
    print(f"[✓] Denoised audio saved to: {output_path}")

    return clean_audio, noisy, sr

if __name__ == "__main__":
    clean_audio, noisy_audio, sr = denoise_audio_file(
        model_path="cnn_denoiser.pth",
        noisy_path=os.path.join('/content/drive/MyDrive/DNS/', 'input_noisy.wav'),
        output_path=os.path.join('/content/drive/MyDrive/DNS/', 'output_clear.wav')
    )

    # Evaluate
    snr = compute_snr(clean_audio, noisy_audio)
    pesq = compute_pesq(os.path.join('/content/drive/MyDrive/DNS/', 'input_noisy.wav'), os.path.join('/content/drive/MyDrive/DNS/', 'output_clear.wav'))

    print(f"🔊 SNR: {snr:.2f} dB")
    print(f"🗣️ PESQ: {pesq:.2f}")

[✓] Denoised audio saved to: /content/drive/MyDrive/DNS/output_clear.wav
🔊 SNR: -30.79 dB
🗣️ PESQ: 1.67
