## start

In [28]:
from typing import Any, Tuple, Union, Dict, Callable, List, Iterable
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.ensemble import IsolationForest
from torch.utils.data import DataLoader, random_split
#from ColoredMNIST import ColoredMNIST, make_biased_color_samplers
from CmnistMLP import CmnistMLP
from CmnistMLP0 import CmnistMLP0
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.manifold import TSNE
from CMNIST import CMNIST
from MLP import MLP
from CIFAR10C import CIFAR10C
from TV_RESNET18 import TV_RESNET18


In [29]:
def visualize_with_tsne(features, labels, path, title=None):
    """Project features to 2-D with t-SNE and save a per-label scatter plot.

    Args:
        features: 2-D array-like of shape (n_samples, n_features).
        labels: 1-D array-like with one label per row of ``features``; every
            distinct label is drawn as its own scatter series.
        path: Prefix prepended verbatim to the file name (no separator is
            inserted, so a directory prefix must end with ``/``).
        title: Plot title. It is also used, with spaces removed, as the file
            name. Defaults to ``"t-SNE Visualization of Features"``.

    The figure is written to ``path + <title without spaces> + ".png"`` and
    closed afterwards; nothing is returned.
    """
    features = np.asarray(features)
    # A plain list would make `labels == label` a single bool, not a mask.
    labels = np.asarray(labels)

    # Resolve the title once: the original used `title.replace(...)` for the
    # file name, which raised AttributeError whenever title was left as None.
    plot_title = "t-SNE Visualization of Features" if title is None else title

    # scikit-learn requires perplexity < n_samples; keep the default (30)
    # whenever possible and shrink it only for very small inputs.
    n_samples = features.shape[0]
    perplexity = min(30.0, max(1.0, n_samples - 1.0))

    tsne_model = TSNE(n_components=2, random_state=42, perplexity=perplexity)
    transformed_features = tsne_model.fit_transform(features)

    fig = plt.figure(figsize=(10, 8))
    for label in np.unique(labels):
        mask = labels == label
        plt.scatter(
            transformed_features[mask, 0],
            transformed_features[mask, 1],
            label=str(label),
        )

    plt.title(plot_title)
    plt.legend()
    plt.savefig(path + f"{plot_title.replace(' ', '')}.png")
    # Close this specific figure so repeated calls do not accumulate figures.
    plt.close(fig)

In [30]:
def _safe_ratio(numerator, denominator):
    """Return numerator / denominator as a float, or NaN if the denominator is 0."""
    if denominator == 0:
        return float("nan")
    return float(numerator) / float(denominator)


def _detection_rates(predictions, bias_labels):
    """Return (accuracy, true-positive rate, true-negative rate) of +1/-1 predictions."""
    is_biased = bias_labels == 1
    is_unbiased = bias_labels == -1

    tp = np.sum((predictions == 1) & is_biased)
    fn = np.sum((predictions == -1) & is_biased)
    tn = np.sum((predictions == -1) & is_unbiased)
    fp = np.sum((predictions == 1) & is_unbiased)

    accuracy = _safe_ratio(np.count_nonzero(predictions == bias_labels), len(bias_labels))
    return accuracy, _safe_ratio(tp, tp + fn), _safe_ratio(tn, tn + fp)


def _save_confusion_matrix(bias_labels, predictions, title, out_path):
    """Plot the unbiased/biased confusion matrix and write it to ``out_path``."""
    # Pin the label order: without `labels`, a class where only one value
    # occurs yields a 1x1 matrix that does not match the two display labels.
    matrix = confusion_matrix(bias_labels, predictions, labels=[-1, 1])
    disp = ConfusionMatrixDisplay(matrix, display_labels=["Unbiased", "Biased"])
    disp.plot()
    plt.title(title)
    plt.savefig(out_path)
    plt.close()


