# Imports

In [None]:
# SciKit Learn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# TensorFlow
from tensorflow.keras import regularizers
from tensorflow.keras import backend as K
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization, LeakyReLU
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

# Vision Transformer

from timm.models import vit_base_patch16_224

# PyTorch
import torch
import torch.nn as nn
from torch.optim import Adam
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

# Data Analysis
import pandas as pd
import numpy as np

# Audio Feature Extraction
import librosa

# Image Analysis
from PIL import Image

# General
from google.colab import drive
import os
import zipfile
from tqdm import tqdm
import matplotlib.pyplot as plt
import shutil
import re
from io import BytesIO, TextIOWrapper

# Constants

In [None]:
# Data paths
# File names of the training archives; the unzip cell opens them under drive_folder.
audio_zip_folder_2021 = "2021(audio).zip"
spec_zip_folder_2021 = "2021(spec).zip"
audio_zip_folder_2022 = "2022(audio).zip"
spec_zip_folder_2022 = "2022(spec).zip"
# These are our assumptions: the name of the top-level directory inside each archive.
# The trailing "/" matters because paths are built by plain string concatenation.
top_audio_folder_2021 = "2021(audio)/"
top_spec_folder_2021 = "2021(spec)/"
top_audio_folder_2022 = "2022(audio)/"
top_spec_folder_2022 = "2022(spec)/"

# Folders
# Local extraction targets; every folder ends with "/" for the same concatenation reason.
root_folder = "/content/"
drive_folder = root_folder + "drive/MyDrive/"
audio_folder_2021 = root_folder + "2021_audio/"
spec_folder_2021 = root_folder + "2021_spec/"
audio_folder_2022 = root_folder + "2022_audio/"
spec_folder_2022 = root_folder + "2022_spec/"

# Test paths
# Same layout as above for the 2023 archives, which are extracted when perform_test is set.
audio_zip_folder_2023 = "2023(audio).zip"
spec_zip_folder_2023 = "2023(spec).zip"
top_audio_folder_2023 = "2023(audio)/"
top_spec_folder_2023 = "2023(spec)/"
audio_folder_2023 = root_folder + "2023_audio/"
spec_folder_2023 = root_folder + "2023_spec/"

# Run-mode switches
perform_cross_validation = False
train_and_save_classifiers = False

# Training data is only unzipped/loaded when at least one training mode is enabled.
perform_train = perform_cross_validation or train_and_save_classifiers
perform_test = True

# File names under which the trained models are saved and later reloaded.
fnn_model_name = "fnn.keras"
transformer_model_name = "transformer.pth"

# Preparing the Dataset

## Unzipping the Data

In [None]:
# Mount Google Drive so the archives under drive_folder become readable
drive.mount('/content/drive/')

# (archive name on Drive, local extraction folder) pairs, in extraction order.
archives_to_extract = []

if perform_train:
    # 2021 and 2022 data are only needed for training / cross validation
    archives_to_extract += [
        (audio_zip_folder_2021, audio_folder_2021),
        (spec_zip_folder_2021, spec_folder_2021),
        (audio_zip_folder_2022, audio_folder_2022),
        (spec_zip_folder_2022, spec_folder_2022),
    ]

if perform_test:
    # 2023 data is the blind test set
    archives_to_extract += [
        (audio_zip_folder_2023, audio_folder_2023),
        (spec_zip_folder_2023, spec_folder_2023),
    ]

for archive_name, target_folder in archives_to_extract:
    # An existing target folder is taken as "already extracted" and skipped
    if not os.path.isdir(target_folder):
        with zipfile.ZipFile(drive_folder + archive_name, 'r') as zip_ref:
            zip_ref.extractall(target_folder)


Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


## Creating the DataFrame

In [None]:
def load_data_for_year(audio_folder, spec_folder, top_audio_folder, top_spec_folder):
    """Build one labelled DataFrame for a single year of recordings.

    Every entry of ``audio_folder + top_audio_folder`` (except the macOS
    artefacts ``.DS_Store`` and ``__MACOSX``) is treated as a location
    directory that contains a ``train_labels.csv`` with an ``fname`` column.

    :param audio_folder: Folder the audio archive was extracted into (ends with "/").
    :param spec_folder: Folder the spectrogram archive was extracted into (ends with "/").
    :param top_audio_folder: Top-level directory inside the audio archive (ends with "/").
    :param top_spec_folder: Top-level directory inside the spectrogram archive (ends with "/").
    :return: The per-location label frames concatenated row-wise, with two
        added columns: ``audio_path`` and ``spec_path``.
    :raises ValueError: From ``pd.concat`` when no location directory is found.
    """
    dataframes = []

    data_dir = audio_folder + top_audio_folder

    for loc in os.listdir(data_dir):
        if loc == ".DS_Store" or loc == "__MACOSX":
            continue

        train_dir = data_dir + "/" + loc

        audio_dir = audio_folder + top_audio_folder + loc
        spec_dir = spec_folder + top_spec_folder + loc

        # Loading the train_labels (read_csv already returns a DataFrame)
        loc_dataframe = pd.read_csv(train_dir + "/train_labels.csv")

        # Updating the audio paths
        loc_dataframe["audio_path"] = audio_dir + "/" + loc_dataframe["fname"]

        # Updating the spec paths: swap a trailing ".mp3" for ".png".
        # str.strip(".mp3") must not be used here: it removes any of the
        # characters ".", "m", "p", "3" from BOTH ends, so a name such as
        # "PEN0_20210609_055003.mp3" would lose its final "3" as well.
        spec_names = loc_dataframe["fname"].str.replace(r"\.mp3$", "", regex=True) + ".png"
        loc_dataframe["spec_path"] = spec_dir + "/" + spec_names

        dataframes.append(loc_dataframe)

    # Putting everything together!
    return pd.concat(dataframes)

# List of label columns (one binary column per species code in the label CSVs)
label_columns = ["AMRO", "BHCO", "CHSW", "EUST", "GRCA", "HOSP", "HOWR", "NOCA", "RBGU", "RWBL"]

