<a href="https://colab.research.google.com/github/Sazim2019331087/voice_model/blob/main/CNN_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# --- IMPORTANT WARNING ---
# Shown before anything else so the small-dataset caveats are read before any
# training output appears.
_BANNER_RULE = "=" * 80
_BANNER_LINES = (
    "WARNING: Training a deep learning model from scratch (CNN Only)",
    "         with only 147 audio files is highly challenging and prone to overfitting.",
    "         The model will likely have limited generalization to new voices not in your dataset.",
    "         This code is provided for educational purposes to demonstrate the architecture.",
    "         Ensure you have a GPU runtime enabled in Colab, as CPU training will be extremely slow.",
)

print(_BANNER_RULE)
for _banner_line in _BANNER_LINES:
    print(_banner_line)
print(_BANNER_RULE)

In [None]:
# ==============================================================================
# Step 1: Setup and Install Libraries
# ==============================================================================
print("--- Installing required libraries ---")
!pip install --upgrade pip
# CRITICAL: Re-install PyTorch and TorchAudio to ensure CUDA version compatibility
!pip install torch==2.0.1 torchvision==0.15.2 torchaudio==2.0.2 --index-url https://download.pytorch.org/whl/cu118
!pip install pandas scikit-learn joblib tqdm
!pip install ffmpeg-python
!apt-get update && !apt-get install -y ffmpeg

In [None]:
# ==============================================================================
# Step 2: Mount Google Drive and Load Data
# ==============================================================================
from google.colab import drive
import os
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torchaudio
from torchaudio.transforms import MFCC
import numpy as np
from tqdm.notebook import tqdm
import joblib

print("\n--- Mounting Google Drive ---")
drive.mount('/content/drive')

# Expected layout on Drive: <project>/training.csv plus <project>/voices/ holding the clips.
PROJECT_ROOT_DIR = '/content/drive/MyDrive/project'
CSV_PATH = os.path.join(PROJECT_ROOT_DIR, 'training.csv')
AUDIO_FOLDER_PATH = os.path.join(PROJECT_ROOT_DIR, 'voices')

# Fail fast, outermost path first, so the error names the first thing that is missing.
if not os.path.exists(PROJECT_ROOT_DIR):
    raise FileNotFoundError(f"Error: Project folder '{PROJECT_ROOT_DIR}' not found. Please check the path and your Google Drive structure.")
if not os.path.exists(CSV_PATH):
    raise FileNotFoundError(f"Error: CSV file '{CSV_PATH}' not found. Please ensure it's in the correct location.")
if not os.path.exists(AUDIO_FOLDER_PATH):
    raise FileNotFoundError(f"Error: Audio folder '{AUDIO_FOLDER_PATH}' not found. Please check the path and upload your audio files.")

print(f"Successfully located project folder at: {PROJECT_ROOT_DIR}")

print("\n--- Loading data from CSV ---")
df = pd.read_csv(CSV_PATH)
# Resolve every CSV file name to its full path inside the audio folder.
df['audio_path'] = df['audio_file'].apply(lambda file_name: os.path.join(AUDIO_FOLDER_PATH, file_name))

print("\n--- Verifying audio file paths and formats... ---")
verified_data_for_df = []   # rows (as dicts) whose audio loaded and featurised cleanly
problematic_files = []      # (audio_file, email, reason) for everything that was rejected

# Same MFCC configuration the training dataset uses, so a file that passes here
# is expected to pass feature extraction later as well.
test_mfcc_transform = MFCC(
    sample_rate=16000, n_mfcc=40, melkwargs={'n_fft': 400, 'hop_length': 160, 'n_mels': 128}
)


