In [1]:
from torchvision.models import resnet18, ResNet18_Weights
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import v2
from tqdm import tqdm
import pandas as pd
import numpy as np
import torch
import cv2
import os


In [2]:
class InnerDataset(Dataset):
    '''
    Map-style dataset that reads images from disk with OpenCV.

    Each image is read with `cv2.imread`, resized with `cv2.resize`, moved to
    channels-first layout, divided by 255 and stored as float32.
    No colour conversion is done, so the channel order is whatever
    `cv2.imread` produces (BGR according to the OpenCV documentation).

    Arguments:
        paths: image file paths, one per sample
        targets: class index of every sample, aligned with `paths`
        image_size: size handed to `cv2.resize` as `dsize`
        transform: optional callable applied to the image; it receives a `torch.Tensor`
        target_transform: optional callable applied to the target
        load: if True all images are decoded once in the constructor and kept in memory,
            otherwise every access decodes the file again

    Items are `(image, target)` pairs. Without a transform the image is a numpy array,
    with a transform it is whatever the transform returns.
    '''

    def __init__(
        self,
        paths: list[str],
        targets: list[int],
        image_size: tuple[int, int],
        transform: "v2.Transform | None",
        target_transform: "v2.Transform | None" = None,
        load: bool = False,
    ):
        self.paths = paths
        self.targets = targets
        self.load = load
        self.image_size = image_size

        self.transform = transform
        self.target_transform = target_transform

        if load:
            # Convert image by image so only one float64 temporary of a single
            # image exists at a time instead of one for the whole dataset.
            decoded = [self._read_image(path) for path in paths]
            if decoded:
                self.images = np.stack(decoded)
            else:
                # dsize is (width, height) while the decoded array is (height, width, 3)
                width, height = image_size
                self.images = np.empty((0, 3, height, width), dtype=np.float32)

    def _read_image(self, path) -> np.ndarray:
        '''Decodes one file into a float32 channels-first array scaled by 1/255.'''
        raw = cv2.imread(str(path))
        if raw is None:
            # cv2.imread signals a missing or undecodable file by returning None
            raise OSError(f"cv2.imread could not read image: {path!r}")
        resized = cv2.resize(raw, self.image_size)
        return (np.moveaxis(resized, 2, 0) / 255).astype(np.float32)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        if self.load:
            img = self.images[idx]
        else:
            img = self._read_image(self.paths[idx])

        if self.transform:
            # torchvision v2 transforms only act on tensors / PIL images and hand
            # any other input (such as a numpy array) back unchanged, so the image
            # has to be a tensor for the augmentation to have an effect.
            # torch.tensor copies, which keeps in-place transforms away from the cache.
            img = self.transform(torch.tensor(img))

        target = self.targets[idx]

        if self.target_transform:
            target = self.target_transform(target)

        return img, target

In [3]:
def image_dataset_from_directory(
    directory: str,
    batch_size,
    image_size,
    shuffle,
    seed,
    validation_split,
    subset=None,
    transform=None,
    target_transform=None,
    valid_size=0.25,
):
    '''
    Builds DataLoader(s) over a directory that holds one sub-folder per class.

    Sub-folders are visited in sorted order and numbered from 0, so the class
    indices (and the seeded split below) do not depend on the order in which
    `os.listdir` happens to return entries. Entries of `directory` that are
    not folders are ignored.

    Arguments:
        directory: folder containing one sub-folder per class
        batch_size: size of batches produced by the loader(s)
        image_size: size to resize the images to
        shuffle: whether the loader(s) shuffle the samples
        seed: seed of the random generator that draws the train/valid split
        validation_split: if falsy a single loader over all images is returned,
            otherwise the images are split and `subset` selects what is returned
        subset: one of "train", "valid" or "both"; only used when splitting
        transform: transform handed to the dataset for the images
        target_transform: transform handed to the dataset for the targets
        valid_size: a sample goes to the training part when its random draw
            from [0, 1) is greater than this value

    Returns:
        One DataLoader, or a `(train, valid)` tuple of DataLoaders for subset "both"

    Raises:
        ValueError: when a split is requested with an unknown `subset`
    '''
    # Truthiness instead of `is True`: any truthy validation_split takes the
    # split path below, so it must be validated as well.
    if validation_split and subset not in ("train", "valid", "both"):
        raise ValueError("Incorrect subset value")

    class_names = sorted(
        entry
        for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )

    paths = []
    targets = []
    for class_index, class_name in enumerate(class_names):
        class_dir = os.path.join(directory, class_name)
        files = sorted(os.listdir(class_dir))
        paths.extend(os.path.join(class_dir, file) for file in files)
        targets.extend([class_index] * len(files))

    paths = np.array(paths)
    targets = np.array(targets)

    def make_loader(mask=None):
        # One place that wires dataset and loader; mask=None keeps every sample.
        chosen_paths = paths if mask is None else paths[mask]
        chosen_targets = targets if mask is None else targets[mask]
        dataset = InnerDataset(
            chosen_paths,
            chosen_targets,
            image_size,
            transform=transform,
            target_transform=target_transform,
        )
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    if not validation_split:
        return make_loader()

    rng = np.random.default_rng(seed=seed)
    draws = rng.random(size=len(paths), dtype=np.float32)
    train_mask = draws > valid_size

    if subset == "train":
        return make_loader(train_mask)
    if subset == "valid":
        return make_loader(~train_mask)
    # subset == "both" (validated above)
    return make_loader(train_mask), make_loader(~train_mask)