if perform_train:
    # Load each training year separately, then stack them into one frame.
    data_2021 = load_data_for_year(audio_folder_2021, spec_folder_2021,
                                top_audio_folder_2021, top_spec_folder_2021)
    data_2022 = load_data_for_year(audio_folder_2022, spec_folder_2022,
                                top_audio_folder_2022, top_spec_folder_2022)

    # ignore_index=True gives the combined frame a fresh 0..n-1 index.
    data = pd.concat([data_2021, data_2022], axis=0, ignore_index=True)
    print(data.head())

                      fname  AMRO  BHCO  CHSW  EUST  GRCA  HOSP  HOWR  NOCA  \
0  PEN0_20210609_055000.mp3     1     0     1     0     0     1     0     0   
1  PEN0_20210609_060000.mp3     0     1     1     0     0     1     1     0   
2  PEN0_20210609_061000.mp3     1     0     1     0     0     0     0     0   
3  PEN0_20210609_062000.mp3     0     1     1     1     0     1     0     0   
4  PEN0_20210609_063000.mp3     0     0     1     0     0     1     0     1   

   RBGU  RWBL                                         audio_path  \
0     0     0  /content/2021_audio/2021(audio)/PEN/PEN0_20210...   
1     0     0  /content/2021_audio/2021(audio)/PEN/PEN0_20210...   
2     0     0  /content/2021_audio/2021(audio)/PEN/PEN0_20210...   
3     0     0  /content/2021_audio/2021(audio)/PEN/PEN0_20210...   
4     0     0  /content/2021_audio/2021(audio)/PEN/PEN0_20210...   

                                           spec_path  
0  /content/2021_spec/2021(spec)/PEN/PEN0_2021060...  
1  /co

## Removing Blank Rows

In [None]:
if perform_train:
    # Number of positive labels per recording
    labels_per_row = data[label_columns].sum(axis=1)

    # Rows where no species is marked at all
    blank_labels = data[labels_per_row == 0]

    # Report how many such rows exist
    if blank_labels.empty:
        print("No rows with blank labels found.")
    else:
        print(f"Found {len(blank_labels)} rows with blank labels:")

    # Keep only rows that carry at least one positive label
    data = data[labels_per_row > 0]

## Adding the Location Field

In [None]:
if perform_train:
    # `data` may be the result of a boolean filter in an earlier cell; take an
    # explicit copy so adding a column does not raise SettingWithCopyWarning.
    data = data.copy()

    # Extract the three-letter location code from the audio_path
    # (the first path segment made of exactly three word characters)
    data['location'] = data['audio_path'].str.extract(r'/(\w{3})/')

    # Label-encode the location column. The encoder is fitted once and its
    # output is reused, instead of fitting a second time on the same values.
    encoder = LabelEncoder()
    encoded_fields = encoder.fit_transform(data['location'])

    # classes_ is ordered by encoded value, so enumerate() yields code -> index
    category_mapping = {category: i for i, category in enumerate(encoder.classes_)}
    print(category_mapping)

    data['location'] = encoded_fields
    data.head()

{'BRY': 0, 'CAL': 1, 'FIO': 2, 'HAR': 3, 'KEA': 4, 'LAW': 5, 'LIF': 6, 'MCK': 7, 'PEN': 8, 'SYL': 9, 'WAT': 10}


## Fixing Discrepancies in the Data

In [None]:
if perform_train:
    # Any label value of 2 is rewritten to 1 so every label column is binary (0/1).
    data[label_columns] = data[label_columns].replace(2, 1)

## Shuffling the Data

In [None]:
if perform_train:
    # Sampling all len(data) rows without replacement is a full shuffle.
    # No random_state is passed, so the order differs from run to run.
    data = data.sample(n=len(data)).reset_index(drop=True)
    # NOTE(review): inside an `if` block this expression is evaluated but not displayed.
    data.head()

# Feature Extraction

## Audio Feature Extraction

In [None]:
# Constants
SAMPLE_RATE = 16000  # Target sample rate passed to librosa.load
FRAME_SIZE = 1024  # Typical frame size for audio processing (used as n_fft)
HOP_LENGTH = 512  # Overlap of 50%
N_MFCC = 13  # Number of MFCCs to extract
# NOTE(review): N_CHROMA and N_CONTRAST are never passed to librosa below; they
# only size the all-zero fallback vector, so they must match the number of rows
# librosa returns with its default settings — TODO confirm against librosa docs.
N_CHROMA = 12  # Number of chroma features
N_CONTRAST = 7  # Number of spectral contrast bands

# Preprocessing function for MFCCs, Chroma, and Spectral Contrast
def preprocess_audio_features(file_path):
    """Turn one audio file into a fixed-length feature vector.

    The clip is loaded at SAMPLE_RATE, normalised, and summarised by the
    time-averaged MFCC, chroma and spectral-contrast features, concatenated
    in that order.

    :param file_path: Path of the audio file to load.
    :return: 1-D numpy array. If anything fails, the error is printed and an
        all-zero vector of length N_MFCC + N_CHROMA + N_CONTRAST is returned.
    """
    try:
        # Load and peak-normalise the waveform
        waveform, rate = librosa.load(file_path, sr=SAMPLE_RATE)
        waveform = librosa.util.normalize(waveform)

        # Framing arguments shared by all three extractors
        framing = dict(y=waveform, sr=rate, n_fft=FRAME_SIZE, hop_length=HOP_LENGTH)

        # Each extractor yields (n_features, n_frames); average over the frames
        mfcc_mean = librosa.feature.mfcc(n_mfcc=N_MFCC, **framing).mean(axis=1)
        chroma_mean = librosa.feature.chroma_stft(**framing).mean(axis=1)
        contrast_mean = librosa.feature.spectral_contrast(**framing).mean(axis=1)

        # Shape: (N_MFCC + N_CHROMA + N_CONTRAST,)
        return np.concatenate([mfcc_mean, chroma_mean, contrast_mean])
    except Exception as e:
        # Best effort: report the failure and keep the pipeline running
        print(f"Error processing {file_path}: {e}")
        return np.zeros(N_MFCC + N_CHROMA + N_CONTRAST)

