---

# <center>★ Implementation - Xception ★

<center><img src="https://raw.githubusercontent.com/Masterx-AI/Xception_Implementation/main/xception.jpg" style="width: 600px;"/>

---

### Objective:
- Understand the Model Architecture
- Reconstruct the Model Architecture from scratch
- Perform a dry run test to assess it's implemenation on real-time data.

---

### Xception Model Description:

onvolutional Neural Networks (CNN) have come a long way, from the LeNet-style, AlexNet, VGG models, which used simple stacks of convolutional layers for feature extraction and max-pooling layers for spatial sub-sampling, stacked one after the other, to Inception and ResNet networks which use skip connections and multiple convolutional and max-pooling blocks in each layer. Since its introduction, one of the best networks in computer vision has been the Inception network. The Inception model uses a stack of modules, each module containing a bunch of feature extractors, which allow them to learn richer representations with fewer parameters.

Xception paper — https://arxiv.org/abs/1610.02357

The Xception module has 3 main parts. The Entry flow, the Middle flow (which is repeated 8 times), and the Exit flow.


Entry flow of the Xception architecture (Source: Image from the original paper)
The entry flow has two blocks of convolutional layer followed by a ReLU activation. The diagram also mentions in detail the number of filters, the filter size (kernel size), and the strides.

There are also various Separable convolutional layers. There are also Max Pooling layers. When the strides are different than one, the strides are also mentioned. There are also Skip connections, where we use ‘ADD’ to merge the two tensors. It also shows the shape of the input tensor in each flow. For example, we begin with an image size of 299x299x3, and after the entry flow, we get an image size of 19x19x728.


Middle and Exit flow of Xception architecture (Source: Image from the original paper)
Similarly, for the Middle flow and the Exit flow, this diagram clearly explains the image size, the various layers, the number of filters, the shape of filters, the type of pooling, the number of repetitions, and the option of adding a fully connected layer in the end.

Also, all Convolutional and Separable Convolutional layers are followed by batch normalization.

---

## <center> Stractegic Plan of Action:
    
**We aim to solve the problem statement by creating a plan of action, Here are some of the necessary steps:**
1. Dataset Prepration
2. Model Development
3. Model Testing
4. Project Outcomes & Conclusion

---

In [5]:
! pip install torchinfo


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [6]:
import torch

num_gpus = torch.cuda.device_count()
print(f'Number of GPUs available: {num_gpus}')

# Optionally, print the name of each GPU
for i in range(num_gpus):
    print(f'GPU {i}: {torch.cuda.get_device_name(i)}')


Number of GPUs available: 0


## <center> 1. Dataset Prepration

In [7]:
#Importing the basic librarires

import numpy as np

# Preparing the dataset

In [8]:
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import random_split, DataLoader

# Preparing the dataset
# Define preprocessing for training and validation
transform_train = transforms.Compose([
    transforms.Resize((320, 320)),  # Resize to allow random cropping
    transforms.RandomResizedCrop(299),  # Random crop to 299x299
    transforms.RandomHorizontalFlip(),  # Augmentation for training
    transforms.ToTensor(),  # Convert to tensor (scales to [0, 1])
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet normalization
                         std=[0.229, 0.224, 0.225])
])

transform_test = transforms.Compose([
    transforms.Resize((299, 299)),  # Resize directly to 299x299
    transforms.CenterCrop(299),  # Center crop for consistency
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Download and load CIFAR10 training data
full_trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)

# Split train set into training and validation (e.g., 45k train, 5k val)
train_size = int(0.9 * len(full_trainset))
val_size = len(full_trainset) - train_size
train_set, val_set = random_split(full_trainset, [train_size, val_size])

# For validation, use test transform (no augmentation)
val_set.dataset.transform = transform_test

# Load CIFAR10 test dataset
test_set = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

# Create DataLoaders
batch_size = 128
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)


100%|██████████| 170M/170M [00:13<00:00, 12.5MB/s] 


---

## <center>2. Model Development

In [9]:
# Building the Xception Model Architecture


import torch
import torch.nn as nn
import torch.nn.functional as F

