<a href="https://colab.research.google.com/github/IIF0403/Thesis/blob/main/Linear_evaluation(u17).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Check GPU
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Select the Runtime > "Change runtime type" menu to enable a GPU accelerator, ')
  print('and then re-execute this cell.')
else:
  print(gpu_info)

In [None]:
import numpy as np
import pandas as pd 
import torch
import torch.nn as nn
import torch.nn.functional as F
from google.colab import files
from google.colab import output
from google.colab import drive
from torch.nn.utils.rnn import pack_sequence
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
from sklearn.model_selection import train_test_split
from copy import deepcopy
import random
from datetime import datetime
from sklearn.metrics import accuracy_score

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
#Importing Dataset class from other ipynb file
# Installs the import_ipynb package, which is imported below before the notebook import.
!pip install import_ipynb
# Mount Google Drive and switch to the notebook folder before importing from it.
drive.mount('/content/drive')
%cd '/content/drive/MyDrive/Colab Notebooks'

#!ls
import import_ipynb
# NOTE(review): this star-import presumably supplies the names this notebook uses
# without defining them (ResNet, FCN, load_checkpoint, load_trained_model,
# Timeseries_Dataset, ToTensor, to_Excel) — confirm against SimSiam_training.
from SimSiam_training import *


In [None]:
##### LINEAR EVALUATION: training classifier on top of frozen model and evaluating #####
def Accuracy_score(labels, preds):
  """Return the fraction of positions where `labels` and `preds` agree.

  Args:
    labels: Sized, iterable collection of ground-truth labels (e.g. a list or
      a 1-D tensor).
    preds: Sized, iterable collection of predicted labels, same length as
      `labels`.

  Returns:
    Accuracy as a float in [0, 1].

  Raises:
    ValueError: If the two inputs differ in length, or if they are empty
      (accuracy is undefined without samples).
  """
  total = len(labels)
  if total != len(preds):
    # Fail loudly: silently returning None here would corrupt any average
    # the caller computes over the returned values.
    raise ValueError(
        f"sizes does not match: {total} labels vs {len(preds)} predictions")
  if total == 0:
    raise ValueError("cannot compute accuracy of an empty batch")

  correct = sum(1 for label, pred in zip(labels, preds) if label == pred)
  return correct / total

## Function to calculate classification_accuracy given the test_set
def evaluate_classifier(test_loader, backbone_model, classifier):
  """Compute the classification accuracy of `classifier` on top of `backbone_model`.

  Accuracy is computed over all samples in `test_loader` (correct / total),
  so a smaller final batch does not get over-weighted the way a mean of
  per-batch accuracies would.

  Args:
    test_loader: Iterable of batches; each batch is a mapping with the keys
      'time_series' (model input) and 'label' (class indices).
    backbone_model: Feature extractor applied to the 'time_series' batch.
    classifier: Module mapping the features to per-class scores.

  Returns:
    Accuracy as a float, or NaN when `test_loader` yields no samples.

  Side effects:
    Both `backbone_model` and `classifier` are switched to eval mode.
  """
  backbone_model.eval()
  classifier.eval()

  # Use the classifier's device (if it has parameters) as the target for the
  # inputs, mirroring the training loop which moves every batch to the device.
  first_param = next(classifier.parameters(), None)

  correct = 0
  total = 0
  with torch.no_grad():
    for batch in test_loader:
      time_series_batch = batch['time_series']
      if first_param is not None:
        time_series_batch = time_series_batch.to(first_param.device)

      y_hat = classifier(backbone_model(time_series_batch))
      pred = torch.max(y_hat, 1)[1]  # index of the highest score per sample

      # Labels must live on the same device as the predictions to be compared.
      labels = torch.as_tensor(batch['label'], device=pred.device).reshape(-1)
      correct += (pred == labels).sum().item()
      total += labels.numel()

  if total == 0:
    return float('nan')
  return correct / total


## function to save checkpoint of linear classifier
def save_checkpoint_classifier(SaveName, model, epoch, optimizer, loss_list, accuracy_list, lr, train_bs):
  """Write the classifier's training state to the checkpoints folder on Google Drive.

  The file name is built from `SaveName`, the current day and month (ddmm)
  and `epoch`. The stored dictionary holds the epoch, the model and optimizer
  state dicts, the loss and accuracy histories, the learning rate and the
  training batch size.
  """
  drive.mount('/content/drive')

  day_month = datetime.now().strftime('%d%m')
  PATH = f"/content/drive/MyDrive/checkpoints/classifier_{SaveName}_{day_month}_ep:{epoch}.pth"

  checkpoint = dict(
      epoch=epoch,
      model_state_dict=model.state_dict(),
      optimizer_state_dict=optimizer.state_dict(),
      loss_list=loss_list,
      accuracy_list=accuracy_list,
      lr=lr,
      train_bs=train_bs,
  )
  torch.save(checkpoint, PATH)

