In [12]:
import os
import sys
import shutil
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, GridSearchCV, PredefinedSplit, cross_val_score, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from torchvision.models import resnet50

# Add project path
# Resolve the sibling "code" directory and make it importable. The membership
# check keeps the cell idempotent: re-running it does not pile up duplicate
# sys.path entries.
project_path = os.path.abspath("../code")
if project_path not in sys.path:
    sys.path.append(project_path)
from vipm_features import ResNet50FeatureExtractor

# Configure environment variable
# A plain Python assignment is invisible to joblib/loky; the value has to be
# exported through the process environment (as a string) to take effect.
# The module-level name is kept for any code that refers to it.
LOKY_MAX_CPU_COUNT = 16
os.environ["LOKY_MAX_CPU_COUNT"] = str(LOKY_MAX_CPU_COUNT)


In [4]:
# Load the CSV file
def load_csv(csv_path):
    """Read a header-less two-column CSV and return its columns as lists.

    The first column is read as ``image_name`` and the second as ``label``.

    Args:
        csv_path: Path (or buffer) accepted by ``pandas.read_csv``.

    Returns:
        A tuple ``(image_names, labels)`` of plain Python lists, in file order.
    """
    column_names = ['image_name', 'label']
    frame = pd.read_csv(csv_path, header=None, names=column_names)
    image_names = frame['image_name'].tolist()
    labels = frame['label'].tolist()
    return image_names, labels

# Load the compressed NPZ file
def load_features(npz_path):
    """Load the ``features`` and ``labels`` arrays stored in an ``.npz`` archive.

    Args:
        npz_path: Path to an ``.npz`` file containing the keys ``features``
            and ``labels``.

    Returns:
        A tuple ``(features, labels)`` of NumPy arrays.

    Raises:
        KeyError: If either key is missing from the archive.
    """
    # np.load returns an NpzFile holding an open file handle; the context
    # manager closes it. Arrays are materialised on key access, so they stay
    # valid after the archive is closed.
    with np.load(npz_path) as data:
        features = data['features']
        labels = data['labels']
    return features, labels

# Paths
# CSV manifests. csv_path and csv_test are later read with load_csv and handed
# to the feature extractor; csv_unlabeled is not used in the visible cells.
csv_path = '../dataset/train_small.csv'   
csv_unlabeled = '../dataset/train_unlabeled.csv'
csv_test = '../dataset/val_info.csv'

# Image directories passed to the extractor as `indir`. Both test directories
# are paired with the same manifest (csv_test) further down.
indir = '../dataset/train_set'  # Adjust to match where the images are located
test_dir = '../dataset/val_set'
test_degraded_dir = '../dataset/val_set_degraded'

# Directory the .npz feature files are read from; created if missing.
outdir = '../features'  # Adjust to match where the features are located
os.makedirs(outdir, exist_ok=True)

# Load the retrieved feature sets (raw and cleaned variant for each size)

# 20
features_20, labels_20 = load_features(
    os.path.join(outdir, 'features_unlabeled_retrived.npz'))

# 20 cleaned
features_20_cleaned, labels_20_cleaned = load_features(
    os.path.join(outdir, 'features_unlabeled_retrived_cleaned.npz'))

# 40
features_40, labels_40 = load_features(
    os.path.join(outdir, 'features_unlabeled_retrived_40.npz'))

# 40 cleaned
features_40_cleaned, labels_40_cleaned = load_features(
    os.path.join(outdir, 'features_unlabeled_retrived_cleaned_40.npz'))

# 80
features_80, labels_80 = load_features(
    os.path.join(outdir, 'features_unlabeled_retrived_80.npz'))

# 80 cleaned
features_80_cleaned, labels_80_cleaned = load_features(
    os.path.join(outdir, 'features_unlabeled_retrived_cleaned_80.npz'))

# Filtered small set. npz_path stays bound to this last archive, exactly as
# it was left by the step-by-step version of this cell.
npz_path = os.path.join(outdir, 'features_small_filtered.npz')
features_small_filtered, labels_small_filtered = load_features(npz_path)

