In [None]:
import os, sys
from tqdm import tqdm
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import zipfile
import time
import shutil
import collections
from pathlib import Path

In [None]:
import warnings
warnings.filterwarnings('ignore')

### Loading packages

In [None]:
import sys
from pathlib import Path

# Make the project's `py.*` modules importable. The directory added to the
# import path is two levels above the current working directory
# (`parents[1]`), so this cell depends on where the kernel was started.
here_path = Path().resolve()
repo_path = here_path.parents[1]

# Guard the insertion so that re-running the cell does not stack duplicate
# entries on sys.path.
if str(repo_path) not in sys.path:
    sys.path.append(str(repo_path))

In [None]:
from py.utils import verifyDir, verifyFile, verifyType

In [None]:
import torch

from py.config import Config

cfg = Config()

# Seed every random number generator the notebook relies on. NumPy alone is
# not enough: weight initialisation and any loader shuffling draw from
# PyTorch's own generator, so runs would otherwise not be repeatable.
np.random.seed(cfg.RANDOM_STATE)
torch.manual_seed(cfg.RANDOM_STATE)

# Display the configured data and model roots as the cell output.
cfg.DATA_PATH, cfg.MODEL_PATH

In [None]:
# Inputs live under the configured data root: the scores table and the image
# folders. Best-model checkpoints are written under the configured model root,
# in a sub-folder for the studied year.
QSCORE_PATH = f"{cfg.DATA_PATH}pp1/Qscores/"
IMAGES_PATH = f"{cfg.DATA_PATH}pp1/images/"
MODEL_PATH = f"{cfg.MODEL_PATH}pp1/{cfg.YEAR_STUDIED}/cnn/"

In [None]:
# Check the checkpoint output directory before any training starts.
# NOTE(review): presumably verifyDir creates the directory when it is missing —
# confirm against py.utils.
verifyDir(MODEL_PATH)

### Verify GPU

In [None]:
import torch

# Prefer the GPU when one is visible to PyTorch; otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Work in full precision on every device. The previous selection chose
# float16 only on the CPU, which is the backend with the weakest
# half-precision operator support, and the training loop in this notebook
# uses neither autocast nor a GradScaler, so float32 is the safe choice for
# both devices. The CUDA path is unchanged by this fix.
torch_type = torch.float32

# Display the selection as the cell output.
device, torch_type

### Loading data

In [None]:
# A regression task needs a single output unit; every other task is treated
# as a two-class problem.
if "reg" in cfg.ML_TASK:
    NUM_CLASSES = 1
else:
    NUM_CLASSES = 2

In [None]:
%%time
# Read the semicolon-separated scores table.
data_df = pd.read_csv(f"{QSCORE_PATH}scores.csv", sep=";", low_memory=False)

# Turn the stored relative image paths into full paths for the studied year.
images_prefix = f"{IMAGES_PATH}{cfg.YEAR_STUDIED}/"
data_df["image_path"] = images_prefix + data_df["image_path"]

# Image ids are handled as strings from here on.
data_df["image_id"] = data_df["image_id"].apply(str)

# Order rows from the highest to the lowest value of the studied metric.
data_df = data_df.sort_values(by=[cfg.PERCEPTION_METRIC], ascending=False)
data_df

In [None]:
from py.models.datasets.transformations import ImageTransforms

# Fetch the image preprocessing registered for the configured model name.
# The result is passed to the dataset wrapper when the PlacePulse pipeline
# formats its splits.
# NOTE(review): presumably a torchvision-style callable — confirm in
# ImageTransforms.get.
transforms_list = ImageTransforms().get(model_name=cfg.MODEL_FEATURE_NAME)
transforms_list

In [None]:
from torch.utils.data import Dataset
from PIL import Image

class ImagesLabels(Dataset):
    """Map-style dataset pairing image files on disk with their targets.

    Args:
        dataset: Table-like object (for example a pandas DataFrame) exposing
            an ``"image_path"`` and a ``"target"`` column, each supporting
            ``.tolist()``.
        transform: Optional callable applied to the decoded RGB image before
            it is returned.
    """

    def __init__(self, dataset, transform=None):
        # Keep plain Python lists so item access does not go through the
        # source table again.
        self.image_paths = dataset["image_path"].tolist()
        self.targets = dataset["target"].tolist()
        self.transform = transform

    def __len__(self):
        """Returns the total number of samples in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load one sample.

        Args:
            idx (int): Index of the sample to retrieve.

        Returns:
            dict: ``{"images": image, "targets": label}`` where ``image`` is
            the RGB image (after ``transform`` when one was given) and
            ``label`` is the entry of the ``"target"`` column at ``idx``.
        """
        # Image.open is lazy and keeps the underlying file open. convert()
        # loads the pixels and returns a new image, so the source file can be
        # closed right away instead of leaking one descriptor per sample.
        with Image.open(self.image_paths[idx]) as source_image:
            image = source_image.convert("RGB")

        # Apply any transforms if specified
        if self.transform:
            image = self.transform(image)

        # The label comes from the targets column, not from the file name.
        label = self.targets[idx]

        return {"images": image, "targets": label}

