In [8]:
pip install --user torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

Looking in indexes: https://download.pytorch.org/whl/cu118
Collecting torchaudio
  Using cached https://download.pytorch.org/whl/cu118/torchaudio-2.5.1%2Bcu118-cp312-cp312-win_amd64.whl (4.0 MB)
Collecting torch
  Using cached https://download.pytorch.org/whl/cu118/torch-2.5.1%2Bcu118-cp312-cp312-win_amd64.whl (2700.1 MB)
Installing collected packages: torch, torchaudio
Successfully installed torch-2.5.1+cu118 torchaudio-2.5.1+cu118
Note: you may need to restart the kernel to use updated packages.


In [4]:
##installs pytorch on a cuda-capable windows machine using pip

%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

%pip install torchsummary

%pip install numpy

%pip install matplotlib

%pip install opencv-python

%pip install scikit-learn

%pip install pandas

%pip install pickle5

SyntaxError: invalid syntax (3373289064.py, line 3)

In [1]:
import torch
import torch.nn as nn
import os
import cv2
import numpy as np
import pandas as pd
import csv
import matplotlib.pyplot as plt
import warnings
from tqdm import tqdm
warnings.filterwarnings('ignore')

In [2]:
from torchvision import models
from torchvision import transforms
from sklearn.svm import LinearSVC
from sklearn.naive_bayes import GaussianNB
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier


In [3]:
if(os.path.split(os.getcwd())[1] == "Transfer_Learning" or os.path.split(os.getcwd())[1] == "semi-supervised"):
    os.chdir("..")
print("Current Working Directory: {}".format(os.path.split(os.getcwd())[1]))

Current Working Directory: progettoVIPM


In [4]:
cuda=False

In [5]:
from torch.utils.data import Dataset
from enum import Enum

class datasets(Enum):
    TRAINING_LABELED = ["train_small.csv", "train_set"]
    TRAINING_UNLABELED = ["train_unlabeled.csv", "train_set"]
    TEST = ["val_info.csv", "val_set"]
    TEST_DEGRADED = ["val_info.csv", "val_set_degraded"]

class networks(Enum):
    ALEXNET = [227, models.alexnet(pretrained=True), "AlexNet"] #227?
    RESNET50 = [224, models.resnet50(pretrained=True), "ResNet50"]
    GOOGLENET = [224, models.googlenet(pretrained=True), "GoogLeNet"]
    MOBILENET = [224, models.mobilenet_v3_small(pretrained=True), "mobilenet_v3_small"]

class ImageDataset(Dataset):
    def __init__(self, dataset, network_input_size):
        super().__init__()
        self.images_names = []
        self.labels = []
        dataset = dataset.value
        annotations_file = dataset[0]
        img_dir = dataset[1]
        with open(annotations_file, newline='') as csvfile:
            reader = csv.reader(csvfile)
            for row in reader:
                self.images_names.append("./{}/{}".format(img_dir, row[0]))
                self.labels.append(row[1])
        self.images_names = np.array(self.images_names)
        self.labels = np.array(self.labels)
        # in base al valore passato si sceglie la rete che utilizzerà il dataset, serve per modificare le dimensioni delle immagini
        self.im_size = network_input_size
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, index):
        image_bgr = cv2.resize((cv2.imread(self.images_names[index], cv2.IMREAD_COLOR).astype(np.double)/255), 
                                       (self.im_size,self.im_size), 
                                        interpolation=cv2.INTER_CUBIC).astype(np.float32)
        image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        #moveaxis serve per avere come dimensione dell'immagine (3, righe, colonne) invece di (righe, colonne, 3)
        image = np.moveaxis(image_rgb, -1, 0)
        # eventualmente si può aggiungere l'alternativa di fare random cropping dell'immagine
        label = self.labels[index]           
        if(cuda):
            return torch.from_numpy(image).cuda(), label
        else:
            return image, label

In [6]:
from torchsummary import summary
from torch.utils.data import DataLoader
from copy import deepcopy

