Step 0: Import Libraries & Set Up

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [6]:
# Pick the compute device once: use CUDA when PyTorch reports it as available,
# otherwise fall back to the CPU. Later cells move the model and batches here.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

Using device: cpu


Step 1: Define Depthwise Separable Convolution

In [2]:
class DepthwiseSeparableConv(nn.Module):
    """
    Depthwise separable convolution followed by BatchNorm and ReLU6.

    The spatial filtering and the channel mixing are split into two convs:
    - depthwise: a kernel_size x kernel_size conv with groups=in_channels,
      i.e. one filter per input channel, no cross-channel mixing
    - pointwise: a 1x1 conv that maps in_channels -> out_channels

    Args:
        in_channels: number of channels in the input feature map.
        out_channels: number of channels produced by the pointwise conv.
        kernel_size: side length of the depthwise kernel (default 3).
        stride: stride of the depthwise conv (default 1).
    """
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1):
        super().__init__()
        # kernel_size // 2 keeps the spatial size unchanged at stride 1 for odd kernels.
        same_padding = kernel_size // 2
        depthwise_options = dict(
            kernel_size=kernel_size,
            stride=stride,
            padding=same_padding,
            groups=in_channels,
            bias=False,
        )
        self.depthwise = nn.Conv2d(in_channels, in_channels, **depthwise_options)
        # Bias is omitted on both convs; the BatchNorm that follows has its own shift.
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        self.bn = nn.BatchNorm2d(out_channels)
        self.act = nn.ReLU6(inplace=True)

    def forward(self, x):
        """Apply depthwise conv -> pointwise conv -> BatchNorm -> ReLU6."""
        spatial = self.depthwise(x)
        mixed = self.pointwise(spatial)
        return self.act(self.bn(mixed))

Step 2: Implement InvertedResidual Block

In [3]:
class InvertedResidual(nn.Module):
    """
    MobileNetV2 Inverted Residual Block:
    - 1x1 Expansion Conv → BatchNorm → ReLU6 (skipped when expand_ratio == 1)
    - 3x3 Depthwise Conv → BatchNorm → ReLU6
    - 1x1 Projection Conv → BatchNorm, linear (no activation)
    - Residual connection only if stride == 1 and in/out channels match

    Args:
        in_channels: number of channels in the input feature map.
        out_channels: number of channels produced by the projection conv.
        stride: stride of the 3x3 depthwise conv.
        expand_ratio: multiplier applied to in_channels to get the hidden
            (expanded) width. May be fractional; the hidden width is rounded
            to the nearest integer.
    """
    def __init__(self, in_channels, out_channels, stride, expand_ratio):
        super().__init__()
        self.stride = stride
        self.use_residual = (stride == 1 and in_channels == out_channels)
        # nn.Conv2d needs integer channel counts. Rounding leaves integer
        # ratios unchanged and additionally accepts float ratios such as 1.5.
        hidden_dim = int(round(in_channels * expand_ratio))

        layers = []
        if expand_ratio != 1:
            # Expansion phase: widen the representation before depthwise filtering
            layers.append(nn.Conv2d(in_channels, hidden_dim, kernel_size=1, bias=False))
            layers.append(nn.BatchNorm2d(hidden_dim))
            layers.append(nn.ReLU6(inplace=True))

        # Depthwise convolution: groups == channels, one 3x3 filter per channel
        layers.append(nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, stride=stride,
                                padding=1, groups=hidden_dim, bias=False))
        layers.append(nn.BatchNorm2d(hidden_dim))
        layers.append(nn.ReLU6(inplace=True))

        # Linear projection: no activation after the final BatchNorm
        layers.append(nn.Conv2d(hidden_dim, out_channels, kernel_size=1, bias=False))
        layers.append(nn.BatchNorm2d(out_channels))

        self.block = nn.Sequential(*layers)

    def forward(self, x):
        """Run the block, adding the input back when the residual path is enabled."""
        if self.use_residual:
            return x + self.block(x)
        else:
            return self.block(x)

Step 3: Build MobileNetV2 Architecture

