In [1]:
import os
import json
import random
from PIL import Image

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

import torch
import torch.nn as nn
from torchvision import transforms, models
from torch.utils.data import Dataset, TensorDataset, DataLoader

from tqdm import tqdm
import matplotlib.pyplot as plt 

from datasets.waste_dataset import WasteDataset

%matplotlib inline

## set paths and config

In [2]:
# set paths 

cwd = os.getcwd()

project_root = os.path.abspath(os.path.join(cwd, ".."))

data_root = os.path.join(project_root, "data", "raw", "AerialWaste")

image_dirs = [os.path.join(data_root, f"images{i}") for i in range(6)]

train_json = f'{data_root}/training.json'

test_json = f'{data_root}/testing.json'


In [3]:
# set constants 

image_size = 244 
batch_size = 32
num_workers = 1
seed = 42

torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

In [4]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

## get usable image paths from json 

In [5]:
def get_image_path(file_name, image_dirs):
    for dir_path in image_dirs:
        full_path = os.path.join(dir_path, file_name)
        if os.path.exists(full_path):
            return full_path
    return None


In [6]:
with open(train_json, "r") as f:
    train_json_data = json.load(f)

records = []

for img in train_json_data["images"]:
    path = get_image_path(img["file_name"], image_dirs)

    if path is not None:
        records.append({
            "file_name": img["file_name"],
            "full_path": path,
            "waste": int(img["is_candidate_location"])
        })

df = pd.DataFrame(records)
df.head()


Unnamed: 0,file_name,full_path,waste
0,2.png,C:\Users\rimsh\Desktop\rimsha\github\urban-was...,1
1,3.png,C:\Users\rimsh\Desktop\rimsha\github\urban-was...,1
2,4.png,C:\Users\rimsh\Desktop\rimsha\github\urban-was...,1
3,5.png,C:\Users\rimsh\Desktop\rimsha\github\urban-was...,1
4,6.png,C:\Users\rimsh\Desktop\rimsha\github\urban-was...,1


In [7]:
print("Total usable training images:", len(df))
print(df["waste"].value_counts())


Total usable training images: 6327
waste
0    4205
1    2122
Name: count, dtype: int64


## train-val split

In [8]:
train_df, val_df = train_test_split(
    df,
    test_size=0.2,
    stratify=df["waste"],
    random_state=seed,
    shuffle=True
)

train_df = train_df.reset_index(drop=True)
val_df = train_df.reset_index(drop=True)

print("Train size:", len(train_df))
print("Validation size:", len(val_df))


Train size: 5061
Validation size: 5061


## image transformation + resizing

In [9]:
train_transforms = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(15),      # rotate Â±15 degrees
    transforms.ColorJitter(0.1, 0.1, 0.1, 0.1), # slight brightness/contrast/saturation changes
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

val_transforms = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])


## prepare training & val images and labels

In [10]:
class WasteDataset(Dataset):
    def __init__(self, dataframe, transforms=None):
        self.df = dataframe.reset_index(drop=True)
        self.transforms = transforms

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image = Image.open(row["full_path"]).convert("RGB")
        label = row["waste"]

        if self.transforms:
            image = self.transforms(image)

        return image, torch.tensor(label, dtype=torch.long)


In [11]:
train_dataset = WasteDataset(train_df, train_transforms)
val_dataset = WasteDataset(val_df, val_transforms)

train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers
)

val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers
)


## RESENET18 Training

In [12]:
model = models.resnet18(weights="IMAGENET1K_V1")

# Freeze backbone
for param in model.parameters():
    param.requires_grad = False

# Replace classifier
model.fc = nn.Linear(model.fc.in_features, 2)

model = model.to(device)


### Loss Function

In [13]:
criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.Adam(
    model.fc.parameters(),
    lr=1e-3
)


In [14]:
def train_one_epoch(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0

    for images, labels in tqdm(loader):
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(loader)


In [15]:
def evaluate(model, loader):
    model.eval()
    preds, targets = [], []

    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            outputs = model(images)
            predictions = torch.argmax(outputs, dim=1).cpu().numpy()

            preds.extend(predictions)
            targets.extend(labels.numpy())

    return accuracy_score(targets, preds)


In [None]:
EPOCHS = 5

for epoch in range(EPOCHS):
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
    val_acc = evaluate(model, val_loader)

    print(f"Epoch {epoch+1}/{EPOCHS}")
    print(f"Train Loss: {train_loss:.4f}")
    print(f"Val Accuracy: {val_acc:.4f}")
    print("-" * 30)


  0%|                                                                             | 0/159 [00:00<?, ?it/s]