In [45]:
import os
import json
import shutil
import polars as pl
from PIL import Image
from copy import deepcopy
from plotly.subplots import make_subplots

import torch
from torch import nn, Tensor
from torch.utils.data import Dataset, DataLoader
from torchvision.io import decode_image
from torchvision.transforms import v2

import torchmetrics as tm
from torchinfo import summary
from lightning import Fabric

In [46]:
# Shared settings, gathered in one place so they are easy to find and tweak.
# The optimizer, loss function, etc. are attached in a later cell rather than
# here, because declaring them inside this dict breaks editor autocomplete.
cfg = dict(
    csv_path = 'data/raw/data.csv',
    img_dir = 'data/raw/images',
    model_dir = 'model',
    sample_size = 2000,
    img_size = (128, 128),
    seed = 1337,
    lr = 0.001,
    batch_size = 64,
    epochs = 1,
    patience = 5
)

# Fix the random seed so that runs are reproducible
Fabric.seed_everything(cfg['seed'])

In [47]:
# The splits are stored in a dict so that later cells can loop over the
# split names ('train', 'val', 'test') and look each frame up by key
df = {}

df_tmp = pl.read_csv(cfg['csv_path']).sample(
    n = cfg['sample_size'],
    shuffle = True,
    seed = cfg['seed']
)

# First half of the shuffled sample -> test split
df['test'] = df_tmp.head(int(0.5 * cfg['sample_size']))

# Remaining rows -> 80% train (leading rows) and 20% val (trailing rows)
df_tmp = df_tmp.tail(len(df_tmp) - len(df['test']))
val_rows = int(0.2 * len(df_tmp))
df['val'] = df_tmp.tail(val_rows)
df['train'] = df_tmp.head(len(df_tmp) - val_rows)

# Drop the temporaries so only the `df` dict is left in the namespace
del df_tmp, val_rows
df['train']

Id,Subject Focus,Eyes,Face,Near,Action,Accessory,Group,Collage,Human,Occlusion,Info,Blur,Pawpularity
str,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64
"""f35cd61df35c10d103128cedd62939…",0,1,1,1,0,0,0,0,0,0,0,0,25
"""ba0fcc8a37286942e2c8b371242191…",0,1,1,1,0,0,0,0,1,1,0,0,40
"""d1c4f27d9f606497f90d0770fcb006…",0,1,1,1,0,0,0,0,0,0,0,0,38
"""6d66abfff024243564764a27cb8258…",0,1,1,1,0,1,0,0,0,0,0,0,49
"""76b49ec9e93679193ead2bc697978a…",0,1,1,1,0,0,0,0,0,0,0,0,40
…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""ba72b0442c45364dfa3ce88c4ee8cf…",0,0,1,0,0,0,1,1,0,1,1,0,60
"""07ed3551e1cdc7ccf14990420e1120…",0,1,1,1,0,0,0,0,0,0,0,0,21
"""93668760e9a4c9438be39119dd9525…",0,1,1,1,0,0,0,0,0,0,0,0,33
"""e8e745df7d70fac506a0eb1e25cac0…",0,0,1,1,0,0,0,0,0,0,0,0,30


In [48]:
# Grid layout for the preview: 2 rows x 5 columns = first ten training rows
n_rows, n_cols = 2, 5

fig = make_subplots(
    rows = n_rows,
    cols = n_cols,
    subplot_titles = df['train']['Pawpularity'][0:10]
)

# Fill the grid row by row; idx is the position inside df['train']
for idx in range(n_rows * n_cols):
    i, j = divmod(idx, n_cols)
    img_path = cfg['img_dir'] + f"/{df['train']['Id'][idx]}.jpg"

    with Image.open(img_path) as img_file:
        # Resize to the configured size before handing the image to plotly
        fig.add_image(
            z = img_file.resize(cfg['img_size']),
            row = i + 1,
            col = j + 1
        )

# Tick labels carry no information for images, so hide them
fig.update_layout(title = 'Pawpularity Sample')
fig.update_xaxes(showticklabels = False)
fig.update_yaxes(showticklabels = False)

fig.show()

In [49]:
split_names = ['train', 'val', 'test']
fig = make_subplots(rows = len(split_names), cols = 1)

# One histogram of the target per split, stacked vertically in split order
for row, key in enumerate(split_names, start = 1):
    fig.add_histogram(
        x = df[key]['Pawpularity'],
        name = key,
        row = row,
        col = 1
    )

fig.update_layout(title = 'Pawpularity Histogram')

The test split is not actually used in this notebook; it is set aside here because the real pipeline will use it later.

In [50]:
class PawDataset(Dataset):
    """Dataset that yields one image together with its tabular row.

    Args:
        df: Frame with an 'Id' column (image file name without extension),
            the tabular feature columns and, optionally, a 'Pawpularity'
            column holding the target.
        img_dir: Directory that contains the '<Id>.jpg' files.
        img_transform: Optional callable applied to the decoded image only.
        transform: Optional callable applied to the image, the features and
            (when present) the target.

    Each item is a dict with the keys 'image', 'features' and 'target'.
    'target' is the Pawpularity value divided by 100, or an empty list when
    the frame has no 'Pawpularity' column.
    """

    def __init__(self, df: pl.DataFrame, img_dir: str, img_transform = None, transform = None):
        self.df = df
        self.img_dir = img_dir
        self.img_transform = img_transform
        self.transform = transform

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, index):
        # Image data
        img_path = os.path.join(self.img_dir, self.df['Id'][index])
        img_path = os.path.abspath(img_path + '.jpg')
        image = decode_image(img_path)

        # Convert only the requested row to a tensor; converting the whole
        # frame on every call would make one epoch quadratic in the row count
        row = self.df.slice(index, 1)

        # Tabular data (every column except the id and the target)
        features = row.select(pl.exclude('Id', 'Pawpularity')).to_torch()[0]

        # Target keeps a trailing dimension of 1 so that batches are 2D.
        # Presence is decided from the schema, not from the tensor's truth
        # value: a target of exactly 0 is still a valid target.
        has_target = 'Pawpularity' in self.df.columns
        target = []
        if has_target:
            target = row.select(pl.col('Pawpularity') / 100).to_torch()[0]

        if self.img_transform is not None:
            image = self.img_transform(image)

        if self.transform is not None:
            image = self.transform(image)
            features = self.transform(features)
            if has_target:
                target = self.transform(target)

        # Return dict instead of tuple for clarity
        return {
            'image': image,
            'features': features,
            'target': target
        }

In [51]:
loader = {}

for key in ['train', 'val', 'test']:
    is_train = key == 'train'

    # Every image is first resized to one common size
    img_steps = [ v2.Resize(cfg['img_size']) ]

    # Only the training split is augmented: each image randomly gets either
    # an affine change (rotation plus shear) or a mild colour jitter.
    # Validation and test images are left untouched after the resize.
    if is_train:
        img_steps.append(
            v2.RandomChoice([
                v2.RandomAffine(
                    degrees = [-180, 180],
                    shear = [-25, 25]
                ),
                v2.ColorJitter(
                    contrast = [0.9, 1.1],
                    saturation = [0.9, 1.1],
                    hue = [-0.1, 0.1]
                )
            ])
        )

    # Image, features and target are all cast to the same dtype
    dataset = PawDataset(
        df[key],
        img_dir = cfg['img_dir'],
        img_transform = v2.Compose(img_steps),
        transform = v2.Compose([ v2.ToDtype(torch.float32) ])
    )

    # Shuffle the training split only
    loader[key] = DataLoader(
        dataset,
        batch_size = cfg['batch_size'],
        shuffle = is_train
    )

# Fetch the first training batch to sanity-check the shapes;
# `ds` is kept around because later cells reuse this batch
ds = next(iter(loader['train']))
print('Image shape:', ds['image'].shape)
print('Features shape:', ds['features'].shape)
print('Target shape:', ds['target'].shape)

Image shape: torch.Size([64, 3, 128, 128])
Features shape: torch.Size([64, 12])
Target shape: torch.Size([64, 1])


In [52]:
# Titles are read from the batch itself. The train loader shuffles, so
# position k in the batch `ds` is not row k of df['train']; labelling the
# images with df['train'] values would attach the wrong score to each image.
# The dataset stores the target as Pawpularity / 100, so scale it back.
titles = [
    str(round(score * 100))
    for score in ds['target'][0:10].flatten().tolist()
]

fig = make_subplots(
    rows = 2,
    cols = 5,
    subplot_titles = titles
)

for i in range(2):
    for j in range(5):
        # Float type will mess up the image color
        # So we convert it back for this part only
        image = v2.functional.to_pil_image(
            v2.functional.to_dtype(
                ds['image'][i * 5 + j],
                torch.uint8
            )
        )

        fig.add_image(
            z = image,
            row = i + 1,
            col = j + 1
        )

fig.update_layout(title = 'Pawpularity Sample')
fig.update_xaxes(showticklabels = False)
fig.update_yaxes(showticklabels = False)

fig.show()

In [53]:
class PawModel(nn.Module):
    """Two-branch network: a conv stack for the image, a linear layer for the tabular features.

    Each branch outputs 128 values per sample. The two outputs are
    concatenated, batch-normalised and passed through a linear layer and a
    sigmoid, giving one value in [0, 1] per sample.

    All parametrised layers are lazy, so their input sizes are inferred
    from the first batch that is passed through the model.
    """

    def __init__(self):
        super().__init__()

        # Image branch: normalise, then four conv -> pool stages whose
        # channel count doubles each time, then project to 128 values
        img_layers = [ nn.LazyBatchNorm2d() ]
        for channels in (16, 32, 64, 128):
            img_layers.append(nn.LazyConv2d(channels, 3, padding = 'same'))
            img_layers.append(nn.MaxPool2d(2))
        img_layers += [ nn.Flatten(), nn.LazyLinear(128) ]

        self.img_input = nn.Sequential(*img_layers)

        # Tabular branch: a single projection to 128 values
        self.feat_input = nn.LazyLinear(128)

        # Head applied to the concatenated branch outputs
        self.comb_input = nn.Sequential(
            nn.LazyBatchNorm1d(),
            nn.LazyLinear(1),
            nn.Sigmoid()
        )

    def forward(self, image: Tensor, features: Tensor) -> Tensor:
        """Run both branches and return the prediction for each sample."""
        # Image branch is evaluated first, then the tabular branch
        branches = [ self.img_input(image), self.feat_input(features) ]

        # Join the branch outputs along the feature dimension
        return self.comb_input(torch.cat(branches, dim = 1))

# Instantiate the network and print a layer-by-layer summary, using the
# sample batch `ds` fetched from the train loader as the input.
# NOTE(review): presumably this pass is also what gives the lazy layers their
# concrete shapes before the optimizer is created in a later cell - confirm.
model = PawModel()
summary(model, input_data = (ds['image'], ds['features']))

Layer (type:depth-idx)                   Output Shape              Param #
PawModel                                 [64, 1]                   --
├─Sequential: 1-1                        [64, 128]                 --
│    └─BatchNorm2d: 2-1                  [64, 3, 128, 128]         6
│    └─Conv2d: 2-2                       [64, 16, 128, 128]        448
│    └─MaxPool2d: 2-3                    [64, 16, 64, 64]          --
│    └─Conv2d: 2-4                       [64, 32, 64, 64]          4,640
│    └─MaxPool2d: 2-5                    [64, 32, 32, 32]          --
│    └─Conv2d: 2-6                       [64, 64, 32, 32]          18,496
│    └─MaxPool2d: 2-7                    [64, 64, 16, 16]          --
│    └─Conv2d: 2-8                       [64, 128, 16, 16]         73,856
│    └─MaxPool2d: 2-9                    [64, 128, 8, 8]           --
│    └─Flatten: 2-10                     [64, 8192]                --
│    └─Linear: 2-11                      [64, 128]                 1,048,7

Declare a few custom helper classes to reduce the amount of code in the training loop.

In [54]:
# Adapted from Keras (keras.callbacks.EarlyStopping)
# The Lightning version is too complicated and incompatible with Fabric
class EarlyStopping:
    """Track one monitored value and flag when it has stopped improving.

    Args:
        monitor: Key looked up in the logs dict at the end of every epoch.
        patience: Number of non-improving epochs after which
            `stop_training` is set to True.
        mode: 'min' treats lower values as better; any other value treats
            higher values as better.

    State (reset by `on_train_begin`): `wait`, `best`, `best_epoch`,
    `best_logs` and `stop_training`.
    """

    def __init__(self, monitor = 'val_loss', patience = 0, mode = 'min'):
        super().__init__()
        self.monitor, self.patience, self.mode = monitor, patience, mode
        self.on_train_begin()

    def on_train_begin(self):
        """Forget everything recorded so far."""
        self.wait = 0
        self.best_epoch = 0
        self.best = self.best_logs = None
        self.stop_training = False

    def on_epoch_end(self, epoch, logs: dict):
        """Record the monitored value for `epoch` and update the stop flag."""
        current = logs.get(self.monitor)

        # Nothing to compare when the monitored key is missing
        if current is None:
            return

        if not self.__is_better(current, self.best):
            # No improvement: count the epoch and stop once patience runs out
            self.wait += 1
            if self.wait >= self.patience:
                self.stop_training = True
            return

        # Improvement: remember it (with a private copy of the logs)
        # and restart the patience counter
        self.best, self.best_epoch = current, epoch
        self.best_logs = deepcopy(logs)
        self.wait = 0

    def __is_better(self, current, best):
        # The very first observed value is always an improvement
        if best is None:
            return True

        return current < best if self.mode == 'min' else current > best


# Wrap a PyTorch loss function as a torchmetrics metric
# To automatically get average loss at the end of epoch
class LossMetric(tm.Metric):
    """Accumulate a summed loss over batches and report its per-sample average.

    Args:
        loss_cls: The loss class itself (not an instance); it is instantiated
            here with reduction = 'sum'.
        **kwargs: Forwarded unchanged to the tm.Metric constructor.
    """

    def __init__(self, loss_cls: nn.Module, **kwargs):
        super().__init__(**kwargs)
        # Summed (not averaged) per batch, so batches of different size can be pooled
        self.loss_fn = loss_cls(reduction = 'sum')

        # Running numerator and denominator; both are registered with a 'sum' reduction
        self.add_state('sum_loss', default = torch.tensor(0.0), dist_reduce_fx = 'sum')
        self.add_state('total', default = torch.tensor(0), dist_reduce_fx = 'sum')

    # TODO: Should this be synced with Fabric precision setting?
    def update(self, preds: Tensor, target: Tensor) -> None:
        """Add one batch to the running totals.

        Raises:
            ValueError: If preds and target do not have the same shape.
        """
        if preds.shape != target.shape:
            raise ValueError('Predictions and target must have the same shape')

        # The accumulated values are for reporting only, so no graph is recorded
        with torch.no_grad():
            self.sum_loss += self.loss_fn(preds, target)
            # Counts rows (first dimension), not individual elements
            self.total += target.shape[0]

    def compute(self) -> Tensor:
        """Return the accumulated loss divided by the number of rows seen."""
        return self.sum_loss / self.total


# Quickly save/load text file
class QSave:
    @staticmethod
    def save(obj: dict | str, path: str) -> None:
        with open(path, 'w') as f:
            if isinstance(obj, dict):
                json.dump(obj, f, indent = 2)
            else:
                f.write(obj)

    @staticmethod
    def load(path: str) -> dict | str:
        with open(path, 'w') as f:
            if path.rsplit('.')[-1] == 'json':
                return json.load(f)
            else:
                return f.read()

In [55]:
optimizer = torch.optim.Adam(model.parameters(), lr = cfg['lr'])
criterion = torch.nn.BCELoss()

# Values reported at the end of every epoch
metrics = {
    'bce': LossMetric(torch.nn.BCELoss),
    'rmse': tm.MeanSquaredError(squared = False)
}

# ----------

# Record the optimizer/criterion names and the monitored value for MLFlow.
# This is done before the Fabric setup below, because the wrapped objects
# may report a different class name afterwards.
cfg.update(
    optimizer = type(optimizer).__name__,
    criterion = type(criterion).__name__,
    monitor = 'val_bce',
    monitor_mode = 'min'
)

# The early-stop callback watches exactly what was recorded in cfg
cb = {
    'early_stop': EarlyStopping(
        monitor = cfg['monitor'],
        patience = cfg['patience'],
        mode = cfg['monitor_mode']
    )
}

print('MLFlow parameters:', cfg)

# ----------

# Create the Lightning Fabric instance with the GPU accelerator,
# so that we do not have to call "to_device" everywhere
fabric = Fabric(accelerator = 'gpu')

# Hand the model, optimizer, dataloaders and metrics over to Fabric.
# The returned objects are Fabric wrappers: their class name and some
# properties change, and the originals stay reachable through
# ".module" (model) or ".optimizer" (optimizer).
model, optimizer = fabric.setup(model, optimizer)

for key in ('train', 'val', 'test'):
    loader[key] = fabric.setup_dataloaders(loader[key])

for key, metric in metrics.items():
    metrics[key] = fabric.setup_module(metric)

Seed set to 1337


{'csv_path': 'data/raw/data.csv', 'img_dir': 'data/raw/images', 'model_dir': 'model', 'sample_size': 1000, 'img_size': (128, 128), 'seed': 1337, 'lr': 0.001, 'batch_size': 64, 'epochs': 1, 'monitor': 'val_bce', 'patience': 5, 'optimizer': 'Adam', 'criterion': 'BCELoss'}


For examples on how to use Fabric on distributed setup, refer to [this link](https://lightning.ai/docs/fabric/2.5.1/examples/)

See also the Fabric documentation on [methods](https://lightning.ai/docs/fabric/2.5.1/api/fabric_methods.html), [model hooks](https://lightning.ai/docs/fabric/2.5.1/guide/lightning_module.html), [callbacks](https://lightning.ai/docs/fabric/2.5.1/guide/callbacks.html), the [launch function](https://lightning.ai/docs/fabric/2.5.1/fundamentals/notebooks.html), and [model saving](https://lightning.ai/docs/fabric/2.5.1/guide/checkpoint/checkpoint.html). Note that the approaches shown there may differ from the implementation in this notebook.

In [56]:
# `logs` holds the values of the current epoch,
# `history` holds one list per logged name covering all epochs so far
logs = {}
history = {}

# Reset early stop state so that re-running this cell starts from scratch
cb['early_stop'].on_train_begin()

# ----------

for epoch in range(1, cfg['epochs'] + 1):
    # ----------
    # Training epoch start

    model.train()

    for ds in loader['train']:
        preds = model(ds['image'], ds['features'])
        loss = criterion(preds, ds['target'])

        # Backward pass
        optimizer.zero_grad()
        fabric.backward(loss)
        # Update parameters (weights)
        optimizer.step()

        for name in metrics:
            metrics[name](preds, ds['target'])

    # ----------
    # Training epoch end

    # Read the epoch-level values, then clear the metrics so the
    # validation pass below starts from zero
    for name in metrics:
        logs[name] = metrics[name].compute().item()
        metrics[name].reset()

    # ----------
    # Validation epoch start

    model.eval()

    with torch.no_grad():
        for ds in loader['val']:
            preds = model(ds['image'], ds['features'])

            for name in metrics:
                metrics[name](preds, ds['target'])

    # ----------
    # Validation epoch end

    for name in metrics:
        logs['val_' + name] = metrics[name].compute().item()
        metrics[name].reset()

    # ----------
    # Misc at the end of each epoch

    logs['epoch'] = epoch
    cb['early_stop'].on_epoch_end(epoch, logs)
    print(f'End of epoch {epoch}: {logs}')

    # Append current epoch logs to history
    for name, value in logs.items():
        history.setdefault(name, []).append(value)

    # Export current model and history
    if cb['early_stop'].best_epoch == epoch:
        print('Saving best model so far...')

        # Unwrap Fabric class as normal PyTorch class
        _model = model.module
        _optimizer = optimizer.optimizer

        # Start from an empty output directory on every save
        model_dir = cfg['model_dir']
        shutil.rmtree(model_dir, ignore_errors = True)
        os.makedirs(model_dir, exist_ok = True)

        # Every artifact is written directly into model_dir, which is the
        # only directory created above (torch.save does not create folders)
        torch.save(_model.state_dict(), os.path.join(model_dir, 'model.pth'))
        torch.save(_optimizer.state_dict(), os.path.join(model_dir, 'optimizer.pth'))
        QSave.save(str(_model), os.path.join(model_dir, 'model.txt'))
        QSave.save(history, os.path.join(model_dir, 'history.json'))

    fabric.barrier()

    # Stop training if signaled by early stop
    if cb['early_stop'].stop_training:
        print('Early stopping...')
        break

End of epoch 1: {'bce': 0.815307080745697, 'rmse': 0.30014196038246155, 'val_bce': 0.783160924911499, 'val_rmse': 0.31336724758148193, 'epoch': 1}
Saving best model so far...


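The Pawpularity prediction misses by roughly 30 points on average (the validation RMSE above is about 0.31 on the target scaled by 1/100, i.e. about 31 points on the original scale), but whatever: I'm going to practice the MLOps part more than the ML itself, so we can stop here.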

In [57]:
cb['early_stop'].best_logs

{'bce': 0.815307080745697,
 'rmse': 0.30014196038246155,
 'val_bce': 0.783160924911499,
 'val_rmse': 0.31336724758148193}