## **Computer vision project: Deggendorf Waste Sorting Assistant**

### **Overview**
The Deggendorf Waste Sorting Assistant is a Computer Vision-based tool designed to help residents and international students correctly identify waste bins. The project leverages image classification to determine the category of a given waste bin based on its visual characteristics. Users can take a picture of an unlabeled bin, and the model will classify it while providing information on the appropriate waste materials for disposal.

### **Project Goals**
- Develop an image classification model capable of identifying waste bins in Deggendorf.
- Provide users with clear guidance on proper waste disposal based on bin classification.
- Document all processes in a Jupyter Notebook, covering dataset creation, model training, evaluation, and deployment.


### 1. Mount Google Drive & Interactive Labeling Utility

This section sets up everything you need to label images **in-Colab**:

1. Installs required packages  
2. Mounts your Drive  
3. Enables Colab’s custom widget manager for `ipywidgets`  
4. Defines constants, logging, and a CSV to track labels  
5. Provides an interactive widget UI to:
   - Scan `/MyDrive/cv_garbage` for unlabeled images  
   - Display one image at a time  
   - Pick a label from a fixed list  
   - Copy the image into `/MyDrive/cv_garbage/labeled` with a standardized name  
   - Record `original_filename`, `new_filename`, `label`, and `timestamp` in `labels.csv`  

---

In [None]:
# 1.0 · Install/upgrade exact versions once per runtime
import importlib, subprocess, sys

def _ensure(pkg: str) -> None:
    """Install *pkg* with pip if its distribution is not installed (quiet).

    Only presence is checked: an already-installed version that falls outside
    the requested range is left untouched.

    Args:
        pkg: A pip requirement string such as ``"pandas>=2.2,<3"``. The whole
            string is handed to ``pip install``; only the leading distribution
            name is used for the presence check.

    Raises:
        subprocess.CalledProcessError: If ``pip install`` exits non-zero.
    """
    # Local imports keep the block self-contained: a plain ``import importlib``
    # does not guarantee that the ``importlib.metadata`` submodule is loaded.
    import re
    from importlib import metadata

    # Cut the requirement at the first version-specifier, extras, marker or
    # whitespace character so that only the distribution name remains.
    name = re.split(r"[\s<>=!~;\[(@]", pkg.strip(), maxsplit=1)[0]
    try:
        # Query the installed *distribution* rather than importing a module:
        # the two names differ for packages such as pillow (imported as PIL),
        # which would otherwise look missing and be reinstalled on every run.
        metadata.version(name)
    except metadata.PackageNotFoundError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", pkg])

# Requirement strings for the labeling UI, each handed to `_ensure` in turn.
_PINNED_SPECS = (
    "ipywidgets>=8,<9",     # Colab requires 8.x
    "pandas>=2.2,<3",
    "pillow>=10,<11",
)
for spec in _PINNED_SPECS:
    _ensure(spec)


In [None]:
# 1.1 · Mount Drive & enable custom widgets
from google.colab import drive, output
# Mount Google Drive at /content/drive; force_remount=False does not force a
# fresh mount when this cell is re-run.
drive.mount("/content/drive", force_remount=False)
# Enable Colab's custom widget manager for the ipywidgets-based labeling UI.
output.enable_custom_widget_manager()


In [None]:
# 1.2 · Imports, paths, logging
from __future__ import annotations
from pathlib import Path
from datetime import datetime, timezone
import shutil, logging, uuid, io, pandas as pd
from PIL import Image as PILImage
import ipywidgets as w
from IPython.display import display

# ── configuration ────────────────────────────────────────────────────
# Drive folder that is scanned for images to label.
BASE_DIR    = Path("/content/drive/MyDrive/cv_garbage")
# Labeled copies and the label log are kept in a sub-folder of BASE_DIR.
LABELED_DIR = BASE_DIR / "labeled"
CSV_PATH    = LABELED_DIR / "labels.csv"
# Fixed list of classes offered by the labeling UI.
LABELS      = ["Restmüll", "Biomüll", "Papier", "Gelber Sack", "Glas"]

