In [1]:
import numpy as np
import torch
from torch import nn
import torchvision
from torchvision.transforms import v2
import matplotlib.pyplot as plt

#### Augmentations and Transformations List:
Pad
RandomPerspective
RandomRotation
RandomAffine
GaussianBlur

Sub-policies used when `policy == AutoAugmentPolicy.CIFAR10` (each row is a pair of `(operation, probability, magnitude index)` steps):
            return [
                (("Invert", 0.1, None), ("Contrast", 0.2, 6)),
                (("Rotate", 0.7, 2), ("TranslateX", 0.3, 9)),
                (("Sharpness", 0.8, 1), ("Sharpness", 0.9, 3)),
                (("ShearY", 0.5, 8), ("TranslateY", 0.7, 9)),
                (("AutoContrast", 0.5, None), ("Equalize", 0.9, None)),
                (("ShearY", 0.2, 7), ("Posterize", 0.3, 7)),
                (("Color", 0.4, 3), ("Brightness", 0.6, 7)),
                (("Sharpness", 0.3, 9), ("Brightness", 0.7, 9)),
                (("Equalize", 0.6, None), ("Equalize", 0.5, None)),
                (("Contrast", 0.6, 7), ("Sharpness", 0.6, 5)),
                (("Color", 0.7, 7), ("TranslateX", 0.5, 8)),
                (("Equalize", 0.3, None), ("AutoContrast", 0.4, None)),
                (("TranslateY", 0.4, 3), ("Sharpness", 0.2, 6)),
                (("Brightness", 0.9, 6), ("Color", 0.2, 8)),
                (("Solarize", 0.5, 2), ("Invert", 0.0, None)),
                (("Equalize", 0.2, None), ("AutoContrast", 0.6, None)),
                (("Equalize", 0.2, None), ("Equalize", 0.6, None)),
                (("Color", 0.9, 9), ("Equalize", 0.6, None)),
                (("AutoContrast", 0.8, None), ("Solarize", 0.2, 8)),
                (("Brightness", 0.1, 3), ("Color", 0.7, 0)),
                (("Solarize", 0.4, 5), ("AutoContrast", 0.9, None)),
                (("TranslateY", 0.9, 9), ("TranslateY", 0.7, 9)),
                (("AutoContrast", 0.9, None), ("Solarize", 0.8, 3)),
                (("Equalize", 0.8, None), ("Invert", 0.1, None)),
                (("TranslateY", 0.7, 9), ("AutoContrast", 0.9, None)),
            ]




In [2]:
def unpickle(file):
    """Load and return the pickled object stored at ``file``.

    The file is opened in binary mode and read with ``encoding='bytes'``.

    NOTE: ``pickle.load`` can execute arbitrary code, so only call this on
    files from a trusted source.
    """
    import pickle

    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')

In [3]:
def get_label_names_from_bytes_dict(x: dict) -> dict:
    """Build a ``{label_name: index}`` mapping from a metadata dictionary.

    Args:
        x: Dictionary with a ``b"label_names"`` entry holding a sequence of
            UTF-8 encoded ``bytes`` label names.

    Returns:
        dict mapping every decoded label name to its position in that
        sequence. All labels present are included, however many there are.
    """
    data = x.get(b"label_names")
    # enumerate() follows the actual number of labels instead of assuming 10.
    return {name.decode("utf-8"): index for index, name in enumerate(data)}

In [4]:
def treat_image_structure(x):
    """Rearrange a flat 3072-value image into a (32, 32, 3) array.

    The input is first reshaped to (3, 32, 32); the leading axis is then
    moved to the last position.
    """
    planes = np.reshape(x, (3, 32, 32))
    return np.moveaxis(planes, 0, -1)

In [5]:
def get_data(file_path):
    """Read one pickled batch file and return its images and labels.

    Every entry of the batch's ``b"data"`` field is passed through
    ``treat_image_structure`` and paired with the entry at the same position
    in ``b"labels"``.

    Returns:
        Tuple ``(X, y)`` of numpy arrays: the converted images and their labels.
    """
    batch = unpickle(file_path)
    pairs = list(zip(batch[b"data"], batch[b"labels"]))

    images = np.array([treat_image_structure(raw) for raw, _ in pairs])
    targets = np.array([target for _, target in pairs])

    return (images, targets)


