# In [None]:
# NOTE: This code was used for the experiments as-is. It was run in Environment 2 (Kaggle), as described in the paper.
# Naming and structure may not follow programming best practices.
# The focus is on reproducibility.
# This code was developed for internal experimentation and contains hardcoded values for various test cases.
# It was not refactored for modularity, but the logic matches the experiments reported in the paper.

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
import random
import torch.nn.functional as F
import time
from tqdm import tqdm
import math

# 1. Set seeds for reproducibility
def set_seed(seed=42):
    """Seed every random number generator used by this script.

    Seeds Python's ``random``, NumPy, and torch (CPU plus the current and all
    CUDA devices), switches cuDNN to deterministic mode with benchmarking
    turned off, and finally prints the seed that was applied.

    Args:
        seed: Integer seed shared by all generators.
    """
    # Host-side generators.
    random.seed(seed)
    np.random.seed(seed)

    # torch generators: CPU, current CUDA device, then every CUDA device.
    for seed_fn in (torch.manual_seed,
                    torch.cuda.manual_seed,
                    torch.cuda.manual_seed_all):
        seed_fn(seed)

    # Ask cuDNN for deterministic kernels and disable its autotuner.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    print(seed)
# Seed selection for this run. This executes at module level, so the seed is
# applied (and printed) before any data loader or model is created.
# The commented-out calls are alternative seeds (presumably one per repeated
# run -- confirm against the paper); enable exactly one line at a time.
#set_seed(40)
#set_seed(41)
set_seed(42)
#set_seed(43)
#set_seed(44)
# 2. Load CIFAR-10 and apply normalization
def load_cifar10(batch_size=128):
    """Create the CIFAR-10 train and test data loaders.

    The training split is augmented with a padded random crop and a random
    horizontal flip; both splits are converted to tensors and normalised with
    the same per-channel mean/std. The dataset is downloaded to ``./data``
    when it is not already there.

    Args:
        batch_size: Batch size used for both loaders.

    Returns:
        ``(trainloader, testloader, classes)`` where ``classes`` is the tuple
        of the ten class names.
    """
    # Per-channel normalisation constants shared by both splits.
    channel_mean = (0.4914, 0.4822, 0.4465)
    channel_std = (0.2470, 0.2435, 0.2616)

    # Training pipeline: augmentation first, then tensor conversion + normalise.
    train_pipeline = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(channel_mean, channel_std),
    ])

    # Evaluation pipeline: no augmentation.
    test_pipeline = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(channel_mean, channel_std),
    ])

    data_root = './data'

    trainset = torchvision.datasets.CIFAR10(
        root=data_root, train=True, download=True, transform=train_pipeline)
    # Only the training loader shuffles.
    trainloader = torch.utils.data.DataLoader(
        trainset, batch_size=batch_size, shuffle=True, num_workers=2)

    testset = torchvision.datasets.CIFAR10(
        root=data_root, train=False, download=True, transform=test_pipeline)
    testloader = torch.utils.data.DataLoader(
        testset, batch_size=batch_size, shuffle=False, num_workers=2)

    classes = (
        'plane', 'car', 'bird', 'cat', 'deer',
        'dog', 'frog', 'horse', 'ship', 'truck',
    )

    return trainloader, testloader, classes

# 3. Create custom Mplus activation function
class Mplus(nn.Module):
    """Self-gated activation with an optional Gaussian correction term.

    The original ``forward`` contained only four commented-out formulas and
    therefore had no executable body. All four are available here through the
    constructor arguments, each evaluated with the same grouping of operations
    as the corresponding original line:

    * ``Mplus('mish', -0.1)``:
      ``x * tanh(softplus(x)) - 0.1 * x * exp(-0.5 * x**2)``
    * ``Mplus('mish', 0)``: ``x * tanh(softplus(x))``
    * ``Mplus('swish', -0.1)``:
      ``x * (sigmoid(x) - 0.1 * exp(-0.5 * x**2))``
    * ``Mplus('swish', 0)``: ``x * sigmoid(x)``

    NOTE(review): the default (``'mish'``, ``alpha=-0.1``) is simply the first
    variant listed in the original comments -- confirm which variant a given
    reported run used and set the arguments accordingly.

    Args:
        base: ``'mish'`` or ``'swish'``; selects the gate applied to ``x``.
        alpha: Coefficient of the Gaussian term ``exp(-0.5 * x**2)``. With
            ``alpha == 0`` the term is skipped entirely.

    Raises:
        ValueError: If ``base`` is not one of the supported names.
    """

    def __init__(self, base='mish', alpha=-0.1):
        super(Mplus, self).__init__()
        if base not in ('mish', 'swish'):
            raise ValueError(
                f"Unknown base activation {base!r}; expected 'mish' or 'swish'")
        self.base = base
        self.alpha = alpha

    def forward(self, x):
        """Apply the configured activation element-wise to ``x``."""
        if self.base == 'mish':
            out = x * torch.tanh(F.softplus(x))
            if self.alpha != 0:
                # Adding alpha * x * g with alpha = -0.1 equals subtracting
                # 0.1 * x * g, as written in the original formula.
                out = out + self.alpha * x * torch.exp(-0.5 * x ** 2)
            return out

        # Swish-based variants: the Gaussian term sits inside the gate.
        gate = torch.sigmoid(x)
        if self.alpha != 0:
            gate = gate + self.alpha * torch.exp(-0.5 * x ** 2)
        return x * gate

    def extra_repr(self):
        """Show the selected variant when the module is printed."""
        return f"base={self.base!r}, alpha={self.alpha}"
