In [114]:
# inputs: vector of all embeddings with corresponding species label, label for species of interest
# outputs: embedding vector of species of interest, threshold for classification

import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import normalize
from sklearn.decomposition import PCA
from sklearn.metrics.pairwise import cosine_similarity
import joblib  # For saving and loading the PCA model


In [115]:

class MakePrediction:
    def __init__(self, model_folder="./model"):
        """Initialize with PCA configuration and model folder path."""
        self.model_folder = model_folder
        os.makedirs(model_folder, exist_ok=True)

    def fit_pca(self, embeddings, n_components=142):
        """Fit PCA on embeddings and save the model."""
        self.pca = PCA(n_components=n_components)
        reduced_embeddings = self.pca.fit_transform(embeddings)
        joblib.dump(self.pca, os.path.join(self.model_folder, "pca_model.joblib"))
        return reduced_embeddings

    def load_pca(self):
        """Load the PCA model from the model folder."""
        pca_path = os.path.join(self.model_folder, "pca_model.joblib")
        if os.path.exists(pca_path):
            self.pca = joblib.load(pca_path)
        else:
            raise FileNotFoundError("PCA model not found. Please run fit_pca first.")

    def transform_pca(self, embeddings):
        """Normalize embeddings and transform using PCA."""
        if not hasattr(self.pca, "components_"):
            self.load_pca()
        normalized_embeddings = normalize(embeddings, axis=1, norm="l2")
        return self.pca.transform(normalized_embeddings)

    @staticmethod
    def calculate_prototype(embeddings):
        """Calculate the prototype vector for a single species."""
        return np.median(embeddings, axis=0, keepdims=True)

    def save_species_prototype(self, species_prototype):
        """Save the species of interest prototype."""
        np.save(os.path.join(self.model_folder, "species_prototype.npy"), species_prototype)

    def load_species_prototype(self):
        """Load the species of interest prototype."""
        prototype_path = os.path.join(self.model_folder, "species_prototype.npy")
        if os.path.exists(prototype_path):
            return np.load(prototype_path)
        raise FileNotFoundError("Species prototype not found. Please run make_classifier first.")

    def determine_threshold(self, embeddings, labels, soi_label, min_recall):
        """Determine cosine similarity threshold to achieve minimum recall."""
        reduced_embeddings = self.transform_pca(embeddings)

        # Extract SOI embeddings and calculate its prototype
        soi_mask = np.array(labels) == soi_label
        other_mask = ~soi_mask
        soi_embeddings = reduced_embeddings[soi_mask]
        other_embeddings = reduced_embeddings[other_mask]

        # Calculate the SOI prototype (median embedding of the species of interest)
        species_prototype = self.calculate_prototype(soi_embeddings)
        self.save_species_prototype(species_prototype)

        # Compute cosine similarities
        soi_similarities = cosine_similarity(soi_embeddings, species_prototype).flatten()
        other_similarities = cosine_similarity(other_embeddings, species_prototype).flatten()

        # Determine the minimum threshold for recall
        min_required = int(np.ceil(min_recall * len(soi_similarities)))
        threshold = np.sort(soi_similarities)[::-1][min_required - 1]

        self.threshold = threshold
        np.save(os.path.join(self.model_folder, "threshold.npy"), threshold)
        return threshold

    def make_classifier(self, embeddings, labels, soi_label, min_recall, n_components=142):
        """Create classifier with PCA, species prototype, and threshold."""
        # Fit PCA with the embeddings
        self.fit_pca(embeddings, n_components=min(n_components, embeddings.shape[0]))

        # Determine the threshold and save the SOI prototype
        self.soi_label = soi_label  # Store species of interest label
        self.threshold = self.determine_threshold(embeddings, labels, soi_label, min_recall)

        # Save the SOI label
        soi_label_path = os.path.join(self.model_folder, "soi_label.txt")
        with open(soi_label_path, "w") as f:
            f.write(soi_label)

    def classify(self, unknown_embeddings, baseline_probability=1.0 / 6):
        """
        Classify unknown embeddings and provide predictions with confidence scores.

        Args:
            unknown_embeddings (np.ndarray): Embeddings to classify.
            baseline_probability (float): Baseline probability for relative confidence (default 1/6).

        Returns:
            pd.DataFrame: Predictions, confidence scores, and relative confidence values.
        """
        if not hasattr(self, "species_prototype"):
            self.load_classifier()
    
        # Transform unknown embeddings using the trained PCA
        reduced_embeddings = self.transform_pca(unknown_embeddings)

        # Load the species prototype
        species_prototype = self.load_species_prototype()

        # Compute cosine similarity between unknown embeddings and the species prototype
        similarities = cosine_similarity(reduced_embeddings, species_prototype).flatten()

        # Classify based on the threshold
        predictions = [
            "species_of_interest" if similarity >= self.threshold else "not_species_of_interest"
            for similarity in similarities
        ]

        # Calculate confidence scores
        confidence_scores = similarities  # Cosine similarity serves as the confidence score

        # Calculate relative confidence scores
        relative_confidences = [
            ((score - baseline_probability) / baseline_probability) * 100
            for score in confidence_scores
        ]

        # Create a DataFrame to store results
        results_df = pd.DataFrame(
            {
                "Predicted Label": predictions,
                "Confidence Score": confidence_scores,
                "Relative Confidence (%)": [round(rc, 2) for rc in relative_confidences],
            }
        )

        return results_df
    
    def load_classifier(self):
        """Load PCA, species prototype, threshold, and SOI label from the model folder."""
        # Load PCA model
        self.load_pca()

        # Load species prototype
        species_prototype_path = os.path.join(self.model_folder, "species_prototype.npy")
        if os.path.exists(species_prototype_path):
            self.species_prototype = np.load(species_prototype_path)
        else:
            raise FileNotFoundError("Species prototype not found. Ensure it was saved during make_classifier.")

        # Load threshold
        threshold_path = os.path.join(self.model_folder, "threshold.npy")
        if os.path.exists(threshold_path):
            self.threshold = float(np.load(threshold_path))
        else:
            raise FileNotFoundError("Threshold not found. Ensure it was saved during make_classifier.")

        # Load species of interest label
        soi_label_path = os.path.join(self.model_folder, "soi_label.txt")
        if os.path.exists(soi_label_path):
            with open(soi_label_path, "r") as f:
                self.soi_label = f.read().strip()
        else:
            raise FileNotFoundError("Species of interest label not found. Ensure it was saved during make_classifier.")