In [6]:
def get_data_from_dir(dir_path, start_pattern="data_batch_"):
    """Load and concatenate every batch file found in a directory.

    Args:
        dir_path: Directory to scan. A trailing path separator is optional.
        start_pattern: Only entries whose name starts with this prefix are
            loaded.

    Returns:
        Tuple ``(X, y)`` of numpy arrays with the images and labels of all
        matching files, in sorted file-name order.
    """
    import os

    X = []
    y = []
    # os.listdir returns entries in arbitrary order; sorting keeps the sample
    # order identical from run to run.
    for name in sorted(os.listdir(dir_path)):
        if not name.startswith(start_pattern):
            continue
        # os.path.join inserts the separator only when dir_path lacks one.
        X_batch, y_batch = get_data(os.path.join(dir_path, name))
        X.extend(X_batch)
        y.extend(y_batch)

    return (np.array(X), np.array(y))

In [7]:
def preprocess_data(images):
    '''
    Run torchvision's AutoAugment transform, configured with the CIFAR10
    policy, on ``images`` and return the transformed result.

    A fresh augmenter object is created on every call.

    NOTE(review): whether the random operations are drawn once per call or
    once per image when ``images`` is a whole batch is not visible here;
    confirm against the torchvision v2 documentation.
    '''
    cifar10_policy = v2.AutoAugmentPolicy.CIFAR10
    return v2.AutoAugment(cifar10_policy)(images)

In [8]:
def is_my_model_under_5m_params(model):
    """Print the model's parameter count and compare it with five million.

    The count is the sum of ``numel()`` over ``model.parameters()``. It is
    printed first, followed by either a confirmation (count at or below the
    limit) or the number of parameters that exceed the limit.
    """
    FIVE_MILLION = 5_000_000
    n_params = sum(p.numel() for p in model.parameters())
    print(n_params)

    excess = n_params - FIVE_MILLION
    if excess > 0:
        print(f"Decrease {excess} params!!!")
    else:
        print("less than 5 million params")

In [34]:
# Pickled metadata file; it is read to build the label-name -> index mapping.
META_FILE_PATH = "dataset/train/batches.meta"

# Directory scanned for training batch files (passed to get_data_from_dir).
TRAINING_FILE_PATH = "dataset/train/"
# Single batch file loaded as the validation split (passed to get_data).
VALIDATION_FILE_PATH = "dataset/val/test_batch"
# Not read anywhere in the visible cells of this notebook.
TEST_FILE_PATH = "dataset/test/cifar_test_nolabels.pkl"

# When True, the loading cells print the label mapping and the array/tensor shapes.
IS_LOGGING_ENABLED = True

In [35]:
# Build the {label name: index} mapping from the pickled metadata file.
label_mapping = get_label_names_from_bytes_dict(unpickle(META_FILE_PATH))
if IS_LOGGING_ENABLED:
    print(label_mapping)

{'airplane': 0, 'automobile': 1, 'bird': 2, 'cat': 3, 'deer': 4, 'dog': 5, 'frog': 6, 'horse': 7, 'ship': 8, 'truck': 9}


In [36]:
# Load every training batch file in TRAINING_FILE_PATH into numpy arrays.
X_train_np, y_train_np = get_data_from_dir(TRAINING_FILE_PATH)
if IS_LOGGING_ENABLED:
    print(X_train_np.shape)
    print(y_train_np.shape)

(50000, 32, 32, 3)
(50000,)


In [12]:
# Load the single validation batch file into numpy arrays.
X_val_np, y_val_np = get_data(VALIDATION_FILE_PATH)
if IS_LOGGING_ENABLED:
    print(X_val_np.shape)
    print(y_val_np.shape)

(10000, 32, 32, 3)
(10000,)


