# 5nn

## NCBI 2020

In [14]:
# testing the folds
#!/usr/bin/env python3


# Encoders to evaluate.
encoders = ['PSRT']

# Datasets (CSV file stems) to evaluate.
csv_names = ['Yau2020_record_processed']


import numpy as np
import random
import pandas as pd
import os
from sklearn.preprocessing import LabelEncoder



from sklearn.model_selection import StratifiedKFold
from collections import Counter



from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score
)


###############################################################################
# 0. Seed Setting (Optional)
###############################################################################
def set_seed(seed=42):
    """Seed Python's `random` module and NumPy's global RNG with `seed`."""
    for seeder in (random.seed, np.random.seed):
        seeder(seed)


def find_labels_with_min_count(labels, min_count=15):
    """
    Return the positions of all entries whose label is frequent enough.

    Args:
        labels (list): Sequence of hashable labels.
        min_count (int): Smallest number of occurrences a label needs for
            its entries to be kept (default=15).

    Returns:
        list: Ascending indices into `labels` of the entries whose label
        occurs at least `min_count` times.
    """
    frequency = Counter(labels)
    return [
        position
        for position, label in enumerate(labels)
        if frequency[label] >= min_count
    ]




def compute_classification_metrics(y_true, y_pred, average='macro'):
    """
    Compute several classification metrics from true and predicted hard labels.

    Args:
        y_true (array-like): Ground-truth labels.
        y_pred (array-like): Predicted labels, same length as `y_true`.
        average (str): Averaging mode forwarded to precision, recall, F1 and
            ROC-AUC (default='macro'); also used as the suffix of their keys.

    Returns:
        dict: Keys 'accuracy', 'balanced_accuracy', 'precision_<average>',
        'recall_<average>', 'f1_<average>' and 'roc_auc_<average>'. The
        ROC-AUC entry is None when `roc_auc_score` raises ValueError
        (e.g., if y_true has only one class).

    Note: ROC-AUC is computed from one-hot encoded hard predictions, i.e. the
    scores are 0/1 indicators rather than probabilities. The one-hot columns
    are the sorted distinct values of `y_true`, so labels do not have to be
    the integers 0..C-1. A prediction that never occurs in `y_true` gets an
    all-zero row (score 0 for every true class).
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    metrics_dict = {
        'accuracy': accuracy_score(y_true, y_pred),
        'balanced_accuracy': balanced_accuracy_score(y_true, y_pred),
        f'precision_{average}': precision_score(
            y_true, y_pred, average=average, zero_division=0),
        f'recall_{average}': recall_score(
            y_true, y_pred, average=average, zero_division=0),
        f'f1_{average}': f1_score(
            y_true, y_pred, average=average, zero_division=0),
    }

    # Map every label to its column through the sorted class array instead
    # of using the raw label value as a column index: raw indexing breaks
    # for non-contiguous / non-integer labels and wraps negative ones.
    classes = np.unique(y_true)
    true_cols = np.searchsorted(classes, y_true)
    # searchsorted yields len(classes) for values above the largest class,
    # so clip before looking the candidate class up.
    pred_cols = np.minimum(np.searchsorted(classes, y_pred), len(classes) - 1)
    known_rows = np.flatnonzero(classes[pred_cols] == y_pred)

    y_true_1hot = np.zeros((len(y_true), len(classes)), dtype=int)
    y_pred_1hot = np.zeros((len(y_pred), len(classes)), dtype=int)
    y_true_1hot[np.arange(len(y_true)), true_cols] = 1
    y_pred_1hot[known_rows, pred_cols[known_rows]] = 1

    try:
        roc_val = roc_auc_score(
            y_true_1hot,
            y_pred_1hot,
            multi_class='ovr',
            average=average
        )
    except ValueError:
        # e.g., if y_true has only one class
        roc_val = None
    metrics_dict[f'roc_auc_{average}'] = roc_val

    return metrics_dict

def five_fold_5nn(distance_matrix, labels, n_neighbors=5, random_state=42):
    """
    Evaluate k-NN on a precomputed distance matrix with 5-fold stratified
    cross-validation.

    In every fold, each test sample receives the majority label of its
    `n_neighbors` closest training samples (ties are resolved by
    `Counter.most_common`). Metrics are computed per fold with macro
    averaging and then averaged over the folds.

    Args:
        distance_matrix (ndarray): NxN precomputed distances.
        labels (array-like): length N array of class labels.
        n_neighbors (int): k in k-NN (default=5). If a training fold holds
            fewer samples than this, all of its samples vote.
        random_state (int): seed for StratifiedKFold reproducibility.

    Returns:
        dict: Fold-averaged metrics (accuracy, balanced_accuracy,
              precision_macro, recall_macro, f1_macro, roc_auc_macro).
              A metric is None only when it was None in every fold.

    Raises:
        ValueError: If `distance_matrix` is not square, `labels` does not
            have one entry per row, or `n_neighbors` is smaller than 1.
    """
    distance_matrix = np.asarray(distance_matrix)
    labels = np.asarray(labels)
    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    if distance_matrix.ndim != 2 or distance_matrix.shape[0] != distance_matrix.shape[1]:
        raise ValueError("distance_matrix must be NxN")
    if len(labels) != distance_matrix.shape[0]:
        raise ValueError("labels length must match distance_matrix size")
    if n_neighbors < 1:
        raise ValueError("n_neighbors must be at least 1")

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)

    # We'll accumulate metrics over the 5 folds
    metrics_list = []
    for train_idx, test_idx in skf.split(X=labels, y=labels):
        n_train = len(train_idx)
        # Never ask for more neighbours than the training fold contains.
        k_eff = min(n_neighbors, n_train)

        fold_preds = []
        for test_sample in test_idx:
            # Train and test indices are disjoint, so the sample itself is
            # never a candidate and no self-distance masking is needed
            # (writing np.inf into the row would fail for integer matrices).
            train_distances = distance_matrix[test_sample, train_idx]

            if k_eff < n_train:
                # Positions (within train_idx) of the k_eff smallest distances.
                nn_in_train = np.argpartition(train_distances, k_eff)[:k_eff]
            else:
                nn_in_train = np.arange(n_train)

            # Majority vote among the neighbours' labels
            neighbor_labels = labels[train_idx[nn_in_train]]
            fold_preds.append(Counter(neighbor_labels).most_common(1)[0][0])

        fold_true = labels[test_idx]
        metrics_list.append(
            compute_classification_metrics(fold_true, fold_preds, average='macro')
        )

    # Average metrics over folds, ignoring folds where a metric is None
    avg_metrics = {}
    for k in metrics_list[0]:
        vals = [m[k] for m in metrics_list if m[k] is not None]
        avg_metrics[k] = np.mean(vals) if vals else None

    return avg_metrics


def multiple_seeds_5nn(distance_matrix, labels, seeds, n_neighbors=5):
    """
    Repeat the 5-fold k-NN evaluation once per seed and average the results.

    Args:
        distance_matrix (ndarray): NxN precomputed distances.
        labels (array-like): length N array of class labels.
        seeds (list of ints): Seeds passed one by one as `random_state`
            to `five_fold_5nn`.
        n_neighbors (int): k in k-NN (default=5).

    Returns:
        dict: For every metric reported by the first run, the mean over all
        runs in which it is not None (None if it is None in every run).
    """
    per_seed = [
        five_fold_5nn(distance_matrix, labels,
                      n_neighbors=n_neighbors,
                      random_state=seed)
        for seed in seeds
    ]

    # The metric names are taken from the first run.
    summary = {}
    for name in per_seed[0]:
        observed = [run[name] for run in per_seed if run[name] is not None]
        summary[name] = np.mean(observed) if observed else None
    return summary



# -------------------------------
# Feature Configuration
# -------------------------------

# (feature name, max_dim) pairs; both parts go into the distance file name.
features_needed = [("facet", 0)]

###############################################################################
# 5. Main
###############################################################################
if __name__ == "__main__":
    # Seed the global RNGs before anything else runs.
    set_seed(42)

    # For every (encoder, dataset, k, feature) combination: read the labels
    # from the CSV, load the matching precomputed distance matrix and print
    # the seed-averaged cross-validated 5-NN metrics.
    for encoder in encoders:
        print("encoder ", encoder)
        for data_name in csv_names:
            print("data ", data_name)
            for k in range(4, 5):  # k is part of the distance file name
                print("k ", k)

                for feature, max_dim in features_needed:
                    print(feature)
                    print(max_dim)

                    # Accessions and family labels, sorted as (accession, family) pairs.
                    path_to_data = f'data/{data_name}'
                    df = pd.read_csv(f"{path_to_data}.csv")
                    x_list, y_list = map(list, zip(*sorted(zip(
                        df['Accession (version)'].to_list(),
                        df['Family'].to_list(),
                    ))))

                    # Keep only samples whose family occurs at least 15 times.
                    indices = find_labels_with_min_count(y_list, min_count=15)
                    x_list = [x_list[i] for i in indices]
                    y_list = [y_list[i] for i in indices]

                    # Load the full matrix and cut out the kept rows/columns.
                    # NOTE(review): presumably the matrix rows follow the sorted
                    # accession order used above — confirm against the producer.
                    path_to_features = f'distances/{encoder}/{data_name}'
                    distance_matrix = np.load(path_to_features+f'/k{k}_distance_{feature}{max_dim}.npy')
                    subD = distance_matrix[np.ix_(indices, indices)]

                    # Seeds 1..30, one cross-validation run per seed.
                    seeds = np.arange(1, 31)
                    le = LabelEncoder()
                    y_int = le.fit_transform(y_list)  # shape (N,)
                    y_np = np.array(y_int, dtype=np.int32)
                    labels = y_np
                    results = multiple_seeds_5nn(subD, labels, seeds, n_neighbors=5)
                    print("Overall average metrics across seeds:")
                    for metric_name, val in results.items():
                        print(f"{metric_name}: {val}")

                    # Three separator lines after each feature, k and dataset.
                    print("\n".join(["================================================================================="] * 3))
                print("\n".join(["================================================================================="] * 3))
            print("\n".join(["================================================================================="] * 3))


encoder  PSRT
data  Yau2020_record_processed
k  4
facet
0
Overall average metrics across seeds:
accuracy: 0.9131424375917768
balanced_accuracy: 0.8867418771768333
precision_macro: 0.9153044040334631
recall_macro: 0.8867418771768333
f1_macro: 0.8923677004592127
roc_auc_macro: 0.942555741862991


## NCBI 2022

In [15]:
# testing the folds
#!/usr/bin/env python3


# Encoders to evaluate.
encoders = ['PSRT']

# Datasets (CSV file stems) to evaluate.
csv_names = ['Yau2022_record_processed']


import numpy as np
import random
import pandas as pd
import os
from sklearn.preprocessing import LabelEncoder



from sklearn.model_selection import StratifiedKFold
from collections import Counter



from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score
)


###############################################################################
# 0. Seed Setting (Optional)
###############################################################################
def set_seed(seed=42):
    """Seed Python's `random` module and NumPy's global RNG with `seed`."""
    for seeder in (random.seed, np.random.seed):
        seeder(seed)


def find_labels_with_min_count(labels, min_count=15):
    """
    Return the positions of all entries whose label is frequent enough.

    Args:
        labels (list): Sequence of hashable labels.
        min_count (int): Smallest number of occurrences a label needs for
            its entries to be kept (default=15).

    Returns:
        list: Ascending indices into `labels` of the entries whose label
        occurs at least `min_count` times.
    """
    frequency = Counter(labels)
    return [
        position
        for position, label in enumerate(labels)
        if frequency[label] >= min_count
    ]




def compute_classification_metrics(y_true, y_pred, average='macro'):
    """
    Compute several classification metrics from true and predicted hard labels.

    Args:
        y_true (array-like): Ground-truth labels.
        y_pred (array-like): Predicted labels, same length as `y_true`.
        average (str): Averaging mode forwarded to precision, recall, F1 and
            ROC-AUC (default='macro'); also used as the suffix of their keys.

    Returns:
        dict: Keys 'accuracy', 'balanced_accuracy', 'precision_<average>',
        'recall_<average>', 'f1_<average>' and 'roc_auc_<average>'. The
        ROC-AUC entry is None when `roc_auc_score` raises ValueError
        (e.g., if y_true has only one class).

    Note: ROC-AUC is computed from one-hot encoded hard predictions, i.e. the
    scores are 0/1 indicators rather than probabilities. The one-hot columns
    are the sorted distinct values of `y_true`, so labels do not have to be
    the integers 0..C-1. A prediction that never occurs in `y_true` gets an
    all-zero row (score 0 for every true class).
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    metrics_dict = {
        'accuracy': accuracy_score(y_true, y_pred),
        'balanced_accuracy': balanced_accuracy_score(y_true, y_pred),
        f'precision_{average}': precision_score(
            y_true, y_pred, average=average, zero_division=0),
        f'recall_{average}': recall_score(
            y_true, y_pred, average=average, zero_division=0),
        f'f1_{average}': f1_score(
            y_true, y_pred, average=average, zero_division=0),
    }

    # Map every label to its column through the sorted class array instead
    # of using the raw label value as a column index: raw indexing breaks
    # for non-contiguous / non-integer labels and wraps negative ones.
    classes = np.unique(y_true)
    true_cols = np.searchsorted(classes, y_true)
    # searchsorted yields len(classes) for values above the largest class,
    # so clip before looking the candidate class up.
    pred_cols = np.minimum(np.searchsorted(classes, y_pred), len(classes) - 1)
    known_rows = np.flatnonzero(classes[pred_cols] == y_pred)

    y_true_1hot = np.zeros((len(y_true), len(classes)), dtype=int)
    y_pred_1hot = np.zeros((len(y_pred), len(classes)), dtype=int)
    y_true_1hot[np.arange(len(y_true)), true_cols] = 1
    y_pred_1hot[known_rows, pred_cols[known_rows]] = 1

    try:
        roc_val = roc_auc_score(
            y_true_1hot,
            y_pred_1hot,
            multi_class='ovr',
            average=average
        )
    except ValueError:
        # e.g., if y_true has only one class
        roc_val = None
    metrics_dict[f'roc_auc_{average}'] = roc_val

    return metrics_dict