# Function to generate feature set
def generate_audio_feature_set(data):
    """Attach an ``audio_features`` column to ``data``.

    One feature vector is computed per row from its ``audio_path``. The frame
    is modified in place and also returned.

    :param data: DataFrame with an ``audio_path`` column.
    :return: The same DataFrame object, now with ``audio_features``.
    """
    data["audio_features"] = [
        preprocess_audio_features(row['audio_path'])
        for _, row in data.iterrows()
    ]
    return data

if perform_train:
    # Compute the audio feature vector for every training recording.
    data = generate_audio_feature_set(data)

## Spectrogram Generation

In [None]:
class SpectrogramDataset(Dataset):
    """Dataset yielding (spectrogram image, label vector) pairs from a DataFrame.

    The frame must have a ``spec_path`` column, and the label values are read
    positionally from columns 1 to 10 of each row.
    """

    def __init__(self, dataframe, transform=None):
        """Keep a reference to the frame and the optional image transform."""
        self.transform = transform
        self.dataframe = dataframe

    def __len__(self):
        """Number of rows in the backing frame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load the image for row ``idx`` and return it with its float32 labels."""
        row = self.dataframe.iloc[idx]
        img_path = row['spec_path']
        # Positional slice: columns 1..10 hold the ten label values
        labels = row[1:11].values.astype('float32')

        # Load the spectrogram image as 3-channel RGB
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)

        label_tensor = torch.tensor(labels, dtype=torch.float32)
        return image, label_tensor

# Image preprocessing shared by every SpectrogramDataset in this notebook:
# resize to 224x224, convert to a tensor, then normalise each channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics — confirm.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Creating the Models

## Building the FNN for Audio

In [None]:
# Define the Dense model
def build_dense_model(input_shape, num_classes):
    """Build and compile the fully connected multi-label classifier.

    Five hidden Dense layers (2048, 2048, 1024, 1024, 1024 units), each with
    LeakyReLU activation and L2 regularisation and each followed by
    BatchNormalization and Dropout(0.3), feed a sigmoid output layer.

    :param input_shape: Number of input features (an int).
    :param num_classes: Number of output units.
    :return: The compiled Sequential model.
    """
    # Clear session before retraining
    K.clear_session()

    hidden_widths = (2048, 2048, 1024, 1024, 1024)

    stack = []
    for depth, width in enumerate(hidden_widths):
        # Only the first layer declares the input size
        first_layer_kwargs = {"input_shape": (input_shape,)} if depth == 0 else {}
        stack.append(
            Dense(
                width,
                activation=LeakyReLU(alpha=0.01),
                kernel_regularizer=regularizers.L2(0.2),
                **first_layer_kwargs,
            )
        )
        stack.append(BatchNormalization())
        stack.append(Dropout(0.3))

    # One independent sigmoid per class (multi-label output)
    stack.append(Dense(num_classes, activation='sigmoid'))

    model = Sequential(stack)
    model.compile(
        optimizer=SGD(learning_rate=1.0, momentum=0.9),
        loss='binary_crossentropy',
        metrics=['accuracy', Precision(), Recall()],
    )
    return model

# Callbacks for training
# Multiplies the learning rate by 0.1 after 5 epochs without val_loss improvement.
# This single instance is passed to every fit() call in run_fnn_model.
reduce_lr_fnn = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5)

## Building the Transformer Model

In [None]:
class ViTClassifier(nn.Module):
    """Pre-trained ViT-Base/16 (224x224) with its head replaced for ``num_classes`` outputs."""

    def __init__(self, num_classes):
        """Load the pre-trained backbone and swap in a new linear head."""
        super().__init__()
        # Pre-trained backbone from timm
        self.vit = vit_base_patch16_224(pretrained=True)
        # Replace the classifier head, keeping its input width
        head_width = self.vit.head.in_features
        self.vit.head = nn.Linear(head_width, num_classes)

    def forward(self, x):
        """Return the backbone output for ``x`` (no activation is applied here)."""
        logits = self.vit(x)
        return logits