# 4 & 5. Build ResNet20v2 with Mplus activation
class PreActBlock(nn.Module):
    """Pre-activation residual block: BN -> act -> conv3x3 -> BN -> act -> conv3x3.

    The residual branch always starts from the tensor *after* the first
    BatchNorm + activation. When the stride or channel count changes, that
    tensor goes through a 1x1 projection convolution; otherwise it is added
    unchanged.

    NOTE(review): in the reference pre-activation ResNet design the identity
    shortcut carries the raw block input ``x`` rather than the activated
    tensor. This block has never done that, because ``self.shortcut`` is
    always registered. The behaviour is kept as-is so results stay comparable
    with earlier runs; confirm whether it is intended.

    Args:
        in_channels: Number of channels of the block input.
        out_channels: Number of channels produced by both 3x3 convolutions.
        stride: Stride of the first 3x3 convolution (and of the projection).
    """

    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super(PreActBlock, self).__init__()
        # Module creation order is left exactly as it was; presumably weight
        # initialisation consumes the seeded RNG in this order, so changing it
        # could change the initial weights for a given seed.
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.act1 = Mplus()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.act2 = Mplus()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)

        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            # 1x1 convolution so the residual matches the main path's shape.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1,
                          stride=stride, bias=False)
            )
        else:
            # Empty Sequential: passes its input through unchanged.
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``conv2(act2(bn2(conv1(a)))) + shortcut(a)`` with ``a = act1(bn1(x))``."""
        out = self.act1(self.bn1(x))
        # `self.shortcut` exists on every instance, so the former
        # `hasattr(...) else x` fallback could never run and was removed.
        shortcut = self.shortcut(out)
        out = self.conv1(out)
        out = self.conv2(self.act2(self.bn2(out)))
        out += shortcut
        return out

class ResNet20v2(nn.Module):
    """Three-stage residual network with a 16-channel stem.

    Layout: 3x3 stem convolution (3 -> 16 channels), three stages of ``block``
    modules with 16/32/64 output channels and strides 1/2/2, then BatchNorm,
    the Mplus activation, an 8x8 average pool and a linear classifier. The
    8x8 pooling window and the 64-feature classifier fit 32x32 inputs such as
    CIFAR-10.

    Args:
        block: Block class; called as ``block(in_channels, out_channels,
            stride)`` and must expose an ``expansion`` class attribute.
        num_blocks: Sequence with the number of blocks for each of the three
            stages.
        num_classes: Size of the classifier output.
    """

    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet20v2, self).__init__()
        # Running channel count; _make_layer updates it after every block.
        self.in_channels = 16

        # Stem.
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)

        # Residual stages; stages two and three halve the spatial resolution.
        self.layer1 = self._make_layer(block, 16, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 32, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 64, num_blocks[2], stride=2)

        # Head.
        self.bn = nn.BatchNorm2d(64)
        self.act = Mplus()
        self.avg_pool = nn.AvgPool2d(8)
        self.linear = nn.Linear(64, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """Build one stage; only its first block uses ``stride``, the rest use 1."""
        stage_blocks = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            stage_blocks.append(block(self.in_channels, out_channels, block_stride))
            # The next block consumes what this one produced.
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*stage_blocks)

    def forward(self, x):
        """Map a batch of images to class logits."""
        out = self.conv1(x)
        for stage in (self.layer1, self.layer2, self.layer3):
            out = stage(out)
        out = self.avg_pool(self.act(self.bn(out)))
        # Flatten everything except the batch dimension for the classifier.
        out = out.view(out.size(0), -1)
        return self.linear(out)

def create_resnet20v2():
    """Build the ResNet-20 configuration: three stages of three PreActBlocks."""
    blocks_per_stage = [3, 3, 3]
    return ResNet20v2(PreActBlock, blocks_per_stage)

# Cosine learning rate scheduler
def cosine_annealing(epoch, total_epochs, initial_lr):
    """Return the cosine-annealed learning rate for ``epoch``.

    The rate starts at ``initial_lr`` for ``epoch == 0`` and follows half a
    cosine period, reaching zero at ``epoch == total_epochs``.

    Args:
        epoch: Current epoch index.
        total_epochs: Length of the schedule; must be non-zero.
        initial_lr: Learning rate at the start of the schedule.
    """
    phase = math.pi * epoch / total_epochs
    cosine_factor = 1 + math.cos(phase)
    return initial_lr * 0.5 * cosine_factor

# 6 & 7. Train the model with SGD+Momentum and cosine LR schedule
def train_model(model, trainloader, testloader, epochs=25):
    """Train ``model`` with SGD + momentum and a cosine learning-rate schedule.

    Each epoch sets the learning rate from ``cosine_annealing``, runs one
    training pass over ``trainloader`` and one evaluation pass over
    ``testloader``, and prints the epoch metrics. After the last epoch the
    best accuracies and lowest losses seen in any epoch are printed.

    Args:
        model: Network to train; moved to ``cuda:0`` when CUDA is available,
            otherwise to the CPU.
        trainloader: Iterable of ``(inputs, labels)`` batches for training.
        testloader: Iterable of ``(inputs, labels)`` batches for evaluation.
        epochs: Number of epochs; also the length of the cosine schedule.

    Returns:
        The trained model, on the selected device.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    model = model.to(device)
    criterion = nn.CrossEntropyLoss()

    # Plain SGD with momentum; weight decay is disabled for these runs.
    initial_lr = 0.1
    momentum = 0.9
    weight_decay = 0
    optimizer = optim.SGD(model.parameters(), lr=initial_lr,
                          momentum=momentum, weight_decay=weight_decay)

    # Best values observed over all epochs (reported once at the end).
    best_train_acc = 0.0
    best_test_acc = 0.0
    lowest_train_loss = float('inf')
    lowest_test_loss = float('inf')

    for epoch in range(epochs):
        # The schedule is applied by hand: write the rate into every group.
        current_lr = cosine_annealing(epoch, epochs, initial_lr)
        for param_group in optimizer.param_groups:
            param_group['lr'] = current_lr

        train_loss, train_acc = _train_one_epoch(
            model, trainloader, criterion, optimizer, device,
            desc=f'Epoch {epoch+1}/{epochs} [Train] lr={current_lr:.5f}')
        test_loss, test_acc = _evaluate(
            model, testloader, criterion, device,
            desc=f'Epoch {epoch+1}/{epochs} [Test]')

        best_train_acc = max(best_train_acc, train_acc)
        best_test_acc = max(best_test_acc, test_acc)
        lowest_train_loss = min(lowest_train_loss, train_loss)
        lowest_test_loss = min(lowest_test_loss, test_loss)

        print(f'Epoch {epoch+1}/{epochs}:')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%')
        print(f'Learning Rate: {current_lr:.5f}')
        print('-' * 60)

    # 8. Display best metrics
    print('\nTraining Complete!')
    print(f'Best Train Accuracy: {best_train_acc:.2f}%')
    print(f'Best Test Accuracy: {best_test_acc:.2f}%')
    print(f'Lowest Train Loss: {lowest_train_loss:.4f}')
    print(f'Lowest Test Loss: {lowest_test_loss:.4f}')

    return model


