## Importing Modules

필요한 모듈을 Import 합니다.


In [2]:
# Modules About Hydra
from hydra import initialize, initialize_config_module, initialize_config_dir, compose
from hydra.utils import instantiate
from omegaconf import DictConfig, OmegaConf

# Modules About Torch, Numpy
import numpy as np
import torch
import torch.nn.functional as F
import torchmetrics
import torchvision
from torch import nn
from torch.utils.data import TensorDataset, DataLoader, random_split
from torchvision import datasets, transforms

# Modules About Pytorch Lightning
import pytorch_lightning as pl
from pytorch_lightning import LightningModule, LightningDataModule
from pytorch_lightning.loggers import TensorBoardLogger, WandbLogger
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping, ProgressBar

# Modules About Pandas, Matplotlib, Numpy
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Others
from PIL import Image
from typing import List, Any
import sys
import traceback
import yaml
import wandb
import warnings
warnings.filterwarnings("ignore", category=UserWarning)


  from .autonotebook import tqdm as notebook_tqdm


## Configure Dataset

Custom Dataset을 구성합니다.


In [3]:

class MNISTDataModule(pl.LightningDataModule):
    """LightningDataModule serving MNIST train / validation / test splits.

    Args:
        data_dir: Directory the MNIST files are downloaded to and read from.
        batch_size: Batch size of the train, validation and test loaders.
        pred_batch_size: Batch size of the prediction loader.
        train_ratio: Fraction of the MNIST training set used for training;
            the remaining samples form the validation set.
        pred_dataset: Optional dataset served by ``predict_dataloader``.
    """

    def __init__(self, data_dir, batch_size, pred_batch_size, train_ratio, pred_dataset=None):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.pred_batch_size = pred_batch_size
        self.train_ratio = train_ratio
        self.pred_dataset = pred_dataset

        # Helper for the optional 3-channel transform below; it is only
        # needed when the commented-out ViT preprocessing is enabled.
        def repeat_channels(x):
            return x.repeat(3, 1, 1)

        self.transform = transforms.Compose([
            transforms.ToTensor()
            # ViT expects 224x224 images
            # transforms.Resize((224, 224), antialias=True),
            # transforms.Lambda(repeat_channels)  # ViT expects 3 channels
        ])

    def prepare_data(self):
        """Download the MNIST train and test files into ``data_dir``."""
        datasets.MNIST(
            self.data_dir, train=True, download=True)
        datasets.MNIST(
            self.data_dir, train=False, download=True)

    def setup(self, stage=None):
        """Create the train / validation / test datasets.

        The MNIST training set is split randomly according to
        ``train_ratio``; the MNIST test set is used unchanged.
        """
        mnist_train = datasets.MNIST(
            self.data_dir, train=True, transform=self.transform)
        mnist_test = datasets.MNIST(
            self.data_dir, train=False, transform=self.transform)

        # Derive the validation size as the remainder instead of truncating
        # ``(1 - train_ratio) * n`` separately: float rounding (e.g. a ratio
        # of 0.9) could make the two truncated lengths sum to ``n - 1``,
        # which ``random_split`` rejects.
        total_count = len(mnist_train)
        train_count = int(self.train_ratio * total_count)
        val_count = total_count - train_count

        self.train_dataset, self.val_dataset = random_split(
            mnist_train, [train_count, val_count])
        self.test_dataset = mnist_test

    # def _collate_fn(self, samples):
    #     When using this function, pass collate_fn=_collate_fn
    #     to the DataLoader.
    #     pass

    def train_dataloader(self):
        """Return the loader over the training split."""
        return DataLoader(self.train_dataset, batch_size=self.batch_size)

    def val_dataloader(self):
        """Return the loader over the validation split."""
        return DataLoader(self.val_dataset, batch_size=self.batch_size)

    def test_dataloader(self):
        """Return the loader over the MNIST test set."""
        return DataLoader(self.test_dataset, batch_size=self.batch_size)

    def predict_dataloader(self):
        """Return the loader over ``pred_dataset`` using ``pred_batch_size``."""
        return DataLoader(self.pred_dataset, batch_size=self.pred_batch_size)

    def predict_instantly(self, x: List[Any], y: List[int]):
        """Convert raw samples and labels into tensors for a direct forward pass.

        Args:
            x: Items accepted by ``torchvision.transforms.ToTensor``; each is
                converted and the results are stacked along a new first axis.
            y: Integer labels, one per item in ``x``.

        Returns:
            Tuple ``(tensor_x, tensor_y)``.
        """
        to_tensor = torchvision.transforms.ToTensor()
        tensor_x = torch.stack([to_tensor(item) for item in x])
        tensor_y = torch.tensor(y)

        return tensor_x, tensor_y


