# Fashion MNIST DNN ONNX 변환 Tutorial

## 외부 파일 가져오기 & requirements 설치

In [None]:
# Mount Google Drive at /content/drive so files stored there are reachable.
from google.colab import drive
drive.mount("/content/drive")
import os
import sys
from datetime import datetime

# Append the Drive project folder to sys.path so modules in it can be imported.
drive_project_root = "/content/drive/MyDrive/#fastcampus"
sys.path.append(drive_project_root)
# Install the packages listed in the project's requirements file.
!pip install -r "/content/drive/MyDrive/#fastcampus/requirements.txt"

In [None]:
# Capture the output lines of `nvidia-smi`, join them and print the result.
gpu_info = !nvidia-smi
gpu_info = "\n".join(gpu_info)
print(gpu_info)

In [None]:
from abc import abstractmethod
from typing import Optional
from typing import Dict
from typing import List
from typing import Union
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from omegaconf import OmegaConf
from omegaconf import DictConfig
import hydra
from hydra.core.config_store import ConfigStore
import pytorch_lightning as pl

import onnx
import onnxruntime as ort
from onnx_tf.backend import prepare
import tensorflow as tf
import torch
from torch import nn
import torch.nn.functional as F
from torch import optim
from torch_optimizer import RAdam
from torch_optimizer import AdamP
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import random_split
from torchvision.datasets import FashionMNIST
from torchvision import transforms
import wandb

from efficientnet_pytorch import EfficientNet

In [None]:
from data_utils import dataset_split
from config_utils import flatten_dict
from config_utils import register_config
from config_utils import configure_optimizers_from_cfg
from config_utils import get_loggers
from config_utils import get_callbacks
from custom_math import softmax

## 모델 (Multi-layer Perceptron, MLP) 정의
## 모델 MLPWithDropout 정의


