In [40]:
import sys
#sys.path.insert(0, "../..")

from typing import Callable, Protocol, Dict, Optional, Iterator, List, Tuple

import gin
from pathlib import Path
import numpy as np
import math
from datetime import datetime

import torch
from torch import nn
import torch.optim as optim
from torch.optim import Optimizer
from torchinfo import summary
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

#from src.data import make_dataset
#from src.models import metrics
#from src.models import train_model # om de functie train_loop binnen te halen

# ------------------------------------------------------------------------------- #
## Alle imports benodigd voor de functie train_loop uit train_model.py 

import tensorflow as tf  # noqa: F401

# needed to make summarywriter load without error
from loguru import logger
from numpy import Inf
from ray import tune


from torch.utils.tensorboard import SummaryWriter
from torchvision.utils import make_grid
from tqdm import tqdm

#from src.models.metrics import Metric
#from src.typehinting import GenericModel
#from src.data import data_tools

In [2]:
## benodigde functie uitmake_dataset.py

#@gin.configurable
def get_MNIST(  # noqa: N802
    data_dir: Path, batch_size: int
) -> Tuple[DataLoader, DataLoader]:

    training_data = datasets.FashionMNIST(
        root=data_dir,
        train=True,
        download=True,
        transform=ToTensor(),
    )

    test_data = datasets.FashionMNIST(
        root=data_dir,
        train=False,
        download=True,
        transform=ToTensor(),
    )

    # Create data loaders.
    train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=True)

    return train_dataloader, test_dataloader

In [3]:
## Benodigde funties uit metrics.py

Tensor = torch.Tensor

class Metric:
    def __repr__(self) -> str:
        raise NotImplementedError

    def __call__(self, y: Tensor, yhat: Tensor) -> Tensor:
        raise NotImplementedError

class Accuracy(Metric):
    def __repr__(self) -> str:
        return "Accuracy"

    def __call__(self, y: Tensor, yhat: Tensor) -> Tensor:
        return (yhat.argmax(dim=1) == y).sum() / len(yhat)


In [4]:
class GenericModel(Protocol):
    train: Callable
    eval: Callable
    parameters: Callable

    def __call__(self, *args, **kwargs) -> torch.Tensor:
        pass


In [8]:
def count_parameters(model: GenericModel) -> int:
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [42]:
## uit data_tools.py

def dir_add_timestamp(log_dir: Optional[Path] = None) -> Path:
    if log_dir is None:
        log_dir = Path(".")
    log_dir = Path(log_dir)
    timestamp = datetime.now().strftime("%Y%m%d-%H%M")
    log_dir = log_dir / timestamp
    logger.info(f"Logging to {log_dir}")
    if not log_dir.exists():
        log_dir.mkdir(parents=True)
    return log_dir

In [44]:
## Definieren functie train_loop

#@gin.configurable
def trainloop(
    epochs: int,
    model: GenericModel,
    optimizer: torch.optim.Optimizer,
    learning_rate: float,
    loss_fn: Callable,
    metrics: List[Metric],
    train_dataloader: Iterator,
    test_dataloader: Iterator,
    log_dir: Path,
    train_steps: int,
    eval_steps: int,
    patience: int = 10,
    factor: float = 0.9,
    tunewriter: bool = False,
) -> GenericModel:
    
    optimizer_: torch.optim.Optimizer = optimizer(
        model.parameters(), lr=learning_rate
    )  # type: ignore

    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer_,
        factor=factor,
        patience=patience,
    )

    if not tunewriter:
        log_dir = dir_add_timestamp(log_dir)
        writer = SummaryWriter(log_dir=log_dir)
        #write_gin(log_dir, gin.config_str())

        images, _ = next(iter(train_dataloader))
        if len(images.shape) == 4:
            grid = make_grid(images)
            writer.add_image("images", grid)
        writer.add_graph(model, images)

    for epoch in tqdm(range(epochs)):
        train_loss = trainbatches(
            model, train_dataloader, loss_fn, optimizer_, train_steps
        )

        metric_dict, test_loss = evalbatches(
            model, test_dataloader, loss_fn, metrics, eval_steps
        )

        scheduler.step(test_loss)

        if tunewriter:
            tune.report(
                iterations=epoch,
                train_loss=train_loss,
                test_loss=test_loss,
                **metric_dict,
            )
        else:
            writer.add_scalar("Loss/train", train_loss, epoch)
            writer.add_scalar("Loss/test", test_loss, epoch)
            for m in metric_dict:
                writer.add_scalar(f"metric/{m}", metric_dict[m], epoch)
            lr = [group["lr"] for group in optimizer_.param_groups][0]
            writer.add_scalar("learning_rate", lr, epoch)
            metric_scores = [f"{v:.4f}" for v in metric_dict.values()]
            logger.info(
                f"Epoch {epoch} train {train_loss:.4f} test {test_loss:.4f} metric {metric_scores}"  # noqa E501
            )

    return model

In [46]:
## uit train_model.py

def trainbatches(
    model: GenericModel,
    traindatastreamer: Iterator,
    loss_fn: Callable,
    optimizer: torch.optim.Optimizer,
    train_steps: int,
) -> float:
    model.train()
    train_loss: float = 0.0
    for _ in tqdm(range(train_steps)):
        x, y = next(iter(traindatastreamer))
        optimizer.zero_grad()
        yhat = model(x)
        loss = loss_fn(yhat, y)
        loss.backward()
        optimizer.step()
        train_loss += loss.detach().numpy()
    train_loss /= train_steps
    return train_loss