In [4]:
class MobileNetV2(nn.Module):
    """
    Full MobileNetV2 network.

    Layout, in order of execution:
    - stem: 3x3 stride-2 conv + BatchNorm + ReLU6 on a 3-channel input
    - features: the stack of InvertedResidual blocks described by the stage table
    - conv_last: 1x1 conv + BatchNorm + ReLU6
    - global average pooling over the two spatial dims, then a linear classifier

    Args:
        num_classes: size of the classifier output (default 1000).
        width_mult: multiplier applied to every stage's channel count, truncated
            with int(). The final 1x1 conv width is only scaled when
            width_mult > 1.0; otherwise it stays at 1280.
    """
    def __init__(self, num_classes=1000, width_mult=1.0):
        super().__init__()

        # One row per stage:
        # (expand_ratio t, output_channels c, num_blocks n, stride of first block s)
        stage_table = (
            (1, 16, 1, 1),
            (6, 24, 2, 2),
            (6, 32, 3, 2),
            (6, 64, 4, 2),
            (6, 96, 3, 1),
            (6, 160, 3, 2),
            (6, 320, 1, 1),
        )

        # Stem
        current_width = int(32 * width_mult)
        self.stem = nn.Sequential(
            nn.Conv2d(3, current_width, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(current_width),
            nn.ReLU6(inplace=True)
        )

        # Inverted residual stages: only the first block of a stage uses the
        # stage stride; the remaining repeats run at stride 1.
        stage_blocks = []
        for expand_ratio, base_width, repeats, first_stride in stage_table:
            stage_width = int(base_width * width_mult)
            for repeat_idx in range(repeats):
                block_stride = first_stride if repeat_idx == 0 else 1
                stage_blocks.append(
                    InvertedResidual(current_width, stage_width, block_stride, expand_ratio)
                )
                current_width = stage_width
        self.features = nn.Sequential(*stage_blocks)

        # Head
        head_width = 1280
        if width_mult > 1.0:
            head_width = int(head_width * width_mult)
        self.conv_last = nn.Sequential(
            nn.Conv2d(current_width, head_width, kernel_size=1, bias=False),
            nn.BatchNorm2d(head_width),
            nn.ReLU6(inplace=True)
        )
        self.classifier = nn.Linear(head_width, num_classes)

    def forward(self, x):
        """Return class logits for a batch of images."""
        feature_map = self.conv_last(self.features(self.stem(x)))
        pooled = feature_map.mean(dim=(2, 3))  # global average pooling over H and W
        return self.classifier(pooled)

Example: Create Model and Print Summary

In [7]:
# Build the default-width, 1000-class network, then move it to the selected device.
model = MobileNetV2(num_classes=1000)
model = model.to(device)
# Printing an nn.Module lists its nested submodules, which serves as the summary here.
print(model)

MobileNetV2(
  (stem): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU6(inplace=True)
  )
  (features): Sequential(
    (0): InvertedResidual(
      (block): Sequential(
        (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU6(inplace=True)
        (3): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (4): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): Conv2d(16, 96, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU6(inplace=True)
        (3):

Step 4: Train MobileNetV2 on CIFAR-10

In [8]:
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

# Data settings a reader might want to tune
DATA_ROOT = './data'
BATCH_SIZE = 64
IMAGE_SIZE = 32
# CIFAR-10 images are RGB, so Normalize gets one mean/std per channel.
# With ToTensor's [0, 1] range, (x - 0.5) / 0.5 maps pixel values to [-1, 1].
CHANNEL_MEAN = (0.5, 0.5, 0.5)
CHANNEL_STD = (0.5, 0.5, 0.5)

# Data preprocessing
transform = transforms.Compose([
    # CIFAR-10 images are already 32x32, so this is effectively a no-op; it is
    # kept so that changing IMAGE_SIZE is the only edit needed to train larger.
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(CHANNEL_MEAN, CHANNEL_STD)
])

# Download CIFAR-10 (skipped automatically when the files already exist)
trainset = torchvision.datasets.CIFAR10(root=DATA_ROOT, train=True,
                                        download=True, transform=transform)
testset = torchvision.datasets.CIFAR10(root=DATA_ROOT, train=False,
                                       download=True, transform=transform)

# Shuffle only the training split; evaluation order does not matter.
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)
testloader = DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data\cifar-10-python.tar.gz


100%|███████████████████████████████████████████████████████████████| 170498071/170498071 [00:24<00:00, 7055379.41it/s]


Extracting ./data\cifar-10-python.tar.gz to ./data
Files already downloaded and verified


Step 4.2: Define the Model, Loss, and Optimizer

In [9]:
import torch.optim as optim

# Seed PyTorch's global RNG before the model is built so that weight
# initialisation (and the shuffle order drawn later by the training DataLoader)
# is repeatable on a fresh Restart & Run All.
SEED = 42
torch.manual_seed(SEED)

# Hyperparameters
NUM_CLASSES = 10       # CIFAR-10 has ten classes
WIDTH_MULT = 0.5       # half-width network
LEARNING_RATE = 0.001

# Use smaller model for fast training
model = MobileNetV2(num_classes=NUM_CLASSES, width_mult=WIDTH_MULT).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

Step 4.3: Training Loop

In [10]:
def train(model, loader, optimizer, criterion):
    """Run one training epoch and return ``(avg_loss, accuracy)``.

    Args:
        model: the network to optimise; it is switched to training mode.
        loader: iterable yielding ``(inputs, labels)`` batches.
        optimizer: optimizer stepping the model's parameters.
        criterion: loss called as ``criterion(outputs, labels)``. The average
            below weights each batch loss by its batch size, which assumes the
            criterion returns a per-batch mean (the nn.CrossEntropyLoss default).

    Returns:
        Tuple of the sample-weighted mean loss and the fraction of samples
        whose arg-max prediction equals the label.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    model.train()

    # Move batches to wherever the model's parameters live instead of reading
    # a notebook-level global, so the function does not depend on cell order.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    running_loss = 0.0
    correct = 0
    total = 0
    for inputs, labels in loader:
        if target_device is not None:
            inputs, labels = inputs.to(target_device), labels.to(target_device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # loss is a batch mean; scale by batch size so the epoch average is per-sample
        running_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    if total == 0:
        raise ValueError("train() received a loader that yielded no samples")

    avg_loss = running_loss / total
    acc = correct / total
    return avg_loss, acc

Step 4.4: Test Loop

In [11]:
def evaluate(model, loader, criterion):
    """Score the model on a dataset without updating it; return ``(avg_loss, accuracy)``.

    Args:
        model: the network to evaluate; it is switched to eval mode and left there.
        loader: iterable yielding ``(inputs, labels)`` batches.
        criterion: loss called as ``criterion(outputs, labels)``. The average
            below weights each batch loss by its batch size, which assumes the
            criterion returns a per-batch mean (the nn.CrossEntropyLoss default).

    Returns:
        Tuple of the sample-weighted mean loss and the fraction of samples
        whose arg-max prediction equals the label.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    model.eval()

    # Move batches to wherever the model's parameters live instead of reading
    # a notebook-level global, so the function does not depend on cell order.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    running_loss = 0.0
    correct = 0
    total = 0
    # Gradients are not needed for scoring; disabling them saves memory and time.
    with torch.no_grad():
        for inputs, labels in loader:
            if target_device is not None:
                inputs, labels = inputs.to(target_device), labels.to(target_device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # loss is a batch mean; scale by batch size so the average is per-sample
            running_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    if total == 0:
        raise ValueError("evaluate() received a loader that yielded no samples")

    avg_loss = running_loss / total
    acc = correct / total
    return avg_loss, acc

Step 4.5: Run Training

In [None]:
EPOCHS = 5
separator = '-' * 50
for epoch in range(EPOCHS):
    # One optimisation pass over the training split, then score the held-out test split.
    train_loss, train_acc = train(model, trainloader, optimizer, criterion)
    test_loss, test_acc = evaluate(model, testloader, criterion)

    # Emit the four report lines in a single call; sep="\n" puts each on its own line.
    print(
        f"Epoch {epoch+1}/{EPOCHS}",
        f"Train Loss: {train_loss:.4f}, Accuracy: {train_acc:.4f}",
        f"Test  Loss: {test_loss:.4f}, Accuracy: {test_acc:.4f}",
        separator,
        sep="\n",
    )

Epoch 1/5
Train Loss: 2.0202, Accuracy: 0.2455
Test  Loss: 1.7753, Accuracy: 0.3315
--------------------------------------------------
Epoch 2/5
Train Loss: 1.6936, Accuracy: 0.3733
Test  Loss: 1.5950, Accuracy: 0.4091
--------------------------------------------------
Epoch 3/5
Train Loss: 1.5513, Accuracy: 0.4321
Test  Loss: 1.4701, Accuracy: 0.4578
--------------------------------------------------