# Small training set: labels from the CSV, features from the extractor
image_names, labels_small = load_csv(csv_path)
labels_small = np.asarray(labels_small)
extractor = ResNet50FeatureExtractor()
features_small, _, _ = extractor.get_features(
    csv=csv_path,
    indir=indir,
    outdir=outdir,
    normalize=True,
)

# Test set
image_names_test, labels_test = load_csv(csv_test)
labels_test = np.asarray(labels_test)
features_test, _, _ = extractor.get_features(
    csv=csv_test,
    indir=test_dir,
    outdir=outdir,
    normalize=True,
)

# Degraded test set: same manifest as the test set, different image directory
# and an explicit feature file name
image_names_test_degraded, labels_test_degraded = load_csv(csv_test)
labels_test_degraded = np.asarray(labels_test_degraded)
features_test_degraded, _, _ = extractor.get_features(
    csv=csv_test,
    indir=test_degraded_dir,
    outdir=outdir,
    normalize=True,
    file_name='features_test_degraded_normalized.npz',
)

Caricamento delle feature da ../features\train_small_resnet50_features_normalized.npz
Caricamento delle feature da ../features\val_info_resnet50_features_normalized.npz
Caricamento delle feature da ../features\features_test_degraded_normalized.npz


In [13]:
# Destination file and column layout of the results log
log_path = "results_log.csv"
log_columns = [
    "Dimension",
    "Small Cleaned",
    "Dim Cleaned",
    "Phase",
    "K",
    "Accuracy",
    "Top-5 Accuracy",
]
logs = []

# One entry per retrieved-set size, pairing the raw and cleaned variants
configurations = [
    {
        "dimension": dimension,
        "features": raw_features,
        "labels": raw_labels,
        "cleaned_features": cleaned_features,
        "cleaned_labels": cleaned_labels,
    }
    for dimension, raw_features, raw_labels, cleaned_features, cleaned_labels in (
        (20, features_20, labels_20, features_20_cleaned, labels_20_cleaned),
        (40, features_40, labels_40, features_40_cleaned, labels_40_cleaned),
        (80, features_80, labels_80, features_80_cleaned, labels_80_cleaned),
    )
]

# Train and evaluate a KNN classifier with cross-validation
def train_and_evaluate_knn(X, y, k, cv_splits=5):
    """Cross-validate a k-NN classifier and report accuracy and top-5 accuracy.

    Folds come from a shuffled ``StratifiedKFold`` with ``random_state=42``,
    so results are reproducible for a given input.

    Args:
        X: Feature matrix; must support indexing with integer index arrays.
        y: Label array aligned with ``X``; same indexing requirement.
        k: Number of neighbours for ``KNeighborsClassifier``.
        cv_splits: Number of stratified folds.

    Returns:
        A tuple ``(mean_accuracy, mean_top_5_accuracy)`` averaged over folds.
    """
    skf = StratifiedKFold(n_splits=cv_splits, shuffle=True, random_state=42)

    accuracies = []
    top_5_accuracies = []
    # Both metrics are computed in the same pass so each fold is fitted once.
    for train_idx, val_idx in skf.split(X, y):
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        knn = KNeighborsClassifier(n_neighbors=k)
        knn.fit(X_train, y_train)

        # Plain accuracy (what scoring='accuracy' measures).
        accuracies.append(knn.score(X_val, y_val))

        # predict_proba columns are ordered like knn.classes_, so argsort
        # yields column positions, not labels. Map them back through classes_
        # before comparing with the true labels; otherwise the metric is wrong
        # whenever labels are not exactly 0..C-1 or a class is absent from the
        # training fold.
        top_5_columns = np.argsort(knn.predict_proba(X_val), axis=1)[:, -5:]
        top_5_labels = knn.classes_[top_5_columns]
        top_5_accuracies.append(
            np.mean([true_label in candidates
                     for true_label, candidates in zip(y_val, top_5_labels)])
        )

    mean_accuracy = np.mean(accuracies)
    mean_top_5_accuracy = np.mean(top_5_accuracies)
    return mean_accuracy, mean_top_5_accuracy
