# core

> Training and evaluating an image classifier on a VTAB-1k subset.

In [1]:
#| default_exp core
%load_ext autoreload
%autoreload 2

In [2]:
#| hide
from nbdev.showdoc import *
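# The dataset determines
# - the number of classes
# - the best augmentation strategy
# and therefore the preprocessing and part of the model architecture.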

In [3]:
# | export
from pydantic import BaseModel


class ClassificationTaskConfig(BaseModel):
    """Settings for one classification run: dataset location, subset, batch size and repetition index."""

    # Directory passed to `VtabDataset` as `vtab_dir`. The default is an
    # absolute, machine-specific path, so it usually has to be overridden.
    vtab_dir: str = "/home/ai_pitch_perfector/datasets/vtab-1k/"
    # Subset passed to `VtabDataset` as `subset_name`.
    subset_name: str = "cifar"
    # Default batch size used when the data loaders are created.
    initial_batch_size: int = 64
    experiment_index: int = 0  # one of [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]: which repetition of the experiment this is (also used as the random seed by ClassificationTask)

In [4]:
# Default configuration used by the exploratory cells in this notebook.
# NOTE(review): this cell has no export directive, yet the exported cell that
# builds `partial_vtab` / `get_vtab_dataset` reads `config` - confirm that the
# generated module still imports cleanly.
config = ClassificationTaskConfig()

In [13]:
# | export
import lightning as L


def init_env(seed: int = 42):
    """Seed the run through Lightning and put cuDNN into deterministic mode.

    Args:
        seed: Value forwarded to ``L.seed_everything``.
    """
    import torch

    L.seed_everything(seed)

    # Deterministic kernels and no auto-tuning, so that GPU runs (if any)
    # are reproducible.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


from sota_image_classification import data
from sota_image_classification.data.vtab import VtabSplit, VtabDataset

# data.create_dataset()
from functools import partial

# Defaults for the module-level dataset helpers. They come from a fresh
# `ClassificationTaskConfig()` instead of the notebook-only `config` variable:
# that variable is defined in a cell that is not exported, so referencing it
# here would raise NameError when the exported module is imported.
_DEFAULT_CONFIG = ClassificationTaskConfig()

# `VtabDataset` factory with location and subset fixed; only `split` is left open.
partial_vtab = partial(
    VtabDataset,
    vtab_dir=_DEFAULT_CONFIG.vtab_dir,
    subset_name=_DEFAULT_CONFIG.subset_name,
)


def get_vtab_dataset(split, batch_size=_DEFAULT_CONFIG.initial_batch_size):
    """Build a loader for one split of the default VTAB subset.

    Despite the name, the returned object is whatever `data.create_loader`
    produces for the dataset, not the bare dataset.

    Args:
        split: `VtabSplit` member selecting the split.
        batch_size: Batch size for the loader; defaults to the config default.
    """
    return data.create_loader(
        partial_vtab(split=split),
        input_size=(3, 224, 224),
        batch_size=batch_size,
        num_workers=4,
    )


def init_data(self, config):
    """Create the VTAB loaders described by `config` and attach them to `self`.

    The loaders are built from the `config` argument (its `vtab_dir`,
    `subset_name` and `initial_batch_size`), so a non-default configuration
    is honoured instead of silently falling back to module-level defaults.

    Sets on `self`:
        train_dataset, val_dataset, train_val_dataset, test_dataset:
            loaders returned by `data.create_loader` (the attribute names say
            "dataset", but the wrapped dataset is reached through `.dataset`).
        num_of_classes: number of entries in the training dataset's `classes`.
    """
    make_dataset = partial(
        VtabDataset,
        vtab_dir=config.vtab_dir,
        subset_name=config.subset_name,
    )

    def make_loader(split):
        # Same loader settings for every split; only the split differs.
        return data.create_loader(
            make_dataset(split=split),
            input_size=(3, 224, 224),
            batch_size=config.initial_batch_size,
            num_workers=4,
        )

    self.train_dataset = make_loader(VtabSplit.TRAIN)
    self.val_dataset = make_loader(VtabSplit.VAL)
    self.train_val_dataset = make_loader(VtabSplit.TRAIN_AND_VAL)
    self.test_dataset = make_loader(VtabSplit.TEST)
    self.num_of_classes = len(self.train_dataset.dataset.classes)

In [6]:
# Smoke-test `init_data` on a throwaway object. A lambda is used only because
# arbitrary attributes can be assigned to it.
self = lambda: None
init_data(self, config)
train_dataset = self.train_dataset  # reused by a later cell to fetch a sample batch
next(iter(train_dataset))  # pull one batch to check that loading works; result is discarded
train_dataset.dataset[0]  # index the wrapped dataset directly; result is discarded
# train_dataset.dataset.classes
num_of_classes = len(train_dataset.dataset.classes)
num_of_classes

100

In [7]:
def init_backbone(self, config):
    """Attach a reduced-width DINOv2 backbone to `self` and record its feature width.

    Sets on `self`:
        backbone: a `Dinov2Model` built from a `Dinov2Config` (no pretrained
            weights are loaded here) whose `forward` is replaced so that it
            returns only the `pooler_output` of the original forward pass.
        hidden_dim: the hidden size given to the backbone config.

    `config` is accepted for a uniform builder signature but is not read here.
    """
    from transformers import Dinov2Config, Dinov2Model

    shrink_factor = 6  # divides the 768-wide / 12-head layout used below
    width = 768 // shrink_factor
    heads = 12 // shrink_factor

    backbone = Dinov2Model(
        Dinov2Config(
            hidden_size=width,
            num_attention_heads=heads,
            num_hidden_layers=12,
            mlp_ratio=4,
        )
    )

    # Capture the bound method before shadowing it on the instance.
    full_forward = backbone.forward

    def pooled_forward(x):
        return full_forward(x).pooler_output

    backbone.forward = pooled_forward

    self.backbone = backbone
    self.hidden_dim = width


import torch.nn as nn


def init_head(self, hidden_dim: int, num_of_classes: int):
    """Attach a linear classification head to `self` as `self.head`.

    Args:
        hidden_dim: Number of input features of the head.
        num_of_classes: Number of outputs of the head.
    """
    classifier = nn.Linear(hidden_dim, num_of_classes)
    self.head = classifier

In [8]:
# Smoke-test the backbone on one real batch. `train_dataset` comes from an
# earlier cell of this notebook.
self = lambda: None
init_backbone(self, config)
# try_data = train_dataset.dataset[0][0]
try_data = next(iter(train_dataset))[0]  # first element of one batch, used as model input; reused in a later cell
# try_data[0].shape
# self.backbone.cuda()(try_data).last_hidden_state.shape
# self.backbone.cuda()(try_data).pooler_output.shape
self.backbone.cuda()(try_data).shape  # `.cuda()` means this cell needs a CUDA device

torch.Size([64, 128])

In [None]:
def ensure_array(x: "torch.Tensor"):
    """Return `x` as a NumPy array after moving it to the CPU and detaching it.

    The annotation is written as a string so that defining this function does
    not require the name `torch` to exist yet: in this notebook the
    module-level `import torch` only appears in a later cell, and an
    unquoted annotation would raise NameError on a fresh kernel.
    """
    return x.cpu().detach().numpy()


In [9]:
from sklearn.metrics import *

# from scipy.special import softmax
import torch.nn.functional as F
import numpy as np
import torch

def ensure_array(x:torch.Tensor):
    """Return `x` as a NumPy array after moving it to the CPU and detaching it."""
    on_cpu = x.cpu()
    without_grad = on_cpu.detach()
    return without_grad.numpy()


def compute_classification_metrics(
    y_true: np.ndarray,  # class index per sample; compared against argmax column indices
    y_pred_logits: np.ndarray,  # per-class scores, indexed as (sample, class)
    logits_to_prob: bool = False,  # True: softmax the scores row-wise before the probability-based metrics
):
    """Compute a flat dictionary of multiclass classification metrics.

    Args:
        y_true: Ground-truth class per sample. Expected to be integer indices
            into the columns of `y_pred_logits`, because hard predictions are
            obtained with `argmax` over those columns.
        y_pred_logits: Two-dimensional score array with one column per class.
        logits_to_prob: When True, a row-wise softmax turns the scores into
            probabilities first. When False, the scores are passed unchanged
            to the probability-based metrics, so the caller must already
            supply probabilities.

    Returns:
        dict with keys `acc1`, `acc2`, `acc3`, `acc5`, `acc10`, `acc20`
        (top-k accuracy) plus `roc_auc`, `matthews_corrcoef`, `f1`,
        `precision`, `recall`, `log_loss`, `balanced_accuracy`,
        `cohen_kappa` and `hinge_loss`.
    """
    y_pred_probs = (
        np.array(F.softmax(torch.Tensor(y_pred_logits), dim=1))
        if logits_to_prob
        else y_pred_logits
    )
    # Softmax is monotonic, so the argmax of the raw scores is also the
    # argmax of the probabilities.
    y_pred = np.argmax(y_pred_logits, axis=1)

    # One label per score column. Passing it explicitly (a) replaces the
    # previously undefined name `ids` and (b) keeps the label-inferring
    # metrics working when `y_true` happens not to contain every class.
    class_ids = np.arange(np.shape(y_pred_logits)[1])

    top_k_res = {
        f"acc{k}": top_k_accuracy_score(y_true, y_pred_probs, k=k, labels=class_ids)
        for k in [1, 2, 3, 5, 10, 20]
    }
    balance_res = dict(
        roc_auc=roc_auc_score(
            y_true,
            y_pred_probs,
            average="macro",
            multi_class="ovr",
            labels=class_ids,
        ),  # one-vs-rest is the harder setting: each binary sub-problem is imbalanced
        matthews_corrcoef=matthews_corrcoef(y_true, y_pred),
        f1=f1_score(y_true, y_pred, average="macro"),
        precision=precision_score(y_true, y_pred, average="macro"),
        recall=recall_score(y_true, y_pred, average="macro"),
        log_loss=log_loss(
            y_true,
            y_pred_probs,
            labels=class_ids,
        ),
        balanced_accuracy=balanced_accuracy_score(y_true, y_pred),
        cohen_kappa=cohen_kappa_score(y_true, y_pred),
        hinge_loss=hinge_loss(y_true, y_pred_probs, labels=class_ids),
    )

    return top_k_res | balance_res

In [10]:
#| export
import lightning as L
from overrides import override
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
class ClassificationTask(L.LightningModule):
    """Lightning module combining the VTAB loaders, a DINOv2 backbone and a linear head.

    Construction seeds the environment with `config.experiment_index`, builds
    the data loaders, the backbone and the head, and chains them into
    `self.model`, which ends in a `LogSoftmax` layer and therefore outputs
    log-probabilities.

    No optimizer or dataloader hooks are defined in this class yet.
    """

    # Module-level builder functions reused as methods; each one populates
    # attributes on `self`.
    init_data = init_data
    init_head = init_head
    init_backbone = init_backbone

    def __init__(self, config: ClassificationTaskConfig) -> None:
        super().__init__()
        self.config = config
        init_env(config.experiment_index)  # use index as the seed for reproducibility
        self.init_data(config)
        self.init_backbone(config)
        self.init_head(self.hidden_dim, self.num_of_classes)
        # LogSoftmax + NLL is reported to be fast, but NLL offers no label smoothing
        # (https://blog.csdn.net/qq_43391414/article/details/118421352).
        self.model = nn.Sequential(self.backbone, self.head, nn.LogSoftmax(dim=1))

    def forward(self, x):
        """Run backbone, head and log-softmax on `x` and return the log-probabilities."""
        # A regular method (instead of assigning `self.model.forward` to an
        # instance attribute) keeps `self.model`'s own `__call__` in the path.
        return self.model(x)

    def forward_loss(self, image_tensor, label_tensor):
        """Return the label-smoothed cross-entropy of the model output against `label_tensor`."""
        log_probs = self(image_tensor)
        # `F.cross_entropy` applies log-softmax itself. On outputs that are
        # already normalised log-probabilities this leaves them unchanged, so
        # the result equals label-smoothed cross-entropy on the head's raw scores.
        return F.cross_entropy(log_probs, label_tensor, label_smoothing=0.1)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one batch."""
        loss = self.forward_loss(*batch)
        self.log("train_loss", loss)
        return loss

    def evaluate(self, batch, stage=None):
        """Compute the loss for one evaluation batch.

        Args:
            batch: `(images, labels)` pair, unpacked into `forward_loss`.
            stage: Optional prefix such as "val" or "test". When given, the
                loss is logged as `f"{stage}_loss"`.

        Returns:
            The loss tensor (previously computed but discarded).
        """
        loss = self.forward_loss(*batch)
        if stage:
            self.log(f"{stage}_loss", loss)
        return loss

In [11]:
# Build the full task from the default config and push one batch through it.
task = ClassificationTask(config)
task.cuda()(try_data).shape  # needs a CUDA device; `try_data` was fetched in an earlier cell
# import inspect
# inspect.getsource(task.init_data)
# task.init_data
# task.train_dataset

Seed set to 0


torch.Size([64, 100])

In [12]:
#| hide
# Run nbdev's export step for this project.
import nbdev; nbdev.nbdev_export()