# Create the output folder (and any missing parents); no error if it exists.
LABELED_DIR.mkdir(parents=True, exist_ok=True)

# force=True replaces handlers already attached to the root logger, so this
# format applies even if logging was configured earlier in the runtime.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s: %(message)s",
    force=True,
)


In [None]:
# 1.3 · Labeler definition
class ColabLabeler:
    """
    Lightweight, widget-based image labeler for Google Colab.
    ─────────────────────────────────────────────────────────
    *   Renders fully inside the cell's output (no pop-ups)
    *   Uses a single widgets.Image for fast updates
    *   Writes the CSV to a temp file and renames it over the old one, so a
        failed write never leaves a half-written label log in place
    *   Generates UUID-based target names to avoid clashes

    Typical use: ``ColabLabeler(src_dir, dst_dir, csv_path, labels).start()``.
    """

    # File suffixes (lower-case) that count as images when scanning src_dir.
    EXT = {".jpg", ".jpeg", ".png", ".bmp", ".gif", ".webp"}
    # Column layout of the label log.
    COLUMNS = ["original_filename", "new_filename", "label", "timestamp"]

    def __init__(
        self,
        src_dir: Path,
        dst_dir: Path,
        csv_path: Path,
        labels: list[str],
    ) -> None:
        """Load the label log, collect unlabeled images and build the widgets.

        Args:
            src_dir: Folder scanned (non-recursively) for images to label.
            dst_dir: Folder that receives the renamed copies.
            csv_path: Location of the label log (created on first save).
            labels: Choices offered by the label selector.
        """
        self.src_dir, self.dst_dir, self.csv_path = map(Path, (src_dir, dst_dir, csv_path))
        self.labels     = labels
        self._df        = self._load_or_init_csv()
        self._images    = self._find_unlabeled()
        self._idx       = 0

        # widgets
        self._w_img   = w.Image(layout=w.Layout(max_width="640px", max_height="480px",
                                                border="1px solid #ccc"))
        self._w_lbl   = w.ToggleButtons(options=self.labels, description="Label:")
        self._w_next  = w.Button(description="Save & Next ▶️", button_style="success")
        self._w_skip  = w.Button(description="Skip ⏭️")
        self._w_stat  = w.HTML()

        self._w_next.on_click(self._on_save)
        self._w_skip.on_click(self._on_skip)

        self.ui = w.VBox([
            self._w_img,
            self._w_lbl,
            w.HBox([self._w_next, self._w_skip]),
            self._w_stat,
        ])

    # ── persistence ──────────────────────────────────────────────────
    def _load_or_init_csv(self) -> pd.DataFrame:
        """Return the existing label log, or an empty frame with ``COLUMNS``.

        A zero-byte CSV (e.g. left behind by an interrupted session) is
        treated like a missing one instead of aborting construction.
        """
        if self.csv_path.exists():
            try:
                df = pd.read_csv(self.csv_path)
            except pd.errors.EmptyDataError:
                logging.warning("%s is empty; starting a fresh label log", self.csv_path)
            else:
                logging.info("Loaded %s", self.csv_path)
                return df
        else:
            logging.info("Creating %s", self.csv_path)
        return pd.DataFrame(columns=self.COLUMNS)

    def _find_unlabeled(self) -> list[str]:
        """Return sorted image file names in ``src_dir`` not yet in the log."""
        imgs = sorted(
            p.name
            for p in self.src_dir.iterdir()
            # is_file() keeps out folders that happen to end in an image suffix
            if p.is_file() and p.suffix.lower() in self.EXT
        )
        done = set(self._df["original_filename"])
        todo = [f for f in imgs if f not in done]
        logging.info("Unlabeled images: %d", len(todo))
        return todo

    def _write_row(self, orig: str, new: str, label: str) -> None:
        """Append one record to the log and persist it via temp file + rename.

        The in-memory frame is swapped only after the file has been replaced,
        so a failed write followed by a retry cannot duplicate the row.

        Raises:
            OSError: If the temp file cannot be written or renamed.
        """
        ts  = datetime.now(timezone.utc).isoformat(timespec="seconds")
        row = pd.DataFrame([dict(zip(self.COLUMNS, (orig, new, label, ts)))])
        # Skip concat for an empty log: concatenating with an empty frame is
        # deprecated in recent pandas versions.
        updated = row if self._df.empty else pd.concat([self._df, row], ignore_index=True)

        tmp = self.csv_path.with_suffix(".tmp")
        updated.to_csv(tmp, index=False)
        tmp.replace(self.csv_path)
        self._df = updated

    # ── UI helpers ────────────────────────────────────────────────────
    def _flash(self, msg: str, level: str = "info") -> None:
        """Show *msg* in the status line, colored by *level*.

        *msg* is treated as plain text and HTML-escaped, because it may carry
        file names or exception text containing ``<``, ``>`` or ``&``.
        Unknown levels fall back to the ``info`` color.
        """
        import html  # local import: the only place this module is needed

        color = dict(info="#333", success="green", warn="orange", error="red").get(level, "#333")
        self._w_stat.value = f"<span style='color:{color}'>{html.escape(str(msg), quote=False)}</span>"

    def _render_current(self) -> None:
        """Display the image at the current index, or finish when none remain."""
        if self._idx >= len(self._images):
            self._w_img.value = b""
            self._flash("🎉 All images processed.", "success")
            self._w_next.disabled = self._w_skip.disabled = True
            return

        fname = self._images[self._idx]
        path  = self.src_dir / fname
        try:
            with PILImage.open(path) as im:
                # Downscale in place and re-encode as PNG for the widget.
                im.thumbnail((640, 480))
                buf = io.BytesIO()
                im.save(buf, format="PNG")
                self._w_img.value  = buf.getvalue()
                self._w_img.format = "png"
        except Exception as exc:
            # UI boundary: report the problem; buttons stay enabled so the
            # user can skip the unreadable file.
            logging.exception("Failed to load %s", path)
            self._w_img.value = b""
            self._flash(f"❌ Error loading {fname}: {exc}", "error")
            return

        self._flash(f"{self._idx + 1} / {len(self._images)} · {fname}")

    # ── callbacks ────────────────────────────────────────────────────
    def _on_save(self, _btn) -> None:
        """Copy the current image under a label-based name, log it, advance."""
        if self._idx >= len(self._images):
            # Nothing left (or nothing to begin with): settle into the
            # finished state instead of raising IndexError.
            self._render_current()
            return

        orig   = self._images[self._idx]
        label  = self._w_lbl.value.replace(" ", "_")
        ext    = Path(orig).suffix
        new    = f"{label}_{uuid.uuid4().hex[:8]}{ext}"
        target = self.dst_dir / new

        try:
            shutil.copy2(self.src_dir / orig, target)
            self._write_row(orig, new, label)
            self._flash(f"✔️ Saved {new}", "success")
        except Exception as exc:
            logging.exception("Copy failed for %s", orig)
            # Remove a copy that never made it into the log, so a retry does
            # not leave an orphan file in the labeled folder.
            try:
                target.unlink(missing_ok=True)
            except OSError:
                logging.warning("Could not remove partial copy %s", target)
            self._flash(f"❌ {exc}", "error")
            return

        self._idx += 1
        self._render_current()

    def _on_skip(self, _btn) -> None:
        """Advance to the next image without recording anything."""
        if self._idx < len(self._images):
            self._idx += 1
        self._flash("⏭️ Skipped.", "warn")
        self._render_current()

    # ── public ────────────────────────────────────────────────────────
    def start(self) -> None:
        """Render the first image (or an empty-state notice) and show the UI."""
        if not self._images:
            # With nothing to label the buttons have no valid target.
            self._w_next.disabled = self._w_skip.disabled = True
            self._flash(f"Nothing to label in {self.src_dir}.", "warn")
        else:
            self._render_current()
        display(self.ui)