## Design Model

Model 구조를 정의합니다.


In [4]:
class CNNModel(pl.LightningModule):
    """Two-block convolutional classifier producing 10 logits.

    The network is two ``Conv2d -> ReLU -> MaxPool2d`` stages followed by
    three fully connected layers. The first linear layer expects
    ``channels * 7 * 7`` features after flattening.

    Args:
        type: Size preset, ``"small"`` (32 and 64 convolution channels) or
            ``"large"`` (64 and 128 convolution channels).

    Raises:
        KeyError: If ``type`` is not one of the known presets.
    """

    def __init__(self, type):
        super().__init__()
        # NOTE(review): this instance attribute hides ``nn.Module.type()``
        # on the object. It is kept because the constructor argument and the
        # saved hyperparameter use this name — confirm before renaming.
        self.type = type
        self.save_hyperparameters("type")

        self.model_list = {"small": (32, 64), "large": (64, 128)}
        if type not in self.model_list:
            raise KeyError(
                f"unknown model type {type!r}; expected one of {sorted(self.model_list)}")
        first_channels, second_channels = self.model_list[type]

        self.id2label = {i: i for i in range(10)}
        self.label2id = {i: i for i in range(10)}
        self.loss_func = nn.CrossEntropyLoss()
        # Layer order inside the Sequential is unchanged so that existing
        # checkpoints (whose keys contain the layer index) still load.
        self.model = nn.Sequential(
            # Convolutional layer 1
            nn.Conv2d(1, first_channels, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            # Convolutional layer 2
            nn.Conv2d(first_channels, second_channels,
                      kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            # Fully connected layers
            nn.Flatten(),

            nn.Linear(second_channels * 7 * 7, 128),
            nn.ReLU(),

            nn.Linear(128, 64),
            nn.ReLU(),

            nn.Linear(64, 10),  # assuming output has 10 classes
        )

    def forward(self, x, y=None):
        """Run the network and, when labels are given, compute the loss.

        Args:
            x: Input batch passed to the convolutional stack.
            y: Optional target class indices. When omitted, no loss is
                computed, which allows label-free inference.

        Returns:
            Tuple ``(loss, logits)``; ``loss`` is ``None`` when ``y`` is
            ``None``, otherwise the cross-entropy loss.
        """
        logits = self.model(x)
        loss = None if y is None else self.loss_func(logits, y)
        return loss, logits


## Task Model

Task 구조를 정의합니다.


In [5]:
class ClassificationTask(pl.LightningModule):
    """LightningModule wrapping a classifier with logging and optimisation.

    Args:
        model: Module called as ``model(x, y)`` and returning
            ``(loss, logits)``.
        num_classes: Number of classes used for the accuracy metric.
        optimizer: Ready-built optimizer returned by ``configure_optimizers``.
        lr_scheduler: Optional scheduler (or scheduler config dict) returned
            together with the optimizer.
    """

    def __init__(self, model, num_classes, optimizer, lr_scheduler=None):
        super().__init__()
        self.model = model
        self.num_classes = num_classes
        self.optimizer = optimizer
        self.lr_scheduler = lr_scheduler
        # Per-step metrics collected during an epoch; averaged and cleared
        # in ``on_validation_epoch_end``.
        self.training_step_outputs = []
        self.validation_step_outputs = []
        self.save_hyperparameters("num_classes", "optimizer", "lr_scheduler")

    def forward(self, x, y):
        """Move the inputs to this module's device and run the wrapped model."""
        x, y = x.to(self.device), y.to(self.device)
        loss, logits = self.model(x, y)
        return loss, logits

    def training_step(self, batch, batch_idx):
        """Compute, log and record the loss/accuracy of one training batch."""
        loss, acc = self._shared_step(batch)
        metrics = {"train_acc": acc, "train_loss": loss}
        # Store detached copies: keeping the raw loss would keep every
        # step's autograd graph alive until the end of the epoch.
        self.training_step_outputs.append(
            {name: value.detach() for name, value in metrics.items()})
        self.log_dict(metrics, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute, log and record the loss/accuracy of one validation batch."""
        loss, acc = self._shared_step(batch)
        metrics = {"val_acc": acc, "val_loss": loss}
        self.validation_step_outputs.append(
            {name: value.detach() for name, value in metrics.items()})
        self.log_dict(metrics)

    @staticmethod
    def _mean_of(outputs, key):
        """Return the mean of ``key`` over a list of per-step metric dicts."""
        return torch.stack([step[key] for step in outputs]).mean()

    def on_validation_epoch_end(self):
        """Log and print epoch averages, then reset the per-step buffers."""
        if not self.training_step_outputs:
            # Validation ran without any training step (e.g. the sanity
            # check before training). Drop those validation results so they
            # do not leak into the first real epoch's averages.
            self.validation_step_outputs.clear()
            return

        train_avg_loss = self._mean_of(self.training_step_outputs, "train_loss")
        train_avg_acc = self._mean_of(self.training_step_outputs, "train_acc")
        self.log_dict({"train_avg_acc": train_avg_acc,
                       "train_avg_loss": train_avg_loss})

        if not self.validation_step_outputs:
            return

        val_avg_loss = self._mean_of(self.validation_step_outputs, "val_loss")
        val_avg_acc = self._mean_of(self.validation_step_outputs, "val_acc")
        self.log_dict({"val_avg_acc": val_avg_acc,
                       "val_avg_loss": val_avg_loss})

        print("\n" +
              (f'Epoch {self.current_epoch}, Avg. Training Loss: {train_avg_loss:.3f}, Avg. Training Accuracy: {train_avg_acc:.3f} ' +
               f'Avg. Validation Loss: {val_avg_loss:.3f}, Avg. Validation Accuracy: {val_avg_acc:.3f}'), flush=True)
        self.training_step_outputs.clear()
        self.validation_step_outputs.clear()

    def test_step(self, batch, batch_idx):
        """Compute and log the loss/accuracy of one test batch."""
        loss, acc = self._shared_step(batch)
        metrics = {"test_acc": acc, "test_loss": loss}
        self.log_dict(metrics, prog_bar=True)

    def _shared_step(self, batch):
        """Return ``(loss, accuracy)`` for a batch of ``(x, y)``."""
        x, y = batch
        loss, logits = self.model(x, y)
        acc_fn = torchmetrics.classification.MulticlassAccuracy(
            num_classes=self.num_classes).to(self.device)
        acc = acc_fn(logits, y)
        return loss, acc

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        """Return ``(loss, logits)`` for a batch of ``(x, y)``."""
        x, y = batch
        loss, logits = self.model(x, y)
        return loss, logits

    def configure_optimizers(self):
        """Return the injected optimizer, plus the scheduler when one is set."""
        if self.lr_scheduler is None:
            return self.optimizer
        return [self.optimizer], [self.lr_scheduler]

In [6]:
# Compose the Hydra configuration from config.yaml in the current directory.
with initialize(version_base=None, config_path="./"):
    cfg = compose(config_name="config.yaml")

# Smoke check: print a star when the data section defines `batch_size`,
# otherwise a flame.
print("⭐️" if "batch_size" in cfg.data else "🔥")

⭐️


## Model Training

Model Training을 수행합니다.


In [None]:
def generate_train_func(cfg):
    """Create the function that ``wandb.agent`` runs for each sweep trial.

    Args:
        cfg: Hydra/OmegaConf configuration. Its ``data``, ``models``,
            ``task`` and ``train`` sections are read, and entries whose key
            matches a sweep parameter are overwritten in place.

    Returns:
        A zero-argument function performing one trial: it reads the version
        and sweep counters from ``global.yaml``, trains and tests every
        configured model, stores the configuration next to the checkpoints
        and increments the sweep counter. Exceptions are printed with their
        traceback instead of being raised, so the sweep can continue.
    """
    import os  # function-scope import keeps this block self-contained

    CHECKPOINT_PATH = "./checkpoints"
    CONFIGS_PATH = "./configs"

    def find_key(cfg, query, new_value):
        """Set the first key named ``query`` (depth-first) to ``new_value``.

        Returns ``True`` when a matching key was found, else ``False``.
        """
        for key, value in cfg.items():
            if key == query:
                cfg[key] = new_value
                return True
            elif isinstance(value, DictConfig):
                if find_key(value, query, new_value):
                    return True
        return False

    def build_lr_scheduler(optimizer):
        """Return the Lightning scheduler dict, or ``None`` if not configured."""
        # ``select`` yields None for a missing path as well as for an
        # explicit ``_target_: null``.
        target = OmegaConf.select(cfg, "task.lr_scheduler.scheduler._target_")
        if target is None:
            return None
        return {
            "scheduler": instantiate(
                cfg.task.lr_scheduler.scheduler, optimizer=optimizer),
            "interval": cfg.task.lr_scheduler.interval,
        }

    def build_callbacks(checkpoint_dir):
        """Create fresh checkpoint / early-stopping callbacks for one fit."""
        cfg_callbacks = cfg.train.callbacks
        return [
            ModelCheckpoint(**cfg_callbacks.checkpoint_callback,
                            dirpath=checkpoint_dir),
            EarlyStopping(**cfg_callbacks.early_stop_callback),
        ]

    def train():
        try:
            # Get Global Version Info
            with open("global.yaml", "r", encoding="utf-8") as f:
                global_data = yaml.safe_load(f)
            version_count = global_data["next_version_count"]
            sweep_count = global_data["next_sweep_count"]
            run_checkpoint_dir = f"{CHECKPOINT_PATH}/v{version_count}_s{sweep_count}/"

            # Initialize Wandb
            if "name" in cfg.train:
                name = cfg.train.name + f"_s{sweep_count}"
            else:
                name = f"v{version_count}_s{sweep_count}"
            wandb.init(name=name)

            # Get config.yaml file
            with open("config.yaml", "r", encoding="utf-8") as f:
                cfg_data = yaml.safe_load(f)

            # Save Version Config Info On Configs Folder (first trial only)
            if sweep_count == 0:
                os.makedirs(CONFIGS_PATH, exist_ok=True)
                with open(f"{CONFIGS_PATH}/version_{version_count}_config.yaml",
                          "w", encoding="utf-8") as f:
                    yaml.dump(cfg_data, f)

            # Apply the sweep parameters of this trial to the configuration
            for key, item in wandb.config.items():
                if not find_key(cfg, key, item):
                    print(
                        f"key: {key} in your sweeping configuration was not found in your configuration")

            # Load Data Module
            data_module = MNISTDataModule(**cfg.data)

            # Instantiate every configured model. Keys are sorted to keep the
            # alphabetical order the previous ``dir()``-based lookup produced.
            models = [instantiate(cfg.models[model_name])
                      for model_name in sorted(cfg.models.keys())]

            # Set Logger
            logger = instantiate(
                cfg.train.logger, name=f"version_{version_count}")

            # Constructor arguments of the task that come straight from the
            # config. Optimizer and scheduler are passed as built objects, so
            # their config entries are always removed; leaving
            # ``lr_scheduler`` in place would pass that keyword twice.
            cfg_task = OmegaConf.to_container(cfg.task)
            cfg_task.pop("optimizer")
            cfg_task.pop("lr_scheduler", None)

            for model in models:
                optimizer = instantiate(
                    cfg.task.optimizer, params=model.parameters())
                lr_scheduler = build_lr_scheduler(optimizer)

                task = ClassificationTask(**cfg_task,
                                          model=model, optimizer=optimizer, lr_scheduler=lr_scheduler)

                # Callbacks hold state (e.g. the early-stopping patience
                # counter), so each model gets its own instances.
                trainer = pl.Trainer(**cfg.train.trainer,
                                     callbacks=build_callbacks(
                                         run_checkpoint_dir),
                                     logger=logger)
                trainer.fit(task, data_module)
                trainer.test(task, datamodule=data_module)
                # NOTE(review): this stores the weights held by the trainer
                # after fit/test, and each model overwrites the same file —
                # confirm that this is the intended "best" checkpoint.
                trainer.save_checkpoint(f"{CHECKPOINT_PATH}/best_model.ckpt")

            # Save Version Config Info On Checkpoints Folder
            os.makedirs(run_checkpoint_dir, exist_ok=True)
            with open(os.path.join(run_checkpoint_dir, "version_config.yaml"),
                      "w", encoding="utf-8") as f:
                yaml.dump(cfg_data, f)

            # Set Sweep Info
            global_data["next_sweep_count"] += 1
            with open("global.yaml", "w", encoding="utf-8") as f:
                yaml.dump(global_data, f)
        except Exception:
            # Best effort: report the failure and let the agent start the
            # next trial.
            print("An error occurred:")
            print(traceback.format_exc())
        finally:
            # The run is opened by ``wandb.init`` regardless of the logger
            # type, so it is always closed here.
            wandb.finish()
    return train


# Compose the Hydra configuration from config.yaml in the current directory.
with initialize(version_base=None, config_path="./"):
    cfg = compose(config_name="config.yaml")

# Register the sweep described by cfg.sweep and keep its ID.
sweep_id = wandb.sweep(
    OmegaConf.to_container(cfg.sweep),
    project="sweep-test",
)

# Let the agent run the training function once per sweep trial.
wandb.agent(sweep_id, function=generate_train_func(cfg))

# The sweep is over: advance the version counter and reset the sweep counter.
with open("global.yaml", "r") as f:
    global_data = yaml.safe_load(f)
global_data.update({
    "next_version_count": global_data["next_version_count"] + 1,
    "next_sweep_count": 0,
})
with open("global.yaml", "w") as f:
    yaml.dump(global_data, f)


Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.


Create sweep with ID: cup63rjf
Sweep URL: https://wandb.ai/suwon-pabby/sweep-test/sweeps/cup63rjf


[34m[1mwandb[0m: Agent Starting Run: 7eej7e49 with config:
[34m[1mwandb[0m: 	batch_size: 128
[34m[1mwandb[0m: 	type: large
Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33msuwon-pabby[0m. Use [1m`wandb login --relogin`[0m to force relogin


GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name  | Type     | Params
-----------------------------------
0 | model | CNNModel | 886 K 
-----------------------------------
886 K     Trainable params
0         Non-trainable params
886 K     Total params
3.545     Total estimated model params size (MB)


Epoch 0: 100%|██████████| 282/282 [00:10<00:00, 25.77it/s, v_num=7e49, train_acc=0.618, train_loss=1.360]

[34m[1mwandb[0m: Ctrl + C detected. Stopping sweep.



Epoch 0, Avg. Training Loss: 1.884, Avg. Training Accuracy: 0.531 Avg. Validation Loss: 1.187, Avg. Validation Accuracy: 0.733
Epoch 1:  42%|████▏     | 118/282 [00:04<00:06, 24.50it/s, v_num=7e49, train_acc=0.768, train_loss=0.786]

In [None]:
# Close the W&B run that is still open (e.g. after the sweep above was
# interrupted with Ctrl + C).
wandb.finish()

In [None]:
# # To resume training from a previous run:
# model = CNNModel(type="small")  # same model type that was used for training
# task = ClassificationTask.load_from_checkpoint(
#     "test_checkpoints/checkpoints/last.ckpt", model=model)  # restore the latest checkpoint
# trainer = pl.Trainer(max_epochs=30, callbacks=callbacks)
# trainer.fit(task, data_module)


## Model Prediction

직접 Model Prediction을 수행하여 모델이 제대로 동작하는지 검증합니다.


In [None]:
# Build the data module from the same configuration section the training
# code uses; the constructor has required arguments and cannot be called bare.
data_module = MNISTDataModule(**cfg.data)

# Load the MNIST test set (no transform, so items are raw images).
predict_dataset = datasets.MNIST(
    root=data_module.data_dir, train=False, download=True)

# Pick a random image.
random_idx = torch.randint(len(predict_dataset), size=(1,)).item()
image, true_label = predict_dataset[random_idx]

# Show the image (optional).
transform = torchvision.transforms.ToTensor()
image_tensor = transform(image)
plt.imshow(image_tensor[0].squeeze(), cmap='gray')
plt.show()


# Rebuild the network and restore the trained weights.
# NOTE(review): the training loop writes ./checkpoints/best_model.ckpt after
# every model, visiting cfg.models in sorted key order, so the file is assumed
# to belong to the last key — confirm when several models are configured.
last_model_name = sorted(cfg.models.keys())[-1]
model = ClassificationTask.load_from_checkpoint(
    "./checkpoints/best_model.ckpt", model=instantiate(cfg.models[last_model_name]))

model.eval()
with torch.no_grad():
    x, y = data_module.predict_instantly([image], [true_label])
    loss, logits = model(x, y)

# To predict through a DataLoader of your own instead:
# data_module.pred_dataset = your_dataset
# trainer = pl.Trainer()
# predictions = trainer.predict(model, datamodule=data_module)


# Predict the class with the highest score.
_, predicted_class = torch.max(logits, dim=1)

print(f'True label: {true_label}, Predicted label: {predicted_class.item()}')