In [None]:
import os, sys
from tqdm import tqdm
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import zipfile
import time
import shutil
import collections
from pathlib import Path

In [None]:
import warnings
# Suppress every warning (all categories, all modules) for the rest of the kernel session.
# NOTE(review): this also hides deprecation and numerical warnings from the libraries used later on.
warnings.filterwarnings('ignore')

### Loading packages

In [None]:
import sys
from pathlib import Path

# Absolute path of the current working directory (where the kernel was started).
here_path = Path().resolve()
# Two directory levels above the working directory; assumed to be the repository
# root that contains the `py` package imported by later cells.
repo_path = here_path.parents[1]
# Only register the path once: the cell may be re-run many times in one kernel
# session and an unconditional append would keep growing sys.path with duplicates.
if str(repo_path) not in sys.path:
    sys.path.append(str(repo_path))

In [None]:
from py.utils import verifyDir, verifyFile, verifyType

In [None]:
from py.config import Config

# Project settings object. Later cells read paths, the task name, the perception
# metric, the model name, the batch size and the random seed from its attributes.
cfg = Config()

# Seed NumPy's global random generator from the configured value.
np.random.seed(cfg.RANDOM_STATE)
# Echo the two base paths as the cell output so the reader can check them.
cfg.DATA_PATH, cfg.MODEL_PATH

In [None]:
# Directory layout used by this notebook. The pieces are concatenated directly,
# so cfg.DATA_PATH and cfg.MODEL_PATH are assumed to end with a path separator.

# Score table for the configured scoring method and place level.
QSCORE_PATH = f"{cfg.DATA_PATH}pp2/{cfg.SCORING_METHOD}/{cfg.PLACE_LEVEL}/"

# Folder that the relative image paths of the score table are resolved against.
IMAGES_PATH = f"{cfg.DATA_PATH}pp2/images/"

# Folder holding the trained CNN checkpoint.
MODEL_PATH = f"{cfg.MODEL_PATH}pp2/cnn/{cfg.SCORING_METHOD}/{cfg.PLACE_LEVEL}/"

# Folder the extracted features are written to.
FEATURES_PATH = f"{cfg.MODEL_PATH}pp2/features/{cfg.SCORING_METHOD}/{cfg.PLACE_LEVEL}/"

In [None]:
# presumably makes sure the features output directory exists before anything is
# written to it — verifyDir comes from py.utils; confirm its behaviour there.
verifyDir(FEATURES_PATH)

### Verify GPU

In [None]:
import torch

# Use the GPU when a CUDA runtime is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Half precision is only worthwhile on CUDA; many CPU kernels are slow or missing
# for float16, so the CPU fallback uses full float32 precision.
torch_type = torch.float16 if device.type == "cuda" else torch.float32
# Echo the selection as the cell output.
device, torch_type

### Loading data

In [None]:
# One output when the configured task name contains "reg", otherwise two.
# NOTE(review): no cell visible in this notebook reads NUM_CLASSES; the model
# cell passes the literal num_classes=2 — confirm which one is intended.
NUM_CLASSES = 1 if "reg" in cfg.ML_TASK else 2

In [None]:
%%time
# Read the semicolon-separated score table and normalise two columns:
#   image_path -> prefixed with IMAGES_PATH
#   image_id   -> converted to str
# then order the rows by the configured perception metric, highest first.
data_df = (
    pd.read_csv(f"{QSCORE_PATH}scores.csv", sep=";", low_memory=False)
    .assign(
        image_path=lambda frame: f"{IMAGES_PATH}" + frame["image_path"],
        image_id=lambda frame: frame["image_id"].apply(str),
    )
    .sort_values(by=[cfg.PERCEPTION_METRIC], ascending=False)
)
data_df

In [None]:
from py.models.datasets.transformations import ImageTransforms

# Look up the image transforms registered for the configured model name.
# NOTE(review): the returned object is later handed to pp.DataFormat as
# `transforms_list`; its exact structure is defined in ImageTransforms — confirm there.
transforms_list = ImageTransforms().get(model_name=cfg.MODEL_FEATURE_NAME)
# Echo the transforms as the cell output.
transforms_list

In [None]:
from torch.utils.data import Dataset
from PIL import Image

class ImagesLabels(Dataset):
    """Dataset that yields one image together with its id, path, target and label.

    Args:
        dataset: Table-like object indexable by column name whose columns expose
            ``.tolist()`` (e.g. a pandas DataFrame). It must provide the columns
            ``image_id``, ``image_path``, ``target`` and ``label``.
        transform: Optional callable applied to the loaded RGB image.
    """

    def __init__(self, dataset, transform=None):
        # Copy the needed columns into plain lists so that __getitem__ only
        # does positional list lookups.
        self.image_ids = dataset["image_id"].tolist()
        self.image_paths = dataset["image_path"].tolist()
        self.targets = dataset["target"].tolist()
        self.labels = dataset["label"].tolist()
        self.transform = transform

    def __len__(self):
        """Returns the total number of samples in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load and return the sample at position ``idx``.

        Args:
            idx (int): Index of the sample to retrieve.

        Returns:
            dict: A mapping with the keys ``images_id``, ``images`` (the RGB
            image, transformed when a transform was given), ``images_path``,
            ``targets`` and ``labels``.
        """
        image_path = self.image_paths[idx]

        # Open inside a context manager so the file handle is released as soon
        # as the pixels are decoded; convert() returns an independent image.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        # Apply the transform if one was specified.
        if self.transform is not None:
            image = self.transform(image)

        return {
            "images_id": self.image_ids[idx],
            "images": image,
            "images_path": image_path,
            "targets": self.targets[idx],
            "labels": self.labels[idx],
        }

