In [None]:
#import libraries
import torch
import torch.nn as nn
import shutil
from torchvision import datasets, models, transforms
from torch.utils.data import Dataset,DataLoader,random_split
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import os
from PIL import Image,ImageFile
import torch.optim as optim
import pandas as pd 
import copy
from torch.optim import lr_scheduler
from google.colab import drive
drive.mount('/content/drive')

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

In [None]:
! unzip "/content/drive/My Drive/Finetuning_Artwork_models/Dataset_arte_fake.zip"

In [None]:
! unzip "/content/drive/My Drive/Finetuning_Artwork_models/Dataset_arte_vera.zip"

In [None]:
#Inserire i due folder(real e fake) in una nuova cartella


fake_dir = 'Dataset_arte_fake'
real_dir = 'Dataset_arte_vera'

os.mkdir("Artworks")

shutil.move(fake_dir,'Artworks')
shutil.move(real_dir,'Artworks')


ARTWORK DATASET


In [5]:
#Classe per gestire gli artwork rispetto alla loro autenticità

class ArtworkDataset(Dataset):
  def __init__(self,links,transform):
      self.data = links
      self.transform = transform

  def __len__(self):
    return self.data.index.shape[0]
    
  def __getitem__(self,idx):
        img = Image.open(self.data.iloc[idx,0])
        label_index = self.data.iloc[idx, 1]
        if (img.mode != 'RGB'):
            img = img.convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, label_index


In [6]:
# Creazione del csv contenente il path degli artwork e l'autenticità

path = 'Artworks' 

data = [] # Crea una lista vuota per immagazinare i percorsi degli artwork e le etichette (real o fake)

# Cicla la cartella degli artwork falsi e aggiunge il percorso e l'etichetta alla lista "data"

for dirpath, dirnames, filenames in os.walk(os.path.join(path,fake_dir)):
    for filename in filenames:
        if filename.endswith(".jpg"): # only consider jpg files
            filepath = os.path.join(dirpath, filename)
            data.append((filepath, "0"))


# Cicla la cartella degli artwork vero e aggiunge il percorso e l'etichetta alla lista "data"

for filename in os.listdir(os.path.join(path,real_dir)):
    if filename.endswith(".jpg"): 
       filepath = os.path.join(path,real_dir,filename)
       data.append((filepath,"1"))

# Converte la lista "data" in un dataframe pandas
df = pd.DataFrame(data, columns=["path", "label"])

# Salva il dataframe in un file csv
df.to_csv("image_labels.csv", index=False)
 

LOAD PRETRAINED MODEL

In [7]:
!pip install timm
import timm

model = timm.create_model('vit_small_patch16_224',pretrained=True, num_classes=2)

