# Table Of Contents
1. Libraries and Dependencies
2. Preprocessing of Dataset
    * Denoising the Audio Dataset
    * Making positive Negative pairs
3. Training the Model
4. Training using Twin Neural Network Concept

# Libraries and Dependencies

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import librosa as lb

# Preprocessing the Dataset
**Building positive (same-speaker) and negative (different-speaker) pairs from the dataset.**

In [None]:
import os
import random
import csv

# Base directory of your dataset. The code below treats every sub-folder of
# this directory as one speaker ID and searches it recursively for .wav files.
base_dir = "/kaggle/input/voxceleb1-audio-wav-files-for-india-celebrity/vox1_indian/content/vox_indian"

# Function to get all audio file paths grouped by ID
def get_audio_paths(base_dir):
    """Map each speaker-ID folder under ``base_dir`` to its .wav file paths.

    Args:
        base_dir: Directory whose immediate sub-directories are speaker IDs.

    Returns:
        dict: ``{id_name: [wav_path, ...]}``. Each ID folder is searched
        recursively; an ID without any .wav file maps to an empty list.
        Non-directory entries directly under ``base_dir`` are ignored.
    """
    audio_dict = {}
    for entry in os.listdir(base_dir):
        speaker_dir = os.path.join(base_dir, entry)
        if not os.path.isdir(speaker_dir):
            continue
        audio_dict[os.path.basename(speaker_dir)] = [
            os.path.join(folder, name)
            for folder, _, names in os.walk(speaker_dir)
            for name in names
            if name.endswith(".wav")
        ]
    return audio_dict

# Generate positive and negative pairs
def generate_pairs(audio_dict, num_pairs=5000):
    """Build labelled same-speaker and different-speaker file pairs.

    Args:
        audio_dict: Mapping of speaker ID to a list of that speaker's files.
        num_pairs: Upper bound on the number of positive pairs and the exact
            number of negative pairs to produce.

    Returns:
        tuple: ``(positive_pairs, negative_pairs)``; each is a list of
        ``(file1, file2, label)`` tuples with label 1 (same speaker) or
        0 (different speakers). Fewer than ``num_pairs`` positives are
        returned when the data cannot supply that many.

    Raises:
        ValueError: If negatives are requested but fewer than two speakers
            have at least one file.
    """
    from itertools import combinations

    # Positive pairs: each unordered pair once. (a, b) and (b, a) describe the
    # same comparison, so emitting both would only duplicate samples.
    positive_pairs = []
    for audio_files in audio_dict.values():
        positive_pairs.extend(
            (f1, f2, 1) for f1, f2 in combinations(audio_files, 2) if f1 != f2
        )
    random.shuffle(positive_pairs)
    positive_pairs = positive_pairs[:num_pairs]

    # Negative pairs: only speakers that own at least one file can be drawn,
    # because random.choice raises IndexError on an empty list.
    usable_ids = [id_name for id_name, files in audio_dict.items() if files]
    if num_pairs > 0 and len(usable_ids) < 2:
        raise ValueError(
            "Need at least two speaker IDs with audio files to build negative pairs"
        )

    negative_pairs = []
    while len(negative_pairs) < num_pairs:
        id1, id2 = random.sample(usable_ids, 2)  # two distinct speakers
        file1 = random.choice(audio_dict[id1])
        file2 = random.choice(audio_dict[id2])
        negative_pairs.append((file1, file2, 0))

    return positive_pairs, negative_pairs

# Save pairs to CSV
def save_pairs_to_csv(pairs, output_path):
    """Write ``pairs`` to ``output_path`` as CSV beneath a file1/file2/label header.

    Args:
        pairs: Iterable of ``(file1, file2, label)`` rows.
        output_path: Destination file; an existing file is overwritten.
    """
    header = ['file1', 'file2', 'label']
    # newline='' lets the csv module control line endings itself
    with open(output_path, mode='w', newline='') as csv_handle:
        csv.writer(csv_handle).writerows([header, *pairs])