def _probe_audio(path):
    """Decode the start of ``path`` and run MFCC extraction on it.

    Returns nothing; any decoding, resampling or feature-shape problem surfaces
    as an exception for the caller to record.
    """
    # Only the first 32000 frames are requested: enough to validate the file cheaply.
    clip, rate = torchaudio.load(path, frame_offset=0, num_frames=16000 * 2)
    if rate != 16000:
        clip = torchaudio.transforms.Resample(orig_freq=rate, new_freq=16000)(clip)
    if clip.shape[0] > 1:
        # Down-mix multi-channel audio to mono.
        clip = torch.mean(clip, dim=0, keepdim=True)
    elif clip.ndim == 1:
        clip = clip.unsqueeze(0)

    features = test_mfcc_transform(clip)
    shape_ok = features.ndim == 3 and features.shape[0] == 1
    if not shape_ok:
        raise ValueError(f"Initial MFCC features for {path} unexpected shape: {features.shape}")


for _, row in tqdm(df.iterrows(), total=len(df), desc="Verifying audio files"):
    candidate_path = row['audio_path']

    if not os.path.exists(candidate_path):
        problematic_files.append((row['audio_file'], row['email'], "File Not Found"))
        continue

    try:
        _probe_audio(candidate_path)
        verified_data_for_df.append(row.to_dict())
    except Exception as e:
        # Best-effort scan: one bad file must not abort verification of the rest.
        problematic_files.append((row['audio_file'], row['email'], f"Format Error: {e}"))

if not problematic_files:
    print("\nAll audio files verified successfully!")
else:
    print(f"\n--- {len(problematic_files)} Problematic audio files found and skipped ---")
    problematic_df = pd.DataFrame(problematic_files, columns=['audio_file', 'email', 'Reason'])
    print(problematic_df.to_markdown(index=False, numalign="left", stralign="left"))
    print("\nTip: Use `ffmpeg -i input.wav -ar 16000 -ac 1 -c:a pcm_s16le output_converted.wav` to convert problematic files.")

if not verified_data_for_df:
    raise ValueError("No valid audio files found after verification. Please check your data.")

existing_files_df = pd.DataFrame(verified_data_for_df)

# Encode each distinct email as an integer class id. The categorical is built
# once so the ids written to the frame and the id -> email mapping cannot diverge.
speaker_categories = existing_files_df['email'].astype('category')
existing_files_df['speaker_id'] = speaker_categories.cat.codes
speaker_mapping = dict(enumerate(speaker_categories.cat.categories))
num_speakers = len(speaker_mapping)

print(f"\n--- Speaker Mapping (Total Unique Speakers: {num_speakers}) ---")
print(speaker_mapping)

total_samples = len(existing_files_df)
if total_samples < 2:
    # A split needs at least one training and one test sample.
    raise ValueError(
        f"Need at least 2 verified audio files to create a train/test split; found {total_samples}."
    )

# Stratification needs every class represented at least twice. Checking only the
# total count is not enough: one speaker with a single clip makes the stratified
# split raise even when the dataset as a whole is large.
min_samples_per_speaker = existing_files_df['speaker_id'].value_counts().min()
can_stratify = total_samples >= num_speakers * 2 and min_samples_per_speaker >= 2

if not can_stratify:
    print("\nWARNING: Dataset has very few samples per speaker. Stratification might be difficult.")
    print(f"Total samples: {total_samples}, Unique speakers: {num_speakers}")
    train_df, test_df = train_test_split(
        existing_files_df,
        # Roughly 20% of the data, capped at one per speaker and never fewer than 1.
        test_size=max(1, min(int(0.2 * total_samples), num_speakers)),
        random_state=42,
        stratify=None
    )
    print("Proceeding with NON-STRATIFIED split due to limited samples per speaker.")
else:
    train_df, test_df = train_test_split(
        existing_files_df,
        # An absolute test size equal to the number of classes.
        test_size=num_speakers,
        random_state=42,
        stratify=existing_files_df['speaker_id']
    )
    print(f"Using STRATIFIED split with {len(test_df)} samples in test set.")

print(f"\n--- Dataset Split for Training and Testing ---\nTraining samples: {len(train_df)}\nTesting samples: {len(test_df)}")


