## Machine Learning Assignment #1 - Image Classification

### Implement the missing parts to train an image classification model:
- configuration of the model.
- loss function for training the model.

### Implementing an approach to mitigate and eliminate noise inherent in the provided data:
- there is no fixed or predefined answer.
- include brief comments in the code to explain the approach wherever noise mitigation or removal is implemented.

In [8]:
import torch
random_seed = 1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)
device = 'cuda:1' if torch.cuda.is_available() else 'cpu'
print('current device: ',device)

current device:  cuda:1


### Importing libraries required for code execution.

In [9]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
import os
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
%matplotlib inline

import numpy as np
from PIL import Image

### Data Augmentation(My Own)


In [10]:
# Data Augmentation Transforms for handling noisy data
train_transform = transforms.Compose([
    # Resize to ensure consistent size
    transforms.Resize((64, 64)),
    
    # Data Augmentation techniques to handle noise (only requested methods)
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(
        brightness=0.2,    # Adjust brightness to handle lighting variations
        contrast=0.2,      # Adjust contrast to handle poor quality images
        saturation=0.2,    # Adjust saturation 
        hue=0.1           # Slight hue adjustment
    ),
    
    # Normalization
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Validation transform (minimal augmentation)
val_transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

### Code for loading the provided noisy dataset.
- note that basic augmentations (in the preprocessing stage) have already been applied.

In [11]:
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, pt_file, transform=None, noise_filter=True):
        data = torch.load(pt_file)
        self.images = data['images']   
        self.targets = data['targets']
        self.transform = transform
        
        # Optional: Filter noisy samples based on pixel intensity variance
        if noise_filter:
            self.images, self.targets = self._filter_noisy_samples(self.images, self.targets)
        
    def _filter_noisy_samples(self, images, targets, threshold=0.01):
        """
        Filter out samples that might be too noisy based on pixel variance
        Low variance might indicate corrupted/blank images
        """
        filtered_images = []
        filtered_targets = []
        
        for img, target in zip(images, targets):
            # Calculate variance of pixel values
            img_var = torch.var(img.float())
            
            # Keep samples with reasonable variance (not too low, not too high)
            if threshold < img_var < 1.0:
                filtered_images.append(img)
                filtered_targets.append(target)
        
        print(f"Filtered {len(images) - len(filtered_images)} noisy samples")
        print(f"Remaining samples: {len(filtered_images)}")
        
        return filtered_images, filtered_targets
        
    def __len__(self):
        return len(self.targets)
    
    def __getitem__(self, idx):
        image = self.images[idx]
        target = self.targets[idx]
        
        # Convert tensor to PIL Image for transforms
        if self.transform:
            # Ensure image is in the right format for PIL
            if image.dim() == 3:  # C, H, W
                image = image.permute(1, 2, 0)  # H, W, C
            
            # Convert to PIL Image
            if image.dtype == torch.float32:
                image = (image * 255).clamp(0, 255).byte()
            
            image = Image.fromarray(image.numpy())
            image = self.transform(image)
        
        return image, target


In [12]:
def my_collate_fn(samples):
    images = []
    labels = []
    
    for data in samples:
        img, target = data
        images.append(img)
        labels.append(target)
        
    return torch.stack(images), torch.stack(labels)

### Data loading.
- "pt_file" referes to the location of the provided data file.
- the number of training samples is 43,201.

In [14]:
pt_file = os.path.join('Noisy_dataset.pt')
image_size = 64

# Create dataset with augmentation and noise filtering
train_dataset = CustomDataset(pt_file, transform=train_transform, noise_filter=True)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=4, collate_fn=my_collate_fn)

print(f'Length of train samples after filtering: {len(train_dataset)}')

Filtered 14239 noisy samples
Remaining samples: 28962
Length of train samples after filtering: 28962


In [15]:
print(  train_dataset.targets)