def evaluate_adecs_v2(adecs, tr_labels, tr_features, tr_bias_labels, vl_labels, vl_features, vl_bias_labels, BIAS_AMOUNT, it, STEP, save_results_to):
    """Fit one anomaly detector per class and score it against the bias labels.

    Detector ``adecs[i]`` is fitted on the training features whose label is
    ``i`` and then used to predict on the training and validation features of
    that class. Predictions and bias labels are both compared as +1 / -1,
    where +1 is treated as "biased" and -1 as "unbiased".

    Args:
        adecs: Sequence of detectors exposing ``fit(X)`` and ``predict(X)``;
            the position in the sequence is the class index.
        tr_labels, tr_features, tr_bias_labels: Aligned numpy arrays for the
            training split (class label, feature row, bias label).
        vl_labels, vl_features, vl_bias_labels: Same for the validation split.
        BIAS_AMOUNT: Only used in printed text, plot titles and file names.
        it: Zero-based index of the current round; ``it + 1`` appears in the
            t-SNE titles.
        STEP: Confusion matrices are written only when ``STEP == it + 1``.
        save_results_to: Path prefix for every saved image.

    Returns:
        Tuple ``(adecs, tr_accuracy, vl_accuracy, train_TN_rate,
        train_TP_rate, val_TN_rate, val_TP_rate, bias_detected_ratio)``.
        ``adecs`` is the same sequence, now fitted; every other element is a
        list with one float per class. A rate whose denominator is zero is
        reported as NaN.
    """
    tr_accuracy = []
    vl_accuracy = []
    train_TN_rate = []
    train_TP_rate = []
    val_TN_rate = []
    val_TP_rate = []
    bias_detected_ratio = []

    for i, adec in enumerate(adecs):
        tr_idxs = np.where(tr_labels == i)[0]
        tr_in_class_samples = tr_features[tr_idxs]
        tr_in_class_blabels = tr_bias_labels[tr_idxs]

        vl_idxs = np.where(vl_labels == i)[0]
        vl_in_class_samples = vl_features[vl_idxs]
        vl_in_class_blabels = vl_bias_labels[vl_idxs]

        adec.fit(tr_in_class_samples)
        tr_predictions = adec.predict(tr_in_class_samples)
        vl_predictions = adec.predict(vl_in_class_samples)

        # Metrics for this class.
        tr_acc, tr_tpr, tr_tnr = _detection_rates(tr_predictions, tr_in_class_blabels)
        vl_acc, vl_tpr, vl_tnr = _detection_rates(vl_predictions, vl_in_class_blabels)

        tr_accuracy.append(tr_acc)
        vl_accuracy.append(vl_acc)
        train_TP_rate.append(tr_tpr)
        train_TN_rate.append(tr_tnr)
        val_TP_rate.append(vl_tpr)
        val_TN_rate.append(vl_tnr)

        # Share of the class's training samples that the detector marks as +1.
        bias_detected_ratio.append(
            _safe_ratio(np.count_nonzero(tr_predictions == 1), len(tr_in_class_blabels))
        )

        print("--------------------------------------------------------------------------")
        print(f"Class {i}: ")
        print(f"\t -- Train Accuracy (bias_amount={BIAS_AMOUNT}): {tr_accuracy[i]:.3f}")
        print(f"\t\t -- Train true negative rate: {train_TN_rate[i]:.3f}")
        print(f"\t\t -- Train true positive rate: {train_TP_rate[i]:.3f}")
        print(f"\t -- Validation Accuracy (bias_amount={BIAS_AMOUNT}): {vl_accuracy[i]:.3f}")
        print(f"\t\t -- Validation true negative rate: {val_TN_rate[i]:.3f}")
        print(f"\t\t -- Validation true positive rate: {val_TP_rate[i]:.3f}")

        # t-SNE plots are saved on every call, coloured by the detector output.
        visualize_with_tsne(tr_in_class_samples, tr_predictions, save_results_to, title=f"Class {i}(B={BIAS_AMOUNT}) -- Train Features - Step {it + 1}")
        visualize_with_tsne(vl_in_class_samples, vl_predictions, save_results_to, title=f"Class {i}(B={BIAS_AMOUNT}) -- Valid Features - Step {it + 1}")

        if STEP == (it + 1):
            # Confusion matrices only when the round number equals STEP. The
            # file names carry no round number, so a later call overwrites them.
            _save_confusion_matrix(
                tr_in_class_blabels,
                tr_predictions,
                f"Confusion Matrix of Class {i} (B={BIAS_AMOUNT}) -- Train Set",
                save_results_to + f"ConfMat_Class_{i}(B={BIAS_AMOUNT})_trainset.png",
            )
            _save_confusion_matrix(
                vl_in_class_blabels,
                vl_predictions,
                f"Confusion Matrix of Class {i} (B={BIAS_AMOUNT}) -- Validation Set",
                save_results_to + f"ConfMat_Class_{i}(B={BIAS_AMOUNT})_valset.png",
            )
    return adecs,tr_accuracy, vl_accuracy,train_TN_rate,train_TP_rate,val_TN_rate,val_TP_rate,bias_detected_ratio

