## Import libraries

In [1]:
import numpy as np
import torch
import torch.nn as nn
from torchvision import datasets
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler

In [23]:
class Block(nn.Module):
    """Bottleneck block for ResNet-50, 101, 152.

    The main path is a 1x1 convolution, a 3x3 convolution (which carries the
    block's stride) and a final 1x1 convolution that widens the output to
    ``intermediate_channels * expansion`` channels. Every convolution is
    followed by batch normalization. The block input, optionally passed
    through ``identity_downsample``, is added to the main path before the
    last ReLU.

    Args:
        in_channels: Number of channels of the input tensor.
        intermediate_channels: Channel width of the first two convolutions.
        identity_downsample: Optional module applied to the skip connection so
            that it can be added to the main path; ``None`` uses the input
            unchanged.
        stride: Stride of the 3x3 convolution.
    """

    # Channel multiplier of the last 1x1 convolution. Kept at class level so
    # it can be read from the class itself as well as from instances.
    expansion = 4

    def __init__(self, in_channels, intermediate_channels, identity_downsample=None, stride=1):
        super(Block, self).__init__()

        # 1x1 convolution
        self.conv1 = nn.Conv2d(
            in_channels,
            intermediate_channels,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=False
        )
        self.bn1 = nn.BatchNorm2d(intermediate_channels)

        # 3x3 convolution (the only convolution on the main path that strides)
        self.conv2 = nn.Conv2d(
            intermediate_channels,
            intermediate_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False
        )
        self.bn2 = nn.BatchNorm2d(intermediate_channels)

        # 1x1 convolution (expansion)
        self.conv3 = nn.Conv2d(
            intermediate_channels,
            intermediate_channels * self.expansion,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=False
        )
        self.bn3 = nn.BatchNorm2d(intermediate_channels * self.expansion)

        self.relu = nn.ReLU(inplace=True)
        self.identity_downsample = identity_downsample
        self.stride = stride

    def forward(self, x):
        """Run the block on ``x`` and return the activated residual sum."""
        # No copy is needed for the skip connection: ``x`` is only rebound
        # below, and the in-place ops (ReLU, +=) act on tensors freshly
        # produced by the batch-norm layers, never on the block input.
        identity = x

        # First conv block
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        # Second conv block
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        # Third conv block (activation is applied after the residual add)
        x = self.conv3(x)
        x = self.bn3(x)

        # Apply identity downsample if needed
        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        # Add skip connection
        x += identity
        x = self.relu(x)

        return x



In [24]:
class ResNet(nn.Module):
    """ResNet backbone with a linear classification head.

    The network is a 7x7 stem convolution with batch norm, ReLU and max
    pooling, followed by four stages of residual blocks (base widths 64, 128,
    256 and 512), adaptive average pooling and a fully connected layer.

    Args:
        block: Residual block class. It is called as
            ``block(in_channels, intermediate_channels, identity_downsample, stride)``
            and must output ``intermediate_channels * expansion`` channels,
            where ``expansion`` is read from ``block.expansion`` (4 is assumed
            when the class does not define it).
        layers: Sequence of four ints, the number of blocks in each stage.
        image_channels: Number of channels of the input images.
        num_classes: Size of the output of the classification layer.
    """

    def __init__(self, block, layers, image_channels, num_classes):
        super(ResNet, self).__init__()
        # Channel multiplier of the block type. Falling back to 4 keeps the
        # previous behaviour for block classes without a class-level
        # ``expansion`` attribute.
        self.expansion = getattr(block, "expansion", 4)
        self.in_channels = 64

        # Initial convolution
        self.conv1 = nn.Conv2d(
            image_channels, 64, kernel_size=7, stride=2, padding=3, bias=False
        )
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # ResNet layers
        self.layer1 = self._make_layer(
            block, layers[0], intermediate_channels=64, stride=1
        )
        self.layer2 = self._make_layer(
            block, layers[1], intermediate_channels=128, stride=2
        )
        self.layer3 = self._make_layer(
            block, layers[2], intermediate_channels=256, stride=2
        )
        self.layer4 = self._make_layer(
            block, layers[3], intermediate_channels=512, stride=2
        )

        # Classification head; its input width follows the last stage's output
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * self.expansion, num_classes)

    def forward(self, x):
        """Return the classification layer's output for a batch of images ``x``."""
        # Initial layers
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        # ResNet layers
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # Classification: pool to 1x1, flatten per sample, project to classes
        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)

        return x

    def _make_layer(self, block, num_residual_blocks, intermediate_channels, stride):
        """Build one stage of ``num_residual_blocks`` blocks.

        Only the first block receives ``stride`` and, when the shapes differ,
        a 1x1 convolution + batch norm on the skip connection. Updates
        ``self.in_channels`` to the stage's output width as a side effect.
        """
        out_channels = intermediate_channels * getattr(block, "expansion", 4)
        identity_downsample = None
        layers = []

        # The skip connection needs a projection whenever the first block
        # changes the spatial size (stride) or the channel count.
        if stride != 1 or self.in_channels != out_channels:
            identity_downsample = nn.Sequential(
                nn.Conv2d(
                    self.in_channels,
                    out_channels,
                    kernel_size=1,
                    stride=stride,
                    bias=False,
                ),
                nn.BatchNorm2d(out_channels),
            )

        # First block (may downsample)
        layers.append(
            block(self.in_channels, intermediate_channels, identity_downsample, stride)
        )

        # Update in_channels
        self.in_channels = out_channels

        # Remaining blocks keep the shape, so they need no projection
        for _ in range(num_residual_blocks - 1):
            layers.append(block(self.in_channels, intermediate_channels))

        return nn.Sequential(*layers)