In [None]:
# ==============================================================================
# Step 3: Create a Custom PyTorch Dataset with MFCCs
# ==============================================================================
class SpeakerDatasetMFCC(Dataset):
    """Dataset that turns rows of a dataframe into fixed-length MFCC features.

    Each row must provide ``audio_path`` (file to load) and ``speaker_id``
    (integer class label). Every clip is resampled to ``target_sr``, down-mixed
    to mono, then truncated or zero-padded to ``max_len_sec`` seconds so all
    samples share the same length and can be stacked into a batch.

    Args:
        dataframe: Frame with ``audio_path`` and ``speaker_id`` columns.
        target_sr: Sample rate every clip is converted to.
        num_mfcc: Number of MFCC coefficients to extract.
        n_fft: FFT size forwarded to the mel spectrogram.
        hop_length: Hop length forwarded to the mel spectrogram.
        max_len_sec: Clip length in seconds after truncation/padding.
    """

    def __init__(self, dataframe, target_sr=16000, num_mfcc=40, n_fft=400, hop_length=160,
                 max_len_sec=30):
        self.dataframe = dataframe
        self.target_sr = target_sr
        self.num_mfcc = num_mfcc
        self.max_len_sec = max_len_sec
        # int() keeps the value usable as a slice bound even for fractional seconds.
        self.max_len_samples = int(self.max_len_sec * self.target_sr)

        self.mfcc_transform = MFCC(
            sample_rate=target_sr, n_mfcc=num_mfcc, melkwargs={'n_fft': n_fft, 'hop_length': hop_length, 'n_mels': 128}
        )
        # Resample transforms keyed by source sample rate, built lazily and reused
        # instead of being reconstructed for every single sample.
        self._resamplers = {}

    def __len__(self):
        """Return the number of rows (clips) in the dataset."""
        return len(self.dataframe)

    def _resampler_for(self, orig_sr):
        """Return a cached ``Resample`` transform from ``orig_sr`` to ``target_sr``."""
        resampler = self._resamplers.get(orig_sr)
        if resampler is None:
            resampler = torchaudio.transforms.Resample(orig_freq=orig_sr, new_freq=self.target_sr)
            self._resamplers[orig_sr] = resampler
        return resampler

    def __getitem__(self, idx):
        """Return ``(mfcc_features, label)`` for row ``idx``.

        ``mfcc_features`` has its leading channel axis squeezed away (expected to be
        ``(num_mfcc, frames)``) and ``label`` is a ``torch.long`` scalar tensor.
        If the clip cannot be loaded or processed, ``(None, None)`` is returned so
        that ``collate_fn`` can drop the sample instead of aborting the epoch.
        """
        row = self.dataframe.iloc[idx]
        audio_path = row['audio_path']
        label = row['speaker_id']

        try:
            waveform, sample_rate = torchaudio.load(audio_path)

            if sample_rate != self.target_sr:
                waveform = self._resampler_for(sample_rate)(waveform)

            if waveform.shape[0] > 1:
                # Down-mix multi-channel audio to mono.
                waveform = torch.mean(waveform, dim=0, keepdim=True)
            elif waveform.ndim == 1:
                waveform = waveform.unsqueeze(0)

            # Force a fixed number of samples: cut long clips, zero-pad short ones on the right.
            num_samples = waveform.shape[1]
            if num_samples > self.max_len_samples:
                waveform = waveform[:, :self.max_len_samples]
            elif num_samples < self.max_len_samples:
                padding = self.max_len_samples - num_samples
                waveform = torch.nn.functional.pad(waveform, (0, padding))

            mfcc_features = self.mfcc_transform(waveform)

            if mfcc_features.ndim == 3 and mfcc_features.shape[0] == 1:
                mfcc_features = mfcc_features.squeeze(0)

            return mfcc_features, torch.tensor(label, dtype=torch.long)

        except Exception as e:
            # Deliberately best-effort: a single unreadable file must not stop training.
            print(f"Error processing {audio_path}: {e}. Skipping this sample.")
            return None, None

def collate_fn(batch):
    """Stack the usable samples of ``batch`` into feature and label tensors.

    Samples whose feature entry is ``None`` (the dataset's marker for a clip that
    failed to load) are dropped. Returns ``(None, None)`` when nothing usable is
    left, otherwise ``(stacked_features, stacked_labels)``.
    """
    features = []
    targets = []
    for sample in batch:
        if sample[0] is None:
            continue
        feature, target = sample
        features.append(feature)
        targets.append(target)

    if not features:
        return None, None

    return torch.stack(features), torch.stack(targets)

