In [None]:
!pip install torchmetrics torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu116

In [77]:
import torch
from timeit import default_timer as timer

def train_step(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader, optimizer: torch.optim.Optimizer, accuracy_fn, loss_fn: torch.nn.Module, device="cpu"):
  """Run one training pass of `model` over every batch in `data_loader`.

  Args:
    model: Network to optimise; it is switched to training mode.
    data_loader: Sized iterable yielding `(features, labels)` batches.
    optimizer: Optimiser that updates the parameters of `model`.
    accuracy_fn: Callable invoked as `accuracy_fn(predicted_classes, labels)`,
      where `predicted_classes` is the argmax of the model output over dim 1.
    loss_fn: Callable invoked as `loss_fn(model_output, labels)`.
    device: Device every batch is moved to before the forward pass.

  Returns:
    A dict with `train_loss` and `train_accuracy` (per-batch values averaged
    over the number of batches, so a short final batch weighs as much as a
    full one) and `train_total_time` (elapsed seconds).
  """
  train_loss_cum = 0
  train_accuracy_cum = 0
  samples_seen = 0
  train_start_time = timer()

  # Use the real dataset size for the progress total when the loader exposes
  # one; multiplying the batch count by the current batch length over- or
  # under-states it whenever the final batch is shorter than the others.
  try:
    total_samples = len(data_loader.dataset)
  except (AttributeError, TypeError):
    total_samples = None

  model.train()
  for train_batch_number, (train_features, train_labels) in enumerate(data_loader):
    train_features = train_features.to(device)
    train_labels = train_labels.to(device)

    train_pred_labels = model(train_features)

    train_loss = loss_fn(train_pred_labels, train_labels)

    # .item() keeps the running total a plain float instead of a graph-attached tensor.
    train_loss_cum += train_loss.item()
    train_accuracy_cum += accuracy_fn(train_pred_labels.argmax(dim=1), train_labels)

    optimizer.zero_grad()

    train_loss.backward()

    optimizer.step()

    if train_batch_number % 500 == 0:
      if total_samples is not None:
        progress_total = total_samples
      else:
        # No dataset to measure: fall back to the batch-size based estimate.
        progress_total = len(data_loader) * len(train_features)
      # Reports the samples consumed before this batch, so the first line shows 0.
      print(f"Looked at {samples_seen}/{progress_total} samples")
    samples_seen += len(train_features)

  train_loss = train_loss_cum / len(data_loader)
  train_accuracy = train_accuracy_cum / len(data_loader)
  train_end_time = timer()
  train_total_time = train_end_time - train_start_time
  return {
      "train_loss": train_loss,
      "train_accuracy": train_accuracy,
      "train_total_time": train_total_time
  }

In [72]:
import torch
from timeit import default_timer as timer

def test_step(model: torch.nn.Module, data_loader: torch.utils.data.DataLoader, accuracy_fn, loss_fn: torch.nn.Module, device="cpu"):
  test_loss_cum = 0
  test_accuracy_cum = 0
  test_start_time = timer()
  model.eval()
  with torch.inference_mode():
    for test_batch_number, (test_features, test_labels) in enumerate(data_loader):
      test_features = test_features.to(device)
      test_labels = test_labels.to(device)
      test_pred_labels = model(test_features)
      test_loss_cum += loss_fn(test_pred_labels, test_labels)
      test_accuracy_cum += accuracy_fn(test_pred_labels.argmax(dim=1), test_labels)

  test_loss = test_loss_cum / len(data_loader)
  test_accuracy = test_accuracy_cum / len(data_loader)
  test_end_time = timer()
  test_total_time = test_end_time - test_start_time
  return {
      "test_total_time": test_total_time,
      "test_loss": test_loss,
      "test_accuracy": test_accuracy
  }

In [80]:
# Training/evaluation loop: the model is updated once per mini-batch rather than once per epoch.
from tqdm.auto import tqdm
from timeit import default_timer as timer
import torch
from torchmetrics import Accuracy
from torch.utils.data import DataLoader

