In [None]:
!pip install torch
!pip install torchvision
!pip install optuna

!pip install torch_tb_profiler
!pip install pytorch-lightning
# fix for collab env
!ln -sf /opt/bin/nvidia-smi /usr/bin/nvidia-smi

In [None]:
# GRAPHING AND FUN STUFF
from torch.profiler import *

import pandas as pd
import seaborn as sns

import matplotlib
import matplotlib.pyplot as plt

# Enable Colab's interactive data-table rendering for DataFrames.
# See https://colab.research.google.com/notebooks/data_table.ipynb#scrollTo=jcQEX_3vHOUz
from google.colab import data_table
data_table.enable_dataframe_formatter()

In [None]:
# DATASET LOAD
from torchvision.datasets import CIFAR100
from torchvision import transforms
from torch import utils
import os

import torch  # only needed here for the CUDA availability check

# Upscale every image to 128x128 and convert it to a tensor.
trans = transforms.Compose([
  transforms.Resize((128,128)),
  transforms.ToTensor(),
])

# Pinned host memory only pays off when batches are copied to a GPU, so it is
# enabled only when CUDA is available. os.cpu_count() can return None, in which
# case loading falls back to the main process (num_workers=0).
kwargs = {
  "pin_memory": torch.cuda.is_available(),
  "num_workers": os.cpu_count() or 0,
}
# Both splits share one root; download=True on each makes either line runnable
# on its own (the download is skipped when the files are already present).
dataset_train = CIFAR100(root="/content/image-net", download=True, transform=trans)
dataset_test = CIFAR100(root="/content/image-net", train=False, download=True, transform=trans)
# NOTE(review): no batch_size is passed, so these loaders use the DataLoader
# default of one sample per batch - confirm that this is intended.
train_loader = utils.data.DataLoader(dataset_train, **kwargs)
test_loader = utils.data.DataLoader(dataset_test, **kwargs)

Downloading https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz to /content/image-net/cifar-100-python.tar.gz


100%|██████████| 169001437/169001437 [00:01<00:00, 89436344.94it/s]


Extracting /content/image-net/cifar-100-python.tar.gz to /content/image-net


In [None]:
# COMMON RESNET UTILITIES
import torch
from torch import optim, nn,  utils, Tensor
from itertools import repeat


def conv2d_helper(in_channels, out_channels, kernel_size, stride, padding):
  """Build an ``nn.Conv2d`` without a bias term.

  The helper exists so that every convolution in this notebook is created
  bias-free, as in the reference PyTorch ResNet implementation.

  Args:
    in_channels: number of input channels.
    out_channels: number of output channels.
    kernel_size: convolution kernel size.
    stride: convolution stride.
    padding: zero-padding applied on each side.

  Returns:
    The configured ``nn.Conv2d`` module (``bias=False``).
  """
  return nn.Conv2d(in_channels, out_channels, kernel_size, stride=stride, padding=padding, bias=False)

