In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader,random_split

In [None]:
# Training split of CIFAR-10 (train=True is the torchvision default), downloaded
# into data/ on first use. The transforms run on the fly, once per sample access.
dataset = torchvision.datasets.CIFAR10(
    root='data/',
    train=True,
    download=True,
    transform=transforms.Compose([
        # augmentation: reflect-pad by 4 px, then take a random 32x32 crop
        transforms.RandomCrop(32, padding=4, padding_mode='reflect'),
        # augmentation: random left-right mirror
        transforms.RandomHorizontalFlip(),
        # image -> tensor laid out as CxHxW
        transforms.ToTensor(),
        # per channel: (x - 0.5) / 0.5
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]),
)

# Held-out test split. Evaluation has to be deterministic and run on unmodified
# images, so only the tensor conversion and the same normalization as training
# are applied here -- no random crop / flip augmentation.
test_dataset = torchvision.datasets.CIFAR10(
    root='data/',
    train=False,
    download=True,
    transform=transforms.Compose([
        transforms.ToTensor(),  # CxHxW
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]),
)

In [None]:
val_ratio = 0.2
# Derive the training size by subtraction so the two lengths always sum to
# len(dataset). Truncating both products independently can drop a sample
# (e.g. when the length is not evenly divisible), which makes random_split raise.
val_size = int(val_ratio * len(dataset))
train_size = len(dataset) - val_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
batch_size = 32  # larger batches are generally preferable if memory allows
train_dl = DataLoader(train_dataset, batch_size, shuffle=True, pin_memory=True)
# Evaluation loaders gain nothing from shuffling; keep their order fixed.
val_dl = DataLoader(val_dataset, batch_size, shuffle=False, pin_memory=True)
test_dl = DataLoader(test_dataset, batch_size, pin_memory=True)


In [None]:
import matplotlib.pyplot as plt
from torchvision.utils import make_grid

def denormalize(images,means,std_div):
  """Undo a per-channel normalization so the images can be displayed.

  Args:
    images: batch of images shaped (N, C, H, W).
    means: per-channel means that were subtracted (length C).
    std_div: per-channel standard deviations that were divided by (length C).

  Returns:
    Tensor with the same shape as ``images`` holding ``images * std_div + means``.
  """
  # Shape the stats as (1, C, 1, 1) so they broadcast over batch and pixels.
  # -1 lets C follow the length of the stats instead of assuming RGB, and the
  # stats are created on the images' device so GPU batches work as well.
  means = torch.tensor(means, device=images.device).reshape(1,-1,1,1)
  std_div = torch.tensor(std_div, device=images.device).reshape(1,-1,1,1)
  return images*std_div + means

def show_preview(dl,normalized):
  """Show the first batch produced by ``dl`` as one image grid.

  Args:
    dl: iterable yielding ``(images, labels)`` batches.
    normalized: truthy to undo the (0.5, 0.5) normalization before drawing.
  """
  first_batch = next(iter(dl), None)
  if first_batch is None:
    return  # nothing to draw for an empty loader
  images, _ = first_batch
  fig, ax = plt.subplots(figsize=(10,10))
  if normalized:
    images = denormalize(images,(0.5,0.5,0.5),(0.5,0.5,0.5))
  grid = make_grid(images,10)  # 10 images per row
  ax.imshow(grid.permute(1,2,0))  # CxHxW -> HxWxC for matplotlib
# Preview the training loader twice: once with the normalization undone, once raw.
show_preview(train_dl, normalized=True)
show_preview(train_dl, normalized=False)

# The second grid looks dark because its pixel values are still normalized.

In [None]:
def get_default_devices():
  """Pick the device to compute on: CUDA when available, otherwise the CPU.

  Returns:
    A ``torch.device`` in both cases, so callers always get the same type.
  """
  return torch.device("cuda" if torch.cuda.is_available() else "cpu")

def to_device(data,device):
  """Move a tensor-like object, or a (nested) list/tuple of them, to ``device``.

  Lists and tuples are walked recursively and always come back as lists.
  """
  if not isinstance(data,(list,tuple)):
    # non_blocking=True requests an asynchronous copy, so execution is not
    # held up while the data is being transferred (when that is possible)
    return data.to(device,non_blocking=True)
  moved = []
  for item in data:
    moved.append(to_device(item,device))
  return moved

class DeviceDataLoader():
  '''Wraps a dataloader so that every batch it yields is first moved to a device.'''
  def __init__(self,dl,device):
    self.device = device  # where batches are sent
    self.dl = dl          # the wrapped loader
  def __len__(self):
    # same number of batches as the wrapped loader
    return len(self.dl)
  def __iter__(self):
    # lazily transfer one batch at a time while iterating
    yield from (to_device(batch,self.device) for batch in self.dl)