In [13]:
# Seed torch's global random number generator so that the random
# transformations of the AutoAugment policy are intended to be repeatable.
torch.manual_seed(0)

<torch._C.Generator at 0x114c85cd0>

In [14]:
# The torchvision augmenter needs channels-first images, so the layout is
# changed from (N, 32, 32, 3) to (N, 3, 32, 32).
# permute(0, 3, 1, 2) moves only the channel axis. transpose(1, 3) would give
# the same shape for square images but also swap height and width, i.e.
# spatially transpose every image.

# Convert image numpy arrays to torch tensors of type float
X_train = torch.tensor(X_train_np, dtype=torch.float).permute(0, 3, 1, 2)
y_train = torch.tensor(y_train_np, dtype=torch.long)  # Labels should be of type long

X_val = torch.tensor(X_val_np, dtype=torch.float).permute(0, 3, 1, 2)
y_val = torch.tensor(y_val_np, dtype=torch.long)


if IS_LOGGING_ENABLED:
    print(X_train.shape)
    print(y_train.shape, end="\n\n")
    print(X_val.shape)
    print(y_val.shape)

torch.Size([50000, 3, 32, 32])
torch.Size([50000])

torch.Size([10000, 3, 32, 32])
torch.Size([10000])


In [15]:
# We use data augmentation only on training data because we will use the validation data as test data
# We could also use it on val data, but I do not think it would be useful.
# NOTE(review): this rebinds X_train, so running the cell again augments the
# already augmented tensor.

X_train = preprocess_data(X_train)

In [25]:
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import transforms

class CustomDataset(Dataset):
    """Dataset that pairs ``images[i]`` with ``labels[i]`` and optionally transforms the image."""

    def __init__(self, images, labels, transform=None):
        """
        images: Indexable collection of images; the docstring of the original
            class describes a numpy array of shape (N, 32, 32, 3).
        labels: Indexable collection of labels, one per image.
        transform: Optional callable applied to an image each time it is fetched.
        """
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        """Return the number of images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for ``idx``, transforming the image if a transform is set."""
        sample, target = self.images[idx], self.labels[idx]
        return (self.transform(sample) if self.transform else sample), target

