# Andrew

# Import

In [110]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets
from torchvision.transforms import v2 as transforms
from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights
import pytorch_lightning as pl
from PIL import Image
import os
from tqdm.auto import tqdm

# Dataset

In [111]:
class CatDogDataset(Dataset):
    def __init__(self, df, split, base_dir, transform=None):
        self.transform = transform
        self.split = split
        self.base_dir = base_dir

        # Store image paths and labels
        if split != "test":
            self.data_list = [(data[0], 1 if data[1] == "dog" else 0) for data in df.itertuples(index=False)]
        else:
            self.data_list = [(data[0], -1) for data in df.itertuples(index=False)]
    
    def __len__(self):
        return len(self.data_list)
    
    def __getitem__(self, idx):
        img_path, label = self.data_list[idx]
        img_full_path = os.path.join(self.base_dir, img_path)
        
        # Load image on demand
        image = Image.open(img_full_path).convert("RGB").resize((224, 224))
        image = np.array(image) / 255
        
        if self.transform:
            image = self.transform(image)
        
        return image, label, img_path

# DataModule (Loaders)

In [112]:
class CatDogDataModule(pl.LightningDataModule):
    def __init__(self, data_dir="/kaggle/input/witcom-cat-vs-dog", batch_size=128):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size
        
        # RGB mean and std
        self.mean = [0.485, 0.456, 0.406]
        self.std = [0.229, 0.224, 0.225]
        
        self.train_transform = transforms.Compose([
            transforms.ToImage(),
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.ToDtype(torch.float32, scale=True),
            transforms.Normalize(mean=self.mean, std=self.std)
        ])
        
        self.val_test_transform = transforms.Compose([
            transforms.ToImage(),
            transforms.Resize((224, 224)),
            transforms.ToDtype(torch.float32, scale=True),
            transforms.Normalize(mean=self.mean, std=self.std)
        ])

        self.df_train = None
        self.df_val = None
        self.df_test = None


    def setup(self, stage=None):
        if self.df_train is None and stage in (None, 'fit'):
            df = pd.read_csv(os.path.join(self.data_dir, "train.csv"))
            self.df_train, self.df_val = train_test_split(
                df, train_size=0.7, random_state=42, stratify=df['label']
            )

        if self.df_test is None and stage in (None, 'test'):
            self.df_test = pd.read_csv(os.path.join(self.data_dir, "test.csv"))

    def train_dataloader(self):
        self.train_dataset = CatDogDataset(
            self.df_train, "train", os.path.join(self.data_dir, "train/train"), self.train_transform
        )
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True, pin_memory=True, num_workers=3)

    def val_dataloader(self):
        self.val_dataset = CatDogDataset(
            self.df_val, "val", os.path.join(self.data_dir, "train/train"), self.val_test_transform
        )
        return DataLoader(self.val_dataset, batch_size=self.batch_size, pin_memory=True, num_workers=3)

    def test_dataloader(self):
        self.test_dataset = CatDogDataset(
            self.df_test, "test", os.path.join(self.data_dir, "test/test"), self.val_test_transform
        )
        return DataLoader(self.test_dataset, batch_size=self.batch_size, pin_memory=True, num_workers=3)

# Model

In [113]:
class PredictionHead(nn.Module):
    def __init__(self, feature_dims):
        super(PredictionHead, self).__init__()
        self.activate = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        self.fc = nn.Sequential()
        for i in range(len(feature_dims)-1):
            self.fc.add_module(f'fc_{i}', nn.Linear(feature_dims[i], feature_dims[i+1]))
            if i != len(feature_dims)-2:
                self.fc.add_module(f'a_{i}', self.activate)

    def forward(self, x):
        return self.fc(x)

class CatDogModel(pl.LightningModule):
    def __init__(self, lr=8e-5, gamma=0.85, head_features=[1]):
        super().__init__()
        self.save_hyperparameters()
        self.model = efficientnet_b0(weights=EfficientNet_B0_Weights.IMAGENET1K_V1)
        feature = [1280]
        feature.extend(head_features)
        self.model.classifier = PredictionHead(feature)
        self.criterion = nn.BCEWithLogitsLoss()

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y, _ = batch
        output = self(x).squeeze()
        loss = self.criterion(output, y.float())
        
        preds = torch.sigmoid(output) >= 0.5
        acc = (preds == y.bool()).float().mean()
        
        self.log("train_loss", loss, prog_bar=True)
        self.log("train_acc", acc, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y, _ = batch
        output = self(x).squeeze()
        loss = self.criterion(output, y.float())
        
        preds = torch.sigmoid(output) >= 0.5
        acc = (preds == y.bool()).float().mean()
        
        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", acc, prog_bar=True)
    
        return loss

    def test_step(self, batch, batch_idx):
        x, _, names = batch
        output = self(x).squeeze()
        probs = torch.sigmoid(output)
        self.test_probs.append(probs.cpu())
        self.test_names.extend(names)

    def on_test_epoch_start(self):
        self.test_probs = []
        self.test_names = []

    def on_test_epoch_end(self):
        all_probs = torch.cat(self.test_probs, dim=0).numpy()
        all_names = self.test_names
        predicted_classes = np.where(all_probs >= 0.5, "dog", "cat")

        # Export answer
        pd.DataFrame({
            "filename": all_names,
            "label": predicted_classes
        }).to_csv("andew_EffNet.csv", index=False)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.hparams.lr)
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=self.hparams.gamma)
        return {
            "optimizer": optimizer,
            "scheduler": scheduler,
            "interval": "step",
            "frequency": len(self.trainer.datamodule.train_dataloader()) // 4  # Simulate SUB_EPOCHES
        }

# Train and Test

In [114]:
data_module = CatDogDataModule()
model = CatDogModel()

# Trainer configuration
trainer = pl.Trainer(
    max_epochs=1,
    accelerator="gpu",
    devices=1,
    log_every_n_steps=1
)

# Train and test
trainer.fit(model, data_module)
trainer.test(model, data_module)

/usr/local/lib/python3.11/dist-packages/pytorch_lightning/core/optimizer.py:378: Found unsupported keys in the optimizer configuration: {'interval', 'frequency', 'scheduler'}


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Testing: |          | 0/? [00:00<?, ?it/s]

[{}]