# Resolve the compute device once, then wrap every loader so that its batches
# are delivered on that device.
device = get_default_devices()
train_dl, val_dl, test_dl = (
    DeviceDataLoader(loader, device) for loader in (train_dl, val_dl, test_dl)
)

### Network architecture

Common architecture families:

- ResNets: built from residual blocks
- Inception
- MobileNet

Intended usage of the model defined below:

    model = ResnetX(in_channels, num_classes)
    logits = model(images)

In [None]:
from typing import OrderedDict
import torch.nn as nn
import torch.nn.functional as F


def conv_block(in_channels,out_channels,use_pool=False):
  """Build a Conv -> BatchNorm -> ReLU -> Conv stack, optionally followed by 2x2 max pooling.

  Both convolutions are 3x3 with padding 1, so they keep the spatial size;
  only the optional pooling layer halves it.

  Args:
    in_channels: channels of the incoming feature map.
    out_channels: channels produced by both convolutions.
    use_pool: append ``nn.MaxPool2d(2)`` when true.

  Returns:
    An ``nn.Sequential`` holding the layers in order.
  """
  stages = [
      nn.Conv2d(in_channels,out_channels,kernel_size=3,padding=1),
      # batch norm keeps any single channel from drifting to a skewed scale
      nn.BatchNorm2d(out_channels),
      nn.ReLU(),
      nn.Conv2d(out_channels,out_channels,kernel_size=3,padding=1),
  ]
  if use_pool:
    stages += [nn.MaxPool2d(2)]
  return nn.Sequential(*stages)

class ResnetX(nn.Module):
  """Small residual CNN: four conv stages, two skip connections and a linear head.

  Args:
    in_channels: number of channels of the input images.
    num_classes: length of the logit vector produced per image.
  """
  def __init__(self,in_channels,num_classes):
    super().__init__()
    # in_channels -> 64 -> 128; conv2 also halves the resolution
    self.conv1 = conv_block(in_channels,64)
    self.conv2 = conv_block(64,128,use_pool=True)
    # open question (kept from the author): how can 1x1 convolution do the same thing ?
    # first residual branch: two shape-preserving blocks with explicit names
    self.res1 = nn.Sequential(OrderedDict([
        ("conv1 res 1",conv_block(128,128)),
        ("conv2 res 1",conv_block(128,128)),
    ]))
    # 128 -> 256 -> 512; conv4 halves the resolution again
    self.conv3 = conv_block(128,256)
    self.conv4 = conv_block(256,512,use_pool=True)
    # second residual branch, also shape-preserving
    self.res2 = nn.Sequential(conv_block(512,512),conv_block(512,512))
    head = [
        nn.MaxPool2d(4),
        nn.Flatten(),
        nn.Dropout(0.2),  # regularization, for better generalization
        # 2048 = 512 channels * 2 * 2, which is what 32x32 inputs flatten to
        # after the two MaxPool2d(2) stages and the MaxPool2d(4) above
        nn.Linear(2048,num_classes),
    ]
    self.classifier = nn.Sequential(*head)
  def forward(self,x):
    """Map a batch of images to per-class logits."""
    feats = self.conv2(self.conv1(x))
    # the skip adds the branch's own input (feats), not the network input x:
    # x has a different channel count and resolution by this point
    feats = self.res1(feats) + feats
    feats = self.conv4(self.conv3(feats))
    feats = self.res2(feats) + feats
    return self.classifier(feats)


In [None]:
# CIFAR-10 configuration: 3 input channels, 10 output classes.
# Evaluate `model` or `list(model.parameters())` in a cell to inspect it.
model = ResnetX(in_channels=3, num_classes=10)

In [None]:
!pip install torchview

In [None]:
from torchview import draw_graph
# Trace the model with a dummy batch (one 3x32x32 image) and display the graph.
sample_input = torch.zeros(1, 3, 32, 32)
model_graph = draw_graph(model, sample_input)
model_graph.visual_graph

### Training the network

