# Deep learning

In [1]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim

from tqdm.auto import tqdm
import numpy as np


DATA_DIR = "data/office"
VAL_SPLIT = .2
RANDOM_SEED = 42
BATCH_SIZE = 2

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [2]:
import sklearn.preprocessing
import imageio.v2 as imageio

import os
from typing import Callable


def resnet_preprocessor(image: np.ndarray) -> np.ndarray:
    """
    Preprocesses an image for ResNet model.

    :param numpy.ndarray image: The input image.
    :return: Preprocessed image.
    :rtype: numpy.ndarray
    """
    # assume RGB dimension, either as first or last dimension
    #assert image.shape[0] == 3 or image.shape[-1] == 3

    #if image.shape[-1] == 3:
        #image = np.moveaxis(image, -1, 0)
        #print(image.shape)

    preprocess = torchvision.transforms.Compose(
        [
            torchvision.transforms.ToTensor(),
            torchvision.transforms.Normalize(
                mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
            ),
        ]
    )

    image = preprocess(image)
    #image = image.unsqueeze(0) #mini-batch creation
    return image


class ImageDataset(torch.utils.data.Dataset):
    """
    Lazily loads images from a root directory.
    Directory is assumed to be of shape "<root>/<class_name>/<instance_file>".
    Allows custom functions for reading, preprocessing each image and setting the label encodings.
    """

    def __init__(
        self,
        data_dir: str,
        parser_func: Callable = imageio.imread,
        preprocessing_func: Callable[[np.ndarray], np.ndarray] = resnet_preprocessor,
        label_encoder=None,
    ):
        """
        Initializes the ImageDataset.

        :param str data_dir: Root directory containing the dataset.
        :param parser_func: Function to parse images.
        :type parser_func: Callable, optional
        :param preprocessing_func: Function to preprocess images.
        :type preprocessing_func: Callable[[numpy.ndarray], numpy.ndarray], optional
        :param label_encoder: Encoder for label encoding.
        :type label_encoder: sklearn.preprocessing.LabelEncoder or None, optional
        """
        self.parser_func = parser_func
        self.preprocessing_func = preprocessing_func
        self.label_encoder = label_encoder
        self.samples = self._load_dataset_paths(data_dir)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        image_path, label = self.samples[idx]
        image = self.parser_func(image_path)
        image = self.preprocessing_func(image)

        if not torch.is_tensor(image):
            image = torch.tensor(image)

        return image, label

    def _load_dataset_paths(self, data_dir):
        """
        Loads paths of images in the dataset.

        :param str data_dir: Root directory containing the dataset.
        :return: List of tuples containing image paths and their corresponding labels.
        :rtype: List[Tuple[str, int]]
        """
        class_names = os.listdir(data_dir)

        if self.label_encoder is None:
            self.label_encoder = sklearn.preprocessing.LabelEncoder()
            self.label_encoder.fit(class_names)

        samples = []
        for class_name in tqdm(class_names):
            class_data_dir = os.path.join(data_dir, class_name)

            for file_name in os.listdir(class_data_dir):
                samples.append(
                    (
                        os.path.join(class_data_dir, file_name),
                        self.label_encoder.transform([class_name])[0],
                    )
                )

        return samples

In [3]:
def custom_collate(batch):
    # Sort the batch by image height in descending order
    batch = sorted(batch, key=lambda x: x[0].shape[1], reverse=True)

    # Get the maximum height and width among all images in the batch
    max_height = max(img.shape[1] for img, _ in batch)
    max_width = max(img.shape[2] for img, _ in batch)

    # Pad each image to match the maximum height and width
    padded_batch = []
    for img, label in batch:
        # Calculate padding sizes
        pad_height = max_height - img.shape[1]
        pad_width = max_width - img.shape[2]

        # Pad the image
        padded_img = torch.nn.functional.pad(img, (0, pad_width, 0, pad_height))

        padded_batch.append((padded_img, label))

    # Stack images and labels into tensors
    images = torch.stack([img for img, _ in padded_batch])
    labels = torch.tensor([label for _, label in padded_batch])

    return images, labels

In [4]:
from torch.utils.data.sampler import SubsetRandomSampler


dataset = ImageDataset(os.path.join(DATA_DIR, "amazon"))
dataset_size = len(dataset)

indices = list(range(dataset_size))
split = int(np.floor(VAL_SPLIT * dataset_size))

np.random.default_rng(RANDOM_SEED).shuffle(indices)
train_indices, val_indices = indices[split:], indices[:split]

# Creating PT data samplers and loaders:
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(val_indices)

train_loader = torch.utils.data.DataLoader(
    dataset, batch_size=BATCH_SIZE, sampler=train_sampler, collate_fn=custom_collate
)
validation_loader = torch.utils.data.DataLoader(
    dataset, batch_size=BATCH_SIZE, sampler=valid_sampler, collate_fn=custom_collate
)

  0%|          | 0/31 [00:00<?, ?it/s]

In [5]:
sample_image, label = dataset[0]
print(sample_image.shape)

torch.Size([3, 300, 300])


In [6]:
print(len(dataset))

2817


In [7]:
import time
import tempfile


def train_model(
    model,
    criterion,
    optimizer,
    scheduler,
    device: str,
    train_dataloader,
    val_dataloader,
    train_size: int,
    val_size: int,
    num_epochs: int = 25,
):

    dataloaders = {"train": train_dataloader, "val": val_dataloader}
    dataset_sizes = {"train": train_size, "val": val_size}

    since = time.time()
    # Create a temporary directory to save training checkpoints
    with tempfile.TemporaryDirectory() as tempdir:
        best_model_params_path = os.path.join(tempdir, "best_model_params.pt")

        torch.save(model.state_dict(), best_model_params_path)
        best_acc = 0.0

        for epoch in range(num_epochs):
            print(f"Epoch {epoch}/{num_epochs - 1}")
            print("-" * 10)

            # Each epoch has a training and validation phase
            for phase in ["train", "val"]:
                if phase == "train":
                    model.train()  # Set model to training mode
                else:
                    model.eval()  # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                # Iterate over data.
                for inputs, labels in tqdm(dataloaders[phase]):
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    # zero the parameter gradients
                    optimizer.zero_grad()

                    # forward
                    # track history if only in train
                    with torch.set_grad_enabled(phase == "train"):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        # backward + optimize only if in training phase
                        if phase == "train":
                            loss.backward()
                            optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)
                if phase == "train":
                    scheduler.step()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects.double() / dataset_sizes[phase]

                print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

                # deep copy the model
                if phase == "val" and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(model.state_dict(), best_model_params_path)

            print()

        time_elapsed = time.time() - since
        print(
            f"Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s"
        )
        print(f"Best val Acc: {best_acc:4f}")

        # load best model weights
        model.load_state_dict(torch.load(best_model_params_path))
    return model

In [8]:
model = torch.hub.load(
    "pytorch/vision:v0.10.0", "resnet34", weights=torchvision.models.ResNet34_Weights.DEFAULT
).to(device)

criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.Adam(model.parameters())
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

model = train_model(
    model,
    criterion,
    optimizer_ft,
    exp_lr_scheduler,
    device,
    train_loader,
    validation_loader,
    len(train_indices),
    len(val_indices),
    num_epochs=25,
)

Using cache found in /home/dimits/.cache/torch/hub/pytorch_vision_v0.10.0


Epoch 0/24
----------


  0%|          | 0/1127 [00:00<?, ?it/s]