In [25]:
def ResNet50(img_channel=3, num_classes=1000):
    """Build a ResNet-50: bottleneck blocks arranged in stages of 3, 4, 6 and 3."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(
        block=Block,
        layers=stage_depths,
        image_channels=img_channel,
        num_classes=num_classes,
    )


def ResNet101(img_channel=3, num_classes=1000):
    """Build a ResNet-101: bottleneck blocks arranged in stages of 3, 4, 23 and 3."""
    stage_depths = [3, 4, 23, 3]
    return ResNet(
        block=Block,
        layers=stage_depths,
        image_channels=img_channel,
        num_classes=num_classes,
    )


def ResNet152(img_channel=3, num_classes=1000):
    """Build a ResNet-152: bottleneck blocks arranged in stages of 3, 8, 36 and 3."""
    stage_depths = [3, 8, 36, 3]
    return ResNet(
        block=Block,
        layers=stage_depths,
        image_channels=img_channel,
        num_classes=num_classes,
    )


In [26]:
def get_train_valid_loader(data_dir, batch_size, augment, random_seed, valid_size=0.1, shuffle=True):
    """Get training and validation data loaders for CIFAR-10.

    Both loaders read the CIFAR-10 training split (downloaded into
    ``data_dir`` if missing) and draw from disjoint index subsets of it.

    Args:
        data_dir: Root directory for the dataset files.
        batch_size: Number of samples per batch in both loaders.
        augment: If true, the training loader applies a random horizontal
            flip and a padded random crop; validation is never augmented.
        random_seed: Seed passed to ``np.random.seed`` before the indices are
            shuffled. Note that this reseeds NumPy's global generator.
        valid_size: Fraction of the training split held out for validation,
            between 0 and 1 inclusive.
        shuffle: If true, shuffle the indices before splitting; otherwise the
            first ``valid_size`` fraction of the dataset is used for validation.

    Returns:
        Tuple ``(train_loader, valid_loader)``.

    Raises:
        ValueError: If ``valid_size`` is not within ``[0, 1]``.
    """
    # Fail early: a fraction above 1 would leave the training set empty, and a
    # negative one would silently swap the split sizes via negative slicing.
    if not 0 <= valid_size <= 1:
        raise ValueError(f"valid_size must be in the range [0, 1], got {valid_size!r}")

    # CIFAR-10 normalization
    normalize = transforms.Normalize(
        mean=[0.4914, 0.4822, 0.4465],
        std=[0.2023, 0.1994, 0.2010]
    )

    # Common transforms
    common_transform = [
        transforms.Resize((224, 224)),  # Resize to standard ResNet input
        transforms.ToTensor(),
        normalize
    ]

    # Training transforms with optional augmentation
    if augment:
        train_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomCrop(224, padding=4),
            transforms.ToTensor(),
            normalize
        ])
    else:
        train_transform = transforms.Compose(common_transform)

    valid_transform = transforms.Compose(common_transform)

    # Two views of the same training split, differing only in their
    # transforms, so that validation images are never augmented.
    train_dataset = datasets.CIFAR10(
        root=data_dir, train=True, download=True, transform=train_transform
    )
    valid_dataset = datasets.CIFAR10(
        root=data_dir, train=True, download=True, transform=valid_transform
    )

    # Create train/valid split
    num_train = len(train_dataset)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))

    if shuffle:
        np.random.seed(random_seed)
        np.random.shuffle(indices)

    # The first ``split`` indices go to validation, the remainder to training
    train_idx, valid_idx = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)

    # Create data loaders
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler
    )
    valid_loader = torch.utils.data.DataLoader(
        valid_dataset, batch_size=batch_size, sampler=valid_sampler
    )

    return train_loader, valid_loader

In [27]:
def get_test_loader(data_dir, batch_size, shuffle=True):
    """Return a DataLoader over the CIFAR-10 test split.

    Images are resized to 224x224, converted to tensors and normalized per
    channel. The dataset is downloaded into ``data_dir`` if it is missing.

    Args:
        data_dir: Root directory for the dataset files.
        batch_size: Number of samples per batch.
        shuffle: Forwarded to the DataLoader's ``shuffle`` argument.
    """
    preprocessing = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.4914, 0.4822, 0.4465],
            std=[0.2023, 0.1994, 0.2010],
        ),
    ])

    test_set = datasets.CIFAR10(
        root=data_dir,
        train=False,
        download=True,
        transform=preprocessing,
    )

    return torch.utils.data.DataLoader(
        test_set,
        batch_size=batch_size,
        shuffle=shuffle,
    )

In [29]:
# Configuration
data_dir = './data'
num_classes = 10  # CIFAR-10 has 10 classes
num_epochs = 7
batch_size = 64
learning_rate = 0.01
seed = 1  # shared by the train/valid split and by PyTorch's generator

# Seed PyTorch so that weight initialization and sampler order are repeatable
# on a fresh kernel (GPU kernels may still introduce small nondeterminism).
torch.manual_seed(seed)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Load data
train_loader, valid_loader = get_train_valid_loader(
    data_dir=data_dir,
    batch_size=batch_size,
    augment=True,  # Enable data augmentation
    random_seed=seed
)

# Accuracy does not depend on sample order, so the test set is not shuffled.
test_loader = get_test_loader(data_dir=data_dir, batch_size=batch_size, shuffle=False)

# Create model with a classification head sized for num_classes
model = ResNet50(img_channel=3, num_classes=num_classes).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=learning_rate,
    weight_decay=0.0001,
    momentum=0.9
)

# Learning rate scheduler
# NOTE(review): step_size equals num_epochs and the scheduler is stepped once
# per epoch, so the learning rate is only reduced after the final epoch.
# Lower step_size if a decay during training is intended.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

Using device: cuda


## Training

In [33]:
# Training loop: one optimization pass over train_loader per epoch, followed
# by an accuracy check on valid_loader and one scheduler step.
total_step = len(train_loader)

for epoch in range(num_epochs):
    # Training mode: batch-norm layers use per-batch statistics
    model.train()
    running_loss = 0.0

    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Print progress every 100 steps
        if (i + 1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{total_step}], Loss: {loss.item():.4f}')

    # Epoch summary (max() guards against an empty training loader)
    avg_loss = running_loss / max(total_step, 1)
    print(f'Epoch [{epoch+1}/{num_epochs}] - Average Loss: {avg_loss:.4f}')

    # Validation
    # Evaluation mode: batch-norm layers use their running statistics
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in valid_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            # Gradients are already disabled here, so no .data access is needed
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # The validation split is empty when valid_size is 0; report that
        # instead of dividing by zero.
        if total > 0:
            accuracy = 100 * correct / total
            print(f'Validation Accuracy: {accuracy:.2f}%\n')
        else:
            print('Validation Accuracy: n/a (validation set is empty)\n')

    # Step the scheduler
    scheduler.step()



Epoch [1/7], Step [100/704], Loss: 1.9643
Epoch [1/7], Step [200/704], Loss: 1.9841
Epoch [1/7], Step [300/704], Loss: 1.5853
Epoch [1/7], Step [400/704], Loss: 1.6292
Epoch [1/7], Step [500/704], Loss: 1.7593
Epoch [1/7], Step [600/704], Loss: 1.6325
Epoch [1/7], Step [700/704], Loss: 1.4379
Epoch [1/7] - Average Loss: 1.6248
Validation Accuracy: 49.82%

Epoch [2/7], Step [100/704], Loss: 1.1559
Epoch [2/7], Step [200/704], Loss: 1.2775
Epoch [2/7], Step [300/704], Loss: 0.9390
Epoch [2/7], Step [400/704], Loss: 1.4637
Epoch [2/7], Step [500/704], Loss: 1.0861
Epoch [2/7], Step [600/704], Loss: 0.8214
Epoch [2/7], Step [700/704], Loss: 0.7289
Epoch [2/7] - Average Loss: 1.1465
Validation Accuracy: 65.04%

Epoch [3/7], Step [100/704], Loss: 0.8178
Epoch [3/7], Step [200/704], Loss: 0.8814
Epoch [3/7], Step [300/704], Loss: 0.9045
Epoch [3/7], Step [400/704], Loss: 0.9674
Epoch [3/7], Step [500/704], Loss: 0.8773
Epoch [3/7], Step [600/704], Loss: 0.8862
Epoch [3/7], Step [700/704], Los

## Testing

In [35]:
# Final test evaluation: top-1 accuracy of the trained model on test_loader
# Evaluation mode: batch-norm layers use their running statistics
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        # Gradients are already disabled here, so no .data access is needed
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    test_accuracy = 100 * correct / total
    print(f'\nFinal Test Accuracy: {test_accuracy:.2f}%')


Final Test Accuracy: 81.36%