In [None]:
def train(model, train_dl, val_dl, epochs, max_lr, loss_function, optim):
    """Train ``model`` with a one-cycle learning-rate schedule, validating after every epoch.

    Args:
        model: network to optimize; called as ``model(images)``.
        train_dl: iterable of ``(images, labels)`` training batches; must support ``len()``.
        val_dl: iterable of ``(images, labels)`` validation batches.
        epochs: number of passes over ``train_dl``.
        max_lr: peak learning rate for ``OneCycleLR`` (also passed to the optimizer).
        loss_function: callable ``(outputs, labels) -> scalar loss tensor``.
        optim: optimizer factory, called as ``optim(parameters, max_lr)``.

    Returns:
        A list with one dict per epoch. ``avg_train_loss``, ``avg_val_loss`` and
        ``avg_val_acc`` are 0-dim CPU tensors that carry no autograd history.
        ``lrs`` is the running list of per-batch learning rates; the same list
        object is stored in every entry, so after training each entry exposes
        the complete history.
    """
    optimizer = optim(model.parameters(), max_lr)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs, steps_per_epoch=len(train_dl))

    results = []
    lrs = []

    for _ in range(epochs):
        model.train()
        train_losses = []

        for images, labels in train_dl:  # for every batch
            optimizer.zero_grad()
            out = model(images)
            loss = loss_function(out, labels)
            loss.backward()  # d loss / d model parameters
            optimizer.step()
            # Store the value only; keeping the attached tensor would hold on to
            # autograd state for every batch of the epoch.
            train_losses.append(loss.detach())
            # Record the rate that was used for this step, then advance the schedule.
            lrs.append(optimizer.param_groups[0]['lr'])
            scheduler.step()

        epoch_train_loss = torch.stack(train_losses).mean().cpu()

        model.eval()
        batch_losses, batch_hits = [], []

        with torch.no_grad():
            for images, labels in val_dl:
                out = model(images)
                batch_losses.append(loss_function(out, labels))
                # Per-sample correctness as floats: mean() is undefined for bool tensors.
                batch_hits.append((torch.argmax(out, dim=1) == labels).float())

        val_loss = torch.stack(batch_losses).mean().cpu()
        # cat rather than stack: the final batch may be smaller than the others,
        # and concatenating weights every sample equally.
        val_acc = torch.cat(batch_hits).mean().cpu()

        results.append({'avg_train_loss': epoch_train_loss, 'avg_val_loss': val_loss, 'avg_val_acc': val_acc, 'lrs':lrs})

    return results


In [None]:
# Put the model's weights on the training device, then run the training loop.
model = to_device(model, device)
epochs = 10    # passes over the training loader
max_lr = 1e-2  # peak of the one-cycle schedule
loss_func = F.cross_entropy
optim = torch.optim.Adam
results = train(
    model,
    train_dl,
    val_dl,
    epochs=epochs,
    max_lr=max_lr,
    loss_function=loss_func,
    optim=optim,
)

In [None]:
import matplotlib.pyplot as plt

def plot(results, pairs):
    """Draw one subplot per titled group of metrics.

    Args:
        results: training history. Either a list of per-epoch dicts (metric name ->
            value) or a single dict mapping metric name -> sequence of values.
        pairs: list of dicts mapping a subplot title to the list of metric names
            drawn on that subplot. Every title gets its own axes, including when
            several titles are packed into one dict.
    """
    # Flatten so that each (title, metrics) entry owns exactly one axes.
    panels = [(title, graphs) for pair in pairs for title, graphs in pair.items()]
    if not panels:
        return  # nothing requested

    fig, axes = plt.subplots(len(panels), figsize=(10, 10))

    # With a single subplot, plt.subplots returns one axes rather than a sequence.
    if len(panels) == 1:
        axes = [axes]

    for ax, (title, graphs) in zip(axes, panels):
        ax.set_title(title)
        for graph in graphs:
            ax.plot(_metric_series(results, graph), label=graph)
        ax.legend()

    plt.tight_layout()
    plt.show()


def _metric_series(results, key):
    """Return the sequence to plot for metric ``key`` from the history."""
    if isinstance(results, dict):
        return results[key]
    values = [entry[key] for entry in results]
    if values and isinstance(values[-1], (list, tuple)):
        # List-valued metrics are treated as a running per-step history, so the
        # most recent entry already contains everything to draw.
        return list(values[-1])
    # One scalar per epoch; float() also accepts 0-dim tensors.
    return [float(value) for value in values]

# One dict per subplot, so that each metric group is drawn on its own axes.
plot(results, [{"Accuracies vs epochs": ["avg_val_acc"]},
               {"Losses vs epochs": ["avg_train_loss", "avg_val_loss"]},
               {"Learning rate vs Batches": ["lrs"]}])


In [None]:
# Persist only the learned weights; state_dict() is the method on nn.Module.
torch.save(model.state_dict(),"cifar10ResnetX.pth")