In [None]:
import numpy as np
import pandas as pd

import os

from sklearn.model_selection import train_test_split

import cv2

from torch.utils.data import Dataset, DataLoader
import torch
from torch import optim
from torch import flatten
from torch.nn import Module, Conv2d, Linear, MaxPool2d, ReLU, Flatten, Sequential
from torch.nn.functional import cross_entropy

from torchvision.transforms import ToTensor

from torchinfo import summary

import lightning as pl
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger

In [None]:
def create_metadata(root='/kaggle/input'):
    """Build a metadata table with one row per file found under ``root``.

    The directory tree is walked recursively. A file is labelled ``0`` when
    the name of its immediate parent directory (with surrounding whitespace
    stripped) is ``"no"`` and ``1`` otherwise.

    Args:
        root: Directory to scan. Defaults to ``'/kaggle/input'``.

    Returns:
        pandas.DataFrame with the columns ``image_name`` (file name),
        ``path`` (directory joined with the file name) and ``label`` (0 or 1).
        The frame is empty, but still carries these columns, when no file
        is found.
    """
    columns = ["image_name", "path", "label"]
    records = []
    for dirname, dirnames, filenames in os.walk(root):
        # Sorting in place makes os.walk descend in a stable order, so the
        # row order does not depend on the underlying file system.
        dirnames.sort()
        label = 0 if os.path.basename(os.path.normpath(dirname)).strip() == "no" else 1
        for filename in sorted(filenames):
            records.append(
                {
                    "image_name": filename,
                    "path": os.path.join(dirname, filename),
                    "label": label,
                }
            )
    # Build the frame once instead of growing it row by row.
    return pd.DataFrame(records, columns=columns)

In [None]:
class BrainMRIDataset(Dataset):
    """Dataset of grayscale images described by a metadata table.

    The table must provide a ``path`` column (image file location) and a
    ``label`` column (integer class). Each item is a dict with the keys
    ``"image"`` and ``"label"``.
    """

    def __init__(self, metadata=None, image_size=None):
        """Set up the metadata table and the optional resize target.

        Args:
            metadata: ``None`` to scan the default input directory with
                ``create_metadata()``, a path to a CSV file to read with
                ``pandas.read_csv``, or an already loaded DataFrame.
            image_size: Side length the images are resized to (square).
                When falsy the images keep their original size.
        """
        super().__init__()
        if metadata is None:
            self.metadata = create_metadata()
        elif isinstance(metadata, (str, os.PathLike)):
            self.metadata = pd.read_csv(metadata)
        else:
            # Already a table; truth-testing a DataFrame would raise, hence
            # the explicit ``is None`` / type checks above.
            self.metadata = metadata
        self.image_size = image_size

    def __len__(self):
        """Return the number of rows in the metadata table."""
        return len(self.metadata)

    def __getitem__(self, idx):
        """Load the image at position ``idx`` together with its label.

        Returns:
            dict with ``"image"`` (the result of ``ToTensor`` applied to the
            single-channel image) and ``"label"`` (a Python ``int``).

        Raises:
            OSError: If ``cv2.imread`` cannot load the file.
        """
        # Positional access keeps working when the table's index is not 0..n-1.
        image_path = self.metadata["path"].iloc[idx]
        image_label = int(self.metadata["label"].iloc[idx])

        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f"cv2.imread could not load an image from {image_path!r}")
        if self.image_size:
            image = cv2.resize(image, (self.image_size, self.image_size))
        # Add a trailing channel axis so ToTensor receives an H x W x 1 array.
        image = ToTensor()(image[:, :, np.newaxis])

        return {
            "image": image,
            "label": image_label
        }

In [None]:
class Model(pl.LightningModule):
    """Small CNN classifier trained with cross-entropy loss.

    The network consists of two convolution / ReLU / 2x2 max-pool stages
    followed by a flatten and two linear layers. Batches are expected to be
    dicts with the keys ``"image"`` and ``"label"``.
    """

    def __init__(self, num_channels=1, num_classes=2, image_size=256):
        """Build the network.

        Args:
            num_channels: Number of channels of the input images.
            num_classes: Number of output logits.
            image_size: Side length of the (square) input images; used to
                size the first linear layer. The default of 256 yields the
                65536 input features the layer was originally hard-coded to.
        """
        super().__init__()
        # Stored in the checkpoint so load_from_checkpoint can rebuild the
        # network with the same constructor arguments.
        self.save_hyperparameters()

        conv_channels = 16
        # 'same' padding keeps the spatial size through each convolution;
        # each 2x2 max-pool halves it (rounding down).
        pooled_size = image_size // 2 // 2
        flat_features = conv_channels * pooled_size * pooled_size

        self.model = Sequential(
            Conv2d(in_channels=num_channels, out_channels=conv_channels, kernel_size=(3, 3), padding='same'),
            ReLU(),
            MaxPool2d(kernel_size=(2, 2)),

            Conv2d(in_channels=conv_channels, out_channels=conv_channels, kernel_size=(3, 3), padding='same'),
            ReLU(),
            MaxPool2d(kernel_size=(2, 2)),
            Flatten(),
            Linear(in_features=flat_features, out_features=128),
            ReLU(),
            Linear(in_features=128, out_features=num_classes)
        )

    def forward(self, x):
        """Return the class logits for a batch of images."""
        return self.model(x)

    def _shared_step(self, batch, stage):
        """Compute the cross-entropy loss of one batch and log it as ``<stage>_loss``."""
        image_batch, label_batch = batch["image"], batch["label"]
        predict = self(image_batch)
        loss = cross_entropy(predict, label_batch)
        self.log(f"{stage}_loss", loss)
        return loss

    def training_step(self, batch, batch_idx):
        """Return the training loss; logged under ``train_loss``."""
        return self._shared_step(batch, "train")

    def configure_optimizers(self):
        """Optimise all parameters with Adam at a learning rate of 1e-3."""
        optimizer = optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

    def validation_step(self, batch, batch_idx):
        """Return the validation loss; logged under ``val_loss``."""
        return self._shared_step(batch, "val")

    def test_step(self, batch, batch_idx):
        """Return the test loss; logged under ``test_loss``."""
        return self._shared_step(batch, "test")

