# IMPORT

In [28]:
import os
import shutil
import random
from pathlib import Path
import time
import copy
import json

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader
from torchvision import datasets, models, transforms

from sklearn.metrics import confusion_matrix, classification_report

# CONFIG

In [20]:
# Pipeline configuration. All values are module-level constants read by the
# functions defined below.
RAW_DIR = "garbage-dataset"            # current (unsplit) dataset folder
OUT_DIR = "garbage-dataset-split"      # folder that receives the split result
VAL_RATIO = 0.2                        # 80/20 split
IMG_SIZE = 224                         # crop size used by the train and val transforms
BATCH_SIZE = 32                        # batch size of both DataLoaders
EPOCHS = 15                            # maximum number of training epochs
LR = 1e-3                              # learning rate handed to the Adam optimizer
PATIENCE = 5                           # epochs without val-accuracy gain before early stopping
FREEZE_BACKBONE = False                # True: build_model freezes every parameter whose name lacks "fc"
OUTPUT_MODEL_DIR = "outputs"           # directory where best_model.pth is written

# HELPER

In [21]:
def set_seed(seed=42):
    """Seed every random number generator used by the pipeline.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch's
    generator; when CUDA is available, all CUDA devices are seeded as well.

    Args:
        seed: Integer seed applied to every generator.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# 1. SPLIT DATASET

In [22]:
def split_dataset(raw_dir, out_dir, val_ratio=0.2):
    """Copy a class-per-folder dataset into ``train`` and ``val`` sub-folders.

    Every sub-directory of ``raw_dir`` is treated as one class. Its files are
    shuffled with the global ``random`` generator and copied to
    ``out_dir/train/<class>`` and ``out_dir/val/<class>``; non-directory
    entries directly inside ``raw_dir`` are ignored.

    Args:
        raw_dir: Folder containing one sub-folder per class.
        out_dir: Destination folder; created if it does not exist.
        val_ratio: Fraction of each class that goes to the validation split.

    Raises:
        ValueError: If ``val_ratio`` is outside ``[0, 1]``.
        FileNotFoundError: If ``raw_dir`` does not exist.
        NotADirectoryError: If ``raw_dir`` exists but is not a directory.
    """
    if not 0 <= val_ratio <= 1:
        raise ValueError(f"val_ratio must be between 0 and 1, got {val_ratio!r}")

    raw_dir = Path(raw_dir)
    out_dir = Path(out_dir)

    # Validate the source BEFORE creating anything: callers may use the mere
    # existence of out_dir as the "already split" marker, so a failed run must
    # not leave an empty output folder behind.
    if not raw_dir.exists():
        raise FileNotFoundError(f"Dataset folder not found: {raw_dir}")
    if not raw_dir.is_dir():
        raise NotADirectoryError(f"Dataset path is not a folder: {raw_dir}")

    train_dir = out_dir / "train"
    val_dir = out_dir / "val"

    train_dir.mkdir(parents=True, exist_ok=True)
    val_dir.mkdir(parents=True, exist_ok=True)

    print("üîÑ Splitting dataset...")

    for class_path in sorted(p for p in raw_dir.iterdir() if p.is_dir()):
        cls = class_path.name

        # Directory listing order is filesystem-dependent; sort first so that a
        # fixed random seed always produces the same split.
        imgs = sorted(p for p in class_path.iterdir() if p.is_file())
        random.shuffle(imgs)

        split_idx = int(len(imgs) * (1 - val_ratio))
        train_imgs = imgs[:split_idx]
        val_imgs = imgs[split_idx:]

        for target_root, subset in ((train_dir, train_imgs), (val_dir, val_imgs)):
            target = target_root / cls
            target.mkdir(parents=True, exist_ok=True)
            for p in subset:
                shutil.copy(p, target / p.name)

        print(f"[{cls}] Train: {len(train_imgs)}, Val: {len(val_imgs)}")

    print("Split selesai\n")

# 2. DATA LOADERS

In [23]:
def create_dataloaders(data_dir):
    """Build the training and validation DataLoaders for a split dataset.

    Expects ``data_dir`` to contain ``train`` and ``val`` sub-folders readable
    by ``torchvision.datasets.ImageFolder``. The training pipeline applies
    random crop/flip/rotation augmentation; the validation pipeline only
    resizes and centre-crops. Both convert to tensors and normalise with the
    same per-channel mean and std.

    Args:
        data_dir: Root folder holding the ``train`` and ``val`` directories.

    Returns:
        A tuple ``(loaders, classes, sizes)`` where ``loaders`` and ``sizes``
        are dicts keyed by ``"train"`` / ``"val"`` and ``classes`` is the
        class list of the training dataset.
    """
    root = Path(data_dir)

    # Identical normalisation for both splits, so build it once.
    normalize = transforms.Normalize([0.485, 0.456, 0.406],
                                     [0.229, 0.224, 0.225])

    pipelines = {
        "train": transforms.Compose([
            transforms.RandomResizedCrop(IMG_SIZE),
            transforms.RandomHorizontalFlip(),
            transforms.RandomVerticalFlip(),
            transforms.RandomRotation(12),
            transforms.ToTensor(),
            normalize,
        ]),
        "val": transforms.Compose([
            # Resize slightly larger than the target, then crop the centre.
            transforms.Resize(int(IMG_SIZE * 1.1)),
            transforms.CenterCrop(IMG_SIZE),
            transforms.ToTensor(),
            normalize,
        ]),
    }

    image_sets = {
        split: datasets.ImageFolder(root / split, tf)
        for split, tf in pipelines.items()
    }

    # Only the training split is shuffled.
    loaders = {
        split: DataLoader(ds, batch_size=BATCH_SIZE, shuffle=(split == "train"))
        for split, ds in image_sets.items()
    }
    sizes = {split: len(ds) for split, ds in image_sets.items()}

    return loaders, image_sets["train"].classes, sizes

# 3. MODEL

In [24]:
def build_model(num_classes):
    """Create an ImageNet-pretrained ResNet-50 with a new classification head.

    When ``FREEZE_BACKBONE`` is true, every parameter whose name does not
    contain ``"fc"`` is frozen. The final fully connected layer is then
    replaced by a fresh ``nn.Linear`` with ``num_classes`` outputs, which is
    trainable by default.

    Args:
        num_classes: Number of output classes of the new head.

    Returns:
        The ResNet-50 model (not yet moved to any device).
    """
    # `pretrained=True` is deprecated in newer torchvision releases in favour
    # of the `weights=` argument. IMAGENET1K_V1 is the weight set that
    # `pretrained=True` used to load, so the resulting model is the same.
    weights_enum = getattr(models, "ResNet50_Weights", None)
    if weights_enum is not None:
        model = models.resnet50(weights=weights_enum.IMAGENET1K_V1)
    else:
        # Older torchvision without the weights enum.
        model = models.resnet50(pretrained=True)

    if FREEZE_BACKBONE:
        for name, p in model.named_parameters():
            if "fc" not in name:
                p.requires_grad = False

    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model

# 4. TRAINING

In [25]:
def train_model(model, dataloaders, sizes, device):
    """Train ``model`` and return it loaded with its best validation weights.

    Each epoch runs a training pass followed by a validation pass. Training
    uses cross-entropy loss, Adam with learning rate ``LR`` and a StepLR
    scheduler (step 7, gamma 0.1) stepped once per epoch. Whenever validation
    accuracy improves, the weights are kept in memory and written to
    ``OUTPUT_MODEL_DIR/best_model.pth``. Training stops early once
    ``PATIENCE`` consecutive epochs bring no improvement.

    Args:
        model: Network to train; expected to already live on ``device``.
        dataloaders: Dict with ``"train"`` and ``"val"`` DataLoaders.
        sizes: Dict with the number of samples per phase, used to average
            the loss and accuracy.
        device: Device the batches are moved to.

    Returns:
        The same model instance with the best-scoring weights restored.
    """
    os.makedirs(OUTPUT_MODEL_DIR, exist_ok=True)

    loss_fn = nn.CrossEntropyLoss()
    opt = optim.Adam(model.parameters(), lr=LR)
    lr_sched = lr_scheduler.StepLR(opt, step_size=7, gamma=0.1)

    best_acc = 0.0
    best_state = copy.deepcopy(model.state_dict())
    stale_epochs = 0

    for epoch in range(EPOCHS):
        print(f"\nEpoch {epoch+1}/{EPOCHS}")

        for phase in ("train", "val"):
            training = phase == "train"
            # train(False) is what eval() does, so one call covers both modes.
            model.train(training)

            running_loss = 0
            running_hits = 0

            for inputs, targets in tqdm(dataloaders[phase], desc=phase):
                inputs = inputs.to(device)
                targets = targets.to(device)

                opt.zero_grad()
                # Gradients are only tracked during the training phase.
                with torch.set_grad_enabled(training):
                    logits = model(inputs)
                    predicted = logits.max(1).indices
                    batch_loss = loss_fn(logits, targets)

                    if training:
                        batch_loss.backward()
                        opt.step()

                # Weight the batch-mean loss by batch size for a per-sample average.
                running_loss += batch_loss.item() * inputs.size(0)
                running_hits += (predicted == targets).sum().item()

            if training:
                lr_sched.step()

            epoch_loss = running_loss / sizes[phase]
            epoch_acc = running_hits / sizes[phase]

            print(f"  {phase} ‚Üí Loss: {epoch_loss:.4f} | Acc: {epoch_acc:.4f}")

            if training:
                continue

            # Validation phase: track the best model and the early-stop counter.
            if epoch_acc > best_acc:
                best_acc = epoch_acc
                best_state = copy.deepcopy(model.state_dict())
                torch.save(model.state_dict(), f"{OUTPUT_MODEL_DIR}/best_model.pth")
                print("  ‚úî Model improved ‚Üí saved")
                stale_epochs = 0
            else:
                stale_epochs += 1

        if stale_epochs >= PATIENCE:
            print("‚èπ Early stopping")
            break

    model.load_state_dict(best_state)
    return model

# 5. EVALUATION

In [26]:
def evaluate(model, dataloader, class_names, device):
    """Print a classification report and confusion matrix for ``model``.

    Runs the model in eval mode without gradient tracking over every batch of
    ``dataloader``, collects the arg-max predictions and prints scikit-learn's
    classification report followed by the confusion matrix.

    Args:
        model: Trained network, expected to already live on ``device``.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        class_names: Class names used as report row labels; assumes integer
            label ``i`` corresponds to ``class_names[i]``.
        device: Device the input batches are moved to.
    """
    model.eval()
    preds = []
    labels = []

    with torch.no_grad():
        for x, y in tqdm(dataloader, desc="Evaluating"):
            x = x.to(device)
            out = model(x)
            _, p = torch.max(out, 1)

            preds.extend(p.cpu().numpy())
            # .cpu() is a no-op for CPU tensors and keeps this working if a
            # loader ever yields labels that already sit on the device.
            labels.extend(y.cpu().numpy())

    # Pass the full label set explicitly: without it, the report raises when
    # a class is missing from the evaluated data (the number of observed
    # classes would not match len(class_names)), and the confusion matrix
    # would change shape.
    class_ids = list(range(len(class_names)))

    print("\nClassification Report:")
    print(classification_report(labels, preds, labels=class_ids,
                                target_names=class_names))

    cm = confusion_matrix(labels, preds, labels=class_ids)
    print("\nConfusion Matrix:\n", cm)


# RUN PIPELINE

In [29]:
# Seed first so the shuffle inside split_dataset and the weight
# initialisation of the new head are repeatable.
set_seed(42)

# Split (only if the output folder does not exist yet)
if not Path(OUT_DIR).exists():
    # Pass the configured ratio explicitly; otherwise split_dataset falls back
    # to its own default and editing VAL_RATIO has no effect.
    split_dataset(RAW_DIR, OUT_DIR, VAL_RATIO)

# Loaders
loaders, class_names, sizes = create_dataloaders(OUT_DIR)

# Persist the class order so predictions can be mapped back to names later.
with open("class_names.json", "w", encoding="utf-8") as f:
    json.dump(class_names, f)

print("Classes:", class_names)
print("Sizes:", sizes)

# Model
device = "cuda" if torch.cuda.is_available() else "cpu"
model = build_model(len(class_names)).to(device)

# Training
model = train_model(model, loaders, sizes, device)

# Evaluation
evaluate(model, loaders["val"], class_names, device)


Classes: ['battery', 'biological', 'cardboard', 'clothes', 'glass', 'metal', 'paper', 'plastic', 'shoes', 'trash']
Sizes: {'train': 15806, 'val': 3956}

Epoch 1/15


train:   4%|▍         | 19/494 [02:06<52:37,  6.65s/it]


KeyboardInterrupt: 