def train_test_loop(model: torch.nn.Module, train_data: torch.utils.data.Dataset, test_data: torch.utils.data.Dataset, random_state: int=42, device: str="cpu", lr: float=0.01, batch_size: int=32, epochs: int=10):
  """Train and/or evaluate `model` for `epochs` epochs, printing metrics per epoch.

  Uses cross-entropy loss, plain SGD and a torchmetrics multiclass `Accuracy`.
  Either dataset may be `None` to skip that phase, but not both.

  Args:
    model: Network to train/evaluate; it is moved to `device`.
    train_data: Training dataset exposing a `classes` attribute, or `None`
      to skip training.
    test_data: Evaluation dataset exposing a `classes` attribute, or `None`
      to skip evaluation.
    random_state: Seed passed to `torch.manual_seed` (and the CUDA seed when
      `device == "cuda"`).
    device: Device the model, metric and batches are placed on.
    lr: Learning rate for SGD.
    batch_size: Batch size for both data loaders.
    epochs: Number of epochs to run.

  Raises:
    ValueError: If both `train_data` and `test_data` are `None`.
  """
  # The class count is read from whichever dataset was supplied, so an
  # evaluation-only run (train_data=None) no longer fails on `None.classes`.
  reference_data = train_data if train_data is not None else test_data
  if reference_data is None:
    raise ValueError("At least one of train_data or test_data must be provided")

  loss_fn = torch.nn.CrossEntropyLoss()

  optimizer = torch.optim.SGD(params=model.parameters(), lr=lr)

  accuracy_fn = Accuracy(task="multiclass", num_classes=len(reference_data.classes)).to(device)

  torch.manual_seed(random_state)
  if device == "cuda":
    torch.cuda.manual_seed(random_state)

  model.to(device)

  # A loader left as None means "skip that phase"; both names are always bound.
  train_dataloader = None
  if train_data is not None:
    train_dataloader = DataLoader(
        train_data,
        batch_size=batch_size,
        shuffle=True,
    )

  test_dataloader = None
  if test_data is not None:
    test_dataloader = DataLoader(
        test_data,
        batch_size=batch_size,
        shuffle=False,
    )

  for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch + 1} -----")

    output_info = []

    if train_dataloader is not None:
      # Kept in a separate name so the `train_data` argument is not overwritten.
      train_results = train_step(
          model=model,
          data_loader=train_dataloader,
          accuracy_fn=accuracy_fn,
          loss_fn=loss_fn,
          device=device,
          optimizer=optimizer
      )
      print(f'''train time on {device}: {train_results["train_total_time"]:.4f} seconds''')
      output_info.append(f'''Train loss: {train_results["train_loss"]:.4f}''')
      # torchmetrics Accuracy is a fraction in [0, 1]; scale it to match the "%" suffix.
      output_info.append(f'''Train accuracy: {train_results["train_accuracy"] * 100:.2f}%''')

    if test_dataloader is not None:
      test_results = test_step(
          model=model,
          data_loader=test_dataloader,
          accuracy_fn=accuracy_fn,
          loss_fn=loss_fn,
          device=device
      )
      print(f'''test time on {device}: {test_results["test_total_time"]:.4f} seconds''')
      output_info.append(f'''Test loss: {test_results["test_loss"]:.4f}''')
      output_info.append(f'''Test accuracy: {test_results["test_accuracy"] * 100:.2f}%''')

    print(" | ".join(output_info))
    print()

In [None]:
from torchvision import datasets
from torchvision.transforms import ToTensor
# FashionMNIST training split, stored under ./data with images converted to tensors.
train_data = datasets.FashionMNIST(
  root="data",
  train=True,
  download=True,
  transform=ToTensor(),
  target_transform=None,
)
# Matching held-out split (train=False) with the same preprocessing.
test_data = datasets.FashionMNIST(
  root="data",
  train=False,
  download=True,
  transform=ToTensor(),
  target_transform=None,
)

# Use CUDA when PyTorch reports it as available, otherwise the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
from torch import nn

