In [1]:
import os
import random
from platform import architecture

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from torchvision import transforms, models
from torchvision.transforms.functional import to_pil_image
from sklearn.model_selection import KFold
from collections import defaultdict
from optuna import trial

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset, ConcatDataset
import optuna
import wandb
# Project utilities
import utils
from train import train_model_with_hyperparams

VGG19 = 'VGG19'
ALEXNET = 'AlexNet'

# Set seed
SEED = 42
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)

np.random.seed(SEED)
random.seed(SEED)
# torch.backends.cudnn.deterministic = True
# torch.use_deterministic_algorithms = True

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Check if you're working locally or not
if not (os.path.exists(utils.CSV_PATH) and os.path.exists(utils.OPTIMIZED_DIR)):
    print(f"[!] You are NOT on the project's directory [!]\n"
          f"Please run the following command (in either CMD or anaconda prompt): \n"
          f"jupyter notebook --notebook-dir PROJECT_DIR\n"
          r"Where PROJECT_DIR is the project's directory in your computer e.g: C:\Users\amitr5\PycharmProjects\deep_van_gogh")

### Loading our data
We will load the optimized datasets from our custom dataset object


In [3]:
class NumPyDataset(Dataset):
    def __init__(self, file_path):
        data = np.load(file_path)
        self.images = data["images"]
        self.labels = data["labels"]

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        x = torch.tensor(self.images[idx], dtype=torch.float32)
        y = torch.tensor(self.labels[idx], dtype=torch.long)
        return x, y

dataset = NumPyDataset(os.path.join(utils.OPTIMIZED_DIR, 'dataset.npz'))

You can find the optimized dataset files <a href="https://drive.google.com/drive/folders/1TBlNcRsRHJ7_rxh_h7_yn_-Ak66Uj_mp?usp=sharing">HERE</a><br/>
Loading the train and test datasets:

In [4]:
classes = pd.read_csv(utils.CSV_PATH)
train_indices, val_indices = train_test_split(classes[classes['subset'] == 'train'].index.tolist(), test_size=0.2, random_state=SEED)
train_dataset = Subset(dataset, train_indices)
val_dataset = Subset(dataset, val_indices)
test_dataset = Subset(dataset, classes[classes['subset'] == 'test'].index.tolist())

### Data Augmentation

In [38]:
# torch.manual_seed(SEED)
# n_times = 25
# dropout_transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     *([transforms.RandomErasing(p=0.5, scale=(0.01, 0.01), ratio=(1, 1))]*n_times),
#     transforms.Grayscale(num_output_channels=3),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229,0.224, 0.225])
# ])
# dropout_dataset = Subset(preprocessing.ImageFolderForBinaryClassification(root=ROOT, transform=dropout_transform, target='is_van_gogh'), train_indices)
# augmented_train_dataset = ConcatDataset([dropout_dataset, train_dataset])
# augmented_train_dataset = train_dataset
# train_loader = DataLoader(augmented_train_dataset, batch_size=128, shuffle=True, num_workers=4, pin_memory=True, prefetch_factor=8)
# val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False, num_workers=4)

# Fine tuning VGG19

In [13]:
class FinedTunedModel(nn.Module):
    def __init__(self, base_model, architecture:str):
        super(FinedTunedModel, self).__init__()
        self._architecture = architecture  # Save the base model architecture
        base_children_list = list(base_model.children())
        self.features_extractor = nn.Sequential(*base_children_list[:-1]).to(device)
        for param in self.features_extractor.parameters():
            param.requires_grad = False

        # Modify the classifier to fit to our problem (2 classes)
        self.classifier = nn.Sequential(*base_children_list[-1])
        self.classifier[-1] = nn.Linear(4096, 2).to(device)  # Replaces the final layer of the base model's classifier with a new fully connected layer

    def forward(self, x):
        base_model_output = self.features_extractor(x)
        return self.classifier(torch.flatten(base_model_output, start_dim=1))
    @property
    def architecture(self):
        return self._architecture



In [14]:
vgg19 = models.vgg19(weights=models.VGG19_Weights.DEFAULT).to(device) # Load pre-trained VGG19 model
vgg_model = FinedTunedModel(vgg19, VGG19).to(device)
vgg_model