# Main execution
# Index the audio files per speaker, then sample the labelled pairs
audio_dict = get_audio_paths(base_dir)
positive_pairs, negative_pairs = generate_pairs(audio_dict, num_pairs=5000)

# Merge both classes and shuffle so the labels are interleaved in the CSV
output_csv_path = "/kaggle/working/audio_pairs.csv"
all_pairs = [*positive_pairs, *negative_pairs]
random.shuffle(all_pairs)
save_pairs_to_csv(all_pairs, output_csv_path)

print(f"✅ Pairs saved to {output_csv_path}")

**Denoising the Audio Dataset and Making Positive/Negative Pairs**

In [None]:
import os
import librosa
import soundfile as sf
import noisereduce as nr

# Base directory of your dataset (one sub-folder per speaker ID)
# NOTE(review): this dataset slug ends in "...india-cele", while the pair
# generation cell reads from "...india-celebrity" — confirm which path is the
# mounted one.
input_dir = "/kaggle/input/voxceleb1-audio-wav-files-for-india-cele/vox1_indian/content/vox_indian"
# Root folder that receives the denoised copies
output_dir = "/kaggle/working/denoised_vox_indian"

# Create the output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Denoise function
def denoise_audio(input_path, output_path):
    """Denoise one audio file and write the result to ``output_path``.

    The first half second of the recording is handed to noisereduce as the
    noise sample. Any exception is reported on stdout instead of being
    raised, so one bad file does not stop a batch run.

    Args:
        input_path: Audio file to read.
        output_path: Where the denoised audio is written.
    """
    try:
        # sr=None: load at the sampling rate stored in the file
        signal, sample_rate = librosa.load(input_path, sr=None)

        # Leading 0.5 s of samples serves as the noise estimate
        noise_len = int(sample_rate * 0.5)
        cleaned = nr.reduce_noise(y=signal, sr=sample_rate, y_noise=signal[:noise_len])

        sf.write(output_path, cleaned, sample_rate)
        print(f"✅ Denoised and saved: {output_path}")
    except Exception as e:
        print(f"❌ Error processing {input_path}: {e}")

# Process each ID folder
# Walk every speaker-ID folder and write a denoised copy of each .wav file
# into one flat output folder per ID.
for id_folder in os.listdir(input_dir):
    id_path = os.path.join(input_dir, id_folder)

    # Ensure it's a directory
    if not os.path.isdir(id_path):
        continue

    # Create the output folder for the ID
    output_id_folder = os.path.join(output_dir, id_folder)
    os.makedirs(output_id_folder, exist_ok=True)

    # Process each .wav file in the ID folder (searched recursively)
    for root, _, files in os.walk(id_path):
        # Files in different sub-folders can share a basename. Writing them
        # all under that bare name would silently overwrite earlier outputs,
        # so the sub-folder path is folded into the output file name. Files
        # sitting directly in the ID folder keep their original name.
        rel_dir = os.path.relpath(root, id_path)
        prefix = "" if rel_dir == os.curdir else rel_dir.replace(os.sep, "_") + "_"

        for file in files:
            if file.endswith(".wav"):
                input_file_path = os.path.join(root, file)
                output_file_path = os.path.join(output_id_folder, prefix + file)

                # Denoise and save the audio
                denoise_audio(input_file_path, output_file_path)

print(f"✅ All files have been denoised and saved in {output_dir}")

In [None]:
import os
import random
import csv

# Location of the per-speaker spectrogram folders
input_dir = "/kaggle/input/spectrograms"

# Destination for the generated pair list
output_csv = "/kaggle/working/spectrogram_pairs.csv"

# Step 1: index the .png spectrograms by speaker ID (one sub-folder per ID;
# only files directly inside that folder are considered)
spectrograms_by_id = {}
for person_id in os.listdir(input_dir):
    person_folder = os.path.join(input_dir, person_id)
    if not os.path.isdir(person_folder):
        continue
    png_paths = []
    for entry in os.listdir(person_folder):
        if entry.endswith(".png"):
            png_paths.append(os.path.join(person_folder, entry))
    spectrograms_by_id[person_id] = png_paths