In [4]:
def load_dataset(
        path: str,
        batch_size: int,
        image_size: tuple[int, int],
        shuffle: bool,
        split: str,
        transform=None,
        target_transform=None,
        ) -> tuple[DataLoader, dict[int, str]]:
    '''
    Given a `path` to a csv index file loads one of the dataset splits.
    Paths in the index are assumed to be relative to the csv file.
    The file contains three columns: "filepaths", "labels" and "data set", path to the image, image label and dataset split respectively.
    Rows whose image file does not exist are skipped together with their label.

    Arguments:
        path: path to the csv index file
        batch_size: size of batches in the dataset
        image_size: size to resize the images to
        shuffle: whether to shuffle the index. If False original index order is preserved
        split: split to use. One of "train", "valid" or "test"
        transform: transform handed to the dataset for the images
        target_transform: transform handed to the dataset for the targets

    Returns:
        The loaded dataset
        A dictionary mapping class indices to class names. Classes are numbered in order
        of first appearance in the whole index, so every split shares the same numbering'''

    root = os.path.dirname(path)
    df = pd.read_csv(path)

    # Number the classes over the full index rather than the selected split,
    # otherwise "train" and "valid" could assign different indices to a class.
    class2ind = {value: ind for ind, value in enumerate(df["labels"].unique())}
    ind2class = {ind: value for value, ind in class2ind.items()}

    subset = df[df["data set"] == split]

    # Filter paths and labels in lockstep: dropping a missing file without
    # dropping its label would shift every following label by one.
    paths = []
    labels = []
    for relative_path, label in zip(subset["filepaths"], subset["labels"]):
        full_path = os.path.join(root, relative_path)
        if os.path.exists(full_path):
            paths.append(full_path)
            labels.append(class2ind[label])

    ds = InnerDataset(paths, labels, image_size, transform, target_transform)
    return DataLoader(ds, batch_size=batch_size, shuffle=shuffle), ind2class

