In [3]:
# Import necessary libraries
# Standard library
import glob
import os
import random
import shutil
from collections import Counter

# Third-party
import librosa
import librosa.display
import matplotlib.pyplot as plt
import noisereduce as nr
import numpy as np 
import soundfile as sf
import torch
import torchaudio
from pyannote.audio import Model
from pyannote.audio.pipelines import VoiceActivityDetection
from pydub import AudioSegment
from pydub.silence import split_on_silence
from speechbrain.inference.enhancement import SpectralMaskEnhancement
from transformers import pipeline
# Alias for the transformers factory: the bare name `pipeline` is rebound to the
# pyannote VAD pipeline later in this notebook.
from transformers import pipeline as hf_pipeline

# Speech enhancement with a pre-trained model

In [7]:
# Stage 1 configuration: source recordings and destination for enhanced audio.
# NOTE: despite its name, `input_folder` holds a glob pattern, not a directory path.
output_enhanced_folder = "enhanced_audios/"
input_folder = "batch_1/*.wav"


In [8]:

# Make sure the destination directory for enhanced files is present
os.makedirs(output_enhanced_folder, exist_ok=True)

# Instantiate the pre-trained spectral-mask enhancement model named by `source`
enhance_model = SpectralMaskEnhancement.from_hparams(
    savedir="pretrained_models/metricgan-plus-voicebank",
    source="speechbrain/metricgan-plus-voicebank",
)


In [None]:

# Collect the inputs in sorted order. glob.glob() returns matches in arbitrary,
# filesystem-dependent order, and the output filename is derived from the position
# in this list, so sorting keeps the input -> EnhancedSpeech_N mapping stable.
noisy_files = sorted(glob.glob(input_folder))

if not noisy_files:
    # Report an empty match instead of claiming success
    print(f"No audio files matched '{input_folder}'; nothing to enhance.")
else:
    # Process each audio file matched by the input pattern
    for idx, audio_file in enumerate(noisy_files, start=1):
        # Load and add fake batch dimension
        noisy = enhance_model.load_audio(audio_file).unsqueeze(0)

        # Add relative length tensor (one entry for the single batch item) and enhance
        enhanced = enhance_model.enhance_batch(noisy, lengths=torch.tensor([1.]))

        # Save the enhanced signal on disk with a unique, 1-based filename.
        # NOTE(review): the 16000 Hz output rate is hard-coded; confirm it matches the model.
        enhanced_file_path = os.path.join(output_enhanced_folder, f'EnhancedSpeech_{idx}.wav')
        torchaudio.save(enhanced_file_path, enhanced.cpu(), 16000)

    print("Enhancement completed for all audio files.")


# Noise reduction with the noisereduce library

In [10]:
# Stage 2 configuration: enhanced audio is read from one folder and the
# noise-reduced result is written to the other.
output_cleaned_folder = "CleanedAudios/"
input_enhanced_folder = "enhanced_audios/"


In [None]:

# Create output folder if it doesn't exist
os.makedirs(output_cleaned_folder, exist_ok=True)

# Process each enhanced audio file, in sorted order so runs are deterministic
for enhanced_audio_file in sorted(glob.glob(os.path.join(input_enhanced_folder, '*.wav'))):
    # sr=None keeps the file's native sample rate. Without it librosa.load
    # resamples everything to its 22050 Hz default, silently changing the rate
    # of the files written below.
    audio_data, sr = librosa.load(enhanced_audio_file, sr=None)

    # Reduce noise
    reduced_noise_audio = nr.reduce_noise(y=audio_data, sr=sr)

    # Save the cleaned audio under the same basename in the cleaned folder
    cleaned_file_path = os.path.join(output_cleaned_folder, os.path.basename(enhanced_audio_file))
    sf.write(cleaned_file_path, reduced_noise_audio, sr)

# Optionally, print a message indicating completion
print("Noise reduction completed for all enhanced audio files.")

# Pre-trained model for extracting the speech

In [3]:
# Read the Hugging Face access token from the environment instead of embedding it
# in the notebook, where anyone who can read the file can see it.
# SECURITY: an access token was previously hard-coded on this line; treat it as
# compromised and revoke it in the Hugging Face account settings.
# Set the variable before starting Jupyter, e.g. `export HUGGINGFACE_TOKEN=hf_...`.
HUGGINGFACE_TOKEN = os.environ.get("HUGGINGFACE_TOKEN") or None
if HUGGINGFACE_TOKEN is None:
    # NOTE(review): with None the Hub client presumably falls back to credentials
    # stored by `huggingface-cli login` - confirm for the installed version.
    print("Warning: HUGGINGFACE_TOKEN is not set; downloading gated models may fail.")