def five_fold_5nn(distance_matrix, labels, n_neighbors=5, random_state=42):
    """
    Evaluate k-NN on a precomputed distance matrix with 5-fold stratified
    cross-validation.

    In every fold, each test sample receives the majority label of its
    `n_neighbors` closest training samples (ties are resolved by
    `Counter.most_common`). Metrics are computed per fold with macro
    averaging and then averaged over the folds.

    Args:
        distance_matrix (ndarray): NxN precomputed distances.
        labels (array-like): length N array of class labels.
        n_neighbors (int): k in k-NN (default=5). If a training fold holds
            fewer samples than this, all of its samples vote.
        random_state (int): seed for StratifiedKFold reproducibility.

    Returns:
        dict: Fold-averaged metrics (accuracy, balanced_accuracy,
              precision_macro, recall_macro, f1_macro, roc_auc_macro).
              A metric is None only when it was None in every fold.

    Raises:
        ValueError: If `distance_matrix` is not square, `labels` does not
            have one entry per row, or `n_neighbors` is smaller than 1.
    """
    distance_matrix = np.asarray(distance_matrix)
    labels = np.asarray(labels)
    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    if distance_matrix.ndim != 2 or distance_matrix.shape[0] != distance_matrix.shape[1]:
        raise ValueError("distance_matrix must be NxN")
    if len(labels) != distance_matrix.shape[0]:
        raise ValueError("labels length must match distance_matrix size")
    if n_neighbors < 1:
        raise ValueError("n_neighbors must be at least 1")

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)

    # We'll accumulate metrics over the 5 folds
    metrics_list = []
    for train_idx, test_idx in skf.split(X=labels, y=labels):
        n_train = len(train_idx)
        # Never ask for more neighbours than the training fold contains.
        k_eff = min(n_neighbors, n_train)

        fold_preds = []
        for test_sample in test_idx:
            # Train and test indices are disjoint, so the sample itself is
            # never a candidate and no self-distance masking is needed
            # (writing np.inf into the row would fail for integer matrices).
            train_distances = distance_matrix[test_sample, train_idx]

            if k_eff < n_train:
                # Positions (within train_idx) of the k_eff smallest distances.
                nn_in_train = np.argpartition(train_distances, k_eff)[:k_eff]
            else:
                nn_in_train = np.arange(n_train)

            # Majority vote among the neighbours' labels
            neighbor_labels = labels[train_idx[nn_in_train]]
            fold_preds.append(Counter(neighbor_labels).most_common(1)[0][0])

        fold_true = labels[test_idx]
        metrics_list.append(
            compute_classification_metrics(fold_true, fold_preds, average='macro')
        )

    # Average metrics over folds, ignoring folds where a metric is None
    avg_metrics = {}
    for k in metrics_list[0]:
        vals = [m[k] for m in metrics_list if m[k] is not None]
        avg_metrics[k] = np.mean(vals) if vals else None

    return avg_metrics