In [None]:
class BaseLightningModule(pl.LightningModule):
    """Shared train/val/test logic for classifiers that output class logits.

    Subclasses implement ``forward``. This base class provides the
    cross-entropy loss, per-step metric logging and a test-epoch summary
    (per-class ROC curves, macro one-vs-one AUC, accuracy and mean loss).

    Args:
        cfg: Experiment config. It is handed to
            ``configure_optimizers_from_cfg`` and ``cfg.data.n_class`` is
            read in ``test_epoch_end``.
    """

    def __init__(self, cfg: DictConfig):
        pl.LightningModule.__init__(self)
        self.cfg = cfg
        self.loss_function = nn.CrossEntropyLoss()

    @abstractmethod
    def forward(self, x):
        """Return the logits for a batch ``x``; must be overridden."""
        raise NotImplementedError()

    def configure_optimizers(self):
        """Build optimizers and schedulers from ``self.cfg`` and return both."""
        self._optimizers, self._schedulers = configure_optimizers_from_cfg(self.cfg, self)
        return self._optimizers, self._schedulers

    def _forward(self, images, labels, mode: str):
        """Run one batch and collect metrics keyed by ``mode``.

        Args:
            images: Input batch passed to ``forward``.
            labels: Target class indices for the batch.
            mode: One of ``"train"``, ``"val"`` or ``"test"``; used as the
                key prefix.

        Returns:
            A pair ``(logs, details)``: ``logs`` holds ``{mode}_loss`` and
            ``{mode}_acc``; ``details`` holds the outputs, predictions,
            images, labels and the number of correct predictions.
        """
        assert mode in ["train", "val", "test"]

        # Use this module itself; the original called a global `model`,
        # which only worked when a notebook variable of that name existed.
        outputs = self(images)
        _, preds = torch.max(outputs, 1)

        # Loss and batch accuracy.
        loss = self.loss_function(outputs, labels)
        corrects = (preds == labels).sum()
        acc = corrects / len(outputs)

        return {
            f"{mode}_loss": loss,
            f"{mode}_acc": acc,
        }, {
            f"{mode}_outputs": outputs,
            f"{mode}_preds": preds,
            f"{mode}_images": images,
            f"{mode}_labels": labels,
            f"{mode}_corrects": corrects,
        }

    def training_step(self, batch, batch_idx):
        """Log the training metrics and return them with a ``loss`` entry."""
        images, labels = batch
        logs, _ = self._forward(images, labels, mode="train")
        self.log_dict(logs)
        logs["loss"] = logs["train_loss"]
        return logs

    def validation_step(self, batch, batch_idx):
        """Log the validation metrics and return them with a ``loss`` entry."""
        images, labels = batch
        logs, _ = self._forward(images, labels, mode="val")
        self.log_dict(logs)
        logs["loss"] = logs["val_loss"]
        return logs

    def test_step(self, batch, batch_idx):
        """Log the test metrics and return them together with batch details."""
        images, labels = batch
        logs, logs_detail = self._forward(images, labels, mode="test")
        self.log_dict(logs)
        logs["loss"] = logs["test_loss"]
        logs.update(logs_detail)
        return logs

    def test_epoch_end(self, step_end_outputs):
        """Aggregate all test steps into AUC, accuracy and mean loss.

        Args:
            step_end_outputs: The dicts returned by ``test_step``.

        Returns:
            Dict with ``test_auc_score``, ``test_accuracy`` and ``test_loss``.
        """
        model_outputs = torch.cat([o["test_outputs"] for o in step_end_outputs]).detach().cpu().numpy()
        labels = torch.cat([o["test_labels"] for o in step_end_outputs]).detach().cpu().numpy()
        # Per-step counts and losses may be 0-dim tensors, which torch.cat
        # rejects; reshape(-1) makes them 1-D whatever their original rank.
        corrects = torch.cat(
            [o["test_corrects"].reshape(-1) for o in step_end_outputs]
        ).detach().cpu().numpy()
        losses = torch.cat(
            [o["test_loss"].reshape(-1) for o in step_end_outputs]
        ).detach().cpu().numpy()

        # Class probabilities; the multiclass AUC below needs rows summing to 1.
        final_outs = softmax(model_outputs, axis=1)

        fpr = {}
        tpr = {}
        thresh = {}
        n_class = self.cfg.data.n_class

        # NOTE(review): `roc_curve` and `roc_auc_score` are not imported in
        # the visible part of this file; presumably they come from
        # sklearn.metrics -- confirm the import exists before running tests.
        for i in range(n_class):
            fpr[i], tpr[i], thresh[i] = roc_curve(labels, final_outs[:, i], pos_label=i)

        # plot.
        for i in range(n_class):
            plt.plot(fpr[i], tpr[i], linestyle="--", label=f"Class {i} vs Rest")
        plt.title("Multi-class ROC Curve")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.legend(loc="best")
        # plt.show()

        auc_score = roc_auc_score(
            labels, final_outs, multi_class="ovo", average="macro"
        )

        # Total correct predictions over total samples (not over batch count).
        acc = corrects.sum() / len(labels)
        mean_loss = np.mean(losses)

        return {
            "test_auc_score": auc_score,
            "test_accuracy": acc,
            "test_loss": mean_loss
        }
    
# TODO: add below things in the configs.
# cfg.data.n_class
# cfg.opt.lr_schedulers
# cfg.opt.optimizers 

In [None]:
# Define Model.

class MLP(nn.Module):
    """Fully connected classifier with two ReLU hidden layers.

    Args:
        in_dim: Number of features after flattening one sample.
        h1_dim: Width of the first hidden layer.
        h2_dim: Width of the second hidden layer.
        out_dim: Number of output logits.
    """

    def __init__(self, in_dim: int, h1_dim: int, h2_dim: int, out_dim: int):
        super().__init__()
        # Layers are created in forward order: in -> h1 -> h2 -> out.
        self.linear1 = nn.Linear(in_dim, h1_dim)
        self.linear2 = nn.Linear(h1_dim, h2_dim)
        self.linear3 = nn.Linear(h2_dim, out_dim)
        self.relu = F.relu

    def forward(self, input):
        """Flatten every sample and return raw logits (no softmax applied)."""
        hidden = torch.flatten(input, start_dim=1)
        for layer in (self.linear1, self.linear2):
            hidden = self.relu(layer(hidden))
        return self.linear3(hidden)