class SeparableConv2D(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, padding=0, stride=1):
        super(SeparableConv2D, self).__init__()
        self.depthwise = nn.Conv2d(in_channels, in_channels, kernel_size=kernel_size, stride=stride, padding=padding, groups=in_channels, bias=False)
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
    def forward(self, x):
        x = self.depthwise(x)
        x = self.pointwise(x)
        return x

class EntryFlow(nn.Module):
    def __init__(self):
        super(EntryFlow, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)

        self.block_sizes = [128, 256, 728]
        self.blocks = nn.ModuleList([])
        for i, size in enumerate(self.block_sizes):
            in_channels = 64 if i == 0 else self.block_sizes[i-1]
            block = nn.Sequential(
                nn.ReLU(),
                SeparableConv2D(in_channels, size, kernel_size=3, padding=1),
                nn.BatchNorm2d(size),
                nn.ReLU(),
                SeparableConv2D(size, size, kernel_size=3, padding=1),
                nn.BatchNorm2d(size),
                nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            )
            self.blocks.append(block)
        self.residual_convs = nn.ModuleList([
            nn.Conv2d(64, 128, kernel_size=1, stride=2, padding=0),
            nn.Conv2d(128, 256, kernel_size=1, stride=2, padding=0),
            nn.Conv2d(256, 728, kernel_size=1, stride=2, padding=0),
        ])

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        previous_block_activation = x
        for i, block in enumerate(self.blocks):
            x1 = block(previous_block_activation)
            residual = self.residual_convs[i](previous_block_activation)
            x = x1 + residual
            previous_block_activation = x
        return x

class MiddleFlow(nn.Module):
    def __init__(self, num_blocks=8):
        super(MiddleFlow, self).__init__()
        self.num_blocks = num_blocks
        self.blocks = nn.ModuleList([])
        for _ in range(num_blocks):
            self.blocks.append(nn.Sequential(
                nn.ReLU(),
                SeparableConv2D(728, 728, kernel_size=3, padding=1),
                nn.BatchNorm2d(728),
                nn.ReLU(),
                SeparableConv2D(728, 728, kernel_size=3, padding=1),
                nn.BatchNorm2d(728),
                nn.ReLU(),
                SeparableConv2D(728, 728, kernel_size=3, padding=1),
                nn.BatchNorm2d(728),
            ))

    def forward(self, x):
        previous_block_activation = x
        for block in self.blocks:
            x1 = block(x)
            x = x1 + previous_block_activation
            previous_block_activation = x
        return x

