<a href="https://colab.research.google.com/github/Noone0is0here/KJ_prezencka/blob/main/Festival_FIT_DVC_workshop_(cutt_ly_git_pro_data).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Colab for DVC workshop

Implements a very basic pipeline that you can modify in order to explore the DVC pipeline features.

 - Make your own copy `File -> Save a copy in drive` and use that 
 - Initially, just hit `Runtime -> Run all`.
 - You can then play with the written files - just change the cell for the file and run the cell. It will overwrite the file in storage.
 - You can also add a scratch code cell (`Insert -> Scratch code cell`) where you can run commands (shell commands start with `!`).


In [None]:
# @title Python dependencies installation
# @markdown Installs dependencies and it will mention that you should restart the runtime.
# @markdown You can restart it; however, it is not necessary.
# Installs the packages the scripts below import that are listed here; pandas, numpy,
# scipy and tqdm are also imported but not installed by this line (presumably they are
# already present in the runtime -- TODO confirm when running outside Colab).
! pip install dvc pytorch-lightning typer torchmetrics

In [None]:
# @title Initialize git and dvc repositories.

# Create the git repository first; `dvc init` is run inside it afterwards.
! git init
# Placeholder identity so that `git commit` below does not fail for lack of user info.
! git config --global user.email "you@example.com"
! git config --global user.name "Your Name"
! dvc init
# Commit only the DVC bookkeeping directory as the first revision.
! git add .dvc
! git commit -m "Initial DVC setup"

In [None]:
# @title data_prep.py
# @markdown File contains data conversion and preparation for training or testing.

%%writefile data_prep.py

import pandas as pd
import typer
import numpy as np
import pickle
from typing import Optional
from scipy.ndimage import rotate
import numpy as np
from tqdm import tqdm


def generate_rotated_images(images, original_labels, max_angle, angle_step=10) -> "tuple[np.ndarray, np.ndarray]":
  """Augment a dataset with rotated copies of every image.

  Every image is rotated by each angle in
  ``range(-max_angle, max_angle + 1, angle_step)`` (passed to
  ``scipy.ndimage.rotate`` with ``reshape=False``) and its label is repeated
  once per rotated copy.

  Args:
    images: Iterable of image arrays. All rotated results are stacked, so the
      images must share one shape.
    original_labels: Iterable of labels, aligned with ``images``.
    max_angle: Largest rotation angle (inclusive lower bound is ``-max_angle``).
      Must be non-negative.
    angle_step: Distance between two consecutive angles. Must be positive.
      Defaults to 10, the value that used to be hard-coded.

  Returns:
    Tuple ``(rotated_images, labels)`` of two arrays with the same length.

  Raises:
    ValueError: If ``max_angle`` is negative, ``angle_step`` is not positive,
      or (raised by ``np.stack``) there are no images to stack.
  """
  if max_angle < 0:
    raise ValueError(f"max_angle must be non-negative, got {max_angle}")
  if angle_step <= 0:
    raise ValueError(f"angle_step must be positive, got {angle_step}")

  angles = range(-max_angle, max_angle + 1, angle_step)
  # zip() has no length, so give the progress bar its total when it is known.
  total = len(images) if hasattr(images, "__len__") else None

  rotated = []
  labels = []
  for image, label in tqdm(zip(images, original_labels), total=total):
    for angle in angles:
      # reshape=False is passed so the output array is not resized to fit the rotation.
      rotated.append(rotate(image, angle, reshape=False))
      labels.append(label)
  return np.stack(rotated), np.array(labels)



def prepare_data(
    input_path: str,
    output_path: str,
    rotate_max_angle: Optional[int] = typer.Option(None),
    ):
  """Convert a header-less CSV of labelled 28x28 images into a pickle file.

  The CSV is read with the first column as the label and the following
  28*28 columns as pixel values. The result is pickled to ``output_path`` as a
  dict with the keys ``"labels"`` and ``"images"``; the images are stored as
  ``uint8`` arrays of shape ``(N, 28, 28)``.

  Args:
    input_path: CSV file to read.
    output_path: Pickle file to write.
    rotate_max_angle: When given, the data set is augmented with rotated
      copies via ``generate_rotated_images`` before it is saved.
  """
  pixel_columns = [f"pix{idx}" for idx in range(28*28)]
  frame = pd.read_csv(input_path, header=None, names=["label", *pixel_columns])

  # One row per sample, one column per pixel, then fold each row into a 28x28 image.
  flat_pixels = np.stack([frame[column] for column in pixel_columns], axis=1)
  images = flat_pixels.reshape(-1, 28, 28).astype(np.uint8)
  labels = np.array(frame["label"])

  if rotate_max_angle is not None:
    images, labels = generate_rotated_images(images, labels, rotate_max_angle)
  print(images.shape)

  with open(output_path, "wb") as sink:
    pickle.dump({"labels": labels, "images": images}, sink)


# Expose prepare_data as a command-line interface when the file is run as a script.
if __name__ == "__main__":
  typer.run(prepare_data)


In [None]:
# @title model.py
# @markdown Just a helper file with model and some dataset loading.
%%writefile model.py