class PLMLP(BaseLightningModule):
    """Lightning version of the MLP; layer sizes are read from ``cfg.model``.

    Args:
        cfg: Config whose ``model`` node provides ``in_dim``, ``h1_dim``,
            ``h2_dim`` and ``out_dim``.
    """

    def __init__(self, cfg: DictConfig):
        super().__init__(cfg=cfg)
        sizes = cfg.model
        self.linear1 = nn.Linear(sizes.in_dim, sizes.h1_dim)
        self.linear2 = nn.Linear(sizes.h1_dim, sizes.h2_dim)
        self.linear3 = nn.Linear(sizes.h2_dim, sizes.out_dim)
        self.relu = F.relu

    def forward(self, input):
        """Flatten every sample and return raw logits (no softmax applied)."""
        hidden = torch.flatten(input, start_dim=1)
        for layer in (self.linear1, self.linear2):
            hidden = self.relu(layer(hidden))
        return self.linear3(hidden)


class MLPWithDropout(MLP):
    """MLP variant that applies dropout after each hidden activation.

    Args:
        in_dim: Number of features after flattening one sample.
        h1_dim: Width of the first hidden layer.
        h2_dim: Width of the second hidden layer.
        out_dim: Number of output logits.
        dropout_prob: Drop probability used by both dropout layers.
    """

    def __init__(self, in_dim: int, h1_dim: int, h2_dim: int, out_dim: int, dropout_prob: float):
        super().__init__(in_dim, h1_dim, h2_dim, out_dim)
        self.dropout1 = nn.Dropout(dropout_prob)
        self.dropout2 = nn.Dropout(dropout_prob)

    def forward(self, input):
        """Flatten every sample and return raw logits (no softmax applied)."""
        hidden = torch.flatten(input, start_dim=1)
        stages = (
            (self.linear1, self.dropout1),
            (self.linear2, self.dropout2),
        )
        for linear, dropout in stages:
            hidden = dropout(self.relu(linear(hidden)))
        return self.linear3(hidden)


## CNN 모델 정의

In [None]:
# Default hyper-parameters for `CNN`.
_cnn_cfg_dict: dict = {
    "layer_1": {
        "conv2d_in_channels": 1,
        "conv2d_out_channels": 32,
        "conv2d_kernel_size": 3,
        "conv2d_padding": 1,
        "maxpool2d_kernel_size": 2,
        "maxpool2d_stride": 2,
    },
    "layer_2": {
        "conv2d_in_channels": 32,
        "conv2d_out_channels": 64,
        "conv2d_kernel_size": 3,
        "conv2d_padding": 0,
        "maxpool2d_kernel_size": 2,
        # Was 1, but the network always pooled with stride 2 (the stride key
        # used to be ignored) and fc_1.in_features below assumes stride 2.
        "maxpool2d_stride": 2,
    },
    "fc_1": {
        # 64 channels * 6 * 6 for a 28x28 input:
        # 28 -> conv(k3, p1) 28 -> pool 14 -> conv(k3, p0) 12 -> pool 6.
        # Must be recomputed if the input size or the layers above change.
        "in_features": 2304,
        "out_features": 512,
    },
    "fc_2": {
        "in_features": 512,
        "out_features": 128,
    },
    "fc_3": {
        "in_features": 128,
        "out_features": 10,
    },
    "dropout_prob": 0.25,
}
_cnn_cfg = OmegaConf.create(_cnn_cfg_dict)
print(OmegaConf.to_yaml(_cnn_cfg))