# Step 2: every unordered pair of spectrograms from one speaker is a
# positive sample (label 1)
positive_pairs = []
for person_id, files in spectrograms_by_id.items():
    if len(files) < 2:
        continue  # a single spectrogram cannot form a pair
    for i, first in enumerate(files):
        for second in files[i + 1:]:
            positive_pairs.append((first, second, 1))

# Randomise the order, then keep at most 20,000 positives
random.shuffle(positive_pairs)
del positive_pairs[20000:]

# Step 3: Create Negative Pairs
negative_pairs = []
# Only speakers that have at least one spectrogram can be sampled:
# random.choice raises IndexError on an empty list.
person_ids = [pid for pid, paths in spectrograms_by_id.items() if paths]
if len(person_ids) < 2:
    raise ValueError(
        "Need at least two speaker IDs with spectrograms to build negative pairs"
    )
while len(negative_pairs) < 20000:
    id1, id2 = random.sample(person_ids, 2)  # Select two different IDs
    file1 = random.choice(spectrograms_by_id[id1])
    file2 = random.choice(spectrograms_by_id[id2])
    negative_pairs.append((file1, file2, 0))  # 0 indicates a negative pair

# Step 4: merge both classes, shuffle so the labels are interleaved, then
# write the header followed by every pair to the CSV
all_pairs = [*positive_pairs, *negative_pairs]
random.shuffle(all_pairs)

with open(output_csv, "w", newline="") as csvfile:
    pair_writer = csv.writer(csvfile)
    pair_writer.writerows([["file1", "file2", "label"], *all_pairs])

print(f"Pairs saved to: {output_csv}")

# Training the Model

In [None]:
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Input, Dense, Lambda, GlobalAveragePooling2D
from tensorflow.keras.models import Model
import pandas as pd
import numpy as np
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import csv

# Switch Keras to the "mixed_float16" global dtype policy
tf.keras.mixed_precision.set_global_policy("mixed_float16")

# Data source and batching settings
csv_file = "/kaggle/input/spectogram-pairs/spectrogram_pairs.csv"
input_shape = (224, 224, 1)
batch_size = 8
total_samples = 40001  # hard-coded sample count used to size an epoch

# Step counts: floor division plus one extra step, for training and for the
# 20% share used as the validation size
steps_per_epoch = total_samples // batch_size + 1
validation_samples = int(0.2 * total_samples)
validation_steps = validation_samples // batch_size + 1

# Function to preprocess images
def preprocess_image(image_path, target_size=(224, 224)):
    """Load an image as a single-channel float32 array scaled to [0, 1].

    Args:
        image_path: Path of the image file to load.
        target_size: ``(height, width)`` the image is resized to on load.
            The dataset signature in this file is fixed at (224, 224, 1),
            so an image stored at any other size would otherwise fail when
            the dataset is iterated. Pass ``None`` to keep the native size.

    Returns:
        numpy.ndarray: float32 array of shape ``(height, width, 1)``.
    """
    img = load_img(image_path, color_mode="grayscale", target_size=target_size)
    # Pixel values 0-255 -> [0, 1]
    return (img_to_array(img) / 255.0).astype("float32")

# Function to create a generator
def pair_generator(csv_file, repeat=False, start_frac=0.0, end_frac=1.0):
    """Yield ``((img1, img2), label)`` samples from a slice of the pair CSV.

    Args:
        csv_file: CSV with ``file1``, ``file2`` and ``label`` columns.
        repeat: When True, loop over the slice forever (training); when
            False, stop after a single pass (validation).
        start_frac: Fraction of the CSV rows at which the slice starts.
        end_frac: Fraction of the CSV rows at which the slice ends (exclusive).

    Yields:
        tuple: ``((img1, img2), label)`` where the images come from
        ``preprocess_image`` and ``label`` is a float32 scalar.
    """
    df = pd.read_csv(csv_file)
    first_row = int(len(df) * start_frac)
    last_row = int(len(df) * end_frac)
    rows = df.iloc[first_row:last_row]
    if rows.empty:
        # Nothing to yield; without this guard repeat=True would spin forever
        return
    while True:  # Loop indefinitely for training
        for file1, file2, raw_label in zip(rows["file1"], rows["file2"], rows["label"]):
            img1 = preprocess_image(file1)
            img2 = preprocess_image(file2)
            label = np.array(raw_label, dtype=np.float32)  # Convert label to float32
            yield (img1, img2), label
        if not repeat:
            break  # Stop after one pass for validation

