In [1]:
import os

import torch
import torchinfo
import pytorch_lightning as pl
import albumentations as A
import cv2 as cv
import numpy as np
import pandas as pd
import torch.nn as nn
import seaborn as sns
import matplotlib.pyplot as plt
import torch.nn.functional as F
import torchmetrics
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from sklearn.model_selection import train_test_split
from pytorch_lightning.loggers import CSVLogger

sns.set_theme(style="darkgrid", palette="muted")

In [2]:
# create pytorch dataset
class PatternDataset(Dataset):
    def __init__(self, base_path:str, data:pd.DataFrame, tfms:A.Compose=None):
        self.base_path =base_path
        self.images_dir = os.path.join(self.base_path, "images")
        self.data = data
        self.tfms = tfms
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx:int):
        row = self.data.iloc[idx]
        target = row.class_id
        img_path = os.path.join(self.images_dir, row.class_name, row.file_name)
        img = cv.imread(img_path)
        img = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
        img = img / 255. 
        if self.tfms is not None:
            img = self.tfms(image=img)["image"]
        
        img = img[np.newaxis, ...]
        img = torch.tensor(img, dtype=torch.float32)
        target = torch.tensor(target, dtype=torch.long)
        return img, target

In [3]:
# define data path
base_path = os.path.dirname(os.getcwd())
base_path = os.path.dirname(base_path)
base_path = os.path.join(base_path, "data/describable_textures_dtd")
meta_data = "unbalanced_data.csv"

# load the meta data
df = pd.read_csv(os.path.join(base_path, meta_data))

# train test split, stratify according to class ids
y = df.class_id
print(y.nunique())
train_df, valid_df = train_test_split(df, test_size=300, stratify=y)

# image augmentations
transforms = A.Compose([
    A.Resize(224, 224)
])

47


In [4]:
# create datasets
train_dataset = PatternDataset(base_path, data=train_df, tfms=transforms)
valid_dataset = PatternDataset(base_path, data=valid_df, tfms=transforms)

# get class weights
class_sample_counts = train_df.class_id.value_counts().values
weights = 1. / torch.tensor(class_sample_counts, dtype=torch.float)
train_targets = train_df.class_id.values
sample_weights = weights[train_targets]

# create samplers
train_sampler = WeightedRandomSampler(
    sample_weights, num_samples=len(sample_weights), replacement=True
)

# create dataloaders
train_loader = DataLoader(
    train_dataset,
    batch_size=64,
    shuffle=False,
    pin_memory=True,
    sampler=train_sampler,
)
valid_loader = DataLoader(
    valid_dataset, batch_size=32, shuffle=False, pin_memory=True
)

In [5]:
# model
class CNNModel(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=(3, 3))
        self.maxpool1 = nn.MaxPool2d(kernel_size=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=(3, 3))
        self.maxpool2 = nn.MaxPool2d(kernel_size=2)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=(3, 3))
        self.maxpool3 = nn.MaxPool2d(kernel_size=2)
        self.conv4 = nn.Conv2d(64, 128, kernel_size=(3, 3))
        self.maxpool4 = nn.MaxPool2d(kernel_size=2)
        self.conv5 = nn.Conv2d(128, 128, kernel_size=(3, 3))
        self.maxpool5 = nn.MaxPool2d(kernel_size=2)

        self.fc1 = nn.LazyLinear(out_features=256)
        self.dropout = nn.Dropout(p=0.25)
        self.fc2 = nn.Linear(in_features=256, out_features=num_classes)

    def forward(self, x):
        x = self.maxpool1(self.conv1(x))
        x = F.relu(x)
        x = self.maxpool2(self.conv2(x))
        x = F.relu(x)
        x = self.maxpool3(self.conv3(x))
        x = F.relu(x)
        x = self.maxpool4(self.conv4(x))
        x = F.relu(x)
        x = self.maxpool5(self.conv5(x))
        x = F.relu(x)

        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [6]:
num_epochs = 50

In [7]:
class LightningModel(pl.LightningModule):
    def __init__(self, model, lr, cosine_t_max):
        super().__init__()

        self.lr = lr
        self.cosine_t_max = cosine_t_max
        self.model = model

        self.save_hyperparameters(ignore=['model'])   
        self.train_acc = torchmetrics.Accuracy()
        self.val_acc = torchmetrics.Accuracy()
        self.test_acc = torchmetrics.Accuracy()

    def forward(self, x):
        return self.model(x)

    def _shared_step(self, batch):
        features, true_labels = batch
        logits = self(features)
        loss = F.cross_entropy(logits, true_labels)
        predicted_labels = torch.argmax(logits, dim=1)
        return loss, true_labels, predicted_labels
    
    def training_step(self, batch, batch_idx):
        loss, true_labels, predicted_labels = self._shared_step(batch)
        self.log("train_loss", loss)
        self.train_acc(predicted_labels, true_labels)
        self.log(
            "train_acc", self.train_acc, prog_bar=True, on_epoch=True, on_step=False
        )
        return loss
    
    def validation_step(self, batch, batch_idx):
        loss, true_labels, predicted_labels = self._shared_step(batch)

        self.log("val_loss", loss, prog_bar=True)
        self.val_acc(predicted_labels, true_labels)
        self.log("val_acc", self.val_acc, prog_bar=True)
    
    def test_step(self, batch, batch_idx):
        loss, true_labels, predicted_labels = self._shared_step(batch)
        self.test_acc(predicted_labels, true_labels)
        self.log("test_acc", self.test_acc)
    
    def configure_optimizers(self):
        opt = torch.optim.Adam(self.parameters(), lr=self.lr)
        sch = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=self.cosine_t_max)

        return {
            "optimizer": opt,
            "lr_scheduler": {
                "scheduler": sch,
                "monitor": "train_loss",
                "interval": "epoch",
                "frequency": 1
            },
        }

In [8]:
from pytorch_lightning.callbacks import ModelCheckpoint
callbacks = [
    ModelCheckpoint(save_top_k=1, mode="max", monitor="val_acc", save_last=True)
]

In [11]:
pl.seed_everything(23)
# dm = CustomDataModule()

pt_model = CNNModel()
pl_model = LightningModel(model=pt_model, lr=3e-4, cosine_t_max=1)

Global seed set to 23


In [12]:
trainer = pl.Trainer(
    max_epochs=num_epochs,
    auto_lr_find=True,
    accelerator="cpu",
    # devices="auto",
    logger = CSVLogger(save_dir="logs/", name="test_pl_model"),
    deterministic=True,
    callbacks=callbacks
)
results = trainer.tune(model=pl_model, train_dataloaders=train_loader, val_dataloaders=valid_loader)

Trainer already configured with model summary callbacks: [<class 'pytorch_lightning.callbacks.model_summary.ModelSummary'>]. Skipping setting a default `ModelSummary` callback.
GPU available: True (cuda), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
  rank_zero_warn(
  rank_zero_warn(


IndexError: Target 11 is out of bounds.