libraries

In [40]:
from PIL import Image
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import os
from sklearn.preprocessing import MultiLabelBinarizer
import random
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, hamming_loss
import time
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import torch.nn.functional as F

Configuration

In [41]:
BATCH_SIZE = 64
EPOCHS = 10
LEARNING_RATE = 0.001
#DEVICE = torch.device("cpu")
DEVICE = torch.device("mps")
UNFREEZE_EVERY_N_EPOCHS = 16

Load your data

In [42]:
df = pd.read_csv('art/artelingo_release.csv')
df = df[:1000]

# Update image paths
df['image_file'] = df['image_file'].str.replace('YOUR/PATH/TO/WIKIART', 'wikiart')


Data

In [43]:
def preprocess_dataset(df):
    # Iterate through the dataframe and drop rows with non-existing image paths
    indices_to_drop = []
    for idx, row in df.iterrows():
        image_path = row['image_file']
        if not os.path.exists(image_path):
            #print(f"Image not found: {image_path}. Skipping...")
            indices_to_drop.append(idx)

    df = df.drop(indices_to_drop)
    df.reset_index(drop=True, inplace=True)
    return df


paint_emotion_dict = {}
for i in range(0,1000) :
    if df.iloc[i,-2] not in paint_emotion_dict.keys():
        paint_emotion_dict[df.iloc[i,-2]] = []
    paint_emotion_dict[df.iloc[i,-2]].append(df.iloc[i,1])

paint_emotion_dict

df2 = pd.DataFrame()

paths = []
labels = []
for k,v in paint_emotion_dict.items():
    paths.append(k)
    labels.append(sorted(list(set(v))))

df2['image_file'] = paths
df2['emotions'] = labels

df2.head()

# Preprocess the dataset to remove rows with non-existing image paths
preprocessed_df = preprocess_dataset(df2)

preprocessed_df.head(3)

# Define your transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

NUM_CLASSES = len(df['emotion_label'].unique())

EmotionDataset

In [44]:
class EmotionDataset(Dataset):
    def __init__(self, df, transform=None, subset_percentage=0.2):
        self.df = df.sample(frac=subset_percentage, random_state=42)  # Randomly sample a subset of data
        self.transform = transform
        self.mlb = MultiLabelBinarizer()
        self.labels = self.mlb.fit_transform(self.df.emotions)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image = Image.open(row['image_file']).convert('RGB')
        if self.transform:
            image = self.transform(image)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return image, label

# Create dataset
dataset = EmotionDataset(preprocessed_df, transform)

Train - Test split

In [45]:
train_size = int(0.8 * len(dataset))
validation_size = len(dataset) - train_size
train_dataset, validation_dataset = torch.utils.data.random_split(dataset, [train_size, validation_size])

Data Loader

In [46]:
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=BATCH_SIZE, shuffle=False)

VGG Model

In [47]:
# Load pre-trained VGG model
model = models.vgg16(pretrained=True)



# Freeze the model
for param in model.parameters():
    param.requires_grad = True

print(model)



VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

Model changes

In [48]:
""" # Replace the classifier (fully connected) layers
num_ftrs = model.classifier[6].in_features   #model.classifier typically refers to the fully connected layer
model.classifier[6] = nn.Sequential(         #in_features refers to the number of input nodes
    nn.Linear(num_ftrs, 768),
    nn.ReLU(),
    nn.BatchNorm1d(768),
    nn.Dropout(0.5),
    nn.Linear(768, 128),
    nn.ReLU(),
    nn.BatchNorm1d(128),
    nn.Dropout(0.5),
    nn.Linear(128, NUM_CLASSES),
    nn.Sigmoid()
) """

model = model.to(DEVICE)

criterion = nn.L1Loss()
#criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
print(model.classifier[0])

Linear(in_features=25088, out_features=4096, bias=True)


In [49]:
# DEVICE = torch.device("cpu")
# input= 'wikiart/Ukiyo_e/hiroshige_a-bridge-across-a-deep-gorge.jpg' 
# image = Image.open(input)
# #image.show()

