In [29]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SeperableConv2d(nn.Module):

  def __init__(self, in_channels, out_channels, kernel_size = 3, stride = 1, padding = 1):
    super(SeperableConv2d, self).__init__()

    self.depthwise = nn.Conv2d(
        in_channels, in_channels,
        kernel_size = kernel_size,
        stride = stride, padding = padding,
        groups = in_channels, bias = False
    )


    self.pointwise = nn.Conv2d(
        in_channels, out_channels,
        kernel_size = 1, stride = 1,
        padding = 0, bias = False
    )

  def forward(self, x):
    out = self.depthwise(x)
    out = self.pointwise(out)
    return out

# We created a seperable convolutional layer which check the image both depthwise like calculate each channel seperately using group = in_channels and get features for each channel
# Then it calculate like normal convolution layer by getting spatial features
# The inception module will do both spatial and channel wise calculations together but in Xception module it will do seperately


In [30]:
class ResidualBlock(nn.Module):
    def __init__(self, main_path, shortcut):
        super().__init__()
        self.main_path = main_path
        self.shortcut = shortcut

    def forward(self, x):
        return self.main_path(x) + self.shortcut(x)

# The Residual Block will add the two paths through skip connections here

In [31]:
class MiniXception(nn.Module):

  def __init__(self, num_classes = 10):
    super(MiniXception, self).__init__()


    self.conv1 = nn.Conv2d(3, 32, 3, padding = 1)
    self.bn1 = nn.BatchNorm2d(32)
    self.relu = nn.ReLU(inplace = True)

    self.conv2 = nn.Conv2d(32, 64, 3, padding = 1)
    self.bn2 = nn.BatchNorm2d(64)


    self.block1 = self._make_residual_block(64, 128, stride = 2)
    self.block2 = self._make_residual_block(128, 256, stride = 2)
    self.block3  = self._make_residual_block(256, 256, stride = 1)

    self.sep_final = SeperableConv2d(256, 512, kernel_size = 3, stride = 1, padding = 1)
    self.bn_final = nn.BatchNorm2d(512)

    self.gap = nn.AdaptiveAvgPool2d((1,1))
    self.fc = nn.Linear(512, num_classes)


  def _make_residual_block(self, in_c, out_c, stride):

    layers = []
    layers.append(nn.ReLU(inplace = True))
    layers.append(SeperableConv2d(in_c, out_c, kernel_size = 3, stride = stride, padding = 1))
    layers.append(nn.BatchNorm2d(out_c))

    layers.append(nn.ReLU(inplace = True))
    layers.append(SeperableConv2d(out_c, out_c, kernel_size = 3, stride = 1, padding = 1))
    layers.append(nn.BatchNorm2d(out_c))

    main_path = nn.Sequential(*layers)

    shortcut = nn.Sequential()
    if stride != 1:
      shortcut = nn.Sequential(
          nn.Conv2d(in_c, out_c, 1,  stride = stride, bias = False),
          nn.BatchNorm2d(out_c)
      )

    return ResidualBlock(main_path, shortcut)


  def forward(self, x):
      # Entry
      x = self.conv1(x)
      x = self.bn1(x)
      x = self.relu(x)
      x = self.conv2(x)
      x = self.bn2(x)

      # Middle (Residuals)
      x = self.block1(x)
      x = self.block2(x)
      x = self.block3(x)

      # Exit
      x = self.sep_final(x)
      x = self.bn_final(x)
      x = self.relu(x)

      x = self.gap(x)
      x = x.view(x.size(0), -1)
      x = self.fc(x)
      return x


In [32]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)

cuda


In [33]:
model = MiniXception().to(device)

In [34]:
from torchvision import datasets, transforms
import time
from torch.utils.data import DataLoader
import torch.optim as optim


train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomCrop(32, padding = 4),
    transforms.ToTensor(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.Normalize(mean = (0.5,0.5,0.5), std = (0.5,0.5,0.5))
])
# we used random rotation, horizontal flip and crop to make the model to identify inconsistencies by augmenting new data