def multiple_seeds_5nn(distance_matrix, labels, seeds, n_neighbors=5):
    """
    Repeat the 5-fold k-NN evaluation once per seed and average the results.

    Args:
        distance_matrix (ndarray): NxN precomputed distances.
        labels (array-like): length N array of class labels.
        seeds (list of ints): Seeds passed one by one as `random_state`
            to `five_fold_5nn`.
        n_neighbors (int): k in k-NN (default=5).

    Returns:
        dict: For every metric reported by the first run, the mean over all
        runs in which it is not None (None if it is None in every run).
    """
    per_seed = [
        five_fold_5nn(distance_matrix, labels,
                      n_neighbors=n_neighbors,
                      random_state=seed)
        for seed in seeds
    ]

    # The metric names are taken from the first run.
    summary = {}
    for name in per_seed[0]:
        observed = [run[name] for run in per_seed if run[name] is not None]
        summary[name] = np.mean(observed) if observed else None
    return summary



# -------------------------------
# Feature Configuration
# -------------------------------

# (feature name, max_dim) pairs; both parts go into the distance file name.
features_needed = [("facet", 0)]


###############################################################################
# 5. Main
###############################################################################
if __name__ == "__main__":
    # Seed the global RNGs before anything else runs.
    set_seed(42)

    # For every (encoder, dataset, k, feature) combination: read the labels
    # from the CSV, load the matching precomputed distance matrix and print
    # the seed-averaged cross-validated 5-NN metrics.
    for encoder in encoders:
        print("encoder ", encoder)
        for data_name in csv_names:
            print("data ", data_name)
            for k in range(4, 5):  # k is part of the distance file name
                print("k ", k)

                for feature, max_dim in features_needed:
                    print(feature)
                    print(max_dim)

                    # Accessions and family labels, sorted as (accession, family) pairs.
                    path_to_data = f'data/{data_name}'
                    df = pd.read_csv(f"{path_to_data}.csv")
                    x_list, y_list = map(list, zip(*sorted(zip(
                        df['Accession (version)'].to_list(),
                        df['Family'].to_list(),
                    ))))

                    # Keep only samples whose family occurs at least 15 times.
                    indices = find_labels_with_min_count(y_list, min_count=15)
                    x_list = [x_list[i] for i in indices]
                    y_list = [y_list[i] for i in indices]

                    # Load the full matrix and cut out the kept rows/columns.
                    # NOTE(review): presumably the matrix rows follow the sorted
                    # accession order used above — confirm against the producer.
                    path_to_features = f'distances/{encoder}/{data_name}'
                    distance_matrix = np.load(path_to_features+f'/k{k}_distance_{feature}{max_dim}.npy')
                    subD = distance_matrix[np.ix_(indices, indices)]

                    # Seeds 1..30, one cross-validation run per seed.
                    seeds = np.arange(1, 31)
                    le = LabelEncoder()
                    y_int = le.fit_transform(y_list)  # shape (N,)
                    y_np = np.array(y_int, dtype=np.int32)
                    labels = y_np
                    results = multiple_seeds_5nn(subD, labels, seeds, n_neighbors=5)
                    print("Overall average metrics across seeds:")
                    for metric_name, val in results.items():
                        print(f"{metric_name}: {val}")

                    # Three separator lines after each feature, k and dataset.
                    print("\n".join(["================================================================================="] * 3))
                print("\n".join(["================================================================================="] * 3))
            print("\n".join(["================================================================================="] * 3))