# Wrap both splits in datasets, then in loaders that share every setting except
# shuffling (training order is randomised, evaluation order is kept fixed).
train_dataset = SpeakerDatasetMFCC(train_df)
test_dataset = SpeakerDatasetMFCC(test_df)

_shared_loader_options = {'batch_size': 4, 'collate_fn': collate_fn}
train_loader = DataLoader(train_dataset, shuffle=True, **_shared_loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **_shared_loader_options)

print(f"\n--- DataLoader created. Total training batches: {len(train_loader)} ---")


In [None]:
# ==============================================================================
# Step 4: Define a CNN Only Model (SpeakerCNN)
# ==============================================================================

class SpeakerCNN(nn.Module):
    """1-D convolutional speaker classifier operating on MFCC sequences.

    Three Conv1d/BatchNorm/ReLU/MaxPool stages treat the MFCC coefficients as
    input channels, a global average pool collapses the time axis, and a dropout
    plus linear layer produces one logit per speaker.

    Args:
        num_speakers: Number of output classes.
        num_mfcc: Number of MFCC coefficients, i.e. input channels of the first conv.
        dropout_rate: Dropout probability applied before the final linear layer.
        debug: When True, print tensor shapes at each stage of every forward pass.
            Off by default because forward runs once per batch, so unconditional
            printing floods the notebook output during training.
    """

    def __init__(self, num_speakers, num_mfcc=40, dropout_rate=0.3, debug=False):
        super().__init__()
        self.debug = debug

        print("\n--- Initializing SpeakerCNN Model Architecture (ONLY CNN) ---")
        print(f"Number of speakers (output classes): {num_speakers}")
        print(f"Number of MFCC features (input channels for CNN): {num_mfcc}")
        print(f"Dropout rate: {dropout_rate}")

        # kernel_size=5 with padding=2 preserves the sequence length in each conv;
        # only the pooling layers shorten it (by half per stage).
        self.conv_layers = nn.Sequential(
            nn.Conv1d(num_mfcc, 64, kernel_size=5, padding=2),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),

            nn.Conv1d(64, 128, kernel_size=5, padding=2),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),

            nn.Conv1d(128, 256, kernel_size=5, padding=2),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2)
        )
        print("  CNN Layers defined.")

        self.global_pool = nn.AdaptiveAvgPool1d(1)
        print(f"  Global Pooling Layer: AdaptiveAvgPool1d(1) (summarizes CNN output sequence)")

        self.fc_layer = nn.Linear(256, num_speakers)
        print(f"  Fully Connected Layer: Linear(in=256, out={num_speakers})")
        self.dropout = nn.Dropout(dropout_rate)
        print(f"  Dropout Layer: Dropout(p={dropout_rate})")
        print("--- Model Initialization Complete ---")

    def _debug_shape(self, stage, tensor, prefix=""):
        """Print the shape of ``tensor`` for ``stage`` when debug tracing is enabled."""
        if self.debug:
            print(f"{prefix}DEBUG: Forward Pass (SpeakerCNN) - {stage}: {tensor.shape}")

    def forward(self, x):
        """Map a batch of MFCC sequences to speaker logits.

        Args:
            x: Tensor laid out as (batch, num_mfcc, frames), the layout Conv1d expects.

        Returns:
            Tensor of shape (batch, num_speakers) holding unnormalised logits.
        """
        self._debug_shape("Input shape to model", x, prefix="\n")

        x = self.conv_layers(x)
        self._debug_shape("Shape after CNN layers", x)

        # Average over time, then drop the now length-1 time axis.
        pooled_output = self.global_pool(x).squeeze(-1)
        self._debug_shape("Shape after Global Pooling", pooled_output)

        x = self.dropout(pooled_output)
        self._debug_shape("Shape after Dropout", x)

        x = self.fc_layer(x)
        self._debug_shape("Final output shape (logits)", x)
        return x

# Prefer the GPU when one is visible to PyTorch; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"\n--- Initializing model on device: {device} ---")