model = model.to(device)
print(model)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting timm
  Downloading timm-0.6.13-py3-none-any.whl (549 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m549.1/549.1 kB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting huggingface-hub
  Downloading huggingface_hub-0.14.1-py3-none-any.whl (224 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m224.5/224.5 kB[0m [31m28.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: huggingface-hub, timm
Successfully installed huggingface-hub-0.14.1 timm-0.6.13
VisionTransformer(
  (patch_embed): PatchEmbed(
    (proj): Conv2d(3, 384, kernel_size=(16, 16), stride=(16, 16))
    (norm): Identity()
  )
  (pos_drop): Dropout(p=0.0, inplace=False)
  (norm_pre): Identity()
  (blocks): Sequential(
    (0): Block(
      (norm1): LayerNorm((384,), eps=1e-06, elementwise_affine=True)
      (attn): Attention(
        (qkv): Linear(in_features=384, o

In [8]:
#Pesi di tutti i modelli resi non allenabili
for param in model.parameters(): 
    param.requires_grad = False


In [9]:
#Reso allenabile l'ultimo layer del modello
for p in model.head.parameters(): 
    p.requires_grad=True

SPLIT IN TRAINING AND VALIDATION SET

In [10]:
dataset = df
dataset['label'] = dataset['label'].astype(int)
dataset

Unnamed: 0,path,label
0,Artworks/Dataset_arte_fake/stylegan3-2/stylega...,0
1,Artworks/Dataset_arte_fake/stylegan3-2/stylega...,0
2,Artworks/Dataset_arte_fake/stylegan3-2/stylega...,0
3,Artworks/Dataset_arte_fake/stylegan3-2/stylega...,0
4,Artworks/Dataset_arte_fake/stylegan3-2/stylega...,0
...,...,...
88146,Artworks/Dataset_arte_vera/henri-fantin-latour...,1
88147,Artworks/Dataset_arte_vera/albrecht-adam_festi...,1
88148,Artworks/Dataset_arte_vera/fons-heijnsbroek_au...,1
88149,Artworks/Dataset_arte_vera/joan-brull_tanqueu-...,1


In [11]:
# Dividi il dataset in due parti (training + validation e test)
train_val_data, test = train_test_split(dataset.values, test_size=0.1, random_state=1)

# Dividi la parte di training + validation nei set di training e validation
train, validation = train_test_split(train_val_data, test_size=0.1, random_state=1)

In [12]:
train_links = pd.DataFrame(train, columns = dataset.columns)
validation_links = pd.DataFrame(validation, columns = dataset.columns)
test_links = pd.DataFrame(test, columns = dataset.columns)

BUILDING DATA LOADERS

In [13]:
data_transforms = transforms.Compose([
                                transforms.Resize(224),
                                transforms.CenterCrop(224),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                               
     
])



batch_size = 32

train_set = ArtworkDataset( train_links, data_transforms)

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, 
                               drop_last=False,num_workers=2)

validation_set = ArtworkDataset( validation_links, data_transforms)

validation_loader = DataLoader(validation_set, batch_size=batch_size, shuffle=True, 
                               drop_last=False,num_workers=2)

test_set = ArtworkDataset( test_links,data_transforms)

test_loader = DataLoader(test_set,batch_size=batch_size, shuffle = True,
                              drop_last=False,num_workers=2)


TRAINING

In [14]:
class EarlyStopping():
    """
    Early stopping to stop the training when the loss does not improve after
    certain epochs.
    """
    def __init__(self, patience=5, min_delta=0.001):
        """
        :param patience: how many epochs to wait before stopping when loss is
               not improving
        :param min_delta: minimum difference between new loss and old loss for
               new loss to be considered as an improvement
        """
        self.patience = patience
        self.min_delta = min_delta
        self.wait = 0
        self.best_loss = None
        self.early_stop = False
    def __call__(self, current_loss):
        if self.best_loss == None:
            self.best_loss = current_loss
        elif (current_loss - self.best_loss) < -self.min_delta:
            self.best_loss = current_loss
            self.wait = 0
            torch.save(model.state_dict(),'RealArt_vs_FakeArt_vit_small_patch16_224.pt')
            if os.path.exists('/content/drive/My Drive/Finetuning_Artwork_models/Modelli/RealArt_vs_FakeArt_vit_small_patch16_224.pt'):
               os.remove('/content/drive/My Drive/Finetuning_Artwork_models/Modelli/RealArt_vs_FakeArt_vit_small_patch16_224.pt')
               shutil.move('RealArt_vs_FakeArt_vit_small_patch16_224.pt','/content/drive/My Drive/Finetuning_Artwork_models/Modelli')
            else:
               shutil.move('RealArt_vs_FakeArt_vit_small_patch16_224.pt','/content/drive/My Drive/Finetuning_Artwork_models/Modelli')
            
        else:
            self.wait = self.wait + 1
            print(f"INFO: Early stopping counter {self.wait} of {self.patience}")
            if self.wait >= self.patience:
                self.early_stop = True

In [15]:
def fine_tune(model, train_loader, validation_loader, criterion, optimizer, scheduler, early_stop ,num_epochs = 100):
    best_model = copy.deepcopy(model)
    best_acc = 0.0
    best_epoch=0
    stop = False
    
    for epoch in range(1, num_epochs + 1):
        if stop:
            break
        print(f'Epoch {epoch}/{num_epochs}')
        print('-'*120)

        data_loader = None
        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
                data_loader = train_loader
            else:
                model.eval()   # Set model to evaluate mode
                data_loader = validation_loader

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in tqdm(data_loader):
                
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    outputs = nn.Softmax(dim = 1)(outputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / (len(data_loader) * data_loader.batch_size)
            epoch_acc = running_corrects.double() / (len(data_loader) * data_loader.batch_size)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            
            
            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_epoch = epoch
                best_model = copy.deepcopy(model)
               
                
            if phase == 'val':
                early_stop(epoch_loss)
                print('-'*120, end = '\n\n')
                stop=early_stop.early_stop
                
                
    print(f'Best val Acc: {best_acc:4f}')
    print(f'Best epoch: {best_epoch:03d}')

    # load best model 
    return best_model         

In [16]:
if not 'RealArt_vs_FakeArt_vit_small_patch16_224.pt' in os.listdir():
   criterion = nn.CrossEntropyLoss()
   optimizer = optim.Adam(model.parameters(), lr=1e-3)
   scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
   early_stop= EarlyStopping(patience = 3, min_delta = 0.001)
   ImageFile.LOAD_TRUNCATED_IMAGES = True
   best_model_head=fine_tune(model, train_loader, validation_loader, criterion, optimizer, scheduler, early_stop, num_epochs = 30)
   torch.save(best_model_head, 'RealArt_vs_FakeArt_vit_small_patch16_224.pt')
   if os.path.exists('/content/drive/My Drive/Finetuning_Artwork_models/Modelli/RealArt_vs_FakeArt_vit_small_patch16_224.pt'):
               os.remove('/content/drive/My Drive/Finetuning_Artwork_models/Modelli/RealArt_vs_FakeArt_vit_small_patch16_224.pt')
               shutil.move('RealArt_vs_FakeArt_vit_small_patch16_224.pt','/content/drive/My Drive/Finetuning_Artwork_models/Modelli')
   else:
               shutil.move('RealArt_vs_FakeArt_vit_small_patch16_224.pt','/content/drive/My Drive/Finetuning_Artwork_models/Modelli')

Epoch 1/30
------------------------------------------------------------------------------------------------------------------------


100%|██████████| 2232/2232 [05:12<00:00,  7.15it/s]


train Loss: 0.3453 Acc: 0.9686


100%|██████████| 248/248 [00:34<00:00,  7.12it/s]


val Loss: 0.3368 Acc: 0.9758
------------------------------------------------------------------------------------------------------------------------

Epoch 2/30
------------------------------------------------------------------------------------------------------------------------


100%|██████████| 2232/2232 [05:02<00:00,  7.37it/s]


train Loss: 0.3363 Acc: 0.9761


100%|██████████| 248/248 [00:33<00:00,  7.41it/s]


val Loss: 0.3359 Acc: 0.9761
INFO: Early stopping counter 1 of 3
------------------------------------------------------------------------------------------------------------------------

Epoch 3/30
------------------------------------------------------------------------------------------------------------------------


100%|██████████| 2232/2232 [05:01<00:00,  7.39it/s]


train Loss: 0.3353 Acc: 0.9771


100%|██████████| 248/248 [00:34<00:00,  7.26it/s]


val Loss: 0.3361 Acc: 0.9758
INFO: Early stopping counter 2 of 3
------------------------------------------------------------------------------------------------------------------------

Epoch 4/30
------------------------------------------------------------------------------------------------------------------------


100%|██████████| 2232/2232 [05:01<00:00,  7.40it/s]


train Loss: 0.3345 Acc: 0.9776


100%|██████████| 248/248 [00:33<00:00,  7.32it/s]


val Loss: 0.3362 Acc: 0.9754
INFO: Early stopping counter 3 of 3
------------------------------------------------------------------------------------------------------------------------

Best val Acc: 0.976058
Best epoch: 002


TESTING

In [17]:
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, roc_auc_score
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np

# funzione per il testing del modello
def test_model(model, test_loader):
    model.eval() # imposto il modello in modalità di valutazione
    test_loss = 0
    correct = 0
    pred_list = []
    true_list = []
    # inizializza la barra di avanzamento
    pbar = tqdm(total=len(test_loader))
    
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.cross_entropy(output, target, reduction='sum').item() # sommo il loss di ogni batch
            
            pred = output.argmax(dim=1, keepdim=True) # ottengo la predizione del modello
            pred_list.extend(pred.cpu().numpy()) # aggiungo la predizione alla lista
            true_list.extend(target.cpu().numpy()) # aggiungo il target alla lista
            
            correct += pred.eq(target.view_as(pred)).sum().item() # aggiorno il contatore di classificazioni corrette
            # aggiorna la barra di avanzamento
            pbar.update(1)
    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    recall = recall_score(true_list, pred_list, average='macro') # calcolo la recall
    precision = precision_score(true_list, pred_list, average='macro') # calcolo la precision
    f1 = f1_score(true_list, pred_list, average='macro') # calcolo la F1 score
    auc = roc_auc_score(true_list, pred_list) # calcolo l'AUC
    
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%), Recall: {:.2f}%, Precision: {:.2f}%, F1: {:.2f}%, AUC: {:.2f}%\n'.format(
        test_loss, correct, len(test_loader.dataset), accuracy, recall*100, precision*100, f1*100, auc*100))
    
    return accuracy, recall, precision, f1, auc

In [18]:
accuracy,recall,precision,f1,auc = test_model(model,test_loader)

100%|██████████| 276/276 [00:41<00:00,  6.73it/s]



Test set: Average loss: 0.1109, Accuracy: 8603/8816 (97.58%), Recall: 97.59%, Precision: 97.52%, F1: 97.55%, AUC: 97.59%