def _evaluate_knn(model, X_eval, y_eval, top_n=5):
    """Score a fitted classifier: return ``(accuracy, top-n accuracy)``.

    ``predict_proba`` columns are ordered like ``model.classes_``; the argsort
    column positions are therefore translated back to labels before being
    compared with ``y_eval``.
    """
    top_columns = np.argsort(model.predict_proba(X_eval), axis=1)[:, -top_n:]
    top_labels = model.classes_[top_columns]
    accuracy = model.score(X_eval, y_eval)
    top_n_accuracy = np.mean(
        [true_label in candidates for true_label, candidates in zip(y_eval, top_labels)]
    )
    return accuracy, top_n_accuracy


# Neighbour counts explored in every experiment
K_VALUES = [5, 10, 25, 50, 100, 150]

# Baseline: train on the small set only
for small_cleaned in [False, True]:
    print(f"Small Cleaned: {small_cleaned}")

    if small_cleaned:
        small_features = features_small_filtered
        small_labels = labels_small_filtered
    else:
        small_features = features_small
        small_labels = labels_small

    # Try several values of K
    for k in K_VALUES:
        print(f"  Valutazione per K={k}")

        # Cross-validation on the small set
        mean_acc_cv, mean_top_5_acc_cv = train_and_evaluate_knn(small_features, small_labels, k)
        logs.append([0, small_cleaned, False, "Cross-Validation", k, mean_acc_cv, mean_top_5_acc_cv])
        print(f"    Cross-Validation: Accuracy={mean_acc_cv}, Top-5 Accuracy={mean_top_5_acc_cv}")

        # Fit on the whole small set, then score on the test set
        knn = KNeighborsClassifier(n_neighbors=k)
        knn.fit(small_features, small_labels)
        acc_test, top_5_acc_test = _evaluate_knn(knn, features_test, labels_test)
        logs.append([0, small_cleaned, False, "Test Set (only on small)", k, acc_test, top_5_acc_test])
        print(f"    Test Set (only on small): Accuracy={acc_test}, Top-5 Accuracy={top_5_acc_test}")

        # Same model on the degraded test set
        acc_test_degraded, top_5_acc_test_degraded = _evaluate_knn(
            knn, features_test_degraded, labels_test_degraded)
        logs.append([0, small_cleaned, False, "Test Set Degraded (only on small)", k, acc_test_degraded, top_5_acc_test_degraded])
        print(f"    Test Set Degraded (only on small): Accuracy={acc_test_degraded}, Top-5 Accuracy={top_5_acc_test_degraded}")