In [4]:
# Fetch the pre-trained segmentation model from Hugging Face, authenticating with
# the token defined above. The model's user conditions must already be accepted.
model = Model.from_pretrained("pyannote/segmentation-3.0", use_auth_token=HUGGINGFACE_TOKEN)

In [5]:
# Initialize the voice activity detection pipeline using the model.
# NOTE(review): this rebinds the name `pipeline`, which the import cell also binds
# to `transformers.pipeline`; after this cell runs, `pipeline` is the VAD pipeline.
pipeline = VoiceActivityDetection(segmentation=model)

In [6]:
# Duration thresholds (in seconds) handed to the voice activity detection pipeline
HYPER_PARAMETERS = dict(
    min_duration_on=1.0,   # minimum duration of speech
    min_duration_off=1.5,  # minimum duration of silence
)

In [None]:
# Instantiate the pipeline with the defined hyperparameters.
# `pipeline` here is the VoiceActivityDetection object, so the cell that creates
# it and the cell that defines HYPER_PARAMETERS must both have run first.
pipeline.instantiate(HYPER_PARAMETERS)

In [8]:
# Function to apply VAD and split audio based on speech segments
def split_and_extract_speech_with_vad(input_audio_file, output_dir, file_index):
    """Run voice activity detection on one file and save each speech segment.

    Args:
        input_audio_file: Path of the audio file to analyse.
        output_dir: Directory that receives the segment files; created if missing.
        file_index: Identifier embedded in every output filename
            (``speech_segment_{file_index}_{n}.wav``).

    Returns:
        List of the paths that were written, in segment order. Empty when the
        VAD produced no non-empty segment.
    """
    # exist_ok=True avoids the check-then-create race and matches how the rest
    # of the notebook creates its folders
    os.makedirs(output_dir, exist_ok=True)

    # Run VAD on the input audio file (`pipeline` is the module-level VAD pipeline)
    vad = pipeline(input_audio_file)

    # Load the original audio at its native sample rate
    audio, sr = librosa.load(input_audio_file, sr=None)

    saved_paths = []
    for i, segment in enumerate(vad.itersegments()):
        # Convert the segment boundaries from seconds to sample offsets
        start_sample = int(segment.start * sr)
        end_sample = int(segment.end * sr)

        speech_audio = audio[start_sample:end_sample]

        # Skip segments that yield no samples; numbering still follows the
        # segment's position, so skipped segments leave a gap in the sequence
        if len(speech_audio) == 0:
            continue

        output_file_path = os.path.join(output_dir, f"speech_segment_{file_index}_{i + 1}.wav")
        sf.write(output_file_path, speech_audio, sr)
        print(f"Saved: {output_file_path}")
        saved_paths.append(output_file_path)

    return saved_paths


In [None]:
# Main function to process all files in the CleanedAudios folder
def process_cleaned_audios(input_folder, output_folder):
    """Extract speech segments from every ``*.wav`` file in ``input_folder``.

    Args:
        input_folder: Directory searched (non-recursively) for ``*.wav`` files.
        output_folder: Directory passed to the segment extractor for its output.
    """
    # Sort the matches: glob.glob() returns them in arbitrary order, and the
    # 1-based position is embedded in every segment filename, so an unsorted
    # listing could assign different indices to the same file on different runs.
    wav_files = sorted(glob.glob(os.path.join(input_folder, '*.wav')))

    for idx, audio_file in enumerate(wav_files, start=1):
        # Apply VAD and extract speech segments from each file
        split_and_extract_speech_with_vad(audio_file, output_folder, idx)

# Stage 3 configuration: folder the audio is read from and folder that receives
# the extracted speech segments
output_speech_segments_folder = "Segments_4/"
input_cleaned_folder = "Batch4Audio"

# Make sure the segment directory is present before anything is written
os.makedirs(output_speech_segments_folder, exist_ok=True)

# Run the extraction only when this code executes as the main module
if __name__ == "__main__":
    process_cleaned_audios(input_cleaned_folder, output_speech_segments_folder)

print("Speech extraction completed for all cleaned audio files.")

# Dataset labeling with emotions using a pre-trained model

In [None]:
# Build the emotion classifier with the Hugging Face `transformers` factory.
# The bare name `pipeline` cannot be used here: an earlier cell rebinds it to the
# pyannote VoiceActivityDetection instance, so on a top-to-bottom run calling it
# would invoke VAD instead of constructing a classifier. `hf_pipeline` is the
# alias for `transformers.pipeline` from the import cell.
audio_classification_pipeline = hf_pipeline(
    "audio-classification",
    model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition",
)