# Training transform: random flip and padded random crop for augmentation,
# followed by tensor conversion and normalisation.
transform = transforms.Compose([
    transforms.ToPILImage(),  # Convert numpy array to PIL Image first
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Validation transform: the same conversion and normalisation but without the
# random augmentation, so evaluation sees the unmodified images and the
# measured accuracy does not vary between runs.
val_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Create datasets from the numpy arrays
train_dataset = CustomDataset(X_train_np, y_train_np, transform=transform)
val_dataset = CustomDataset(X_val_np, y_val_np, transform=val_transform)

# Create data loaders; only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

In [17]:
# Reference model: torchvision's ResNet-18 created without pretrained weights.
# The bare name on the last line makes the notebook display its layer structure.
resnet18 = torchvision.models.resnet18(weights=None)
resnet18

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [18]:
# Compare the reference ResNet-18 with the 5 million parameter limit.
is_my_model_under_5m_params(resnet18)

11689512
Decrease 6689512 params!!!


In [19]:
def conv3x3(in_channels, out_channels, stride=1):
    """Return a bias-free 3x3 ``nn.Conv2d`` with padding 1 and the given stride."""
    conv_options = {"kernel_size": 3, "stride": stride, "padding": 1, "bias": False}
    return nn.Conv2d(in_channels, out_channels, **conv_options)

In [30]:
class ResidualBlock(nn.Module):
    """Two conv3x3 + batch-norm stages whose output is added to a shortcut before a final ReLU."""

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        """
        in_channels / out_channels: channel counts of the first convolution.
        stride: stride of the first convolution; the second one uses the default.
        downsample: optional module applied to the block input to produce the
            shortcut; when None the input itself is used.
        """
        super().__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Run the two stages, add the shortcut and apply the ReLU."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        shortcut = x if self.downsample is None else self.downsample(x)
        out += shortcut
        return self.relu(out)

# ResNet
class ResNet(nn.Module):
    """Stem convolution, three stages of ``block`` modules (16, 32, 64 channels), average pooling and a linear classifier."""

    def __init__(self, block, layers, num_classes=10):
        """
        block: class used for every residual unit; called as
            ``block(in_channels, out_channels, stride, downsample)``.
        layers: number of blocks in each of the three stages.
        num_classes: size of the final linear layer's output.
        """
        super().__init__()
        # Running channel count; make_layer reads and updates it.
        self.in_channels = 16
        self.conv = conv3x3(3, 16)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self.make_layer(block, 16, layers[0])
        self.layer2 = self.make_layer(block, 32, layers[1], 2)
        self.layer3 = self.make_layer(block, 64, layers[2], 2)
        self.avg_pool = nn.AvgPool2d(8)
        self.fc = nn.Linear(64, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` units; only the first unit gets the stride and the shortcut projection."""
        needs_projection = stride != 1 or self.in_channels != out_channels
        downsample = None
        if needs_projection:
            # The shortcut must match the main path's stride and channel count.
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels),
            )

        units = [block(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = out_channels
        units.extend(block(out_channels, out_channels) for _ in range(1, blocks))
        return nn.Sequential(*units)

    def forward(self, x):
        """Return the classifier output for a batch ``x``."""
        out = self.relu(self.bn(self.conv(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            out = stage(out)
        out = self.avg_pool(out)
        # Flatten everything after the batch dimension for the linear layer.
        return self.fc(out.view(out.size(0), -1))
    
# Instantiate the custom ResNet with two ResidualBlocks in each of its three stages.
model = ResNet(ResidualBlock, [2, 2, 2])

In [31]:
# Compare the custom model with the 5 million parameter limit.
is_my_model_under_5m_params(model)

195738
less than 5 million params


In [32]:
# Disabled earlier training loop, kept as a string literal so nothing in it runs.
# NOTE(review): if it is ever re-enabled, two things need fixing first:
#   * the scheduler is created before `optimizer` is defined;
#   * running_loss is reset to 0.0 after every batch, so the printed loss is always 0.
"""
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

scheduler = StepLR(optimizer, step_size=10, gamma=0.5)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
num_epochs = 50

for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        running_loss = 0.0  # Optionally reset running loss after printing if you want to print per batch loss
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss:.4f}')
"""

KeyboardInterrupt: 

In [None]:
# Train the model with SGD and a step learning-rate schedule.

import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

# Use the first CUDA device when available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9)

# Define the learning rate scheduler: the learning rate is multiplied by
# gamma=0.1 after every 30 calls to scheduler.step() (one call per epoch below).
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

num_epochs = 50

for epoch in range(num_epochs):
    # Put the model in training mode at the start of every epoch.
    model.train()
    # Sum of the per-batch losses of this epoch.
    running_loss = 0.0
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Clear gradients left over from the previous batch.
        optimizer.zero_grad()
        
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    
    # After each epoch, step the scheduler
    scheduler.step()
    
    # Report the mean per-batch loss of the epoch.
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')


In [None]:
# Ensure the model is in evaluation mode
model.eval()

# To calculate the accuracy, keep track of the correct predictions and total predictions
correct_predictions = 0
total_predictions = 0

# No gradients need to be calculated
with torch.no_grad():
    for data in val_loader:  # Batches come from val_loader, the validation DataLoader
        inputs, labels = data[0].to(device), data[1].to(device)
        
        # Forward pass through the model
        outputs = model(inputs)
        
        # The class with the highest output is what we choose as prediction
        _, predicted = torch.max(outputs.data, 1)
        
        # Accumulate the total number of predictions and correct predictions
        total_predictions += labels.size(0)
        correct_predictions += (predicted == labels).sum().item()

# Calculate the accuracy (in percent) as correct predictions over total predictions
accuracy = 100 * correct_predictions / total_predictions

# NOTE(review): the message says "test set", but the batches come from val_loader.
print(f'Accuracy of the model on the test set: {accuracy:.2f}%')