# CPE494/663 GENERATIVE ARTIFICIAL INTELLIGENCE
## Lab 1: PyTorch Discriminative Model

### Download dataset

This lab uses the `RealWaste` dataset from https://archive.ics.uci.edu/dataset/908/realwaste  
The dataset contains 4752 images of waste items across 9 material types.

In [None]:
import requests
import zipfile
from pathlib import Path
from io import BytesIO

In [None]:
%%time
# Download and unpack the RealWaste archive unless it was already extracted.
if not Path("realwaste-main").is_dir():
    print("Downloading dataset...")
    # The timeout bounds each connect/read wait so a stalled server cannot
    # hang the notebook indefinitely.
    resp = requests.get(
        "https://archive.ics.uci.edu/static/public/908/realwaste.zip",
        timeout=60,
    )
    # Fail with a clear HTTP error instead of passing an error page to zipfile.
    resp.raise_for_status()
    content = BytesIO(resp.content)
    with zipfile.ZipFile(content, 'r') as zip_ref:
        zip_ref.extractall(".")
    print("Downloaded and extracted dataset to `./realwaste-main`")
else:
    print("Dataset already exists")

### Define `datasets` and `DataLoader`

In [None]:
import torch
from torch.utils.data import DataLoader, random_split
from torchvision import transforms, datasets

In [None]:
# Seed PyTorch's global random number generator so runs are repeatable.
# (torch has no `set_seed`; `manual_seed` is the seeding entry point.)
torch.manual_seed(42)

In [None]:
# Run on CUDA when it is usable; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Preprocessing applied to every image when it is loaded
data_transforms = transforms.Compose([
    transforms.Resize((256, 256)),  # common 256x256 size so samples can be batched
    transforms.ToTensor(),          # convert the loaded image to a tensor
])

In [None]:
# Wrap the extracted image directory in an ImageFolder dataset,
# running each loaded sample through the preprocessing pipeline
realwaste_img = datasets.ImageFolder(
    root="realwaste-main/RealWaste/",
    transform=data_transforms,
)

In [None]:
# Get number of classes in the dataset.
# `num_classes` is taken from the class names ImageFolder exposes and is
# used later to size the classifier's output layer.
num_classes = len(realwaste_img.classes)
print(f"Dataset contains {num_classes} classes: {realwaste_img.classes}")

In [None]:
# Split dataset into training and test set
train_ratio = 0.7

train_size = int(train_ratio * len(realwaste_img))
# The remainder goes to the test set so the two sizes always sum to the total.
test_size = len(realwaste_img) - train_size

# A dedicated, seeded generator keeps the partition identical every time this
# cell runs, regardless of how much global RNG state was consumed before it.
# Otherwise re-running the cell would move samples between train and test.
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = random_split(realwaste_img,
                                           [train_size, test_size],
                                           generator=split_generator)

# Shuffle only the training batches; evaluation gains nothing from shuffling,
# and a fixed order keeps test passes comparable.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
# Show sample image from training set
import matplotlib.pyplot as plt
import numpy as np

def show_image(img_tensor):
    """Display a single image tensor with matplotlib.

    Args:
        img_tensor: 3-D tensor whose first axis is moved to the last position
            before plotting (assumed channels-first, i.e. (C, H, W), with
            values meant to lie in [0, 1] -- confirm against the transform).
            May live on any device and may track gradients.
    """
    # detach() + cpu() make the helper safe for GPU tensors and model outputs;
    # calling .numpy() directly on those raises an error.
    img = img_tensor.detach().cpu().numpy().transpose((1, 2, 0))
    # Clamp to the [0, 1] range that imshow expects for float images.
    img = np.clip(img, 0, 1)
    plt.imshow(img)
    plt.axis('off')
    plt.show()

# Pull one batch from the training loader, display its first image and
# print the class name that its integer label maps to.
images, labels = next(iter(train_loader))
show_image(images[0])
print(f"Label: {realwaste_img.classes[labels[0]]}")

### Modeling

**Your turn:** Develop and train a discriminative neural network to classify images within the `RealWaste` dataset.  
You can use any neural network architecture; however, you must provide a technical justification for your design choices.  
Evaluate the model's performance using appropriate loss functions and metrics.  

**What to submit:** 2 files:
1. A Python notebook (.ipynb)
2. A PDF export of that notebook
   
Submit to `Lab 1 - PyTorch` activity in LEB2.  


**Due date:** 26/01/2026 17:59

In [None]:
import torch
import torch.nn as nn
import math

# ---------------------------------------------------------
# 1. Helper Functions for Scaling
# ---------------------------------------------------------

