In [1]:
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.data import Dataset, Subset, DataLoader
from sklearn.model_selection import train_test_split
import numpy as np
import torch.optim as optim
import torch.nn as nn
import torch
import torch_directml
dml = torch_directml.device()
print(dml)

privateuseone:0


In [2]:
# Load the dataset with data augmentation and normalizing
class Cifar10(Dataset):
    def __init__(self):
        super().__init__()
        transform_train = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(15),
            transforms.RandomAutocontrast(),
            transforms.ToTensor(),
            transforms.Resize((128, 128)), # B2 for resizing image
            transforms.Normalize(mean=[0.4854, 0.4567, 0.4062], std=[0.2291, 0.2249, 0.2253])
        ])
        self.data = ImageFolder("clean_train", transform=transform_train)

    def __len__(self):
        return self.data.__len__()

    def __getitem__(self, idx):
        features, label = self.data[idx]
        return features, label

    def split_data(self):
        indices = np.arange(len(self.data))
        targets = np.array(self.data.targets)
        train_indices, val_indices = train_test_split(indices,
                                                       test_size=0.2,
                                                       random_state=42,
                                                       stratify=targets)
        final_train_set = Subset(self.data, train_indices)
        final_val_set = Subset(self.data, val_indices)
        return final_train_set, final_val_set
        
train_data = Cifar10()
# Split the dataset
train_set, val_set = train_data.split_data()

In [3]:
# Build the CNN
class CnnCifar(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.feature_extractor = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1), # B4. Padding in each CONV layer
            nn.BatchNorm2d(64),
            nn.LeakyReLU(negative_slope=0.01),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1), # B4. Padding in each CONV layer
            nn.BatchNorm2d(128),
            nn.LeakyReLU(negative_slope=0.01),
            nn.MaxPool2d(kernel_size=2),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Dropout(0.2)
        )
        self.classifier = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.feature_extractor(x)
        x = self.classifier(x)
        return x

In [4]:
class EarlyStopping:
    def __init__(self, patience, delta):
        self.patience = patience
        self.delta = delta
        self.best_score = None
        self.early_stop = False
        self.counter = 0
        self.best_model_state = None

    def __call__(self, model, val_loss):
        score = -val_loss

        # If there is no best score then it's current best score
        if self.best_score is None:
            self.best_score = score
            self.best_model_state = model.state_dict()

        curr_score = self.best_score + self.delta
        if score < curr_score:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        elif score > curr_score:
            self.best_score = score
            self.best_model_state = model.state_dict()
            self.counter = 0

    def get_best_model(self, model):
        model.load_state_dict(self.best_model_state)