# transform = transforms.Compose([
#     transforms.Resize((224, 224)),  # Resize the image to the size expected by the model
#     transforms.ToTensor(),  # Convert the image to a PyTorch tensor
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize with ImageNet mean and std
# ])

     

# def output_shape(model, layer_index, input_tensor):
#     # Extract layers up to and including the target layer
#     layers = list(model.features.children())
#     target_layers = nn.Sequential(*layers[:layer_index + 1])
    
#     # Perform a forward pass through the target layers
#     with torch.no_grad():
#         output = target_layers(input_tensor)
    
#     # Return the output and its shape
#     return output, output.shape


# layer_index = 30 ##########

# image_tensor = transform(image)
# print("image shape:", image_tensor.shape)
# print("______")


# #target_layer = target_layer()
# out_put, out_size = output_shape(model,layer_index, image_tensor)
# print("Output shape of the  layer:", out_size)
# channels = out_size[0]
# print(out_size[0])
# print("______")



# pooled_output = F.adaptive_avg_pool2d(out_put, (1, 1))
# print("Output shape  ", pooled_output.shape)
# print("______")

# falttened_output = torch.flatten(pooled_output) 
# print("flattened shape  ", falttened_output.shape)
# print("______")

# DEVICE = torch.device("mps")

VGG Layers

In [50]:
x = torch.randn(64, 64, 112, 112)

x = F.adaptive_avg_pool2d(x,(1,1))

x = x.squeeze()

# print(x.shape) 

In [51]:
x.shape

torch.Size([64, 64])

In [52]:
class ModifiedVGG(nn.Module):
    def __init__(self, original_model,layer_index = 5):
        super(ModifiedVGG, self).__init__()
        # #num_ftrs = original_model.classifier[6].in_features 
        # print(num_ftrs)
        # self.classifier[6] = nn.Sequential(
        #     nn.Linear(192, 10),
        #     nn.ReLU(),
        #     nn.BatchNorm1d(768),
        #     nn.Dropout(0.5),
        #     nn.Linear(768, 128),
        #     nn.ReLU(),
        #     nn.BatchNorm1d(128),
        #     nn.Dropout(0.5),
        #     nn.Linear(128, NUM_CLASSES),
        #     nn.Sigmoid()
        # )
        
        self.classifier = nn.Linear(128, NUM_CLASSES)                           #0-64 /// 5-128 // 10-256 // 30-512
        layers = list(original_model.features.children())
        self.back_bone = nn.Sequential(*layers[:layer_index+1])
        #self.back_bone = nn.Sequential(*layers[:layer_index + 1])
        
        # Global Average Pooling
        # x = F.adaptive_avg_pool2d(out_put, (1, 1))
        
        self.pool=F.adaptive_avg_pool2d
         
    
    def forward(self, x):
        #print(2)
        output = self.back_bone(x) 
        #print(3)
        #print(output.shape)
        pooled_output = self.pool(output,(1, 1))
        pooled_output = pooled_output.squeeze()
        #print(4)
        # falttened_output = torch.flatten(pooled_output)  
        #print(pooled_output.shape)
        #print(5)
   
        
        
        final_output = self.classifier(pooled_output)
        #print(6)
        return final_output
    


# Load the original VGG model
original_model = models.vgg16(pretrained=True)

# Create an instance of the modified model
model = ModifiedVGG(original_model,layer_index=5)
model.to(DEVICE)



ModifiedVGG(
  (classifier): Linear(in_features=128, out_features=9, bias=True)
  (back_bone): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  )
)

accuracy

In [53]:
def accuracy_score_cal(trues, preds):
    # print(trues.shape)
    total_acc = 0
    for row1, row2 in zip(trues, preds):
        total_acc += accuracy_score(row1, row2, normalize=False)
        # print(f'current acc {total_acc}')
        # print(f'row1 : {row1}\nrow : {row2}')
    return total_acc

history = {'epoch': [], 'train_loss': [], 'train_acc': [], 'train_f1': [], 'train_precision': [], 'train_recall': [], 'train_hamming_loss':[],
               'val_loss': [], 'val_acc': [], 'val_f1': [], 'val_precision': [], 'val_recall': [], 'val_hamming_loss':[]}

Model Training