class CNN(nn.Module):
    """Two conv blocks followed by three fully connected layers.

    Each conv block is Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d. The
    forward pass returns raw logits.

    Args:
        cfg: Layer hyper-parameters laid out like ``_cnn_cfg`` (``layer_1``,
            ``layer_2``, ``fc_1``..``fc_3`` and ``dropout_prob``).
    """

    def __init__(self, cfg: DictConfig = _cnn_cfg):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(
                in_channels=cfg.layer_1.conv2d_in_channels,
                out_channels=cfg.layer_1.conv2d_out_channels,
                kernel_size=cfg.layer_1.conv2d_kernel_size,
                padding=cfg.layer_1.conv2d_padding
            ),
            nn.BatchNorm2d(cfg.layer_1.conv2d_out_channels),
            nn.ReLU(),
            nn.MaxPool2d(
                kernel_size=cfg.layer_1.maxpool2d_kernel_size,
                # Honour the configured stride instead of reusing kernel_size.
                stride=cfg.layer_1.maxpool2d_stride
            )
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(
                in_channels=cfg.layer_2.conv2d_in_channels,
                out_channels=cfg.layer_2.conv2d_out_channels,
                kernel_size=cfg.layer_2.conv2d_kernel_size,
                padding=cfg.layer_2.conv2d_padding
            ),
            nn.BatchNorm2d(cfg.layer_2.conv2d_out_channels),
            nn.ReLU(),
            nn.MaxPool2d(
                kernel_size=cfg.layer_2.maxpool2d_kernel_size,
                stride=cfg.layer_2.maxpool2d_stride
            )
        )
        self.fc1 = nn.Linear(
            in_features=cfg.fc_1.in_features,
            out_features=cfg.fc_1.out_features,
        )
        self.fc2 = nn.Linear(
            in_features=cfg.fc_2.in_features,
            out_features=cfg.fc_2.out_features,
        )
        self.fc3 = nn.Linear(
            in_features=cfg.fc_3.in_features,
            out_features=cfg.fc_3.out_features,
        )
        # The dropout is applied to the flattened (batch, features) output of
        # fc1, so element-wise Dropout is the matching layer; Dropout2d is
        # meant for channel maps.
        self.dropout = nn.Dropout(cfg.dropout_prob)

    def forward(self, x):
        """Return logits for a batch of images ``x``."""
        out = self.layer1(x)
        out = self.layer2(out)
        # Flatten everything except the batch dimension.
        out = out.view(out.size(0), -1)
        # NOTE(review): there is no non-linearity between the fully connected
        # layers; left unchanged so existing checkpoints behave the same.
        out = self.fc1(out)
        out = self.dropout(out)
        out = self.fc2(out)
        out = self.fc3(out)
        return out


In [None]:
# Default settings for `EfficientNetFinetune`.
_efficient_finetune_cfg_dict: dict = {
    "efficient_net_model_name": "efficientnet-b1",
    "num_classes": 10
}
_efficient_finetune_cfg_cfg = OmegaConf.create(_efficient_finetune_cfg_dict)
print(OmegaConf.to_yaml(_efficient_finetune_cfg_cfg))


class EfficientNetFinetune(nn.Module):
    """Pretrained EfficientNet created with a task-specific number of classes.

    Args:
        cfg: Config providing ``efficient_net_model_name`` and
            ``num_classes``.
    """

    def __init__(self, cfg: DictConfig = _efficient_finetune_cfg_cfg):
        super().__init__()
        # Pass num_classes by keyword: the second positional parameter of
        # `from_pretrained` is not the same across efficientnet_pytorch
        # releases, so a positional value can bind to the wrong argument.
        self.efficientnet = EfficientNet.from_pretrained(
            cfg.efficient_net_model_name,
            num_classes=cfg.num_classes,
        )

    def forward(self, x):
        """Return the EfficientNet output for the batch ``x``."""
        out = self.efficientnet(x)
        return out

In [None]:
# transform = transforms.Compose(
#     [
#         transforms.Resize(224),
#         transforms.ToTensor(),
#         transforms.Lambda(lambda x: x.repeat(3, 1, 1)),
#         transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
#     ]
# )



# Dataset description: name, download location, image size and class count.
data_fashion_mnist_cfg = {
    "name": "fashion_mnist",
    "data_root": os.path.join(os.getcwd(), "data"),
    "W": 28,
    "H": 28,
    "C": 1,
    "n_class": 10,
}