import torch.nn as nn
import torch.nn.functional as F
import torch
from torchmetrics.classification.accuracy import Accuracy
import pytorch_lightning as pl
import pickle

def get_dataset(path):
  """Load a pickled data set and wrap it in a ``TensorDataset``.

  The pickle must hold a mapping with the keys ``"images"`` and ``"labels"``;
  both values are converted with ``torch.tensor``.

  Args:
    path: Path of the pickle file.

  Returns:
    A ``torch.utils.data.TensorDataset`` yielding ``(image, label)`` pairs.
  """
  # NOTE: pickle executes arbitrary code on load; only open files you produced yourself.
  with open(path, "rb") as source:
    payload = pickle.load(source)
  images = torch.tensor(payload["images"])
  labels = torch.tensor(payload["labels"])
  return torch.utils.data.TensorDataset(images, labels)


def _init_weights(m: nn.Module):
    """Initialize weight of all linear and convolution layers.

    Args
    ----
        m (nn.Module): each module of NN
    """
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        nn.init.normal_(m.weight, std=0.01)
        nn.init.constant_(m.bias, 0)

class Net(nn.Module):
    """Small convolutional classifier that returns 10 raw class scores.

    Three ``5x5`` "same"-padded convolutions, each followed by leaky ReLU and
    ``2x2`` max pooling, feed a 64-unit hidden layer and a 10-way output layer.
    The first linear layer takes 216 features (24 channels * 3 * 3), which
    corresponds to single-channel 28x28 inputs after three poolings
    (28 -> 14 -> 7 -> 3).
    """

    def __init__(self):
        super().__init__()
        self.c1 = nn.Conv2d(1, 6, kernel_size=5, padding="same")
        self.pool = nn.MaxPool2d((2,2))
        self.c2 = nn.Conv2d(6, 12, kernel_size=5, padding="same")
        self.c3 = nn.Conv2d(12, 24, kernel_size=5, padding="same")

        self.feature_vec = nn.Linear(216, 64)
        self.final_cls = nn.Linear(64, 10)

        self.apply(_init_weights)

    def forward(self, x):
        """Return unnormalized class scores (logits) of shape ``(batch, 10)``.

        No softmax is applied here: ``torch.nn.CrossEntropyLoss`` expects raw
        logits and applies log-softmax internally, so normalizing twice would
        flatten the gradients. Use ``F.softmax(scores, dim=1)`` on the result
        when probabilities are needed; ``argmax`` is unaffected.

        Args:
            x: Batch of images with one channel, ``(batch, 1, height, width)``.
        """
        x = self.pool(F.leaky_relu(self.c1(x)))
        x = self.pool(F.leaky_relu(self.c2(x)))
        x = self.pool(F.leaky_relu(self.c3(x)))
        x = torch.flatten(x, start_dim=1)
        x = F.leaky_relu(self.feature_vec(x))
        return self.final_cls(x)


class LitModel(pl.LightningModule):
  """Lightning module that trains and evaluates ``Net`` as a 10-class classifier.

  Every phase ("train", "val", "test") logs its accuracy under
  ``"<phase>/acc"`` once per epoch.
  """

  def __init__(self):
    super().__init__()
    self.net = Net()
    # loss function
    self.criterion = torch.nn.CrossEntropyLoss()

    # Use a separate metric instance for train, val and test so that each one
    # is reduced over its own epoch. The metrics live in a ModuleDict (not a
    # plain dict) so they are registered submodules and follow the model to
    # its device. The keys carry an "_acc" suffix because a bare "train" key
    # would clash with the existing ``nn.Module.train`` attribute.
    self.accuracies = nn.ModuleDict(
        {f"{phase}_acc": self._make_accuracy() for phase in ("train", "val", "test")}
    )

  @staticmethod
  def _make_accuracy():
    """Build a multiclass accuracy metric across torchmetrics versions."""
    try:
      # Recent torchmetrics releases require the task to be stated explicitly.
      return Accuracy(task="multiclass", num_classes=10)
    except (TypeError, ValueError):
      # Older releases do not know the ``task`` argument.
      return Accuracy()

  def forward(self, image):
    """Run the network on a batch of images without a channel axis.

    A channel axis is inserted at position 1 and the values are converted to
    float and divided by 255 before they are passed to the network.
    """
    image = torch.unsqueeze(image, dim=1)
    image = image.float() / 255
    return self.net(image)

  def _step(self, batch, phase):
    """Shared train/val/test step: compute the loss and update the accuracy.

    Args:
      batch: Pair ``(images, targets)``.
      phase: One of "train", "val" or "test"; selects the metric and log name.

    Returns:
      Dict with the keys ``"loss"``, ``"preds"`` and ``"targets"``.
    """
    x, y = batch
    logits = self.forward(x)
    loss = self.criterion(logits, y)
    preds = torch.argmax(logits, dim=1)
    acc_metric = self.accuracies[f"{phase}_acc"]
    acc_metric.update(preds, y)
    # Logging the metric object (instead of a per-batch value) lets Lightning
    # compute it at the end of the epoch and reset it afterwards, so no manual
    # reset hook is needed.
    self.log(f"{phase}/acc", acc_metric, on_step=False, on_epoch=True, prog_bar=True)
    return {"loss": loss, "preds": preds, "targets": y}

  def training_step(self, batch, batch_idx):
    return self._step(batch, "train")

  def validation_step(self, batch, batch_idx):
    return self._step(batch, "val")

  def test_step(self, batch, batch_idx):
    return self._step(batch, "test")

  # NOTE(review): the former ``on_epoch_end`` override (manual metric reset) was
  # dropped. That hook is no longer part of recent Lightning releases, and the
  # reset is handled by logging the metric objects in ``_step``.

  def configure_optimizers(self):
    """Return the Adam optimizer used for training."""
    optimizer = torch.optim.Adam(self.parameters(), lr=0.001, weight_decay=0.00005)
    return optimizer