In [None]:
%%time
from py.models.datasets import PlacePulse

# Build the data pipeline from the score table, step by step.
# NOTE(review): PlacePulse is defined in py.models.datasets and is not visible
# here; the step descriptions below are inferred from method and argument names
# only — confirm against its implementation.
pp = PlacePulse(data_df)
# presumably selects/labels rows for the configured perception metric and city — TODO confirm
pp.DataPreparation(delta=cfg.DELTA, emotion=cfg.PERCEPTION_METRIC, city=cfg.CITY_STUDIED)
# presumably derives the target/label columns for the configured task — TODO confirm
pp.TaskPreparation(task_type=cfg.ML_TASK)
# presumably creates the train/test partition exposed as pp.train_df / pp.test_df — TODO confirm
pp.DataSplit()
# Wrap the splits with the ImagesLabels dataset class and the transforms chosen above.
pp.DataFormat(data_formater=ImagesLabels, transforms_list=transforms_list)
# shuffle_train=False: presumably keeps the training loader in a fixed order — TODO confirm
pp.DataLoader(batch_size=cfg.BATCH_SIZE, shuffle_train=False)
pp.plot()

# Report the size of each split.
print(f"Train samples: {len(pp.train_df)}")
print(f"Test samples: {len(pp.test_df)}")

### Loading Model

In [None]:
from py.models.classification.cnn.vgg import VGG16

# Rebuild the network with the same constructor arguments the checkpoint is
# expected to match.
# NOTE(review): num_classes is the literal 2 rather than NUM_CLASSES — confirm
# this is intended when cfg.ML_TASK is a regression task.
model = VGG16(num_classes=2, use_gap=True)

# map_location remaps tensors that were saved from a CUDA device onto the device
# selected earlier, so the checkpoint also loads on a CPU-only machine.
state_dict = torch.load(
    f"{MODEL_PATH}{cfg.MODEL_FEATURE_NAME}_best_model.pth",
    map_location=device,
)
model.load_state_dict(state_dict)
model.to(device)
# Inference mode for layers such as dropout / batch-norm; the returned module
# is echoed as the cell output.
model.eval()

### Feature Extraction

In [None]:
# Pick the two loaders out of the PlacePulse pipeline object.
# The "val" loader is the one whose features are stored under the "test" key
# when the features are saved later in this notebook.
train_loader = pp.dataloaders["train"]
val_loader = pp.dataloaders["val"]

In [None]:
def get_features(data_loader):
    """Run every batch of ``data_loader`` through the model and collect features.

    The features are the output of the model's classifier with its last layer
    removed (``model.classifier[:-1]``), applied to the flattened, pooled
    feature maps. Uses the notebook-level ``model`` and ``device``.

    Args:
        data_loader: Iterable of batches shaped like the dict returned by
            ``ImagesLabels.__getitem__`` after collation, i.e. with the keys
            ``images``, ``images_path``, ``images_id``, ``targets`` and ``labels``.

    Returns:
        tuple: ``(images_id, images_path, images_features, targets, labels)``,
        five lists with one entry per sample, in loader order.
        ``images_features`` is a list of lists of Python floats.
    """
    images_features = []
    images_path = []
    images_id = []
    targets = []
    labels = []

    # No gradients are needed for feature extraction; one context for the whole pass.
    with torch.no_grad():
        for batch in tqdm(data_loader):
            batch_images = batch['images'].to(device)

            # Same forward path as the model, stopping before the final classifier layer.
            x = model.feature_maps(batch_images)
            x = model.avgpool(x)
            x = torch.flatten(x, 1)
            features = model.classifier[:-1](x)

            images_features.extend(features.cpu().numpy().tolist())
            images_path.extend(batch['images_path'])
            images_id.extend(batch['images_id'])
            targets.extend(batch['targets'].cpu().numpy().tolist())
            # NOTE(review): labels are appended as collated; if the loader yields
            # them as a tensor the list will hold 0-d tensors — confirm the label type.
            labels.extend(batch['labels'])

    # images_features is already a nested list of floats; no array round-trip needed.
    return images_id, images_path, images_features, targets, labels

In [None]:
%%time
# Extract features for the training split (one full pass over train_loader).
train_images_id, train_images_path, train_images_features, train_targets, train_labels  = get_features(train_loader)

In [None]:
%%time
# Extract features for the held-out split; the "val" loader is stored under the test_* names.
test_images_id, test_images_path, test_images_features, test_targets, test_labels  = get_features(val_loader)

### Saving features

In [None]:
# Per-split payloads: ids, paths, feature vectors, targets and labels, index-aligned.
train_split = {
    "image_id": train_images_id,
    "image_path": train_images_path,
    "features": train_images_features,
    "target": train_targets,
    "label": train_labels,
}
test_split = {
    "image_id": test_images_id,
    "image_path": test_images_path,
    "features": test_images_features,
    "target": test_targets,
    "label": test_labels,
}

# Single object that gets serialised to disk.
features_dict = {"train": train_split, "test": test_split}

In [None]:
%%time
import pickle

# Serialise both splits into one pickle file named after the feature model.
features_file = f"{FEATURES_PATH}{cfg.MODEL_FEATURE_NAME}_features.pkl"
with open(features_file, "wb") as handle:
    pickle.dump(features_dict, handle)