In [1]:
import os 
import numpy as np
import pandas as pd
import librosa
import torch
import torch.nn as nn
from scipy.stats import mode
from scipy.stats import entropy
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
class PyTorchModel(nn.Module):
    """Compact 2-D CNN producing 3 class logits from a single-channel input.

    Layout: three 3x3 convolutions (1 -> 16 -> 32 -> 32 channels, stride 1,
    padding 1 so height/width are preserved), each followed by ReLU and the
    same dropout module (p=0.5); then a global max pool to 1x1, a 1-D batch
    norm over the 32 pooled channels, and a linear layer with 3 outputs.

    Args:
        input_size: Kept in the signature for callers; no layer defined here
            reads it.
    """

    def __init__(self, input_size):
        super().__init__()
        # Shared convolution geometry: 3x3 kernel, stride 1, padding 1.
        conv_geometry = dict(kernel_size=3, stride=1, padding=1)
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, **conv_geometry)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, **conv_geometry)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=32, **conv_geometry)
        # One dropout module reused after every convolution.
        self.dropout = nn.Dropout(p=0.5)

        # Collapses each of the 32 channel maps to its maximum value.
        self.global_pool = nn.AdaptiveMaxPool2d(output_size=(1, 1))
        self.batch_norm = nn.BatchNorm1d(num_features=32)
        self.fc = nn.Linear(in_features=32, out_features=3)

    def forward(self, x):
        """Return raw logits with 3 columns, one row per batch element."""
        for conv_layer in (self.conv1, self.conv2, self.conv3):
            x = self.dropout(F.relu(conv_layer(x)))

        pooled = self.global_pool(x)
        # (batch, 32, 1, 1) -> (batch, 32) for the 1-D batch norm / linear layers.
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(self.batch_norm(flat))