In [None]:
%%time
from py.models.datasets import PlacePulse

# Build the train/validation/test data pipeline from the scores table.
# The steps below are methods of the project's PlacePulse helper and are
# called in this fixed order; their internals are not visible from this
# notebook.
pp = PlacePulse(data_df)
pp.DataPreparation(delta=cfg.DELTA, emotion=cfg.PERCEPTION_METRIC)
pp.TaskPreparation(task_type=cfg.ML_TASK)
pp.DataSplit()
# Each split is wrapped in ImagesLabels with the model-specific transforms.
pp.DataFormat(data_formater=ImagesLabels, transforms_list=transforms_list)
# NOTE(review): shuffle_train=False while data_df was sorted by the metric
# above — confirm that DataSplit shuffles, otherwise training batches are
# ordered by score.
pp.DataLoader(batch_size=cfg.BATCH_SIZE, shuffle_train=False)
pp.plot()

# Split sizes for a quick sanity check (the training cells also read
# pp.dataloaders["train"] and pp.dataloaders["val"]).
print(f"Train samples: {len(pp.train_df)}")
print(f"Test samples: {len(pp.test_df)}")

### Loading model

In [None]:
from py.models.classification.cnn import ConvolutionClassifier

# Wrapper around the CNN selected by the configured model name, sized for
# NUM_CLASSES outputs. The network itself is reached later as `tm.model`.
tm = ConvolutionClassifier(model_name=cfg.MODEL_FEATURE_NAME, num_classes=NUM_CLASSES)

# NOTE(review): the call order below is kept exactly as written; presumably
# to_device records the target device before model_zoo builds the network —
# confirm in ConvolutionClassifier.
tm.to_device(device)
tm.model_zoo()
tm.print_trainable_parameters(log_params=True)
tm.get_model()

### Training

In [None]:
import torch.nn as nn
import torch.optim as optim

# Loaders prepared by the PlacePulse pipeline.
train_loader, val_loader = pp.dataloaders["train"], pp.dataloaders["val"]

# Squared error for regression tasks, cross-entropy for everything else.
if "reg" in cfg.ML_TASK:
    criterion = nn.MSELoss()
else:
    criterion = nn.CrossEntropyLoss()

# Only parameters that still require gradients are handed to the optimizer.
trainable_params = [p for p in tm.model.parameters() if p.requires_grad]
optimizer = optim.AdamW(trainable_params, lr=1e-4, weight_decay=5e-4)

# Cosine learning-rate decay spanning the configured number of epochs.
scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=cfg.NUM_EPOCHS,
    eta_min=1e-6,
)

In [None]:
from py.models.metrics import EvaluationMetrics

# Metric helper configured for the current task. The training code calls
# task_metrics.calculate(real_targets, pred_targets) once per pass and reads
# the "mse" or "accuracy" entry of the result.
task_metrics = EvaluationMetrics(task=cfg.ML_TASK)