## function to load checkpoint of linear classifier
def load_checkpoint_classifier(name,date, e):
  """Load a linear-classifier checkpoint from Google Drive and print a summary.

  Args:
    name: Save name used when the checkpoint was written.
    date: Day-month stamp (ddmm) embedded in the checkpoint file name.
    e: Epoch number embedded in the checkpoint file name.

  Returns:
    The checkpoint dictionary as returned by `torch.load`.
  """
  drive.mount('/content/drive')
  PATH = f"/content/drive/MyDrive/checkpoints/classifier_{name}_{date}_ep:{e}.pth"
  checkpoint = torch.load(PATH)

  epoch = checkpoint['epoch']
  train_loss = checkpoint['loss_list'][-1]  # loss of the last recorded epoch
  max_accuracy = max(checkpoint['accuracy_list'])  # best accuracy over all epochs

  # Report `max_accuracy`; the previous code referenced an undefined name here.
  print("checkpoint:", date," Epoch: ", epoch, " Max_accuracy: ",max_accuracy," Loss: ", train_loss)
  return checkpoint



## Function to do LINEAR EVALUATION of a model trained with SimSiam
def _train_linear_step(frozen_model, linear, optimizer, inputs, labels):
  """Run one optimisation step of `linear` on features of `frozen_model`; return the loss value."""
  optimizer.zero_grad()
  with torch.no_grad():  # the feature extractor is frozen: no gradients through it
    features = frozen_model(inputs)
  # `.to(...)` is a no-op when the features already sit next to the labels.
  loss = F.cross_entropy(linear(features.to(labels.device)), labels)
  loss.backward()
  optimizer.step()
  return loss.item()


def Linear_evaluation(trained_model, train_set, test_set, SaveName=None, epochs = 10, train_bs = 512, lr=None, backbone = "FCN", dataset_name=None):
  """Linear evaluation of a model previously trained with SimSiam.

  A linear classifier is trained on top of the frozen backbone of
  `trained_model`. For comparison, a second linear classifier is trained on
  top of a randomly initialised, frozen network of the same architecture.
  After every epoch both classifiers are evaluated on `test_set`.

  Args:
    trained_model: Model exposing a `.backbone` attribute (the feature
      extractor to freeze).
    train_set: Dataset used to train the linear classifiers. Batches must be
      mappings with the keys 'time_series' and 'label'.
    test_set: Dataset used for evaluation. Its `.classes` attribute is used
      as the output size of the linear layers.
    SaveName: If not None, a checkpoint of the classifier trained on the
      SimSiam backbone is saved under this name after the last epoch.
    epochs: Number of training epochs (must be at least 1).
    train_bs: Batch size used for both the train and the test loader.
    lr: Learning rate; when None it is set to 0.05 * train_bs / 256.
    backbone: "ResNet" builds a ResNet as the random baseline, anything else
      builds an FCN.
    dataset_name: Name reported in the first entry of the result list. When
      None, the module-level variable `dataset` is used if it exists (this
      is what the function relied on implicitly before), otherwise None.

  Returns:
    A list: [dataset name, "linear eval", index of the last epoch, train_bs,
    lr, final accuracy, best accuracy, final training loss, best accuracy of
    the random baseline].

  Raises:
    ValueError: If `epochs` is smaller than 1.
  """
  if epochs < 1:
    raise ValueError("epochs must be at least 1")

  if lr is None:
    base_lr = 0.05
    lr = (base_lr*train_bs)/256

  if dataset_name is None:
    # Backward compatibility: the name used to be read from a global that the
    # calling loop happens to define.
    dataset_name = globals().get("dataset")

  train_loader = DataLoader(train_set, batch_size = train_bs, shuffle=True)
  test_loader = DataLoader(test_set, batch_size = train_bs, shuffle=True)

  classes = test_set.classes

  ### Linear evaluation on the trained model ###
  # Get the frozen backbone of the trained model
  Backbone_model = trained_model.backbone
  Backbone_model = Backbone_model.to(device)
  Backbone_model = nn.DataParallel(Backbone_model)

  # Linear classifier to train on top of the trained frozen model
  # (128 is the feature size the linear layers are built for)
  Linear = nn.Linear(128, classes).to(device)
  optimizer_linear = torch.optim.Adam(Linear.parameters(), lr=lr)
  accuracies_linear = []
  losses_linear = []

  ### Linear evaluation on a randomly initialized model ### (to compare)
  if (backbone == "ResNet"):
    Random_model = ResNet()
  else:
    Random_model = FCN()

  Random_model = Random_model.to(device)
  Linear_random = nn.Linear(128, classes).to(device) # Linear classifier to train on top of random model
  optimizer_random = torch.optim.Adam(Linear_random.parameters(), lr=lr)
  accuracies_random = []
  losses_random = []

  for e in range(epochs):
    # Feature extractors stay in eval mode; only the linear heads are trained.
    Backbone_model.eval()
    Linear.train()
    loss_list_linear = []

    Random_model.eval()
    Linear_random.train()
    loss_list_random = []

    for batch in train_loader:
      # Move the batch to the device once and reuse it for both classifiers.
      time_series_batch = batch['time_series'].to(device)
      label_batch = batch['label'].to(device)

      # Train linear classifier on top of backbone model
      loss_list_linear.append(_train_linear_step(
          Backbone_model, Linear, optimizer_linear, time_series_batch, label_batch))

      # Train linear classifier on top of the random baseline
      loss_list_random.append(_train_linear_step(
          Random_model, Linear_random, optimizer_random, time_series_batch, label_batch))

    Loss_linear = np.mean(loss_list_linear)
    losses_linear.append(Loss_linear)
    accuracy_linear = evaluate_classifier(test_loader, Backbone_model, Linear)
    accuracies_linear.append(accuracy_linear)

    Loss_random = np.mean(loss_list_random)
    losses_random.append(Loss_random)
    accuracy_random = evaluate_classifier(test_loader, Random_model, Linear_random)
    accuracies_random.append(accuracy_random)

  results = [dataset_name, "linear eval", e , train_bs, lr, accuracies_linear[-1], max(accuracies_linear), losses_linear[-1], max(accuracies_random)]
  print(results)

  if SaveName is not None:
    save_checkpoint_classifier(SaveName, Linear, e, optimizer_linear, losses_linear, accuracies_linear, lr, train_bs) # Save checkpoint

  return results