# Build the classifier with one output per known speaker, then move it to the chosen device.
model = SpeakerCNN(num_speakers=num_speakers, num_mfcc=train_dataset.num_mfcc)
model = model.to(device)

# Cross-entropy over speaker logits, optimised with Adam at a fixed learning rate.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:
# ==============================================================================
# Step 5: Train the Model
# ==============================================================================

def train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs=100):
    """Train ``model`` and evaluate it on ``test_loader`` after every epoch.

    Whenever the test accuracy strictly improves, the model's state_dict is saved
    to ``<PROJECT_ROOT_DIR>/saved_models_cnn_only/speaker_cnn_best.pth``.

    Both loaders may yield ``(None, None)`` for a batch in which no sample could be
    loaded; such batches are skipped and excluded from the reported averages.

    Args:
        model: Network to train; batches are moved to the device of its parameters.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Iterable of ``(inputs, labels)`` evaluation batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        None. The model is left in training mode.
    """
    def _ratio(total, count):
        # Every batch of an epoch can be skipped; report 0.0 instead of dividing by zero.
        return total / count if count else 0.0

    # Follow the model's own placement rather than a module-level global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    model.train()
    print("\n--- Starting Training ---")
    best_accuracy = 0.0

    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_train_predictions = 0
        total_train_samples = 0
        train_batches_seen = 0

        # tqdm progress bar for training batches
        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} (Train)")
        for inputs, labels in pbar:
            if inputs is None:  # Skip batches with no valid samples
                pbar.set_postfix_str("Skipping empty batch")
                continue

            inputs = inputs.to(run_device)
            labels = labels.to(run_device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_batches_seen += 1
            running_loss += loss.item()
            _, predicted = torch.max(outputs.detach(), 1)
            total_train_samples += labels.size(0)
            correct_train_predictions += (predicted == labels).sum().item()

            # Running averages over the batches actually processed so far.
            pbar.set_postfix({
                'loss': _ratio(running_loss, train_batches_seen),
                'train_acc': 100 * _ratio(correct_train_predictions, total_train_samples),
            })

        # Epoch-level training metrics (skipped batches do not dilute the mean loss).
        epoch_train_loss = _ratio(running_loss, train_batches_seen)
        epoch_train_accuracy = 100 * _ratio(correct_train_predictions, total_train_samples)

        # Evaluate on the test set after each epoch.
        model.eval()  # disables dropout and uses BatchNorm running statistics
        correct_test_predictions = 0
        total_test_samples = 0
        test_loss = 0.0
        test_batches_seen = 0

        with torch.no_grad():
            # tqdm progress bar for test batches
            test_pbar = tqdm(test_loader, desc=f"Epoch {epoch+1}/{num_epochs} (Test)")
            for inputs, labels in test_pbar:
                if inputs is None:  # Skip empty batches
                    continue
                inputs = inputs.to(run_device)
                labels = labels.to(run_device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                test_batches_seen += 1
                test_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total_test_samples += labels.size(0)
                correct_test_predictions += (predicted == labels).sum().item()

                test_pbar.set_postfix({
                    'test_loss': _ratio(test_loss, test_batches_seen),
                    'test_acc': 100 * _ratio(correct_test_predictions, total_test_samples),
                })

        # Epoch-level test metrics.
        epoch_test_loss = _ratio(test_loss, test_batches_seen)
        epoch_test_accuracy = 100 * _ratio(correct_test_predictions, total_test_samples)

        # Print epoch summary
        print(f"Epoch {epoch+1} Summary: Train Loss: {epoch_train_loss:.4f}, Train Acc: {epoch_train_accuracy:.2f}%, "
              f"Test Loss: {epoch_test_loss:.4f}, Test Acc: {epoch_test_accuracy:.2f}%")

        # Checkpoint only on a strict improvement in test accuracy.
        if epoch_test_accuracy > best_accuracy:
            best_accuracy = epoch_test_accuracy
            SAVE_DIR = os.path.join(PROJECT_ROOT_DIR, 'saved_models_cnn_only')
            model_save_path = os.path.join(SAVE_DIR, 'speaker_cnn_best.pth')

            os.makedirs(SAVE_DIR, exist_ok=True)
            torch.save(model.state_dict(), model_save_path)
            print(f"New best model saved with Test Accuracy: {best_accuracy:.2f}%")

        model.train()  # back to training mode for the next epoch

# Kick off training. Everything passed here (model, loaders, loss, optimizer)
# must already exist from the earlier cells.
train_model(
    model=model,
    train_loader=train_loader,
    test_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=100,
)

In [None]:
# ==============================================================================
# Step 6: Save the Trained Model and Speaker Mapping (Final Save)
# ==============================================================================
# Everything produced by the CNN-only run lives in one folder under the project root.
SAVE_DIR = os.path.join(PROJECT_ROOT_DIR, 'saved_models_cnn_only')
os.makedirs(SAVE_DIR, exist_ok=True)

model_final_save_path, mapping_save_path = (
    os.path.join(SAVE_DIR, artifact_name)
    for artifact_name in ('speaker_cnn_final.pth', 'speaker_mapping_cnn_only.joblib')
)

# Persist the id -> email mapping next to the weights; inference needs both.
joblib.dump(speaker_mapping, mapping_save_path)
torch.save(model.state_dict(), model_final_save_path)

print(f"\n--- Final Trained Model Saved to: {model_final_save_path} ---")
print(f"Speaker Mapping Saved to: {mapping_save_path}")

In [None]:
# ==============================================================================
# Step 7: Inference (Detect a Person from a New Audio File - Interactive Upload)
# ==============================================================================
from google.colab import files
import soundfile as sf

def predict_speaker_from_audio(model, audio_file_path, speaker_mapping,
                               target_sr=16000, num_mfcc=40, n_fft=400, hop_length=160):
    """Predict which known speaker is heard in ``audio_file_path``.

    The clip goes through the same steps as the training data: resample to
    ``target_sr``, down-mix to mono, truncate or zero-pad to 30 seconds, then
    MFCC extraction. The model is switched to eval mode as a side effect.

    Args:
        model: Trained classifier; the input is moved to the device of its parameters.
        audio_file_path: Path of the audio file to classify.
        speaker_mapping: Mapping from predicted class id to speaker email.
        target_sr: Sample rate the audio is converted to.
        num_mfcc: Number of MFCC coefficients to extract.
        n_fft: FFT size forwarded to the mel spectrogram.
        hop_length: Hop length forwarded to the mel spectrogram.

    Returns:
        ``(email, confidence)`` where confidence is the softmax probability of the
        predicted class, or ``(None, None)`` if anything fails (the error is printed).
    """
    model.eval()
    mel_options = {'n_fft': n_fft, 'hop_length': hop_length}
    mfcc_transform = MFCC(sample_rate=target_sr, n_mfcc=num_mfcc, melkwargs=mel_options)
    max_len_samples = 30 * target_sr

    try:
        if not os.path.exists(audio_file_path):
            raise FileNotFoundError(f"Audio file not found: {audio_file_path}")

        waveform, sample_rate = torchaudio.load(audio_file_path)
        if sample_rate != target_sr:
            to_target_rate = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_sr)
            waveform = to_target_rate(waveform)
        if waveform.shape[0] > 1:
            # Down-mix multi-channel audio to mono.
            waveform = torch.mean(waveform, dim=0, keepdim=True)
        elif waveform.ndim == 1:
            waveform = waveform.unsqueeze(0)

        # Positive surplus -> clip is too long; negative -> too short and needs padding.
        surplus = waveform.shape[1] - max_len_samples
        if surplus > 0:
            waveform = waveform[:, :max_len_samples]
        elif surplus < 0:
            waveform = torch.nn.functional.pad(waveform, (0, -surplus))

        mfcc_features = mfcc_transform(waveform)

        if mfcc_features.ndim == 3 and mfcc_features.shape[0] == 1:
            mfcc_features = mfcc_features.squeeze(0)

        # Add the batch axis and place the input wherever the model's weights live.
        model_device = next(model.parameters()).device
        input_tensor = mfcc_features.unsqueeze(0).to(model_device)

        with torch.no_grad():
            probabilities = torch.softmax(model(input_tensor), dim=1)
            confidence, predicted_id_tensor = torch.max(probabilities, 1)
            predicted_id = predicted_id_tensor.item()
            predicted_confidence = confidence.item()

        return speaker_mapping[predicted_id], predicted_confidence

    except Exception as e:
        print(f"Error during inference for {audio_file_path}: {e}")
        return None, None

# Rebuild the architecture, then restore weights: the best checkpoint from
# training if it exists, otherwise the final-epoch weights.
loaded_model = SpeakerCNN(num_speakers=num_speakers, num_mfcc=train_dataset.num_mfcc).to(device)

_saved_models_dir = os.path.join(PROJECT_ROOT_DIR, 'saved_models_cnn_only')
best_model_path = os.path.join(_saved_models_dir, 'speaker_cnn_best.pth')
model_final_save_path = os.path.join(_saved_models_dir, 'speaker_cnn_final.pth')

if os.path.exists(best_model_path):
    _checkpoint_path = best_model_path
    _loaded_message = f"Loaded best model from: {best_model_path}"
else:
    _checkpoint_path = model_final_save_path
    _loaded_message = f"Loaded final model from: {model_final_save_path} (Best model not found)"

# map_location lets weights saved on one device be restored onto the current one.
loaded_model.load_state_dict(torch.load(_checkpoint_path, map_location=device))
print(_loaded_message)

# Class id -> email mapping saved alongside the weights.
loaded_speaker_mapping = joblib.load(os.path.join(_saved_models_dir, 'speaker_mapping_cnn_only.joblib'))

# Rebuild existing_files_df from the CSV (rows whose audio file exists on disk)
# for name lookup. On any failure, fall back to an empty frame with the
# expected columns so later lookups do not hit an undefined name.
try:
    full_df = pd.read_csv(CSV_PATH)
    full_df['audio_path'] = full_df['audio_file'].apply(
        lambda file_name: os.path.join(AUDIO_FOLDER_PATH, file_name)
    )
    _file_is_present = full_df['audio_path'].apply(os.path.exists)
    existing_files_df_temp = full_df.loc[_file_is_present].copy()
    existing_files_df_temp = existing_files_df_temp.reset_index(drop=True)
    _email_categories = existing_files_df_temp['email'].astype('category')
    existing_files_df_temp['speaker_id'] = _email_categories.cat.codes
    existing_files_df = existing_files_df_temp
    print("Reloaded existing_files_df for name lookup.")
except Exception as e:
    print(f"Error reloading existing_files_df for name lookup: {e}")
    existing_files_df = pd.DataFrame({'email': [], 'name': []})


print("\n--- Upload an audio file from your PC for speaker detection ---")
uploaded_files = files.upload()

if uploaded_files:
    # Only the first uploaded file is analysed; any further uploads are ignored.
    uploaded_file_name = next(iter(uploaded_files))
    uploaded_file_path = os.path.join('/content/', uploaded_file_name)

    print(f"\nUploaded file: {uploaded_file_name}")
    print(f"File saved to: {uploaded_file_path}")

    print(f"\n--- Performing Inference on the uploaded audio file ---")

    detected_email, confidence = predict_speaker_from_audio(
        loaded_model, uploaded_file_path, loaded_speaker_mapping,
        num_mfcc=train_dataset.num_mfcc
    )

    if detected_email:
        print("\n--- Detection Result ---")
        print(f"Corresponding Email ID: {detected_email}")
        print(f"Confidence: {confidence:.4f}\n")

    else:
        print("Detection failed for the uploaded file. No matching speaker found or an error occurred.")
        print("Ensure it's a clear recording of one of the trained speakers.")
        # predict_speaker_from_audio returns (None, None) on failure, and None
        # cannot be formatted with ':.4f' (it raises TypeError), so only format
        # the confidence when one was actually produced.
        if confidence is None:
            print("Best confidence achieved (if any): N/A")
        else:
            print(f"Best confidence achieved (if any): {confidence:.4f}")

else:
    print("No file was uploaded.")