test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
])

In [35]:
train_dataset = datasets.CIFAR10(root = 'data', train = True, download = True, transform = train_transform)
test_dataset = datasets.CIFAR10(root = "data", train = False, download = True, transform = test_transform)

In [36]:
PIN_MEMORY = True if torch.cuda.is_available() else False
NUM_WORKERS = 2
BATCH_SIZE = 128
train_loader = DataLoader(train_dataset, batch_size = BATCH_SIZE, shuffle = True,
                          pin_memory = PIN_MEMORY, num_workers = NUM_WORKERS, drop_last = True, persistent_workers= True)
test_loader = DataLoader(test_dataset, batch_size = BATCH_SIZE, shuffle = False,
                         pin_memory = PIN_MEMORY, num_workers = NUM_WORKERS, drop_last = False)

In [37]:
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr = 0.001, weight_decay=1e-4)

In [38]:
def train_one_epoch(dataloader, model, loss_fn, optimizer):

  model.train()
  running_loss = 0.0
  correct = 0
  total = 0


  for X,y in dataloader:
    X,y = X.to(device), y.to(device)

    optimizer.zero_grad()
    pred = model(X)
    loss = loss_fn(pred, y)
    loss.backward()

    optimizer.step()

    running_loss += loss.item()
    _, predicted = torch.max(pred,1)
    total += y.size(0)
    correct += (predicted == y).sum().item()

  return running_loss/len(dataloader), correct/total


In [39]:
def evaluate(dataloader, model, loss_fn):

  model.eval()
  running_loss = 0.0
  correct = 0
  total = 0

  with torch.no_grad():
    for X,y in dataloader:
      X, y = X.to(device), y.to(device)

      pred = model(X)
      loss = loss_fn(pred, y)
      running_loss += loss.item()
      _, predicted = torch.max(pred, 1)
      total += y.size(0)
      correct += (predicted == y).sum().item()

  return running_loss/len(dataloader), correct/total

In [None]:
EPOCHS = 25
start_time = time.time()

for epoch in range(EPOCHS):
  train_loss, train_acc = train_one_epoch(train_loader, model, loss_fn, optimizer)
  test_loss, test_acc = evaluate(test_loader, model, loss_fn)

  print(f"Epoch {epoch+1}/{EPOCHS} | "
          f"Train: {train_loss:.4f} ({train_acc:.2f}%) | "
          f"Test: {test_loss:.4f} ({test_acc:.2f}%)")

total_time = time.time() - start_time
print(f"Total time: {total_time:.2f} seconds")

# the model stopped learning after 15th epoch we should use learning scheduler to change the learning pace

Epoch 1/25 | Train: 1.3784 (0.49%) | Test: 1.3770 (0.56%)
Epoch 2/25 | Train: 0.9623 (0.66%) | Test: 0.9174 (0.68%)
Epoch 3/25 | Train: 0.7904 (0.72%) | Test: 0.7687 (0.74%)
Epoch 4/25 | Train: 0.6853 (0.76%) | Test: 0.6569 (0.77%)
Epoch 5/25 | Train: 0.6208 (0.78%) | Test: 0.6027 (0.79%)
Epoch 6/25 | Train: 0.5752 (0.80%) | Test: 0.6204 (0.79%)
Epoch 7/25 | Train: 0.5389 (0.81%) | Test: 0.5565 (0.81%)
Epoch 8/25 | Train: 0.5011 (0.83%) | Test: 0.5370 (0.83%)
Epoch 9/25 | Train: 0.4768 (0.83%) | Test: 0.5463 (0.82%)
Epoch 10/25 | Train: 0.4481 (0.84%) | Test: 0.4938 (0.83%)
Epoch 11/25 | Train: 0.4301 (0.85%) | Test: 0.5456 (0.83%)
Epoch 12/25 | Train: 0.4128 (0.86%) | Test: 0.5175 (0.83%)
Epoch 13/25 | Train: 0.3981 (0.86%) | Test: 0.4742 (0.85%)
Epoch 14/25 | Train: 0.3831 (0.87%) | Test: 0.5123 (0.84%)
Epoch 15/25 | Train: 0.3644 (0.87%) | Test: 0.4602 (0.85%)
Epoch 16/25 | Train: 0.3553 (0.88%) | Test: 0.4694 (0.85%)
Epoch 17/25 | Train: 0.3423 (0.88%) | Test: 0.4751 (0.85%)
Epoch 