In [55]:
val_loader = validation_loader
device = DEVICE
num_epochs = 50
csv_file = "output.csv"
since = time.time()
best_val_acc = 0.0


for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    train_correct = 0.0
    train_total = 0.0
    train_predictions = []
    train_targets = []
    
    # Record the start time for this epoch
    start_time = time.time()

    # Initialize cumulative metrics for training
    train_hamming_loss_cum = 0.0
    train_f1_cum = 0.0
    train_precision_cum = 0.0
    train_recall_cum = 0.0

    for batch_idx, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        predicted = (outputs > 0.5).float()
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Convert predicted and labels tensors to NumPy arrays
        predicted_numpy = predicted.cpu().numpy()
        labels_numpy = labels.cpu().numpy()



        # Update metrics
        batch_loss = loss.item()
        #batch_total = labels.size(0)
        batch_total = images.size(0)
        batch_correct = (predicted_numpy == labels_numpy).sum()
        #batch_correct = accuracy_score_cal(labels_numpy, predicted_numpy)
        
        
        """ train_loss += loss.item() * images.size(0)
        train_correct += accuracy_score_cal(labels_numpy, predicted_numpy)
        train_total += labels.size(0) """
        
        #train_loss += batch_loss * batch_total
        train_loss += loss.item() * images.size(0)
        #train_correct += batch_correct * batch_total
        train_correct += accuracy_score_cal(labels_numpy, predicted_numpy)
        #train_total += batch_total
        train_total += 64*9

        train_predictions.extend(predicted_numpy.tolist())
        train_targets.extend(labels_numpy.tolist())

        # Calculate batch metrics
        train_hamming_loss_batch = hamming_loss(train_targets, train_predictions)
        train_f1_batch = f1_score(train_targets, train_predictions, average='micro')
        train_precision_batch = precision_score(train_targets, train_predictions, average='micro')
        train_recall_batch = recall_score(train_targets, train_predictions, average='micro')

       
       # Print batch-level metrics
        print("---",f"Epoch {epoch+1}/{num_epochs} - Batch {batch_idx+1}/{len(train_loader)} - "
              f"Batch Loss: {batch_loss:.4f} - Batch Acc: {batch_correct / batch_total:.4f} - "
              f"Batch Hamming Loss: {train_hamming_loss_batch:.4f} - "
              f"Batch F1: {train_f1_batch:.4f} - Batch Precision: {train_precision_batch:.4f} - Batch Recall: {train_recall_batch:.4f}")

       
        # Accumulate batch metrics
        train_hamming_loss_cum += train_hamming_loss_batch
        train_f1_cum += train_f1_batch
        train_precision_cum += train_precision_batch
        train_recall_cum += train_recall_batch

    # After processing all batches in the epoch
    train_loss /= train_total
    train_acc = train_correct / train_total

    # Calculate average metrics for the entire epoch
    train_hamming_loss = train_hamming_loss_cum / len(train_loader)
    train_f1 = train_f1_cum / len(train_loader)
    train_precision = train_precision_cum / len(train_loader)
    train_recall = train_recall_cum / len(train_loader)


    
    model.eval()
    val_loss = 0.0
    val_correct = 0.0
    val_total = 0.0
    val_total_acc = 0.0
    val_predictions = []
    val_targets = []

    # Initialize cumulative metrics for validation
    val_hamming_loss_cum = 0.0
    val_f1_cum = 0.0
    val_precision_cum = 0.0
    val_recall_cum = 0.0

    with torch.no_grad():
      for images, labels in val_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        predicted = (outputs > 0.5).float()
        loss = criterion(outputs, labels)

        # Convert predicted and labels tensors to NumPy arrays
        predicted_numpy = predicted.cpu().numpy()
        labels_numpy = labels.cpu().numpy()



        val_loss += loss.item() * images.size(0)
        val_correct += accuracy_score_cal(labels_numpy, predicted_numpy)
        val_total +=labels.size(0)
        val_total_acc += 64*9

        val_predictions.extend(predicted_numpy.tolist())
        val_targets.extend(labels_numpy.tolist())

        # Calculate validation metrics
        val_loss /= val_total
        val_acc = val_correct / val_total_acc
        val_hamming_loss = hamming_loss(val_targets, val_predictions)
        val_f1 = f1_score(val_targets, val_predictions, average='micro')
        val_precision = precision_score(val_targets, val_predictions, average='micro')     
        val_recall = recall_score(val_targets, val_predictions, average='micro')
      
        # Calculate elapsed time for this epoch
        time_elapsed = time.time() - start_time

        # Print the validation metrics for the epoch
        print(f"Epoch {epoch+1}/{num_epochs} - "
                f"Train Loss: {train_loss:.4f} - Train Acc: {train_acc:.4f} - "
                f"Train Hamming Loss: {train_hamming_loss:.4f} - "
                f"Train F1: {train_f1:.4f} - Train Precision: {train_precision:.4f} - Train Recall: {train_recall:.4f} - "
                f"Val Loss: {val_loss:.4f} - Val Acc: {val_acc:.4f} - "
                f"Val Hamming Loss: {val_hamming_loss:.4f} - "
                f"Val F1: {val_f1:.4f} - Val Precision: {val_precision:.4f} - Val Recall: {val_recall:.4f} - "
                f"Time elapsed: {time_elapsed:.0f}s")

        # Append metrics to history dictionary
        history['epoch'].append(epoch + 1)
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['train_hamming_loss'].append(train_hamming_loss)
        history['train_f1'].append(train_f1)
        history['train_precision'].append(train_precision)
        history['train_recall'].append(train_recall)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['val_hamming_loss'].append(val_hamming_loss)
        history['val_f1'].append(val_f1)
        history['val_precision'].append(val_precision)
        history['val_recall'].append(val_recall)

        if val_acc > best_val_acc:
          best_val_acc = val_acc
          torch.save(model.state_dict(), "best_model_vgg.pth")

    


