In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/trained-32kbps/WaveUNet_32kbps.pth
/kaggle/input/trained-64kbps/WaveUNet_64kbps.pth
/kaggle/input/compressed-musan/compressed_musan/noise/sound-bible/compressed_8kbps/noise-sound-bible-0005.opus
/kaggle/input/compressed-musan/compressed_musan/noise/sound-bible/compressed_8kbps/noise-sound-bible-0080.opus
/kaggle/input/compressed-musan/compressed_musan/noise/sound-bible/compressed_8kbps/noise-sound-bible-0041.opus
/kaggle/input/compressed-musan/compressed_musan/noise/sound-bible/compressed_8kbps/noise-sound-bible-0056.opus
/kaggle/input/compressed-musan/compressed_musan/noise/sound-bible/compressed_8kbps/noise-sound-bible-0028.opus
/kaggle/input/compressed-musan/compressed_musan/noise/sound-bible/compressed_8kbps/noise-sound-bible-0045.opus
/kaggle/input/compressed-musan/compressed_musan/noise/sound-bible/compressed_8kbps/noise-sound-bible-0031.opus
/kaggle/input/compressed-musan/compressed_musan/noise/sound-bible/compressed_8kbps/noise-sound-bible-0084.opus
/kaggle/input/

# MUSIC FOLDER (FROM MUSAN DATASET)

# STEP 1: 
# Compression at different bitrates (16kbps, 32kbps, 64kbps)

**COMPRESSING THE FMA FOLDER**

In [2]:
import os
import subprocess
import shutil

# Paths
DATA_PATH = "/kaggle/input/music-folder-musan/music"
OUTPUT_PATH = "/kaggle/working/compressed_music_fma"
TARGET_FOLDERS = ["fma"]

# Bitrates for music compression
BITRATES = {
    "16kbps": "16000",
    "32kbps": "32000",
    "64kbps": "64000"
}

# Create output directory if not exists
os.makedirs(OUTPUT_PATH, exist_ok=True)

# Function to compress a single file
def compress_audio(input_file, output_file, bitrate):
    cmd = f"ffmpeg -i '{input_file}' -c:a libopus -b:a {bitrate} '{output_file}' -y"
    subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

# Resume check: Load last processed file if exists
resume_file = "/kaggle/working/music_fma_last_processed.txt"
last_processed = None
if os.path.exists(resume_file):
    with open(resume_file, "r") as f:
        last_processed = f.read().strip()

# Process files
processed_count = 0
resume_found = last_processed is None  # If no resume file, start immediately

for folder in TARGET_FOLDERS:
    folder_path = os.path.join(DATA_PATH, folder)
    if not os.path.exists(folder_path):
        continue
    
    for root, _, files in os.walk(folder_path):
        for file in sorted(files):  # Sorting ensures consistent processing order
            if file.endswith(".wav"):
                input_file = os.path.join(root, file)
                relative_path = os.path.relpath(root, DATA_PATH)  # Maintain hierarchy
                
                if not resume_found:
                    if input_file == last_processed:
                        resume_found = True  # Resume processing from the next file
                    continue
                
                # Apply compression for each bitrate
                for bitrate_label, bitrate_value in BITRATES.items():
                    output_dir = os.path.join(OUTPUT_PATH, f"compressed_{bitrate_label}", relative_path)
                    os.makedirs(output_dir, exist_ok=True)  # Create directories if missing
                    
                    output_file = os.path.join(output_dir, file.replace(".wav", ".opus"))
                    compress_audio(input_file, output_file, bitrate_value)

                processed_count += 1
                
                # Save progress every 50 files
                if processed_count % 50 == 0:
                    with open(resume_file, "w") as f:
                        f.write(input_file)
                    
                    # Zip and upload files every 50 processed
                    zip_path = "/kaggle/working/compressed_music_fma.zip"
                    shutil.make_archive(zip_path.replace(".zip", ""), 'zip', OUTPUT_PATH)
                    print("✅ 50 files compressed & backup saved!")

print("✅ Music compression complete!")

# Final dataset backup
final_zip_path = "/kaggle/working/compressed_music_fma_final.zip"
shutil.make_archive(final_zip_path.replace(".zip", ""), 'zip', OUTPUT_PATH)
print(f"✅ Final backup saved at: {final_zip_path}")

✅ 50 files compressed & backup saved!
✅ 50 files compressed & backup saved!
✅ Music compression complete!
✅ Final backup saved at: /kaggle/working/compressed_music_fma_final.zip


**COMPRESSING THE FMA-WESTERN-ART FOLDER**

In [1]:
import os
import subprocess
import shutil
  
# Paths
DATA_PATH = "/kaggle/input/music-folder-musan/music"
OUTPUT_PATH = "/kaggle/working/compressed_music_fma_western_art"
TARGET_FOLDERS = ["fma-western-art"]

# Bitrates for music compression
BITRATES = {
    "16kbps": "16000",
    "32kbps": "32000",
    "64kbps": "64000"
}

# Create output directory if not exists
os.makedirs(OUTPUT_PATH, exist_ok=True)

# Function to compress a single file
def compress_audio(input_file, output_file, bitrate):
    cmd = f"ffmpeg -i '{input_file}' -c:a libopus -b:a {bitrate} '{output_file}' -y"
    subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

# Resume check: Load last processed file if exists
resume_file = "/kaggle/working/music_last_processed_fma_western_art.txt"
last_processed = None
if os.path.exists(resume_file):
    with open(resume_file, "r") as f:
        last_processed = f.read().strip()

# Process files
processed_count = 0
resume_found = last_processed is None  # If no resume file, start immediately

for folder in TARGET_FOLDERS:
    folder_path = os.path.join(DATA_PATH, folder)
    if not os.path.exists(folder_path):
        continue
    
    for root, _, files in os.walk(folder_path):
        for file in sorted(files):  # Sorting ensures consistent processing order
            if file.endswith(".wav"):
                input_file = os.path.join(root, file)
                relative_path = os.path.relpath(root, DATA_PATH)  # Maintain hierarchy
                
                if not resume_found:
                    if input_file == last_processed:
                        resume_found = True  # Resume processing from the next file
                    continue
                
                # Apply compression for each bitrate
                for bitrate_label, bitrate_value in BITRATES.items():
                    output_dir = os.path.join(OUTPUT_PATH, f"compressed_{bitrate_label}", relative_path)
                    os.makedirs(output_dir, exist_ok=True)  # Create directories if missing
                    
                    output_file = os.path.join(output_dir, file.replace(".wav", ".opus"))
                    compress_audio(input_file, output_file, bitrate_value)

                processed_count += 1
                
                # Save progress every 50 files
                if processed_count % 50 == 0:
                    with open(resume_file, "w") as f:
                        f.write(input_file)
                    
                    # Zip and upload files every 50 processed
                    zip_path = "/kaggle/working/compressed_music_fma_western_art.zip"
                    shutil.make_archive(zip_path.replace(".zip", ""), 'zip', OUTPUT_PATH)
                    print("✅ 50 files compressed & backup saved!")

print("✅ Music compression complete!")

# Final dataset backup
final_zip_path = "/kaggle/working/compressed_music_fma_western_artfinal.zip"
shutil.make_archive(final_zip_path.replace(".zip", ""), 'zip', OUTPUT_PATH)
print(f"✅ Final backup saved at: {final_zip_path}")

✅ 50 files compressed & backup saved!
✅ Music compression complete!
✅ Final backup saved at: /kaggle/working/compressed_music_fma_western_artfinal.zip


# STEP 2:
# Analysing the speech quality between the compressed and original music 

**SNR:**

**1. FMA - 16kbps**

In [3]:
import os
import librosa
import numpy as np

# Function to compute SNR
def snr(original, compressed):
    # Ensure both signals have the same length
    min_len = min(len(original), len(compressed))
    original = original[:min_len]
    compressed = compressed[:min_len]
    
    noise = original - compressed
    eps = 1e-10  # Small value to prevent division by zero

    return 10 * np.log10(np.sum(original**2) / (np.sum(noise**2) + eps))

# Paths
ORIGINAL_PATH = "/kaggle/input/music-folder-musan/music/fma"  # Path to original dataset
COMPRESSED_PATH = "/kaggle/input/compressed-fma-folder/compressed_16kbps/fma"  # Example for 16kbps

# Process first 10 files for SNR evaluation
snr_values = []

for root, _, files in os.walk(ORIGINAL_PATH):
    for file in sorted(files):
        if file.endswith(".wav"):
            original_file = os.path.join(root, file)
            compressed_file = os.path.join(COMPRESSED_PATH, file.replace(".wav", ".opus"))
            
            # Load original and compressed audio
            try:
                original_audio, _ = librosa.load(original_file, sr=16000)  # Standard sampling rate
                compressed_audio, _ = librosa.load(compressed_file, sr=16000)

                # Compute SNR
                snr_value = snr(original_audio, compressed_audio)
                snr_values.append(snr_value)

                print(f"SNR for {file}: {snr_value:.2f} dB")
                
                if len(snr_values) == 10:  # Stop after 10 files
                    break
            except Exception as e:
                print(f"Error processing {file}: {e}")

    if len(snr_values) == 10:
        break

# Average SNR
if snr_values:
    avg_snr = np.mean(snr_values)
    print(f"\n✅ Average SNR for compressed dataset: {avg_snr:.2f} dB")
else:
    print("\n⚠️ No valid files processed for SNR calculation.")

SNR for music-fma-0000.wav: 8.29 dB
SNR for music-fma-0001.wav: 12.53 dB
SNR for music-fma-0002.wav: 12.60 dB
SNR for music-fma-0003.wav: 13.18 dB
SNR for music-fma-0004.wav: 12.02 dB
SNR for music-fma-0005.wav: 10.72 dB
SNR for music-fma-0006.wav: 9.36 dB
SNR for music-fma-0007.wav: 7.01 dB
SNR for music-fma-0008.wav: 7.94 dB
SNR for music-fma-0009.wav: 9.59 dB

✅ Average SNR for compressed dataset: 10.32 dB


**2. FMA - 32kbps**

In [4]:
import os
import librosa
import numpy as np

