In [None]:
import os
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import random

from torch import nn
import torch.optim as optim
from sklearn.metrics import accuracy_score

In [None]:
class WasteDataset(Dataset):
    def __init__(self, root_dir, split, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = sorted(os.listdir(root_dir))
        self.image_paths = []
        self.labels = []
        
        for i, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)
            for subfolder in ['default', 'real_world']:
                subfolder_dir = os.path.join(class_dir, subfolder)
                image_names = os.listdir(subfolder_dir)
                random.shuffle(image_names)
                
                if split == 'train':
                    image_names = image_names[:int(0.6 * len(image_names))]
                elif split == 'val':
                    image_names = image_names[int(0.6 * len(image_names)):int(0.8 * len(image_names))]
                else:  # split == 'test'
                    image_names = image_names[int(0.8 * len(image_names)):]
                
                for image_name in image_names:
                    self.image_paths.append(os.path.join(subfolder_dir, image_name))
                    self.labels.append(i)
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, index):
        image_path = self.image_paths[index]
        label = self.labels[index]
        image = Image.open(image_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

In [None]:
# Child class inheretied from parent class (nn.Module: Base class for all neural network modules)
class Custom_CNN_Model(nn.Module):
    def __init__(self, waste_classes_nb):
        super().__init__()

        # Feature extraction layers
        self.conv1 = nn.Sequential(nn.Conv2d(3,out_channels=32, kernel_size=3,stride=1,padding=1),
                                   nn.BatchNorm2d(32))
        self.conv2 = nn.Sequential(nn.Conv2d(32,out_channels=64, kernel_size=3,stride=1,padding=1),
                                   nn.BatchNorm2d(64))
        self.conv3 = nn.Sequential(nn.Conv2d(64,out_channels=128, kernel_size=3,stride=1,padding=1),
                                   nn.BatchNorm2d(128))
        self.conv4 = nn.Sequential(nn.Conv2d(128,out_channels=256, kernel_size=3,stride=1,padding=1),
                                   nn.BatchNorm2d(256))
        self.conv5 = nn.Sequential(nn.Conv2d(256,out_channels=512, kernel_size=3,stride=1,padding=1),
                                   nn.BatchNorm2d(512))

        # Activation function to introduce non-linearity
        self.relu = nn.ReLU()

        # Reduce spatial dimension (downsampling)
        self.maxpooling = nn.MaxPool2d(2,2)

        # Classifier layers
        self.fc1 = nn.Linear(512*7*7, 512)     # each of 25088 inputs connects to all 512 neurons
        self.fc2 = nn.Linear(512, waste_classes_nb)     # why???? (parameters?)

        # drop/turn off 50% of nurons in the fc layer (p = 0.5)
        self.drop = nn.Dropout(0.5)


    def forward(self, mod):
        mod = self.conv1(mod)  # 32 x 224 x 224
        mod = self.relu(mod)
        mod = self.maxpooling(mod)  # 32 x 112 x 112
        
        mod = self.conv2(mod) # 64 x 112 x 112
        mod = self.relu(mod)
        mod = self.maxpooling(mod) # 64 x 56 x 56

        mod = self.conv3(mod)  # 128 x 56 x 56
        mod = self.relu(mod)
        mod = self.maxpooling(mod) # 128 x 28 x 28

        mod = self.conv4(mod)  # 256 x 28 x 28
        mod = self.relu(mod)
        mod = self.maxpooling(mod) # 256 x 14 x 14

        mod = self.conv5(mod) # 512 x 14 x 14
        mod = self.relu(mod)
        mod = self.maxpooling(mod) # 512 x 7 x 7
        
        # Flatten the output of the final conv layer to a 1D vector for the FC layer
        mod = mod.view(mod.size(0), -1)

        mod = self.fc1(mod)
        mod = self.drop(mod)
        mod = self.relu(mod)
        mod = self.fc2(mod)
        return mod


In [None]:
# Load dataset path
dataset_path = r'C:\Users\rouka\OneDrive\Desktop\Uni\year 3\semester 2\CSE 351 - Intro to AI\Final_Project\archive\images\images'

# Set dataset hyperparameters
batch_size = 32
num_epochs = 10
learning_rate = 0.001 # for optimization


In [None]:
# Create the datasets and data loaders
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),     # randomly flip images
    transforms.RandomRotation(10),         # small random rotation
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
train_dataset = WasteDataset(dataset_path, split='train', transform=train_transform)
val_dataset = WasteDataset(dataset_path, split='val', transform=val_test_transform)
test_dataset = WasteDataset(dataset_path, split='test', transform=val_test_transform)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
Waste_Classes = train_dataset.classes
print(Waste_Classes)

Waste_Classes_NB = len(Waste_Classes)
print(Waste_Classes_NB)

In [None]:
model = Custom_CNN_Model(Waste_Classes_NB).to('cuda')
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# reduces learning rate every 2 epochs by multiplying learning_rate by 0.5 every 2 epochs
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.5)


In [None]:
# Variable to apply early stopping if validation loss does not decrease
# patience = max number or epochs without val_loss decrease
best_val_loss = float("inf")
patience, counter = 5, 0

# Lists to store the training and validation losses
train_losses = []
val_losses = []

# Training loop
for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0.0
    for images, labels in train_dataloader:
        images = images.to('cuda')
        labels = labels.to('cuda')
        
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step() 
        scheduler.step() 
        
        train_loss += loss.item() * images.size(0)
    
    train_loss /= len(train_dataset)
    train_losses.append(train_loss)

    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, labels in val_dataloader:
            images = images.to('cuda')
            labels = labels.to('cuda')
            
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            val_loss += loss.item() * images.size(0)
    
    val_loss /= len(val_dataset)
    val_losses.append(val_loss)

    # if val_loss decreased reset counter
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        counter = 0

    # else increase counter         
    else:
        counter += 1
        if counter >= patience:     #if counter exceedes patience stop training
            print("Early stopping.")
            print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
            break

    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

print("Training completed!")