In [None]:
# 1.4 · Launch
# Build the labeler from the paths and label list configured in cell 1.2,
# then render its widget UI in this cell's output.
labeler = ColabLabeler(
    src_dir=BASE_DIR,
    dst_dir=LABELED_DIR,
    csv_path=CSV_PATH,
    labels=LABELS,
)
labeler.start()

### 2. Import Required Libraries for the Rest of the Project

In [None]:
# 2.0 · Import Required Libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

### 3. Dataset Creation

In [None]:
'''
Code to split the dataset. Commented out because the dataset is already split
and ready for training.

import os
import shutil
from collections import defaultdict
from sklearn.model_selection import train_test_split

# Paths
images_dir = 'YOLO_Dataset/images'
labels_dir = 'YOLO_Dataset/labels'

output_images_train = 'YOLO_Dataset/images/train'
output_images_val = 'YOLO_Dataset/images/val'
output_labels_train = 'YOLO_Dataset/labels/train'
output_labels_val = 'YOLO_Dataset/labels/val'

# Create output dirs
for d in [output_images_train, output_images_val, output_labels_train, output_labels_val]:
    os.makedirs(d, exist_ok=True)

# Gather all label files and their corresponding image
label_files = [f for f in os.listdir(labels_dir) if f.endswith('.txt')]
class_to_files = defaultdict(list)

# Group by class (based on the first class ID found in each file)
for label_file in label_files:
    label_path = os.path.join(labels_dir, label_file)
    with open(label_path, 'r') as f:
        lines = f.readlines()
        if not lines:
            continue
        classes = set([int(line.strip().split()[0]) for line in lines])
        # Assign to each class (multi-label will duplicate in multiple buckets)
        for cls in classes:
            class_to_files[cls].append(label_file)

# Merge files from all classes, deduplicate
all_files = set()
for file_list in class_to_files.values():
    all_files.update(file_list)

# Convert to list
all_files = list(all_files)

# Split balanced by filename (not perfect stratified but helps keep variation)
train_files, val_files = train_test_split(all_files, test_size=0.2, random_state=42)

# Helper to copy files
def copy_files(files, img_dst, label_dst):
    for label_file in files:
        img_file = label_file.replace('.txt', '.jpg')  # or .png if you use that
        src_img = os.path.join(images_dir, img_file)
        src_label = os.path.join(labels_dir, label_file)
        if os.path.exists(src_img) and os.path.exists(src_label):
            shutil.copy(src_img, os.path.join(img_dst, img_file))
            shutil.copy(src_label, os.path.join(label_dst, label_file))

# Copy to train/val
copy_files(train_files, output_images_train, output_labels_train)
copy_files(val_files, output_images_val, output_labels_val)

print(f"✅ Done! Train: {len(train_files)}, Val: {len(val_files)}")
'''

### 4. Model Training

In [None]:
from ultralytics import YOLO

# Load a YOLOv8 model (other sizes such as yolov8s can be swapped in here).
model = YOLO('yolov8n.pt')  # nano version for fast training/testing

# Train the model on the dataset described by data.yaml.
# NOTE(review): all paths are relative, so this presumably expects the working
# directory to contain YOLO_Dataset/ — confirm before running.
# NOTE(review): the overview describes image classification, while the run
# name and YOLO label layout suggest object detection — confirm the intended task.
model.train(
    data='YOLO_Dataset/data.yaml',   # Path to your data.yaml
    epochs=50,
    imgsz=640,
    batch=16,
    name='waste-bin-detector',       # presumably the run name — verify against Ultralytics docs
    project='trash_yolo_project',    # presumably the parent output folder — verify against Ultralytics docs
)

### 5. Evaluation & Deployment

In [None]:
# TODO: code for evaluating accuracy, exporting a TensorFlow Lite model, etc. …