# Function to compute SNR
def snr(original, compressed):
    # Ensure both signals have the same length
    min_len = min(len(original), len(compressed))
    original = original[:min_len]
    compressed = compressed[:min_len]
    
    noise = original - compressed
    eps = 1e-10  # Small value to prevent division by zero

    return 10 * np.log10(np.sum(original**2) / (np.sum(noise**2) + eps))

# Paths
ORIGINAL_PATH = "/kaggle/input/music-folder-musan/music/fma"  # Path to original dataset
COMPRESSED_PATH = "/kaggle/input/compressed-fma-folder/compressed_32kbps/fma"  # Example for 16kbps

# Process first 10 files for SNR evaluation
snr_values = []

for root, _, files in os.walk(ORIGINAL_PATH):
    for file in sorted(files):
        if file.endswith(".wav"):
            original_file = os.path.join(root, file)
            compressed_file = os.path.join(COMPRESSED_PATH, file.replace(".wav", ".opus"))
            
            # Load original and compressed audio
            try:
                original_audio, _ = librosa.load(original_file, sr=16000)  # Standard sampling rate
                compressed_audio, _ = librosa.load(compressed_file, sr=16000)

                # Compute SNR
                snr_value = snr(original_audio, compressed_audio)
                snr_values.append(snr_value)

                print(f"SNR for {file}: {snr_value:.2f} dB")
                
                if len(snr_values) == 10:  # Stop after 10 files
                    break
            except Exception as e:
                print(f"Error processing {file}: {e}")

    if len(snr_values) == 10:
        break

# Average SNR
if snr_values:
    avg_snr = np.mean(snr_values)
    print(f"\n✅ Average SNR for compressed dataset: {avg_snr:.2f} dB")
else:
    print("\n⚠️ No valid files processed for SNR calculation.")

SNR for music-fma-0000.wav: 14.42 dB
SNR for music-fma-0001.wav: 18.54 dB
SNR for music-fma-0002.wav: 18.39 dB
SNR for music-fma-0003.wav: 19.14 dB
SNR for music-fma-0004.wav: 17.91 dB
SNR for music-fma-0005.wav: 17.33 dB
SNR for music-fma-0006.wav: 15.06 dB
SNR for music-fma-0007.wav: 12.46 dB
SNR for music-fma-0008.wav: 13.76 dB
SNR for music-fma-0009.wav: 15.39 dB

✅ Average SNR for compressed dataset: 16.24 dB


**3. FMA - 64kbps**

In [5]:
import os
import librosa
import numpy as np

# Function to compute SNR
def snr(original, compressed):
    # Ensure both signals have the same length
    min_len = min(len(original), len(compressed))
    original = original[:min_len]
    compressed = compressed[:min_len]
    
    noise = original - compressed
    eps = 1e-10  # Small value to prevent division by zero

    return 10 * np.log10(np.sum(original**2) / (np.sum(noise**2) + eps))

# Paths
ORIGINAL_PATH = "/kaggle/input/music-folder-musan/music/fma"  # Path to original dataset
COMPRESSED_PATH = "/kaggle/input/compressed-fma-folder/compressed_64kbps/fma"  # Example for 16kbps

# Process first 10 files for SNR evaluation
snr_values = []

for root, _, files in os.walk(ORIGINAL_PATH):
    for file in sorted(files):
        if file.endswith(".wav"):
            original_file = os.path.join(root, file)
            compressed_file = os.path.join(COMPRESSED_PATH, file.replace(".wav", ".opus"))
            
            # Load original and compressed audio
            try:
                original_audio, _ = librosa.load(original_file, sr=16000)  # Standard sampling rate
                compressed_audio, _ = librosa.load(compressed_file, sr=16000)

                # Compute SNR
                snr_value = snr(original_audio, compressed_audio)
                snr_values.append(snr_value)

                print(f"SNR for {file}: {snr_value:.2f} dB")
                
                if len(snr_values) == 10:  # Stop after 10 files
                    break
            except Exception as e:
                print(f"Error processing {file}: {e}")

    if len(snr_values) == 10:
        break

# Average SNR
if snr_values:
    avg_snr = np.mean(snr_values)
    print(f"\n✅ Average SNR for compressed dataset: {avg_snr:.2f} dB")
else:
    print("\n⚠️ No valid files processed for SNR calculation.")

SNR for music-fma-0000.wav: 23.90 dB
SNR for music-fma-0001.wav: 26.07 dB
SNR for music-fma-0002.wav: 23.65 dB
SNR for music-fma-0003.wav: 25.38 dB
SNR for music-fma-0004.wav: 23.22 dB
SNR for music-fma-0005.wav: 25.73 dB
SNR for music-fma-0006.wav: 23.81 dB
SNR for music-fma-0007.wav: 22.79 dB
SNR for music-fma-0008.wav: 24.03 dB
SNR for music-fma-0009.wav: 24.21 dB

✅ Average SNR for compressed dataset: 24.28 dB


**4. FMA-WESTERN-ART - 16kbps**

In [6]:
import os
import librosa
import numpy as np

# Function to compute SNR
def snr(original, compressed):
    # Ensure both signals have the same length
    min_len = min(len(original), len(compressed))
    original = original[:min_len]
    compressed = compressed[:min_len]
    
    noise = original - compressed
    eps = 1e-10  # Small value to prevent division by zero

    return 10 * np.log10(np.sum(original**2) / (np.sum(noise**2) + eps))

# Paths
ORIGINAL_PATH = "/kaggle/input/music-folder-musan/music/fma-western-art"  # Path to original dataset
COMPRESSED_PATH = "/kaggle/input/compressed-fma-western-art-folder/compressed_16kbps/fma-western-art"  # Example for 16kbps

# Process first 10 files for SNR evaluation
snr_values = []

for root, _, files in os.walk(ORIGINAL_PATH):
    for file in sorted(files):
        if file.endswith(".wav"):
            original_file = os.path.join(root, file)
            compressed_file = os.path.join(COMPRESSED_PATH, file.replace(".wav", ".opus"))
            
            # Load original and compressed audio
            try:
                original_audio, _ = librosa.load(original_file, sr=16000)  # Standard sampling rate
                compressed_audio, _ = librosa.load(compressed_file, sr=16000)

                # Compute SNR
                snr_value = snr(original_audio, compressed_audio)
                snr_values.append(snr_value)

                print(f"SNR for {file}: {snr_value:.2f} dB")
                
                if len(snr_values) == 10:  # Stop after 10 files
                    break
            except Exception as e:
                print(f"Error processing {file}: {e}")

    if len(snr_values) == 10:
        break

# Average SNR
if snr_values:
    avg_snr = np.mean(snr_values)
    print(f"\n✅ Average SNR for compressed dataset: {avg_snr:.2f} dB")
else:
    print("\n⚠️ No valid files processed for SNR calculation.")

SNR for music-fma-wa-0000.wav: 17.36 dB
SNR for music-fma-wa-0001.wav: 17.92 dB
SNR for music-fma-wa-0002.wav: 16.91 dB
SNR for music-fma-wa-0003.wav: 10.80 dB
SNR for music-fma-wa-0004.wav: 11.69 dB
SNR for music-fma-wa-0005.wav: 11.04 dB
SNR for music-fma-wa-0006.wav: 12.06 dB
SNR for music-fma-wa-0007.wav: 11.40 dB
SNR for music-fma-wa-0008.wav: 13.03 dB
SNR for music-fma-wa-0009.wav: 11.36 dB

✅ Average SNR for compressed dataset: 13.36 dB


**5. FMA-WESTERN-ART - 32kbps**

In [7]:
import os
import librosa
import numpy as np

# Function to compute SNR
def snr(original, compressed):
    # Ensure both signals have the same length
    min_len = min(len(original), len(compressed))
    original = original[:min_len]
    compressed = compressed[:min_len]
    
    noise = original - compressed
    eps = 1e-10  # Small value to prevent division by zero

    return 10 * np.log10(np.sum(original**2) / (np.sum(noise**2) + eps))

# Paths
ORIGINAL_PATH = "/kaggle/input/music-folder-musan/music/fma-western-art"  # Path to original dataset
COMPRESSED_PATH = "/kaggle/input/compressed-fma-western-art-folder/compressed_32kbps/fma-western-art"  # Example for 16kbps

# Process first 10 files for SNR evaluation
snr_values = []

for root, _, files in os.walk(ORIGINAL_PATH):
    for file in sorted(files):
        if file.endswith(".wav"):
            original_file = os.path.join(root, file)
            compressed_file = os.path.join(COMPRESSED_PATH, file.replace(".wav", ".opus"))
            
            # Load original and compressed audio
            try:
                original_audio, _ = librosa.load(original_file, sr=16000)  # Standard sampling rate
                compressed_audio, _ = librosa.load(compressed_file, sr=16000)

                # Compute SNR
                snr_value = snr(original_audio, compressed_audio)
                snr_values.append(snr_value)

                print(f"SNR for {file}: {snr_value:.2f} dB")
                
                if len(snr_values) == 10:  # Stop after 10 files
                    break
            except Exception as e:
                print(f"Error processing {file}: {e}")

    if len(snr_values) == 10:
        break

# Average SNR
if snr_values:
    avg_snr = np.mean(snr_values)
    print(f"\n✅ Average SNR for compressed dataset: {avg_snr:.2f} dB")
else:
    print("\n⚠️ No valid files processed for SNR calculation.")

SNR for music-fma-wa-0000.wav: 24.41 dB
SNR for music-fma-wa-0001.wav: 25.56 dB
SNR for music-fma-wa-0002.wav: 24.11 dB
SNR for music-fma-wa-0003.wav: 15.59 dB
SNR for music-fma-wa-0004.wav: 17.68 dB
SNR for music-fma-wa-0005.wav: 17.36 dB
SNR for music-fma-wa-0006.wav: 17.56 dB
SNR for music-fma-wa-0007.wav: 17.88 dB
SNR for music-fma-wa-0008.wav: 19.19 dB
SNR for music-fma-wa-0009.wav: 17.49 dB

✅ Average SNR for compressed dataset: 19.68 dB


