# **Downloading the dataset directly from Kaggle**

In [None]:
# Upload kaggle.json file
from google.colab import files
# Bind the return value instead of leaving the call as the cell's last
# expression: a bare last expression is rendered as cell output, and the upload
# result is expected to carry the uploaded file's contents (the Kaggle API key).
uploaded = files.upload()

KeyboardInterrupt: 

In [None]:
# Create Kaggle directory
!mkdir ~/.kaggle

In [None]:
#Copy kaggle.json
!cp kaggle.json ~/.kaggle/

In [None]:
!ls -ltr ~/.kaggle

In [None]:
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
!ls -ltr ~/.kaggle

In [None]:
!kaggle datasets download -d 'asdasdasasdas/garbage-classification'

In [None]:
!ls -ltr

In [None]:
!unzip /content/garbage-classification.zip -d ../content/

# **File Handling**

In [None]:
# Parse the training split file: each line is "<filename> <one-indexed label>".
with open('/content/one-indexed-files-notrash_train.txt') as f:
  train_data = []
  for line in f:
    line = line.strip()
    if not line:
      continue  # Skip blank lines (e.g. a trailing newline); rsplit would return nothing to unpack.
    # Split once, from the right, so a file name containing white space stays in one piece.
    filename, label = line.rsplit(maxsplit=1)
    train_data.append((filename, int(label)))

In [None]:
# Parse the test split file: each line is "<filename> <one-indexed label>".
with open('/content/one-indexed-files-notrash_test.txt') as f:
  test_data = []
  for line in f:
    line = line.strip()
    if not line:
      continue  # Skip blank lines (e.g. a trailing newline); rsplit would return nothing to unpack.
    # Split once, from the right, so a file name containing white space stays in one piece.
    filename, label = line.rsplit(maxsplit=1)
    test_data.append((filename, int(label)))

In [None]:
# Parse the validation split file: each line is "<filename> <one-indexed label>".
with open('/content/one-indexed-files-notrash_val.txt') as f:
  val_data = []
  for line in f:
    line = line.strip()
    if not line:
      continue  # Skip blank lines (e.g. a trailing newline); rsplit would return nothing to unpack.
    # Split once, from the right, so a file name containing white space stays in one piece.
    filename, label = line.rsplit(maxsplit=1)
    val_data.append((filename, int(label)))

In [None]:
def FilenameToPath(List_Filename):
  CSV_Data = []
  for data in List_Filename:
    filename = data[0]
    label = data[1]

    match(label):
      case 1:
        path = '/content/Garbage classification/Garbage classification/glass/' + filename
      case 2:
        path = '/content/Garbage classification/Garbage classification/paper/' + filename
      case 3:
        path = '/content/Garbage classification/Garbage classification/cardboard/' + filename
      case 4:
        path = '/content/Garbage classification/Garbage classification/plastic/' + filename
      case 5:
        path = '/content/Garbage classification/Garbage classification/metal/' + filename
      case 6:
        path = '/content/Garbage classification/Garbage classification/trash/' + filename
      case _:
        raise RuntimeError('INVALID LABEL FOUND!')

    CSV_Data.append((path, label))

  return CSV_Data

In [None]:
# Resolve every split's (filename, label) list into (full path, label) rows,
# ready to be written out as CSV.
train_data_csv = FilenameToPath(train_data)
test_data_csv = FilenameToPath(test_data)
val_data_csv = FilenameToPath(val_data)

In [None]:
import csv

# Persist the training (path, label) rows, one CSV row per image.
# newline='' leaves line-ending handling to the csv module.
with open('Train_Data.csv', 'w', newline='') as out_file:
  csv.writer(out_file).writerows(train_data_csv)


In [None]:
# Persist the test (path, label) rows, one CSV row per image.
# newline='' leaves line-ending handling to the csv module.
with open('Test_Data.csv', 'w', newline='') as out_file:
  csv.writer(out_file).writerows(test_data_csv)


In [None]:
# Persist the validation (path, label) rows, one CSV row per image.
# newline='' leaves line-ending handling to the csv module.
with open('Val_Data.csv', 'w', newline='') as out_file:
  csv.writer(out_file).writerows(val_data_csv)


# **Data Preprocessing**

In [None]:
import pandas as pd
import numpy as np

In [None]:
train_df = pd.read_csv('/content/Train_Data.csv', names = ['Image_Path', 'Category'])
train_df

In [None]:
test_df = pd.read_csv('/content/Test_Data.csv', names = ['Image_Path', 'Category'])
test_df

In [None]:
val_df = pd.read_csv('/content/Val_Data.csv', names = ['Image_Path', 'Category'])
val_df

In [None]:
# Data Cleaning
# Drop incomplete rows, then renumber the index from 0 so that index labels and
# row positions stay in step: inspectImages passes the labels yielded by
# iterrows() to .iloc, which is positional.
train_df = train_df.dropna().reset_index(drop=True)
test_df = test_df.dropna().reset_index(drop=True)
val_df = val_df.dropna().reset_index(drop=True)

In [None]:
import numpy as np
import cv2 as cv
from google.colab.patches import cv2_imshow
# cv2.imshow() is disabled in Colab, because it causes Jupyter sessions to crash

# **Analysing Images using OpenCV**

We analyse the images in training, validation and testing datasets using the OpenCV module. We inspect the datasets for the following three flags:

1. Do all images have the same dimensions or not? If not, then what is the minimum height and minimum width?
2. Are all the images square?
3. Do all images have 3 channels?

In [None]:
def inspectImages(df):
  """Print a size/channel summary of the images listed in `df`.

  Args:
    df: DataFrame whose first column holds image paths.

  Prints how many images are non-square, the common height/width (or the
  minimum height/width when sizes differ), how many images deviate from the
  most common size, and how many do not have exactly 3 channels. Images that
  cannot be read are counted and reported instead of aborting the scan.
  """
  total = df.shape[0]
  shapes = []        # (height, width) of every readable image
  bad_channels = 0   # images whose channel count is not 3
  unreadable = 0     # paths for which imread returned None

  # Walk the path column by position; the index labels may not be 0..n-1
  # (e.g. after dropna), so they must not be used with .iloc.
  for path in df.iloc[:, 0]:
    # IMREAD_UNCHANGED keeps the file's own channel layout. The default flag
    # always converts to 3 channels, which would make the channel check vacuous.
    img = cv.imread(path, cv.IMREAD_UNCHANGED)
    if img is None:
      unreadable += 1
      continue

    height, width = img.shape[:2]
    # A single-channel image comes back as a 2-D array with no channel axis.
    channels = img.shape[2] if img.ndim == 3 else 1

    shapes.append((height, width))
    if channels != 3:
      bad_channels += 1

  if unreadable > 0:
    print(unreadable, '/', total, 'images could not be read.')

  if not shapes:
    print('No readable images to inspect.')
    return

  rectangular = sum(1 for h, w in shapes if h != w)

  # Count images per size and treat the most frequent size as the reference.
  shape_counts = {}
  for shape in shapes:
    shape_counts[shape] = shape_counts.get(shape, 0) + 1
  common_shape = max(shape_counts, key=shape_counts.get)
  different = len(shapes) - shape_counts[common_shape]

  print(rectangular, '/', total, 'images are rectangular.')

  if different > 0:
    # Minimums are taken over every readable image, not only those seen after
    # the first size change.
    print('Minimum height:', min(h for h, _ in shapes))
    print('Minimum width:', min(w for _, w in shapes))
  else:
    print('All images have height', common_shape[0])
    print('All images have width', common_shape[1])

  print(different, '/', total, "images have different dimensions.")
  print('Channels not equal to 3 for', bad_channels, '/', total, 'images.')

In [None]:
print("Inspecting the training dataset...")
inspectImages(train_df)
print("\nInspecting the validation dataset...")
inspectImages(val_df)
print("\nInspecting the testing dataset...")
inspectImages(test_df)

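**Conclusion:**
All images in the training, validation and testing datasets have 3 channels and the same dimensions, 384 x 512, so none of them are square. Because the sizes are uniform, no resizing is needed to make the datasets consistent; the images are nevertheless resized to 224 x 224 by the transforms defined below, and images from external sources would need the same resizing before being passed to the model.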

# **Preparing the datasets for PyTorch operations:**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader

import torchvision.transforms as transforms
from PIL import Image

In [None]:
class MyDataset(Dataset):
  """Dataset over a DataFrame of (image path, one-indexed category) rows."""

  def __init__(self, df, transform=None):
    """Store the frame and the optional per-image transform.

    Args:
      df: DataFrame whose first column holds image paths and whose second
        column holds the one-indexed category.
      transform: optional callable applied to the loaded PIL image.
    """
    self.df = df
    self.transform = transform

  def __len__(self):
    """Number of rows (images) in the underlying frame."""
    return self.df.shape[0]

  def __getitem__(self, idx):
    """Return (image, label) for the row at position `idx`.

    The label is shifted from the one-indexed value stored in the frame to a
    zero-indexed torch.long tensor.
    """
    img_path = self.df.iloc[idx, 0]
    label = torch.tensor(self.df.iloc[idx, 1] - 1, dtype = torch.long)

    # Open inside a context manager so the file handle is released as soon as
    # convert() has produced the RGB copy, rather than whenever the image
    # object happens to be garbage collected.
    with Image.open(img_path) as raw:
      img = raw.convert("RGB")

    if self.transform is not None:
      img = self.transform(img)

    return img, label

In [None]:
# Settings shared by both pipelines.
target_size = (224, 224)
channel_mean = (0.485, 0.456, 0.406)
channel_std = (0.229, 0.224, 0.225)

# Random augmentations, used for training only.
augmentations = [
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ColorJitter(0.1, 0.1, 0.1, 0.05),
]

# transform1: resize -> augment -> tensor -> normalize.
transform1 = transforms.Compose(
    [transforms.Resize(target_size)]
    + augmentations
    + [
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
)

# transform2: the same pipeline without the random augmentations.
transform2 = transforms.Compose([
    transforms.Resize(target_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=channel_mean, std=channel_std),
])


In [None]:
# Wrap each split's DataFrame in a MyDataset. Only the training split gets the
# augmenting pipeline (transform1); validation and test use transform2.
# NOTE: this rebinds train_data / val_data / test_data, which earlier in the
# notebook held the raw (filename, label) lists read from the split files.
train_data = MyDataset(train_df, transform = transform1)
val_data = MyDataset(val_df, transform = transform2)
test_data = MyDataset(test_df, transform = transform2)

In [None]:
# Batch the datasets. Only the training loader shuffles; the validation and
# test loaders keep row order and use a larger batch size (64 vs 32).
train_loader = DataLoader(train_data, batch_size=32, shuffle=True, num_workers = 0, pin_memory=True)
val_loader = DataLoader(val_data, batch_size = 64, shuffle = False, num_workers = 0, pin_memory=True)
test_loader = DataLoader(test_data, batch_size = 64, shuffle = False, num_workers = 0, pin_memory=True)

In [None]:
image, label = train_data[0]
image.size()

In [None]:
train_data.__len__()

# **Defining the CNN Architecture**

Work in Progress

In [None]:
class NeuralNet(nn.Module):
  """Four-block CNN (conv -> batch norm -> ReLU) with a global-average-pooled head.

  Takes 3-channel images and returns 6 raw class scores (logits) per image.
  The shape comments below assume the 224 x 224 input produced by the
  transforms in this notebook; the adaptive average pool fixes the head input
  at 128 features.
  """

  def __init__(self):
    """Create the convolutional blocks and the two-layer classifier head."""
    super(NeuralNet, self).__init__()

    self.conv1 = nn.Conv2d(in_channels = 3, out_channels = 32, kernel_size = 3) # (3, 224, 224) -> (32, 222, 222)
    self.bn1 = nn.BatchNorm2d(32)
    self.pool = nn.MaxPool2d(2, 2)  # Shared 2x2 max pool, applied after blocks 1-3 in forward(); halves height and width
    self.conv2 = nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = 3)  # (32, 111, 111) -> (64, 109, 109)
    self.bn2 = nn.BatchNorm2d(64)
    self.conv3 = nn.Conv2d(in_channels = 64, out_channels = 96, kernel_size = 3) # (64, 54, 54) -> (96, 52, 52)
    self.bn3 = nn.BatchNorm2d(96)
    self.conv4 = nn.Conv2d(in_channels = 96, out_channels = 128, kernel_size = 3) # (96, 26, 26) -> (128, 24, 24)
    self.bn4 = nn.BatchNorm2d(128)
    self.gap = nn.AdaptiveAvgPool2d((1, 1))  # (128, H, W) -> (128, 1, 1)

    self.fc1 = nn.Linear(128, 128)
    self.dropout = nn.Dropout(0.3)
    self.fc2 = nn.Linear(128, 6)

  def forward(self, x):
    """Run a batch of images through the network and return the 6 class scores."""
    # Block 1: (3, 224, 224) -> (32, 222, 222) -> pool -> (32, 111, 111)
    x = self.conv1(x)
    x = self.bn1(x)
    x = F.relu(x)
    x = self.pool(x)

    # Block 2: (32, 111, 111) -> (64, 109, 109) -> pool -> (64, 54, 54)
    x = self.conv2(x)
    x = self.bn2(x)
    x = F.relu(x)
    x = self.pool(x)

    # Block 3: (64, 54, 54) -> (96, 52, 52) -> pool -> (96, 26, 26)
    x = self.conv3(x)
    x = self.bn3(x)
    x = F.relu(x)
    x = self.pool(x)

    # Block 4 (no pooling): (96, 26, 26) -> (128, 24, 24)
    x = self.conv4(x)
    x = self.bn4(x)
    x = F.relu(x)

    # Global average pool: (128, 24, 24) -> (128, 1, 1)
    x = self.gap(x)

    # Keep the batch dimension and flatten the rest: (128, 1, 1) -> (128,)
    x = torch.flatten(x, start_dim = 1)

    # Classifier head; dropout is applied to the fc1 output before the ReLU.
    x = self.fc1(x)
    x = self.dropout(x)
    x = F.relu(x)
    x = self.fc2(x)

    return x

In [None]:
net = NeuralNet()
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr = 1e-4)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = net.to(device)

In [None]:
image, label = train_data[0]
image.shape

In [None]:
scaler = torch.cuda.amp.GradScaler()

In [None]:
# Report the GPU in use. get_device_name needs a CUDA device, so guard the call
# for runtimes where the notebook falls back to the CPU.
print(torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CUDA is not available; running on CPU.")

In [None]:
import copy

class EarlyStopping:
  """Track the validation loss and signal when it has stopped improving.

  Attributes:
    patience: number of consecutive non-improving calls tolerated before stopping.
    min_delta: amount by which the loss must drop below the best loss to count
      as an improvement.
    best_loss: lowest validation loss seen so far.
    counter: consecutive calls without an improvement.
    best_state: deep copy of the model's state_dict at the best loss, or None
      if no call has improved on the initial infinite loss.
  """

  def __init__(self, patience = 5, min_delta = 0.0):
    self.patience = patience
    self.min_delta = min_delta
    self.best_loss = float('inf')
    self.counter = 0
    self.best_state = None

  def stop(self, val_loss, model):
    """Record `val_loss` and return True when training should stop.

    Always returns a bool: False while the loss is improving or patience
    remains, True once `patience` consecutive calls have failed to improve.
    """
    if val_loss < self.best_loss - self.min_delta:
      self.best_loss = val_loss  # New best loss.
      self.counter = 0  # Reset counter as the loss is still decreasing.
      # Deep copy: state_dict() hands back references to the live parameters,
      # which keep changing as training continues.
      self.best_state = copy.deepcopy(model.state_dict())
      return False  # Do not stop.

    self.counter += 1
    return self.counter >= self.patience

In [None]:
def validate(model, dataloader, loss_fn):
  """Return the mean per-batch loss of `model` over `dataloader`.

  Switches the model to evaluation mode and runs without gradient tracking.
  Batches are moved to the notebook-level `device` before the forward pass.
  The result is the sum of the batch losses divided by the number of batches.
  """
  model.eval()
  batch_losses = []

  with torch.no_grad():
    for batch_images, batch_labels in dataloader:
      # Inputs and targets are both placed on the model's device.
      predictions = model(batch_images.to(device))
      batch_loss = loss_fn(predictions, batch_labels.to(device))
      batch_losses.append(batch_loss.item())

  return sum(batch_losses, 0.0) / len(dataloader)

In [None]:
# Train for up to 50 epochs, stopping early once the validation loss has not
# improved by at least min_delta for `patience` consecutive epochs.
earlyStop = EarlyStopping(patience = 5, min_delta = 1e-3)
# Use mixed precision only on a CUDA device; on the CPU fallback autocast is
# switched off instead of being asked for a device that is not there.
use_amp = device.type == "cuda"
for epoch in range(50):
    net.train()
    print(f"Training epoch {epoch+1}")
    running_loss = 0.0

    for inputs, labels in train_loader:
      inputs = inputs.to(device, non_blocking=True)
      labels = labels.to(device, non_blocking=True)

      optimizer.zero_grad()

      with torch.amp.autocast(device_type=device.type, enabled=use_amp):
          outputs = net(inputs)
          loss = loss_fn(outputs, labels)

      # Scale the loss before backward, then step and update through the scaler.
      scaler.scale(loss).backward()
      scaler.step(optimizer)
      scaler.update()

      running_loss += loss.item()

    # Both losses are averaged over the number of batches.
    train_loss = running_loss / len(train_loader)
    val_loss = validate(net, val_loader, loss_fn)


    print(f'Training Loss: {train_loss:.4f}, Validation loss: {val_loss:.4f}')
    if earlyStop.stop(val_loss, net):
      print("Early Stopping triggered")
      break

# Restore the weights from the best validation epoch. best_state stays None if
# no epoch ever improved on the initial loss (e.g. the loss was NaN throughout),
# in which case the current weights are kept.
if earlyStop.best_state is not None:
    net.load_state_dict(earlyStop.best_state)

In [None]:
# Measure classification accuracy of the trained network.
# NOTE(review): this scores val_loader, the split also used for early stopping;
# test_loader is built earlier in the notebook but not used here - confirm intent.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
net.to(device)  # Make sure model is on the right device
net.eval()

total = 0
correct = 0

with torch.no_grad():
    for batch_images, batch_labels in val_loader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)
        # Index of the highest score along the class dimension is the predicted class.
        prediction = net(batch_images).max(dim=1).indices
        correct += (prediction == batch_labels).sum().item()
        total += batch_labels.size(0)

accuracy = 100 * correct / total
print(f'Accuracy: {accuracy:.2f}%')