[tensor([97., -3., -3.]), tensor([97., -3., -3.]), tensor([88., -3., -3.]), tensor([26., -3., -3.]), tensor([63., -3., -3.]), tensor([88., -3., -3.]), tensor([26., -3., -3.]), tensor([26., -3., -3.]), tensor([26., -3., -3.]), tensor([ 3., -3., -3.]), tensor([26., -3., -3.]), tensor([ 3., -3., -3.]), tensor([26., -3., -3.]), tensor([26., -3., -3.]), tensor([50., -3., -3.]), tensor([50., -3., -3.]), tensor([63., -3., -3.]), tensor([88., -3., -3.]), tensor([ 3., -3., -3.]), tensor([26., -3., -3.]), tensor([26., -3., -3.]), tensor([26., -3., -3.]), tensor([26., -3., -3.]), tensor([26., -3., -3.]), tensor([33., -3., -3.]), tensor([38., -3., -3.]), tensor([50., -3., -3.]), tensor([50., -3., -3.]), tensor([63., -3., -3.]), tensor([90., -3., -3.]), tensor([ 3., -3., -3.]), tensor([ 3., -3., -3.]), tensor([11., -3., -3.]), tensor([13., -3., -3.]), tensor([23., -3., -3.]), tensor([33., -3., -3.]), tensor([33., -3., -3.]), tensor([33., -3., -3.]), tensor([50., -3., -3.]), tensor([52., -3., -3.]),

### Prepare the model; ResNet
- fill in the blank.
- ResNet-10 must be used without exception.

In [16]:
# Make the Basic Block of ResNet

class BasicBlock(nn.Module):
    expansion = 1
    def __init__(self, in_planes, planes, stride=1):    
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=in_planes, out_channels=planes, kernel_size=3, stride=stride, bias=False, padding=1)
        self.bn1 = nn.BatchNorm2d(planes)

        self.conv2 = nn.Conv2d(in_channels=planes, out_channels=planes, kernel_size=3, stride=1, bias=False, padding=1)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        # Fill in; Conditional statement related to the stride.
        #이 부분 검토하기
        if stride != 1 or in_planes != planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels=in_planes,out_channels= planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes))

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

In [17]:
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=100):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=2)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)
    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

def ResNet10():
    return ResNet(BasicBlock, [1, 1, 1, 1])

In [19]:
# Some code for saving and loading model.
# Use if you needed.
def save_model(model, save_name):
    path = './' + str(save_name) + '.pt'
    ckpt = {'model': model}
    torch.save(ckpt, path)

def load_model(init_model, load_name):
    path = './' + str(load_name) + '.pt'
    #load_file = torch.load(path, map_location='cpu', weights_only=False)
    load_file = torch.load(path, map_location='cpu',weights_only=False)
    try:
        model = load_file['model']
        init_model.load_state_dict(model.state_dict())
    except:
        model = torch.load(str(load_name)+'.pt', weights_only=False)
        init_model = model
    return init_model

#### Implementation of the loss function for training.

In [20]:
# cross entropy function
def implemented_cross_entropy(output, target):
    if target.dim() == 2:
        target = target[:, 0]  # (batch_size,)
    
    target = target.long()
    
    log_probs = F.log_softmax(output, dim=1)  # (batch_size, num_classes)
    loss = -log_probs[torch.arange(output.size(0)), target]
    return loss.mean()

### Train function

In [21]:
def train(total_epoch, network, built_in_criterion, implemented_cross_entropy, optimizer, lr_schedule, train_loader, train_fn, device = 'cpu', save_name = 'save_name'):
    network = network.to(device)
    for epoch in range(total_epoch):
        train_fn(epoch, network, built_in_criterion, implemented_cross_entropy, optimizer, train_loader, device)
        lr_schedule.step()
        if ((epoch + 1) % 10 == 0) or epoch == total_epoch - 1:
            save_model(network, save_name)
            print('Model saved at epoch {} with name {} '.format(epoch + 1, save_name + '.pt'))
                

In [22]:
def train_single_epoch(current_epoch, network, built_in_criterion, implemented_cross_entropy, optimizer, train_loader, device='cpu'):
    network.train()
    running_loss = 0.0
    loss_error = 0.0
    correct, total_sample = 0.0, 0.0
    for idx, (input, label) in enumerate(train_loader):
        input, label = input.to(device), label.to(device)
        optimizer.zero_grad()
        output = network(input)

        _, pred = torch.max(output.data, 1)
        correct += (pred == label[:, 0].long()).sum().item() # The first index in the label represents the class ID. The remaining indices may be useful for the project.
        total_sample += label.size(0)

        loss = implemented_cross_entropy(output, label[:, 0].view(-1).long())
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        # calculate error between imported loss and implmented from scratch
        loss_error += torch.abs(built_in_criterion(output, label[:, 0].view(-1).long()).detach() - implemented_cross_entropy(output, label[:, 0].view(-1).long()).detach())

    print('Epoch: {} | Training Accuracy: {:.2f} % | Loss: {:.2f} | Loss error {:.5f}'.format(current_epoch, 100*correct/total_sample, running_loss/(idx+1), loss_error/(idx + 1)))

### Training begins!

In [23]:
network = ResNet10()
total_epoch = 100
built_in_criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(network.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)
step_lr_scheduler = lr_scheduler.MultiStepLR(optimizer, milestones=[50, 75], gamma =0.1)