def _train_one_epoch(model, loader, criterion, optimizer, device, desc):
    """Run one optimisation pass over ``loader``.

    Returns ``(loss, accuracy)``: the mean of the per-batch losses and the
    percentage of correctly classified samples.
    """
    model.train()
    loss_sum = 0.0
    correct = 0
    total = 0

    progress = tqdm(loader, desc=desc)
    # Batches are counted with enumerate rather than read from the bar's `n`
    # attribute: tqdm refreshes `n` only periodically while iterating, so
    # dividing by it gave a wrong running loss in the progress display.
    for batch_idx, (inputs, labels) in enumerate(progress, start=1):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        loss_sum += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

        progress.set_postfix({
            'loss': loss_sum / batch_idx,
            'acc': 100. * correct / total
        })

    return loss_sum / len(loader), 100. * correct / total


def _evaluate(model, loader, criterion, device, desc):
    """Run one gradient-free evaluation pass over ``loader``.

    Returns ``(loss, accuracy)``: the mean of the per-batch losses and the
    percentage of correctly classified samples.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        progress = tqdm(loader, desc=desc)
        # Same batch counting as in training (see _train_one_epoch).
        for batch_idx, (inputs, labels) in enumerate(progress, start=1):
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            loss_sum += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            progress.set_postfix({
                'loss': loss_sum / batch_idx,
                'acc': 100. * correct / total
            })

    return loss_sum / len(loader), 100. * correct / total

def main():
    """Run the experiment end to end: load data, build the model, train for 40 epochs."""
    print("Loading CIFAR-10 dataset...")
    # The class-name tuple returned by the loader is not needed here.
    trainloader, testloader, _ = load_cifar10()

    print("Creating ResNet20v2 model with Mplus activation...")
    model = create_resnet20v2()

    print("Starting training...")
    train_model(model, trainloader, testloader, epochs=40)

    print("Done!")

# Entry point: run the experiment only when this module is executed directly.
if __name__ == "__main__":
    main()