<https://github.com/PyTorchLightning/pytorch-lightning/blob/master/pl_examples/loop_examples/kfold.py>

## Import Lib

In [1]:
import os.path as osp
from abc import ABC, abstractmethod
from copy import deepcopy
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Type

import torch
import torchvision.transforms as T
from sklearn.model_selection import KFold
from torch.nn import functional as F
from torch.utils.data import random_split
from torch.utils.data.dataloader import DataLoader
from torch.utils.data.dataset import Dataset, Subset
from torchmetrics.classification.accuracy import Accuracy

from pl_examples import _DATASETS_PATH
from pl_examples.basic_examples.mnist_datamodule import MNIST
from pl_examples.basic_examples.mnist_examples.image_classifier_4_lightning_module import ImageClassifier
from pytorch_lightning import LightningDataModule, seed_everything, Trainer
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.loops.base import Loop
from pytorch_lightning.loops.fit_loop import FitLoop
from pytorch_lightning.trainer.states import TrainerFn

ModuleNotFoundError: No module named 'pl_examples'

## Step1

KFold DataModule は `setup_folds` と `setup_fold_index` メソッドを実装する必要があります。

In [None]:
class BaseKFoldDataModule(LightningDataModule, ABC):
    # 抽象クラスで、必要なメソッドを定義する
    @abstractmethod
    def setup_folds(self, num_folds: int) -> None:
        pass

    @abstractmethod
    def setup_fold_index(self, fold_index: int) -> None:
        pass

## Step2

`KFoldDataModule` は、訓練データとテストデータセットを受け取る。
`setup_folds` では、与えられた引数 `num_folds` に応じてフォルドが生成される。 `setup_fold_index` では、与えられた訓練データセットが現在のフォルド分割に応じて分割される。

In [None]:
@dataclass  # __init__や__str__等の特殊メソッドを自動生成する
class MNISTKFoldDataModule(BaseKFoldDataModule):

    train_dataset: Optional[Dataset] = None
    test_dataset: Optional[Dataset] = None
    train_fold: Optional[Dataset] = None
    val_fold: Optional[Dataset] = None

    def prepare_data(self) -> None:
        # データセットの準備を行う
        # この関数はマルチ GPU 環境では、1つの GPU でしか呼ばれないため、状態を変更するような処理は行ってはいけない。
        MNIST(_DATASETS_PATH, transform=T.Compose([T.ToTensor(), T.Normalize(mean=(0.5,), std=(0.5,))]))

    def setup(self, stage: Optional[str] = None) -> None:
        # データを読み込んでデータセットを作成するなどの処理を行います。
        # どの段階のデータセットを準備する必要があるのかが引数 stage で渡される
        # 今回は、train_datasetとtest_datasetを分解する
        dataset = MNIST(_DATASETS_PATH, transform=T.Compose([T.ToTensor(), T.Normalize(mean=(0.5,), std=(0.5,))]))
        self.train_dataset, self.test_dataset = random_split(dataset, [50000, 10000])

    def setup_folds(self, num_folds: int) -> None:
        # 必須のメソッドです。
        self.num_folds = num_folds
        # [0, ..., train_datasetのレコード数-1]をKFoldして得られた
        # KFold用のindexを、splitsに格納する
        self.splits = [split for split in KFold(num_folds).split(range(len(self.train_dataset)))]

    def setup_fold_index(self, fold_index: int) -> None:
        # 必須のメソッドです。
        # あらかじめ計算されたsplitsに従い、データを分割したものを
        # train_foldとval_foldに返す
        train_indices, val_indices = self.splits[fold_index]
        self.train_fold = Subset(self.train_dataset, train_indices)
        self.val_fold = Subset(self.train_dataset, val_indices)

    def train_dataloader(self) -> DataLoader:
        return DataLoader(self.train_fold)

    def val_dataloader(self) -> DataLoader:
        return DataLoader(self.val_fold)

    def test_dataloader(self) -> DataLoader:
        return DataLoader(self.test_dataset)

    def __post_init__(cls):
        super().__init__()

## Step3

`EnsembleVotingModel` は、カスタム LightningModule と複数の checkpoint_path を受け取ります。

In [None]:
class EnsembleVotingModel(LightningModule):
    def __init__(self, model_cls: Type[LightningModule], checkpoint_paths: List[str]) -> None:
        super().__init__()
        # `num_folds` モデルとそれに関連するフォールドの重みを作成する。
        self.models = torch.nn.ModuleList([model_cls.load_from_checkpoint(p) for p in checkpoint_paths])
        self.test_acc = Accuracy()

    def test_step(self, batch: Any, batch_idx: int, dataloader_idx: int = 0) -> None:
        # `num_folds` 個のモデルについて、平均化された予測値を計算する。
        logits = torch.stack([m(batch[0]) for m in self.models]).mean(0)
        loss = F.nll_loss(logits, batch[1])
        self.test_acc(logits, batch[1])
        self.log("test_acc", self.test_acc)
        self.log("test_loss", loss)

## Step 4