class ExitFlow(nn.Module):
    def __init__(self, num_classes=10):
        super(ExitFlow, self).__init__()
        self.relu = nn.ReLU()
        self.sepconv1 = SeparableConv2D(728, 728, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(728)
        self.sepconv2 = SeparableConv2D(728, 1024, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(1024)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.residual_conv = nn.Conv2d(728, 1024, kernel_size=1, stride=2)

        self.sepconv3 = SeparableConv2D(1024, 1536, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(1536)
        self.sepconv4 = SeparableConv2D(1536, 2048, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(2048)

        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(2048, num_classes)

    def forward(self, x):
        previous_block_activation = x
        x = self.relu(x)
        x = self.bn1(self.sepconv1(x))
        x = self.relu(x)
        x = self.bn2(self.sepconv2(x))
        x = self.maxpool(x)
        residual = self.residual_conv(previous_block_activation)
        x = x + residual

        x = self.bn3(self.sepconv3(x))
        x = self.relu(x)
        x = self.bn4(self.sepconv4(x))
        x = self.relu(x)

        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

class Xception(nn.Module):
    def __init__(self, num_classes=10):
        super(Xception, self).__init__()
        self.entry_flow = EntryFlow()
        self.middle_flow = MiddleFlow()
        self.exit_flow = ExitFlow(num_classes)

    def forward(self, x):
        x = self.entry_flow(x)
        x = self.middle_flow(x)
        x = self.exit_flow(x)
        return x

# Instantiate the model
model = Xception(num_classes=10)


# Checking Model Summary
from torchinfo import summary
summary(model, input_size=(1, 3, 299, 299))



Layer (type:depth-idx)                        Output Shape              Param #
Xception                                      [1, 10]                   --
├─EntryFlow: 1-1                              [1, 728, 19, 19]          --
│    └─Conv2d: 2-1                            [1, 32, 150, 150]         896
│    └─BatchNorm2d: 2-2                       [1, 32, 150, 150]         64
│    └─Conv2d: 2-3                            [1, 64, 150, 150]         18,496
│    └─BatchNorm2d: 2-4                       [1, 64, 150, 150]         128
│    └─ModuleList: 2-9                        --                        (recursive)
│    │    └─Sequential: 3-1                   [1, 128, 75, 75]          26,816
│    └─ModuleList: 2-10                       --                        (recursive)
│    │    └─Conv2d: 3-2                       [1, 128, 75, 75]          8,320
│    └─ModuleList: 2-9                        --                        (recursive)
│    │    └─Sequential: 3-3                   [1, 256, 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler
import json
import time

# Assume model, train_loader, val_loader defined and moved to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Xception(num_classes=10)
model = torch.nn.DataParallel(model)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

# Define learning rate scheduler (decay lr by 0.94 every 2 epochs)
#scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.94)
scaler = GradScaler()  # For dynamic gradient scaling


def train_and_validate(model, train_loader, val_loader, optimizer, criterion, device, num_epochs=5):
    scaler = torch.cuda.amp.GradScaler()
    
    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': [],
        'time_per_epochs': []
    }
    
    
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        start = time.time()

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            
            with torch.cuda.amp.autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            # Step the scheduler to update learning rate
            #scheduler.step()
            
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

        train_loss = running_loss / total
        train_acc = correct / total
        step_time = time.time() - start

        # Validation phase
        model.eval()
        running_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                running_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)

        val_loss = running_loss / total
        val_acc = correct / total
        print(f"Epoch {epoch+1} | Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

        # Save metrics for each epoch
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['time_per_epochs'].append(step_time)
        
    # Convert dictionary to JSON string
    history =  json.dumps(history) 
    
    # Option 2: Write the JSON string  to a file
    with open('../histories/history_xception.json', 'w') as f:
        f.write(history)
    return model, history



# model, history = train_and_validate(model, train_loader, val_loader, optimizer, criterion, device, num_epochs=5)


def evaluate(model, loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    epoch_loss = running_loss / total
    accuracy = correct / total
    return epoch_loss, accuracy

# Final evaluation on test set
test_loss, test_acc = evaluate(model, test_loader, criterion, device)
print(f"Test Loss: {test_loss:.4f} | Test Accuracy: {test_acc:.4f}")


  scaler = GradScaler()  # For dynamic gradient scaling


KeyboardInterrupt: 

In [None]:
num_epochs = 10
model, history = train_and_validate(model, train_loader, val_loader, optimizer, criterion, device, num_epochs)

In [None]:
torch.save({
    'epoch': num_epochs,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
}, '../models/xception_model.pth')


In [None]:
import matplotlib.pyplot as plt

def plot_training_history(history):
    epochs = range(1, len(history['train_loss']) + 1)

    plt.figure(figsize=(12, 5))

    # Plot Loss
    plt.subplot(1, 2, 1)
    plt.plot(epochs, history['train_loss'], 'b-', label='Training Loss')
    plt.plot(epochs, history['val_loss'], 'r-', label='Validation Loss')
    plt.title('Loss vs Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # Plot Accuracy
    plt.subplot(1, 2, 2)
    plt.plot(epochs, history['train_acc'], 'b-', label='Training Accuracy')
    plt.plot(epochs, history['val_acc'], 'r-', label='Validation Accuracy')
    plt.title('Accuracy vs Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.show()


In [None]:
# Final evaluation on test set
test_loss, test_acc = evaluate(model, test_loader, criterion, device)
print(f"Test Loss: {test_loss:.4f} | Test Accuracy: {test_acc:.4f}")
history = json.loads(history)
plot_training_history(history)

---

# <center>4. Project Outcomes & Conclusion

### Here are some of the key outcomes of the project:

- The Model Architecture was reconstructed from scratch with no errors.
- We were able to plot the model graph & observe input & output shapes of all the layers. 
- Also we could identify that the total trainable paramters for the Xception model were 20.8M.
- To demonstrate it's functionality, we have tested the model for just 10 epochs (due to hardware limitations). 
- Despite this, the model has performed surpisingly well achieving high accuracy within few iterations.

In [None]:
#<<<--------------------------------------THE END---------------------------------------->>>