In [48]:
## uit train_model.py

def evalbatches(
    model: GenericModel,
    testdatastreamer: Iterator,
    loss_fn: Callable,
    metrics: List[Metric],
    eval_steps: int,
) -> Tuple[Dict[str, float], float]:
    model.eval()
    test_loss: float = 0.0
    metric_dict: Dict[str, float] = {}
    for _ in range(eval_steps):
        x, y = next(iter(testdatastreamer))
        yhat = model(x)
        test_loss += loss_fn(yhat, y).detach().numpy()
        for m in metrics:
            metric_dict[str(m)] = (
                metric_dict.get(str(m), 0.0) + m(y, yhat).detach().numpy()
            )

    test_loss /= eval_steps
    for key in metric_dict:
        metric_dict[str(key)] = metric_dict[str(key)] / eval_steps
    return metric_dict, test_loss


In [None]:
#gin.parse_config_file("model.gin")

In [None]:
#sys.path

In [12]:
# Get cpu or gpu device for training.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")


Using cpu device


In [52]:
## Model 1: CNN_een met een convolutional layer

#@gin.configurable
class CNN_een(nn.Module):
    def __init__(
        self, num_classes: int, kernel_size: int, filter1: int, filter2: int
    ) -> None:
        super().__init__()

        self.convolutions = nn.Sequential(
            nn.Conv2d(1, filter1, kernel_size=kernel_size, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

        self.dense = nn.Sequential(
            nn.Flatten(),
            nn.Linear(6272, 3136),
            nn.ReLU(),
            nn.Linear(3136, 1568),
            nn.ReLU(),
            nn.Linear(1568, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.convolutions(x)
        logits = self.dense(x)
        return logits

# a = aantal classes, b = kernel_size, c = grootte filter 1, d = grootte  filter2
model_een = CNN_een(10, 3, 32, 32).to(device)

In [57]:
## Model 2: CNN_twee met twee convolutional layers

#@gin.configurable
class CNN_twee(nn.Module):
    def __init__(
        self, num_classes: int, kernel_size: int, filter1: int, filter2: int
    ) -> None:
        super().__init__()

        self.convolutions = nn.Sequential(
            nn.Conv2d(1, filter1, kernel_size=kernel_size, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(filter1, filter2, kernel_size=kernel_size, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            )

        self.dense = nn.Sequential(
            nn.Flatten(),
            nn.Linear(1152, 576),
            nn.ReLU(),
            nn.Linear(576, 288),
            nn.ReLU(),
            nn.Linear(288, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.convolutions(x)
        logits = self.dense(x)
        return logits

# a = aantal classes, b = kernel_size, c = grootte filter 1, d = grootte  filter2
model_twee = CNN_twee(10, 3, 32, 32).to(device)

In [59]:
## Model 3: CNN_drie met drie convolutional layers

#@gin.configurable
class CNN_drie(nn.Module):
    def __init__(
        self, num_classes: int, kernel_size: int, filter1: int, filter2: int
    ) -> None:
        super().__init__()

        self.convolutions = nn.Sequential(
            nn.Conv2d(1, filter1, kernel_size=kernel_size, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(filter1, filter2, kernel_size=kernel_size, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(filter1, filter2, kernel_size=kernel_size, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            )

        self.dense = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, num_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.convolutions(x)
        logits = self.dense(x)
        return logits

# a = aantal classes, b = kernel_size, c = grootte filter 1, d = grootte  filter2
model_drie = CNN_drie(10, 3, 32, 32).to(device)

In [None]:
# summary(model_een, input_size=(64, 1, 28, 28))
# summary(model_twee, input_size=(64, 1, 28, 28))
# summary(model_drie, input_size=(64, 1, 28, 28))

In [None]:
gin.parse_config_file("model.gin")

In [25]:
datadir =  Path("/home/admindme/code/ML22_opdracht1/data/raw/FashionMNIST")
train_dataloader, test_dataloader = get_MNIST(datadir, 64)
len(train_dataloader), len(test_dataloader)

(938, 157)

In [60]:
#model = model_een
#model = model_twee
model = model_drie

loss_fn = torch.nn.CrossEntropyLoss()
accuracy = Accuracy()

model = trainloop(
            epochs=3,
            model=model,
            optimizer=torch.optim.Adam,
            learning_rate= 0.001,
            loss_fn=loss_fn,
            metrics=[accuracy],
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
            log_dir="../../models/test/",
            train_steps=len(train_dataloader),
            eval_steps=150,
            #patience= ,
            #factor= ,
            #tunewriter= ,
        )

2022-12-17 15:15:28.666 | INFO     | __main__:dir_add_timestamp:9 - Logging to ../../models/test/20221217-1515
100%|██████████| 938/938 [00:23<00:00, 39.56it/s]
2022-12-17 15:15:54.798 | INFO     | __main__:trainloop:68 - Epoch 0 train 0.7282 test 0.5636 metric ['0.7844']
100%|██████████| 938/938 [00:23<00:00, 39.50it/s]
2022-12-17 15:16:20.763 | INFO     | __main__:trainloop:68 - Epoch 1 train 0.4477 test 0.4196 metric ['0.8493']
100%|██████████| 938/938 [00:23<00:00, 39.44it/s]
2022-12-17 15:16:46.824 | INFO     | __main__:trainloop:68 - Epoch 2 train 0.3758 test 0.4247 metric ['0.8449']
100%|██████████| 3/3 [01:17<00:00, 26.00s/it]