**6. FMA-WESTERN-ART - 64kbps**

In [8]:
import os
import librosa
import numpy as np

# Function to compute SNR
def snr(original, compressed):
    # Ensure both signals have the same length
    min_len = min(len(original), len(compressed))
    original = original[:min_len]
    compressed = compressed[:min_len]
    
    noise = original - compressed
    eps = 1e-10  # Small value to prevent division by zero

    return 10 * np.log10(np.sum(original**2) / (np.sum(noise**2) + eps))

# Paths
ORIGINAL_PATH = "/kaggle/input/music-folder-musan/music/fma-western-art"  # Path to original dataset
COMPRESSED_PATH = "/kaggle/input/compressed-fma-western-art-folder/compressed_64kbps/fma-western-art"  # Example for 16kbps

# Process first 10 files for SNR evaluation
snr_values = []

for root, _, files in os.walk(ORIGINAL_PATH):
    for file in sorted(files):
        if file.endswith(".wav"):
            original_file = os.path.join(root, file)
            compressed_file = os.path.join(COMPRESSED_PATH, file.replace(".wav", ".opus"))
            
            # Load original and compressed audio
            try:
                original_audio, _ = librosa.load(original_file, sr=16000)  # Standard sampling rate
                compressed_audio, _ = librosa.load(compressed_file, sr=16000)

                # Compute SNR
                snr_value = snr(original_audio, compressed_audio)
                snr_values.append(snr_value)

                print(f"SNR for {file}: {snr_value:.2f} dB")
                
                if len(snr_values) == 10:  # Stop after 10 files
                    break
            except Exception as e:
                print(f"Error processing {file}: {e}")

    if len(snr_values) == 10:
        break

# Average SNR
if snr_values:
    avg_snr = np.mean(snr_values)
    print(f"\n✅ Average SNR for compressed dataset: {avg_snr:.2f} dB")
else:
    print("\n⚠️ No valid files processed for SNR calculation.")

SNR for music-fma-wa-0000.wav: 31.70 dB
SNR for music-fma-wa-0001.wav: 32.95 dB
SNR for music-fma-wa-0002.wav: 31.55 dB
SNR for music-fma-wa-0003.wav: 18.14 dB
SNR for music-fma-wa-0004.wav: 26.78 dB
SNR for music-fma-wa-0005.wav: 26.47 dB
SNR for music-fma-wa-0006.wav: 19.83 dB
SNR for music-fma-wa-0007.wav: 27.00 dB
SNR for music-fma-wa-0008.wav: 27.88 dB
SNR for music-fma-wa-0009.wav: 22.86 dB

✅ Average SNR for compressed dataset: 26.52 dB


# STEP 3:
# Installing Pytorch 

In [9]:
!pip install torch torchvision torchaudio --quiet

# STEP 4:
# Defining the custom Wave-U-Net Model

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio
import os
from torch.utils.data import Dataset, DataLoader
import torchaudio.transforms as T

# Custom Wave-U-Net model
class WaveUNet(nn.Module):
    def __init__(self, in_channels=1, out_channels=1, depth=5, filters=24):
        super(WaveUNet, self).__init__()
        
        self.depth = depth
        self.encoders = nn.ModuleList()
        self.decoders = nn.ModuleList()
        
        # Encoding layers
        for i in range(depth):
            self.encoders.append(nn.Conv1d(in_channels, filters, kernel_size=15, stride=1, padding=7))
            in_channels = filters
            filters *= 2

        # Decoding layers
        filters //= 2
        for i in range(depth):
            self.decoders.append(nn.ConvTranspose1d(filters, in_channels, kernel_size=15, stride=1, padding=7))
            filters //= 2
            in_channels = filters

        self.final_layer = nn.Conv1d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        skips = []
        for encoder in self.encoders:
            x = torch.relu(encoder(x))
            skips.append(x)

        skips = skips[::-1]  # Reverse for decoding
        for i, decoder in enumerate(self.decoders):
            x = torch.relu(decoder(x) + skips[i])

        return self.final_layer(x)

# STEP 5:
# Training and Validating the Wave-U-Net Model on original and compressed Data Pairs 

In [11]:
import os
import torch
import torchaudio
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import subprocess

# ✅ Paths
COMPRESSED_DIRS = {
    "16kbps": ["/kaggle/input/compressed-musan/compressed_musan/music/fma/compressed_16kbps/fma",
               "/kaggle/input/compressed-musan/compressed_musan/music/fma-western-art/compressed_16kbps/fma-western-art", 
               "/kaggle/input/compressed-musan/compressed_musan/music/hd-classical/compressed_16kbps/hd-classical", 
               "/kaggle/input/compressed-musan/compressed_musan/music/jamendo/compressed_16kbps/jamendo", 
               "/kaggle/input/compressed-musan/compressed_musan/music/rfm/kaggle/working/compressed_audio/16kbps"],

    "32kbps": ["/kaggle/input/compressed-musan/compressed_musan/music/fma/compressed_32kbps/fma", 
               "/kaggle/input/compressed-musan/compressed_musan/music/fma-western-art/compressed_32kbps/fma-western-art", 
               "/kaggle/input/compressed-musan/compressed_musan/music/hd-classical/compressed_32kbps/hd-classical", 
               "/kaggle/input/compressed-musan/compressed_musan/music/jamendo/compressed_32kbps/jamendo", 
               "/kaggle/input/compressed-musan/compressed_musan/music/rfm/kaggle/working/compressed_audio/32kbps"],

    "64kbps": ["/kaggle/input/compressed-musan/compressed_musan/music/fma/compressed_64kbps/fma", 
               "/kaggle/input/compressed-musan/compressed_musan/music/fma-western-art/compressed_64kbps/fma-western-art", 
               "/kaggle/input/compressed-musan/compressed_musan/music/hd-classical/compressed_64kbps/hd-classical", 
               "/kaggle/input/compressed-musan/compressed_musan/music/jamendo/compressed_64kbps/jamendo", 
               "/kaggle/input/compressed-musan/compressed_musan/music/rfm/kaggle/working/compressed_audio/64kbps"]
}

ORIGINAL_DIR = "/kaggle/input/music-folder-musan/music"

# ✅ Dataset Class
class MusicDataset(Dataset):
    def __init__(self, compressed_dirs, original_dir, bitrate="16kbps", sample_rate=16000, duration=5):
        self.compressed_files = []
        self.original_files = []
        self.sample_rate = sample_rate
        self.fixed_length = sample_rate * duration  

        original_files_map = {}
        for root, _, files in os.walk(original_dir):
            for file in files:
                if file.endswith(".wav"):
                    base_name = file.rsplit('.', 1)[0]
                    original_files_map[base_name] = os.path.join(root, file)

        for folder in compressed_dirs[bitrate]:  
            if os.path.exists(folder):
                for file in os.listdir(folder):
                    if file.endswith(".opus"):
                        base_name = file.rsplit('.', 1)[0]
                        if base_name in original_files_map:
                            self.compressed_files.append(os.path.join(folder, file))
                            self.original_files.append(original_files_map[base_name])

        print(f"✅ Loaded {len(self.original_files)} valid pairs for {bitrate}.")

    def __len__(self):
        return len(self.original_files)

    def __getitem__(self, idx):
        compressed_path = self.compressed_files[idx]
        original_path = self.original_files[idx]

        if compressed_path.endswith(".opus"):
            wav_filename = os.path.basename(compressed_path).replace(".opus", ".wav")
            wav_path = os.path.join("/kaggle/working/", wav_filename)

            if not os.path.exists(wav_path):
                subprocess.run(["ffmpeg", "-i", compressed_path, "-ar", str(self.sample_rate), "-ac", "1", wav_path], capture_output=True, text=True)

            compressed_path = wav_path  

        compressed_waveform, _ = torchaudio.load(compressed_path)
        original_waveform, _ = torchaudio.load(original_path)

        compressed_waveform = self._fix_length(compressed_waveform)
        original_waveform = self._fix_length(original_waveform)

        return compressed_waveform, original_waveform

    def _fix_length(self, waveform):
        num_samples = waveform.shape[1]
        if num_samples > self.fixed_length:
            return waveform[:, :self.fixed_length]  
        else:
            padding = torch.zeros((1, self.fixed_length - num_samples))  
            return torch.cat((waveform, padding), dim=1)  