In [None]:
##### Linear evaluation on each dataset of the big dataset #####
# For every pretrained model, dataset and labeled fraction, run the linear
# evaluation `rounds` times, collect one result row per run, and write all
# rows to a single Excel file at the end.

#Big_dataset = ["ChlorineConcentration", "Two_Patterns", "yoga", "uWaveGestureLibrary_X", "NonInvasiveFatalECG_Thorax1", "ECG5000", "FordA", "FordB"]
#dat = ["ChlorineConcentration"]

#Load trained model
# Names of the datasets to evaluate on (each is loaded separately below).
Datasets = ["ChlorineConcentration", "ECG5000", "ElectricDevices", "FordA", "FordB", "Two_Patterns", "wafer", "yoga"]

# Earlier experiment configurations, kept for reference:
#SaveNames = ["Window38","Window38_sep","Horizon03_sep" ]
#SaveNames = ["Horizon03_sep"]
#SaveName = "Window38"
#SaveName = "Window38_sep"
#SaveName = "Horizon03_sep"
#epoch = 99
#date = 2704

# Checkpoint(s) of the pretrained model to evaluate: save name, epoch and date stamp.
SaveNames = ["ResNet_W38"]
epoch = 99
date = 2904

# Architecture passed to load_trained_model and Linear_evaluation.
#backbone = "FCN"
backbone = "ResNet"

batch_size = 40
#lr = 0.001
lr=None  # None lets Linear_evaluation derive the learning rate from the batch size
labeled_sizes = [0.2,0.1,0.05]  # fractions of the training set used as labeled data
#labeled_sizes = [0.1]

rounds = 10  # repetitions per (dataset, labeled fraction)
epochs = 40  # training epochs of the linear classifier per repetition

Results = []
for SaveName in SaveNames:
  checkpoint = load_checkpoint(SaveName, date, epoch)
  trained_model, optimizer_model, epoch_model = load_trained_model(checkpoint, backbone=backbone)

  # NOTE: Linear_evaluation reads the global name `dataset` for its result row,
  # so this loop variable must keep its name.
  for dataset in Datasets:
    for labeled_size in labeled_sizes:
      Train_set = Timeseries_Dataset([dataset], train=True, Save=None, transform = transforms.Compose( [ ToTensor()] ))
      N_train = len(Train_set)

      # Shuffle, then keep the first `labeled_size` fraction as the labeled subset.
      # The subset is drawn once here; the rounds below only reshuffle it.
      Train_set.shuffle()
      Train_set_labeled = Train_set[:int(N_train*labeled_size)]

      Test_set = Timeseries_Dataset([dataset], train=False, Save=None, transform = transforms.Compose( [ ToTensor()] ))

      # NOTE(review): `round` shadows the Python builtin of the same name.
      for round in range(rounds):
        Train_set_labeled.shuffle()
        Test_set.shuffle()
        
        # None disables checkpoint saving inside Linear_evaluation.
        Save_checkpoint_name = None

        results = Linear_evaluation(trained_model, Train_set_labeled, Test_set, SaveName=Save_checkpoint_name, epochs = epochs , train_bs = batch_size, lr=lr, backbone = backbone)
        # Extend the row with the pretrained model's name, its date stamp and the labeled fraction.
        results.append(SaveName)
        results.append(date)
        results.append(labeled_size)
        
        Results.append(results)
  
    #column_names = ["Dataset", "Type", "epochs", "batch_size", "lr", "Accuracy", "Max Accuracy", "Loss", "Random model accuracy", "trained model used", "date model", "labeled size"]
    #to_Excel(Results, column_names, "lin_eval_"+SaveName+".xlsx")



# Column order matches the list returned by Linear_evaluation plus the three appended values.
column_names = ["Dataset", "Type", "epochs", "batch_size", "lr", "Accuracy", "Max Accuracy", "Loss", "Random model accuracy", "trained model used", "date model", "labeled size"]
# The output file is named after the first entry of SaveNames, even if several models were evaluated.
name = "_Linear_"+SaveNames[0]
to_Excel(Results, column_names, "lin_eval_"+name+".xlsx")