def extract_features_of_dataset(dataset, dataset_type, input_size, transfer_network, data_file):
    if(os.path.exists(data_file)):
        numpy_feat = np.load(data_file).astype("float32")
        if(cuda):            
            net_features = torch.from_numpy(numpy_feat).to(device="cuda")  
        else:
            net_features=-1
        y = [] #if the dataset is unlabled it will remain empty
        if (dataset_type != "unlabled"):    
            annotations_file = dataset.value[0]
            with open(annotations_file, newline='') as csvfile:
                reader = csv.reader(csvfile)
                for row in reader:
                    y.append(row[1])
            y = np.array(y).astype("int")        
    else:
        dataset_holder = ImageDataset(dataset=dataset, network_input_size=input_size)
        loader = DataLoader(dataset=dataset_holder, shuffle=False, batch_size=1)

        if(cuda):      
            net_features = torch.zeros(len(dataset_holder), transfer_network.classifier[-1].out_features)
            y = np.zeros(len(dataset_holder)).astype("int")
            transfer_network.eval()
            with torch.no_grad():
                i = 0               
                print("extracting features:")
                for X_batch, y_batch in tqdm(loader):
                    net_features[i]=transfer_network(X_batch)                           
                    if (dataset_type != "unlabled"):  
                        y[i] = y_batch[0]
                    i=i+1    
                    
            #The following code copies the neural features to a numpy array stored in the cpu in order to use it in sklearn(non-neural) classifiers
            numpy_feat = np.zeros(net_features.shape)
            i = 0 
            print("copying features:")
            for i in tqdm(range(len(net_features))):
                numpy_feat[i]= net_features[i].cpu().numpy()    
            np.save(data_file, numpy_feat)

        else: #non-cuda case
            
            net_features=-1
            numpy_feat = np.zeros((len(dataset_holder), transfer_network.classifier[-1].out_features))
            y = np.zeros(len(dataset_holder)).astype("int")
            transfer_network.eval()
            with torch.no_grad():
                i = 0               
                print("extracting features:")
                for X_batch, y_batch in tqdm(loader):
                    numpy_feat[i]=transfer_network(X_batch)                           
                    if (dataset_type != "unlabled"):  
                        y[i] = y_batch[0]
                    i=i+1    
            np.save(data_file, numpy_feat)
            
    print("Done feat extraction, total n° of istances in {}: {}".format(dataset_type, len(numpy_feat)))
    print("Feature vector shape of {}: {}".format(dataset_type, numpy_feat.shape))
    if (dataset_type != "unlabled"):  
        print("Label vector shape of {}: {}".format(dataset_type, y.shape))

    return net_features, numpy_feat, y
    

def extrac_features(train_set, test_set, network, layers_to_remove):
    
    net_input_size = network.value[0]
    net = deepcopy(network.value[1])
    if (cuda):
        net.cuda()
    fine_tune_layers = nn.Sequential(*[net.classifier[i] for i in range((len(net.classifier) - layers_to_remove), len(net.classifier))])
    net.classifier = nn.Sequential(*[net.classifier[i] for i in range(len(net.classifier) - layers_to_remove)])

    train_data_file = "./Transfer_Learning/neural_features/Train_{}_minus{}_{}.npy".format(network.value[2], layers_to_remove, train_set.value[1])
    net_features_train, numpy_feat_train, y_train = extract_features_of_dataset(dataset=train_set, dataset_type="Train",
                                                                                input_size=net_input_size,
                                                                                transfer_network=net,data_file=train_data_file)
    
    test_data_file = "./Transfer_Learning/neural_features/Test_{}_minus{}_{}.npy".format(network.value[2], layers_to_remove, test_set.value[1])
    net_features_test, numpy_feat_test, y_test = extract_features_of_dataset(dataset=test_set, dataset_type="Test",
                                                                                input_size=net_input_size,
                                                                                transfer_network=net,data_file=test_data_file)
    
    return net_features_train, numpy_feat_train, y_train, net_features_test, numpy_feat_test, y_test, fine_tune_layers
    

    
def extrac_features_from_unlabled_dataset(dataset, network, layers_to_remove):
    
    net_input_size = network.value[0]
    net = deepcopy(network.value[1])
    
    fine_tune_layers = nn.Sequential(*[net.classifier[i] for i in range((len(net.classifier) - layers_to_remove), len(net.classifier))])
    net.classifier = nn.Sequential(*[net.classifier[i] for i in range(len(net.classifier) - layers_to_remove)])

    unlabled_data_file = "./Transfer_Learning/neural_features/Unlabled_{}_minus{}_{}.npy".format(network.value[2], layers_to_remove, dataset.value[1])
    net_features, numpy_feat, y = extract_features_of_dataset(dataset=dataset, dataset_type="unlabled",
                                                                                input_size=net_input_size,
                                                                                transfer_network=net,data_file=unlabled_data_file)  
    fake_labels = [-1 for _ in range(len(numpy_feat))]  
    return net_features, numpy_feat, fake_labels, fine_tune_layers
    

In [7]:
# t sta per "tensor", ovvero il vettore sulla gpu, mentre "n" sta per "numpy", ovvero il vettore sulla cpu
chosen_net = networks.MOBILENET
layers_to_remove = 3
X_train_mobi_t, X_train_mobi_n, y_train, X_test_mobi_t, X_test_mobi_n, y_test, fine_tune_layers = extrac_features(train_set=datasets.TRAINING_LABELED,
                                                                                            test_set=datasets.TEST,
                                                                                            network=chosen_net,
                                                                                            layers_to_remove=layers_to_remove)
print("Original classification layers:{}".format(chosen_net.value[1].classifier[:]))
print("Classification layers to fine tune:{}".format(fine_tune_layers[:]))