def load_embeddings_labels(embeddings_file,labels_file):
    
    loaded = np.load(embeddings_file)
    embeddings = [loaded[key] for key in loaded]
    #embeddings = np.load(embeddings_file)
    
    labels = np.load(labels_file)
    return embeddings, labels

def load_embeddings_labels_folders(base_dir,folders,
                    label_name = 'labels.npy', embeddings_name = 'embeddings.npz'):
    
    embeddings_all = []
    labels_all = []
    for folder in folders:
        processed_dir = os.path.join(base_dir,folder,"processed")
        labels_file = os.path.join(processed_dir,label_name)
        embeddings_file = os.path.join(processed_dir,embeddings_name)

        embeddings, labels = load_embeddings_labels(embeddings_file,labels_file)
        embeddings_all += embeddings
        labels_all = np.concatenate((labels_all, labels))
        
    embeddings_npy = np.vstack(embeddings_all)
    labels_npy = np.vstack(labels_all)
    
    return embeddings_npy, labels_all


In [128]:
# Develop the classifier based on the embeddings and labels of the training dataset

# define training folder
#base_dir = "/home/leah_colossal_com/us_bird_train"
base_dir = "/home/leah_colossal_com/tbp_dataset/balanced_dataset/train_data"
folders = [name for name in os.listdir(base_dir) 
    if os.path.isdir(os.path.join(base_dir, name)) and name != 'model']
#folders = ['white_throated_sparrow','northern_cardinal','carolina_wren','eastern_towhee','kentucky_warbler']

model_folder = os.path.join(base_dir,"model") # this is where the saved classifier will go
      
# Make classifier based on training data
min_recall = 0.95 # Threshold of minium recall allowed
embeddings,labels = load_embeddings_labels_folders(base_dir,folders)

# make the predictor
predictor = MakePrediction(model_folder = model_folder)
#soi_label = 'eastern_towhee' # this is the label for the species of interest
soi_label = 'tooth_billed_pigeon' # this is the label for the species of interest

# ^ the classifier is a binary classifier of species of interest vs not. 

predictor.make_classifier(embeddings,labels,soi_label,min_recall)


In [None]:
## Classify, the code below uses the trained classifier to classify new datasets

In [129]:
# run the test set on this developed model

# load test embeddings
#test_base_dir = "/home/leah_colossal_com/us_bird_test"
test_base_dir = "/home/leah_colossal_com/tbp_dataset/balanced_dataset/test_data"

#folders = ['white_throated_sparrow','northern_cardinal','carolina_wren','eastern_towhee','kentucky_warbler']
embeddings_test,labels = load_embeddings_labels_folders(test_base_dir,folders)

results_df = predictor.classify(embeddings_test)


In [100]:
# run the test embeddings using the load_classifier function
# If you're re-using an classifier you've already developed, you can run the first two cells, then jump to here! 

test_base_dir = "/home/leah_colossal_com/us_bird_test"
folders = ['white_throated_sparrow','northern_cardinal','carolina_wren','eastern_towhee','kentucky_warbler']
model_folder = "/home/leah_colossal_com/us_bird_train/model" # This is from train, because that's where we made the model
      
predictor = MakePrediction(model_folder = model_folder)
predictor.load_classifier()

embeddings_test,labels = load_embeddings_labels_folders(test_base_dir,folders)
results_df = predictor.classify(embeddings_test)

In [130]:
# Generate metrics for the results. Giving accuracy, recall and F1

from sklearn.metrics import precision_score, recall_score, f1_score

# Extract the true labels for the species of interest
true_positive_mask = np.array(labels) == predictor.soi_label

# Get the predicted labels from the results DataFrame
predicted_labels = np.array(results_df['Predicted Label'])

# Binary true labels for species of interest (1 for SOI, 0 for others)
binary_true_labels = np.where(true_positive_mask, 1, 0)

# Binary predicted labels for species of interest (1 for 'species_of_interest', 0 for others)
binary_predicted_labels = np.where(predicted_labels == 'species_of_interest', 1, 0)

# Calculate metrics
accuracy_soi = sum(binary_true_labels == binary_predicted_labels) / len(binary_true_labels) if len(binary_true_labels) > 0 else 0
recall_soi = recall_score(binary_true_labels, binary_predicted_labels, zero_division=0)
precision_soi = precision_score(binary_true_labels, binary_predicted_labels, zero_division=0)
f1_soi = f1_score(binary_true_labels, binary_predicted_labels, zero_division=0)

# Print metrics
print(f"Accuracy for species of interest '{predictor.soi_label}': {accuracy_soi:.2%}")
print(f"Recall for species of interest '{predictor.soi_label}': {recall_soi:.2%}")
print(f"F1-score for species of interest '{predictor.soi_label}': {f1_soi:.2%}")


Accuracy for species of interest 'tooth_billed_pigeon': 98.78%
Recall for species of interest 'tooth_billed_pigeon': 100.00%
F1-score for species of interest 'tooth_billed_pigeon': 92.31%
