### Importing Packages

In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
import matplotlib.pyplot as plt
import numpy as np
import time

### Checking GPU Availability and Selecting a Device

In [None]:
import torch
print("MPS available:", torch.backends.mps.is_available())
print("MPS built:", torch.backends.mps.is_built())
print("Device count:", torch.mps.device_count())

# Setting device
if torch.backends.mps.is_available():
    device = torch.device("mps")
    print("✅ Using MPS (GPU acceleration)")
else:
    device = torch.device("cpu")
    print("❌ Using CPU only")

print("Current device:", device)

MPS available: True
MPS built: True
Device count: 1
✅ Using MPS (GPU acceleration)
Current device: mps


### Convolution Expansion and Projection

In [3]:
class InvertedResidual(nn.Module):
    """Expand -> depthwise -> project block with an optional identity shortcut.

    The block is stored as a single ``nn.Sequential`` in ``self.conv``:

    1. (skipped when ``expansion_ratio == 1``) 1x1 conv widening the input to
       ``int(in_channels * expansion_ratio)`` channels, BatchNorm, ReLU6;
    2. 3x3 depthwise conv (``groups`` equal to the widened channel count) with
       the requested ``stride`` and padding 1, BatchNorm, ReLU6;
    3. 1x1 conv projecting to ``out_channels``, BatchNorm, no activation.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by the projection.
        stride: Stride of the depthwise convolution.
        expansion_ratio: Multiplier applied to ``in_channels`` to get the
            width of the depthwise stage.
    """

    def __init__(self, in_channels, out_channels, stride, expansion_ratio=6):
        super().__init__()
        expanded = int(in_channels * expansion_ratio)

        # The shortcut is only valid when input and output tensors have the
        # same shape: no spatial downsampling and no channel change.
        self.use_residual = (stride == 1) and (in_channels == out_channels)

        # Pointwise convolution for expansion (omitted for a ratio of 1,
        # where the depthwise stage runs directly on the input channels).
        expand_stage = []
        if expansion_ratio != 1:
            expand_stage = [
                nn.Conv2d(in_channels, expanded, 1, bias=False),
                nn.BatchNorm2d(expanded),
                nn.ReLU6(inplace=True),
            ]

        # Depthwise convolution: one 3x3 filter per channel.
        depthwise_stage = [
            nn.Conv2d(expanded, expanded, 3, stride, 1,
                      groups=expanded, bias=False),
            nn.BatchNorm2d(expanded),
            nn.ReLU6(inplace=True),
        ]

        # Pointwise convolution for projection; no activation follows it.
        project_stage = [
            nn.Conv2d(expanded, out_channels, 1, bias=False),
            nn.BatchNorm2d(out_channels),
        ]

        self.conv = nn.Sequential(*expand_stage, *depthwise_stage, *project_stage)

    def forward(self, x):
        """Apply the block, adding the input back when the shortcut is enabled."""
        out = self.conv(x)
        return x + out if self.use_residual else out

### Building Layers