encoder  PSRT
data  Yau2022_record_processed
k  4
facet
0
Overall average metrics across seeds:
accuracy: 0.9024065372651517
balanced_accuracy: 0.8193924931401959
precision_macro: 0.8585754342317297
recall_macro: 0.8193924931401959
f1_macro: 0.8243980681502112
roc_auc_macro: 0.9090338370862462


## NCBI 2024

In [16]:
# testing the folds
#!/usr/bin/env python3


# Encoders to evaluate.
encoders = ['PSRT']

# Datasets (CSV file stems) to evaluate.
csv_names = ['NCBI_record_valid_nucleotide']

import numpy as np
import random
import pandas as pd
import os
from sklearn.preprocessing import LabelEncoder



from sklearn.model_selection import StratifiedKFold
from collections import Counter



from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score
)


###############################################################################
# 0. Seed Setting (Optional)
###############################################################################
def set_seed(seed=42):
    """Seed Python's `random` module and NumPy's global RNG with `seed`."""
    for seeder in (random.seed, np.random.seed):
        seeder(seed)


def find_labels_with_min_count(labels, min_count=15):
    """
    Return the positions of all entries whose label is frequent enough.

    Args:
        labels (list): Sequence of hashable labels.
        min_count (int): Smallest number of occurrences a label needs for
            its entries to be kept (default=15).

    Returns:
        list: Ascending indices into `labels` of the entries whose label
        occurs at least `min_count` times.
    """
    frequency = Counter(labels)
    return [
        position
        for position, label in enumerate(labels)
        if frequency[label] >= min_count
    ]