In [3]:
# Feature extraction class
import numpy as np
import librosa
import soundfile as sf
from scipy.signal import butter, filtfilt
from scipy.stats import entropy
class CoughFeatureExtractor:
    """Extract MFCCs and a Shannon-entropy value from a cough recording.

    A recording is converted to mono, resampled to ``sample_rate``,
    peak-normalised and band-pass filtered before the features are computed.

    Args:
        sample_rate: Working sample rate in Hz; every recording is resampled
            to this rate (default 48000).
    """

    def __init__(self, sample_rate=48000):
        self.sample_rate = sample_rate

    def bandpass_filter(self, audio, lowcut=150, highcut=2000):
        """Apply a 4th-order zero-phase Butterworth band-pass filter.

        Args:
            audio: 1-D array of samples at ``self.sample_rate``.
            lowcut: Lower band edge in Hz.
            highcut: Upper band edge in Hz.

        Returns:
            The filtered signal. Inputs shorter than 54 samples are zero-padded
            to 54 samples first, so the result is then 54 samples long. If
            ``filtfilt`` raises ``ValueError`` the (possibly padded) unfiltered
            signal is returned after printing a warning.
        """
        min_samples = 54
        if len(audio) < min_samples:
            # Pad very short clips so the filter has enough samples to work with.
            audio = np.pad(audio, (0, min_samples - len(audio)), mode='constant')

        # True division: floor division would under-estimate the Nyquist
        # frequency for odd sample rates (e.g. 11025 Hz) and shift the band edges.
        nyquist = self.sample_rate / 2
        low = lowcut / nyquist
        high = highcut / nyquist
        b, a = butter(4, [low, high], btype='band')

        try:
            return filtfilt(b, a, audio)
        except ValueError:
            print(f"Warning: Audio length {len(audio)} was too short for filtering")
            return audio  # Best effort: hand back the unfiltered signal

    def onset_detection(self, audio):
        """Return the onset frame indices librosa detects in ``audio``."""
        onset_env = librosa.onset.onset_strength(y=audio, sr=self.sample_rate)
        onset_frames = librosa.onset.onset_detect(onset_envelope=onset_env, sr=self.sample_rate)
        return onset_frames

    def calculate_shannon_entropy(self, signal):
        """Return the base-2 Shannon entropy of the signal's amplitude histogram.

        The histogram uses 256 bins over [-1.0, 1.0]; samples outside that
        range are not counted.
        """
        histogram, _ = np.histogram(signal, bins=256, range=(-1.0, 1.0), density=True)
        histogram = histogram + 1e-10  # Add small epsilon to avoid log(0)
        return entropy(histogram, base=2)

    def calculate_mfcc_features(self, audio_path):
        """Load an audio file and compute its MFCC matrix and Shannon entropy.

        Args:
            audio_path: Path of an audio file readable by ``soundfile``.

        Returns:
            ``(mfccs, shannon_entropy)`` where ``mfccs`` holds 40 coefficients
            per frame, or ``(None, None)`` when no onset is detected or any
            step raises (the error is printed, not propagated).
        """
        try:
            audio, orig_sr = sf.read(audio_path)

            # Convert to mono if the file has more than one channel
            if audio.ndim > 1:
                audio = np.mean(audio, axis=1)

            # Resample to the extractor's working rate (self.sample_rate)
            if orig_sr != self.sample_rate:
                audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=self.sample_rate)

            # Ensure minimum length (100ms) by zero-padding the tail
            min_samples = int(0.1 * self.sample_rate)
            if len(audio) < min_samples:
                audio = np.pad(audio, (0, min_samples - len(audio)), mode='constant')

            # Peak-normalise (epsilon guards against all-zero audio), then filter
            audio = audio / (np.max(np.abs(audio)) + 1e-10)
            audio = self.bandpass_filter(audio)

            # Recordings with no detectable onset are skipped
            onset_frames = self.onset_detection(audio)
            if len(onset_frames) == 0:
                print("No significant onsets detected, skipping audio.")
                return None, None

            # 5 ms frames advanced by 3.75 ms, i.e. 25% overlap between frames
            frame_length_ms = 5
            hop_length_ms = 3.75

            frame_length = int(frame_length_ms * self.sample_rate / 1000)
            hop_length = int(hop_length_ms * self.sample_rate / 1000)

            mfccs = librosa.feature.mfcc(
                y=audio,
                sr=self.sample_rate,
                n_mfcc=40,
                n_fft=frame_length,
                hop_length=hop_length,
                n_mels=40,
                window='hamming'
            )

            shannon_entropy = self.calculate_shannon_entropy(audio)

            return mfccs, shannon_entropy

        except Exception as e:
            # Deliberate best effort: one unreadable file must not stop a batch run
            print(f"Error processing {audio_path}: {str(e)}")
            return None, None