In [4]:
class MobileNetV4Base(nn.Module):
    """Image classifier built from a stem conv, inverted-residual stages and a linear head.

    Layout of ``self.features`` (an ``nn.Sequential``):
      * a 3x3 stride-2 conv + BatchNorm + ReLU6 stem on 3-channel input,
      * the ``InvertedResidual`` blocks described by the stage table below,
      * a 1x1 conv + BatchNorm + ReLU6 widening to ``self.last_channel``.

    ``forward`` then global-average-pools to 1x1, flattens, and applies
    ``self.classifier`` (Dropout(0.2) followed by a Linear layer).

    NOTE(review): the (t, c, n, s) stage table looks like the standard
    MobileNetV2 layout rather than anything V4-specific -- confirm the
    intended architecture against the class name.

    Args:
        num_classes: Size of the classifier output.
        width_mult: Multiplier applied to every stage's channel count. The
            final feature width is only scaled up, never below 1280.
    """

    def __init__(self, num_classes=10, width_mult=1.0):
        super().__init__()

        # One row per stage:
        #   t = expansion ratio, c = output channels,
        #   n = number of blocks, s = stride of the first block in the stage.
        stage_table = (
            (1, 16, 1, 1),
            (6, 24, 2, 2),
            (6, 32, 3, 2),
            (6, 64, 4, 2),
            (6, 96, 3, 1),
            (6, 160, 3, 2),
            (6, 320, 1, 1),
        )

        stem_width = int(32 * width_mult)
        self.last_channel = int(1280 * max(1.0, width_mult))

        def conv_bn_relu6(c_in, c_out, kernel_size, stride, padding):
            # Bias-free conv followed by BatchNorm and ReLU6.
            return nn.Sequential(
                nn.Conv2d(c_in, c_out, kernel_size, stride, padding, bias=False),
                nn.BatchNorm2d(c_out),
                nn.ReLU6(inplace=True),
            )

        # Stem: 3x3, stride 2, padding 1.
        stages = [conv_bn_relu6(3, stem_width, 3, 2, 1)]

        # Inverted residual stages: only the first block of a stage uses the
        # stage stride; the remaining blocks keep the resolution.
        current = stem_width
        for expansion, channels, repeats, first_stride in stage_table:
            width = int(channels * width_mult)
            for block_idx in range(repeats):
                block_stride = first_stride if block_idx == 0 else 1
                stages.append(
                    InvertedResidual(current, width, block_stride, expansion)
                )
                current = width

        # Head: 1x1 conv (stride 1, no padding) up to the final feature width.
        stages.append(conv_bn_relu6(current, self.last_channel, 1, 1, 0))

        self.features = nn.Sequential(*stages)
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.last_channel, num_classes),
        )

        # Weight initialization
        self._initialize_weights()

    def _initialize_weights(self):
        """Re-initialize every Conv2d, BatchNorm2d and Linear submodule in place."""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                # Small-variance normal weights, zero bias.
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.zeros_(module.bias)
            elif isinstance(module, nn.BatchNorm2d):
                # Start BatchNorm as an identity affine transform.
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)
            elif isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out')
                if module.bias is not None:
                    nn.init.zeros_(module.bias)

    def forward(self, x):
        """Return class logits for a batch of images."""
        pooled = F.adaptive_avg_pool2d(self.features(x), (1, 1))
        return self.classifier(torch.flatten(pooled, 1))

### Training Model

In [None]:
# Training function
def _evaluate_accuracy(model, data_loader, device):
    """Return the top-1 accuracy (in percent) of ``model`` over ``data_loader``.

    Puts the model in eval mode and runs under ``torch.no_grad()``.
    Returns NaN when the loader yields no samples, because accuracy is
    undefined in that case.
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for data, target in data_loader:
            # Move the batch to the same device as the model
            data, target = data.to(device), target.to(device)
            predicted = model(data).argmax(dim=1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    if total == 0:
        return float("nan")
    return 100 * correct / total


def train_model(model, train_loader, test_loader, device, epochs=10, lr=0.01,
                weight_decay=1e-4, step_size=30, gamma=0.1):
    """Train ``model`` with Adam + StepLR and evaluate it after every epoch.

    Args:
        model: Module to train; expected to already live on ``device``.
        train_loader: Sized iterable of ``(data, target)`` batches.
        test_loader: Iterable of ``(data, target)`` batches used for evaluation.
        device: Device every batch is moved to before the forward pass.
        epochs: Number of passes over ``train_loader``.
        lr: Initial Adam learning rate.
        weight_decay: Adam weight decay.
        step_size: Number of epochs between learning-rate decays. With the
            default of 30 the rate never changes unless ``epochs`` exceeds 30.
        gamma: Factor the learning rate is multiplied by at each decay.

    Returns:
        ``(train_losses, test_accuracies)``: two lists with one entry per
        epoch, holding the mean training loss and the test accuracy in
        percent. An entry is NaN when the corresponding loader was empty.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    scheduler = StepLR(optimizer, step_size=step_size, gamma=gamma)

    train_losses = []
    test_accuracies = []
    num_batches = len(train_loader)

    # Wall-clock timer for the whole run
    start_time = time.time()

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        epoch_start = time.time()  # Start time for this epoch

        for batch_idx, (data, target) in enumerate(train_loader):
            # Move the batch to the same device as the model
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            running_loss += batch_loss

            if batch_idx % 100 == 0:
                print(f'Epoch: {epoch+1:2d}/{epochs} | '
                      f'Batch: {batch_idx:4d}/{num_batches} | '
                      f'Loss: {batch_loss:.4f}')

        # Mean loss over the epoch; undefined (NaN) if there were no batches.
        avg_loss = running_loss / num_batches if num_batches else float("nan")
        train_losses.append(avg_loss)

        # Evaluating on test set
        accuracy = _evaluate_accuracy(model, test_loader, device)
        test_accuracies.append(accuracy)

        # Printing epoch time (includes the evaluation pass)
        epoch_time = time.time() - epoch_start
        total_time = time.time() - start_time
        print(f'Epoch {epoch+1:2d}/{epochs} | '
              f'Loss: {avg_loss:.4f} | '
              f'Accuracy: {accuracy:.2f}% | '
              f'Epoch Time: {epoch_time:.1f}s | '
              f'Total Time: {total_time/60:.1f}m')

        scheduler.step()

    return train_losses, test_accuracies