def make_divisible(v, divisor=8, min_value=None):
    """
    Round a channel count to a nearby multiple of ``divisor``.

    Args:
        v: Requested (possibly fractional) channel count.
        divisor: The result is a multiple of this value (default 8).
        min_value: Lower bound for the result; defaults to ``divisor``.

    Returns:
        The rounded value, never below ``min_value`` and never more than
        10% smaller than ``v``.
    """
    lower_bound = divisor if min_value is None else min_value
    # Round to the nearest multiple by adding half a step before flooring.
    nearest_multiple = int(v + divisor / 2) // divisor * divisor
    rounded = max(lower_bound, nearest_multiple)
    # If rounding down lost more than 10% of the request, go one step up.
    return rounded + divisor if rounded < 0.9 * v else rounded

def get_compound_params(phi):
    """
    Turn the compound coefficient ``phi`` into scaling multipliers.

    Each multiplier is a fixed base raised to the power ``phi``:
        depth      -> 1.2  ** phi
        width      -> 1.1  ** phi
        resolution -> 1.15 ** phi

    Returns:
        Tuple ``(depth_mult, width_mult, res_mult)``.
    """
    depth_base, width_base, res_base = 1.2, 1.1, 1.15
    return depth_base ** phi, width_base ** phi, res_base ** phi

# ---------------------------------------------------------
# 2. Standard ResNet Components (Bottleneck)
# ---------------------------------------------------------

def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    """Build a bias-free 3x3 convolution whose padding equals its dilation."""
    return nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=3,
        stride=stride,
        padding=dilation,
        dilation=dilation,
        groups=groups,
        bias=False,
    )

def conv1x1(in_planes, out_planes, stride=1):
    """Build a bias-free 1x1 (pointwise) convolution."""
    return nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )

class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 reduce -> 3x3 -> 1x1 expand, plus a skip.

    The block emits ``planes * expansion`` channels. Any spatial stride is
    applied by the 3x3 convolution (``conv2``). When ``downsample`` is given
    it is applied to the input to produce the skip branch; otherwise the
    input itself is added back.
    """

    # Ratio of output channels to ``planes``.
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super().__init__()
        norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer
        # Channel count of the inner 3x3 convolution.
        width = int(planes * (base_width / 64.)) * groups
        out_planes = planes * self.expansion

        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = norm_layer(width)
        self.conv3 = conv1x1(width, out_planes)
        self.bn3 = norm_layer(out_planes)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(residual_branch(x) + skip(x))``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        # No activation here: ReLU is applied after the skip is added.
        out = self.bn3(self.conv3(out))

        shortcut = x if self.downsample is None else self.downsample(x)
        out += shortcut
        return self.relu(out)

In [None]:
class ScaledResNet(nn.Module):
    """ResNet-style image classifier with width-scaled channel counts.

    Layout: 7x7 stride-2 stem convolution -> BatchNorm -> ReLU -> 3x3
    stride-2 max-pool -> four residual stages -> global average pool ->
    linear classifier. The stem and every stage take their channel count
    from the ResNet base widths (64, 128, 256, 512) multiplied by
    ``width_mult`` and rounded with ``make_divisible``.

    Args:
        block: Residual block class. It must expose an ``expansion`` class
            attribute and accept the constructor arguments used in
            ``_make_layer``.
        layers: Sequence of four ints, the number of blocks per stage.
        width_mult: Multiplier applied to every base channel width.
        num_classes: Size of the final linear layer's output.
        zero_init_residual: If True, zero the scale of the last norm layer
            (``bn3``) in every ``block`` instance. With ``bn3``'s weight and
            bias both zero, the residual branch contributes nothing at
            initialisation. Blocks without a ``bn3`` attribute are left as-is.
        groups: Forwarded to every block as its ``groups`` argument.
        width_per_group: Forwarded to every block as ``base_width``.
    """

    def __init__(self, block, layers, width_mult=1.0, num_classes=1000,
                 zero_init_residual=False, groups=1, width_per_group=64):
        super(ScaledResNet, self).__init__()

        norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        # Base settings. ``inplanes`` tracks the channel count entering the
        # next layer and is updated by ``_make_layer`` as stages are built.
        self.inplanes = make_divisible(64 * width_mult)
        self.base_width = width_per_group
        self.groups = groups

        # Initial stem (its width is scaled too)
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Scaled channel widths for the 4 stages.
        # Base ResNet planes are [64, 128, 256, 512].
        layer_planes = [
            make_divisible(64 * width_mult),
            make_divisible(128 * width_mult),
            make_divisible(256 * width_mult),
            make_divisible(512 * width_mult)
        ]

        # Residual stages; stages 2-4 halve the spatial size via stride 2.
        self.layer1 = self._make_layer(block, layer_planes[0], layers[0])
        self.layer2 = self._make_layer(block, layer_planes[1], layers[1], stride=2)
        self.layer3 = self._make_layer(block, layer_planes[2], layers[2], stride=2)
        self.layer4 = self._make_layer(block, layer_planes[3], layers[3], stride=2)

        # Adaptive pooling collapses any spatial size to 1x1, so the classifier
        # does not depend on the input resolution.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        # Final fully connected layer
        final_channels = layer_planes[3] * block.expansion
        self.fc = nn.Linear(final_channels, num_classes)

        # Weight initialisation: Kaiming-normal for convolutions; unit scale
        # and zero shift for normalisation layers.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Honour ``zero_init_residual`` (previously accepted but ignored):
        # zero the last norm scale of each residual block.
        if zero_init_residual:
            for m in self.modules():
                if not isinstance(m, block):
                    continue
                last_norm = getattr(m, "bn3", None)
                if last_norm is not None and last_norm.weight is not None:
                    nn.init.constant_(last_norm.weight, 0)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks.

        Only the first block receives ``stride`` and, when needed, a
        projection on the skip path. The remaining blocks keep the shape.
        Updates ``self.inplanes`` to the stage's output channel count.

        Returns:
            ``nn.Sequential`` containing the stage's blocks.
        """
        norm_layer = self._norm_layer
        downsample = None

        # If stride != 1 or input channels != output channels, the skip path
        # needs a 1x1 projection so it can be added to the block output.
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            self.base_width, norm_layer=norm_layer))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                base_width=self.base_width, norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a batch of 3-channel images to ``num_classes`` logits per sample."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        # Flatten everything after the batch dimension for the linear layer.
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