def compute_classification_metrics(y_true, y_pred, average='macro'):
    """
    Compute several classification metrics from true and predicted hard labels.

    Args:
        y_true (array-like): Ground-truth labels.
        y_pred (array-like): Predicted labels, same length as `y_true`.
        average (str): Averaging mode forwarded to precision, recall, F1 and
            ROC-AUC (default='macro'); also used as the suffix of their keys.

    Returns:
        dict: Keys 'accuracy', 'balanced_accuracy', 'precision_<average>',
        'recall_<average>', 'f1_<average>' and 'roc_auc_<average>'. The
        ROC-AUC entry is None when `roc_auc_score` raises ValueError
        (e.g., if y_true has only one class).

    Note: ROC-AUC is computed from one-hot encoded hard predictions, i.e. the
    scores are 0/1 indicators rather than probabilities. The one-hot columns
    are the sorted distinct values of `y_true`, so labels do not have to be
    the integers 0..C-1. A prediction that never occurs in `y_true` gets an
    all-zero row (score 0 for every true class).
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    metrics_dict = {
        'accuracy': accuracy_score(y_true, y_pred),
        'balanced_accuracy': balanced_accuracy_score(y_true, y_pred),
        f'precision_{average}': precision_score(
            y_true, y_pred, average=average, zero_division=0),
        f'recall_{average}': recall_score(
            y_true, y_pred, average=average, zero_division=0),
        f'f1_{average}': f1_score(
            y_true, y_pred, average=average, zero_division=0),
    }

    # Map every label to its column through the sorted class array instead
    # of using the raw label value as a column index: raw indexing breaks
    # for non-contiguous / non-integer labels and wraps negative ones.
    classes = np.unique(y_true)
    true_cols = np.searchsorted(classes, y_true)
    # searchsorted yields len(classes) for values above the largest class,
    # so clip before looking the candidate class up.
    pred_cols = np.minimum(np.searchsorted(classes, y_pred), len(classes) - 1)
    known_rows = np.flatnonzero(classes[pred_cols] == y_pred)

    y_true_1hot = np.zeros((len(y_true), len(classes)), dtype=int)
    y_pred_1hot = np.zeros((len(y_pred), len(classes)), dtype=int)
    y_true_1hot[np.arange(len(y_true)), true_cols] = 1
    y_pred_1hot[known_rows, pred_cols[known_rows]] = 1

    try:
        roc_val = roc_auc_score(
            y_true_1hot,
            y_pred_1hot,
            multi_class='ovr',
            average=average
        )
    except ValueError:
        # e.g., if y_true has only one class
        roc_val = None
    metrics_dict[f'roc_auc_{average}'] = roc_val

    return metrics_dict

def five_fold_5nn(distance_matrix, labels, n_neighbors=5, random_state=42):
    """
    Evaluate k-NN on a precomputed distance matrix with 5-fold stratified
    cross-validation.

    In every fold, each test sample receives the majority label of its
    `n_neighbors` closest training samples (ties are resolved by
    `Counter.most_common`). Metrics are computed per fold with macro
    averaging and then averaged over the folds.

    Args:
        distance_matrix (ndarray): NxN precomputed distances.
        labels (array-like): length N array of class labels.
        n_neighbors (int): k in k-NN (default=5). If a training fold holds
            fewer samples than this, all of its samples vote.
        random_state (int): seed for StratifiedKFold reproducibility.

    Returns:
        dict: Fold-averaged metrics (accuracy, balanced_accuracy,
              precision_macro, recall_macro, f1_macro, roc_auc_macro).
              A metric is None only when it was None in every fold.

    Raises:
        ValueError: If `distance_matrix` is not square, `labels` does not
            have one entry per row, or `n_neighbors` is smaller than 1.
    """
    distance_matrix = np.asarray(distance_matrix)
    labels = np.asarray(labels)
    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    if distance_matrix.ndim != 2 or distance_matrix.shape[0] != distance_matrix.shape[1]:
        raise ValueError("distance_matrix must be NxN")
    if len(labels) != distance_matrix.shape[0]:
        raise ValueError("labels length must match distance_matrix size")
    if n_neighbors < 1:
        raise ValueError("n_neighbors must be at least 1")

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)

    # We'll accumulate metrics over the 5 folds
    metrics_list = []
    for train_idx, test_idx in skf.split(X=labels, y=labels):
        n_train = len(train_idx)
        # Never ask for more neighbours than the training fold contains.
        k_eff = min(n_neighbors, n_train)

        fold_preds = []
        for test_sample in test_idx:
            # Train and test indices are disjoint, so the sample itself is
            # never a candidate and no self-distance masking is needed
            # (writing np.inf into the row would fail for integer matrices).
            train_distances = distance_matrix[test_sample, train_idx]

            if k_eff < n_train:
                # Positions (within train_idx) of the k_eff smallest distances.
                nn_in_train = np.argpartition(train_distances, k_eff)[:k_eff]
            else:
                nn_in_train = np.arange(n_train)

            # Majority vote among the neighbours' labels
            neighbor_labels = labels[train_idx[nn_in_train]]
            fold_preds.append(Counter(neighbor_labels).most_common(1)[0][0])

        fold_true = labels[test_idx]
        metrics_list.append(
            compute_classification_metrics(fold_true, fold_preds, average='macro')
        )

    # Average metrics over folds, ignoring folds where a metric is None
    avg_metrics = {}
    for k in metrics_list[0]:
        vals = [m[k] for m in metrics_list if m[k] is not None]
        avg_metrics[k] = np.mean(vals) if vals else None

    return avg_metrics


def multiple_seeds_5nn(distance_matrix, labels, seeds, n_neighbors=5):
    """
    Repeat the 5-fold k-NN evaluation once per seed and average the results.

    Args:
        distance_matrix (ndarray): NxN precomputed distances.
        labels (array-like): length N array of class labels.
        seeds (list of ints): Seeds passed one by one as `random_state`
            to `five_fold_5nn`.
        n_neighbors (int): k in k-NN (default=5).

    Returns:
        dict: For every metric reported by the first run, the mean over all
        runs in which it is not None (None if it is None in every run).
    """
    per_seed = [
        five_fold_5nn(distance_matrix, labels,
                      n_neighbors=n_neighbors,
                      random_state=seed)
        for seed in seeds
    ]

    # The metric names are taken from the first run.
    summary = {}
    for name in per_seed[0]:
        observed = [run[name] for run in per_seed if run[name] is not None]
        summary[name] = np.mean(observed) if observed else None
    return summary



# -------------------------------
# Feature Configuration
# -------------------------------

# (feature name, max_dim) pairs; both parts go into the distance file name.
features_needed = [("facet", 0)]

###############################################################################
# 5. Main
###############################################################################
if __name__ == "__main__":
    # Seed the global RNGs before anything else runs.
    set_seed(42)

    # For every (encoder, dataset, k, feature) combination: read the labels
    # from the CSV, load the matching precomputed distance matrix and print
    # the seed-averaged cross-validated 5-NN metrics.
    for encoder in encoders:
        print("encoder ", encoder)
        for data_name in csv_names:
            print("data ", data_name)
            for k in range(4, 5):  # k is part of the distance file name
                print("k ", k)

                for feature, max_dim in features_needed:
                    print(feature)
                    print(max_dim)

                    # Accessions and family labels, sorted as (accession, family) pairs.
                    path_to_data = f'data/{data_name}'
                    df = pd.read_csv(f"{path_to_data}.csv")
                    x_list, y_list = map(list, zip(*sorted(zip(
                        df['Accession (version)'].to_list(),
                        df['Family'].to_list(),
                    ))))

                    # Keep only samples whose family occurs at least 15 times.
                    indices = find_labels_with_min_count(y_list, min_count=15)
                    x_list = [x_list[i] for i in indices]
                    y_list = [y_list[i] for i in indices]

                    # Load the full matrix and cut out the kept rows/columns.
                    # NOTE(review): presumably the matrix rows follow the sorted
                    # accession order used above — confirm against the producer.
                    path_to_features = f'distances/{encoder}/{data_name}'
                    distance_matrix = np.load(path_to_features+f'/k{k}_distance_{feature}{max_dim}.npy')
                    subD = distance_matrix[np.ix_(indices, indices)]

                    # Seeds 1..30, one cross-validation run per seed.
                    seeds = np.arange(1, 31)
                    le = LabelEncoder()
                    y_int = le.fit_transform(y_list)  # shape (N,)
                    y_np = np.array(y_int, dtype=np.int32)
                    labels = y_np
                    results = multiple_seeds_5nn(subD, labels, seeds, n_neighbors=5)
                    print("Overall average metrics across seeds:")
                    for metric_name, val in results.items():
                        print(f"{metric_name}: {val}")

                    # Three separator lines after each feature, k and dataset.
                    print("\n".join(["================================================================================="] * 3))
                print("\n".join(["================================================================================="] * 3))
            print("\n".join(["================================================================================="] * 3))


encoder  PSRT
data  NCBI_record_valid_nucleotide
k  4
facet
0


Overall average metrics across seeds:
accuracy: 0.8759404096834262
balanced_accuracy: 0.7916106489739025
precision_macro: 0.8529276718534098
recall_macro: 0.7916106489739025
f1_macro: 0.8044019815704782
roc_auc_macro: 0.8952229908522413


## NCBI 2024 All

In [17]:
# testing the folds
#!/usr/bin/env python3


# Encoders to evaluate.
encoders = ['PSRT']

# Dataset names to evaluate.
csv_names = ['NCBI_record_valid_count']

import numpy as np
import random
import pandas as pd
import os
from sklearn.preprocessing import LabelEncoder



from sklearn.model_selection import StratifiedKFold
from collections import Counter



from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score
)


###############################################################################
# 0. Seed Setting (Optional)
###############################################################################
def set_seed(seed=42):
    """
    Seed the global random number generators used in this script.

    Args:
        seed (int): Value handed to both `random.seed` and `np.random.seed`
            (default=42).
    """
    # Python's generator first, then NumPy's legacy global generator.
    for seed_fn in (random.seed, np.random.seed):
        seed_fn(seed)


def find_labels_with_min_count(labels, min_count=15):
    """
    Return the positions of all entries whose label is frequent enough.

    A label is kept when it occurs at least `min_count` times in `labels`;
    the positions of every occurrence of a kept label are returned in
    ascending order.

    Args:
        labels (list): A list of (hashable) labels.
        min_count (int): Minimum number of occurrences a label needs in
            order to be kept (default=15).

    Returns:
        list: Indices into `labels` of the entries that are kept.
    """
    frequency = Counter(labels)

    kept_positions = []
    for position, label in enumerate(labels):
        # Keep the entry only if its label is common enough overall.
        if frequency[label] >= min_count:
            kept_positions.append(position)

    return kept_positions




def compute_classification_metrics(y_true, y_pred, average='macro'):
    """
    Compute multiple classification metrics given true and predicted labels.

    Args:
        y_true (array-like): 1-D sequence of ground-truth labels.
        y_pred (array-like): 1-D sequence of predicted labels, same length
            as `y_true`.
        average (str): Averaging mode forwarded to the precision, recall, F1
            and ROC-AUC scorers (default='macro'). It is also used as the
            suffix of the corresponding dictionary keys.

    Returns:
        dict: Keys, in this order:
            - 'accuracy'
            - 'balanced_accuracy'
            - f'precision_{average}'
            - f'recall_{average}'
            - f'f1_{average}'
            - f'roc_auc_{average}' (None when it cannot be computed)

    Note: ROC-AUC is computed from hard predictions. Both label vectors are
    one-hot encoded against the classes present in `y_true` and scored as
    indicator matrices. A prediction of a class that never occurs in `y_true`
    becomes an all-zero row, i.e. a negative for every scored class. If
    `y_true` holds fewer than two classes, ROC-AUC is None.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # Basic stats
    metrics_dict = {
        'accuracy': accuracy_score(y_true, y_pred),
        'balanced_accuracy': balanced_accuracy_score(y_true, y_pred),
        f'precision_{average}': precision_score(
            y_true, y_pred, average=average, zero_division=0
        ),
        f'recall_{average}': recall_score(
            y_true, y_pred, average=average, zero_division=0
        ),
        f'f1_{average}': f1_score(
            y_true, y_pred, average=average, zero_division=0
        ),
    }

    roc_key = f'roc_auc_{average}'

    # One column per class that actually occurs in y_true. Columns are looked
    # up by label *value*, so the labels need not be the integers 0..K-1
    # (using the raw label as a column index would raise IndexError or fill
    # the wrong column for any other label set).
    classes = np.unique(y_true)
    if classes.size < 2:
        # ROC-AUC is undefined without both positives and negatives.
        metrics_dict[roc_key] = None
        return metrics_dict

    y_true_1hot = (y_true[:, None] == classes[None, :]).astype(int)
    y_pred_1hot = (y_pred[:, None] == classes[None, :]).astype(int)

    try:
        metrics_dict[roc_key] = roc_auc_score(
            y_true_1hot,
            y_pred_1hot,
            multi_class='ovr',
            average=average
        )
    except ValueError:
        # Raised by the scorer when the score is undefined for this input.
        metrics_dict[roc_key] = None

    return metrics_dict

