In [None]:
import os, time
from collections import defaultdict
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import wandb
from PIL import Image
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import transforms
from tqdm import tqdm

In [None]:
# Start a Weights & Biases run and attach the run metadata below as its config.
# NOTE(review): these values do not match the constants defined in the next
# cell (INIT_LR = 0.0001, NUM_EPOCHS = 40), and the dataset is recorded as
# "CIFAR-100" — confirm what should actually be logged for this experiment.
wandb.init(
    project="my-awesome-project",

    config={
    "learning_rate": 0.02,
    "architecture": "CNN",
    "dataset": "CIFAR-100",
    "epochs": 10,
    }
)

In [None]:
# Experiment constants.
# NOTE(review): only BATCH_SIZE is referenced in the cells shown here; the other
# descriptions are inferred from the names — confirm against the code that uses them.
INIT_LR = 0.0001  # presumably the learning rate handed to the optimizer
NUM_EPOCHS = 40  # presumably the number of training epochs
BATCH_SIZE = 4  # batch size passed to the DataLoaders
N_CLASSES = 7  # presumably the number of output classes of the network
MODEL_PATH = "mvlidar.pth"  # presumably where the trained weights are stored
PLOT_PATH = "plot.png"  # presumably where the training plot is written
TEST_PATHS = "test_paths.txt"  # presumably a text file listing the test samples

In [None]:

#| eval: false
# Select the compute backend in order of preference: CUDA, then Apple MPS, then CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
elif torch.backends.mps.is_available():
    DEVICE = "mps"
else:
    DEVICE = "cpu"
print(f"Using {DEVICE} device")
# Pinned memory is requested from the data loaders only when running on CUDA.
PIN_MEMORY = DEVICE == "cuda"

Pré-processamento: junção das imagens

In [None]:
#| eval: false
# insert the path to your dataset
# NOTE(review): ROOT_PATH and `api` are not defined or imported in the cells
# shown here — confirm where they come from before running on a fresh kernel.
train_path = ROOT_PATH+"/train/"
test_path = ROOT_PATH+"/test/"

# Output directories handed to api.merge_images as the destination.
train_merged_path = ROOT_PATH+"/train-merged/"
test_merged_path = ROOT_PATH+"/test-merged/"

# (input directory, output directory) pairs, one per split.
data_paths = [(train_path, train_merged_path), (test_path, test_merged_path)]

for data_path, merged_path in data_paths:
    # Create the destination if it is missing; an existing directory is left as is.
    os.makedirs(merged_path, exist_ok=True)
    api.merge_images(data_path, merged_path)

Pré-processamento: remapeamento das máscaras de segmentação

In [None]:
#| eval: false
# Output directories handed to data.remap_segmentation_masks as the destination.
# NOTE(review): ROOT_PATH and `data` are not defined or imported in the cells
# shown here — confirm where they come from before running on a fresh kernel.
train_segmentation_mask = ROOT_PATH+"/train_segmentation_mask/"
test_segmentation_mask = ROOT_PATH+"/test_segmentation_mask/"

# (input directory, output directory) pairs, one per split; the inputs are the
# same train/test directories used by the image-merge cell.
masks_paths = [(train_path, train_segmentation_mask), (test_path, test_segmentation_mask)]

# Label remapping passed to data.remap_segmentation_masks.
# Presumably each entry reads "original label: new label" — confirm against the helper.
# Two original labels (7 and 2) share the new label 4; the new labels span 1-6.
remapping_rules = {
  1: 1,
  4: 2,
  6: 3,
  7: 4,
  2: 4,
  9: 5,
  11: 6
}

for data_path, mask_path in masks_paths:
    # Create the destination if it is missing; an existing directory is left as is.
    os.makedirs(mask_path, exist_ok=True)
    data.remap_segmentation_masks(data_path, mask_path, remapping_rules=remapping_rules)

Lembrar de ajustar os nomes das variáveis de caminho usadas na célula abaixo (imagens e máscaras de treino e de teste).

In [None]:
#| eval: false
# ToTensor is the only transform handed to the datasets.
transform = transforms.Compose([
    transforms.ToTensor()
])

# Point the datasets at the directories produced by the preprocessing cells:
# the merged images as inputs and the remapped segmentation masks as targets.
# The previously used names (train_imgs_path, train_anns_path, test_imgs_path,
# test_anns_path) are not defined anywhere in the notebook cells shown.
# NOTE(review): confirm this image/mask pairing is what data.SemanticDataset expects.
train_dataset = data.SemanticDataset(image_path=train_merged_path,
                                     mask_path=train_segmentation_mask,
                                     transform=transform)

test_dataset = data.SemanticDataset(image_path=test_merged_path,
                                    mask_path=test_segmentation_mask,
                                    transform=transform)

In [None]:

#| eval: false
# os.cpu_count() returns None when the number of CPUs cannot be determined,
# and DataLoader needs an integer; fall back to 0 (load in the main process).
NUM_WORKERS = os.cpu_count() or 0

# Training batches are reshuffled every epoch.
trainLoader = DataLoader(train_dataset,
                         shuffle=True,
                         batch_size=BATCH_SIZE,
                         pin_memory=PIN_MEMORY,
                         num_workers=NUM_WORKERS)

# Evaluation keeps the dataset order.
testLoader = DataLoader(test_dataset,
                        shuffle=False,
                        batch_size=BATCH_SIZE,
                        pin_memory=PIN_MEMORY,
                        num_workers=NUM_WORKERS)

In [None]:

class Learner:
    """Bundles an MVLidar network with the Adam optimizer that updates it."""

    def __init__(self, lr, n_classes):
        """Build the network on DEVICE and an Adam optimizer over its parameters.

        Args:
            lr: learning rate passed to Adam.
            n_classes: value passed to the MVLidar constructor.
        """
        self.model = model.MVLidar(n_classes).to(DEVICE)
        # The optimizer must receive the parameters of the network instance;
        # the bare name `model` is the namespace that provides MVLidar, not the network.
        self.optimizer = Adam(self.model.parameters(), lr=lr)

    def predict(self, x):
        """Return the network output for input ``x``."""
        return self.model(x)

    def update(self, loss):
        """Run one optimization step: clear gradients, backpropagate ``loss``, step."""
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

In [None]:

class Evaluator:
    """Cross-entropy loss in which targets equal to 0 contribute nothing."""

    def __init__(self):
        # Per-element losses are needed so they can be masked before averaging.
        self.loss_fn = CrossEntropyLoss(reduction='none')

    def get_loss(self, y, y_hat):
        """Return the masked mean cross-entropy between ``y_hat`` and ``y``.

        Args:
            y: target class indices.
            y_hat: predictions passed to CrossEntropyLoss as its input.

        Returns:
            A scalar tensor: per-element losses are multiplied by 0 where
            ``y == 0`` and by 1 elsewhere, then averaged.
        """
        per_element = self.loss_fn(y_hat, y)
        keep = (y != 0).int()
        # NOTE(review): the mean divides by every element, including the ones
        # zeroed out by the mask — confirm that this scaling is intended.
        return (per_element * keep).mean()

In [None]:
class Trainer:
    """Trains and evaluates a learner on two datasets, recording metrics in ``self.log``.

    ``self.log`` contains:
      * ``'train_loss'`` / ``'test_loss'``: lists with one mean batch loss per epoch;
      * ``'correct'`` / ``'total'``: integer counts from the latest ``test()`` call
        (both 0 until ``test()`` has run).
    """

    # The annotations are strings so the class can be defined even when Learner
    # and Evaluator are not (yet) in scope.
    def __init__(self, train_dataset, test_dataset, learner: "Learner", evaluator: "Evaluator", batch_size):
        """Store the datasets, the learner/evaluator pair and the batch size.

        Args:
            train_dataset: dataset iterated by ``train()``.
            test_dataset: dataset iterated by ``test()``.
            learner: object exposing ``model``, ``predict(x)`` and ``update(loss)``.
            evaluator: object exposing ``get_loss(y, y_hat)``.
            batch_size: batch size used for both data loaders.
        """
        self.train_dataset = train_dataset
        self.test_dataset = test_dataset
        self.learner = learner
        self.batch_size = batch_size
        self.evaluator = evaluator
        self.log = defaultdict(list)
        # Plain integers, the same type test() stores, so readers of the log
        # never have to handle a list before the first evaluation.
        self.log['correct'] = 0
        self.log['total'] = 0

    def _model_device(self):
        """Return the device holding the model parameters (CPU if it has none)."""
        first_param = next(self.learner.model.parameters(), None)
        if first_param is None:
            return torch.device("cpu")
        return first_param.device

    def train(self):
        """Run one training epoch and append its mean batch loss to ``log['train_loss']``."""
        self.learner.model.train()
        # Reshuffle the training samples every epoch.
        dataloader = DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)
        device = self._model_device()
        epoch_loss = 0
        num_batches = len(dataloader)

        for X, y in dataloader:
            # Batches come from the loader on the CPU; move them next to the model.
            X, y = X.to(device), y.to(device)
            y_hat = self.learner.predict(X)
            loss = self.evaluator.get_loss(y, y_hat)
            epoch_loss += loss.item()
            self.learner.update(loss)

        self.log['train_loss'] += [epoch_loss / num_batches]

    def test(self):
        """Evaluate on the test dataset without gradient tracking.

        Appends the mean batch loss to ``log['test_loss']`` and stores in
        ``log['correct']`` / ``log['total']`` how many target elements were
        predicted correctly and how many were evaluated.
        """
        self.learner.model.eval()
        dataloader = DataLoader(self.test_dataset, batch_size=self.batch_size)
        device = self._model_device()
        epoch_loss, correct, total = 0, 0, 0
        num_batches = len(dataloader)

        with torch.no_grad():
            for X, y in dataloader:
                X, y = X.to(device), y.to(device)
                y_hat = self.learner.predict(X)
                loss = self.evaluator.get_loss(y, y_hat)
                epoch_loss += loss.item()
                # Predicted class = highest score along dim 1, the class
                # dimension of a cross-entropy input; compare element-wise
                # against the class-index targets.
                predicted = y_hat.argmax(dim=1)
                correct += (predicted == y).sum().item()
                total += y.numel()

        self.log['test_loss'] += [epoch_loss / num_batches]
        self.log['correct'] = correct
        self.log['total'] = total

    def run(self, n_epochs: int, log_every: int = 100):
        """Alternate ``train()`` and ``test()`` for ``n_epochs`` epochs.

        Args:
            n_epochs: number of epochs to run.
            log_every: print the latest losses every this many epochs;
                0 or None disables the periodic report.
        """
        for t in range(n_epochs):
            self.train()
            self.test()
            if log_every and (t+1) % log_every == 0:
                print(f"Epoch {t+1}\n------------")
                print(f"Train loss {self.log['train_loss'][-1]}\nTest loss {self.log['test_loss'][-1]}")
        print("Done!")
     