In [None]:
# ---------------------------------------------------------
# 4. Builder Function
# ---------------------------------------------------------

def efficient_scaled_resnet50(phi=0, num_classes=1000):
    """
    Build a ResNet-50 style model scaled with EfficientNet compound scaling.

    Args:
        phi (int): Compound scaling coefficient. ``phi=0`` leaves every
                   multiplier at 1, giving the base [3, 4, 6, 3] layout
                   with unscaled widths.
        num_classes (int): Number of output classes of the classifier.

    Returns:
        Tuple ``(model, target_res)``: the constructed ``ScaledResNet`` and
        the suggested square input side length in pixels. The resolution is
        only reported and returned; the model does not enforce it.
    """
    depth_mult, width_mult, res_mult = get_compound_params(phi)

    # Depth: scale the per-stage block counts, rounding up so that no stage
    # loses blocks at small scales.
    resnet50_stage_blocks = (3, 4, 6, 3)
    scaled_layers = [math.ceil(n_blocks * depth_mult)
                     for n_blocks in resnet50_stage_blocks]

    # Resolution: scale the 224-pixel base side length, truncating to an int.
    target_res = int(224 * res_mult)

    print(f"--- Efficient Scaled ResNet50 (Phi={phi}) ---")
    print(f"Depth Multiplier: {depth_mult:.2f} -> Layers: {scaled_layers}")
    print(f"Width Multiplier: {width_mult:.2f}")
    print(f"Target Resolution: {target_res}x{target_res}")

    # Width scaling is applied inside the model constructor.
    scaled_model = ScaledResNet(
        block=Bottleneck,
        layers=scaled_layers,
        width_mult=width_mult,
        num_classes=num_classes,
    )
    return scaled_model, target_res

In [None]:
# Example: Create a Phi=1 scaled ResNet50 (roughly "ResNet-50-B1")
model, resolution = efficient_scaled_resnet50(phi=1, num_classes=num_classes)
model.to(device)

# Smoke-test the forward pass with a dummy input of the calculated resolution.
# The input is created on the same device as the model (a CPU tensor fails
# against a CUDA model). The check runs in eval mode without gradients so the
# random noise does not update the BatchNorm running statistics before training.
was_training = model.training
model.eval()
with torch.no_grad():
    dummy_input = torch.randn(1, 3, resolution, resolution, device=device)
    output = model(dummy_input)
# Restore whichever mode the model was in before the check.
model.train(was_training)

print(f"Output shape: {output.shape}")  # Should be [1, num_classes]

### Training and Evaluation

In [None]:
import torch.optim as optim
import torch.nn.functional as F

# Cross-entropy loss over the class logits, optimised with Adam
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Number of full passes over the training set
num_epochs = 5

print(f"Training on: {device}")

for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Predicted class = index of the largest logit
        predicted = outputs.detach().argmax(dim=1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()

    train_accuracy = 100 * correct_train / total_train
    print(f'Epoch {epoch + 1}, Training Loss: {running_loss / len(train_loader):.3f}, Training Accuracy: {train_accuracy:.2f}%')

    # ---- Evaluation pass on the held-out split ----
    model.eval()
    correct_test = 0
    total_test = 0
    test_loss = 0.0
    with torch.no_grad():  # no gradients needed when only measuring
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)
            test_loss += loss.item()

            predicted = outputs.detach().argmax(dim=1)
            total_test += labels.size(0)
            correct_test += (predicted == labels).sum().item()

    test_accuracy = 100 * correct_test / total_test
    print(f'Epoch {epoch + 1}, Test Loss: {test_loss / len(test_loader):.3f}, Test Accuracy: {test_accuracy:.2f}%\n')

print('Finished Training')


---