# Function to create a TensorFlow dataset
def create_tf_dataset(csv_file, batch_size, repeat=True, start_frac=0.0, end_frac=1.0):
    """Wrap ``pair_generator`` in a batched, prefetching ``tf.data.Dataset``.

    Args:
        csv_file: Pair CSV handed to ``pair_generator``.
        batch_size: Number of pairs per batch.
        repeat: Forwarded to ``pair_generator``.
        start_frac: Start of the row slice, as a fraction of the CSV.
        end_frac: End of the row slice, as a fraction of the CSV.

    Returns:
        tf.data.Dataset: batches of ``((img_a, img_b), label)``.

    Raises:
        ValueError: If the fractions do not satisfy
            ``0 <= start_frac <= end_frac <= 1``.
    """
    if not 0.0 <= start_frac <= end_frac <= 1.0:
        raise ValueError("Expected 0 <= start_frac <= end_frac <= 1")

    output_signature = (
        (tf.TensorSpec(shape=(224, 224, 1), dtype=tf.float32),
         tf.TensorSpec(shape=(224, 224, 1), dtype=tf.float32)),
        tf.TensorSpec(shape=(), dtype=tf.float32)
    )

    dataset = tf.data.Dataset.from_generator(
        lambda: pair_generator(
            csv_file, repeat=repeat, start_frac=start_frac, end_frac=end_frac
        ),
        output_signature=output_signature
    )

    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset

# Create datasets
# Training and validation read disjoint row ranges of the CSV. Reading the
# whole file for both would score validation on rows the model was trained on.
# NOTE(review): this is a row-level split; the same spectrogram file may
# still appear in pairs on both sides. A speaker-disjoint split needs a
# different pair CSV.
# NOTE(review): validation_steps is computed elsewhere from the full sample
# count — confirm it does not exceed the number of batches in the hold-out
# slice.
validation_fraction = 0.2
train_dataset = create_tf_dataset(
    csv_file, batch_size, repeat=True, end_frac=1.0 - validation_fraction
)  # Train with repeat
val_dataset = create_tf_dataset(
    csv_file, batch_size, repeat=False, start_frac=1.0 - validation_fraction
)  # No repeat for validation

# Define the Siamese Model
def build_siamese_model(input_shape):
    """Build a Siamese classifier on top of a frozen ImageNet ResNet50.

    Both inputs go through the same feature extractor; the element-wise
    absolute difference of the two feature vectors feeds a single sigmoid
    unit.

    Args:
        input_shape: Shape of one single-channel input image, e.g.
            ``(224, 224, 1)``. The ResNet50 backbone itself is built for
            224x224 three-channel input.

    Returns:
        tf.keras.Model: model taking ``[input_a, input_b]`` and returning a
        score in [0, 1].
    """
    base_model = ResNet50(weights="imagenet", include_top=False, input_shape=(224, 224, 3))
    base_model.trainable = False  # keep the pretrained weights fixed
    pooled = GlobalAveragePooling2D()(base_model.output)
    feature_extractor = Model(inputs=base_model.input, outputs=pooled)

    def to_resnet_input(rgb_unit_range):
        # The data pipeline in this file scales pixels to [0, 1], while the
        # ImageNet ResNet50 weights expect 0-255 pixels passed through
        # preprocess_input (channel-wise mean subtraction).
        return tf.keras.applications.resnet50.preprocess_input(rgb_unit_range * 255.0)

    input_a = Input(shape=input_shape)
    input_b = Input(shape=input_shape)

    # Stack grayscale images into 3 channels
    stack_a = tf.keras.layers.Concatenate()([input_a, input_a, input_a])
    stack_b = tf.keras.layers.Concatenate()([input_b, input_b, input_b])

    # One Lambda instance is shared so both branches are treated identically
    resnet_preprocess = Lambda(to_resnet_input)
    feat_a = feature_extractor(resnet_preprocess(stack_a))
    feat_b = feature_extractor(resnet_preprocess(stack_b))

    l1_distance = Lambda(lambda tensors: tf.abs(tensors[0] - tensors[1]))([feat_a, feat_b])
    # A global "mixed_float16" policy is set in this file; the output layer
    # is kept in float32 so the sigmoid and the loss are numerically stable.
    output = Dense(1, activation="sigmoid", dtype="float32")(l1_distance)

    model = Model(inputs=[input_a, input_b], outputs=output)
    return model

