In [16]:
import sys
sys.path.insert(0, "..")

import os, glob
import yaml
from pathlib import Path

import torch
from torch.utils.data import DataLoader

#from cutmix.cutmix import CutMix
#from cutmix.utils import CutMixCrossEntropyLoss

from utils.models import get_model
from utils.data import CustomImageDataset
from utils.log import TextDocument

In [14]:
NUM_CLASSES = 4 
RESUME = False
epochs = 10
IMG_SIZE = 640
BATCH_SIZE = 8
WEIGHTS_DIR = "../weights"
CUTMIX = False

TRAIN_DATASET = "/home/david/Documents/Datasets/train_test_classification_quarter_size/train"
VALID_DATASET = "/home/david/Documents/Datasets/train_test_classification_quarter_size/valid"

Path(WEIGHTS_DIR).mkdir(exist_ok=True, parents=True)

In [15]:
from typing import List, Dict, Tuple
import matplotlib.pyplot as plt
import cv2
import numpy as np

from torchvision.transforms import Compose, Resize, Normalize, ToTensor, Lambda
from torchvision.transforms import ColorJitter, RandomAffine, RandomPerspective, RandomRotation, RandomErasing, RandomCrop, Grayscale
from torchvision.transforms import RandomChoice, RandomApply

def get_train_grayscale_transforms(img_size: int) -> Compose:
    """Returns data transformations/augmentations for train dataset.
    
    Args:
        img_size: The resolution of the input image (img_size x img_size)
    """
    return Compose([
        RandomApply([
            ColorJitter(brightness=0.3, contrast=0.01, saturation=0.01, hue=0),
            RandomAffine(0.1, translate=(0.04,0.04), scale=(0.04,0.04), shear=0.01, resample=2),
            RandomCrop(30),
            RandomPerspective(0.1)
        ]),
        Resize([img_size, img_size], interpolation=3),
        ToTensor(),
        Normalize(
            mean=[0.5203580774185134],
            std=[0.24102417452995067])
    ])
def get_test_grayscale_transforms(img_size: int) -> Compose:
    """Returns data transformations/augmentations for train dataset.
    
    Args:
        img_size: The resolution of the input image (img_size x img_size)
    """
    return Compose([
        Resize([img_size, img_size], interpolation=3),
        ToTensor(),
        Normalize(
            mean=[0.5203580774185134],
            std=[0.24102417452995067])
    ])

def show_confusion_matrix(matrix: List[List], labels: List[str]):
    """Display a nice confusion matrix given
    the confusion matrix in a 2D list + list of labels (decoder)
    
    Args:
        matrix: 2D array containing the values to display (confusion matrix)
        labels: Array containing the labels (indexed by corresponding label idx)
    """
    fig, ax = plt.subplots()
    fig.set_figheight(15)
    fig.set_figwidth(15)

    min_val, max_val = 0, len(labels)

    for i in range(max_val):
        for j in range(max_val):
            c = matrix[i][j]
            ax.text(i, j, str(int(c)), va='center', ha='center')

    ax.matshow(matrix, cmap=plt.cm.Blues)

    # Set number of ticks for x-axis
    ax.set_xticks(np.arange(max_val))
    # Set ticks labels for x-axis
    ax.set_xticklabels(labels, rotation='vertical', fontsize=16)

    # Set number of ticks for x-axis
    ax.set_yticks(np.arange(max_val))
    # Set ticks labels for x-axis
    ax.set_yticklabels(labels, rotation='horizontal', fontsize=16)
                    
    #ax.set_xlim(min_val, max_val)
    ax.set_ylim(max_val - 0.5, min_val - 0.5)
    plt.show()
    
def display_missclassified(class_to_idx: Dict[str,int], 
                           targets: List[int], 
                           predictions: List[int], 
                           images: List[np.ndarray], 
                           gridsize: Tuple[int] = (4,4)):
    """Display a grid with missclassified samples from test set.
    
    Args:
        class_to_idx: Class to idx encoder
        targets:      List containing all ground truths
        predictions:  List containing all predictions
        images:       List containing image arrays
        gridsize:     Tuple describing the final image grid
    """
    fig = plt.figure()
    plot_counter = 1
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    idx_to_class = {i:label for i, label in enumerate(class_to_idx)}
    for i in range(len(targets)):
        if plot_counter > gridsize[0]*gridsize[1]:
            break
        
        image = images[i].transpose(1, 2, 0)
        image = ((image * std) + mean) * 255
        image = image.astype("uint8")
    
        image = cv2.resize(image, (128, 128))
        image = cv2.putText(image, idx_to_class[predictions[i]], (0,20), 3, 0.4, (0,0,255), 1)
        if predictions[i] == targets[i]:
            pass
        else:
            ax = fig.add_subplot(gridsize[0], gridsize[1], plot_counter)
            ax.imshow(image)
            plot_counter += 1
    plt.show()

In [1]:
def train_one_epoch(model, train_dataloader, device, accumulate_steps=1):
    model.train()
    results = {
        "running_loss": 0
    }
    t = tqdm(train_dataloader)
    for i, (X, y) in enumerate(t):
        

        X = X.to(device)
        y = y.to(device)

        preds = model(X)
        loss = criterion(preds, y)
        
        results["running_loss"] += loss.cpu().detach()
        loss = loss/accumulate_steps
        loss.backward()
        
        if ((i+1) % accumulate_steps) == 0:
            optimizer.step()
            optimizer.zero_grad()
            
        t.set_description(f"{epoch+1}/{epochs} Train: {round(float(running_loss)/(i+1), 4)}")
    optimizer.step()
    optimizer.zero_grad()
    
    return results

