## We will be using the following ensemble methods
* Averaging of probablitites
* Weighted averaging of probablitites
* Not using Majority voting as only 2 classifiers

## Yamnet functions

In [1]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import librosa
import soundfile as sf
import os

2025-02-23 13:57:37.421826: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-02-23 13:57:37.429520: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1740299257.438516  146180 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1740299257.441221  146180 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-02-23 13:57:37.450662: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [21]:
import os

# 🔹 Disable GPU before TensorFlow initializes
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "false"  # Prevents GPU allocation

import tensorflow as tf
import tensorflow_hub as hub

# ✅ Verify TensorFlow is using only CPU
print("TF Devices After Forcing CPU:", tf.config.list_physical_devices())

# ✅ Now load YAMNet
yamnet_model = hub.load("https://tfhub.dev/google/yamnet/1")


TF Devices After Forcing CPU: [PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'), PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [22]:
# Load the pretrained YAMNet model
with tf.device('/CPU:0'):  # Force CPU for TensorFlow
    yamnet_model = hub.load("https://tfhub.dev/google/yamnet/1")

# Load our trained classifier model
classifier_model = tf.keras.models.load_model("/home/maditya/Desktop/Front Era/Model1 :- YamNet/final_YamNet_classifier.h5")

# Class labels mapping
class_names = ["Cry", "Scream", "Normal"]



In [4]:
def preprocess_audio_yamnet(audio_path, output_path="processed_audio.wav", target_sr=16000):
    """
    Converts input audio to WAV format if necessary, resamples to 16 kHz, 16-bit PCM, mono, and normalizes it.
    If input is already a valid WAV file, skips unnecessary processing.
    """
    # Check if file is already a WAV
    if audio_path.lower().endswith(".wav"):
        with sf.SoundFile(audio_path) as f:
            if f.samplerate == target_sr and f.channels == 1 and f.subtype == "PCM_16":
                print("Input is already a valid WAV file. Skipping conversion.")
                return audio_path  # No processing needed

    # Otherwise, process the file
    print("🔄 Processing audio to match required format...")
    
    # Load audio file
    y, sr = librosa.load(audio_path, sr=None, mono=True)  # Load as mono

    # Convert to 16 kHz if needed
    if sr != target_sr:
        y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)

    # Normalize the audio (-1 to 1)
    y = y / np.max(np.abs(y))

    # Save as WAV with 16-bit PCM
    sf.write(output_path, y, target_sr, subtype="PCM_16")

    return output_path  # Return processed file path

In [5]:
### **2️⃣ Function to Extract YAMNet Embeddings**
def extract_yamnet_embeddings(audio_path):
    """
    Extracts YAMNet embeddings from an audio file and applies mean pooling.
    """
    # Load audio file (YAMNet expects waveform in range [-1, 1])
    y, sr = librosa.load(audio_path, sr=16000, mono=True)

    # Ensure waveform is a float32 NumPy array
    waveform = np.array(y, dtype=np.float32)

    # Run YAMNet to get embeddings
    _, embeddings, _ = yamnet_model(waveform)

    # Mean pooling to get a (1024,) vector
    mean_embedding = np.mean(embeddings.numpy(), axis=0)

    return mean_embedding  # Shape: (1024,)

In [6]:
def predict_probs_yamnet(audio_file):
    """
    1) Preprocess & load audio at 16kHz, mono, 16-bit PCM
    2) Extract YAMNet embeddings (1024-d)
    3) Predict with your Keras classifier -> get probabilities for [cry, scream, normal]
    4) Return shape (3,) array
    """
    # Step 1: Preprocess
    processed_audio = preprocess_audio_yamnet(audio_file)  # from your code
    
    # Step 2: Extract YAMNet embeddings
    embedding = extract_yamnet_embeddings(processed_audio)  # shape (1024,)

    # Step 3: Keras model expects shape (1, 1024)
    embedding = embedding.reshape(1, -1)

    # Step 4: Predict probabilities
    pred_probs = classifier_model.predict(embedding)  # shape (1, 3)
    return pred_probs[0]  # shape (3,)