In [5]:
# Root directory holding one sub-folder per batch of processed audio.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
processed_audio_folder = r"C:\Users\rkhm3\Desktop\PerfectCLeaning\AllAudioData"

# Destination root; one sub-folder per predicted emotion is created beneath it
output_folder = "Emotion_Classified_Audio"
os.makedirs(output_folder, exist_ok=True)


In [None]:
# Running index that gives every classified file a unique name
audio_counter = 1

# Walk the batch folders in sorted order: os.listdir() returns entries in
# arbitrary order, and the Audio_N numbering depends on the traversal order.
for batch_folder in sorted(os.listdir(processed_audio_folder)):
    batch_folder_path = os.path.join(processed_audio_folder, batch_folder)

    # Skip non-folder files
    if not os.path.isdir(batch_folder_path):
        continue

    # Iterate over the entries of the batch folder, again in a stable order
    for file_name in sorted(os.listdir(batch_folder_path)):
        audio_file_path = os.path.join(batch_folder_path, file_name)

        # Only regular files are classified; nested directories are skipped
        # instead of being handed to the model
        if not os.path.isfile(audio_file_path):
            continue

        try:
            # Classify the file and take the label of the first result entry
            # (presumably the highest-scoring one - confirm against the pipeline docs)
            result = audio_classification_pipeline(audio_file_path)
            emotion = result[0]['label']

            # Define the folder for this emotion
            emotion_folder = os.path.join(output_folder, emotion)
            os.makedirs(emotion_folder, exist_ok=True)

            # Copy (not move) the file into the emotion folder under its new name
            new_file_name = f"Audio_{audio_counter}_{emotion}.wav"
            new_file_path = os.path.join(emotion_folder, new_file_name)
            shutil.copy(audio_file_path, new_file_path)
            print(f"Saved: {new_file_name}")

            # Count only files that were actually saved
            audio_counter += 1

        except Exception as e:
            # Deliberate best effort: report the failing file and keep going
            print(f"Error processing {audio_file_path}: {e}")

print("Audio files have been classified, renamed, and saved by emotion.")


# Dataset balancing

In [None]:

# Balancing configuration
# Emotion classes (folder names) that take part in the balancing
target_emotions = [
    'angry',
    'calm',
    'disgust',
    'happy',
    'sad',
]
# Upper bound on the number of files kept per emotion
target_samples = 14000

# Source dataset (one sub-folder per emotion) and destination for the balanced copy.
# NOTE(review): the source is an absolute, machine-specific path.
original_dataset_path = r"C:\Users\rkhm3\Desktop\HearWell\Emotion_Classified_Audio"
balanced_dataset_path = "Balanced_Emotion_Audio"

# Make sure the destination root is present
os.makedirs(balanced_dataset_path, exist_ok=True)


In [None]:

# Fixed seed and a dedicated generator so the same subset is drawn on every run.
# Without it, re-running this cell copies a different random subset on top of the
# previous one and the destination can grow past target_samples.
BALANCE_SEED = 42
sampler = random.Random(BALANCE_SEED)

# Iterate over each emotion folder
for emotion in target_emotions:
    emotion_folder = os.path.join(original_dataset_path, emotion)
    if not os.path.exists(emotion_folder):
        print(f"Emotion folder '{emotion}' does not exist. Skipping.")
        continue

    # List the regular files in sorted order: os.listdir() order is arbitrary,
    # and a seeded sample is only reproducible over a stable input sequence
    files = sorted(
        f for f in os.listdir(emotion_folder)
        if os.path.isfile(os.path.join(emotion_folder, f))
    )

    # Down-sample classes that exceed the target; keep smaller classes whole
    if len(files) > target_samples:
        sampled_files = sampler.sample(files, target_samples)
    else:
        # Warn only on a real shortfall, not when the count equals the target
        if len(files) < target_samples:
            print(f"Not enough files in '{emotion}'. Using all {len(files)} files.")
        sampled_files = files

    # Create the emotion folder in the balanced dataset directory
    balanced_emotion_folder = os.path.join(balanced_dataset_path, emotion)
    os.makedirs(balanced_emotion_folder, exist_ok=True)

    # Copy sampled files to the balanced dataset directory under their original names
    for file in sampled_files:
        src_path = os.path.join(emotion_folder, file)
        dest_path = os.path.join(balanced_emotion_folder, file)
        shutil.copy(src_path, dest_path)

    print(f"Balanced dataset created for emotion '{emotion}' with {len(sampled_files)} files.")

print("Balanced dataset creation complete!")