FinedTunedModel(
  (features_extractor): Sequential(
    (0): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU(inplace=True)
      (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (3): ReLU(inplace=True)
      (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (6): ReLU(inplace=True)
      (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (8): ReLU(inplace=True)
      (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (11): ReLU(inplace=True)
      (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (13): ReLU(inplace=True)
      (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (15): ReLU(inpla

In [15]:
# Optuna for our vgg model with the default config
study = optuna.create_study(study_name=VGG19, direction='minimize')
study.optimize(lambda trial: objective(trial, vgg_model, config={}), n_trials=3)

[I 2025-01-22 16:42:45,007] A new study created in memory with name: VGG19
wandb: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
wandb: Currently logged in as: amitr5 (amitr5-tel-aviv-university). Use `wandb login --relogin` to force relogin


0,1
Epoch,▁▃▅▆█
Train Accuracy,▁▅▆▇█
Train Loss,█▄▃▂▁
Validation Accuracy,▁▃▄█▇
Validation Loss,█▄▃▂▁

0,1
Epoch,5.0
Train Accuracy,0.96272
Train Loss,0.0829
Validation Accuracy,0.95349
Validation Loss,0.14031


[I 2025-01-22 16:45:28,355] Trial 0 finished with value: 0.14031435123220515 and parameters: {'learning_rate': 1.758543662708524e-05, 'weight_decay': 2.997062748911493e-05, 'batch_size': 128}. Best is trial 0 with value: 0.14031435123220515.


0,1
Epoch,▁▃▅▆█
Train Accuracy,▁▆███
Train Loss,█▃▁▁▁
Validation Accuracy,▅▁█▁▄
Validation Loss,▁▄▅▆█

0,1
Epoch,5.0
Train Accuracy,0.99829
Train Loss,0.00642
Validation Accuracy,0.96033
Validation Loss,0.40956


[I 2025-01-22 16:48:04,051] Trial 1 finished with value: 0.14069072449663922 and parameters: {'learning_rate': 0.00021040501369206028, 'weight_decay': 8.022270892118722e-05, 'batch_size': 64}. Best is trial 0 with value: 0.14031435123220515.


0,1
Epoch,▁▃▅▆█
Train Accuracy,▁▇██▇
Train Loss,█▁▁▁▁
Validation Accuracy,▁▁▇▇█
Validation Loss,▁▁▃▅█

0,1
Epoch,5.0
Train Accuracy,0.98871
Train Loss,0.05793
Validation Accuracy,0.96443
Validation Loss,0.54825


[I 2025-01-22 16:51:23,773] Trial 2 finished with value: 0.2567416431067312 and parameters: {'learning_rate': 0.0007730411632807764, 'weight_decay': 1.4028971950181714e-06, 'batch_size': 80}. Best is trial 0 with value: 0.14031435123220515.


In [14]:
# Optuna objective function
def objective(trial, model, config: dict) -> float:
    """
    Generic Optuna objective function.
    :param trial: Optuna trial object.
    :param model: The neural network model to train
    :param config: A dictionary with configurable values such as learning rate ranges, batch size ranges, etc.
    :return:  best_val_loss: The best validation loss achieved during training.
    """
    # Hyperparameter suggestions based on config
    learning_rate = trial.suggest_float("learning_rate",
                                        config.get("lr_min", 1e-5),
                                        config.get("lr_max", 1e-3),
                                        log=True)
    weight_decay = trial.suggest_float("weight_decay",
                                       config.get("wd_min", 1e-6),
                                       config.get("wd_max", 1e-4),
                                       log=True)
    batch_size = trial.suggest_int("batch_size",
                                   config.get("batch_size_min", 32),
                                   config.get("batch_size_max", 128),
                                   step=config.get("batch_size_step", 16))
    patience = config.get("patience", 10)
    epochs = config.get("epochs", 5)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) # Load the train DataLoader with the chosen batch_size
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False) # Load the val DataLoader with the chosen batch_size

    # Define optimizer and loss function
    criterion = config.get("criterion", nn.CrossEntropyLoss()) # Classification.
    optimizer_class = config.get("optimizer_class", optim.Adam)
    optimizer = optimizer_class(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    # Initialize Weights & Biases - the values in the config are the properties of each trial.
    architecture = getattr(model, "architecture", model.__class__.__name__)
    wandb.init(project="deep_van_gogh",
               config={
        "learning_rate": learning_rate,
        "weight_decay": weight_decay,
        "patience": patience,
        "batch_size": batch_size,
        "epochs": epochs,
        "architecture": architecture,
        "dataset": config.get("dataset_name", "Post_Impressionism")
    },
    name=f"{architecture}_trial_{trial.number}") # The name that will be saved in the W&B platform

    # Train the model and get the best validation loss
    best_val_loss = train_model_with_hyperparams(model, train_loader, val_loader, optimizer, criterion,
                                                 epochs=epochs, patience=patience, device=device, trial=trial,
                                                 architecture=architecture)

    # Finish the Weights & Biases run
    wandb.finish()

    # Return best validation loss as the objective to minimize
    return best_val_loss

# TODO:
# make an objective with best accuracy


## Cross-Validation

In [47]:
def cross_validation(dataset:Dataset, **models_dict):
    # Initialize KFold
    k_folds = 5
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)

    # Track performance for each model
    results = defaultdict(list)

    for fold, (train_ids, val_ids) in enumerate(kfold.split(dataset)):
            print(f"\tFold {fold + 1}")
            # Subset the dataset for this fold
            train_subset = Subset(dataset, train_ids)
            val_subset = Subset(dataset, val_ids)

            # Create data loaders
            train_loader = DataLoader(train_subset, batch_size=64, shuffle=True)
            val_loader = DataLoader(val_subset, batch_size=64, shuffle=False)

            for model_name, model_dict in models_dict.items():
                print(f"Training :{model_name}")
                # Load the entire model
                base_model = vgg19 if model_dict["architecture"] == VGG19 else alexnet
                model = FinedTunedModel(base_model.to(device), model_dict["architecture"]).to(device)

                # Define loss function and optimizer
                criterion = nn.CrossEntropyLoss()
                optimizer = optim.Adam(model.parameters(), **model_dict['param_groups'])



                # Train the model (implement your training loop here)
                best_val_loss = train_model_with_hyperparams(model, train_loader, val_loader, optimizer, criterion,
                                                             epochs=3, patience=3, device=device, trial=None,
                                                             architecture=None)
                 # Append the results for this fold
                results[model_name].append(best_val_loss)


    # Print final results
    for model_name, model_results in results.items():
         # After all folds, calculate the average fold performance
        mean_perf = sum(results[model_name]) / len(results[model_name])
        print(f"Average Performance for {model_name}: {mean_perf}")

        print(f"{model_name} - Cross-Validation Results: {model_results}")
        print(f"{model_name} - Mean Performance: {sum(model_results) / len(model_results)}")

In [48]:
vgg_path = os.path.join(utils.MODELS_DIR, VGG19)

def get_hyperparameters(path):
    param_groups = torch.load(path, weights_only=True)['optimizer_state_dict']['param_groups'][0]
    return {'lr':param_groups['lr'], 'weight_decay':param_groups['weight_decay']}


model1_param_groups = get_hyperparameters(f"{vgg_path}/best_model_trial_0.pt") # Load hyperparameters
model1_dict = {
    'architecture': VGG19,
    'param_groups': model1_param_groups
}


model2_param_groups =  get_hyperparameters(f"{vgg_path}/best_model_trial_1.pt") # Load hyperparameters
model2_dict = {
    'architecture': VGG19,
    'param_groups': model2_param_groups
}


cross_validation(train_dataset, vgg_model1=model1_dict, vgg_model2=model2_dict)


	Fold 1
Training :vgg_model1



KeyboardInterrupt



# Fine tuning AlexNet

In [7]:
# Load the AlexNet model 
alexnet = models.alexnet(weights=models.AlexNet_Weights.DEFAULT).to(device)
alexnet_model = FinedTunedModel(alexnet, ALEXNET).to(device)
alexnet_model

FinedTunedModel(
  (features_extractor): Sequential(
    (0): Sequential(
      (0): Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))
      (1): ReLU(inplace=True)
      (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
      (3): Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
      (4): ReLU(inplace=True)
      (5): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
      (6): Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (7): ReLU(inplace=True)
      (8): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (9): ReLU(inplace=True)
      (10): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (11): ReLU(inplace=True)
      (12): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (1): AdaptiveAvgPool2d(output_size=(6, 6))
  )
  (classifier): Sequential(
    (0): Dropout(p=0.5, inplace=False

In [8]:
study = optuna.create_study(study_name=f'{ALEXNET}', direction='minimize')
study.optimize(lambda trial: objective(trial, alexnet_model, config={}), n_trials=3)

[I 2025-01-22 16:01:36,164] A new study created in memory with name: AlexNet
wandb: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
wandb: Currently logged in as: amitr5 (amitr5-tel-aviv-university). Use `wandb login --relogin` to force relogin


0,1
Epoch,▁▃▅▆█
Train Accuracy,▁▅▆▇█
Train Loss,█▃▂▁▁
Validation Accuracy,▁█▆▇▆
Validation Loss,█▁▅▂▅

0,1
Epoch,5.0
Train Accuracy,0.99795
Train Loss,0.01104
Validation Accuracy,0.96854
Validation Loss,0.11455


[I 2025-01-22 16:01:57,725] Trial 0 finished with value: 0.10576229227551358 and parameters: {'learning_rate': 8.54475681264745e-05, 'weight_decay': 1.3246324576641997e-06, 'batch_size': 96}. Best is trial 0 with value: 0.10576229227551358.


0,1
Epoch,▁▃▅▆█
Train Accuracy,▁▅▆▇█
Train Loss,█▃▂▂▁
Validation Accuracy,▅█▁▄▃
Validation Loss,▂▂▂▁█

0,1
Epoch,5.0
Train Accuracy,0.98598
Train Loss,0.04482
Validation Accuracy,0.95896
Validation Loss,0.3491


[I 2025-01-22 16:02:24,856] Trial 1 finished with value: 0.12651337678614297 and parameters: {'learning_rate': 0.00035744717416844876, 'weight_decay': 3.63939834492625e-06, 'batch_size': 32}. Best is trial 0 with value: 0.10576229227551358.


0,1
Epoch,▁▃▅▆█
Train Accuracy,▁▅▇██
Train Loss,█▃▁▁▁
Validation Accuracy,▁▇▄█▅
Validation Loss,▅▁█▁▃

0,1
Epoch,5.0
Train Accuracy,0.98324
Train Loss,0.05421
Validation Accuracy,0.95896
Validation Loss,0.1584


[I 2025-01-22 16:02:50,670] Trial 2 finished with value: 0.131821774687776 and parameters: {'learning_rate': 0.0008391974354331484, 'weight_decay': 9.707033483493868e-05, 'batch_size': 32}. Best is trial 0 with value: 0.10576229227551358.


analysing results

# Style transfer function

In [4]:
from PIL.Image import Image


#define a function to load an image and pre-process it
def load_image(img_path, shape=(224, 224)):
    image = Image.open(img_path).convert('RGB')
    # Define transformation to resize, normalize, and convert to tensor
    in_transform = transforms.Compose([
        transforms.Resize(shape),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
    ])
    # Apply transformations, remove alpha channel, and add batch dimension
    image = in_transform(image)[:3, :, :].unsqueeze(0)
    return image.to(device)
#define a function to extract features from the network
def get_features(image, model, layers):
    features = {}
    x = image
    for name, layer in model._modules.items():
        x = layer(x)
        if name in layers:
            features[layers[name]] = x
    return features

In [None]:
def style_transfer(model, style_img_path, content_img_path, content_weight=1, style_weight=1e3, num_steps=5001, model_name='vgg19_pretrained'):
    model = model.features #Gives us access to the layers of features

    layers = {
         '0': 'conv1_1', '5': 'conv2_1', '10': 'conv3_1', '19': 'conv4_1',
         '21': 'conv4_2'
    }

    style_weights = {
        'conv1_1': 0.5, 'conv2_1': 0.5, 'conv3_1': 0.5, 'conv4_1': 0.3
    }

    content_layer = 'conv4_2'
    # Prepare model for evaluation, disabling gradient computation
    model.to(device).eval()
    for param in model.parameters():
         param.requires_grad_(False)
        # Load and preprocess the content and style images
    content = load_image(content_img_path).to(device)
    style = load_image(style_img_path).to(device)
         # Extract features from content and style images
    content_features = get_features(content, model, layers)
    style_features = get_features(style, model, layers)
    target = content.clone().requires_grad_(True).to(device)