### Group Name: Big Bang Theory

Team members:

1. Arman Feili: 2101835
2. Sohrab Seyyedi Parsa: 2101087
3. Milad Torabi: 2103454
4. Sharifeh Alaei: 2050840

In [1]:
from google.colab import drive
# Mount Google Drive into the Colab runtime at /content/gdrive/ so that files
# stored on the drive can be accessed through the local file system.
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [3]:

!pip install timm

Collecting timm
  Downloading timm-1.0.9-py3-none-any.whl.metadata (42 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 42.4/42.4 kB 3.9 MB/s eta 0:00:00
Downloading timm-1.0.9-py3-none-any.whl (2.3 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.3/2.3 MB 53.9 MB/s eta 0:00:00
Installing collected packages: timm
Successfully installed timm-1.0.9


In [4]:
# my_lib/__init__.py
import os
import PIL.Image
import timm
import torch.nn as nn
import torch
import torchvision.transforms as T
import torch
import numpy as np
import random
import json
import pandas as pd
import numpy as np
import argparse

from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score
from torch.utils.data import Dataset
from datetime import datetime
from dataclasses import dataclass
from sklearn.model_selection import KFold
from typing import Tuple

# Folder on the mounted Google Drive that holds the project code.
root_path = '/content/gdrive/MyDrive/Big-Bang-Theory/Code'

# CSV files describing the training and test samples.
csv_train_file = f"{root_path}/../dataset/train.csv"
csv_test_file = f"{root_path}/../dataset/test.csv"

# Folders containing the image files referenced by the CSV files.
root_train_images = f"{root_path}/../dataset/images/train"
root_test_images = f"{root_path}/../dataset/images/test"

# CSV file storing the cross-validation fold assigned to each training row.
csv_split_file: str = f"{root_path}/../dataset/fold.csv"

# Parent folder for the per-run output folders.
outputs = f"{root_path}/../outputs"

@dataclass
class VisualConfig:
    """Hyper-parameters and file locations for one image-classification run.

    Only the *annotated* attributes below are dataclass fields: they are
    accepted by the constructor, shown in ``repr`` and written by
    :meth:`to_json`. The un-annotated path attributes (``root_path``,
    ``csv_train_file``, ``csv_test_file``, ``root_train_images``,
    ``root_test_images``, ``outputs``) are plain class attributes shared by
    all instances; they cannot be set through the constructor and are not
    serialized.
    """

    def to_json(self, path: str):
        """Write the instance fields to ``path`` as indented JSON."""
        with open(path, "w") as fp:
            json.dump(self.__dict__, fp, indent=2)

    @classmethod
    def from_json(cls, path: str):
        """Build a config from a JSON file produced by :meth:`to_json`.

        Instantiates ``cls`` (so subclasses round-trip to their own type) and
        restores tuple-valued fields, which JSON can only store as lists.
        """
        with open(path, "r") as fp:
            json_obj = json.load(fp)

        restored = {}
        for key, value in json_obj.items():
            # A field whose class-level default is a tuple was a tuple before
            # serialization; convert the JSON list back so that the loaded
            # config compares equal to the one that was saved.
            if isinstance(value, list) and isinstance(getattr(cls, key, None), tuple):
                value = tuple(value)
            restored[key] = value
        return cls(**restored)

    # --- general -----------------------------------------------------------
    project_name: str = "project"
    random_state: int = 42
    device: str = "cuda"
    seed: int = 42

    # --- model -------------------------------------------------------------
    num_classes: int = 4
    model_name: str = "resnet18"
    pretrained: bool = True

    # --- preprocessing / augmentation ---------------------------------------
    train_input_size: Tuple[int, int] = (224, 224)
    test_input_size: Tuple[int, int] = (224, 224)
    aug_color_jitter_b: float = 0.1
    aug_color_jitter_c: float = 0.1
    aug_color_jitter_s: float = 0.1
    norm_mean: Tuple[float, float, float] = (0.485, 0.456, 0.406)
    norm_std: Tuple[float, float, float] = (0.229, 0.224, 0.225)

    # Index of the validation fold.
    fold: int = 0

    # --- paths ---------------------------------------------------------------
    # NOTE: un-annotated, therefore class attributes rather than dataclass
    # fields (see the class docstring).
    root_path = '/content/gdrive/MyDrive/Big-Bang-Theory/Code'

    csv_train_file = root_path + "/../dataset/train.csv"
    csv_test_file = root_path + "/../dataset/test.csv"

    root_train_images = root_path + "/../dataset/images/train"
    root_test_images = root_path + "/../dataset/images/test"

    # Annotated, so this one IS a dataclass field.
    csv_split_file: str = root_path + "/../dataset/fold.csv"

    outputs = root_path + "/../outputs"

    # --- optimisation --------------------------------------------------------
    num_epochs: int = 15
    batch_size: int = 32
    test_batch_size: int = 32
    num_workers: int = 0
    lr: float = 1.0e-3
    weight_decay: float = 1e-4

class DFDataset(Dataset):
    """Image dataset described by a DataFrame with an ``image_name`` column.

    Args:
        root: Folder that contains the image files.
        df: DataFrame with an ``image_name`` column and, optionally, a
            ``target`` column holding the labels.
        transform: Optional callable applied to the PIL image on every access.
        cache_images: Keep decoded images in memory after the first access
            (the original behaviour, hence the default). The cache is
            unbounded; pass ``False`` when the dataset does not fit in RAM.
    """

    def __init__(self, root, df, transform=None, cache_images=True):
        super().__init__()
        # Read the column directly instead of materialising every row.
        self.images = [os.path.join(root, str(name)) for name in df["image_name"]]
        self.target = df["target"].values if "target" in df.columns else None
        self.transform = transform
        self.cache_images = cache_images
        self.image_cache = {}  # idx -> decoded PIL image

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        """Return the transformed image, or ``(image, target)`` if labels exist.

        Returns ``None`` when the file cannot be read (after printing the
        error). NOTE: PyTorch's default collate function cannot batch
        ``None``; supply a custom ``collate_fn`` if unreadable files are
        expected.
        """
        img = self.image_cache.get(idx)
        if img is None:
            try:
                # Image.open is lazy and keeps the file open. Decode inside a
                # ``with`` block so the handle is released, and convert to RGB
                # so every sample has the three channels that the
                # normalisation statistics expect.
                with PIL.Image.open(self.images[idx]) as handle:
                    img = handle.convert("RGB")
            except (PIL.UnidentifiedImageError, IOError) as e:
                print(f"Error loading image {self.images[idx]}: {e}")
                return None
            if self.cache_images:
                self.image_cache[idx] = img

        if self.transform:
            img = self.transform(img)

        return img if self.target is None else (img, self.target[idx])

class VisualModelTimm(nn.Module):
    """A timm encoder followed by a freshly initialised linear classifier.

    Args:
        model_name: Name passed to ``timm.create_model``.
        num_classes: Output size of the linear head.
        pretrained: Forwarded to ``timm.create_model``.
    """

    def __init__(self, model_name, num_classes, pretrained=True):
        super().__init__()
        # The encoder is created with num_classes=0; its output is used as the
        # embedding that feeds our own head.
        self.encoder = timm.create_model(model_name, num_classes=0, pretrained=pretrained)
        config = timm.get_pretrained_cfg(model_name=model_name, allow_unregistered=True).to_dict()

        # Probe the embedding size with one random input. Do it in eval mode:
        # in training mode normalisation layers would fold the random batch
        # into their running statistics (corrupting pretrained ones), and a
        # batch of one can be rejected by BatchNorm altogether.
        was_training = self.encoder.training
        self.encoder.eval()
        with torch.no_grad():
            emb = self.encoder(torch.rand(1, *config["input_size"]))
        self.encoder.train(was_training)

        self.embedding_size = emb.shape[1]
        self.head = nn.Linear(self.embedding_size, num_classes)

    def encode(self, x):
        """Return the encoder output (embedding) for the batch ``x``."""
        return self.encoder(x)

    def forward(self, x):
        """Return the class logits for the batch ``x``."""
        return self.head(self.encode(x))

class RunningMean:
    """Incrementally updated arithmetic mean of a stream of values."""

    def __init__(self, name: str = ""):
        self.name = name
        self.restart()

    def restart(self):
        """Forget everything seen so far."""
        self.n = 0
        self.mean = 0

    def update(self, value):
        """Fold ``value`` into the mean without storing the history."""
        seen = self.n + 1
        delta = value - self.mean
        self.mean = self.mean + delta / seen
        self.n = seen

    def __str__(self):
        return f"{self.mean}"

def get_preprocessing(config: VisualConfig, is_training=True):
    """Compose the torchvision transform pipeline for training or inference.

    Training uses random rotation, random resized crop, colour jitter and
    horizontal flip; inference only resizes. Both end with tensor conversion
    and normalisation using ``config.norm_mean`` / ``config.norm_std``.

    NOTE: the spatial size is fixed at 128x128 here;
    ``config.train_input_size`` and ``config.test_input_size`` are not
    consulted.
    """
    if is_training:
        steps = [
            T.RandomRotation(15),
            T.RandomResizedCrop(size=(128, 128), scale=(0.8, 1)),
            T.ColorJitter(
                brightness=config.aug_color_jitter_b,
                contrast=config.aug_color_jitter_c,
                saturation=config.aug_color_jitter_s,
                hue=0.0,
            ),
            T.RandomHorizontalFlip(),
        ]
    else:
        steps = [T.Resize((128, 128))]

    steps.append(T.ToTensor())
    steps.append(T.Normalize(config.norm_mean, config.norm_std))
    return T.Compose(steps)

def seed_everything(seed):
    """Seed Python, NumPy and PyTorch RNGs and make cuDNN deterministic."""
    # Random number generators.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)

    # cuDNN: fixed algorithms, no auto-tuning.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

def get_output_folder(project_name):
    """Return ``<project_name>_<MM-DD-YYYY-HH-MM-SS>`` using the current local time."""
    timestamp = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")
    return "_".join((project_name, timestamp))


def train_epoch(epoch, model, loader, criterion, optimizer=None, scheduler=None, device=None) -> dict:
    """Train ``model`` for one pass over ``loader``.

    Args:
        epoch: Epoch index, used only in the progress messages.
        model: Module to train.
        loader: Iterable yielding ``(image_batch, label_batch)`` pairs.
        criterion: Loss called as ``criterion(logits, labels)``.
        optimizer: Optimizer to step. Defaults to the module-level ``optim``.
        scheduler: LR scheduler, stepped once per batch. Defaults to the
            module-level ``lr_sched``.
        device: Device the batches are moved to. Defaults to the module-level
            ``cfg.device``.

    Returns:
        ``{"mean_loss": float, "accuracy": float}`` with accuracy in percent
        (``0.0`` when the loader yields no samples).
    """
    # Backward compatibility: existing callers rely on the script-level
    # globals, so they are only looked up when no explicit object is passed.
    optimizer = optim if optimizer is None else optimizer
    scheduler = lr_sched if scheduler is None else scheduler
    device = cfg.device if device is None else device

    model.train()
    mean_loss = RunningMean()
    correct = 0
    total = 0

    print(f"Epoch {epoch} started...")

    for batch_idx, (image_batch, label_batch) in enumerate(loader):
        image_batch, label_batch = image_batch.to(device), label_batch.to(device)

        # Forward pass
        logits = model(image_batch)
        loss = criterion(logits, label_batch)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

        # Update running mean of the loss
        batch_loss = loss.item()
        mean_loss.update(batch_loss)

        # Calculate accuracy
        predicted = logits.argmax(dim=1)
        total += label_batch.size(0)
        correct += (predicted == label_batch).sum().item()

        # Print the loss for the current batch
        print(f"Epoch {epoch}, Batch {batch_idx + 1}/{len(loader)}: Loss = {batch_loss:.4f}")

    # Guard against an empty loader instead of raising ZeroDivisionError.
    accuracy = 100 * correct / total if total else 0.0

    print(f"Epoch {epoch} completed. Mean Loss = {mean_loss.mean:.4f}")

    # Return both mean loss and accuracy
    return {"mean_loss": mean_loss.mean, "accuracy": accuracy}

# Evaluation Loop
def eval_epoch(epoch, model, loader, criterion, device=None) -> dict:
    """Evaluate ``model`` on a labelled ``loader`` without gradient tracking.

    Args:
        epoch: Unused; kept for a signature parallel to ``train_epoch``.
        model: Module to evaluate (switched to eval mode).
        loader: Iterable yielding ``(image_batch, label_batch)`` pairs.
        criterion: Loss called as ``criterion(logits, labels)``.
        device: Device the batches are moved to. Defaults to the module-level
            ``cfg.device`` so existing callers keep working.

    Returns:
        Dict with ``"acc"`` (fraction in [0, 1] from ``accuracy_score``),
        ``"probabilities"`` (softmax outputs stacked over all batches) and
        ``"mean_loss"``. NOTE: ``"mean_loss"`` is the ``RunningMean`` object
        itself, not a float as in ``train_epoch``; read ``.mean`` for the value.
    """
    device = cfg.device if device is None else device

    model.eval()
    y_pred_list, prob_pred_list, y_true_list = [], [], []
    mean_loss = RunningMean()
    for image_batch, label_batch in loader:
        image_batch, label_batch = image_batch.to(device), label_batch.to(device)
        with torch.no_grad():
            logits = model(image_batch)
            batch_loss = criterion(logits, label_batch).item()
        y_pred_list += list(torch.argmax(logits, dim=1).cpu().numpy())
        prob_pred_list += [torch.softmax(logits, dim=1).cpu().numpy()]
        y_true_list += list(label_batch.cpu().numpy())
        mean_loss.update(batch_loss)
    return {"acc": accuracy_score(y_true_list, y_pred_list), "probabilities": np.vstack(prob_pred_list), "mean_loss": mean_loss}

# Test Loop
def test_epoch(epoch, model, loader, df_test, device=None) -> dict:
    """Run inference over ``loader`` and collect predictions.

    Args:
        epoch: Unused; kept for a signature parallel to the other loops.
        model: Module to run (switched to eval mode).
        loader: DataLoader. If ``loader.dataset.target`` exists and is not
            ``None`` the batches are ``(images, labels)`` pairs, otherwise
            they are image batches only.
        df_test: Unused; kept for backward compatibility.
        device: Device the batches are moved to. Defaults to the module-level
            ``cfg.device`` so existing callers keep working.

    Returns:
        Dict with ``"probabilities"`` (stacked softmax outputs),
        ``"prediction"`` (list of arg-max class indices) and, when labels are
        available, ``"target"``.
    """
    device = cfg.device if device is None else device

    model.eval()
    y_pred_list, prob_pred_list = [], []
    has_targets = getattr(loader.dataset, 'target', None) is not None
    target_list = [] if has_targets else None

    for batch in loader:
        if has_targets:
            image_batch, label_batch = batch
            target_list += list(label_batch.cpu().numpy())  # Collect the targets
        else:
            image_batch = batch  # Only images in the test set

        image_batch = image_batch.to(device)
        with torch.no_grad():
            logits = model(image_batch)

        y_pred_list += list(torch.argmax(logits, dim=1).cpu().numpy())
        prob_pred_list += [torch.softmax(logits, dim=1).cpu().numpy()]

    results = {"probabilities": np.vstack(prob_pred_list), "prediction": y_pred_list}
    if has_targets:
        results["target"] = target_list

    return results

def save_checkpoint(model, optimizer, epoch, output_folder, eval_score):
    """Save model state, optimizer state and the current epoch.

    The file is written to
    ``<output_folder>/model_checkpoint_<epoch>_<eval_score:.4f>.pth``.
    The learning-rate scheduler state is not part of the checkpoint.
    """
    print(f"Output folder: {output_folder}")  # Debugging output folder path

    checkpoint_path = os.path.join(output_folder, f"model_checkpoint_{epoch}_{eval_score:.4f}.pth")
    print(f"Saving checkpoint at {checkpoint_path}")  # Debugging target path
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, checkpoint_path)
    print(f"Checkpoint saved at {checkpoint_path}")

def load_checkpoint(model, optimizer, output_folder):
    """Load model and optimizer state from the newest checkpoint, if any.

    Args:
        model: Module whose ``state_dict`` is restored in place.
        optimizer: Optimizer whose ``state_dict`` is restored in place.
        output_folder: Existing folder searched for files whose name starts
            with ``model_checkpoint``.

    Returns:
        ``(model, optimizer, start_epoch)``. ``start_epoch`` is ``0`` when no
        checkpoint is found, otherwise the stored epoch plus one.
    """
    # Print the output folder to verify path
    print(f"Loading from checkpoint folder: {output_folder}")

    # List files in the output folder
    checkpoint_files = [f for f in os.listdir(output_folder) if f.startswith("model_checkpoint")]
    print(f"Checkpoint files found: {checkpoint_files}")

    if not checkpoint_files:
        print("No checkpoint found, starting training from scratch.")
        return model, optimizer, 0

    # Find the latest checkpoint. os.listdir returns bare names, so they must
    # be joined with the folder before asking the file system for their
    # timestamps; otherwise the lookup happens in the working directory.
    checkpoint_path = max(
        (os.path.join(output_folder, name) for name in checkpoint_files),
        key=os.path.getctime,
    )

    # Deserialize onto the CPU so a checkpoint written on a GPU machine can be
    # read anywhere; load_state_dict then copies onto the parameters' device.
    checkpoint = torch.load(checkpoint_path, map_location="cpu")
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch'] + 1  # Resume from the next epoch
    print(f"Loaded checkpoint from {checkpoint_path}, resuming from epoch {epoch}")

    return model, optimizer, epoch



In [None]:


# Build the cross-validation split file: each training row is labelled with
# the index of the fold in which it belongs to the validation part.
df = pd.read_csv(csv_train_file)
kf = KFold(5, shuffle=True, random_state=42)

fold_of_row = np.zeros(len(df), dtype=int)
for fold, (train_idx, val_idx) in enumerate(kf.split(df)):
    fold_of_row[val_idx] = fold

df_fold = pd.DataFrame({"fold": fold_of_row})
df_fold.to_csv(csv_split_file, index=False)

# TEST_config.py
# Smoke test: build a config with one overridden field, dump it to
# "example.json" in the working directory and print it.
cfg = VisualConfig(random_state=1000)
cfg.to_json("example.json")
print(cfg)

# TEST_models_visual.py
# Smoke test: build a non-pretrained resnet50 wrapper, encode a random batch
# of 8 inputs and print the model and the embedding shape.
model = VisualModelTimm(model_name="resnet50", num_classes=4, pretrained=False)
embedding = model.encode(torch.rand(8, 3, 224, 224))
print(model)
print(embedding.shape)

# TEST_preprocessing_visual.py
# Smoke test: build the training and inference transforms, fetch one sample
# through the training transform and print the sample and both pipelines.
config = VisualConfig()
tr_train = get_preprocessing(config, is_training=True)
tr_test = get_preprocessing(config, is_training=False)

df_train = pd.read_csv(csv_train_file)
train_ds = DFDataset(root_train_images, df_train, transform=tr_train)
print(train_ds[5])
print(tr_train)
print(tr_test)

if __name__ == "__main__":
    # Entry point: train on all folds but one, validate on the held-out fold
    # and write validation / test predictions plus a checkpoint every epoch.
    import sys
    if 'ipykernel' in sys.modules:
        args = None  # Bypass argument parsing when running in Jupyter
    else:
        parser = argparse.ArgumentParser()
        parser.add_argument("--cfg", type=str, required=False, default=None)
        args = parser.parse_args()

    if args is not None and args.cfg is not None:
        print("Loading config...")
        cfg = VisualConfig.from_json(args.cfg)
    else:
        cfg = VisualConfig()

    seed_everything(cfg.seed)

    model = VisualModelTimm(cfg.model_name, cfg.num_classes, pretrained=cfg.pretrained)
    # Move the model before the optimizer is built and before any checkpoint
    # is loaded, so optimizer state is created on the same device as the
    # parameters it belongs to.
    model.to(cfg.device)

    train_transform = get_preprocessing(cfg, is_training=True)
    val_transform = get_preprocessing(cfg, is_training=False)
    test_transform = get_preprocessing(cfg, is_training=False)

    df = pd.read_csv(cfg.csv_train_file)
    split = pd.read_csv(cfg.csv_split_file)

    if cfg.fold == -1:
        # fold == -1: train on everything, no validation.
        df_train = df
        evaluate = False
    else:
        # Copy the selections: probability columns are added to df_val below,
        # and writing into a slice of df triggers pandas' chained-assignment
        # warning and may not behave as intended.
        df_train = df.loc[split["fold"] != cfg.fold, :].copy()
        df_val = df.loc[split["fold"] == cfg.fold, :].copy()
        evaluate = True

    df_test = pd.read_csv(cfg.csv_test_file)

    # Data loaders. Half of the configured batch size is used on purpose;
    # the worker count comes from the config (default 0).
    train_ds = DFDataset(cfg.root_train_images, df_train, transform=train_transform)
    train_loader = DataLoader(train_ds, batch_size=cfg.batch_size // 2, num_workers=cfg.num_workers, shuffle=True, drop_last=False)

    if evaluate:
        val_ds = DFDataset(cfg.root_train_images, df_val, transform=val_transform)
        val_loader = DataLoader(val_ds, batch_size=cfg.test_batch_size // 2, num_workers=cfg.num_workers, shuffle=False)

    test_ds = DFDataset(cfg.root_test_images, df_test, transform=test_transform)
    test_loader = DataLoader(test_ds, batch_size=cfg.test_batch_size // 2, num_workers=cfg.num_workers, shuffle=False, drop_last=False)

    optim = torch.optim.AdamW(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)
    # The scheduler is stepped once per batch, hence T_max in iterations.
    total_iterations = cfg.num_epochs * len(train_loader)
    lr_sched = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer=optim, T_max=total_iterations, eta_min=cfg.lr * 0.01)
    criterion = nn.CrossEntropyLoss()

    # NOTE: the folder name contains the current timestamp, so each run gets
    # a fresh folder and load_checkpoint below will not find checkpoints from
    # earlier runs. The scheduler state is not restored on resume either.
    output_folder = f"{cfg.outputs}/{get_output_folder(cfg.project_name)}"
    os.makedirs(output_folder, exist_ok=True)

    # Load model and optimizer from checkpoint if available
    model, optim, start_epoch = load_checkpoint(model, optim, output_folder)

    prob_columns = [f"prob_{i}" for i in range(cfg.num_classes)]

    # Start training from the loaded epoch
    for epoch in range(start_epoch, cfg.num_epochs):
        train_results = train_epoch(epoch, model, train_loader, criterion)
        print(f"Training Accuracy: {train_results['accuracy']}%")

        if evaluate:
            val_results = eval_epoch(epoch, model, val_loader, criterion)
            print(f"Validation Accuracy: {val_results['acc']*100:.2f}%")
            df_val.loc[:, prob_columns] = val_results["probabilities"]
            eval_score = val_results["acc"]
            df_val.to_csv(os.path.join(output_folder, f"fold_{cfg.fold}_valpred_{epoch}_{eval_score:.4f}.csv"), index=False)
        else:
            eval_score = -1

        # Save the model and optimizer after each epoch
        save_checkpoint(model, optim, epoch, output_folder, eval_score)

        # Predict the test set with the current weights
        test_results = test_epoch(epoch, model, test_loader, df_test)
        df_test.loc[:, prob_columns] = test_results["probabilities"]
        df_test["prediction"] = test_results["prediction"]

        if "target" in test_results:
            df_test["target"] = test_results["target"]  # Add the target column if available

        # Save to CSV with or without the target column
        df_test.to_csv(os.path.join(output_folder, f"fold_{cfg.fold}_testpred_{epoch}_{eval_score:.4f}.csv"), index=False)

VisualConfig(project_name='project', random_state=1000, device='cuda', seed=42, num_classes=4, model_name='resnet18', pretrained=True, train_input_size=(224, 224), test_input_size=(224, 224), aug_color_jitter_b=0.1, aug_color_jitter_c=0.1, aug_color_jitter_s=0.1, norm_mean=(0.485, 0.456, 0.406), norm_std=(0.229, 0.224, 0.225), fold=0, csv_split_file='/content/gdrive/MyDrive/Big-Bang-Theory/Code/../dataset/fold.csv', num_epochs=15, batch_size=32, test_batch_size=32, num_workers=0, lr=0.001, weight_decay=0.0001)
VisualModelTimm(
  (encoder): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (act1): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): B

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


model.safetensors:   0%|          | 0.00/46.8M [00:00<?, ?B/s]

Loading from checkpoint folder: /content/gdrive/MyDrive/Big-Bang-Theory/Code/../outputs/project_10-01-2024-12-56-37
Checkpoint files found: []
No checkpoint found, starting training from scratch.
Epoch 0 started...
Epoch 0, Batch 1/200: Loss = 1.4490
Epoch 0, Batch 2/200: Loss = 1.3491
Epoch 0, Batch 3/200: Loss = 1.3697
Epoch 0, Batch 4/200: Loss = 1.4374
Epoch 0, Batch 5/200: Loss = 1.4210
Epoch 0, Batch 6/200: Loss = 1.3024
Epoch 0, Batch 7/200: Loss = 1.3584
Epoch 0, Batch 8/200: Loss = 1.3659
Epoch 0, Batch 9/200: Loss = 1.5367
Epoch 0, Batch 10/200: Loss = 1.2462
Epoch 0, Batch 11/200: Loss = 1.3743
Epoch 0, Batch 12/200: Loss = 1.3601
Epoch 0, Batch 13/200: Loss = 1.3938