siamese_model = build_siamese_model(input_shape)

# Compile: Adam optimiser, binary cross-entropy on the sigmoid output,
# accuracy as the reported metric
adam = tf.keras.optimizers.Adam(learning_rate=0.0001)
siamese_model.compile(optimizer=adam, loss="binary_crossentropy", metrics=["accuracy"])

# Custom Callback for Epoch Tracking
class SaveEpochMetricsCallback(tf.keras.callbacks.Callback):
    """Keras callback that appends each epoch's metrics to a CSV file.

    The file is created (or truncated) with a header row when the callback
    is constructed; one row is appended at the end of every epoch.
    """

    # Metric keys read from the Keras ``logs`` dict, in column order
    _METRIC_KEYS = ("loss", "accuracy", "val_loss", "val_accuracy")

    def __init__(self, output_csv_path):
        """Create the CSV at ``output_csv_path`` and write its header."""
        super().__init__()
        self.output_csv_path = output_csv_path
        with open(self.output_csv_path, mode='w', newline='') as file:
            csv.writer(file).writerow(["epoch", *self._METRIC_KEYS])

    def on_epoch_end(self, epoch, logs=None):
        """Append one row for the finished epoch (1-based epoch number).

        A metric absent from ``logs`` is written as an empty cell.
        """
        # The signature allows logs=None; guard so .get() cannot fail
        logs = logs or {}
        epoch_metrics = [epoch + 1, *(logs.get(key) for key in self._METRIC_KEYS)]
        with open(self.output_csv_path, mode='a', newline='') as file:
            csv.writer(file).writerow(epoch_metrics)
        print(f"✅ Epoch {epoch + 1} metrics saved to {self.output_csv_path}")

# Path to save CSV
epoch_metrics_csv = "/kaggle/working/epoch_metrics.csv"
save_metrics_callback = SaveEpochMetricsCallback(output_csv_path=epoch_metrics_csv)

# Train for 10 epochs, logging metrics through the callback
fit_options = dict(
    validation_data=val_dataset,
    steps_per_epoch=steps_per_epoch,
    validation_steps=validation_steps,
    epochs=10,
    callbacks=[save_metrics_callback],
)
with tf.device('/GPU:0'):  # request placement on the first GPU
    history = siamese_model.fit(train_dataset, **fit_options)

# Persist the trained model in HDF5 format
siamese_model.save("/kaggle/working/siamese_resnet_model.h5")

# Final evaluation on the validation dataset
with tf.device('/GPU:0'):
    eval_results = siamese_model.evaluate(val_dataset, steps=validation_steps)
    loss, accuracy = eval_results
    print(f"🎯 Validation Accuracy: {accuracy:.2f}")

In [None]:
# Training the Model using Twin Neural Networking Concept