Lightning v1.5から、独自のループを実装することが可能になりました。そのためにはいくつかのステップがあり、詳しくはドキュメントに記載されています。
https://pytorch-lightning.readthedocs.io/en/latest/extensions/loops.html
ここでは、外側のfit_loopを実装します。つまり、ベースループのサブクラスを実装して、現在のトレーナー `fit_loop` をラップします。

In [None]:
class KFoldLoop(Loop):
    def __init__(self, num_folds: int, export_path: str) -> None:
        super().__init__()
        self.num_folds = num_folds
        self.current_fold: int = 0
        self.export_path = export_path

    @property
    def done(self) -> bool:
        return self.current_fold >= self.num_folds

    def connect(self, fit_loop: FitLoop) -> None:
        self.fit_loop = fit_loop

    def reset(self) -> None:
        """Nothing to reset in this loop."""

    def on_run_start(self, *args: Any, **kwargs: Any) -> None:
        """Used to call `setup_folds` from the `BaseKFoldDataModule` instance and store the original weights of the
        model."""
        assert isinstance(self.trainer.datamodule, BaseKFoldDataModule)
        self.trainer.datamodule.setup_folds(self.num_folds)
        self.lightning_module_state_dict = deepcopy(self.trainer.lightning_module.state_dict())

    def on_advance_start(self, *args: Any, **kwargs: Any) -> None:
        """Used to call `setup_fold_index` from the `BaseKFoldDataModule` instance."""
        print(f"STARTING FOLD {self.current_fold}")
        assert isinstance(self.trainer.datamodule, BaseKFoldDataModule)
        self.trainer.datamodule.setup_fold_index(self.current_fold)

    def advance(self, *args: Any, **kwargs: Any) -> None:
        """Used to the run a fitting and testing on the current hold."""
        self._reset_fitting()  # requires to reset the tracking stage.
        self.fit_loop.run()

        self._reset_testing()  # requires to reset the tracking stage.
        self.trainer.test_loop.run()
        self.current_fold += 1  # increment fold tracking number.

    def on_advance_end(self) -> None:
        """Used to save the weights of the current fold and reset the LightningModule and its optimizers."""
        self.trainer.save_checkpoint(osp.join(self.export_path, f"model.{self.current_fold}.pt"))
        # restore the original weights + optimizers and schedulers.
        self.trainer.lightning_module.load_state_dict(self.lightning_module_state_dict)
        self.trainer.strategy.setup_optimizers(self.trainer)
        self.replace(fit_loop=FitLoop)

    def on_run_end(self) -> None:
        """Used to compute the performance of the ensemble model on the test set."""
        checkpoint_paths = [osp.join(self.export_path, f"model.{f_idx + 1}.pt") for f_idx in range(self.num_folds)]
        voting_model = EnsembleVotingModel(type(self.trainer.lightning_module), checkpoint_paths)
        voting_model.trainer = self.trainer
        # This requires to connect the new model and move it the right device.
        self.trainer.strategy.connect(voting_model)
        self.trainer.strategy.model_to_device()
        self.trainer.test_loop.run()

    def on_save_checkpoint(self) -> Dict[str, int]:
        return {"current_fold": self.current_fold}

    def on_load_checkpoint(self, state_dict: Dict) -> None:
        self.current_fold = state_dict["current_fold"]

    def _reset_fitting(self) -> None:
        self.trainer.reset_train_dataloader()
        self.trainer.reset_val_dataloader()
        self.trainer.state.fn = TrainerFn.FITTING
        self.trainer.training = True

    def _reset_testing(self) -> None:
        self.trainer.reset_test_dataloader()
        self.trainer.state.fn = TrainerFn.TESTING
        self.trainer.testing = True

    def __getattr__(self, key) -> Any:
        # requires to be overridden as attributes of the wrapped loop are being accessed.
        if key not in self.__dict__:
            return getattr(self.fit_loop, key)
        return self.__dict__[key]

    def __setstate__(self, state: Dict[str, Any]) -> None:
        self.__dict__.update(state)


class LitImageClassifier(ImageClassifier):
    def __init__(self) -> None:
        super().__init__()
        self.val_acc = Accuracy()

    def validation_step(self, batch: Any, batch_idx: int) -> None:
        x, y = batch
        logits = self.forward(x)
        loss = F.nll_loss(logits, y.long())
        self.val_acc(logits, y)
        self.log("val_acc", self.val_acc)
        self.log("val_loss", loss)

## Step5

KFoldDataModule` とモデルを作成した後、`KFoldLoop` を Trainer に接続します。
最後に、`trainer.fit` を使ってクロスバリデーションの学習を開始する。

In [None]:
seed_everything(42)
model = LitImageClassifier()
datamodule = MNISTKFoldDataModule()
trainer = Trainer(
    max_epochs=10,
    limit_train_batches=2,
    limit_val_batches=2,
    limit_test_batches=2,
    num_sanity_val_steps=0,
    devices=2,
    accelerator="auto",
    strategy="ddp",
)
internal_fit_loop = trainer.fit_loop
trainer.fit_loop = KFoldLoop(5, export_path="./")
trainer.fit_loop.connect(internal_fit_loop)
trainer.fit(model, datamodule)