In [None]:
def train_val_test_split(dataset, random_state=None):
    """Split dataset positions into stratified 60/20/20 train/val/test parts.

    Args:
        dataset: Indexable dataset whose items are dicts with a ``"label"``
            entry. When it exposes a DataFrame attribute ``metadata`` with a
            ``label`` column of matching length, the labels are read from
            that table instead of loading every item.
        random_state: Seed (or RandomState) forwarded to both
            ``train_test_split`` calls. ``None`` keeps the split random.

    Returns:
        Tuple ``(train_idx, val_idx, test_idx)`` of index lists.
    """
    num_items = len(dataset)

    metadata = getattr(dataset, "metadata", None)
    if (
        isinstance(metadata, pd.DataFrame)
        and "label" in metadata.columns
        and len(metadata) == num_items
    ):
        # Cheap path: avoids reading and decoding every image just for its label.
        label_list = metadata["label"].tolist()
    else:
        label_list = [dataset[idx]["label"] for idx in range(num_items)]

    train_idx, valid_and_test_idx = train_test_split(
        list(range(num_items)),
        test_size=0.4,
        stratify=label_list,
        random_state=random_state,
    )

    # Stratify the second split on the labels of the held-out 40% only.
    valid_and_test_label = [label_list[idx] for idx in valid_and_test_idx]
    val_idx, test_idx = train_test_split(
        valid_and_test_idx,
        test_size=0.5,
        stratify=valid_and_test_label,
        random_state=random_state,
    )

    return train_idx, val_idx, test_idx

In [None]:
def prepare_dataloader(dataset, batch_size=64):
    """Create train, validation and test loaders over one shared dataset.

    The dataset is partitioned with ``train_val_test_split`` and each loader
    draws only from its own index list.

    Args:
        dataset: Dataset to split.
        batch_size: Number of samples per batch for all three loaders.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)``.
    """
    train_idx, val_idx, test_idx = train_val_test_split(dataset)

    # A plain index list used as sampler is replayed in the same order every
    # epoch; SubsetRandomSampler reshuffles the training indices each epoch.
    train_sampler = torch.utils.data.SubsetRandomSampler(train_idx)
    train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler, drop_last=True)

    # Evaluation must see every sample: dropping the last partial batch would
    # discard data and leave the loader empty when the split is smaller than
    # batch_size.
    val_loader = DataLoader(dataset, batch_size=batch_size, sampler=val_idx, drop_last=False)
    test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_idx, drop_last=False)

    return train_loader, val_loader, test_loader

In [None]:
# Number of samples per batch; passed to prepare_dataloader when the loaders are built.
BATCH_SIZE = 64

In [None]:
# No metadata is passed, so the dataset builds its table with create_metadata();
# every image is resized to 256x256 when it is loaded.
dataset = BrainMRIDataset(image_size=256)

In [None]:
# Split the dataset and wrap each part in a loader that uses BATCH_SIZE.
train_dataloader, val_dataloader, test_dataloader = prepare_dataloader(dataset, BATCH_SIZE)

In [None]:
# NOTE(review): TensorBoardLogger is imported from `pytorch_lightning` while the
# Trainer comes from `lightning` - confirm the two packages are meant to be mixed.
tensorboard_logger = TensorBoardLogger(save_dir="logs/")
# Early stopping watches the "val_loss" value logged by the model (lower is better).
early_stopping = EarlyStopping(monitor="val_loss", mode="min")
trainer = pl.Trainer(
    logger=tensorboard_logger,
    callbacks=[early_stopping],
)
model = Model()

In [None]:
# Train on the training loader; the validation loader produces the "val_loss"
# that the EarlyStopping callback monitors.
trainer.fit(model, train_dataloader, val_dataloader)

In [None]:
# Take the checkpoint written during the fit above instead of a hard-coded
# "version_0/.../epoch=12-step=52.ckpt" path, which only exists for one
# particular past run and breaks on a fresh Restart & Run All.
PATH_TO_CHAEKPOINT = trainer.checkpoint_callback.best_model_path
if not PATH_TO_CHAEKPOINT:
    raise RuntimeError("No checkpoint was saved by trainer.fit(); cannot run the test step.")
model = Model.load_from_checkpoint(PATH_TO_CHAEKPOINT)
trainer.test(model, test_dataloader)