### Data Preparation

In [13]:
def prepare_data(batch_size=64, root='./data', num_workers=2):
    """Build CIFAR-10 train and test ``DataLoader`` objects.

    The dataset is downloaded into ``root`` if it is not already there.
    Both splits share one transform: images are converted to tensors and
    each of the three channels is normalized with mean 0.5 and std 0.5.

    Args:
        batch_size: Number of samples per batch for both loaders.
        root: Directory the CIFAR-10 files are stored in / downloaded to.
        num_workers: Worker processes per loader. Pass 0 to load in the main
            process, which sidesteps multiprocessing issues in some
            notebook environments.

    Returns:
        ``(train_loader, test_loader)``; only the training loader shuffles.
    """
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    train_dataset = datasets.CIFAR10(root=root, train=True,
                                     download=True, transform=transform)
    test_dataset = datasets.CIFAR10(root=root, train=False,
                                    download=True, transform=transform)

    train_loader = DataLoader(train_dataset, batch_size=batch_size,
                              shuffle=True, num_workers=num_workers)
    # Keep evaluation order fixed so per-epoch accuracies are comparable.
    test_loader = DataLoader(test_dataset, batch_size=batch_size,
                             shuffle=False, num_workers=num_workers)

    return train_loader, test_loader

### Main Execution

In [14]:
if __name__ == "__main__":
    # Seed PyTorch's RNG so weight initialization and the training shuffle
    # start from a fixed state and reruns are comparable.
    torch.manual_seed(42)

    # Setup device with MPS support for Apple Silicon
    if torch.backends.mps.is_available():
        device = torch.device("mps")
        print("✅ USING MPS (APPLE SILICON GPU ACCELERATION)")
    else:
        device = torch.device("cpu")
        print("⚠️  USING CPU ONLY (NO GPU ACCELERATION)")
    
    print(f"Device: {device}")
    
    # Preparing data with a larger batch size than prepare_data's default of 64
    train_loader, test_loader = prepare_data(batch_size=128)
    
    # Creating and training the base model
    print("\nTraining Base MobileNetV4 Model...")
    base_model = MobileNetV4Base(num_classes=10).to(device)  # Move model to the selected device
    
    # Memory currently occupied by tensors on the MPS device (model weights at this point)
    if torch.backends.mps.is_available():
        print("GPU Memory allocated:", torch.mps.current_allocated_memory() / 1024**2, "MB")
    
    train_losses_base, test_accuracies_base = train_model(
        base_model, train_loader, test_loader, device, epochs=20, lr=0.001
    )
    
    print(f"Final Test Accuracy (Base): {test_accuracies_base[-1]:.2f}%")
    
    # Additional GPU info. driver_allocated_memory() reports what the Metal
    # driver currently holds for this process, not a peak, so the label says so.
    if torch.backends.mps.is_available():
        print("Driver-allocated GPU memory:", torch.mps.driver_allocated_memory() / 1024**3, "GB")

✅ USING MPS (APPLE SILICON GPU ACCELERATION)
Device: mps

Training Base MobileNetV4 Model...
GPU Memory allocated: 34.7490234375 MB
Epoch:  1/20 | Batch:    0/391 | Loss: 2.3047
Epoch:  1/20 | Batch:  100/391 | Loss: 1.9696
Epoch:  1/20 | Batch:  200/391 | Loss: 1.7546
Epoch:  1/20 | Batch:  300/391 | Loss: 1.6737
Epoch  1/20 | Loss: 1.9147 | Accuracy: 40.36% | Epoch Time: 50.0s | Total Time: 0.8m
Epoch:  2/20 | Batch:    0/391 | Loss: 1.5431
Epoch:  2/20 | Batch:  100/391 | Loss: 1.5763
Epoch:  2/20 | Batch:  200/391 | Loss: 1.5947
Epoch:  2/20 | Batch:  300/391 | Loss: 1.5678
Epoch  2/20 | Loss: 1.5275 | Accuracy: 49.24% | Epoch Time: 42.0s | Total Time: 1.5m
Epoch:  3/20 | Batch:    0/391 | Loss: 1.4828
Epoch:  3/20 | Batch:  100/391 | Loss: 1.3814
Epoch:  3/20 | Batch:  200/391 | Loss: 1.4382
Epoch:  3/20 | Batch:  300/391 | Loss: 1.2817
Epoch  3/20 | Loss: 1.3474 | Accuracy: 53.77% | Epoch Time: 42.5s | Total Time: 2.2m
Epoch:  4/20 | Batch:    0/391 | Loss: 1.2185
Epoch:  4/20 | 