In [1]:
import argparse
import os
from typing import List
from typing import Optional

import lightning.pytorch as pl
import optuna
from optuna.integration import PyTorchLightningPruningCallback
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from torchvision import datasets
from torchvision import transforms

In [2]:
PERCENT_VALID_EXAMPLES = 0.1
BATCHSIZE = 128
CLASSES = 10
EPOCHS = 10
DIR = os.getcwd()

In [4]:

class Net(nn.Module):
    def __init__(self, dropout: float, output_dims: List[int]):
        super().__init__()
        layers: List[nn.Module] = []

        input_dim: int = 28 * 28
        for output_dim in output_dims:
            layers.append(nn.Linear(input_dim, output_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            input_dim = output_dim

        layers.append(nn.Linear(input_dim, CLASSES))

        self.layers: nn.Module = nn.Sequential(*layers)

    def forward(self, data: torch.Tensor) -> torch.Tensor:
        logits = self.layers(data)
        return F.log_softmax(logits, dim=1)
    
Net(dropout=0.2, output_dims=[128, 64, 3])

Net(
  (layers): Sequential(
    (0): Linear(in_features=784, out_features=128, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.2, inplace=False)
    (3): Linear(in_features=128, out_features=64, bias=True)
    (4): ReLU()
    (5): Dropout(p=0.2, inplace=False)
    (6): Linear(in_features=64, out_features=3, bias=True)
    (7): ReLU()
    (8): Dropout(p=0.2, inplace=False)
    (9): Linear(in_features=3, out_features=10, bias=True)
  )
)

In [6]:
class LightningNet(pl.LightningModule):
    def __init__(self, dropout: float, output_dims: List[int]):
        super().__init__()
        self.model = Net(dropout, output_dims)

    def forward(self, data: torch.Tensor) -> torch.Tensor:
        return self.model(data.view(-1, 28 * 28))

    def training_step(self, batch, batch_idx: int) -> torch.Tensor:
        data, target = batch
        output = self(data)
        return F.nll_loss(output, target)

    def validation_step(self, batch, batch_idx: int) -> None:
        data, target = batch
        output = self(data)
        pred = output.argmax(dim=1, keepdim=True)
        accuracy = pred.eq(target.view_as(pred)).float().mean()
        self.log("val_acc", accuracy, sync_dist=True)
        self.log("hp_metric", accuracy, on_step=False, on_epoch=True, sync_dist=True)

    def configure_optimizers(self) -> optim.Optimizer:
        return optim.Adam(self.model.parameters())


In [7]:
class FashionMNISTDataModule(pl.LightningDataModule):
    def __init__(self, data_dir: str, batch_size: int):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size

    def setup(self, stage: Optional[str] = None) -> None:
        self.mnist_test = datasets.FashionMNIST(
            self.data_dir, train=False, download=True, transform=transforms.ToTensor()
        )
        mnist_full = datasets.FashionMNIST(
            self.data_dir, train=True, download=True, transform=transforms.ToTensor()
        )
        self.mnist_train, self.mnist_val = random_split(mnist_full, [55000, 5000])

    def train_dataloader(self) -> DataLoader:
        return DataLoader(
            self.mnist_train, batch_size=self.batch_size, shuffle=True, pin_memory=True
        )

    def val_dataloader(self) -> DataLoader:
        return DataLoader(
            self.mnist_val, batch_size=self.batch_size, shuffle=False, pin_memory=True
        )

    def test_dataloader(self) -> DataLoader:
        return DataLoader(
            self.mnist_test, batch_size=self.batch_size, shuffle=False, pin_memory=True
        )

In [12]:

def objective(trial: optuna.trial.Trial) -> float:
    # We optimize the number of layers, hidden units in each layer and dropouts.
    n_layers = trial.suggest_int("n_layers", 1, 3)
    dropout = trial.suggest_float("dropout", 0.2, 0.5)
    output_dims = [
        trial.suggest_int("n_units_l{}".format(i), 4, 128, log=True) for i in range(n_layers)
    ]

    model = LightningNet(dropout, output_dims)
    datamodule = FashionMNISTDataModule(data_dir=DIR, batch_size=BATCHSIZE)
    callback = PyTorchLightningPruningCallback(trial, monitor="val_acc")

    trainer = pl.Trainer(
        logger=True,
        limit_val_batches=PERCENT_VALID_EXAMPLES,
        enable_checkpointing=False,
        max_epochs=EPOCHS,
        accelerator="auto" if torch.cuda.is_available() else "cpu",
        devices=2,
        callbacks=[callback],
        strategy="ddp_notebook",
        # strategy="ddp_spawn",
    )
    hyperparameters = dict(n_layers=n_layers, dropout=dropout, output_dims=output_dims)
    trainer.logger.log_hyperparams(hyperparameters)
    trainer.fit(model, datamodule=datamodule)

    callback.check_pruned()

    return trainer.callback_metrics["val_acc"].item()

In [None]:
pruner: optuna.pruners.BasePruner = optuna.pruners.MedianPruner()
storage = "sqlite:///example.db"
study = optuna.create_study(
    study_name="pl_ddp",
    storage=storage,
    direction="maximize",
    pruner=pruner,
    load_if_exists=True,
)
study.optimize(objective, n_trials=100, timeout=600)

print("Number of finished trials: {}".format(len(study.trials)))

print("Best trial:")
trial = study.best_trial

print("  Value: {}".format(trial.value))

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

In [None]:
import torch
from torch import nn
from torchvision.models import (efficientnet_b0, EfficientNet_B0_Weights,
                                efficientnet_b1, EfficientNet_B1_Weights, 
                                efficientnet_b2, EfficientNet_B2_Weights, 
                                efficientnet_b3, EfficientNet_B3_Weights, 
                                efficientnet_b4, EfficientNet_B4_Weights, 
                                efficientnet_v2_m, EfficientNet_V2_M_Weights, 
                                efficientnet_v2_s, EfficientNet_V2_S_Weights)

NUM_CLASS = 3
class Network(nn.Module):
    def __init__(self, base_model, dropout: float, output_dims: List[int]):
        super().__init__()

        self.base_model = base_model
        input_dim: int = base_model.classifier[1].in_features

        layers: List[nn.Module] = []
        for output_dim in output_dims:
            layers.append(nn.Linear(input_dim, output_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            input_dim = output_dim
        layers.append(nn.Linear(input_dim, NUM_CLASS))

        self.base_model.classifier = nn.Sequential(*layers)
    
    def forward(self, x):
        return self.base_model(x)

Network(base_model=efficientnet_b4(), dropout=0.3, output_dims=[128, 64, 32])


In [None]:
class LightningNet(pl.LightningModule):
    def __init__(self, base_model, dropout: float, output_dims: List[int]) -> None:
        super().__init__()
        self.model = Network(base_model, dropout, output_dims)

    def forward(self, data: torch.Tensor) -> torch.Tensor:
        return self.model(data)

    def training_step(self, batch: List[torch.Tensor], batch_idx: int) -> torch.Tensor:
        data, target = batch
        output = self(data)
        return F.nll_loss(output, target)

    def validation_step(self, batch: List[torch.Tensor], batch_idx: int) -> None:
        data, target = batch
        output = self(data)
        pred = output.argmax(dim=1, keepdim=True)
        accuracy = pred.eq(target.view_as(pred)).float().mean()
        self.log("val_acc", accuracy)
        self.log("hp_metric", accuracy, on_step=False, on_epoch=True)

    def configure_optimizers(self) -> optim.Optimizer:
        return optim.Adam(self.model.parameters())

In [63]:


import cv2
import torch
from torch.utils.data import Dataset

class ImageDataset(Dataset):
    def __init__(self, df, input_shape, transform=None):
        self.df = df
        self.transform = transform
        self.input_shape = input_shape

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_path = self.df.loc[idx, 'img_path']
        img = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
        xmin, ymin, xmax, ymax = self.df.loc[idx, ['xmin', 'ymin', 'xmax', 'ymax']].values
        xmin, ymin, xmax, ymax = int(xmin), int(ymin), int(xmax), int(ymax)

        img = img[ymin:ymax, xmin:xmax]
        img = cv2.resize(img, self.input_shape[:-1], interpolation=cv2.INTER_LINEAR)

        if self.transform is not None:
            img = self.transform(img)

        label = int(self.df.loc[idx, 'sparse_label'])
        return torch.from_numpy(img.transpose((2, 0, 1))), torch.tensor(label)

# Example of using torchvision transforms for data augmentation
from torchvision import transforms

transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.2),
    transforms.ToTensor(),
])

In [64]:
import torchmetrics


In [66]:
help(
    torchmetrics.Recall
)

Help on class Recall in module torchmetrics.classification.precision_recall:

class Recall(torchmetrics.classification.base._ClassificationTaskWrapper)
 |  Recall(task: Literal['binary', 'multiclass', 'multilabel'], threshold: float = 0.5, num_classes: Optional[int] = None, num_labels: Optional[int] = None, average: Optional[Literal['micro', 'macro', 'weighted', 'none']] = 'micro', multidim_average: Optional[Literal['global', 'samplewise']] = 'global', top_k: Optional[int] = 1, ignore_index: Optional[int] = None, validate_args: bool = True, **kwargs: Any) -> torchmetrics.metric.Metric
 |  
 |  Compute `Recall`_.
 |  
 |  .. math:: \text{Recall} = \frac{\text{TP}}{\text{TP} + \text{FN}}
 |  
 |  Where :math:`\text{TP}` and :math:`\text{FN}` represent the number of true positives and
 |  false negatives respectively. The metric is only proper defined when :math:`\text{TP} + \text{FN} \neq 0`. If this
 |  case is encountered for any class/label, the metric for that class/label will be set