In [None]:
def classification_step(data_loader,
                        model,
                        optimizer,
                        criterion,
                        task_metrics,
                        cur_epoch,
                        is_train=True):
    """Run one full pass over ``data_loader`` in training or evaluation mode.

    Reads the notebook-level ``device`` and ``cfg`` globals.

    Args:
        data_loader: Sized iterable of batches; each batch is a mapping with
            an ``"images"`` and a ``"targets"`` tensor.
        model: Network called on the image batch.
        optimizer: Optimizer stepped after every batch when ``is_train`` is
            true; not used otherwise.
        criterion: Loss called as ``criterion(predictions, targets)``.
        task_metrics: Object whose ``calculate(real, predicted)`` result is
            returned as the pass metrics.
        cur_epoch (int): Zero-based epoch index, used only for progress logs.
        is_train (bool): Train (gradients + optimizer steps) when true,
            evaluate without gradients when false.

    Returns:
        tuple: ``(eval_metrics, epoch_loss, real_targets, pred_targets)``.
        ``epoch_loss`` is the mean of the per-batch losses. The two lists
        hold one NumPy entry per sample: raw model outputs for regression
        tasks, arg-max class indices otherwise.
    """
    is_regression = "reg" in cfg.ML_TASK
    running_loss = 0.0

    pred_targets = []
    real_targets = []

    if is_train:
        model.train()
    else:
        model.eval()

    with torch.set_grad_enabled(is_train):
        for i, batch in enumerate(data_loader):
            batch_images = batch['images'].to(device)
            batch_targets = batch['targets'].to(device)

            # Forward
            batch_predictions = model(batch_images)

            # For regression, compute the loss on aligned tensors. A (B, 1)
            # output against (B,) targets would otherwise be broadcast to
            # (B, B) inside the loss and silently give a wrong value, and
            # float targets collated as float64 would not match float32
            # predictions.
            loss_predictions, loss_targets = batch_predictions, batch_targets
            if is_regression:
                if (loss_predictions.dim() == loss_targets.dim() + 1
                        and loss_predictions.size(-1) == 1):
                    loss_predictions = loss_predictions.squeeze(-1)
                loss_targets = loss_targets.to(loss_predictions.dtype)
            loss = criterion(loss_predictions, loss_targets)

            # Backward
            if is_train:
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

            running_loss += loss.item()

            # Store predictions and labels (predictions keep the shape the
            # model produced, so the returned lists are unchanged).
            if is_regression:
                pred_targets.extend(batch_predictions.detach().cpu().numpy())
            else:
                _, predicted = torch.max(batch_predictions, 1)
                pred_targets.extend(predicted.detach().cpu().numpy())

            real_targets.extend(batch_targets.detach().cpu().numpy())

            if i % 100 == 0 and is_train:
                print(f"Epoch [{cur_epoch+1}/{cfg.NUM_EPOCHS}] | Step [{i}/{len(data_loader)}] | Loss: {loss.item():.4f}")

    # Average over the batches of the loader that was actually passed in.
    epoch_loss = running_loss / len(data_loader)
    if is_train:
        print('Train')
    else:
        print('Validation')
    print(f'Loss: {epoch_loss:.4f}')
    eval_metrics = task_metrics.calculate(real_targets, pred_targets)
    return eval_metrics, epoch_loss, real_targets, pred_targets

In [None]:
%%time
# Train for the configured number of epochs and keep the checkpoint with the
# best validation metric: lowest MSE for regression tasks, highest accuracy
# for every other task.
is_regression = "reg" in cfg.ML_TASK
best_val_metric = float("inf") if is_regression else 0.0

for epoch in range(cfg.NUM_EPOCHS):
    print(f"Epoch {epoch+1}/{cfg.NUM_EPOCHS}:")

    # Training Phase
    start = time.time()
    train_metrics, train_loss, train_targets, train_predictions = classification_step(
        data_loader=train_loader,
        model=tm.model,
        optimizer=optimizer,
        criterion=criterion,
        task_metrics=task_metrics,
        cur_epoch=epoch,
        is_train=True,
    )
    end = time.time()
    print(f"Training time elapsed: {(end - start)/60:.4f} minutes\n")

    # Validation Phase
    start = time.time()
    val_metrics, val_loss, val_real_targets, val_predictions = classification_step(
        data_loader=val_loader,
        model=tm.model,
        optimizer=optimizer,
        criterion=criterion,
        task_metrics=task_metrics,
        cur_epoch=epoch,
        is_train=False,
    )
    end = time.time()
    print(f"Validation time elapsed: {(end - start)/60:.4f} minutes\n")

    # Advance the learning-rate schedule once per epoch.
    scheduler.step()

    # Compare against the best value so far. The direction follows the same
    # "reg" test used to pick the metric, so a task name that contains
    # neither "reg" nor "class" is still handled as classification instead
    # of never saving a checkpoint.
    current_metric = val_metrics["mse"] if is_regression else val_metrics["accuracy"]
    if is_regression:
        is_better = current_metric < best_val_metric
    else:
        is_better = current_metric > best_val_metric

    if is_better:
        best_val_metric = current_metric
        torch.save(tm.model.state_dict(), f"{MODEL_PATH}{cfg.MODEL_FEATURE_NAME}_best_model.pth")
        # The check mark was stored as mis-decoded UTF-8 bytes; restored here.
        print(f"✅ Epoch {epoch+1}: New best model saved! Metric: {best_val_metric:.4f}")