def evaluate_model(model, valid_dataloader, device, save_images=False):
    results = {
        "running_loss": 0,
        "targets": list(),
        "predictions": list()
    }    

    model.eval()
    with torch.no_grad():

        if save_images:
            results["images"] = list() 
            
        t = tqdm(valid_dataloader)
        for i, (X, y) in enumerate(t):
            X = X.to(device)
            y = y.to(device)

            preds = model(X)
            results["predictions"] += list(preds.argmax(axis=1).cpu().detach().numpy())
            results["targets"] += list(np.array(y.cpu()))
            if save_images:
                results["images"] += list(np.array(X.cpu()))
        
            loss = criterion(preds, y)

            running_loss += loss.cpu().detach()
            t.set_description(f"Test: {round(float(running_loss/(i+1)), 4)}")
            
    return results

def calculate_metrics():
    pass

In [11]:
train_imgs = glob.glob(TRAIN_DATASET)
valid_imgs = glob.glob(VALID_DATASET)

train_labels = set([os.path.basename(os.path.dirname(img_path)) for img_path in train_imgs])
valid_labels = set([os.path.basename(os.path.dirname(img_path)) for img_path in valid_imgs])
class_to_idx = {label: idx for idx, label in enumerate(train_labels)}

train_dataset = CustomImageDataset(train_imgs, get_test_grayscale_transforms(IMG_SIZE), train_labels)
if CUTMIX:
    train_dataset = CutMix(train_dataset, num_class=NUM_CLASSES, beta=1.0, prob=0.5, num_mix=3)    # this is paper's original setting for cifar.
train_dataloader = DataLoader(train_dataset, batch_size = BATCH_SIZE, shuffle=True)

valid_dataset = CustomImageDataset(valid_imgs, get_test_grayscale_transforms(IMG_SIZE), valid_labels)
valid_dataloader = DataLoader(valid_dataset, batch_size = BATCH_SIZE, shuffle=True)

In [12]:
from tqdm import tqdm
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score, confusion_matrix

# Using gpu or not
device = "cuda" if torch.cuda.is_available() else "cpu"
if device == "cuda":
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
else:
    print(f"Using CPU")
    
for model_name in model_names:
    model = get_model(model_name, NUM_CLASSES, 1)
    model.to(device)
    print(list(class_to_idx.keys()))

    if RESUME:
        optimizer, criterion = get_training_stuff(model)
    else:
        #weights = torch.Tensor(weights).to(device)
        #optimizer, criterion = get_training_stuff(model, weights=weights)
        optimizer = torch.optim.Adam(model.parameters())
        if CUTMIX:
            criterion = CutMixCrossEntropyLoss(True)
        else:
            criterion = torch.nn.CrossEntropyLoss()

    if RESUME:
        start_epoch = state_dict["epoch"]
        optimizer_state_dict = state_dict["optimizer_state_dict"]
        best_test_f1 = state_dict["best_test_f1"]

        model.load_state_dict(state_dict["model_state_dict"])
        optimizer.load_state_dict(state_dict["optimizer_state_dict"])
    else:
        best_test_f1 = 0
        start_epoch = 0

    for epoch in range(start_epoch, epochs):
        
        # Train one epoch    
        results = train_one_epoch(model, train_dataloader, device, accumulate_steps=ACCUM_STEPS)
        train_loss = float(results["running_loss"]/(i+1))


        if epoch+1 == epochs:
            results = evaluate_model(model, valid_dataloader, device, save_images=True)
            images = results["images"]
        else:
            results = evaluate_model(model, save_images=False)

        valid_loss = float(results["running_loss"]/(i+1))
        acc = accuracy_score(results["targets"], results["predictions"])
        f1 = f1_score(results["targets"], results["predictions"], average="macro", labels=np.unique(results["predictions"]))
        recall = recall_score(results["targets"], results["predictions"], average="macro", labels=np.unique(results["predictions"]))
        precision = precision_score(results["targets"], results["predictions"], average="macro", labels=np.unique(results["predictions"]))

    
        if f1 > best_test_f1:
            best_test_f1 = f1
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_test_f1': best_test_f1,
                'train_dataloader': train_dataloader,
                'test_dataloader': valid_dataloader,
                'class_to_idx': class_to_idx
            }, os.path.join(WEIGHTS_DIR, "best.pt"))

    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_test_f1': best_test_f1,
        'train_dataloader': train_dataloader,
        'test_dataloader': valid_dataloader,
        'class_to_idx': class_to_idx,
    }, os.path.join(WEIGHTS_DIR, "last.pt"))

    display_missclassified(class_to_idx, targets, predictions, images, gridsize=(4,4))
    show_confusion_matrix(confusion_matrix(targets, predictions), list(class_to_idx.keys()))

Using GPU: NVIDIA GeForce RTX 3080


NameError: name 'model_name' is not defined

In [None]:
display_missclassified(class_to_idx, targets, predictions, images, gridsize=(4,4))
show_confusion_matrix(confusion_matrix(targets, predictions), list(class_to_idx.keys()))