def five_fold_5nn(distance_matrix, labels, n_neighbors=5, random_state=42):
    """
    Perform 5-fold stratified cross-validation using a distance matrix and
    k-NN majority-vote classification. Computes multiple metrics on each fold
    and returns their average.

    Args:
        distance_matrix (ndarray): NxN precomputed distances.
        labels (array-like): length N array of class labels.
        n_neighbors (int): k in k-NN (default=5).
        random_state (int): seed for StratifiedKFold reproducibility.

    Returns:
        dict: Per-metric mean over the 5 folds of the dictionaries returned by
              `compute_classification_metrics(..., average='macro')`, i.e.
              accuracy, balanced_accuracy, precision_macro, recall_macro,
              f1_macro, roc_auc_macro. Folds in which a metric is None are
              left out of that metric's mean; a metric that is None in every
              fold stays None.

    Raises:
        ValueError: if `distance_matrix` is not square, if `labels` does not
            have one entry per row, if `n_neighbors` is smaller than 1, or if
            a training fold holds fewer than `n_neighbors` samples.
    """
    distance_matrix = np.asarray(distance_matrix)
    labels = np.asarray(labels)
    N = distance_matrix.shape[0]
    # Explicit raises instead of `assert`, which is stripped under `python -O`.
    if distance_matrix.shape != (N, N):
        raise ValueError("distance_matrix must be NxN")
    if len(labels) != N:
        raise ValueError("labels length must match distance_matrix size")
    if n_neighbors < 1:
        raise ValueError("n_neighbors must be at least 1")

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)

    # We'll accumulate metrics over the 5 folds
    metrics_list = []
    for train_idx, test_idx in skf.split(X=labels, y=labels):
        n_train = len(train_idx)
        if n_neighbors > n_train:
            raise ValueError(
                f"n_neighbors ({n_neighbors}) exceeds the training-fold "
                f"size ({n_train})"
            )
        # argpartition needs `kth` to be a valid index. Partitioning at
        # n_neighbors still puts the n_neighbors smallest distances first;
        # the clamp only matters when the fold holds exactly n_neighbors
        # samples, where every training sample is a neighbor.
        kth = min(n_neighbors, n_train - 1)

        train_labels = labels[train_idx]
        fold_preds = []
        for test_sample in test_idx:
            # Distances from this test sample to the training samples only.
            # A test sample is never part of train_idx, so it cannot be
            # selected as its own neighbor.
            train_distances = distance_matrix[test_sample, train_idx]

            # Positions (within the training fold) of the nearest neighbors
            nn_in_train = np.argpartition(train_distances, kth)[:n_neighbors]

            # Majority vote.
            # NOTE(review): vote ties go to the label seen first, which
            # follows argpartition's output order rather than distance.
            neighbor_labels = train_labels[nn_in_train]
            fold_preds.append(Counter(neighbor_labels).most_common(1)[0][0])

        # Compute metrics for this fold
        fold_metrics = compute_classification_metrics(
            labels[test_idx], fold_preds, average='macro'
        )
        metrics_list.append(fold_metrics)

    # Average metrics over folds; every fold reports the same keys.
    avg_metrics = {}
    for key in metrics_list[0]:
        # Some might be None if there's an issue with ROC-AUC
        vals = [m[key] for m in metrics_list if m[key] is not None]
        avg_metrics[key] = np.mean(vals) if vals else None

    return avg_metrics