# Retrieved sets: cross-validation and test for every size / cleaning combination
for config in configurations:
    dim = config["dimension"]
    print(f"Iniziando configurazione per dimensione: {dim}")

    # Every pairing of (small raw/cleaned) x (retrieved raw/cleaned)
    for small_cleaned in [False, True]:
        for dim_cleaned in [False, True]:

            if small_cleaned:
                small_features = features_small_filtered
                small_labels = labels_small_filtered
            else:
                small_features = features_small
                small_labels = labels_small

            # NOTE(review): the retrieved label arrays are indexed with [0],
            # presumably because they are stored with a leading axis of
            # length 1 — confirm against the code that wrote the .npz files.
            if dim_cleaned:
                current_features = config["cleaned_features"]
                current_labels = config["cleaned_labels"][0]
            else:
                current_features = config["features"]
                current_labels = config["labels"][0]

            print(f"  Small Cleaned: {small_cleaned}, Dim Cleaned: {dim_cleaned}")

            # Retrieved samples first, then the small set
            combined_features = np.concatenate((current_features, small_features), axis=0)
            combined_labels = np.concatenate((current_labels, small_labels), axis=0)

            # Try several values of K
            for k in K_VALUES:
                print(f"    Valutazione per K={k}")

                # Cross-validation on the combined set
                mean_acc_cv, mean_top_5_acc_cv = train_and_evaluate_knn(combined_features, combined_labels, k)
                logs.append([dim, small_cleaned, dim_cleaned, "Cross-Validation", k, mean_acc_cv, mean_top_5_acc_cv])
                print(f"      Cross-Validation: Accuracy={mean_acc_cv}, Top-5 Accuracy={mean_top_5_acc_cv}")

                # Train on the retrieved set only; the small set acts as validation
                knn = KNeighborsClassifier(n_neighbors=k)
                knn.fit(current_features, current_labels)
                acc_val, top_5_acc_val = _evaluate_knn(knn, small_features, small_labels)
                logs.append([dim, small_cleaned, dim_cleaned, "Validation small", k, acc_val, top_5_acc_val])
                print(f"      Validation small: Accuracy={acc_val}, Top-5 Accuracy={top_5_acc_val}")

                # Retrieved-only model on the test set
                acc_test, top_5_acc_test = _evaluate_knn(knn, features_test, labels_test)
                logs.append([dim, small_cleaned, dim_cleaned, "Test Set (only on retrived)", k, acc_test, top_5_acc_test])
                print(f"      Test Set (only on retrived): Accuracy={acc_test}, Top-5 Accuracy={top_5_acc_test}")

                # Retrieved-only model on the degraded test set
                acc_test_degraded, top_5_acc_test_degraded = _evaluate_knn(
                    knn, features_test_degraded, labels_test_degraded)
                logs.append([dim, small_cleaned, dim_cleaned, "Test Set Degraded (only on retrived)", k, acc_test_degraded, top_5_acc_test_degraded])
                print(f"      Test Set Degraded (only on retrived): Accuracy={acc_test_degraded}, Top-5 Accuracy={top_5_acc_test_degraded}")

                # Train on both sets
                knn = KNeighborsClassifier(n_neighbors=k)
                knn.fit(combined_features, combined_labels)

                # Combined model on the test set
                acc_test, top_5_acc_test = _evaluate_knn(knn, features_test, labels_test)
                logs.append([dim, small_cleaned, dim_cleaned, "Test Set (on small and retrived)", k, acc_test, top_5_acc_test])
                print(f"      Test Set (on small and retrived): Accuracy={acc_test}, Top-5 Accuracy={top_5_acc_test}")

                # Combined model on the degraded test set
                acc_test_degraded, top_5_acc_test_degraded = _evaluate_knn(
                    knn, features_test_degraded, labels_test_degraded)
                logs.append([dim, small_cleaned, dim_cleaned, "Test Set Degraded (on small and retrived)", k, acc_test_degraded, top_5_acc_test_degraded])
                print(f"      Test Set Degraded (on small and retrived): Accuracy={acc_test_degraded}, Top-5 Accuracy={top_5_acc_test_degraded}")

# Write the collected log rows to a CSV file
log_df = pd.DataFrame(data=logs, columns=log_columns)
log_df.to_csv(path_or_buf=log_path, index=False)
print(f"Log salvato in {log_path}")

Small Cleaned: False
  Valutazione per K=5
    Cross-Validation: Accuracy=0.14003984063745017, Top-5 Accuracy=0.3264940239043824
    Test Set( only on small): Accuracy=0.1727530431882608, Top-5 Accuracy=0.3746873436718359
    Test Set Degraded (only on small): Accuracy=0.12231115557778889, Top-5 Accuracy=0.28080707020176754
  Valutazione per K=10
    Cross-Validation: Accuracy=0.15976095617529879, Top-5 Accuracy=0.3141434262948207
    Test Set( only on small): Accuracy=0.20168417542104386, Top-5 Accuracy=0.3691012172753043
    Test Set Degraded (only on small): Accuracy=0.14107053526763383, Top-5 Accuracy=0.2722194430548608
  Valutazione per K=25
    Cross-Validation: Accuracy=0.17709163346613543, Top-5 Accuracy=0.3609561752988048
    Test Set( only on small): Accuracy=0.22069368017342003, Top-5 Accuracy=0.42587960646990164
    Test Set Degraded (only on small): Accuracy=0.16016341504085377, Top-5 Accuracy=0.3186593296648324
  Valutazione per K=50
    Cross-Validation: Accuracy=0.18266