Done feat extraction, total n° of istances in Train: 5020
Feature vector shape of Train: (5020, 1024)
Label vector shape of Train: (5020,)
Done feat extraction, total n° of istances in Test: 11994
Feature vector shape of Test: (11994, 1024)
Label vector shape of Test: (11994,)
Original classification layers:Sequential(
  (0): Linear(in_features=576, out_features=1024, bias=True)
  (1): Hardswish()
  (2): Dropout(p=0.2, inplace=True)
  (3): Linear(in_features=1024, out_features=1000, bias=True)
)
Classification layers to fine tune:Sequential(
  (0): Hardswish()
  (1): Dropout(p=0.2, inplace=True)
  (2): Linear(in_features=1024, out_features=1000, bias=True)
)


In [8]:
if (cuda):
    print(X_train_mobi_t.shape)
print(X_train_mobi_n.shape)
print(y_train.shape)
if (cuda):
    print(X_test_mobi_t.shape)
print(X_test_mobi_n.shape)
print(y_test.shape)

(5020, 1024)
(5020,)
(11994, 1024)
(11994,)


In [9]:
# t sta per "tensor", ovvero il vettore sulla gpu, mentre "n" sta per "numpy", ovvero il vettore sulla cpu
chosen_net = networks.MOBILENET
layers_to_remove = 3
X_unlabled_mobi_t, X_unlabled_mobi_n, fake_labels, fine_tune_layers = extrac_features_from_unlabled_dataset(dataset=datasets.TRAINING_UNLABELED,
                                                                                            network=chosen_net,
                                                                                            layers_to_remove=layers_to_remove)
print("Original classification layers:{}".format(chosen_net.value[1].classifier[:]))
print("Classification layers to fine tune:{}".format(fine_tune_layers[:]))

Done feat extraction, total n° of istances in unlabled: 113455
Feature vector shape of unlabled: (113455, 1024)
Original classification layers:Sequential(
  (0): Linear(in_features=576, out_features=1024, bias=True)
  (1): Hardswish()
  (2): Dropout(p=0.2, inplace=True)
  (3): Linear(in_features=1024, out_features=1000, bias=True)
)
Classification layers to fine tune:Sequential(
  (0): Hardswish()
  (1): Dropout(p=0.2, inplace=True)
  (2): Linear(in_features=1024, out_features=1000, bias=True)
)


In [10]:
print(X_unlabled_mobi_n.shape)

(113455, 1024)


In [23]:
num_rows = X_unlabled_mobi_n.shape[0]
partOfUnlabeledToSelect = 0.10
num_samples = int(num_rows * partOfUnlabeledToSelect)
np.random.seed(42)
# Get random indices
random_indices = np.random.choice(num_rows, num_samples, replace=False)

# Select the random rows
X_selected = X_unlabled_mobi_n[random_indices, :]
#y_selected = fake_labels[random_indices]

In [24]:
X_train_mixed = np.concatenate((X_train_mobi_n, X_selected))#prima era X_train_mobi_n, X_unlabled_mobi_n
print(X_train_mixed.shape)
y_train_mixed = np.concatenate((y_train, fake_labels[:num_samples]))#prima era y_train, fake_labels
print(y_train_mixed.shape)

(16365, 1024)
(16365,)


In [25]:
import pickle
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

def model_building(model, model_name, X_train, X_test, y_train, y_test):
    model.fit(X_train,y_train)
    y_pred = model.predict(X_test)
    acc=accuracy_score(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                              display_labels=model.classes_)
    disp.plot()
    plt.savefig("./semi-supervised/model_metrics/ConfM_{}.pdf".format(model_name))
    cm = np.array(cm)
    np.save("./semi-supervised/model_metrics/ConfM_{}.npy".format(model_name), cm)
    with open('./semi-supervised/models/{}.pkl'.format(model_name),'wb') as f:
        pickle.dump(model,f)
    return acc

In [26]:
from sklearn.semi_supervised import LabelSpreading

model  = [LabelSpreading(kernel="knn", max_iter=5, n_jobs=-1, n_neighbors=3), "LabelSpreading"]
model[0].fit( X_train_mixed, y_train_mixed)
print("finito di inferire le labels sugli non annotati")

guessedAndRealLabels = model[0].transduction_


model2 = RandomForestClassifier()
model2.fit(X_train_mixed, guessedAndRealLabels) 
print("finito di fittare il classificatore")
guessedTestLabels = model2.predict(X_test_mobi_n)
score = accuracy_score(y_test, guessedTestLabels)
# summarize score
print('Accuracy: %.3f' % (score*100))
np.save("./semi-supervised/model_metrics/Accuracies_{}_minus{}_randomForest_labelSpreading_{}_ofunlabeled.npy".format(chosen_net, layers_to_remove, partOfUnlabeledToSelect), score)
# 5% unlabeled ->6.9 di acc
# 10% unlabeled ->6.9 di acc

finito di inferire le labels sugli non annotati
finito di fittare il classificatore
Accuracy: 6.053