def multiple_seeds_5nn(distance_matrix, labels, seeds, n_neighbors=5):
    """
    Repeat the 5-fold k-NN evaluation once per seed and average the results.

    Each seed is passed to `five_fold_5nn` as its `random_state`; the
    per-seed metric dictionaries are then averaged key by key.

    Args:
        distance_matrix (ndarray): NxN precomputed distances.
        labels (array-like): length N array of class labels.
        seeds (list of ints): Random seeds, one cross-validation run each.
        n_neighbors (int): k in k-NN (default=5).

    Returns:
        dict: Mean of every metric over all seeds. Runs where a metric is
              None are left out of that metric's mean; a metric that is None
              for every seed stays None.
    """
    per_seed = [
        five_fold_5nn(
            distance_matrix,
            labels,
            n_neighbors=n_neighbors,
            random_state=seed,
        )
        for seed in seeds
    ]

    # The metric names are taken from the first run.
    summary = {}
    for name in per_seed[0]:
        observed = [run[name] for run in per_seed if run[name] is not None]
        summary[name] = np.mean(observed) if observed else None

    return summary



# -------------------------------
# Feature Configuration
# -------------------------------

# Each entry is a (feature, max_dim) pair. The main loop prints both values
# and uses them to build the distance-matrix file name
# `k{k}_distance_{feature}{max_dim}.npy`.
features_needed = [('facet', 0)]