In [5]:
class LoaderDataset(Dataset):
    '''
    Image dataset over a one-folder-per-class directory that does its own batching:
    indexing or iterating yields whole `(images, targets)` batches, not single samples.

    Class folders are visited in sorted order and numbered from 0; the mapping is
    kept in `ind2class` / `class2ind`. Entries of `directory` that are not folders
    are ignored.

    Arguments:
        directory: folder containing one sub-folder per class
        batch_size: number of samples per batch
        image_size: size handed to `cv2.resize` as `dsize`
        shuffle: whether the sample order is shuffled (again on every `iter()`)
        seed: seed of the random generator used for the split and the shuffling
        validation_split: if truthy only the part selected by `subset` is kept
        subset: "train" or "valid"; only used when `validation_split` is truthy
        transform: optional callable applied to the image batch tensor
        target_transform: optional callable applied to the target batch tensor
        valid_size: a sample goes to the training part when its random draw
            from [0, 1) is greater than this value
        load: if True all kept images are decoded once and cached as one float32 tensor

    Raises:
        ValueError: when a split is requested with an unknown `subset`
    '''

    def __init__(
        self,
        directory: str,
        batch_size,
        image_size,
        shuffle,
        seed,
        validation_split=False,
        subset="train",
        transform=None,
        target_transform=None,
        valid_size=0.25,
        load=False,
    ):
        self.shuffle = shuffle
        self.image_size = image_size
        self.batch_size = batch_size
        self.load = load
        self.transform = transform
        self.target_transform = target_transform

        class_names = sorted(
            entry
            for entry in os.listdir(directory)
            if os.path.isdir(os.path.join(directory, entry))
        )
        self.ind2class = dict(enumerate(class_names))
        self.class2ind = {name: ind for ind, name in self.ind2class.items()}

        paths = []
        targets = []
        for class_index, class_name in enumerate(class_names):
            class_dir = os.path.join(directory, class_name)
            files = sorted(os.listdir(class_dir))
            paths.extend(os.path.join(class_dir, file) for file in files)
            targets.extend([class_index] * len(files))

        paths = np.array(paths)
        targets = np.array(targets)

        self.rng = np.random.default_rng(seed=seed)
        if validation_split:
            if subset not in ("train", "valid"):
                raise ValueError("Incorrect subset value")
            draws = self.rng.random(size=len(paths), dtype=np.float32)
            train_mask = draws > valid_size
            keep = train_mask if subset == "train" else ~train_mask
            paths = paths[keep]
            targets = targets[keep]

        self.paths = paths
        self.targets = torch.tensor(targets)

        self.index = np.arange(len(self.paths))
        if self.shuffle:
            self.index = self.rng.choice(self.index, len(self.paths), replace=False)

        if self.load:
            # Cache the images of the selected subset (self.paths) so that the cache
            # lines up with self.targets and self.index.
            self.images = self._read_images(self.paths)

    def _read_images(self, paths) -> torch.Tensor:
        '''Decodes `paths` into one float32 tensor in (N, 3, H, W) layout scaled by 1/255.'''
        frames = []
        for path in paths:
            raw = cv2.imread(str(path))
            if raw is None:
                # cv2.imread signals a missing or undecodable file by returning None
                raise OSError(f"cv2.imread could not read image: {path!r}")
            resized = cv2.resize(raw, self.image_size)
            frames.append((np.moveaxis(resized, 2, 0) / 255).astype(np.float32))
        if not frames:
            # dsize is (width, height) while the decoded array is (height, width, 3)
            width, height = self.image_size
            return torch.empty((0, 3, height, width), dtype=torch.float32)
        return torch.from_numpy(np.stack(frames))

    def __len__(self):
        # Number of full batches; a trailing partial batch is not counted.
        return int(len(self.paths) // self.batch_size)

    def __iter__(self):
        if self.shuffle:
            self.index = self.rng.choice(self.index, len(self.paths), replace=False)
        for i in range(0, len(self)):
            yield self.__getitem__(i)

    def __getitem__(self, idx):
        inds = self.index[self.batch_size * idx : self.batch_size * (idx + 1)]
        if idx < 0 or len(inds) == 0:
            raise IndexError(f"batch index {idx} is out of range")

        if self.load:
            img = self.images[inds]
        else:
            img = self._read_images(self.paths[inds])
        target = self.targets[inds]

        # NOTE(review): the transform is called once on the whole batch tensor,
        # not once per sample - confirm this is intended for random augmentations.
        if self.transform:
            img = self.transform(img)

        if self.target_transform:
            target = self.target_transform(target)
        return img, target

# Test functions

In [6]:
def create_model(num_classes: int = 420, device="cuda"):
    '''
    Creates a ResNet-18 with ImageNet weights, a frozen backbone and a new trainable head.

    All parameters of the pretrained network get `requires_grad = False`; the `fc`
    layer is then replaced by a fresh BatchNorm -> Linear -> LeakyReLU -> BatchNorm -> Linear
    stack, whose parameters are trainable.

    Arguments:
        num_classes: number of outputs of the final linear layer
        device: device the model is moved to

    Returns:
        The model, moved to `device`
    '''
    model = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
    for param in model.parameters():
        param.requires_grad = False

    # Read the feature width from the layer being replaced instead of hard-coding it.
    in_features = model.fc.in_features
    hidden_features = 512

    model.fc = torch.nn.Sequential(
        torch.nn.BatchNorm1d(in_features),
        torch.nn.Linear(in_features, hidden_features),
        torch.nn.LeakyReLU(),
        torch.nn.BatchNorm1d(hidden_features),
        torch.nn.Linear(hidden_features, num_classes),
    )

    model = model.to(device=device)
    return model

In [7]:
def train_loop(model, dl, update_tqdm):
    '''
    Runs one pass over `dl`, optimising `model` with Adam on the cross-entropy loss.

    Batches are moved to the device the model's parameters live on. The progress bar
    description is set to the mean loss of the last `update_tqdm` batches.

    Arguments:
        model: the network to train
        dl: iterable yielding `(data, targets)` batches
        update_tqdm: number of batches between two updates of the displayed loss

    Raises:
        ValueError: when `update_tqdm` is not positive
    '''
    if update_tqdm <= 0:
        raise ValueError("update_tqdm must be a positive integer")

    loss_fn = torch.nn.CrossEntropyLoss()
    optim = torch.optim.Adam(model.parameters())

    # Follow the model instead of assuming a particular device.
    device = next(model.parameters()).device
    model.train()

    running_loss = 0.0
    with tqdm(dl) as batches:
        for i, (data, targets) in enumerate(batches):
            data = data.to(device=device)
            # CrossEntropyLoss expects class indices as int64
            targets = targets.to(device=device).to(dtype=torch.int64)

            optim.zero_grad()
            pred = model(data)

            loss = loss_fn(pred, targets)
            loss.backward()

            optim.step()
            # .item() gives a plain float, so the description shows a number
            # rather than a tensor repr
            running_loss += loss.item()

            if (i + 1) % update_tqdm == 0:
                batches.set_description(f"{running_loss / update_tqdm:.4f}")
                running_loss = 0.0


## image_dataset_from_directory

In [8]:
# Build one shuffled DataLoader over the whole folder; validation_split=False means no split is made.
# NOTE(review): the directory is an absolute, machine-specific path - adjust it before re-running elsewhere.
dl = image_dataset_from_directory(
    "C:\\gr\\TempFolder\\NeuralNetworks\\datasets\\archive\\train",
    batch_size=256,
    image_size=(64,64),
    shuffle=True,
    seed=42,
    validation_split=False,
    transform=v2.Compose([
        v2.ColorJitter(
            brightness=0.1
        ),
        v2.RandomHorizontalFlip(),
    ])
)

In [9]:
# One pass over the directory-based loader with a fresh model; the displayed loss is refreshed every 50 batches.
train_loop(create_model(), dl, 50)

tensor(3.7033): 100%|██████████| 262/262 [03:57<00:00,  1.10it/s]


## load_dataset

In [10]:
# Build a shuffled DataLoader for the "train" split of the csv index; `d` maps class indices to class names.
# NOTE(review): the csv path is an absolute, machine-specific path - adjust it before re-running elsewhere.
dl, d = load_dataset(
    "C:\\gr\\TempFolder\\NeuralNetworks\\datasets\\archive\\birds.csv",
    batch_size=256,
    image_size=(64, 64),
    shuffle=True,
    split="train",
    transform=v2.Compose(
        [
            v2.ColorJitter(brightness=0.1),
            v2.RandomHorizontalFlip(),
        ]
    ),
)


In [11]:
# One pass over the csv-based loader with a fresh model; the displayed loss is refreshed every 50 batches.
train_loop(create_model(), dl, 50)

tensor(3.7603): 100%|██████████| 262/262 [01:00<00:00,  4.30it/s]


## LoaderDataset

In [12]:
# LoaderDataset yields whole batches itself, so it is used directly instead of being wrapped in a DataLoader.
# NOTE(review): the directory is an absolute, machine-specific path - adjust it before re-running elsewhere.
dl = LoaderDataset(
    directory="C:\\gr\\TempFolder\\NeuralNetworks\\datasets\\archive\\train",
    batch_size=256,
    image_size=(64, 64),
    shuffle=True,
    seed=42,
    transform=v2.Compose(
        [
            v2.ColorJitter(brightness=0.1),
            v2.RandomHorizontalFlip(),
        ]
    ),
)


In [13]:
# One pass over the self-batching dataset with a fresh model; the displayed loss is refreshed every 50 batches.
train_loop(create_model(), dl, 50)

tensor(3.6803): 100%|██████████| 261/261 [00:58<00:00,  4.48it/s]
