In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt


In [11]:
# Define transformations
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize images to 128x128
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Load Flowers102 dataset
train_data = datasets.Flowers102(root='data', split='train', download=True, transform=transform)
val_data = datasets.Flowers102(root='data', split='val', download=True, transform=transform)
test_data = datasets.Flowers102(root='data', split='test', download=True, transform=transform)

# Create data loaders
train_loader = DataLoader(test_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)
test_loader = DataLoader(train_data, batch_size=32, shuffle=False)


In [3]:
if torch.cuda.is_available():
    device = 'cuda:0'
    print("[INFO] Current device:", torch.cuda.get_device_name(torch.cuda.current_device()), f", device num:{torch.cuda.current_device()}")
elif torch.has_mps:
    device = 'mps'
    print("[INFO] Current device: MAC OS Metal Performance Shaders.")
else:
    device = 'cpu'
    print("[INFO] Current device: CPU")
    print("*"*50,"\n[WARNING] You may need to change the device to GPU or MPS to get better performance.")
    print("*"*50)
device = torch.device(device)

In [4]:
import torch.nn.functional as F

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample  # Adjust dimensions if needed for skip connection

    def forward(self, x):
        identity = x  # Save the input for the skip connection

        # First convolutional layer
        out = self.conv1(x)
        out = self.bn1(out)
        out = F.relu(out)

        # Second convolutional layer
        out = self.conv2(out)
        out = self.bn2(out)

        # Downsample the input if necessary
        if self.downsample is not None:
            identity = self.downsample(x)

        # Add skip connection (identity)
        out += identity
        out = F.relu(out)

        return out


In [5]:
class CustomResNet(nn.Module):
    def __init__(self, block, layers, num_classes=102):
        super(CustomResNet, self).__init__()
        self.in_channels = 64  # Initial number of filters

        # Initial convolutional layer
        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # ResNet layers (4 stages)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        # Global average pooling and fully connected layer
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)  # 102 classes for Flowers102

    def _make_layer(self, block, out_channels, blocks, stride=1):
        # Helper function to create a stage with several residual blocks
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels

        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        # Forward pass through the network
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x


In [6]:
# Define the model using ResNet18 configuration (2 residual blocks per stage)
def custom_resnet18(num_classes=102):
    return CustomResNet(ResidualBlock, [2, 2, 2, 2], num_classes=num_classes)

# Initialize the model
model = custom_resnet18().to(device)


In [7]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [8]:
def train_model(model, criterion, optimizer, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = 100 * correct / total
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%')

    return model


In [9]:
def evaluate_model(model, loader):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    acc = 100 * correct / total
    print(f'Accuracy: {acc:.2f}%')


In [12]:
# Train the model
model = train_model(model, criterion, optimizer, num_epochs=10)

# Validate the model
print("Validation Accuracy:")
evaluate_model(model, val_loader)

# Test the model (Optional)
print("Test Accuracy:")
evaluate_model(model, test_loader)


Epoch 1/10, Loss: 2.7951, Accuracy: 27.40%
Epoch 2/10, Loss: 2.3022, Accuracy: 38.36%
Epoch 3/10, Loss: 1.9411, Accuracy: 47.70%
Epoch 4/10, Loss: 1.7009, Accuracy: 52.98%
Epoch 5/10, Loss: 1.4233, Accuracy: 59.80%
Epoch 6/10, Loss: 1.2127, Accuracy: 64.97%
Epoch 7/10, Loss: 0.9673, Accuracy: 72.01%
Epoch 8/10, Loss: 0.7420, Accuracy: 78.39%
Epoch 9/10, Loss: 0.5891, Accuracy: 82.55%
Epoch 10/10, Loss: 0.4231, Accuracy: 87.04%
Validation Accuracy:
Accuracy: 56.37%
Test Accuracy:
Accuracy: 55.88%


In [13]:
# Save the trained model
torch.save(model.state_dict(), 'custom_resnet18_flowers102.pth')