###############################################################################
# 5. Main
###############################################################################
if __name__ == "__main__":
    # Seed Python's and NumPy's global RNGs. The cross-validation splits
    # below are controlled separately, through the explicit per-run seeds.
    set_seed(42)

    # -------------------------------------------------------------------------
    # For every (encoder, dataset, k, feature) combination: load the labels
    # from the dataset CSV and the matching precomputed distance matrix, keep
    # only sufficiently frequent families, and report k-NN metrics averaged
    # over several cross-validation seeds.
    # -------------------------------------------------------------------------

    for encoder in encoders:
        print("encoder ", encoder)
        for data_name in csv_names:
            print("data ", data_name)
            # `k` only selects which distance file is loaded (`k{k}_...`);
            # it is not the neighbor count of the classifier. Only k=4 runs.
            for k in range(4,5):
                print("k ", k)

                for feature, max_dim in features_needed:
                    print(feature)
                    print(max_dim)
                    path_to_data = f'data/{data_name}'
                    df = pd.read_csv(f"{path_to_data}.csv")
                    x_list = df['Accession (version)'].to_list()
                    y_list = df['Family'].to_list()
                    # Sort the (accession, family) pairs by accession so the
                    # sample order is deterministic.
                    x_list, y_list = zip(*sorted(zip(x_list, y_list)))
                    x_list = list(x_list)
                    y_list = list(y_list)
                    # Positions (in the sorted order) of samples whose family
                    # occurs at least 15 times.
                    indices = find_labels_with_min_count(y_list, min_count=15)
                    x_list = [x_list[i] for i in indices]
                    y_list = [y_list[i] for i in indices]


                    path_to_features = f'distances/{encoder}/{data_name}'
                    distance_matrix = np.load(path_to_features+f'/k{k}_distance_{feature}{max_dim}.npy')
                    # Restrict the matrix to the kept samples (rows and columns).
                    # NOTE(review): this assumes the rows/columns of the saved
                    # matrix follow the same sorted-accession order as x_list —
                    # confirm against the script that produced the .npy file.
                    subD = distance_matrix[np.ix_(indices, indices)]




                    # One cross-validation run per seed, seeds 1..30.
                    seeds = np.arange(1,31)
                    # Encode the family names as integer class ids.
                    le = LabelEncoder()
                    y_int = le.fit_transform(y_list)  # shape (N,)
                    y_np = np.array(y_int, dtype=np.int32)
                    labels = y_np
                    results = multiple_seeds_5nn(subD, labels, seeds, n_neighbors=5)
                    print("Overall average metrics across seeds:")
                    for metric_name, val in results.items():
                        print(f"{metric_name}: {val}")



                    # Three rule lines close each feature, each k and each
                    # dataset section of the printed report.
                    print("=================================================================================")
                    print("=================================================================================")
                    print("=================================================================================")
                print("=================================================================================")
                print("=================================================================================")
                print("=================================================================================")
            print("=================================================================================")
            print("=================================================================================")
            print("=================================================================================")


encoder  PSRT
data  NCBI_record_valid_count
k  4
facet
0
Overall average metrics across seeds:
accuracy: 0.8759067882472139
balanced_accuracy: 0.79430174045709
precision_macro: 0.8531756964988015
recall_macro: 0.79430174045709
f1_macro: 0.8066358920785363
roc_auc_macro: 0.8966229384573784