## Wav2Vec2 functions

In [23]:
import os
import librosa
import soundfile as sf
import numpy as np
import torch
import torch.nn as nn
from transformers import Wav2Vec2Model, Wav2Vec2FeatureExtractor

In [24]:
model_name = "facebook/wav2vec2-base-960h"
wav2vec2_model = Wav2Vec2Model.from_pretrained(model_name)
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(model_name)

wav2vec2_model.eval()      # Inference mode
wav2vec2_model.to("cpu")   # or "cuda" if you have a GPU

Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Wav2Vec2Model(
  (feature_extractor): Wav2Vec2FeatureEncoder(
    (conv_layers): ModuleList(
      (0): Wav2Vec2GroupNormConvLayer(
        (conv): Conv1d(1, 512, kernel_size=(10,), stride=(5,), bias=False)
        (activation): GELUActivation()
        (layer_norm): GroupNorm(512, 512, eps=1e-05, affine=True)
      )
      (1-4): 4 x Wav2Vec2NoLayerNormConvLayer(
        (conv): Conv1d(512, 512, kernel_size=(3,), stride=(2,), bias=False)
        (activation): GELUActivation()
      )
      (5-6): 2 x Wav2Vec2NoLayerNormConvLayer(
        (conv): Conv1d(512, 512, kernel_size=(2,), stride=(2,), bias=False)
        (activation): GELUActivation()
      )
    )
  )
  (feature_projection): Wav2Vec2FeatureProjection(
    (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    (projection): Linear(in_features=512, out_features=768, bias=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (encoder): Wav2Vec2Encoder(
    (pos_conv_embed): Wav2Vec2PositionalConvEmbedding(
  

In [25]:
def preprocess_audio_wav2vec2(input_path, output_path="temp_processed.wav", target_sr=16000):
    """
    Converts input audio to WAV (16kHz, 16-bit PCM, mono).
    If it's already valid, you could skip re-saving. Here we always ensure consistency.
    """
    # Load with librosa (auto-resamples if sr != None).
    y, sr = librosa.load(input_path, sr=None, mono=True)

    # If sample rate != target_sr, resample
    if sr != target_sr:
        y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)

    # Normalize to -1..1
    if np.max(np.abs(y)) > 0:
        y = y / np.max(np.abs(y))

    # Save as 16-bit PCM WAV
    sf.write(output_path, y, target_sr, subtype="PCM_16")

    return output_path


In [26]:
def extract_wav2vec2_embedding(audio_path):
    """
    Loads audio at 16kHz, uses Wav2Vec2FeatureExtractor & Wav2Vec2Model to get (hidden_size,) embedding.
    """
    # 1) Load audio at 16kHz
    y, sr = librosa.load(audio_path, sr=16000, mono=True)

    # 2) Convert waveform to Wav2Vec2 inputs
    inputs = feature_extractor(y, sampling_rate=16000, return_tensors="pt")

    # 3) Forward pass
    with torch.no_grad():
        outputs = wav2vec2_model(**inputs)  # last_hidden_state shape: (1, seq_len, hidden_size)

    # 4) Squeeze batch dimension => (seq_len, hidden_size)
    last_hidden = outputs.last_hidden_state.squeeze(0)

    # 5) Mean-pool over time => shape (hidden_size,)
    mean_emb = last_hidden.mean(dim=0).cpu().numpy()

    return mean_emb


In [27]:
import torch.nn as nn

class FFNN(nn.Module):
    def __init__(self, input_dim=768, dropout_rate=0.3):
        """
        A feed-forward neural network with Dropout and BatchNorm for regularization.
        Adjust input_dim if your embeddings are not 768 in size.
        """
        super(FFNN, self).__init__()

        self.fc1 = nn.Linear(input_dim, 1024)
        self.bn1 = nn.BatchNorm1d(1024)  
        self.dropout1 = nn.Dropout(dropout_rate)

        self.fc2 = nn.Linear(1024, 512)
        self.bn2 = nn.BatchNorm1d(512)  
        self.dropout2 = nn.Dropout(dropout_rate)

        self.fc3 = nn.Linear(512, 256)
        self.bn3 = nn.BatchNorm1d(256)
        self.dropout3 = nn.Dropout(dropout_rate)

        self.fc4 = nn.Linear(256, 128)
        self.bn4 = nn.BatchNorm1d(128)
        self.dropout4 = nn.Dropout(dropout_rate)

        self.fc5 = nn.Linear(128, 3)  # Output for 3 classes

        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.bn1(self.fc1(x)))
        x = self.dropout1(x)

        x = self.relu(self.bn2(self.fc2(x)))
        x = self.dropout2(x)

        x = self.relu(self.bn3(self.fc3(x)))
        x = self.dropout3(x)

        x = self.relu(self.bn4(self.fc4(x)))
        x = self.dropout4(x)

        x = self.fc5(x)  # Logits for classification
        return x


In [28]:
# Load your trained weights
device = torch.device("cpu")  # or "cuda"
acoustic_classifier = FFNN(input_dim=768, dropout_rate=0.3).to(device)
acoustic_classifier.load_state_dict(torch.load("/home/maditya/Desktop/Front Era/Model2 :- Wav2Vec2/best_model.pth", map_location=device))
acoustic_classifier.eval()

FFNN(
  (fc1): Linear(in_features=768, out_features=1024, bias=True)
  (bn1): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (dropout1): Dropout(p=0.3, inplace=False)
  (fc2): Linear(in_features=1024, out_features=512, bias=True)
  (bn2): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (dropout2): Dropout(p=0.3, inplace=False)
  (fc3): Linear(in_features=512, out_features=256, bias=True)
  (bn3): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (dropout3): Dropout(p=0.3, inplace=False)
  (fc4): Linear(in_features=256, out_features=128, bias=True)
  (bn4): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (dropout4): Dropout(p=0.3, inplace=False)
  (fc5): Linear(in_features=128, out_features=3, bias=True)
  (relu): ReLU()
)

In [29]:
def predict_probs_wav2vec2(audio_file):
    """
    1) Preprocess & load audio at 16kHz
    2) Extract Wav2Vec2 embeddings (768-d)
    3) Forward pass through PyTorch FFNN -> get logits
    4) Convert logits to probabilities via softmax -> shape (3,)
    """
    # Step 1: Preprocess
    processed_path = preprocess_audio_wav2vec2(audio_file, "temp_processed.wav")  # from your code

    # Step 2: Extract Wav2Vec2 embedding
    embedding = extract_wav2vec2_embedding(processed_path)  # shape (768,)

    # Convert to torch tensor => shape (1, 768)
    tensor_emb = torch.from_numpy(embedding).unsqueeze(0).float().to(device)

    # Step 3: Get logits from your FFNN
    with torch.no_grad():
        logits = acoustic_classifier(tensor_emb)  # shape (1, 3)

    # Step 4: Softmax -> probabilities
    probs = torch.softmax(logits, dim=1).cpu().numpy()[0]  # shape (3,)
    return probs


# Ensemble by Averaging

In [30]:
def ensemble_average(audio_file):
    """
    1) Get probabilities from YAMNet & Wav2Vec2
    2) Average them
    3) Argmax -> final label
    """
    p_yam = predict_probs_yamnet(audio_file)  # shape (3,)
    p_wav = predict_probs_wav2vec2(audio_file)  # shape (3,)

    # Average
    p_ensemble = (p_yam + p_wav) / 2.0  # shape (3,)

    final_class_idx = np.argmax(p_ensemble)
    class_names = ["cry", "scream", "normal"]  # must match your training labels
    predicted_label = class_names[final_class_idx]
    confidence = p_ensemble[final_class_idx]

    print(f"\nEnsemble (Averaging) Prediction => {predicted_label} (Confidence: {confidence:.2f})")
    return predicted_label, confidence


# Ensemble by weighted averaging
* YamNet is more accurate, hence give more weight to him

In [31]:
def ensemble_weighted(audio_file, alpha=0.6):
    """
    Weighted ensemble:
      p_ensemble = alpha * p_yam + (1 - alpha) * p_wav
    If alpha > 0.5, YAMNet has more influence; if alpha < 0.5, Wav2Vec2 has more influence.
    """
    p_yam = predict_probs_yamnet(audio_file)  # shape (3,)
    p_wav = predict_probs_wav2vec2(audio_file)  # shape (3,)

    # Weighted average
    p_ensemble = alpha * p_yam + (1 - alpha) * p_wav  # shape (3,)

    final_class_idx = np.argmax(p_ensemble)
    class_names = ["cry", "scream", "normal"]
    predicted_label = class_names[final_class_idx]
    confidence = p_ensemble[final_class_idx]

    print(f"\nEnsemble (Weighted α={alpha}) Prediction => {predicted_label} (Confidence: {confidence:.2f})")
    return predicted_label, confidence


# Usage Example

In [None]:
audio_file = "/home/maditya/Desktop/Front Era/Other/Augmented_Dataset/Scream/scream_06_aug1.wav"

# 1) Ensemble by simple averaging
label_avg, conf_avg = ensemble_average(audio_file)

# 2) Ensemble by weighted averaging (e.g., alpha=0.7 => 70% YAMNet, 30% Wav2Vec2)
label_wt, conf_wt = ensemble_weighted(audio_file, alpha=0.7)


# Creating Testing Dataset

In [38]:
import os
import shutil
import csv
import random

# Define paths
source_folder = "/home/maditya/Desktop/Front Era/Ensemble/Augmented_Dataset"
destination_folder = 'Test_data_ensemble'
csv_file_path = "/home/maditya/Desktop/Front Era/Ensemble/augmented_dataset.csv"

# Create the destination folder if it doesn't exist
if not os.path.exists(destination_folder):
    os.makedirs(destination_folder)

# Initialize the CSV file
with open(csv_file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['filename', 'label'])  # Write the header

    # Iterate through each class folder
    for class_name in ['Cry', 'Normal', 'Scream']:
        class_folder = os.path.join(source_folder, class_name)
        files = os.listdir(class_folder)
        
        # Calculate 5% of the files
        num_files_to_copy = max(1, int(len(files) * 0.05))  # Ensure at least 1 file is copied
        selected_files = random.sample(files, num_files_to_copy)
        
        # Copy the selected files and write to CSV
        for file_name in selected_files:
            source_file = os.path.join(class_folder, file_name)
            destination_file = os.path.join(destination_folder, file_name)
            shutil.copy(source_file, destination_file)
            
            # Write to CSV
            writer.writerow([file_name, class_name])

print(f"5% of files from each class have been copied to {destination_folder}.")
print(f"CSV file '{csv_file_path}' has been created with the mapping of audio files to labels.")

5% of files from each class have been copied to Test_data_ensemble.
CSV file '/home/maditya/Desktop/Front Era/Ensemble/augmented_dataset.csv' has been created with the mapping of audio files to labels.


### Converting labels in test_Data_ensemble.csv

In [39]:
import csv

# Define the CSV file path
csv_file_path = "/home/maditya/Desktop/Front Era/Ensemble/test_data_ensemble.csv"
updated_csv_file_path = 'test_data_ensemble_encoded.csv'

# Define the label encoding mapping
label_encoding = {'Cry': 0, 'Scream': 1, 'Normal': 2}

# Read the original CSV and write to a new CSV with encoded labels
with open(csv_file_path, mode='r') as infile, open(updated_csv_file_path, mode='w', newline='') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    
    # Read the header and add a new column for encoded labels
    header = next(reader)
    header.append('encoded_label')  # Add the new column
    writer.writerow(header)
    
    # Process each row
    for row in reader:
        filename, label = row
        encoded_label = label_encoding[label]  # Get the encoded value
        row.append(encoded_label)  # Add the encoded label to the row
        writer.writerow(row)

print(f"Encoded labels have been added to the new CSV file: {updated_csv_file_path}")

Encoded labels have been added to the new CSV file: test_data_ensemble_encoded.csv