train_fn = train_single_epoch
train(total_epoch, network, built_in_criterion, implemented_cross_entropy, optimizer, step_lr_scheduler, train_loader, train_fn, 
device = device, save_name= f'김두회_20202927')

Epoch: 0 | Training Accuracy: 4.91 % | Loss: 4.33 | Loss error 0.00000
Epoch: 1 | Training Accuracy: 8.94 % | Loss: 4.07 | Loss error 0.00000
Epoch: 2 | Training Accuracy: 11.96 % | Loss: 3.89 | Loss error 0.00000
Epoch: 3 | Training Accuracy: 14.84 % | Loss: 3.74 | Loss error 0.00000
Epoch: 4 | Training Accuracy: 17.06 % | Loss: 3.62 | Loss error 0.00000
Epoch: 5 | Training Accuracy: 19.22 % | Loss: 3.52 | Loss error 0.00000
Epoch: 6 | Training Accuracy: 21.27 % | Loss: 3.43 | Loss error 0.00000
Epoch: 7 | Training Accuracy: 22.99 % | Loss: 3.34 | Loss error 0.00000
Epoch: 8 | Training Accuracy: 24.31 % | Loss: 3.27 | Loss error 0.00000
Epoch: 9 | Training Accuracy: 26.12 % | Loss: 3.18 | Loss error 0.00000
Model saved at epoch 10 with name 김두회_20202927.pt 
Epoch: 10 | Training Accuracy: 27.63 % | Loss: 3.12 | Loss error 0.00000
Epoch: 11 | Training Accuracy: 28.71 % | Loss: 3.05 | Loss error 0.00000
Epoch: 12 | Training Accuracy: 30.16 % | Loss: 2.98 | Loss error 0.00000
Epoch: 13 | 

### Critical note regarding the testing process.
- the submitted models will be evaluated using the code that includes the test function below.
- make sure that the trained model runs inference properly with this code 
- since the test data is not provided, verification can be done using the train data.
- failure to run will be treated as an error.
- Name the .pt file as: Name_StudentID.pt.

In [24]:
def test(network, implemented_cross_entropy, test_loader, device = 'cpu', load_name = None):
    if load_name is not None:
        network = load_model(network, load_name)

    print('Test start')
    network = network.to(device)
    network.eval()
    test_loss = 0.0
    correct, total_sample = 0.0, 0.0
    with torch.no_grad():
        for idx, (image, label) in enumerate(test_loader):
            image, label = image.to(device), label.to(device)
            output = network(image)
            loss = implemented_cross_entropy(output, label[:, 0].view(-1).long())
            _, pred = torch.max(output.data, 1)
            print('Prediction values: {}' .format(pred))
            total_sample += label.size(0)
            correct += (pred == label[:, 0].long()).sum().item()
            test_loss += loss.item()
    print('Test loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(test_loss/(idx + 1) ,correct, total_sample, 100 * correct / total_sample))
    
network = ResNet10()
test(network, implemented_cross_entropy, train_loader, device = device, load_name= '김두회_20202927')

Test start
Prediction values: tensor([69, 73, 30, 11, 78, 47, 47, 41, 17, 57, 41, 83, 33, 64, 58, 56, 78, 11,
        72, 88, 15,  2, 22, 36, 78, 82, 27, 53, 46, 12, 34, 19, 50, 76, 79, 86,
        37, 21, 70, 26, 56, 34, 66, 31, 80, 65, 12,  9, 68, 49, 34, 47, 59, 10,
        58,  6,  5, 45, 35, 58, 78, 22, 77, 39, 33, 36, 20, 83, 47, 30, 26, 61,
        51, 31, 74, 56,  5,  2, 10, 72, 35, 69, 73, 23,  4, 49, 66, 33,  3, 46,
        15, 66, 42, 74, 68, 19, 89, 10, 51, 64,  8, 80, 45, 33, 38, 75,  1, 18,
        62, 24, 62, 15, 30, 66, 18, 16, 62, 11, 54,  5, 72,  2, 87,  8,  2, 34,
        40, 41], device='cuda:1')
Prediction values: tensor([16,  5, 72, 98, 27, 74, 43, 54,  6, 12, 26,  9, 48, 40, 23,  5, 11, 61,
        34, 79, 37, 48, 79, 15, 51, 43, 24, 60, 67, 51, 82, 64,  3, 15, 79, 77,
        68, 51, 30,  0, 66, 44, 99, 23, 59, 18, 56, 28, 70, 19, 40, 65, 49, 16,
         2, 82, 17, 99, 75, 81, 26, 19, 17,  9, 27, 89, 92, 70, 28, 31, 67, 32,
        31, 18, 41, 14, 49, 57,  1, 8