**Training the model again, this time using the twin neural network concept. For background on this concept, [see here](https://www.mathworks.com/help/deeplearning/ug/train-twin-network-to-compare-images.html).**

In [None]:
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Input, Dense, Lambda, GlobalAveragePooling2D
from tensorflow.keras.models import Model
import pandas as pd
import numpy as np
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import csv

print("started")

# Switch Keras to the "mixed_float16" global dtype policy
tf.keras.mixed_precision.set_global_policy("mixed_float16")

# Data source and batching settings
csv_file = "/kaggle/input/spectogram-pairs/spectrogram_pairs.csv"
input_shape = (224, 224, 1)
batch_size = 8
total_samples = 40001  # hard-coded sample count used to size an epoch

# Step counts: floor division plus one extra step, for training and for the
# 20% share used as the validation size
steps_per_epoch = total_samples // batch_size + 1
validation_samples = int(0.2 * total_samples)
validation_steps = validation_samples // batch_size + 1

# Function to preprocess images
def preprocess_image(image_path, target_size=(224, 224)):
    """Load an image as a single-channel float32 array scaled to [0, 1].

    Args:
        image_path: Path of the image file to load.
        target_size: ``(height, width)`` the image is resized to on load.
            The dataset signature in this file is fixed at (224, 224, 1),
            so an image stored at any other size would otherwise fail when
            the dataset is iterated. Pass ``None`` to keep the native size.

    Returns:
        numpy.ndarray: float32 array of shape ``(height, width, 1)``.
    """
    img = load_img(image_path, color_mode="grayscale", target_size=target_size)
    # Pixel values 0-255 -> [0, 1]
    return (img_to_array(img) / 255.0).astype("float32")

# Function to create a generator
def pair_generator(csv_file, repeat=False, start_frac=0.0, end_frac=1.0):
    """Yield ``((img1, img2), label)`` samples from a slice of the pair CSV.

    Args:
        csv_file: CSV with ``file1``, ``file2`` and ``label`` columns.
        repeat: When True, loop over the slice forever (training); when
            False, stop after a single pass (validation).
        start_frac: Fraction of the CSV rows at which the slice starts.
        end_frac: Fraction of the CSV rows at which the slice ends (exclusive).

    Yields:
        tuple: ``((img1, img2), label)`` where the images come from
        ``preprocess_image`` and ``label`` is a float32 scalar.
    """
    df = pd.read_csv(csv_file)
    first_row = int(len(df) * start_frac)
    last_row = int(len(df) * end_frac)
    rows = df.iloc[first_row:last_row]
    if rows.empty:
        # Nothing to yield; without this guard repeat=True would spin forever
        return
    while True:  # Loop indefinitely for training
        for file1, file2, raw_label in zip(rows["file1"], rows["file2"], rows["label"]):
            img1 = preprocess_image(file1)
            img2 = preprocess_image(file2)
            label = np.array(raw_label, dtype=np.float32)  # Convert label to float32
            yield (img1, img2), label
        if not repeat:
            break  # Stop after one pass for validation

# Function to create a TensorFlow dataset
def create_tf_dataset(csv_file, batch_size, repeat=True, start_frac=0.0, end_frac=1.0):
    """Wrap ``pair_generator`` in a batched, prefetching ``tf.data.Dataset``.

    Args:
        csv_file: Pair CSV handed to ``pair_generator``.
        batch_size: Number of pairs per batch.
        repeat: Forwarded to ``pair_generator``.
        start_frac: Start of the row slice, as a fraction of the CSV.
        end_frac: End of the row slice, as a fraction of the CSV.

    Returns:
        tf.data.Dataset: batches of ``((img_a, img_b), label)``.

    Raises:
        ValueError: If the fractions do not satisfy
            ``0 <= start_frac <= end_frac <= 1``.
    """
    if not 0.0 <= start_frac <= end_frac <= 1.0:
        raise ValueError("Expected 0 <= start_frac <= end_frac <= 1")

    output_signature = (
        (tf.TensorSpec(shape=(224, 224, 1), dtype=tf.float32),
         tf.TensorSpec(shape=(224, 224, 1), dtype=tf.float32)),
        tf.TensorSpec(shape=(), dtype=tf.float32)
    )

    dataset = tf.data.Dataset.from_generator(
        lambda: pair_generator(
            csv_file, repeat=repeat, start_frac=start_frac, end_frac=end_frac
        ),
        output_signature=output_signature
    )

    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset

# Create datasets
# Training and validation read disjoint row ranges of the CSV. Reading the
# whole file for both would score validation on rows the model was trained on.
# NOTE(review): this is a row-level split; the same spectrogram file may
# still appear in pairs on both sides. A speaker-disjoint split needs a
# different pair CSV.
# NOTE(review): validation_steps is computed elsewhere from the full sample
# count — confirm it does not exceed the number of batches in the hold-out
# slice.
validation_fraction = 0.2
train_dataset = create_tf_dataset(
    csv_file, batch_size, repeat=True, end_frac=1.0 - validation_fraction
)  # Train with repeat
val_dataset = create_tf_dataset(
    csv_file, batch_size, repeat=False, start_frac=1.0 - validation_fraction
)  # No repeat for validation

# Step 2: Build the Twin Neural Network
def build_twin_network(input_shape):
    """Build a twin (Siamese) network on top of a frozen ImageNet ResNet50.

    Both inputs go through the same feature extractor; the Euclidean
    distance between the two feature vectors feeds a single sigmoid unit.

    Args:
        input_shape: Shape of one single-channel input image, e.g.
            ``(224, 224, 1)``. The ResNet50 backbone itself is built for
            224x224 three-channel input.

    Returns:
        tf.keras.Model: model taking ``[input_a, input_b]`` and returning a
        score in [0, 1].
    """
    base_model = ResNet50(weights="imagenet", include_top=False, input_shape=(224, 224, 3))
    base_model.trainable = False  # Freeze ResNet weights

    pooled = GlobalAveragePooling2D()(base_model.output)
    feature_extractor = Model(inputs=base_model.input, outputs=pooled)

    def to_resnet_input(rgb_unit_range):
        # The data pipeline in this file scales pixels to [0, 1], while the
        # ImageNet ResNet50 weights expect 0-255 pixels passed through
        # preprocess_input (channel-wise mean subtraction).
        return tf.keras.applications.resnet50.preprocess_input(rgb_unit_range * 255.0)

    # Compute L2 distance
    def euclidean_distance(vects):
        x, y = vects
        # Accumulate in float32: the file enables a float16 compute policy
        # and a sum over the whole feature vector can lose precision there.
        diff = tf.cast(x, tf.float32) - tf.cast(y, tf.float32)
        sum_sq = tf.reduce_sum(tf.square(diff), axis=1, keepdims=True)
        # Clamp before sqrt so the gradient stays finite at zero distance
        # (matters if the backbone is ever unfrozen).
        return tf.sqrt(tf.maximum(sum_sq, 1e-7))

    input_a = Input(shape=input_shape)
    input_b = Input(shape=input_shape)

    # Stack grayscale images into 3 channels
    stack_a = tf.keras.layers.Concatenate()([input_a, input_a, input_a])
    stack_b = tf.keras.layers.Concatenate()([input_b, input_b, input_b])

    # One Lambda instance is shared so both branches are treated identically
    resnet_preprocess = Lambda(to_resnet_input)
    feat_a = feature_extractor(resnet_preprocess(stack_a))
    feat_b = feature_extractor(resnet_preprocess(stack_b))

    distance = Lambda(euclidean_distance)([feat_a, feat_b])
    # Output kept in float32 so the sigmoid and the loss are numerically
    # stable under the mixed-precision policy.
    output = Dense(1, activation="sigmoid", dtype="float32")(distance)  # Binary classification (similar/dissimilar)

    model = Model(inputs=[input_a, input_b], outputs=output)
    return model

# Instantiate the twin network for the configured single-image input shape
twin_model = build_twin_network(input_shape)

# Step 3: Define Contrastive Loss Function
def contrastive_loss(y_true, y_pred):
    """Contrastive loss for a model output read as a similarity score.

    In this file label 1 marks a positive (same-speaker) pair and the model
    is compiled with the thresholded ``"accuracy"`` metric, so the sigmoid
    output has to be HIGH for label 1. The previous form penalised
    ``y_pred**2`` for label 1, which drives the output towards 0 for
    positive pairs and makes the reported accuracy fall as the loss
    improves.

    Args:
        y_true: Pair labels, 1 for similar and 0 for dissimilar.
        y_pred: Model output in [0, 1].

    Returns:
        Scalar tensor: mean over the batch of
        ``y_true * max(margin - y_pred, 0)**2 + (1 - y_true) * y_pred**2``.
    """
    margin = 1.0
    # Align dtype and shape explicitly so a (batch,) label vector cannot
    # broadcast against a (batch, 1) prediction into a (batch, batch) matrix.
    y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), tf.shape(y_pred))
    similar_term = y_true * tf.square(tf.maximum(margin - y_pred, 0))
    dissimilar_term = (1 - y_true) * tf.square(y_pred)
    return tf.reduce_mean(similar_term + dissimilar_term)