# ✅ Model
class WaveUNet(nn.Module):
    def __init__(self):
        super(WaveUNet, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv1d(16, 32, kernel_size=5, stride=2, padding=2),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(32, 16, kernel_size=5, stride=2, padding=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(16, 1, kernel_size=5, stride=2, padding=2, output_padding=1)
        )

    def forward(self, x):
        return self.decoder(self.encoder(x))

# ✅ Training Function with Validation
def train_model(train_loader, val_loader, model, criterion, optimizer, device, num_epochs, bitrate):
    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0

        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {total_loss / len(train_loader)}")

        # ✅ Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)
                val_loss += criterion(outputs, targets).item()

        print(f"Validation Loss: {val_loss / len(val_loader)}")

    # ✅ Save trained model
    save_dir = f"/kaggle/working/{bitrate}"
    os.makedirs(save_dir, exist_ok=True)
    model_path = os.path.join(save_dir, f"WaveUNet_{bitrate}.pth")
    torch.save(model.state_dict(), model_path)
    print(f"✅ Model saved at {model_path}")

# ✅ Main Function (Train/Val/Test Split)
def main():
    bitrates = ["16kbps", "32kbps", "64kbps"]
    batch_size = 8
    num_epochs = 10
    learning_rate = 0.001
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    for bitrate in bitrates:
        print(f"\n🚀 Training for {bitrate}...\n")

        dataset = MusicDataset(COMPRESSED_DIRS, ORIGINAL_DIR, bitrate=bitrate)
        if len(dataset) == 0:
            print(f"❌ No valid audio pairs found for {bitrate}. Skipping...")
            continue

        train_size = int(0.7 * len(dataset))
        val_size = int(0.15 * len(dataset))
        test_size = len(dataset) - train_size - val_size  
        train_set, val_set, test_set = random_split(dataset, [train_size, val_size, test_size])

        train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

        model = WaveUNet()
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        train_model(train_loader, val_loader, model, criterion, optimizer, device, num_epochs, bitrate)

if __name__ == "__main__":
    main() 


🚀 Training for 16kbps...

✅ Loaded 368 valid pairs for 16kbps.
Epoch 1/10, Training Loss: 0.02317986873712278
Validation Loss: 0.007058463358719434
Epoch 2/10, Training Loss: 0.0036171891913523505
Validation Loss: 0.0017262889221975847
Epoch 3/10, Training Loss: 0.001803880707346693
Validation Loss: 0.0012439316155255906
Epoch 4/10, Training Loss: 0.0013522060480406226
Validation Loss: 0.0009409218826996428
Epoch 5/10, Training Loss: 0.001134531936699976
Validation Loss: 0.0007985629906345691
Epoch 6/10, Training Loss: 0.0009791198416761208
Validation Loss: 0.0007207736738824419
Epoch 7/10, Training Loss: 0.0009535414419863655
Validation Loss: 0.0008770527866935092
Epoch 8/10, Training Loss: 0.0009528014303544875
Validation Loss: 0.0006854374618602119
Epoch 9/10, Training Loss: 0.0008681185806353549
Validation Loss: 0.0006543771305587143
Epoch 10/10, Training Loss: 0.0008492001440820538
Validation Loss: 0.0006430027590665434
✅ Model saved at /kaggle/working/16kbps/WaveUNet_16kbps.pth


# STEP 6:
# Testing the model on Unseen Data 

The Reconstructed Outputs are Truncated to 15 seconds 

In [1]:
import os
import torch
import soundfile as sf
import librosa
import numpy as np
import torch.nn as nn
from torch.utils.data import random_split

# ✅ Define Wave-U-Net Model
class WaveUNet(nn.Module):
    def __init__(self):
        super(WaveUNet, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv1d(16, 32, kernel_size=5, stride=2, padding=2),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(32, 16, kernel_size=5, stride=2, padding=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(16, 1, kernel_size=5, stride=2, padding=2, output_padding=1)
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# ✅ Paths
compressed_paths = {
    "16kbps": [
        "/kaggle/input/compressed-musan/compressed_musan/music/fma/compressed_16kbps/fma",
        "/kaggle/input/compressed-musan/compressed_musan/music/fma-western-art/compressed_16kbps/fma-western-art",
        "/kaggle/input/compressed-musan/compressed_musan/music/hd-classical/compressed_16kbps/hd-classical",
        "/kaggle/input/compressed-musan/compressed_musan/music/jamendo/compressed_16kbps/jamendo",
        "/kaggle/input/compressed-musan/compressed_musan/music/rfm/kaggle/working/compressed_audio/16kbps"
    ],
    "32kbps": [
        "/kaggle/input/compressed-musan/compressed_musan/music/fma/compressed_32kbps/fma",
        "/kaggle/input/compressed-musan/compressed_musan/music/fma-western-art/compressed_32kbps/fma-western-art",
        "/kaggle/input/compressed-musan/compressed_musan/music/hd-classical/compressed_32kbps/hd-classical",
        "/kaggle/input/compressed-musan/compressed_musan/music/jamendo/compressed_32kbps/jamendo",
        "/kaggle/input/compressed-musan/compressed_musan/music/rfm/kaggle/working/compressed_audio/32kbps"
    ],
    "64kbps": [
        "/kaggle/input/compressed-musan/compressed_musan/music/fma/compressed_64kbps/fma",
        "/kaggle/input/compressed-musan/compressed_musan/music/fma-western-art/compressed_64kbps/fma-western-art",
        "/kaggle/input/compressed-musan/compressed_musan/music/hd-classical/compressed_64kbps/hd-classical",
        "/kaggle/input/compressed-musan/compressed_musan/music/jamendo/compressed_64kbps/jamendo",
        "/kaggle/input/compressed-musan/compressed_musan/music/rfm/kaggle/working/compressed_audio/64kbps"
    ]
}
original_path = "/kaggle/input/music-folder-musan/music"
reconstructed_path = "/kaggle/working/reconstructed_audio_output"  # 🔹 Updated output folder name
model_paths = {
    "16kbps": "/kaggle/input/trained-16kbps/WaveUNet_16kbps.pth",
    "32kbps": "/kaggle/input/trained-32kbps/WaveUNet_32kbps.pth",
    "64kbps": "/kaggle/input/trained-64kbps/WaveUNet_64kbps.pth"
}

# ✅ Ensure output directories exist
for bitrate in model_paths.keys():
    os.makedirs(os.path.join(reconstructed_path, bitrate), exist_ok=True)

# ✅ Dataset Class (Loads 15% Test Split)
class MusicDataset(torch.utils.data.Dataset):
    def __init__(self, compressed_dirs, original_dir, bitrate="16kbps"):
        self.compressed_files = []
        self.original_files = []

        original_files_map = {
            file.rsplit('.', 1)[0]: os.path.join(root, file)
            for root, _, files in os.walk(original_dir) for file in files if file.endswith(".wav")
        }

        for folder in compressed_dirs[bitrate]:
            if os.path.exists(folder):
                for file in os.listdir(folder):
                    if file.endswith(".opus"):
                        base_name = file.rsplit('.', 1)[0]
                        if base_name in original_files_map:
                            self.compressed_files.append(os.path.join(folder, file))
                            self.original_files.append(original_files_map[base_name])

    def __len__(self):
        return len(self.original_files)

    def __getitem__(self, idx):
        return self.compressed_files[idx], self.original_files[idx]

# ✅ Load dataset and extract 15% test set
def get_test_split(bitrate):
    dataset = MusicDataset(compressed_paths, original_path, bitrate)
    total_size = len(dataset)
    train_size = int(0.7 * total_size)
    val_size = int(0.15 * total_size)
    test_size = total_size - train_size - val_size  # Ensure 100% total

    _, _, test_set = random_split(dataset, [train_size, val_size, test_size])
    return test_set

# ✅ Load trained model (⚠️ Fixed Warning by setting weights_only=True)
def load_model(model_path):
    model = WaveUNet()
    model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu"), weights_only=True))
    model.eval()
    return model

# ✅ Function to reconstruct and truncate audio to **15 seconds**
def reconstruct_audio(file_path, model, max_duration=15):  # 🔹 Increased to 15 sec
    wav, sr = librosa.load(file_path, sr=16000, mono=True)
    
    # 🔹 Truncate to max_duration seconds (15 sec)
    max_samples = sr * max_duration
    wav = wav[:max_samples]

    wav_tensor = torch.tensor(wav, dtype=torch.float32).unsqueeze(0).unsqueeze(1)  

    with torch.no_grad():
        reconstructed = model(wav_tensor)  

    return reconstructed.squeeze().numpy()

# ✅ Perform Testing on 15% Split
for bitrate in model_paths.keys():
    print(f"\n🚀 Testing for {bitrate}...\n")
    
    test_set = get_test_split(bitrate)  # Get 15% test data
    model = load_model(model_paths[bitrate])  # Load correct model

    for i in range(len(test_set)):
        compressed_file, original_file = test_set[i]
        
        if not os.path.exists(compressed_file):
            print(f"❌ Missing compressed file: {compressed_file}")
            continue
        
        print(f"🔍 Processing: {os.path.basename(compressed_file)} ({bitrate})")
        
        reconstructed_audio = reconstruct_audio(compressed_file, model)
        
        # Save reconstructed audio
        output_path = os.path.join(reconstructed_path, bitrate, os.path.basename(original_file))
        sf.write(output_path, reconstructed_audio, 16000)
        print(f"✅ Reconstructed and saved: {output_path}")

print("\n🚀 Testing complete! Check '/kaggle/working/reconstructed_audio_output/' for results.")



🚀 Testing for 16kbps...

🔍 Processing: music-hd-0032.opus (16kbps)
✅ Reconstructed and saved: /kaggle/working/reconstructed_audio_output/16kbps/music-hd-0032.wav
🔍 Processing: music-fma-wa-0071.opus (16kbps)
✅ Reconstructed and saved: /kaggle/working/reconstructed_audio_output/16kbps/music-fma-wa-0071.wav
🔍 Processing: music-hd-0001.opus (16kbps)
✅ Reconstructed and saved: /kaggle/working/reconstructed_audio_output/16kbps/music-hd-0001.wav
🔍 Processing: music-fma-wa-0009.opus (16kbps)
✅ Reconstructed and saved: /kaggle/working/reconstructed_audio_output/16kbps/music-fma-wa-0009.wav
🔍 Processing: music-fma-wa-0070.opus (16kbps)
✅ Reconstructed and saved: /kaggle/working/reconstructed_audio_output/16kbps/music-fma-wa-0070.wav
🔍 Processing: music-fma-wa-0021.opus (16kbps)
✅ Reconstructed and saved: /kaggle/working/reconstructed_audio_output/16kbps/music-fma-wa-0021.wav
🔍 Processing: music-fma-0014.opus (16kbps)
✅ Reconstructed and saved: /kaggle/working/reconstructed_audio_output/16kbps/

# STEP 7:
# Evaluating the Model on the following Metrics:
# 1. SDR
# 2. LSD
# 3. STOI

In [12]:
import os
import numpy as np
import librosa

# ✅ Function to calculate SDR (Signal-to-Distortion Ratio)
def compute_sdr(original, reconstructed):
    noise = original - reconstructed
    sdr = 10 * np.log10(np.sum(original ** 2) / (np.sum(noise ** 2) + 1e-10))  # Avoid division by zero
    return sdr

# ✅ Function to calculate LSD (Log-Spectral Distance)
def compute_lsd(original, reconstructed):
    eps = 1e-10  # Small value to avoid log(0)
    
    # Compute magnitude spectrogram
    orig_mag = np.abs(librosa.stft(original, n_fft=1024))
    recon_mag = np.abs(librosa.stft(reconstructed, n_fft=1024))

    # Convert to log scale
    orig_log = np.log10(orig_mag + eps)
    recon_log = np.log10(recon_mag + eps)

    # Compute LSD
    lsd = np.mean(np.sqrt(np.mean((orig_log - recon_log) ** 2, axis=0)))
    return lsd

# ✅ Function to calculate STOI
def compute_stoi(original, reconstructed):
    # Compute STFT
    orig_mag = np.abs(librosa.stft(original, n_fft=512, hop_length=256))
    recon_mag = np.abs(librosa.stft(reconstructed, n_fft=512, hop_length=256))

    # Compute Pearson Correlation Coefficient across time frames
    stoi_scores = []
    for i in range(orig_mag.shape[1]):  # Loop over time frames
        orig_frame = orig_mag[:, i]
        recon_frame = recon_mag[:, i]

        if np.std(orig_frame) > 0 and np.std(recon_frame) > 0:
            corr = np.corrcoef(orig_frame, recon_frame)[0, 1]  # Pearson Correlation
            stoi_scores.append(corr)

    # Compute average STOI score (Higher is better)
    return np.mean(stoi_scores) if stoi_scores else 0

# ✅ Paths to original and reconstructed music folders
original_base = "/kaggle/input/original-music-folder/Original_music"
reconstructed_base = "/kaggle/input/reconstructed-music-folder/Reconstructed_Music"

bitrates = ["16kbps", "32kbps", "64kbps"]

# ✅ Evaluate each bitrate folder
for bitrate in bitrates:
    original_folder = os.path.join(original_base, bitrate)
    reconstructed_folder = os.path.join(reconstructed_base, bitrate)

    sdr_values, lsd_values, stoi_values = [], [], []

    for file in os.listdir(original_folder):
        if file.endswith(".wav"):
            orig_path = os.path.join(original_folder, file)
            recon_path = os.path.join(reconstructed_folder, file)

            if not os.path.exists(recon_path):
                print(f"❌ Missing reconstructed file for {file}")
                continue

            # Load audio
            orig_audio, _ = librosa.load(orig_path, sr=16000, mono=True)
            recon_audio, _ = librosa.load(recon_path, sr=16000, mono=True)

            # Truncate to the shorter length
            min_len = min(len(orig_audio), len(recon_audio))
            orig_audio = orig_audio[:min_len]
            recon_audio = recon_audio[:min_len]

            # Compute metrics
            sdr_values.append(compute_sdr(orig_audio, recon_audio))
            lsd_values.append(compute_lsd(orig_audio, recon_audio))
            stoi_values.append(compute_stoi(orig_audio, recon_audio))

    # Print results
    print(f"\n📊 Evaluation Metrics for {bitrate}:")
    print(f"🔹 SDR: {np.mean(sdr_values):.2f} dB")
    print(f"🔹 LSD: {np.mean(lsd_values):.4f}")
    print(f"🔹 STOI: {np.mean(stoi_values):.4f}\n")

print("✅ Evaluation complete! 🚀")


📊 Evaluation Metrics for 16kbps:
🔹 SDR: 14.12 dB
🔹 LSD: 0.9216
🔹 STOI: 0.9096


📊 Evaluation Metrics for 32kbps:
🔹 SDR: 16.75 dB
🔹 LSD: 0.7956
🔹 STOI: 0.9293


📊 Evaluation Metrics for 64kbps:
🔹 SDR: 21.62 dB
🔹 LSD: 0.8771
🔹 STOI: 0.9478

✅ Evaluation complete! 🚀


# STEP 8:
# Compressing Non-Dataset Audio to 16kbps, 32kbps, and 64kbps

In [1]:
import os
import subprocess

# ✅ Input file (Change this to your actual file)
INPUT_FILE = "/kaggle/input/new-test-data/Your Lie in April OST - Again (Piano).wav"  # Replace with your actual file name
OUTPUT_FOLDER = "compressed_new_test_data"  # Folder to save compressed files

# ✅ Bitrates for compression
BITRATES = {
    "16kbps": "16k",
    "32kbps": "32k",
    "64kbps": "64k"
}

# ✅ Create output directory if it doesn’t exist
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# ✅ Function to compress a single file
def compress_audio(input_file, output_folder, bitrate_label, bitrate_value):
    output_file = os.path.join(output_folder, f"{bitrate_label}.opus")
    cmd = f"ffmpeg -i '{input_file}' -c:a libopus -b:a {bitrate_value} '{output_file}' -y"
    subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    print(f"✅ Compressed: {bitrate_label} → {output_file}")

# ✅ Compress the input file to 16kbps, 32kbps, and 64kbps
for bitrate_label, bitrate_value in BITRATES.items():
    compress_audio(INPUT_FILE, OUTPUT_FOLDER, bitrate_label, bitrate_value)

print("\n✅ Opus Compression Complete! 🚀")

✅ Compressed: 16kbps → compressed_new_test_data/16kbps.opus
✅ Compressed: 32kbps → compressed_new_test_data/32kbps.opus
✅ Compressed: 64kbps → compressed_new_test_data/64kbps.opus

✅ Opus Compression Complete! 🚀


# STEP 9:
# Reconstructing the Non-Dataset Audio 

In [8]:
import os
import torch
import soundfile as sf
import librosa
import numpy as np
import torch.nn as nn

# ✅ Define Wave-U-Net Model
class WaveUNet(nn.Module):
    def __init__(self):
        super(WaveUNet, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv1d(16, 32, kernel_size=5, stride=2, padding=2),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(32, 16, kernel_size=5, stride=2, padding=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(16, 1, kernel_size=5, stride=2, padding=2, output_padding=1)
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# ✅ Paths
COMPRESSED_FILES = {
    "16kbps": "/kaggle/input/compressed-new-test-data/Compressed_OPUS/16kbps/16kbps.opus",
    "32kbps": "/kaggle/input/compressed-new-test-data/Compressed_OPUS/32kbps/32kbps.opus",
    "64kbps": "/kaggle/input/compressed-new-test-data/Compressed_OPUS/64kbps/64kbps.opus"
}

ORIGINAL_FILE = "/kaggle/input/new-test-data/Your Lie in April OST - Again (Piano).wav"  # 🔹 UPDATE this with the actual path
RECONSTRUCTED_FOLDER = "/kaggle/working/reconstructed_audio"  # 🔹 Output folder

MODEL_PATHS = {
    "16kbps": "/kaggle/input/trained-16kbps/WaveUNet_16kbps.pth",
    "32kbps": "/kaggle/input/trained-32kbps/WaveUNet_32kbps.pth",
    "64kbps": "/kaggle/input/trained-64kbps/WaveUNet_64kbps.pth"
}

# ✅ Create output directory
os.makedirs(RECONSTRUCTED_FOLDER, exist_ok=True)

# ✅ Load trained model
def load_model(model_path):
    model = WaveUNet()
    model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu"), weights_only=True))
    model.eval()
    return model

# ✅ Function to reconstruct audio
def reconstruct_audio(file_path, model):
    wav, sr = librosa.load(file_path, sr=16000, mono=True)  # Load full audio
    wav_tensor = torch.tensor(wav, dtype=torch.float32).unsqueeze(0).unsqueeze(1)  

    with torch.no_grad():
        reconstructed = model(wav_tensor)  

    return reconstructed.squeeze().numpy()

# ✅ Process each bitrate
for bitrate, input_file in COMPRESSED_FILES.items():
    print(f"\n🚀 Processing {bitrate}...")

    if not os.path.exists(input_file):
        print(f"❌ Missing file: {input_file}")
        continue

    model = load_model(MODEL_PATHS[bitrate])  # Load correct model
    print(f"🔍 Processing: {input_file}")

    # Reconstruct full-length audio
    reconstructed_audio = reconstruct_audio(input_file, model)

    # Save reconstructed audio
    output_file = os.path.join(RECONSTRUCTED_FOLDER, f"{bitrate}.wav")
    sf.write(output_file, reconstructed_audio, 16000)
    print(f"✅ Reconstructed and saved: {output_file}")

print("\n✅ Testing complete! Check '/kaggle/working/reconstructed_audio/' for results.")


🚀 Processing 16kbps...
🔍 Processing: /kaggle/input/compressed-new-test-data/Compressed_OPUS/16kbps/16kbps.opus
✅ Reconstructed and saved: /kaggle/working/reconstructed_audio/16kbps.wav

🚀 Processing 32kbps...
🔍 Processing: /kaggle/input/compressed-new-test-data/Compressed_OPUS/32kbps/32kbps.opus
✅ Reconstructed and saved: /kaggle/working/reconstructed_audio/32kbps.wav

🚀 Processing 64kbps...
🔍 Processing: /kaggle/input/compressed-new-test-data/Compressed_OPUS/64kbps/64kbps.opus
✅ Reconstructed and saved: /kaggle/working/reconstructed_audio/64kbps.wav

✅ Testing complete! Check '/kaggle/working/reconstructed_audio/' for results.


# STEP 10:
# Evaluating the Non-dataset Audio 

In [7]:
import os
import numpy as np
import librosa

# ✅ Function to compute SDR (Signal-to-Distortion Ratio)
def compute_sdr(original, reconstructed):
    noise = original - reconstructed
    sdr = 10 * np.log10(np.sum(original ** 2) / (np.sum(noise ** 2) + 1e-10))
    return sdr

# ✅ Function to compute STOI (Short-Time Objective Intelligibility)
def compute_stoi(original, reconstructed, sr=16000):
    frame_len = int(0.025 * sr)  # 25ms frame
    frame_shift = int(0.01 * sr)  # 10ms shift
    original_frames = librosa.util.frame(original, frame_length=frame_len, hop_length=frame_shift)
    reconstructed_frames = librosa.util.frame(reconstructed, frame_length=frame_len, hop_length=frame_shift)

    correlation = np.sum(original_frames * reconstructed_frames, axis=0)
    original_energy = np.sqrt(np.sum(original_frames ** 2, axis=0))
    reconstructed_energy = np.sqrt(np.sum(reconstructed_frames ** 2, axis=0))

    stoi_score = np.mean(correlation / (original_energy * reconstructed_energy + 1e-10))
    return stoi_score

# ✅ Function to compute LSD (Log-Spectral Distance)
def compute_lsd(original, reconstructed, sr=16000):
    spec_orig = np.log1p(np.abs(librosa.stft(original, n_fft=512)))
    spec_recon = np.log1p(np.abs(librosa.stft(reconstructed, n_fft=512)))

    lsd = np.mean(np.sqrt(np.mean((spec_orig - spec_recon) ** 2, axis=0)))
    return lsd

# ✅ Function to evaluate all metrics
def evaluate_audio(original_path, reconstructed_path, sr=16000):
    original, _ = librosa.load(original_path, sr=sr, mono=True)
    reconstructed, _ = librosa.load(reconstructed_path, sr=sr, mono=True)

    # Ensure same length
    min_length = min(len(original), len(reconstructed))
    original = original[:min_length]
    reconstructed = reconstructed[:min_length]

    return {
        "SDR": compute_sdr(original, reconstructed),
        "STOI": compute_stoi(original, reconstructed, sr),
        "LSD": compute_lsd(original, reconstructed, sr)
    }

# ✅ Paths
bitrates = ["16kbps", "32kbps", "64kbps"]
original_path = "/kaggle/input/new-test-data/Your Lie in April OST - Again (Piano).wav"  # Replace with actual path
reconstructed_paths = {
    "16kbps": "/kaggle/input/reconstructed-new-test-data/Reconstructed_WAV/16kbps/16kbps.wav",
    "32kbps": "/kaggle/input/reconstructed-new-test-data/Reconstructed_WAV/32kbps/32kbps.wav",
    "64kbps": "/kaggle/input/reconstructed-new-test-data/Reconstructed_WAV/64kbps/64kbps.wav",
}

# ✅ Evaluate
results = {bitrate: evaluate_audio(original_path, reconstructed_paths[bitrate]) for bitrate in bitrates}

# ✅ Print Results
for bitrate, metrics in results.items():
    print(f"\n🎵 {bitrate} Evaluation:")
    print(f"🔹 SDR  = {metrics['SDR']:.2f} dB")
    print(f"🔹 STOI = {metrics['STOI']:.2f}")
    print(f"🔹 LSD  = {metrics['LSD']:.2f}")


🎵 16kbps Evaluation:
🔹 SDR  = 13.62 dB
🔹 STOI = 0.90
🔹 LSD  = 0.06

🎵 32kbps Evaluation:
🔹 SDR  = 15.37 dB
🔹 STOI = 0.90
🔹 LSD  = 0.06

🎵 64kbps Evaluation:
🔹 SDR  = 19.04 dB
🔹 STOI = 0.92
🔹 LSD  = 0.04


# SPEECH FOLDER (FROM MUSAN DATASET): 

# STEP 1:
# Installing Pytorch

In [1]:
!pip install torch torchvision torchaudio --quiet

# STEP 2:
# Defining the custom Wave-U-Net Model

In [4]:
import torch
import torch.nn as nn

class WaveUNet(nn.Module):
    def __init__(self):
        super(WaveUNet, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv1d(16, 32, kernel_size=5, stride=2, padding=2),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(32, 16, kernel_size=5, stride=2, padding=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(16, 1, kernel_size=5, stride=2, padding=2, output_padding=1)
        )

    def forward(self, x):
        return self.decoder(self.encoder(x))

# STEP 3: 
# Defining the Dataset Loader 

In [5]:
import os
import torch
import torchaudio
from torch.utils.data import Dataset
import subprocess

class SpeechDataset(Dataset):
    def __init__(self, compressed_dirs, original_dirs, sample_rate=16000, duration=5):
        self.sample_rate = sample_rate
        self.fixed_length = sample_rate * duration
        self.compressed_files = []
        self.original_files = []

        original_map = {}
        for root, _, files in os.walk(original_dirs[0]):
            for f in files:
                if f.endswith('.wav'):
                    key = f.rsplit('.', 1)[0]
                    original_map[key] = os.path.join(root, f)
        for root, _, files in os.walk(original_dirs[1]):
            for f in files:
                if f.endswith('.wav'):
                    key = f.rsplit('.', 1)[0]
                    original_map[key] = os.path.join(root, f)

        for folder in compressed_dirs:
            for root, _, files in os.walk(folder):
                for file in files:
                    if file.endswith(".opus"):
                        key = file.rsplit('.', 1)[0]
                        if key in original_map:
                            self.compressed_files.append(os.path.join(root, file))
                            self.original_files.append(original_map[key])

        print(f"✅ Loaded {len(self.original_files)} valid pairs.")

    def __len__(self):
        return len(self.original_files)

    def __getitem__(self, idx):
        compressed_path = self.compressed_files[idx]
        original_path = self.original_files[idx]

        # Convert .opus to .wav
        wav_filename = os.path.basename(compressed_path).replace(".opus", ".wav")
        wav_path = os.path.join("/kaggle/working/", wav_filename)

        if not os.path.exists(wav_path):
            subprocess.run(["ffmpeg", "-i", compressed_path, "-ar", str(self.sample_rate), "-ac", "1", wav_path], capture_output=True)

        compressed_waveform, _ = torchaudio.load(wav_path)
        original_waveform, _ = torchaudio.load(original_path)

        return self._fix_length(compressed_waveform), self._fix_length(original_waveform)

    def _fix_length(self, waveform):
        num_samples = waveform.shape[1]
        if num_samples > self.fixed_length:
            return waveform[:, :self.fixed_length]
        else:
            padding = torch.zeros((1, self.fixed_length - num_samples))
            return torch.cat((waveform, padding), dim=1)

# STEP 4:
# Training and Validating the Wave-U-Net Model on original and compressed Data Pairs

In [7]:
import torch.optim as optim
from torch.utils.data import DataLoader, random_split

def train_model(train_loader, val_loader, model, criterion, optimizer, device, num_epochs, bitrate):
    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            output = model(x)
            loss = criterion(output, y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {train_loss / len(train_loader):.4f}")

        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for x, y in val_loader:
                x, y = x.to(device), y.to(device)
                output = model(x)
                val_loss += criterion(output, y).item()
        print(f"Validation Loss: {val_loss / len(val_loader):.4f}")

    save_path = f"/kaggle/working/WaveUNet_speech_{bitrate}.pth"
    torch.save(model.state_dict(), save_path)
    print(f"✅ Model saved to {save_path}")

def main():
    bitrates = {
        "3kbps": [
            "/kaggle/input/compressed-musan/compressed_musan/speech/librivox/compressed_3kbps/librivox",
            "/kaggle/input/compressed-musan/compressed_musan/speech/us-gov/usgov_compressed_3kbps"
        ],
        "6kbps": [
            "/kaggle/input/compressed-musan/compressed_musan/speech/librivox/compressed_6kbps/librivox",
            "/kaggle/input/compressed-musan/compressed_musan/speech/us-gov/usgov_compressed_6kbps"
        ],
        "12kbps": [
            "/kaggle/input/compressed-musan/compressed_musan/speech/librivox/compressed_12kbps/librivox",
            "/kaggle/input/compressed-musan/compressed_musan/speech/us-gov/usgov_compressed_12kbps"
        ]
    }

    original_dirs = [
        "/kaggle/input/speech-folder-musan/speech/librivox",
        "/kaggle/input/speech-folder-musan/speech/us-gov"
    ]

    batch_size = 8
    num_epochs = 10
    lr = 0.001
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    for bitrate, compressed_dirs in bitrates.items():
        print(f"\n🚀 Training for {bitrate}...\n")

        dataset = SpeechDataset(compressed_dirs, original_dirs)
        if len(dataset) == 0:
            print(f"❌ No valid audio pairs for {bitrate}. Skipping.")
            continue

        train_size = int(0.7 * len(dataset))
        val_size = int(0.15 * len(dataset))
        test_size = len(dataset) - train_size - val_size
        train_set, val_set, _ = random_split(dataset, [train_size, val_size, test_size])

        train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_set, batch_size=batch_size)

        model = WaveUNet()
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)

        train_model(train_loader, val_loader, model, criterion, optimizer, device, num_epochs, bitrate)

if __name__ == "__main__":
    main()


🚀 Training for 3kbps...

✅ Loaded 323 valid pairs.
Epoch 1/10 | Train Loss: 0.0204
Validation Loss: 0.0067
Epoch 2/10 | Train Loss: 0.0091
Validation Loss: 0.0058
Epoch 3/10 | Train Loss: 0.0081
Validation Loss: 0.0052
Epoch 4/10 | Train Loss: 0.0078
Validation Loss: 0.0051
Epoch 5/10 | Train Loss: 0.0075
Validation Loss: 0.0051
Epoch 6/10 | Train Loss: 0.0076
Validation Loss: 0.0051
Epoch 7/10 | Train Loss: 0.0074
Validation Loss: 0.0051
Epoch 8/10 | Train Loss: 0.0073
Validation Loss: 0.0050
Epoch 9/10 | Train Loss: 0.0074
Validation Loss: 0.0050
Epoch 10/10 | Train Loss: 0.0074
Validation Loss: 0.0050
✅ Model saved to /kaggle/working/WaveUNet_speech_3kbps.pth

🚀 Training for 6kbps...

✅ Loaded 323 valid pairs.
Epoch 1/10 | Train Loss: 0.0167
Validation Loss: 0.0085
Epoch 2/10 | Train Loss: 0.0078
Validation Loss: 0.0064
Epoch 3/10 | Train Loss: 0.0068
Validation Loss: 0.0062
Epoch 4/10 | Train Loss: 0.0067
Validation Loss: 0.0062
Epoch 5/10 | Train Loss: 0.0067
Validation Loss: 0.0

# STEP 5:
# Testing the model on Unseen Data

In [8]:
import os
import torch
import torch.nn as nn
import soundfile as sf
import librosa
from torch.utils.data import random_split

# ✅ WaveUNet definition (same as training)
class WaveUNet(nn.Module):
    def __init__(self):
        super(WaveUNet, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv1d(16, 32, kernel_size=5, stride=2, padding=2),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(32, 16, kernel_size=5, stride=2, padding=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(16, 1, kernel_size=5, stride=2, padding=2, output_padding=1)
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# ✅ Speech dataset loader
class SpeechDataset(torch.utils.data.Dataset):
    def __init__(self, compressed_dirs, original_dirs):
        self.compressed_files = []
        self.original_files = []

        original_map = {
            file.rsplit(".", 1)[0]: os.path.join(root, file)
            for folder in original_dirs
            for root, _, files in os.walk(folder)
            for file in files if file.endswith(".wav")
        }

        for folder in compressed_dirs:
            if os.path.exists(folder):
                for file in os.listdir(folder):
                    if file.endswith(".opus"):
                        base = file.rsplit(".", 1)[0]
                        if base in original_map:
                            self.compressed_files.append(os.path.join(folder, file))
                            self.original_files.append(original_map[base])

    def __len__(self):
        return len(self.original_files)

    def __getitem__(self, idx):
        return self.compressed_files[idx], self.original_files[idx]

# ✅ Audio Reconstruction
def reconstruct_audio(file_path, model, max_duration=15):
    wav, sr = librosa.load(file_path, sr=16000, mono=True)
    wav = wav[:sr * max_duration]
    wav_tensor = torch.tensor(wav, dtype=torch.float32).unsqueeze(0).unsqueeze(1)
    with torch.no_grad():
        output = model(wav_tensor)
    return output.squeeze().numpy()

# ✅ Model loader
def load_model(model_path):
    model = WaveUNet()
    model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu")))
    model.eval()
    return model

# ✅ Paths
original_dirs = [
    "/kaggle/input/speech-folder-musan/speech/librivox",
    "/kaggle/input/speech-folder-musan/speech/us-gov"
]

compressed_paths = {
    "3kbps": [
        "/kaggle/input/compressed-musan/compressed_musan/speech/librivox/compressed_3kbps/librivox",
        "/kaggle/input/compressed-musan/compressed_musan/speech/us-gov/usgov_compressed_3kbps"
    ],
    "6kbps": [
        "/kaggle/input/compressed-musan/compressed_musan/speech/librivox/compressed_6kbps/librivox",
        "/kaggle/input/compressed-musan/compressed_musan/speech/us-gov/usgov_compressed_6kbps"
    ],
    "12kbps": [
        "/kaggle/input/compressed-musan/compressed_musan/speech/librivox/compressed_12kbps/librivox",
        "/kaggle/input/compressed-musan/compressed_musan/speech/us-gov/usgov_compressed_12kbps"
    ]
}

model_paths = {
    "3kbps": "/kaggle/input/trained-speech-3kbps/WaveUNet_speech_3kbps.pth",
    "6kbps": "/kaggle/input/trained-speech-6kbps/WaveUNet_speech_6kbps.pth",
    "12kbps": "/kaggle/input/trained-speech-12kbps/WaveUNet_speech_12kbps.pth"
}

output_path = "/kaggle/working/reconstructed_speech_output"
os.makedirs(output_path, exist_ok=True)

# ✅ Testing loop
for bitrate, comp_dirs in compressed_paths.items():
    print(f"\n🚀 Testing {bitrate}...\n")
    dataset = SpeechDataset(comp_dirs, original_dirs)

    if len(dataset) == 0:
        print(f"❌ No data found for {bitrate}. Skipping.")
        continue

    total_len = len(dataset)
    train_len = int(0.7 * total_len)
    val_len = int(0.15 * total_len)
    test_len = total_len - train_len - val_len
    _, _, test_set = random_split(dataset, [train_len, val_len, test_len])

    model = load_model(model_paths[bitrate])

    bitrate_output_dir = os.path.join(output_path, bitrate)
    os.makedirs(bitrate_output_dir, exist_ok=True)

    for i in range(len(test_set)):
        compressed_file, original_file = test_set[i]

        print(f"🔍 Processing: {os.path.basename(compressed_file)}")
        reconstructed = reconstruct_audio(compressed_file, model)

        save_path = os.path.join(bitrate_output_dir, os.path.basename(original_file))
        sf.write(save_path, reconstructed, 16000)
        print(f"✅ Saved: {save_path}")

print("\n🚀 Speech testing complete! Check /kaggle/working/reconstructed_speech_output/")


🚀 Testing 3kbps...



  model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu")))


🔍 Processing: speech-us-gov-0119.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/3kbps/speech-us-gov-0119.wav
🔍 Processing: speech-librivox-0032.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/3kbps/speech-librivox-0032.wav
🔍 Processing: speech-librivox-0057.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/3kbps/speech-librivox-0057.wav
🔍 Processing: speech-us-gov-0122.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/3kbps/speech-us-gov-0122.wav
🔍 Processing: speech-librivox-0117.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/3kbps/speech-librivox-0117.wav
🔍 Processing: speech-us-gov-0042.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/3kbps/speech-us-gov-0042.wav
🔍 Processing: speech-librivox-0139.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/3kbps/speech-librivox-0139.wav
🔍 Processing: speech-us-gov-0076.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/3kbps/speech-us-gov-0076.wav
🔍 Processing: speech-lib

  model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu")))


🔍 Processing: speech-librivox-0091.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/6kbps/speech-librivox-0091.wav
🔍 Processing: speech-librivox-0035.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/6kbps/speech-librivox-0035.wav
🔍 Processing: speech-us-gov-0026.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/6kbps/speech-us-gov-0026.wav
🔍 Processing: speech-us-gov-0074.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/6kbps/speech-us-gov-0074.wav
🔍 Processing: speech-us-gov-0064.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/6kbps/speech-us-gov-0064.wav
🔍 Processing: speech-librivox-0145.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/6kbps/speech-librivox-0145.wav
🔍 Processing: speech-librivox-0092.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/6kbps/speech-librivox-0092.wav
🔍 Processing: speech-us-gov-0085.opus
✅ Saved: /kaggle/working/reconstructed_speech_output/6kbps/speech-us-gov-0085.wav
🔍 Processing: speech-lib

# STEP 6:
# Evaluating the Model on the following Metrics:
# 1. SI-SDR
# 2. STOI
# 3. WER

In [13]:
import os
import numpy as np
import librosa

# ✅ SI-SDR Calculation
def compute_si_sdr(original, reconstructed):
    original -= np.mean(original)
    reconstructed -= np.mean(reconstructed)

    alpha = np.dot(reconstructed, original) / (np.dot(original, original) + 1e-10)
    projection = alpha * original
    noise = reconstructed - projection

    si_sdr = 10 * np.log10(np.sum(projection ** 2) / (np.sum(noise ** 2) + 1e-10))
    return si_sdr

# ✅ Approximate STOI using spectral correlation
def compute_stoi_like(original, reconstructed):
    orig_mag = np.abs(librosa.stft(original, n_fft=512, hop_length=256))
    recon_mag = np.abs(librosa.stft(reconstructed, n_fft=512, hop_length=256))

    stoi_scores = []
    for i in range(min(orig_mag.shape[1], recon_mag.shape[1])):
        orig_frame = orig_mag[:, i]
        recon_frame = recon_mag[:, i]

        if np.std(orig_frame) > 0 and np.std(recon_frame) > 0:
            corr = np.corrcoef(orig_frame, recon_frame)[0, 1]
            stoi_scores.append(corr)

    return np.mean(stoi_scores) if stoi_scores else 0

# ✅ Dummy WER using DTW on MFCCs
def compute_wer_like(original, reconstructed, sr=16000):
    orig_mfcc = librosa.feature.mfcc(y=original, sr=sr, n_mfcc=13)
    recon_mfcc = librosa.feature.mfcc(y=reconstructed, sr=sr, n_mfcc=13)

    D, _ = librosa.sequence.dtw(orig_mfcc, recon_mfcc, metric='euclidean')
    dtw_distance = D[-1, -1]
    return dtw_distance / (orig_mfcc.shape[1] + 1e-10)

# ✅ Folder paths
original_base = "/kaggle/input/original-speech-folder/Original_Speech"
reconstructed_base = "/kaggle/input/reconstructed-speech-folder/Reconstructed_Speech"

# ✅ Detect common bitrate folders
bitrates = sorted(list(set(os.listdir(original_base)) & set(os.listdir(reconstructed_base))))
print(f"📁 Bitrates found: {bitrates}")

# ✅ Run evaluation for each bitrate
for bitrate in bitrates:
    original_folder = os.path.join(original_base, bitrate)
    reconstructed_folder = os.path.join(reconstructed_base, bitrate)

    si_sdr_values, stoi_values, wer_values = [], [], []

    for file in os.listdir(original_folder):
        if file.endswith(".wav"):
            orig_path = os.path.join(original_folder, file)
            recon_path = os.path.join(reconstructed_folder, file)

            if not os.path.exists(recon_path):
                print(f"❌ Missing reconstructed file for {file}")
                continue

            # Load audio
            orig_audio, _ = librosa.load(orig_path, sr=16000, mono=True)
            recon_audio, _ = librosa.load(recon_path, sr=16000, mono=True)

            # Truncate to same length
            min_len = min(len(orig_audio), len(recon_audio))
            orig_audio = orig_audio[:min_len]
            recon_audio = recon_audio[:min_len]

            # Compute metrics
            si_sdr_values.append(compute_si_sdr(orig_audio, recon_audio))
            stoi_values.append(compute_stoi_like(orig_audio, recon_audio))
            wer_values.append(compute_wer_like(orig_audio, recon_audio))

    # ✅ Print results
    print(f"\n📊 Evaluation Metrics for {bitrate}:")
    print(f"🔹 SI-SDR: {np.mean(si_sdr_values):.2f} dB")
    print(f"🔹 STOI-like: {np.mean(stoi_values):.4f}")
    print(f"🔹 WER-like (DTW distance): {np.mean(wer_values):.2f}\n")

print("✅ Evaluation complete! 🚀")

📁 Bitrates found: ['12kbps', '3kbps', '6kbps']

📊 Evaluation Metrics for 12kbps:
🔹 SI-SDR: 3.68 dB
🔹 STOI-like: 0.7848
🔹 WER-like (DTW distance): 125.83


📊 Evaluation Metrics for 3kbps:
🔹 SI-SDR: -4.19 dB
🔹 STOI-like: 0.7036
🔹 WER-like (DTW distance): 113.79


📊 Evaluation Metrics for 6kbps:
🔹 SI-SDR: 1.75 dB
🔹 STOI-like: 0.7017
🔹 WER-like (DTW distance): 142.34

✅ Evaluation complete! 🚀


# STEP 7:
# Compressing Non-Dataset Speech to 3kbps, 6kbps, and 12kbps

In [14]:
import os
import subprocess

# ✅ Input file (Change this to your actual file)
INPUT_FILE = "/kaggle/input/new-test-data-speech/Sample Student Speech.wav"  # Replace with your actual file name
OUTPUT_FOLDER = "compressed_new_test_data_speech"  # Folder to save compressed files

# ✅ Bitrates for compression
BITRATES = {
    "3kbps": "3k",
    "6kbps": "6k",
    "12kbps": "12k"
}

# ✅ Create output directory if it doesn’t exist
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# ✅ Function to compress a single file
def compress_audio(input_file, output_folder, bitrate_label, bitrate_value):
    output_file = os.path.join(output_folder, f"{bitrate_label}.opus")
    cmd = f"ffmpeg -i '{input_file}' -c:a libopus -b:a {bitrate_value} '{output_file}' -y"
    subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    print(f"✅ Compressed: {bitrate_label} → {output_file}")

# ✅ Compress the input file to 16kbps, 32kbps, and 64kbps
for bitrate_label, bitrate_value in BITRATES.items():
    compress_audio(INPUT_FILE, OUTPUT_FOLDER, bitrate_label, bitrate_value)

print("\n✅ Opus Compression Complete! 🚀")

✅ Compressed: 3kbps → compressed_new_test_data_speech/3kbps.opus
✅ Compressed: 6kbps → compressed_new_test_data_speech/6kbps.opus
✅ Compressed: 12kbps → compressed_new_test_data_speech/12kbps.opus

✅ Opus Compression Complete! 🚀


# STEP 8:
# Reconstructing the Non-Dataset Speech 

In [17]:
import os
import torch
import soundfile as sf
import librosa
import numpy as np
import torch.nn as nn

# ✅ Define Wave-U-Net Model
class WaveUNet(nn.Module):
    def __init__(self):
        super(WaveUNet, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv1d(16, 32, kernel_size=5, stride=2, padding=2),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(32, 16, kernel_size=5, stride=2, padding=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(16, 1, kernel_size=5, stride=2, padding=2, output_padding=1)
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# ✅ Paths
# ✅ Paths
COMPRESSED_FILES = {
    "3kbps": "/kaggle/input/compressed-new-test-data-speech/Compressed_OPUS/3kbps/3kbps.opus",  # ✅ FIXED
    "6kbps": "/kaggle/input/compressed-new-test-data-speech/Compressed_OPUS/6kbps/6kbps.opus",  # ✅ FIXED
    "12kbps": "/kaggle/input/compressed-new-test-data-speech/Compressed_OPUS/12kbps/12kbps.opus"
}

ORIGINAL_FILE = "/kaggle/input/new-test-data-speech/Sample Student Speech.wav"  # (if needed for evaluation)
RECONSTRUCTED_FOLDER = "/kaggle/working/reconstructed_speech"

MODEL_PATHS = {
    "3kbps": "/kaggle/input/trained-speech-3kbps/WaveUNet_speech_3kbps.pth",
    "6kbps": "/kaggle/input/trained-speech-6kbps/WaveUNet_speech_6kbps.pth",
    "12kbps": "/kaggle/input/trained-speech-12kbps/WaveUNet_speech_12kbps.pth"
}

# ✅ Create output directory
os.makedirs(RECONSTRUCTED_FOLDER, exist_ok=True)

# ✅ Load trained model
def load_model(model_path):
    model = WaveUNet()
    model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu"), weights_only=True))
    model.eval()
    return model

# ✅ Reconstruct a full audio file
def reconstruct_audio(file_path, model):
    wav, sr = librosa.load(file_path, sr=16000, mono=True)
    wav_tensor = torch.tensor(wav, dtype=torch.float32).unsqueeze(0).unsqueeze(1)  # Shape: [1, 1, T]

    with torch.no_grad():
        output = model(wav_tensor)

    return output.squeeze().numpy()

# ✅ Process each bitrate
for bitrate, input_file in COMPRESSED_FILES.items():
    print(f"\n🚀 Processing {bitrate}...")

    if not os.path.exists(input_file):
        print(f"❌ Missing file: {input_file}")
        continue

    model = load_model(MODEL_PATHS[bitrate])
    print(f"🔍 Reconstructing: {input_file}")

    reconstructed_audio = reconstruct_audio(input_file, model)

    output_path = os.path.join(RECONSTRUCTED_FOLDER, f"{bitrate}.wav")
    sf.write(output_path, reconstructed_audio, 16000)
    print(f"✅ Saved: {output_path}")

print("\n✅ Speech reconstruction complete! Check the '/kaggle/working/reconstructed_speech/' folder.")


🚀 Processing 3kbps...
🔍 Reconstructing: /kaggle/input/compressed-new-test-data-speech/Compressed_OPUS/3kbps/3kbps.opus
✅ Saved: /kaggle/working/reconstructed_speech/3kbps.wav

🚀 Processing 6kbps...
🔍 Reconstructing: /kaggle/input/compressed-new-test-data-speech/Compressed_OPUS/6kbps/6kbps.opus
✅ Saved: /kaggle/working/reconstructed_speech/6kbps.wav

🚀 Processing 12kbps...
🔍 Reconstructing: /kaggle/input/compressed-new-test-data-speech/Compressed_OPUS/12kbps/12kbps.opus
✅ Saved: /kaggle/working/reconstructed_speech/12kbps.wav

✅ Speech reconstruction complete! Check the '/kaggle/working/reconstructed_speech/' folder.


# STEP 9:
# Evaluating the Non-dataset Speech 

In [21]:
import os
import numpy as np
import librosa

# ✅ SI-SDR Calculation
def compute_sisdr(original, reconstructed):
    eps = 1e-9
    original = original - np.mean(original)
    reconstructed = reconstructed - np.mean(reconstructed)

    scale = np.sum(original * reconstructed) / (np.sum(original ** 2) + eps)
    projection = scale * original
    noise = reconstructed - projection

    sisdr = 10 * np.log10(np.sum(projection ** 2) / (np.sum(noise ** 2) + eps))
    return sisdr

# ✅ STOI-like Calculation
def compute_stoi_like(original, reconstructed, sr=16000):
    frame_len = int(0.025 * sr)
    frame_shift = int(0.01 * sr)

    original_frames = librosa.util.frame(original, frame_length=frame_len, hop_length=frame_shift)
    reconstructed_frames = librosa.util.frame(reconstructed, frame_length=frame_len, hop_length=frame_shift)

    correlation = np.sum(original_frames * reconstructed_frames, axis=0)
    original_energy = np.sqrt(np.sum(original_frames ** 2, axis=0))
    reconstructed_energy = np.sqrt(np.sum(reconstructed_frames ** 2, axis=0))

    stoi_like = np.mean(correlation / (original_energy * reconstructed_energy + 1e-9))
    return stoi_like

# ✅ WER-like (Normalized DTW Distance over MFCCs)
def compute_wer_like(original, reconstructed, sr=16000):
    mfcc_orig = librosa.feature.mfcc(y=original, sr=sr, n_mfcc=13)
    mfcc_recon = librosa.feature.mfcc(y=reconstructed, sr=sr, n_mfcc=13)

    # Compute DTW distance using librosa
    _, cost = librosa.sequence.dtw(X=mfcc_orig, Y=mfcc_recon, metric='euclidean')

    total_distance = cost[-1, -1]
    path_len = cost.shape[0] + cost.shape[1]
    normalized_dtw = total_distance / path_len  # 🔥 Normalized DTW score

    return normalized_dtw

# ✅ Evaluation Wrapper
def evaluate_audio(original_path, reconstructed_path, sr=16000):
    original, _ = librosa.load(original_path, sr=sr, mono=True)
    reconstructed, _ = librosa.load(reconstructed_path, sr=sr, mono=True)

    min_len = min(len(original), len(reconstructed))
    original = original[:min_len]
    reconstructed = reconstructed[:min_len]

    return {
        "SI-SDR": compute_sisdr(original, reconstructed),
        "STOI_like": compute_stoi_like(original, reconstructed, sr),
        "WER_like": compute_wer_like(original, reconstructed, sr)
    }

# ✅ Paths
original_path = "/kaggle/input/new-test-data-speech/Sample Student Speech.wav"
reconstructed_paths = {
    "3kbps": "/kaggle/input/reconstructed-new-test-data-speech/Reconstructed_WAV/3kbps/3kbps.wav",
    "6kbps": "/kaggle/input/reconstructed-new-test-data-speech/Reconstructed_WAV/6kbps/6kbps.wav",
    "12kbps": "/kaggle/input/reconstructed-new-test-data-speech/Reconstructed_WAV/12kbps/12kbps.wav",
}

# ✅ Run Evaluation
results = {bitrate: evaluate_audio(original_path, path) for bitrate, path in reconstructed_paths.items()}

# ✅ Display Results
for bitrate, metrics in results.items():
    print(f"\n📊 Evaluation Metrics for {bitrate}:")
    print(f"🔹 SI-SDR: {metrics['SI-SDR']:.2f} dB")
    print(f"🔹 STOI-like: {metrics['STOI_like']:.4f}")
    print(f"🔹 WER-like (Normalized DTW): {metrics['WER_like']:.4f}")

print("\n✅ Evaluation complete! 🚀")


📊 Evaluation Metrics for 3kbps:
🔹 SI-SDR: -5.09 dB
🔹 STOI-like: 0.3080
🔹 WER-like (Normalized DTW): 0.0000

📊 Evaluation Metrics for 6kbps:
🔹 SI-SDR: 1.93 dB
🔹 STOI-like: 0.5768
🔹 WER-like (Normalized DTW): 0.0000

📊 Evaluation Metrics for 12kbps:
🔹 SI-SDR: 0.86 dB
🔹 STOI-like: 0.5476
🔹 WER-like (Normalized DTW): 0.0000

✅ Evaluation complete! 🚀