In [None]:
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
EPOCHS = 50

# Learning Rate Scheduler (Cosine Annealing)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)

for epoch in range(EPOCHS):
    train_loss, train_acc = train_one_epoch(train_loader, model, loss_fn, optimizer)
    test_loss, test_acc = evaluate(test_loader, model, loss_fn)

    scheduler.step()

    current_lr = scheduler.get_last_lr()[0]

    print(f"Epoch {epoch+1}/{EPOCHS} | LR: {current_lr:.6f} | "
          f"Train: {train_loss:.4f} ({train_acc:.2f}%) | "
          f"Test: {test_loss:.4f} ({test_acc:.2f}%)")

Epoch 1/50 | LR: 0.000999 | Train: 1.4486 (0.47%) | Test: 1.1347 (0.59%)
Epoch 2/50 | LR: 0.000996 | Train: 1.0178 (0.64%) | Test: 0.8783 (0.69%)
Epoch 3/50 | LR: 0.000991 | Train: 0.8319 (0.71%) | Test: 0.7207 (0.75%)
Epoch 4/50 | LR: 0.000984 | Train: 0.7277 (0.75%) | Test: 0.6590 (0.77%)
Epoch 5/50 | LR: 0.000976 | Train: 0.6633 (0.77%) | Test: 0.6205 (0.79%)
Epoch 6/50 | LR: 0.000965 | Train: 0.6129 (0.79%) | Test: 0.6400 (0.79%)
Epoch 7/50 | LR: 0.000952 | Train: 0.5777 (0.80%) | Test: 0.5653 (0.81%)
Epoch 8/50 | LR: 0.000938 | Train: 0.5488 (0.81%) | Test: 0.5419 (0.82%)
Epoch 9/50 | LR: 0.000922 | Train: 0.5174 (0.82%) | Test: 0.5628 (0.81%)
Epoch 10/50 | LR: 0.000905 | Train: 0.4938 (0.83%) | Test: 0.4936 (0.84%)
Epoch 11/50 | LR: 0.000885 | Train: 0.4744 (0.84%) | Test: 0.4792 (0.84%)
Epoch 12/50 | LR: 0.000864 | Train: 0.4532 (0.84%) | Test: 0.4823 (0.85%)
Epoch 13/50 | LR: 0.000842 | Train: 0.4405 (0.85%) | Test: 0.4473 (0.85%)
Epoch 14/50 | LR: 0.000819 | Train: 0.4176 (0.8

In [13]:
!pip install timm



In [40]:
class SEBlock(nn.Module):
  def __init__(self, in_channels, reduction = 16):
    super(SEBlock, self).__init__()

    self.squeeze = nn.AdaptiveAvgPool2d(1)
    self.excitation = nn.Sequential(
        nn.Linear(in_channels, in_channels // reduction, bias  = False),
        nn.ReLU(inplace = True),
        nn.Linear(in_channels // reduction, in_channels, bias = False),
        nn.Sigmoid()
    )


  def forward(self, x):
    b, c, _, _ = x.size()
    y = self.squeeze(x).view(b, c)
    y = self.excitation(y).view(b, c, 1, 1)
    return x*y

# SENet architecture block is like a attention mechanism for CNN which squeeze the incoming layer into important channels and using relu to check for important channes
# The Relu will kill unnecessary neurons which doesnt confidence in its predictions and then another layer convert back to its original size with selected important features


In [41]:
class ResidualSEBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualSEBlock, self).__init__()

        self.main_path = nn.Sequential(
            nn.ReLU(inplace=False),
            SeperableConv2d(in_channels, out_channels, stride=stride),
            nn.BatchNorm2d(out_channels),

            nn.ReLU(inplace=True),
            SeperableConv2d(out_channels, out_channels, stride=1),
            nn.BatchNorm2d(out_channels)
        )


        self.se = SEBlock(out_channels, reduction=16)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = self.main_path(x)
        out = self.se(out)      # Apply Attention
        out += self.shortcut(x) # Add Residual
        return out

# same Xception module as before we added the attention block within residual block

In [42]:
class MiniXceptionSE(nn.Module):
    def __init__(self, num_classes=10):
        super(MiniXceptionSE, self).__init__()

        # Entry Flow
        self.entry = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),

            nn.Conv2d(32, 64, 3, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )

        self.block1 = ResidualSEBlock(64, 128, stride=2)
        self.block2 = ResidualSEBlock(128, 256, stride=2)
        self.block3 = ResidualSEBlock(256, 728, stride=2)

        self.gap = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(0.4) # Slightly lower dropout since SE acts as regularization
        self.fc = nn.Linear(728, num_classes)

    def forward(self, x):
        x = self.entry(x)
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.gap(x)
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = self.fc(x)
        return x


In [43]:
model = MiniXceptionSE(num_classes=10).to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

EPOCHS = 50
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)