class ResBlock(nn.Module):
  """Residual block: conv-BN-ReLU-conv-BN on the main path plus a skip path.

  The two paths are summed and passed through a final ReLU.

  Args:
    in_channels: channels of the incoming feature map.
    out_channels: channels produced by both main-path convolutions.
    kernel_size: kernel size of the main-path convolutions. Padding is derived
      as ``kernel_size // 2`` so that, for odd kernel sizes, the main path and
      the skip path produce feature maps of the same spatial size.
    dropout_percent: currently unused; kept for interface compatibility.
    special_i_skip: currently unused; kept for interface compatibility.
    half_res: when True, the first convolution and the skip path use stride 2.
  """

  def __init__(self, in_channels, out_channels, kernel_size, dropout_percent=0.5, special_i_skip=False, half_res=False):
    super().__init__()
    stride = 2 if half_res else 1
    # "Same" padding for odd kernels; equals the previous hard-coded 1 for 3x3.
    if isinstance(kernel_size, int):
      padding = kernel_size // 2
    else:
      padding = tuple(k // 2 for k in kernel_size)

    self.block = nn.Sequential(
      conv2d_helper(in_channels, out_channels, kernel_size, stride=stride, padding=padding),
      nn.BatchNorm2d(out_channels),
      nn.ReLU(),
      conv2d_helper(out_channels, out_channels, kernel_size, stride=1, padding=padding),
      nn.BatchNorm2d(out_channels),
    )

    # The skip path must match the main path's output shape. A 1x1 projection
    # is needed whenever the resolution OR the channel count changes; otherwise
    # the addition in forward() would fail on mismatched shapes.
    needs_projection = half_res or in_channels != out_channels
    if needs_projection:
      self.skip_connection = conv2d_helper(in_channels, out_channels, kernel_size=1, stride=stride, padding=0)
    else:
      self.skip_connection = nn.Identity()

  def forward(self, x):
    """Return ``relu(main_path(x) + skip_path(x))``."""
    return nn.functional.relu(self.block(x) + self.skip_connection(x))

def create_meta_block(in_channels, out_channels, repeats, half_res=True):
  """Stack ``repeats`` residual blocks (all 3x3 convolutions) into one stage.

  The first block maps ``in_channels`` to ``out_channels`` and halves the
  resolution when ``half_res`` is True; the remaining ``repeats - 1`` blocks
  keep both the channel count and the resolution.

  Args:
    in_channels: channels entering the stage.
    out_channels: channels produced by every block of the stage.
    repeats: total number of blocks in the stage; must be at least 1.
    half_res: whether the first block downsamples by a factor of 2.

  Returns:
    An ``nn.Sequential`` holding the ``repeats`` blocks.

  Raises:
    ValueError: if ``repeats`` is smaller than 1.
  """
  if repeats < 1:
    raise ValueError(f"repeats must be at least 1, got {repeats}")

  first_block = ResBlock(in_channels, out_channels, kernel_size=3, half_res=half_res)
  # Instantiate a fresh ResBlock per repetition. Reusing a single instance
  # (e.g. via itertools.repeat) registers the same module several times, so all
  # repetitions would share one set of weights.
  following_blocks = [
    ResBlock(out_channels, out_channels, kernel_size=3)
    for _ in range(repeats - 1)
  ]
  return nn.Sequential(first_block, *following_blocks)

In [None]:
# RESNET FROM SCRATCH IMPLEMENTATION IN PYTORCH
class NativeResNet34(nn.Module):
  """ResNet-34-style classifier built from ``create_meta_block`` stages.

  Layout: a 7x7 stride-2 stem convolution with batch norm, ReLU and a 3x3
  stride-2 max-pool, followed by four stages of 3, 4, 6 and 3 residual blocks
  (64, 128, 256, 512 channels), global average pooling and a linear head.

  Args:
    num_classes: number of output scores produced by the linear head.
  """

  def __init__(self, num_classes):
    super().__init__()

    init_conv = nn.Sequential(
      conv2d_helper(in_channels=3, out_channels=64, kernel_size=7,
                stride=2, padding=3),
      nn.BatchNorm2d(64),
      nn.ReLU(),
      # Stride 2, consistent with NativeResNet18's stem; the previous stride of
      # 1 left every later stage running at twice the intended resolution.
      nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
    )

    self.model = nn.Sequential(
      init_conv,
      create_meta_block(64, 64, 3, False),  # first stage keeps the resolution
      create_meta_block(64, 128, 4),
      create_meta_block(128, 256, 6),
      create_meta_block(256, 512, 3)
    )
    self.lin = nn.Linear(in_features=512, out_features=num_classes)

  def forward(self, x):
    """Return the class scores for input batch ``x``."""
    feature_map = self.model(x)
    # Global average pooling: the kernel covers the whole spatial extent.
    pooled_map = nn.functional.avg_pool2d(feature_map, kernel_size=feature_map.shape[-2:], stride=1)
    pooled_map = torch.flatten(pooled_map, start_dim=1)
    scores = self.lin(pooled_map)
    return scores

class NativeResNet18(nn.Module):
  """ResNet-18-style classifier built from ``create_meta_block`` stages.

  Layout: a 7x7 stride-2 stem convolution with batch norm, ReLU and a 3x3
  stride-2 max-pool, then four stages of two residual blocks each
  (64, 128, 256, 512 channels), global average pooling and a linear head.

  Args:
    num_classes: number of output scores produced by the linear head.
  """

  def __init__(self, num_classes):
    super().__init__()

    # Stem: 3 input channels -> 64 feature channels.
    stem_layers = [
      conv2d_helper(in_channels=3, out_channels=64, kernel_size=7, stride=2, padding=3),
      nn.BatchNorm2d(64),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
    ]
    stem = nn.Sequential(*stem_layers)

    # Four stages of two blocks each; only the first one is built with half_res=False.
    stages = [
      create_meta_block(64, 64, 2, False),
      create_meta_block(64, 128, 2),
      create_meta_block(128, 256, 2),
      create_meta_block(256, 512, 2),
    ]

    self.model = nn.Sequential(stem, *stages)
    self.lin = nn.Linear(in_features=512, out_features=num_classes)

  def forward(self, x):
    """Return the class scores for input batch ``x``."""
    features = self.model(x)
    # Average over the full spatial extent of the final feature map.
    spatial_size = features.shape[-2:]
    pooled = nn.functional.avg_pool2d(features, kernel_size=spatial_size, stride=1)
    flat = torch.flatten(pooled, start_dim=1)
    return self.lin(flat)

In [None]:
### LIGHTNING WRAPPER
import pytorch_lightning as pl

class LightningResNet(pl.LightningModule):
  """Lightning wrapper that trains a classification backbone with cross-entropy.

  Args:
    backbone: the ``nn.Module`` that maps an image batch to class scores.
    learning_rate: initial learning rate for the Adam optimizer.
    batch_size: batch size used by the dataloaders defined on this module.
  """

  def __init__(self, backbone, learning_rate=0.1, batch_size=1024):
    super().__init__()
    # Stores the scalar args as Lightning hyperparameters (checkpointed, logged
    # and reachable through self.hparams). The backbone is excluded: it is an
    # nn.Module and is already saved with the checkpoint's weights.
    self.save_hyperparameters(ignore=['backbone'])
    self.backbone = backbone
    self.loss = nn.CrossEntropyLoss()

  def forward(self, x):
    """Return the backbone's class scores for ``x``."""
    return self.backbone(x)

  # MUST RETURN THE LOSS
  def training_step(self, batch, batch_idx):
    """Compute, log and return the training loss for one batch."""
    x, y = batch
    scores = self(x)
    loss = self.loss(scores, y)

    # Logging to TensorBoard (if installed) by default
    self.log('train_loss', loss)
    return loss

  def _shared_eval_step(self, batch, stage):
    """Log ``{stage}_loss`` and ``{stage}_acc`` for one evaluation batch."""
    x, y = batch
    scores = self(x)
    loss = self.loss(scores, y)

    # Fraction of samples whose highest-scoring class equals the label. Kept as
    # a tensor so logging does not force a device-to-host sync on every batch.
    labels_hat = torch.argmax(scores, dim=1)
    acc = (labels_hat == y).float().mean()

    self.log_dict({f'{stage}_loss': loss, f'{stage}_acc': acc})

  def validation_step(self, batch, batch_idx):
    """Log ``val_loss`` and ``val_acc`` for one validation batch."""
    self._shared_eval_step(batch, 'val')

  def test_step(self, batch, batch_idx):
    """Log ``test_loss`` and ``test_acc`` for one test batch."""
    self._shared_eval_step(batch, 'test')

  # MUST RETURN THE OPTIMIZER
  def configure_optimizers(self):
    """Return Adam together with an exponential learning-rate decay schedule."""
    optimizer = optim.Adam(self.parameters(), lr=self.hparams.learning_rate)
    # Referenced through optim.lr_scheduler because the bare ExponentialLR name
    # is never imported in this notebook. The scheduler has to be returned
    # alongside the optimizer, otherwise Lightning never steps it.
    scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    return {"optimizer": optimizer, "lr_scheduler": scheduler}

  def train_dataloader(self):
    """Loader over the notebook-level ``dataset_train``."""
    return utils.data.DataLoader(dataset_train, batch_size=self.hparams.batch_size)

  def val_dataloader(self):
    """Loader used for validation.

    NOTE(review): this iterates ``dataset_train``, so ``val_loss`` is measured
    on the training data - confirm whether a held-out split is intended.
    """
    return utils.data.DataLoader(dataset_train, batch_size=self.hparams.batch_size)

  def test_dataloader(self):
    """Loader over the notebook-level ``dataset_test``."""
    return utils.data.DataLoader(dataset_test, batch_size=self.hparams.batch_size)


In [None]:
# CIFAR-100 has 100 target classes, so the classifier head is sized to 100 outputs.
model = LightningResNet(learning_rate=1e-1, backbone=NativeResNet18(num_classes=100))

# from torchsummary import summary
# summary(model.cuda(), (3, 128, 128))

/usr/local/lib/python3.10/dist-packages/pytorch_lightning/utilities/parsing.py:199: Attribute 'backbone' is an instance of `nn.Module` and is already saved during checkpointing. It is recommended to ignore them using `self.save_hyperparameters(ignore=['backbone'])`.


In [None]:
# Load the TensorBoard notebook extension and open it on the checkpoint directory.
# NOTE(review): `checkpoint_dir` is not defined in any visible cell; it must be
# assigned before this cell runs or the {checkpoint_dir} interpolation fails.
%load_ext tensorboard
%tensorboard --logdir {checkpoint_dir}

In [None]:
### TRAIN DEFAULT
from pytorch_lightning.loggers import tensorboard

# may be default_root_dir
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
# Keep the three checkpoints with the lowest logged "val_loss".
# NOTE(review): `checkpoint_dir` is not defined in any visible cell - it must be
# assigned before this cell runs.
checkpoint_callback = ModelCheckpoint(dirpath=checkpoint_dir, save_top_k=3, monitor="val_loss")
# Note: if the logger has its own default directory, the checkpoint callback prefers it
# over the trainer's default_root_dir; setting dirpath explicitly (as above) avoids that.
# https://github.com/Lightning-AI/pytorch-lightning/blob/90d04b5b86f37994cdceccc6de32f0e93b1cc7f0/src/lightning/pytorch/callbacks/model_checkpoint.py#L623
trainer = Trainer(callbacks=[checkpoint_callback], log_every_n_steps=10)

# To resume instead (restores model, epoch, step, LR schedulers, etc.):
# trainer.fit(model, ckpt_path=resume_checkpoint)

# No dataloaders are passed: the ones defined on the LightningModule are used.
trainer.fit(model)

# TODO: the loaders currently live on the module; move them to a separate DataModule.
# Display the path of the best checkpoint as the cell output.
checkpoint_callback.best_model_path

ValueError: mount failed

In [None]:
from pytorch_lightning.tuner import Tuner
# Auto-scale the batch size. The tuner offers a binary-search mode and a "power" mode; "power" is much faster at finding an acceptable result.
# tuner = Tuner(trainer)
# tuner.scale_batch_size(model, mode="power")

# Fit as normal with new batch size
# trainer.fit(model)
# trainer.fit(model, train_dataloaders=train_loader)

In [None]:
# native train loop (plain PyTorch, one pass over train_loader)
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter

# num_classes is a required argument; CIFAR-100 has 100 target classes.
# NOTE: this rebinds the notebook-level name `model` to the plain backbone.
model = NativeResNet18(num_classes=100)

optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

# The periodic report below needs a TensorBoard writer and the current epoch
# number to compute the global step.
tb_writer = SummaryWriter()
epoch_index = 0

running_loss = 0.
last_loss = 0.
# tqdm wraps the loader itself (not the enumerate object) so the progress bar
# knows the total number of batches; enumerate still provides the batch index
# used for intra-epoch reporting.
for i, data in enumerate(tqdm(train_loader)):
    # Every data instance is an input + label pair
    inputs, labels = data

    # Zero the gradients for every batch
    optimizer.zero_grad()

    # Make predictions for this batch
    outputs = model(inputs)

    # Compute the loss and its gradients
    loss = loss_fn(outputs, labels)
    loss.backward()

    # Adjust learning weights
    optimizer.step()

    # Gather data and report the mean loss of every 1000 batches
    running_loss += loss.item()
    if i % 1000 == 999:
        last_loss = running_loss / 1000 # loss per batch
        print('  batch {} loss: {}'.format(i + 1, last_loss))
        tb_x = epoch_index * len(train_loader) + i + 1
        tb_writer.add_scalar('Loss/train', last_loss, tb_x)
        running_loss = 0.

# Make sure every pending scalar reaches disk.
tb_writer.flush()

In [None]:
### OPTUNA
import optuna
from pytorch_lightning import Trainer

def objective(trial):
    """Optuna objective: train for one epoch and return the test loss.

    Args:
        trial: the Optuna trial used to sample the learning rate.

    Returns:
        The ``test_loss`` metric reported by ``trainer.test`` for the test loader.
    """
    # Sample the learning rate on a log scale (suggest_loguniform is deprecated
    # in favour of suggest_float(..., log=True)).
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)

    # A fresh backbone per trial: `backbone` is a required argument and every
    # trial has to start from untrained weights.
    model = LightningResNet(
        backbone=NativeResNet18(num_classes=100),
        learning_rate=lr,
        batch_size=2048,
    )

    # Uses the notebook-level `train_loader` / `test_loader`.
    trainer = Trainer(max_epochs=1, limit_train_batches=1000, limit_val_batches=None, limit_test_batches=None)
    trainer.fit(model, train_loader)

    # trainer.test returns a list with one metrics dict per test dataloader.
    results = trainer.test(model, test_loader)
    return results[0]["test_loss"]

# Create a study that minimizes the value returned by `objective`, then run 100 trials.
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=100)

In [None]:
# Print the sampled parameters of the best trial
print(study.best_trial.params)

In [None]:
# `results` only exists as a local variable inside `objective`, so it is not
# available here on a fresh kernel; show the per-trial results of the study instead.
study.trials_dataframe()