# Model description: MLP layer sizes plus the normalization statistics that
# the preprocessing transform reads.
model_mnist_mlp_cfg = {
    "name": "MLP",
    "in_dim": 28*28,
    "h1_dim": 128,
    "h2_dim": 64,
    "out_dim": 10,
    "feature": {
        "normalize": {
            "mean": [0.5],
            "std": [0.5],
        }
    }
}

# Optimizer / LR-scheduler description (consumed by
# configure_optimizers_from_cfg; its exact schema lives in config_utils).
opt_cfg = {
    "optimizers": [
        {
            "name": "RAdam",
            "kwargs": {
                "lr": 1e-3,
                "betas": (0.9, 0.999),
                "eps": 1e-8,
                "weight_decay": 0,
            },
        }
    ],
    "lr_schedulers": [
        {
            "name": None,
            "kwargs": {}
        }
    ]
}

# Named presets; the key is the config name composed below.
_merged_cfg_presets = {
    "mlp_fashion_mnist": {
        "data": data_fashion_mnist_cfg,
        "model": model_mnist_mlp_cfg,
        "opt": opt_cfg, 
    },
}

### hydra composition ###
# Clear any previous global hydra instance so the cell can be re-run.
hydra.core.global_hydra.GlobalHydra.instance().clear()

# Register the preset configs defined above.
register_config(_merged_cfg_presets)


# Initialize hydra without a config directory.
hydra.initialize(config_path=None)

# Compose the preset registered under this name.
cfg = hydra.compose("mlp_fashion_mnist")

###

# Run name: "<ISO timestamp>-<model name>-<dataset name>".
run_name = f"{datetime.now().isoformat(timespec='seconds')}-{cfg.model.name}-{cfg.data.name}"


## Define train configs
project_root_dir = os.path.join(
    drive_project_root, "runs", "dnn-tutorial-fashion-mnist-runs"
)
# NOTE: save_dir and run_root_dir are built from the same parts and are equal.
save_dir = os.path.join(project_root_dir, run_name)
run_root_dir = os.path.join(project_root_dir, run_name)

# Training settings; `trainer_kwargs` is unpacked into pl.Trainer later.
train_cfg = {
    "train_batch_size": 128,
    "val_batch_size": 32,
    "test_batch_size": 32,
    "train_val_split": [0.9, 0.1],
    "run_root_dir": run_root_dir,
    "trainer_kwargs": {
        "accelerator": "dp",
        "gpus": "0",
        "max_epochs": 50,
        "val_check_interval": 1.0,
        "log_every_n_steps": 100,
        "flush_logs_every_n_steps": 100,
    }
}

# Logger and callback settings (read by get_loggers / get_callbacks).
log_cfg = {
    "loggers": {
        "WandbLogger": {
            "project": "fastcampus_fashion_mnist_tutorials",
            "name": run_name,
            "tags": ["fastcampus_fashion_mnist_tutorials"],
            "save_dir": run_root_dir,
        },
        "TensorBoardLogger": {
            "save_dir": project_root_dir,
            "name": run_name,
        }
    },
    "callbacks": {
        "ModelCheckpoint": {
            "save_top_k": 3,
            "monitor": "val_loss",
            "mode": "min",
            "verbose": True,
            "dirpath": os.path.join(run_root_dir, "weights"),
            "filename": "{epoch}-{val_loss:.3f}-{val_acc:.3f}"
        },
        "EarlyStopping": {
            "monitor": "val_loss",
            "mode": "min",
            "patience": 3,
            "verbose": True,
        }
    }
}

# Disable struct mode so the new `train` and `log` keys can be added.
OmegaConf.set_struct(cfg, False)
cfg.train = train_cfg
cfg.log = log_cfg

# Re-enable struct mode and print the final config.
OmegaConf.set_struct(cfg, True)
print(OmegaConf.to_yaml(cfg))



In [None]:
data_root = cfg.data.data_root