In [None]:
# @title train.py
# @markdown Runs training and saves the trained model to model.ckpt

%%writefile train.py

import typer
import torch 
import pytorch_lightning as pl

from model import LitModel, get_dataset

def train(max_epochs: int = typer.Option(10), 
          seed: int = typer.Option(123)):
  """Train ``LitModel`` on the prepared training pickle and save ``model.ckpt``.

  The data set is split 95 % / 5 % into training and validation parts. The
  split uses its own generator seeded with 42, so it does not depend on
  ``seed``.

  Args:
    max_epochs: Number of epochs passed to the Lightning trainer.
    seed: Seed passed to ``pl.seed_everything``.
  """
  pl.seed_everything(seed)

  dataset = get_dataset("sample_data/mnist_train_small.pkl")
  n_train = int(0.95*len(dataset))
  n_val = len(dataset) - n_train
  split_rng = torch.Generator().manual_seed(42)
  train_part, val_part = torch.utils.data.random_split(
      dataset, [n_train, n_val], generator=split_rng)

  train_loader = torch.utils.data.DataLoader(
      train_part, batch_size=128, shuffle=True, num_workers=2)
  val_loader = torch.utils.data.DataLoader(
      val_part, batch_size=128, shuffle=False)

  trainer = pl.Trainer(max_epochs=max_epochs)
  trainer.fit(LitModel(), train_loader, val_dataloaders=val_loader)

  trainer.save_checkpoint("model.ckpt")

# Expose train as a command-line interface (options --max-epochs and --seed).
if __name__ == "__main__":
  typer.run(train)



In [None]:
# @title test.py
# @markdown Evaluates the trained model on test data.

%%writefile test.py

import typer
import torch 
import pytorch_lightning as pl
import json

from model import LitModel, get_dataset

def test():
  """Evaluate the checkpoint ``model.ckpt`` on the test pickle.

  The first result dict returned by ``Trainer.test`` is written to
  ``metrics.json``.
  """
  model = LitModel.load_from_checkpoint("model.ckpt")
  test_loader = torch.utils.data.DataLoader(
      get_dataset("sample_data/mnist_test.pkl"), batch_size=128, shuffle=False)

  # Trainer.test returns a list; only its first entry is stored.
  per_loader_results = pl.Trainer().test(model, test_loader)
  metrics = per_loader_results[0]

  with open("metrics.json", "wt") as sink:
    json.dump(metrics, sink)


# Run the evaluation through typer when the file is executed as a script.
if __name__ == "__main__":
  typer.run(test)


In [None]:
#@title dvc.yaml

%%writefile dvc.yaml

stages:
  # One data-preparation stage per split; each item supplies its own paths and
  # optional extra command-line arguments for data_prep.py.
  prepare_data:
    foreach:
      train: 
        input_path: sample_data/mnist_train_small.csv
        output_path: sample_data/mnist_train_small.pkl
        additional_args: ""
      test:
        input_path: sample_data/mnist_test.csv
        output_path: sample_data/mnist_test.pkl
        additional_args: ""
    do:
      cmd: python data_prep.py ${item.input_path} ${item.output_path} ${item.additional_args}
      deps:
        - ${item.input_path}
        - data_prep.py
      outs:
        - ${item.output_path}

  # The scripts themselves are listed as dependencies so that editing them
  # invalidates the stage and `dvc repro` runs it again.
  train:
    cmd: python train.py --max-epochs 1
    deps: 
      - train.py
      - model.py
      - sample_data/mnist_train_small.pkl
    outs:
      - model.ckpt

  
  test:
    cmd: python test.py
    deps:
     - test.py
     - model.py
     - model.ckpt
     - sample_data/mnist_test.pkl
    metrics:
      - metrics.json:
          cache: false


In [None]:
# Run the pipeline described in dvc.yaml (data preparation, training, evaluation).
!dvc repro

In [None]:
# Version the pipeline definition (every file matching `dvc*`), the Python scripts and
# the metrics file written by test.py. NOTE(review): any .gitignore files that DVC may
# have created are not added here -- confirm whether they should be committed as well.
!git add dvc* *.py metrics.json
!git commit -a -m "Initial training"