## Iterative training: 5 steps × 20 epochs each

In [31]:
# Run configuration: STEP refinement rounds of EPOCHS_ITER epochs each
# (original note: "1nn 1ad" -- presumably one network and one anomaly
# detector per class in every round; confirm with the author).
BIAS_AMOUNT = 0.95
LEARNING_RATE_STEP_0=0.001
EPOCHS_STEP_0=30
LEARNING_RATE_ITER=0.001
STEP=5
EPOCHS_ITER=20
BATCH_SIZE = 256
ground_truth=False
rand=False
MAX_CONT=0.2
NUM_CLASSES = 10       # one detector per class label 0..9
CONTAMINATION = 0.2    # fixed `nu` given to every OneClassSVM

model_name="test_1"
# The directory that is created must be the directory that is written to.
# The original created a path relative to the working directory but saved to
# an absolute one. The default root is machine-specific; override it with the
# CIFAR10C_RESULTS_ROOT environment variable on other hosts.
RESULTS_ROOT = os.environ.get("CIFAR10C_RESULTS_ROOT", "/home/vito/Marinelli/cifar10c_tv")
save_results_to = f"{RESULTS_ROOT}/{str(BIAS_AMOUNT).replace('.', '')}_"+model_name+"/"
os.makedirs(save_results_to, exist_ok=True)
if __name__ == "__main__":

    # Use the configured bias amount instead of repeating the literal 0.95.
    train_set=CIFAR10C(env="train",bias_amount=BIAS_AMOUNT)
    val_set=CIFAR10C(env="val",bias_amount=BIAS_AMOUNT)
    test_set=CIFAR10C(env="test",bias_amount=BIAS_AMOUNT)

    # Objects responsible for loading batches of data during training, validation, and testing.
    # drop_last=True on the training loader: a trailing batch with a single
    # sample makes BatchNorm raise "Expected more than 1 value per channel
    # when training", which is the error recorded in this cell's output.
    # NOTE(review): if the error persists, check that TV_RESNET18 switches to
    # eval mode before iterating the validation/test/feature loaders.
    train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True, num_workers=0, drop_last=True)
    # Separate, unshuffled loader for feature extraction so that every
    # training sample is covered and the extraction order is stable.
    train_feature_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True, num_workers=0)
    val_loader   = DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True, num_workers=0)
    test_loader  = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True, num_workers=0)

    R18_model = TV_RESNET18()
    model_save_path = save_results_to+model_name+"_step0.pt"

    # Step 0 (plain training) is skipped here; the checkpoint it produced is
    # loaded below. Uncomment to regenerate it.
    # print("step 0")
    # R18_model.train_model_step0(
    #   train_loader,
    #   val_loader,
    #   test_loader,
    #   learning_rate=LEARNING_RATE_STEP_0,
    #   num_epochs=EPOCHS_STEP_0
    #   )
    # R18_model.save_model(model_save_path)

    # Load the step-0 checkpoint.
    R18_model = TV_RESNET18.load_model(model_save_path)
    # Extract features with the previously trained model.
    tr_features, tr_labels, tr_bias_labels = R18_model.extract_features(train_feature_loader)
    vl_features, vl_labels, vl_bias_labels = R18_model.extract_features(val_loader)
    #y_val, y_true, class_contaminations = R18_model.predict_from(val_loader,max_cont=MAX_CONT)
    class_contaminations=[CONTAMINATION for _ in range(NUM_CLASSES)]  # starting contamination is fixed
    print(class_contaminations)

    # One OC-SVM per class for unsupervised outlier detection.
    adecs = [OneClassSVM(nu=class_contaminations[i]) for i in range(NUM_CLASSES)]

    # Initial evaluation: it=-1 and STEP=0 make the plots read "Step 0".
    # Since STEP == it + 1 here, confusion matrices are written as well.
    adecs,tr_accuracy, vl_accuracy,train_TN_rate,train_TP_rate,val_TN_rate,val_TP_rate,bias_detected_ratio = evaluate_adecs_v2(
        adecs,
        tr_labels,
        tr_features,
        tr_bias_labels,
        vl_labels,
        vl_features,
        vl_bias_labels,
        BIAS_AMOUNT,
        -1,
        0,
        save_results_to
        )

    R18_model_iter = TV_RESNET18()
    log_file_path=save_results_to+model_name+".csv"
    learning_rate=LEARNING_RATE_ITER
    # The checkpoint path does not change between rounds; each round overwrites it.
    model_save_path_w = save_results_to+model_name+".pt"

    # Train the model with the bias information supplied by the detectors.
    for it in range(STEP):
        print("step",it+1)
        R18_model_iter.train_model_iter(
            train_loader,
            val_loader,
            test_loader,
            adecs,
            ground_truth,
            log_file_path,
            bias_detected_ratio,
            rand,
            learning_rate=learning_rate,
            num_epochs=EPOCHS_ITER,
            bias_amount=BIAS_AMOUNT
            )

        R18_model_iter.save_model(model_save_path_w)

        # Re-extract features with the model trained in this round.
        #y_val, y_true, class_contaminations = R18_model_iter.predict_from(val_loader,max_cont=MAX_CONT)
        tr_features, tr_labels, tr_bias_labels = R18_model_iter.extract_features(train_feature_loader)
        vl_features, vl_labels, vl_bias_labels = R18_model_iter.extract_features(val_loader)

        class_contaminations=[CONTAMINATION for _ in range(NUM_CLASSES)]
        print(class_contaminations)

        # Fresh detectors every round, fitted on the new features.
        adecs = [OneClassSVM(nu=class_contaminations[i]) for i in range(NUM_CLASSES)]

        adecs,tr_accuracy, vl_accuracy,train_TN_rate,train_TP_rate,val_TN_rate,val_TP_rate,bias_detected_ratio=evaluate_adecs_v2(
            adecs,
            tr_labels,
            tr_features,
            tr_bias_labels,
            vl_labels,
            vl_features,
            vl_bias_labels,
            BIAS_AMOUNT,
            it,
            STEP,
            save_results_to
            )



    

[0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2]
--------------------------------------------------------------------------
Class 0: 
	 -- Train Accuracy (bias_amount=0.95): 0.819
		 -- Train true negative rate: 0.688
		 -- Train true positive rate: 0.826
	 -- Validation Accuracy (bias_amount=0.95): 0.813
		 -- Validation true negative rate: 0.923
		 -- Validation true positive rate: 0.808
--------------------------------------------------------------------------
Class 1: 
	 -- Train Accuracy (bias_amount=0.95): 0.826
		 -- Train true negative rate: 0.763
		 -- Train true positive rate: 0.829
	 -- Validation Accuracy (bias_amount=0.95): 0.828
		 -- Validation true negative rate: 0.955
		 -- Validation true positive rate: 0.822
--------------------------------------------------------------------------
Class 2: 
	 -- Train Accuracy (bias_amount=0.95): 0.846
		 -- Train true negative rate: 0.977
		 -- Train true positive rate: 0.840
	 -- Validation Accuracy (bias_amount=0.95): 0.863
		 

ValueError: Expected more than 1 value per channel when training, got input size torch.Size([1, 512, 1, 1])