<a href="https://colab.research.google.com/github/anomara1/DrAhmedOmara/blob/main/audio_1002.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
pip install pesq pystoi

Collecting pesq
  Downloading pesq-0.0.4.tar.gz (38 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pystoi
  Downloading pystoi-0.4.1-py2.py3-none-any.whl.metadata (4.0 kB)
Downloading pystoi-0.4.1-py2.py3-none-any.whl (8.2 kB)
Building wheels for collected packages: pesq
  Building wheel for pesq (setup.py) ... [?25l[?25hdone
  Created wheel for pesq: filename=pesq-0.0.4-cp311-cp311-linux_x86_64.whl size=275943 sha256=1d68a6792be57d5e558dd126d8e9233a24d37b55de48418193091cbabcd17405
  Stored in directory: /root/.cache/pip/wheels/ae/f1/23/2698d0bf31eec2b2aa50623b5d93b6206c49c7155d0e31345d
Successfully built pesq
Installing collected packages: pesq, pystoi
Successfully installed pesq-0.0.4 pystoi-0.4.1


In [3]:
import numpy as np
import librosa
import scipy.signal
import scipy.spatial
import scipy.stats
from scipy.linalg import norm
import pesq  # For PESQ metric
from pystoi import stoi  # For STOI metric

class SignalMetrics:
    def __init__(self, original, processed, sr=16000):
        """
        Initialize with original and processed signals.
        :param original: 1D numpy array of the original signal
        :param processed: 1D numpy array of the processed signal
        :param sr: Sampling rate of the signals (default 16 kHz)
        """
        self.original = original
        self.processed = processed
        self.sr = sr

    def log_spectral_distance(self):
        """Log Spectral Distance (LSD)"""
        orig_spec = np.abs(librosa.stft(self.original)) + 1e-10
        proc_spec = np.abs(librosa.stft(self.processed)) + 1e-10
        lsd = np.mean((10 * np.log10(orig_spec / proc_spec)) ** 2)
        return np.sqrt(lsd)

    def mel_cepstral_distance(self):
        """Mel-Cepstral Distance (MCD)"""
        orig_mfcc = librosa.feature.mfcc(y=self.original, sr=self.sr)
        proc_mfcc = librosa.feature.mfcc(y=self.processed, sr=self.sr)
        return np.mean(np.linalg.norm(orig_mfcc - proc_mfcc, axis=0))

    def segmental_snr(self, frame_length=2048, overlap=1024):
        """Segmental Signal-to-Noise Ratio (SegSNR)"""
        orig_frames = librosa.util.frame(self.original, frame_length=frame_length, hop_length=overlap)
        proc_frames = librosa.util.frame(self.processed, frame_length=frame_length, hop_length=overlap)

        snr_list = []
        for o, p in zip(orig_frames.T, proc_frames.T):
            noise = o - p
            if np.any(o ** 2):
                snr_list.append(10 * np.log10(np.mean(o ** 2) / (np.mean(noise ** 2) + 1e-10)))

        return np.mean(snr_list) if snr_list else 0  # Avoid empty list issue

    def spectral_flatness_measure(self):
        """Spectral Flatness Measure (SFM)"""
        orig_spec = np.abs(librosa.stft(self.original)) + 1e-10
        proc_spec = np.abs(librosa.stft(self.processed)) + 1e-10
        orig_sfm = scipy.stats.gmean(orig_spec, axis=0) / np.mean(orig_spec, axis=0)
        proc_sfm = scipy.stats.gmean(proc_spec, axis=0) / np.mean(proc_spec, axis=0)
        return np.mean(np.abs(orig_sfm - proc_sfm))

    def spectral_centroid_distance(self):
        """Spectral Centroid Distance"""
        orig_centroid = librosa.feature.spectral_centroid(y=self.original, sr=self.sr)
        proc_centroid = librosa.feature.spectral_centroid(y=self.processed, sr=self.sr)
        return np.mean(np.abs(orig_centroid - proc_centroid))

    def spectral_coherence(self):
        """Spectral Coherence"""
        f, Cxy = scipy.signal.coherence(self.original, self.processed, fs=self.sr)
        return np.mean(Cxy)

    def harmonic_to_noise_ratio(self):
        """Harmonic-to-Noise Ratio (HNR)"""
        return librosa.effects.harmonic(self.processed).mean() / librosa.effects.percussive(self.processed).mean()

    def itakura_saito_distance(self):
        """Itakura-Saito Distance"""
        orig_psd = np.abs(librosa.stft(self.original)) ** 2
        proc_psd = np.abs(librosa.stft(self.processed)) ** 2
        return np.mean(orig_psd / proc_psd - np.log(orig_psd / proc_psd) - 1)

    def bark_spectral_distortion(self):
        """Bark Spectral Distortion (Approximated using Mel-frequency)"""
        orig_mel = librosa.feature.melspectrogram(y=self.original, sr=self.sr)
        proc_mel = librosa.feature.melspectrogram(y=self.processed, sr=self.sr)
        return np.mean(np.abs(orig_mel - proc_mel))

    def tonality_index(self):
        """Tonality Index (Ratio of Harmonic Energy to Total Energy)"""
        orig_harmonic = librosa.effects.harmonic(self.original)
        proc_harmonic = librosa.effects.harmonic(self.processed)
        return np.abs(np.mean(orig_harmonic) - np.mean(proc_harmonic))

    def psnr(self):
        """Peak Signal-to-Noise Ratio (PSNR)"""
        mse_val = self.mse()
        max_val = np.max(self.original) ** 2
        return 10 * np.log10(max_val / (mse_val + 1e-10))

    def mse(self):
        """Mean Squared Error (MSE)"""
        return np.mean((self.original - self.processed) ** 2)

    def euclidean_distance(self):
        """Euclidean Distance"""
        return np.linalg.norm(self.original - self.processed)

    def manhattan_distance(self):
        """Manhattan Distance"""
        return np.sum(np.abs(self.original - self.processed))

    def cosine_distance(self):
        """Cosine Distance"""
        return scipy.spatial.distance.cosine(self.original, self.processed)

    def chebyshev_distance(self):
        """Chebyshev Distance"""
        return np.max(np.abs(self.original - self.processed))

    def shannon_entropy(self, signal):
        """Shannon Entropy"""
        prob_dist = np.histogram(signal, bins=256, density=True)[0]
        prob_dist = prob_dist[prob_dist > 0]
        return -np.sum(prob_dist * np.log2(prob_dist))

    def kl_divergence(self):
        """Kullback-Leibler (KL) Divergence"""
        orig_hist = np.histogram(self.original, bins=256, density=True)[0] + 1e-10
        proc_hist = np.histogram(self.processed, bins=256, density=True)[0] + 1e-10
        return scipy.stats.entropy(orig_hist, proc_hist)

    def pesq(self):
        """Perceptual Evaluation of Speech Quality (PESQ)"""
        # PESQ requires signals to be resampled to 16 kHz or 8 kHz
        if self.sr not in [8000, 16000]:
            raise ValueError("PESQ requires sampling rate of 8000 Hz or 16000 Hz.")
        return pesq.pesq(self.sr, self.original, self.processed, 'wb')  # 'wb' for wideband

    def stoi(self):
        """Short-Time Objective Intelligibility (STOI)"""
        return stoi(self.original, self.processed, self.sr, extended=False)

    def compute_all_metrics(self):
        """Compute all metrics"""
        return {
            'LSD': self.log_spectral_distance(),
            'MCD': self.mel_cepstral_distance(),
            'SegSNR': self.segmental_snr(),
            'SFM': self.spectral_flatness_measure(),
            'Spectral Centroid Distance': self.spectral_centroid_distance(),
            'Spectral Coherence': self.spectral_coherence(),
            'HNR': self.harmonic_to_noise_ratio(),
            'Itakura-Saito Distance': self.itakura_saito_distance(),
            'Bark Spectral Distortion': self.bark_spectral_distortion(),
            'Tonality Index': self.tonality_index(),
            'PSNR': self.psnr(),
            'MSE': self.mse(),
            'Euclidean': self.euclidean_distance(),
            'Manhattan': self.manhattan_distance(),
            'Cosine': self.cosine_distance(),
            'Chebyshev': self.chebyshev_distance(),
            'Shannon Entropy': self.shannon_entropy(self.processed),
            'KL Divergence': self.kl_divergence(),
            'PESQ': self.pesq(),
            'STOI': self.stoi()
        }

# New Section

In [None]:
import os
import glob
import numpy as np
import librosa
import zipfile
import requests
from tqdm import tqdm

# ✅ Download function for datasets
def download_dataset(url, save_path):
    """Download and extract a dataset if it doesn't already exist."""
    if not os.path.exists(save_path):
        os.makedirs(save_path)

    zip_path = os.path.join(save_path, "dataset.zip")

    if not os.path.exists(zip_path):
        print(f"Downloading dataset from {url} ...")
        response = requests.get(url, stream=True)
        with open(zip_path, "wb") as file:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    file.write(chunk)

    # Extract ZIP file
    print(f"Extracting dataset in {save_path} ...")
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(save_path)

    os.remove(zip_path)  # Remove zip file after extraction
    print("Dataset downloaded and extracted.")

# ✅ Function to add White Gaussian Noise
def add_white_gaussian_noise(signal, snr_db):
    """Add white Gaussian noise to a signal at a given SNR level."""
    signal_power = np.mean(signal ** 2)
    noise_power = signal_power / (10 ** (snr_db / 10))
    noise = np.sqrt(noise_power) * np.random.randn(len(signal))
    return signal + noise

# ✅ Function to process dataset
def process_dataset(audio_files, sr=16000, snr_levels=[30, 20, 10, 5]):
    """Process all files in the dataset and compute metrics."""

    # Initialize results storage
    results = {
        snr: {metric: [] for metric in SignalMetrics(np.zeros(1), np.zeros(1)).compute_all_metrics().keys()}
        for snr in snr_levels
    }

    for file in tqdm(audio_files, desc="Processing Audio Files"):
        try:
            # Load the audio file
            original, _ = librosa.load(file, sr=sr, duration=5.0)

            # Check if the loaded file is long enough
            if len(original) < 2048:
                print(f"Skipping {file}: Too short! (Length: {len(original)} samples)")
                continue  # Skip this file

            # Process each SNR level
            for snr in snr_levels:
                noisy_signal = add_white_gaussian_noise(original, snr)
                metrics = SignalMetrics(original, noisy_signal).compute_all_metrics()

                # Store results
                for metric in metrics:
                    results[snr][metric].append(metrics[metric])

        except Exception as e:
            print(f"Error processing {file}: {e}")
            continue  # Skip the problematic file

    # Compute average results over all files
    avg_results = {
        snr: {metric: np.mean(results[snr][metric]) if results[snr][metric] else 0 for metric in results[snr]}
        for snr in snr_levels
    }
    return avg_results

# ✅ Step 1: Automate Dataset Download
speech_dataset_url = "https://zenodo.org/record/5036977/files/arabic_speech_commands.zip"  # Example dataset (Change if needed)
music_dataset_url = "http://opihi.cs.uvic.ca/sound/genres.tar.gz"  # GTZAN Music Dataset

speech_dataset_path = "./datasets/speech/"
music_dataset_path = "./datasets/music/"

# Download & Extract
download_dataset(speech_dataset_url, speech_dataset_path)
download_dataset(music_dataset_url, music_dataset_path)

# ✅ Step 2: Locate Audio Files
speech_files = glob.glob(os.path.join(speech_dataset_path, "**/*.wav"), recursive=True)
music_files = glob.glob(os.path.join(music_dataset_path, "**/*.wav"), recursive=True)

# ✅ Step 3: Process Speech and Music Datasets
if len(speech_files) == 0:
    print("No speech files found! Check dataset path.")
else:
    speech_results = process_dataset(speech_files)

if len(music_files) == 0:
    print("No music files found! Check dataset path.")
else:
    music_results = process_dataset(music_files)

# ✅ Step 4: Print Results
print("Speech Results:", speech_results)
print("Music Results:", music_results)


Downloading dataset from https://zenodo.org/record/5036977/files/arabic_speech_commands.zip ...
Extracting dataset in ./datasets/speech/ ...


BadZipFile: File is not a zip file