In [44]:
start_time = time.time()

for epoch in range(EPOCHS):
    train_loss, train_acc = train_one_epoch(train_loader, model, loss_fn, optimizer)
    test_loss, test_acc = evaluate(test_loader, model, loss_fn)
    scheduler.step()
    lr = scheduler.get_last_lr()[0]

    print(f"Epoch {epoch+1}/{EPOCHS} | LR: {lr:.6f} | "
          f"Train: {train_loss:.4f} ({train_acc:.2f}%) | "
          f"Test: {test_loss:.4f} ({test_acc:.2f}%)")

total_time = time.time() - start_time
print(f"Done in {total_time:.1f}s")

Epoch 1/50 | LR: 0.000999 | Train: 1.5413 (0.43%) | Test: 1.2289 (0.56%)
Epoch 2/50 | LR: 0.000996 | Train: 1.1441 (0.59%) | Test: 0.9632 (0.66%)
Epoch 3/50 | LR: 0.000991 | Train: 0.9626 (0.66%) | Test: 0.8619 (0.70%)
Epoch 4/50 | LR: 0.000984 | Train: 0.8508 (0.70%) | Test: 0.8297 (0.72%)
Epoch 5/50 | LR: 0.000976 | Train: 0.7731 (0.73%) | Test: 0.6879 (0.76%)
Epoch 6/50 | LR: 0.000965 | Train: 0.7064 (0.75%) | Test: 0.6935 (0.77%)
Epoch 7/50 | LR: 0.000952 | Train: 0.6645 (0.77%) | Test: 0.6221 (0.79%)
Epoch 8/50 | LR: 0.000938 | Train: 0.6245 (0.78%) | Test: 0.5966 (0.79%)
Epoch 9/50 | LR: 0.000922 | Train: 0.5932 (0.79%) | Test: 0.5416 (0.81%)
Epoch 10/50 | LR: 0.000905 | Train: 0.5660 (0.80%) | Test: 0.5507 (0.81%)
Epoch 11/50 | LR: 0.000885 | Train: 0.5402 (0.81%) | Test: 0.5049 (0.83%)
Epoch 12/50 | LR: 0.000864 | Train: 0.5216 (0.82%) | Test: 0.5036 (0.83%)
Epoch 13/50 | LR: 0.000842 | Train: 0.5004 (0.83%) | Test: 0.4837 (0.84%)
Epoch 14/50 | LR: 0.000819 | Train: 0.4803 (0.8