# Final print
time_elapsed = time.time() - since
print(f"Training complete in {time_elapsed:.0f}s")
print(f"Best validation accuracy: {best_val_acc:.4f}")

# Save history dictionary to a CSV file
df = pd.DataFrame(history)
df.to_csv(csv_file, index=False)

print(history)

# Plotting
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(history['epoch'], history['train_loss'], label='Train Loss')
plt.plot(history['epoch'], history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training & Validation Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history['epoch'], history['train_acc'], label='Train Accuracy')
plt.plot(history['epoch'], history['val_acc'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training & Validation Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

--- Epoch 1/50 - Batch 1/3 - Batch Loss: 1.0188 - Batch Acc: 5.2500 - Batch Hamming Loss: 0.4167 - Batch F1: 0.1176 - Batch Precision: 0.0769 - Batch Recall: 0.2500
--- Epoch 1/50 - Batch 2/3 - Batch Loss: 0.8748 - Batch Acc: 5.6406 - Batch Hamming Loss: 0.3950 - Batch F1: 0.1366 - Batch Precision: 0.0902 - Batch Recall: 0.2812
--- Epoch 1/50 - Batch 3/3 - Batch Loss: 0.7822 - Batch Acc: 5.6552 - Batch Hamming Loss: 0.3907 - Batch F1: 0.1321 - Batch Precision: 0.0877 - Batch Recall: 0.2675
Epoch 1/50 - Train Loss: 0.0833 - Train Acc: 0.4983 - Train Hamming Loss: 0.4008 - Train F1: 0.1288 - Train Precision: 0.0849 - Train Recall: 0.2663 - Val Loss: 0.9916 - Val Acc: 0.3715 - Val Hamming Loss: 0.4056 - Val F1: 0.0988 - Val Precision: 0.0656 - Val Recall: 0.2000 - Time elapsed: 12s
--- Epoch 2/50 - Batch 1/3 - Batch Loss: 0.8790 - Batch Acc: 5.5938 - Batch Hamming Loss: 0.3785 - Batch F1: 0.1550 - Batch Precision: 0.1031 - Batch Recall: 0.3125
--- Epoch 2/50 - Batch 2/3 - Batch Loss: 0.91

KeyboardInterrupt: 

Run


In [None]:

csv_file = 'training_history_vgg.csv'
print('start runnig ...')



start runnig ...