In [4]:
def evaluate_predictions_mean_based(folder_path, model_path, labeled_csv_path, output_csv_path):
    """Classify every .wav/.ogg file in a folder and score it against a labelled CSV.

    For each audio file the MFCC matrix is flattened, the Shannon entropy is
    appended, and the resulting vector is passed through the trained model
    once. The predicted label is the arg-max class index plus 1.

    Args:
        folder_path: Directory scanned (non-recursively) for audio files.
        model_path: Path of a ``state_dict`` checkpoint for ``PyTorchModel``.
        labeled_csv_path: CSV with ``file_number`` and ``target`` columns. A
            file is matched through the digits found in its name.
        output_csv_path: Destination of the per-file results CSV
            (``Filename``, ``True Label``, ``Predicted Label``).

    Returns:
        None. Per-file results and the accuracy are printed; results are
        written to ``output_csv_path``. Files without a matching CSV row get
        true label ``-1`` and are left out of the accuracy figure.
    """
    # Load the trained model
    input_size = 9  # Forwarded to the PyTorchModel constructor
    model = PyTorchModel(input_size=input_size)
    # map_location keeps a checkpoint that was saved on GPU loadable on a
    # CPU-only machine; inference below runs on CPU.
    model.load_state_dict(torch.load(model_path, map_location="cpu"))
    model.eval()

    labeled_data = pd.read_csv(labeled_csv_path)
    feature_extractor = CoughFeatureExtractor()

    filenames = []
    true_labels = []
    predicted_labels = []
    has_label = []  # Parallel to true_labels: was a CSV row found for the file?

    # Sorted so the output CSV row order is reproducible across runs.
    for file in sorted(os.listdir(folder_path)):
        if not file.lower().endswith((".wav", ".ogg")):
            continue
        file_path = os.path.join(folder_path, file)

        mfccs, shannon_entropy = feature_extractor.calculate_mfcc_features(file_path)

        # Skip if feature extraction failed
        if mfccs is None or shannon_entropy is None:
            print(f"Skipping {file} due to feature extraction error.")
            continue

        # Flatten the MFCCs and append the Shannon entropy as the last feature
        features = np.append(mfccs.flatten(), shannon_entropy)

        # Conv2d rejects a 2-D (batch, length) tensor, so present the vector as
        # a 1 x L single-channel map: (batch=1, channel=1, height=1, width=L).
        # NOTE(review): assumes training used the same layout - confirm against
        # the training notebook.
        features_tensor = torch.tensor(features, dtype=torch.float32).view(1, 1, 1, -1)

        # One forward pass per file yields exactly one prediction
        with torch.no_grad():
            outputs = model(features_tensor)
        predicted_label = int(torch.argmax(outputs, dim=1).item()) + 1  # Classes map to 1, 2, 3

        # Match the file to its CSV row through the digits in its name; a name
        # without digits simply has no label instead of raising on int('').
        digits = ''.join(filter(str.isdigit, os.path.splitext(file)[0]))
        true_label = -1
        found = False
        if digits:
            true_label_row = labeled_data[labeled_data["file_number"] == int(digits)]
            if not true_label_row.empty:
                true_label = true_label_row["target"].values[0]
                found = True

        filenames.append(file)
        true_labels.append(true_label)
        predicted_labels.append(predicted_label)
        has_label.append(found)
        print(f"Processed {file}: True Label = {true_label}, Predicted Label = {predicted_label}")

    # Accuracy over labelled files only: the -1 placeholder is not a real class
    # and must not be counted as a wrong prediction.
    scored = [
        (truth, pred)
        for truth, pred, labelled in zip(true_labels, predicted_labels, has_label)
        if labelled
    ]
    if scored:
        accuracy = 100.0 * sum(bool(truth == pred) for truth, pred in scored) / len(scored)
        print(f"Accuracy: {accuracy:.2f}%")
        if len(scored) < len(true_labels):
            print(f"{len(true_labels) - len(scored)} file(s) without a label were excluded from accuracy.")
    else:
        print("Accuracy: n/a (no processed file had a matching label)")

    # Save results to CSV
    output_df = pd.DataFrame({
        "Filename": filenames,
        "True Label": true_labels,
        "Predicted Label": predicted_labels
    })
    output_df.to_csv(output_csv_path, index=False)
    print(f"Results saved to {output_csv_path}")

# Paths and configuration for the evaluation run.
# NOTE: the three input paths below are absolute Windows paths under C:\Users\OM,
# so they must be edited before this cell can run on another machine.
folder_path = r"C:\Users\OM\Desktop\Anvesshan\Final_supported_file1"  # Folder scanned for .wav/.ogg audio files
model_path = r"C:\Users\OM\Desktop\Anvesshan\modeified_files\trained_model.pth"  # Checkpoint passed to torch.load / load_state_dict
labeled_csv_path = r"C:\Users\OM\Desktop\Anvesshan\modeified_files\labelled_data.csv"  # CSV read for its "file_number" and "target" columns
output_csv_path = "output.csv"  # Results CSV; relative path, so written to the current working directory

# Run the evaluation (prints per-file results and accuracy, then writes the results CSV)
evaluate_predictions_mean_based(folder_path, model_path, labeled_csv_path, output_csv_path)

: 