# NN without any non-linear activation function
class FashionMNISTModelV0(nn.Module):
  """Flatten the input and pass it through two stacked linear layers.

  There is no activation between the layers, so the network as a whole is an
  affine function of its (flattened) input.

  Args:
    input_shape: Number of features per sample after flattening.
    hidden_units: Width of the intermediate linear layer.
    output_shape: Number of output values per sample.
  """

  def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
    super().__init__()
    # Layers are created in forward order so parameter initialisation draws
    # from the RNG in the same sequence as the stacked definition.
    flatten = nn.Flatten()
    hidden_layer = nn.Linear(input_shape, hidden_units)
    output_layer = nn.Linear(hidden_units, output_shape)
    self.layers = nn.Sequential(flatten, hidden_layer, output_layer)

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Return the output of the layer stack for the batch `x`."""
    logits = self.layers(x)
    return logits

  
# Fit and evaluate the activation-free baseline on FashionMNIST for 5 epochs.
train_test_loop(
    model=FashionMNISTModelV0(
      input_shape=28 * 28,
      hidden_units=10,
      output_shape=len(train_data.classes),
    ),
    train_data=train_data,
    test_data=test_data,
    device=device,
    epochs=5,
)

In [None]:
from torch import nn

# Model with non-linear activation functions
class FashionMNISTModelV1(nn.Module):
  """Flatten the input, then apply Linear -> ReLU -> Linear.

  Args:
    input_shape: Number of features per sample after flattening.
    hidden_units: Width of the hidden linear layer.
    output_shape: Number of output values per sample.
  """

  def __init__(self, input_shape: int, hidden_units: int, output_shape: int):
    super().__init__()
    self.layers = nn.Sequential(
        nn.Flatten(),
        nn.Linear(
            in_features=input_shape,
            out_features=hidden_units
        ),
        nn.ReLU(),
        nn.Linear(
            in_features=hidden_units,
            out_features=output_shape
        )
    )

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Return the output of the layer stack for the batch `x`."""
    return self.layers(x)


# Fit and evaluate the ReLU variant with the same settings as the baseline run.
train_test_loop(
    model=FashionMNISTModelV1(
      input_shape=28 * 28,
      hidden_units=10,
      output_shape=len(train_data.classes),
    ),
    train_data=train_data,
    test_data=test_data,
    device=device,
    epochs=5,
)

In [None]:
class FashionMNISTModelV2(nn.Module):
  """Small CNN: two conv blocks followed by a linear classifier.

  Each block is Conv -> ReLU -> Conv -> ReLU -> MaxPool(2). The 3x3
  convolutions use stride 1 and padding 1, so only the two poolings change
  the spatial size (each halves it, rounding down).

  Args:
    input_shape: Number of channels in the input images.
    hidden_units: Number of channels produced by every convolution.
    output_shape: Number of output values per sample.
    image_size: Side length of the (square) input images. The default of 28
      gives the 7x7 feature map the classifier was originally sized for.

  Raises:
    ValueError: If `image_size` is too small to survive two 2x2 poolings.
  """

  def __init__(self, input_shape: int, hidden_units: int, output_shape: int, image_size: int = 28):
    super().__init__()
    # Spatial side length left after the two MaxPool2d(2) layers.
    pooled_size = image_size // 2 // 2
    if pooled_size < 1:
      raise ValueError(f"image_size must be at least 4, got {image_size}")

    self.conv_block_1 = self._make_conv_block(input_shape, hidden_units)
    self.conv_block_2 = self._make_conv_block(hidden_units, hidden_units)

    self.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(
            in_features=hidden_units * pooled_size * pooled_size,
            out_features=output_shape
        )
    )

  @staticmethod
  def _make_conv_block(in_channels: int, out_channels: int) -> nn.Sequential:
    """Build one Conv -> ReLU -> Conv -> ReLU -> MaxPool(2) block."""
    return nn.Sequential(
        nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=3,
            stride=1,
            padding=1
        ),
        nn.ReLU(),
        nn.Conv2d(
            in_channels=out_channels,
            out_channels=out_channels,
            kernel_size=3,
            stride=1,
            padding=1
        ),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2)
    )

  def forward(self, x):
    """Run `x` through both conv blocks and the classifier."""
    x = self.conv_block_1(x)
    x = self.conv_block_2(x)

    return self.classifier(x)

# Fit and evaluate the CNN; input_shape=1 is the number of input channels.
train_test_loop(
    model=FashionMNISTModelV2(
      input_shape=1,
      hidden_units=10,
      output_shape=len(train_data.classes),
    ),
    train_data=train_data,
    test_data=test_data,
    device=device,
    epochs=5,
)