# Compile the Model
# Adam optimiser, the custom contrastive loss, accuracy as the reported metric
adam = tf.keras.optimizers.Adam(learning_rate=0.0001)
twin_model.compile(optimizer=adam, loss=contrastive_loss, metrics=["accuracy"])

# Custom Callback for Epoch Tracking with Printing
class SaveAndPrintEpochMetrics(tf.keras.callbacks.Callback):
    """Keras callback that logs each epoch's metrics to a CSV and to stdout.

    The file is created (or truncated) with a header row when the callback
    is constructed; one row is appended at the end of every epoch.
    """

    # Metric keys read from the Keras ``logs`` dict, in column order
    _METRIC_KEYS = ("loss", "accuracy", "val_loss", "val_accuracy")

    def __init__(self, output_csv_path):
        """Create the CSV at ``output_csv_path`` and write its header."""
        super().__init__()
        self.output_csv_path = output_csv_path
        with open(self.output_csv_path, mode='w', newline='') as file:
            csv.writer(file).writerow(["epoch", *self._METRIC_KEYS])

    @staticmethod
    def _fmt(value):
        """Format a metric to 4 decimals, or 'n/a' when it was not reported."""
        return "n/a" if value is None else f"{value:.4f}"

    def on_epoch_end(self, epoch, logs=None):
        """Append one CSV row for the finished epoch and print a summary.

        A metric absent from ``logs`` (for example validation metrics when
        no validation data is used) is written as an empty cell and printed
        as 'n/a' instead of raising on the float format.
        """
        # The signature allows logs=None; guard so .get() cannot fail
        logs = logs or {}
        epoch_metrics = [epoch + 1, *(logs.get(key) for key in self._METRIC_KEYS)]

        # Save to CSV
        with open(self.output_csv_path, mode='a', newline='') as file:
            csv.writer(file).writerow(epoch_metrics)

        # Print the metrics after every epoch
        print(f"\n🔹 Epoch {epoch + 1} Completed")
        print(f"📉 Training Loss: {self._fmt(logs.get('loss'))}")
        print(f"✅ Training Accuracy: {self._fmt(logs.get('accuracy'))}")
        print(f"📊 Validation Loss: {self._fmt(logs.get('val_loss'))}")
        print(f"🎯 Validation Accuracy: {self._fmt(logs.get('val_accuracy'))}\n")

# Path to save CSV
# The twin run gets its own metrics file, mirroring the separate model file
# names. Reusing "/kaggle/working/epoch_metrics.csv" would truncate the log
# written by the Siamese training cell when the callback is constructed.
epoch_metrics_csv = "/kaggle/working/twin_epoch_metrics.csv"
save_print_callback = SaveAndPrintEpochMetrics(output_csv_path=epoch_metrics_csv)

# Train the model for 10 epochs, logging metrics through the callback
with tf.device('/GPU:0'):  # request placement on the first GPU
    history = twin_model.fit(
        train_dataset,
        validation_data=val_dataset,
        steps_per_epoch=steps_per_epoch,
        validation_steps=validation_steps,
        epochs=10,
        callbacks=[save_print_callback]
    )

# Save the model in HDF5 format
twin_model.save("/kaggle/working/twin_resnet_model.h5")

# Evaluate the Model on the validation dataset
with tf.device('/GPU:0'):
    loss, accuracy = twin_model.evaluate(val_dataset, steps=validation_steps)
    print(f"🎯 Final Validation Accuracy: {accuracy:.4f}")
