# Experiment

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import v2

from typing import Any

import datasets


torch.manual_seed(17)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [149]:
dataset_id = "microsoft/cats_vs_dogs"

dataset = datasets.load_dataset(dataset_id) # -> DatasetDict('train': {..})
dataset = dataset.get('train')
dataset

Dataset({
    features: ['image', 'labels'],
    num_rows: 23410
})

In [46]:
"Helper functions"
def pil_to_tensor(img: Any) -> torch.Tensor: 
    tensor = v2.functional.pil_to_tensor(img)
    return tensor

def tensor_to_pil(tensor: torch.Tensor) -> Any: 
    img = v2.functional.to_image(tensor)
    return img    

def image_preprocessing(img, size=[224, 224]) -> torch.Tensor: 
    img = v2.functional.resize(img, size=size)
    return img

def unique_labels(labels: list) -> dict:
    rst = {}
    for label in labels:
        if label not in rst.keys():
            rst[label] = 0
        else:
            rst[label] += 1

    return rst


## Dataset

In [197]:
class CatDogDataset(Dataset): 
    def __init__(
            self, 
            dataset: datasets.Dataset, 
            is_transform: bool = True, 
            resize: tuple = (32, 32)): 
        super().__init__()
        self.dataset = dataset
        self.is_transform = self._transform(resize, is_transform)

    def __getitem__(self, idx) -> tuple: 
        entity = self.dataset[idx]
        image = entity.get('image')
        image = image.convert("L")    # [1, H, W] :: PIL
        tensor = pil_to_tensor(image)           # [..] :: Torch
        if self.is_transform: 
            tensor = self.is_transform(tensor)
        labels = entity.get('labels')
        return (tensor, labels) 
    

    def __len__(self) -> int: 
        return len(self.dataset)

    @staticmethod
    def _transform(resize: tuple, is_transform: bool = True) -> torch.Tensor: 
        if is_transform: 
            transformer = v2.Compose([
                    v2.RandomResizedCrop(size=resize, antialias=True),
                    v2.RandomHorizontalFlip(p=0.5),
                    v2.ToDtype(torch.float32, scale=True),
                    v2.Normalize(mean=[0.456], std=[0.225]),
            ])
        else: 
            transformer = v2.Compose([
                v2.Resize(size=resize), 
                v2.ToDtype(torch.float32, scale=True),
                v2.Normalize(mean=[0.456], std=[0.225]),
            ])
        return transformer
    


## Model Architecture

In [None]:
class LogisticRegressionModel(nn.Module):
    def __init__(self, input_size, num_classes, device: str):
        super(LogisticRegressionModel, self).__init__()
        self.fc = nn.Linear(input_size, num_classes)
        self.fc = nn.Sequential([
            nn.Linear(input_size, input_size//2), 
            nn.Linear(input_size//2, num_classes)
        ])
        self.output = nn.Sigmoid()
        self.device = device

    def forward(self, x):
        x = x.view(x.size(0), -1).to(self.device)  # Flatten the image [C, H, W] -> [C, H * W]
        return self.output(self.fc(x))

"""
For those who do not want to use nn.Sigmoid() layer
You can use: torch.sigmoid(self.fc(x))
"""

'\nFor those who do not want to use nn.Sigmoid() layer\nYou can use: torch.sigmoid(self.fc(x))\n'

## Training and Evaluating

In [189]:
# 1. Hyperparameters
batch_size = 64
learning_rate = 0.001
epochs = 50
input_size = 32 * 32 * 1  # Image size after resizing (32x32 with 1 channels)
patience = 5  # Early stopping patience
checkpoint_path = "./best_model.pth"  # Path to save the best model

In [None]:
dataset_id = "microsoft/cats_vs_dogs"

dataset = datasets.load_dataset(dataset_id) # -> DatasetDict('train': {..})
dataset = dataset.get('train')
dataset

Dataset({
    features: ['image', 'labels'],
    num_rows: 23410
})

In [None]:
"""
2. Setup dataset => test_size: 0.1
train: 80%
validate_size: 20%
val: 10%
test: 10%
"""
dataset = dataset.train_test_split(test_size=0.2)   # DatasetDict({'train': {..}, 'test': {..}})
train_ds = dataset.get('train')
validate_cluster = dataset.get('test')


validate_cluster = validate_cluster.train_test_split(test_size=0.5) 
val_ds = validate_cluster.get('train')
test_ds = validate_cluster.get('test')

In [199]:
# 3. Implement Dataset and DataLoader
train_dataset = CatDogDataset(dataset=train_ds)
val_dataset = CatDogDataset(dataset=val_ds, is_transform=False)
test_dataset = CatDogDataset(dataset=test_ds, is_transform=False)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [200]:
# 4. Logistic Regression Model
n_classes = 2 # Cat and Dog
model = LogisticRegressionModel(input_size=input_size, 
                                num_classes=n_classes, 
                                device=device)
model.to(device=device)

LogisticRegressionModel(
  (fc): Linear(in_features=1024, out_features=2, bias=True)
  (output): Sigmoid()
)

In [165]:
# 5. Loss Function and Optimizer
criterion = nn.CrossEntropyLoss()    # -> BinaryCrossEntropy: the dataset labels will be merged as [0, 1]
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [166]:
# 6. Training Function
def train_epoch(model, loader, criterion, optimizer, device):
    model.train()
    total_loss, correct = 0, 0
    for images, labels in loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        correct += (outputs.argmax(dim=1) == labels).sum().item()
    
    accuracy = 100 * correct / len(loader.dataset)
    return total_loss / len(loader), accuracy

# 7. Validation Function
def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss, correct = 0, 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            correct += (outputs.argmax(dim=1) == labels).sum().item()
    
    accuracy = 100 * correct / len(loader.dataset)
    return total_loss / len(loader), accuracy

In [None]:
# 8.1. Training Loop
for epoch in range(1, epochs + 1):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)
    print(f"Epoch {epoch}/{epochs}")
    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}%")
    print(f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.2f}%\n")

print("Training complete!")

In [None]:
# 8.2. Save the model

In [None]:
# 9. Inference test