# Preprocessing: convert images to tensors and normalize them with the
# mean / std stored in the model config.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(
            cfg.model.feature.normalize.mean,
            cfg.model.feature.normalize.std,
        ),  # mean, std
    ]
)

# transform = transforms.Compose(
#     [
#         transforms.Resize(cfg.data.W*cfg.data.H*cfg.data.C),
#         transforms.ToTensor(),
#         transforms.Lambda(lambda x: x.repeat(3, 1, 1)),
#         transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
#     ]
# )

# Datasets: the official train split (divided into train / val below) and
# the official test split.
fashion_mnist_dataset = FashionMNIST(data_root, download=True, train=True, transform=transform)
test_dataset = FashionMNIST(data_root, download=True, train=False, transform=transform)

datasets = dataset_split(fashion_mnist_dataset, split=cfg.train.train_val_split)

train_dataset = datasets["train"]
val_dataset = datasets["val"]

train_batch_size = cfg.train.train_batch_size
val_batch_size = cfg.train.val_batch_size
test_batch_size = cfg.train.test_batch_size

# Only the training loader is shuffled.
train_dataloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=train_batch_size, shuffle=True, num_workers=0
)
val_dataloader = torch.utils.data.DataLoader(
    val_dataset, batch_size=val_batch_size, shuffle=False, num_workers=0
)
# Use the test batch size here; the original passed val_batch_size, which
# left cfg.train.test_batch_size without any effect.
test_dataloader = torch.utils.data.DataLoader(
    test_dataset, batch_size=test_batch_size, shuffle=False, num_workers=0
)

## 모델 선언 및 손실 함수, 최적화(Optimizer) 정의, Tensorboard Logger 정의 

In [None]:
# model define

def get_pl_model(cfg: DictConfig, checkpoint_path: Optional[str] = None):
    """Create the Lightning model selected by ``cfg.model.name``.

    Args:
        cfg: Experiment config; ``cfg.model.name`` selects the model class.
        checkpoint_path: Optional checkpoint to restore. When ``None`` a
            freshly initialised model is returned.

    Returns:
        The constructed (or restored) Lightning module.

    Raises:
        NotImplementedError: If ``cfg.model.name`` is not a supported model.
    """
    if cfg.model.name == "MLP":
        model_cls = PLMLP
    else:
        raise NotImplementedError(f"Unsupported model name: {cfg.model.name!r}")

    if checkpoint_path is None:
        return model_cls(cfg)

    # `load_from_checkpoint` is a classmethod: call it on the class instead of
    # first building a throwaway instance and calling it on that instance.
    return model_cls.load_from_checkpoint(checkpoint_path=checkpoint_path, cfg=cfg)

# Build a model from the composed config (no checkpoint) and print it.
model = get_pl_model(cfg)
print(model)


In [None]:
# Build the loggers and callbacks described in the config.
logger = get_loggers(cfg)
callbacks = get_callbacks(cfg)

# Remaining Trainer options come from cfg.train.trainer_kwargs.
trainer = pl.Trainer(
    callbacks=callbacks,
    logger=logger,
    default_root_dir=cfg.train.run_root_dir,
    num_sanity_val_steps=2,
    **cfg.train.trainer_kwargs
)

In [None]:
# Load the TensorBoard notebook extension and point it at the runs directory.
%load_ext tensorboard
%tensorboard --logdir /content/drive/MyDrive/\#fastcampus/runs/dnn-tutorial-fashion-mnist-runs/

# Train with the train / validation loaders; the test pass is left disabled.
trainer.fit(model, train_dataloader, val_dataloader)
# trainer.test(model, test_dataloader)

In [None]:
# Save the config as config.yaml inside the run directory.
OmegaConf.save(cfg, os.path.join(run_root_dir, "config.yaml"))

# 서비스 준비

In [None]:
# Reload the config that was saved for this run.
loaded_cfg = OmegaConf.load(os.path.join(run_root_dir, "config.yaml"))