def train_one_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader`` and report training metrics.

    :param model: Module producing one logit per label.
    :param dataloader: Iterable of ``(inputs, labels)`` batches.
    :param criterion: Loss applied to ``(logits, labels.float())``.
    :param optimizer: Optimizer stepped once per batch.
    :param device: Device the batches are moved to.
    :return: ``(mean batch loss, accuracy, precision, recall, f1)``; precision,
        recall and F1 are micro-averaged over labels thresholded at 0.5.
    """
    model.train()
    total_loss = 0.0
    all_preds = []
    all_labels = []

    for inputs, labels in tqdm(dataloader, desc="Training", leave=False):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels.float())
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # The model emits logits; the sigmoid turns them into probabilities
        preds = torch.sigmoid(outputs)
        all_preds.extend(preds.detach().cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    # Binarise with numpy directly: torch.tensor() on a list of ndarrays is the
    # slow path PyTorch warns about, and the result was converted back anyway.
    all_preds = (np.asarray(all_preds) >= 0.5).astype(np.int32)
    precision = precision_score(all_labels, all_preds, average="micro")
    recall = recall_score(all_labels, all_preds, average="micro")
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average="micro")
    return total_loss / len(dataloader), accuracy, precision, recall, f1


def evaluate(model, dataloader, criterion, device, split_name="Validation"):
    """Score ``model`` on ``dataloader`` without updating its weights.

    :param model: Module producing one logit per label.
    :param dataloader: Iterable of ``(inputs, labels)`` batches.
    :param criterion: Loss applied to ``(logits, labels.float())``.
    :param device: Device the batches are moved to.
    :param split_name: Text shown as the progress-bar description.
    :return: ``(preds, mean batch loss, accuracy, precision, recall, f1)``.
        ``preds`` is an int32 array of 0/1 decisions (sigmoid output
        thresholded at 0.5), NOT probabilities.
    """
    # NOTE(review): callers that average `preds` with another model's
    # probabilities are averaging hard 0/1 votes — confirm this is intended.
    model.eval()
    total_loss = 0.0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in tqdm(dataloader, desc=f"{split_name}", leave=False):
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels.float())

            total_loss += loss.item()

            preds = torch.sigmoid(outputs)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Binarise with numpy directly: torch.tensor() on a list of ndarrays is the
    # slow path PyTorch warns about. Dtype stays int32, as before.
    all_preds = (np.asarray(all_preds) >= 0.5).astype(np.int32)
    precision = precision_score(all_labels, all_preds, average="micro")
    recall = recall_score(all_labels, all_preds, average="micro")
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average="micro")
    return all_preds, total_loss / len(dataloader), accuracy, precision, recall, f1

def train_and_evaluate(model, train_dataloader, val_dataloader, test_dataloader, num_epochs, device, save_model=False):
    """Train ``model``, track per-epoch metrics, and optionally score a test set.

    Training stops early once the validation loss has not improved for
    ``patience`` consecutive epochs.

    :param model: Module to train; it is expected to be on ``device`` already.
    :param train_dataloader: Batches used for optimisation.
    :param val_dataloader: Batches used for validation after every epoch.
    :param test_dataloader: Batches for the final evaluation, or ``None`` to skip it.
    :param num_epochs: Maximum number of epochs.
    :param device: Device the batches are moved to.
    :param save_model: When true, the whole model is saved to
        ``transformer_model_name`` each time the validation loss improves.
    :return: ``(preds, metrics)``. ``preds`` are the test predictions returned
        by ``evaluate`` (``[]`` without a test loader); ``metrics`` maps
        metric names to per-epoch lists.
    """
    criterion = nn.BCEWithLogitsLoss()
    optimizer = Adam(model.parameters(), lr=1e-4, weight_decay=1e-3)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.7)

    best_val_loss = float("inf")
    patience = 10
    epochs_without_improvement = 0

    # Save metrics for plotting
    metric_names = ("loss", "accuracy", "precision", "recall", "f1")
    metrics = {
        f"{split}_{name}": []
        for name in metric_names
        for split in ("train", "val")
    }

    save_path = f"{transformer_model_name}"

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")

        # Training
        train_loss, train_accuracy, train_precision, train_recall, train_f1 = train_one_epoch(
            model, train_dataloader, criterion, optimizer, device
        )

        # Validation
        preds, val_loss, val_accuracy, val_precision, val_recall, val_f1 = evaluate(
            model, val_dataloader, criterion, device, split_name="Validation"
        )

        # Store metrics
        metrics["train_loss"].append(train_loss)
        metrics["val_loss"].append(val_loss)
        metrics["train_accuracy"].append(train_accuracy)
        metrics["val_accuracy"].append(val_accuracy)
        metrics["train_precision"].append(train_precision)
        metrics["val_precision"].append(val_precision)
        metrics["train_recall"].append(train_recall)
        metrics["val_recall"].append(val_recall)
        metrics["train_f1"].append(train_f1)
        metrics["val_f1"].append(val_f1)

        scheduler.step()

        print(f"  Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Train F1: {train_f1:.4f}")
        print(f"  Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val F1: {val_f1:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_without_improvement = 0
            if save_model:
                torch.save(model, save_path)
                print(f"  Best model saved to {save_path}.")
        else:
            # The counter and `patience` existed before but were never used,
            # so training always ran for the full num_epochs.
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                print(f"  Early stopping: no validation improvement for {patience} epochs.")
                break

    if test_dataloader is not None:
        # The test set is scored with the weights from the last epoch that ran,
        # not with the best checkpoint written to disk.
        print("\nEvaluating on the Test Set...")
        preds, test_loss, test_accuracy, test_precision, test_recall, test_f1 = evaluate(
            model, test_dataloader, criterion, device, split_name="Test"
        )
        print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}, Test F1: {test_f1:.4f}")

        return preds, metrics
    return [], metrics

# Training the Models

## Running the Audio Model

In [None]:
def run_fnn_model(train, train_labels, validation, validation_labels, test, num_epochs, save_model=False):
    """Train the dense audio model and optionally predict on ``test``.

    The model input is each row's ``audio_features`` vector with the encoded
    ``location`` appended as one extra column.

    :param train: Training frame with ``audio_features`` and ``location``.
    :param train_labels: Label matrix for ``train`` (one column per class).
    :param validation: Validation frame with the same columns.
    :param validation_labels: Label matrix for ``validation``.
    :param test: Frame to predict on, or ``None`` to skip prediction.
    :param num_epochs: Number of epochs passed to ``fit``.
    :param save_model: When true, the model is saved to ``fnn_model_name``.
    :return: ``(predictions, history)``; predictions is ``[]`` when ``test`` is ``None``.
    """
    def stack_features(frame):
        # (n_rows, n_audio_features) matrix plus the location as a last column
        audio_matrix = np.array([vector.tolist() for vector in frame["audio_features"]])
        location_column = frame["location"].to_numpy().reshape(-1, 1)
        return np.hstack((audio_matrix, location_column))

    train_combined_features = stack_features(train)
    validation_combined_features = stack_features(validation)

    # Width of the combined feature matrix and number of label columns
    input_shape = train_combined_features.shape[1]
    num_classes = train_labels.shape[1]

    # Build the model
    fnn_model = build_dense_model(input_shape, num_classes)
    fnn_model.summary()

    # Train the model, validating after every epoch
    fnn_history = fnn_model.fit(
        train_combined_features,
        train_labels,
        validation_data=(validation_combined_features, validation_labels),
        batch_size=32,
        epochs=num_epochs,
        callbacks=[reduce_lr_fnn],
        verbose=1,
    )

    if save_model:
        fnn_model.save(fnn_model_name)

    # Without a test frame there is nothing to predict
    if test is None:
        return [], fnn_history

    # Run on test data
    fnn_preds = fnn_model.predict(stack_features(test))
    return fnn_preds, fnn_history

## Running the Transformer Model

In [None]:
def run_transformer_model(train, train_labels, validation, validation_labels, test, num_epochs, save_model=False):
    """Train the ViT spectrogram model and optionally predict on ``test``.

    :param train: Training frame (needs ``spec_path`` and the label columns).
    :param train_labels: Unused here; labels are read from ``train`` by the
        dataset. Kept so the signature mirrors ``run_fnn_model``.
    :param validation: Validation frame.
    :param validation_labels: Unused, see ``train_labels``.
    :param test: Frame to predict on, or ``None`` to skip the test evaluation.
    :param num_epochs: Maximum number of training epochs.
    :param save_model: Forwarded to ``train_and_evaluate``.
    :return: ``(predictions, metrics)`` as returned by ``train_and_evaluate``.
    """
    transformer_model = ViTClassifier(num_classes=len(label_columns))
    # Set up device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    transformer_model.to(device)

    # Create datasets for training and validation
    train_dataset = SpectrogramDataset(dataframe=train, transform=transform)
    val_dataset = SpectrogramDataset(dataframe=validation, transform=transform)

    # Reshuffle the training data every epoch so batches are not identical
    # from one epoch to the next; evaluation loaders keep a fixed order so
    # predictions stay aligned with the rows of their frame.
    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    if test is not None:
        test_dataset = SpectrogramDataset(dataframe=test, transform=transform)
        test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    else:
        test_dataloader = None

    # Train and test
    transformer_preds, transformer_history = train_and_evaluate(
        model=transformer_model,
        train_dataloader=train_dataloader,
        val_dataloader=val_dataloader,
        test_dataloader=test_dataloader,
        num_epochs=num_epochs,
        device=device,
        save_model=save_model
    )

    return transformer_preds, transformer_history

## Visualizing the Performance Metrics

In [None]:
def plot_metrics(fnn_history, transformer_history):
    """Plot training vs. validation curves for both models on a 2x4 grid.

    The top row shows the FNN, the bottom row the Transformer; the columns
    are accuracy, loss, precision and recall.

    :param fnn_history: Object whose ``.history`` dict holds ``<metric>`` and
        ``val_<metric>`` lists.
    :param transformer_history: Dict holding ``train_<metric>`` and
        ``val_<metric>`` lists.
    """
    fig, axs = plt.subplots(2, 4, figsize=(20, 8))

    metric_names = ("accuracy", "loss", "precision", "recall")

    def finish_axes(ax, title, metric_label):
        # Title, axis labels and legend shared by all eight panels
        ax.set_title(title)
        ax.set_xlabel('Epochs')
        ax.set_ylabel(metric_label)
        ax.legend()

    # Plotting the FNN metrics (x axis is the implicit 0-based epoch index)
    for column, metric in enumerate(metric_names):
        metric_label = metric.capitalize()
        ax = axs[0, column]
        ax.plot(fnn_history.history[metric], label=f'Training {metric_label}', color='blue')
        ax.plot(fnn_history.history[f'val_{metric}'], label=f'Validation {metric_label}', color='orange')
        finish_axes(ax, f"FNN {metric_label}", metric_label)

    # Plotting the Transformer metrics (x axis is the explicit 1-based epoch number)
    epochs = range(1, len(transformer_history["train_loss"]) + 1)

    for column, metric in enumerate(metric_names):
        metric_label = metric.capitalize()
        ax = axs[1, column]
        ax.plot(epochs, transformer_history[f'train_{metric}'], label=f'Training {metric_label}', color='blue')
        ax.plot(epochs, transformer_history[f'val_{metric}'], label=f'Validation {metric_label}', color='orange')
        finish_axes(ax, f"Transformer {metric_label}", metric_label)

    plt.tight_layout()

    plt.show()

## Performing K-Fold Cross Validation

In [None]:
if perform_cross_validation:
    # Repeats "split -> train both models -> ensemble -> pick best threshold"
    # n_folds times and reports the mean/std of the best F1 and threshold.
    f1_scores = []
    selected_thresolds = []
    n_folds = 5
    fnn_epochs = 100
    transformer_epochs = 20

    for i in range(n_folds):
        # Split the data to train, validation and test
        # NOTE(review): every iteration draws a fresh random split, so the test
        # sets of different iterations can overlap (these are not disjoint folds).
        train, test = train_test_split(data, test_size=0.15)
        train, validation = train_test_split(train, test_size=0.15)

        train_labels = train[label_columns]
        validation_labels = validation[label_columns]
        test_labels = test[label_columns]

        # Run both models
        fnn_preds, fnn_history = run_fnn_model(train, train_labels, validation, validation_labels, test, fnn_epochs)
        transformer_preds, transformer_history = run_transformer_model(train, train_labels, validation, validation_labels, test, transformer_epochs)

        # Combine their probabilities
        # NOTE(review): transformer_preds comes from evaluate(), which returns
        # 0/1 decisions rather than probabilities — confirm this is intended.
        final_preds = (fnn_preds + transformer_preds) / 2

        # Calculate F1
        # Sweep 101 thresholds in [0, 1] and keep the one with the best micro-F1.
        # NOTE(review): the threshold is chosen on the same test split it is
        # scored on, so the reported F1 is an optimistic estimate.
        thresholds = np.linspace(0, 1, 101)
        best_threshold = 0
        best_f1 = 0
        for threshold in thresholds:
            pred_labels_binary = (final_preds >= threshold).astype(int)

            f1 = f1_score(test_labels, pred_labels_binary, average='micro')
            if f1 > best_f1:
                best_f1 = f1
                best_threshold = threshold

        print(f"F1: {best_f1} at threshold: {best_threshold}")

        f1_scores.append(best_f1)
        selected_thresolds.append(best_threshold)

        # Plotting the metrics
        plot_metrics(fnn_history, transformer_history)

    print(f"F1 mean: {np.mean(f1_scores)} and std-dev: {np.std(f1_scores)}")
    print(f"Threshold mean: {np.mean(selected_thresolds)} and std-dev: {np.std(selected_thresolds)}")

## Training the Models on All the Data

In [None]:
if train_and_save_classifiers:
    # No test split here: 15% is held out for validation, the rest is used for training.
    train, validation = train_test_split(data, test_size=0.15)
    train_labels = train[label_columns]
    validation_labels = validation[label_columns]

    epochs_fnn = 200
    epochs_transformer = 40

    # test=None skips prediction; save_model=True writes the model file.
    _, fnn_history = run_fnn_model(train, train_labels, validation, validation_labels, None, epochs_fnn, save_model=True)
    # NOTE(review): the models are saved under their bare file names (relative
    # to the working directory) but copied from root_folder; this only lines up
    # when the working directory is root_folder — TODO confirm.
    shutil.copy(root_folder + fnn_model_name, drive_folder + fnn_model_name)
    _, transformer_history = run_transformer_model(train, train_labels, validation, validation_labels, None, epochs_transformer, save_model=True)

    # Plotting the metrics
    plot_metrics(fnn_history, transformer_history)

    #shutil.copy(root_folder + fnn_model_name, drive_folder + fnn_model_name)
    shutil.copy(root_folder + transformer_model_name, drive_folder + transformer_model_name)

# Running the Models on Blind Test Data

## Preparing the Test Data Frames

In [None]:
predictions_zip = "2023_predictions.zip"
predictions_folder = "2023_predictions/"

# Extract the prediction templates next to the archive. Both the existence
# check and the extraction target are anchored at root_folder, because every
# later cell reads and writes root_folder + predictions_folder; relying on the
# current working directory here would break as soon as it is not root_folder.
if not os.path.isdir(root_folder + predictions_folder):
    with zipfile.ZipFile(root_folder + predictions_zip, 'r') as zip_ref:
        zip_ref.extractall(root_folder)


def load_test_data():
    """Load the per-location prediction templates for the blind test set.

    Each entry of ``root_folder + predictions_folder`` (except the macOS
    artefacts ``.DS_Store`` and ``__MACOSX``) is treated as a location
    directory containing a ``test_labels.csv`` with an ``fname`` column.

    :return: A list with one DataFrame per location (they are NOT
        concatenated), each extended with ``audio_path``, ``spec_path`` and
        the integer ``location`` code.
    :raises KeyError: If a directory name is not a known location code.
    """
    dataframes = []

    data_dir = root_folder + predictions_folder

    # Obtained from a label encoder (matches the mapping printed during training)
    location_encodings = {'BRY': 0, 'CAL': 1, 'FIO': 2, 'HAR': 3, 'KEA': 4,
                          'LAW': 5, 'LIF': 6, 'MCK': 7, 'PEN': 8, 'SYL': 9,
                          'WAT': 10}

    for loc in os.listdir(data_dir):
        if loc == ".DS_Store" or loc == "__MACOSX":
            continue

        loc_dir = data_dir + "/" + loc

        audio_dir = audio_folder_2023 + top_audio_folder_2023 + loc
        spec_dir = spec_folder_2023 + top_spec_folder_2023 + loc

        # Loading the predictions template (read_csv already returns a DataFrame)
        predictions_dataframe = pd.read_csv(loc_dir + "/test_labels.csv")

        # Updating the audio paths
        predictions_dataframe["audio_path"] = audio_dir + "/" + predictions_dataframe["fname"]

        # Updating the spec paths: swap a trailing ".mp3" for ".png".
        # str.strip(".mp3") must not be used here: it removes any of the
        # characters ".", "m", "p", "3" from BOTH ends of the name, which
        # corrupts names that end in "3" before the extension.
        spec_names = predictions_dataframe["fname"].str.replace(r"\.mp3$", "", regex=True) + ".png"
        predictions_dataframe["spec_path"] = spec_dir + "/" + spec_names

        # Add the location encodings
        predictions_dataframe['location'] = location_encodings[loc]

        dataframes.append(predictions_dataframe)

    # One frame per location, in os.listdir order
    return dataframes

# blind_test_data is a list with one DataFrame per location; preview the first one.
blind_test_data = load_test_data()
blind_test_data[0].head()

## Perform Audio Feature Extraction

In [None]:
# Add the audio_features column to every per-location test frame.
new_test_data = []

for loc_df in blind_test_data:
    # generate_audio_feature_set mutates loc_df and returns the same object.
    new_test_data.append(generate_audio_feature_set(loc_df))

blind_test_data = new_test_data

## Preparing the Data for the Models

In [None]:
def create_fnn_input(test):
    """Build the FNN input matrix for ``test``.

    :param test: DataFrame with an ``audio_features`` column (one 1-D array
        per row) and a ``location`` column.
    :return: 2-D array: the stacked feature vectors with the location as the
        last column.
    """
    feature_matrix = np.array([vector.tolist() for vector in test["audio_features"]])
    location_column = test["location"].to_numpy().reshape(-1, 1)
    return np.hstack((feature_matrix, location_column))

def create_transformer_input(test):
    """Wrap ``test`` in a fixed-order DataLoader of spectrogram batches.

    :param test: DataFrame accepted by ``SpectrogramDataset``.
    :return: DataLoader with batch size 32 and no shuffling, so predictions
        come back in row order.
    """
    return DataLoader(
        SpectrogramDataset(dataframe=test, transform=transform),
        batch_size=32,
        shuffle=False,
    )

## Loading the Models

In [None]:
# Load the FNN saved in the native Keras format (the file name ends in ".keras")
fnn_model = load_model(root_folder + fnn_model_name, custom_objects={'LeakyReLU': LeakyReLU})

# Verify the loaded model
fnn_model.summary()

# The transformer was stored with torch.save(model, ...), i.e. as a pickled
# module rather than a state dict:
#  * map_location lets a checkpoint written on a GPU load on a CPU-only runtime;
#  * weights_only=False is required to unpickle a full module and is stated
#    explicitly because the default of torch.load is changing.
# SECURITY: unpickling can execute arbitrary code; only load checkpoints you created.
inference_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transformer_model = torch.load(
    root_folder + transformer_model_name,
    map_location=inference_device,
    weights_only=False,
)

# Set the model to evaluation mode
transformer_model.eval()

  transformer_model = torch.load(root_folder + transformer_model_name)


ViTClassifier(
  (vit): VisionTransformer(
    (patch_embed): PatchEmbed(
      (proj): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
      (norm): Identity()
    )
    (pos_drop): Dropout(p=0.0, inplace=False)
    (patch_drop): Identity()
    (norm_pre): Identity()
    (blocks): Sequential(
      (0): Block(
        (norm1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (attn): Attention(
          (qkv): Linear(in_features=768, out_features=2304, bias=True)
          (q_norm): Identity()
          (k_norm): Identity()
          (attn_drop): Dropout(p=0.0, inplace=False)
          (proj): Linear(in_features=768, out_features=768, bias=True)
          (proj_drop): Dropout(p=0.0, inplace=False)
        )
        (ls1): Identity()
        (drop_path1): Identity()
        (norm2): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (mlp): Mlp(
          (fc1): Linear(in_features=768, out_features=3072, bias=True)
          (act): GELU(approximate='none')


# Running the Models on Test Data

In [None]:
# Run on test data
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Decision threshold applied to the averaged ensemble output
ideal_threshold = 0.15

# The batches are moved to `device`, so the model must live there too
transformer_model.to(device)

predictions = []
for loc_df in blind_test_data:
    # Each frame holds a single location; collapse its extracted codes into one name
    loc_name = "".join(set(loc_df['audio_path'].str.extract(r'/(\w{3})/')[0]))

    print(f"Running the FNN model for location: {loc_name}")
    fnn_preds = fnn_model.predict(create_fnn_input(loc_df))

    transformer_preds = []
    with torch.no_grad():
        print(f"Running the Transformer model for location: {loc_name}")
        # The labels from the template are placeholders and are not used here
        for inputs, _ in tqdm(create_transformer_input(loc_df), desc=f"Transformer Testing {loc_name}", leave=False):
            inputs = inputs.to(device)

            outputs = transformer_model(inputs)
            preds = torch.sigmoid(outputs)
            transformer_preds.extend(preds.cpu().numpy())

    # Binarise the transformer output at 0.5, as the cross-validation cell does
    # through evaluate(). numpy is used directly because torch.tensor() on a
    # list of ndarrays is the slow path PyTorch warns about.
    transformer_preds = (np.asarray(transformer_preds) >= 0.5).astype(np.int32)

    # Combine the preds
    final_preds = (fnn_preds + transformer_preds) / 2

    pred_labels_binary = (final_preds >= ideal_threshold).astype(int)

    # Copy the fname + label columns so the assignment below writes to an
    # independent frame instead of a slice of loc_df (SettingWithCopyWarning).
    pred_df = loc_df.iloc[:, 0:len(label_columns)+1].copy()
    pred_df[label_columns] = pred_labels_binary

    predictions.append(pred_df)

Running the FNN model for location: PEN
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
Running the Transformer model for location: PEN


                                                                      

Running the FNN model for location: LIF
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 




Running the Transformer model for location: LIF


                                                                      

Running the FNN model for location: MCK
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 




Running the Transformer model for location: MCK


                                                                      

Running the FNN model for location: CAL
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 




Running the Transformer model for location: CAL


                                                                      

Running the FNN model for location: FIO
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 




Running the Transformer model for location: FIO


                                                                      

Running the FNN model for location: SYL
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 




Running the Transformer model for location: SYL


                                                                      

Running the FNN model for location: LAW
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 




Running the Transformer model for location: LAW


                                                                      

Running the FNN model for location: HAR
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 




Running the Transformer model for location: HAR


                                                                      

Running the FNN model for location: BRY
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 




Running the Transformer model for location: BRY


                                                                      

Running the FNN model for location: KEA
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step 




Running the Transformer model for location: KEA


                                                                      

Running the FNN model for location: WAT
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 




Running the Transformer model for location: WAT




## Save the Predictions to CSVs

In [None]:
# Overwrite each location's test_labels.csv template with the predicted labels.
for loc_df in predictions:
    # The location code is taken from the first three characters of the first file name.
    # NOTE(review): `['fname'][0]` is a label lookup, so it relies on the frame's index containing 0.
    loc_name = loc_df['fname'][0][:3]
    loc_df.to_csv(root_folder + predictions_folder + loc_name + "/test_labels.csv", index=False)

## Zip the Predictions

In [None]:
submission_name = "group_03"
# Archive predictions_folder (relative to root_folder) into root_folder/<submission_name>.zip,
# so the folder name is kept as the top-level directory inside the zip.
shutil.make_archive(root_folder + submission_name, 'zip', root_folder, predictions_folder)

'/content/group_03.zip'

## Validate the Zip File

In [None]:
def validate_zip_file(zip_file_path):
    """
    Validate a submission zip file and build a human-readable report.

    Checks performed, in order:
      1. The archive's base name matches ``group_##.zip``.
      2. The file is a valid zip archive.
      3. The archive contains a top-level ``2023_predictions/`` directory.
      4. Every required location has a subdirectory below it.
      5. Each location subdirectory holds a ``test_labels.csv`` with the
         expected header, exactly 11 columns, ``.mp3`` names in the first
         column and only 0/1 values in the remaining columns.

    :param zip_file_path: Path to the input zip file.
    :return: List of report lines, each starting with ✅ (check passed) or
             ❌ (check failed). Validation stops early when the file is
             missing, is not a zip archive, or lacks the top-level directory.
    """
    report = []

    # Check if the file exists
    if not os.path.exists(zip_file_path):
        return [f"❌ The file '{zip_file_path}' does not exist. Please provide a valid file path."]

    # Check 1: Validate the zip file name
    if not re.match(r"group_\d{2}\.zip$", os.path.basename(zip_file_path)):
        report.append("❌ The zip file name must be in the format 'group_##.zip' (e.g., 'group_01.zip').")
    else:
        report.append("✅ Zip file name format is valid.")

    # Check if it is a valid zip file
    if not zipfile.is_zipfile(zip_file_path):
        report.append("❌ The file is not a valid zip file.")
        return report

    # Required structure and header
    required_locations = ["BRY", "CAL", "FIO", "HAR", "KEA", "LAW", "LIF", "MCK", "PEN", "SYL", "WAT"]
    required_header = ["fname", "AMRO", "BHCO", "CHSW", "EUST", "GRCA", "HOSP", "HOWR", "NOCA", "RBGU", "RWBL"]
    top_level_prefix = "2023_predictions/"

    # Validate contents of the zip file
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        # Get the list of files in the zip archive
        zip_file_list = zip_ref.namelist()

        # Check 2: Validate top-level directory
        if not any(name.startswith(top_level_prefix) for name in zip_file_list):
            report.append("❌ The zip file must contain a top-level directory named '2023_predictions'.")
            return report  # Further checks depend on this directory existing

        # The message names the directory that was actually checked.
        report.append("✅ Top-level directory '2023_predictions' exists.")

        # Check 3: Validate subdirectories
        # An entry only proves a subdirectory exists when it has something after
        # the second path component ("2023_predictions/BRY/" or a file inside it);
        # a plain file "2023_predictions/BRY" must not count as a directory.
        found_locations = set(
            name.split('/')[1]
            for name in zip_file_list
            if name.startswith(top_level_prefix) and len(name.split('/')) > 2
        )
        missing_subdirs = set(required_locations) - found_locations
        if missing_subdirs:
            # Sorted so the message is deterministic across runs.
            report.append(f"❌ Missing subdirectories: {', '.join(sorted(missing_subdirs))}.")
        else:
            report.append("✅ All required subdirectories are present.")

        # Check 4: Validate files in subdirectories
        for location in required_locations:
            # Direct children only; archive entries ending in '/' are directory
            # markers, not files, so they are skipped.
            location_files = [
                name
                for name in zip_file_list
                if name.startswith(f"{top_level_prefix}{location}/")
                and len(name.split('/')) == 3
                and not name.endswith('/')
            ]

            if not location_files:
                report.append(f"❌ Subdirectory '{location}' does not contain any files.")
                continue

            # Require the exact file name, not merely a matching suffix
            # (e.g. "old_test_labels.csv" must not be accepted).
            test_labels_files = [f for f in location_files if f.split('/')[-1] == "test_labels.csv"]
            if not test_labels_files:
                report.append(f"❌ Subdirectory '{location}' does not contain a file named 'test_labels.csv'.")
                continue

            # Validate the contents of test_labels.csv
            test_labels_path = test_labels_files[0]
            try:
                with zip_ref.open(test_labels_path) as file:
                    # header=None keeps the header as row 0 so it can be compared verbatim.
                    df = pd.read_csv(TextIOWrapper(file, 'utf-8'), header=None)

                # Check header
                if list(df.iloc[0]) != required_header:
                    report.append(f"❌ The header in '{test_labels_path}' is incorrect. Expected: {required_header}")
                else:
                    report.append(f"✅ The header in '{test_labels_path}' is correct.")

                # Check column count
                if df.shape[1] != 11:
                    report.append(f"❌ '{test_labels_path}' does not have exactly 11 columns.")
                    continue

                # Validate first column and other columns
                first_column = df.iloc[1:, 0].astype(str)
                # Non-numeric cells become NaN, which then fails the 0/1 comparison below.
                remaining_columns = df.iloc[1:, 1:].apply(pd.to_numeric, errors='coerce')

                # Collect every column problem first, so the success line is
                # emitted only when none of the column checks failed.
                column_issues = []
                if not first_column.str.endswith(".mp3").all():
                    column_issues.append(f"❌ The first column of '{test_labels_path}' contains values that are not filenames with an 'mp3' extension.")
                if not ((remaining_columns == 0) | (remaining_columns == 1)).all().all():
                    column_issues.append(f"❌ The non-header columns of '{test_labels_path}' contain values other than 0 or 1.")

                if column_issues:
                    report.extend(column_issues)
                else:
                    report.append(f"✅ The file '{test_labels_path}' meets all column requirements.")
            except Exception as e:
                # Report the failure for this location and continue with the rest.
                report.append(f"❌ Error reading or validating '{test_labels_path}': {e}")

    return report

# Run the validator on the submission archive and print one finding per line.
validation_report = validate_zip_file(f"{root_folder}{submission_name}.zip")

print("\nValidation Report:")
for report_entry in validation_report:
    print(report_entry)


Validation Report:
✅ Zip file name format is valid.
✅ Top-level directory '2023' exists.
✅ All required subdirectories are present.
✅ The header in '2023_predictions/BRY/test_labels.csv' is correct.
✅ The file '2023_predictions/BRY/test_labels.csv' meets all column requirements.
✅ The header in '2023_predictions/CAL/test_labels.csv' is correct.
✅ The file '2023_predictions/CAL/test_labels.csv' meets all column requirements.
✅ The header in '2023_predictions/FIO/test_labels.csv' is correct.
✅ The file '2023_predictions/FIO/test_labels.csv' meets all column requirements.
✅ The header in '2023_predictions/HAR/test_labels.csv' is correct.
✅ The file '2023_predictions/HAR/test_labels.csv' meets all column requirements.
✅ The header in '2023_predictions/KEA/test_labels.csv' is correct.
✅ The file '2023_predictions/KEA/test_labels.csv' meets all column requirements.
✅ The header in '2023_predictions/LAW/test_labels.csv' is correct.
✅ The file '2023_predictions/LAW/test_labels.csv' meets all 