# NOTE: the checkpoint file name is hard-coded; it must match a file that
# exists under <run_root_dir>/weights for the run being served.
log_model_path = run_root_dir
model_path = os.path.join(
    log_model_path,
    "weights",
    "epoch=1-val_loss=0.420-val_acc=0.848.ckpt"
)

# Restore the weights, then move the model to CPU and switch to eval mode.
loaded_model = PLMLP.load_from_checkpoint(model_path, cfg=loaded_cfg)
loaded_model.cpu()
loaded_model.eval()

print(loaded_model)

## PyTorch --> ONNX 변환

In [None]:
# Export the restored model to ONNX with named input / output tensors.
# NOTE(review): the sample is passed without an added batch dimension, so
# axis 0 of a single dataset image is what gets declared as the dynamic
# "batch" axis (presumably the channel axis of size 1 -- confirm). The later
# inference cells feed the same un-batched sample, so changing this requires
# updating them together.
onnx_path = os.path.join(log_model_path, "plmlp.onnx")
loaded_model.to_onnx(
    onnx_path,
    input_sample=test_dataset[0][0],
    input_names=["imgs"],
    output_names=["logits"],
    opset_version=11,
    dynamic_axes={"imgs": {0: "batch"}},
    verbose=True
)

In [None]:
# Open the exported model with ONNX Runtime and print its metadata.
ort_sess = ort.InferenceSession(onnx_path)
print(ort_sess.get_modelmeta())

# Names of the graph inputs and outputs as seen by ONNX Runtime.
inputs = [i.name for i in ort_sess.get_inputs()]
outputs = [i.name for i in ort_sess.get_outputs()]
print(inputs, outputs)

In [None]:
# ONNX Runtime result for the first test sample (an empty output list is
# passed as the requested outputs).
onnx_res = ort_sess.run([], {"imgs": test_dataset[0][0].cpu().numpy()})
# PyTorch Lightning model result for the same sample, as a NumPy array.
model_res = loaded_model(test_dataset[0][0]).cpu().detach().numpy()

In [None]:
# Validation: the ONNX Runtime output must match the PyTorch output
# element-wise within NumPy's default tolerances.
assert np.allclose(onnx_res[0], model_res)

## ONNX to TF


In [None]:
# Convert the ONNX model to TensorFlow with onnx-tf and export it.
# NOTE: despite the ".pb" suffix, this path is later loaded with
# tf.saved_model.load.
output_tf_path = os.path.join(log_model_path, "tf_model.pb")
onnx_model = onnx.load(onnx_path)
tf_rep = prepare(onnx_model)
tf_rep.export_graph(output_tf_path)

In [None]:
# Load the exported TensorFlow model and run it on the first test sample,
# passing the input under the same name ("imgs") used for the ONNX export.
tf_model = tf.saved_model.load(output_tf_path)
tf_outputs = tf_model(imgs=test_dataset[0][0].numpy())

In [None]:
# Validation: the TensorFlow "logits" output must match the PyTorch output
# element-wise within NumPy's default tolerances.
tf_logits = tf_outputs["logits"].numpy()
assert np.allclose(tf_logits, model_res)

## From TF to ONNX


In [None]:
# Output path for the ONNX model converted back from TensorFlow.
onnx_from_tf_path = os.path.join(log_model_path, "from_tf.onnx")

In [None]:
# Convert the TensorFlow model back to ONNX (opset 11) with the tf2onnx CLI;
# the $names are the notebook variables defined in earlier cells.
!python -m tf2onnx.convert --saved-model $output_tf_path --opset 11 --output $onnx_from_tf_path

In [None]:
# Open the round-tripped ONNX model (this rebinds `ort_sess`).
ort_sess = ort.InferenceSession(onnx_from_tf_path)

In [None]:
# Validation: the round-tripped ONNX model must reproduce the PyTorch output
# for the first test sample within NumPy's default tolerances.
onnx_last_res = ort_sess.run(
    [],
    {"imgs": test_dataset[0][0].cpu().numpy()},
)
assert np.allclose(onnx_last_res[0], model_res)