# Nhận dạng & phân loại khuẩn lạc vi sinh vật bằng Ultralytics YOLO (Detect 24 classes)

Notebook này chạy end-to-end trên **Kaggle**:
1) **Tải dữ liệu** từ Figshare (qua Figshare REST API) *(cần bật Internet trong Kaggle Notebook Settings)*  
2) **Chuẩn hoá** dataset sang format Ultralytics YOLO + **train/val/test split**  
3) **Train YOLO** (object detection + classification theo 24 loài)  
4) **Evaluate** + **Inference demo**  
5) **Export** (best.pt + ONNX)

Dataset nguồn: *Annotated dataset for deep-learning-based bacterial colony detection* (Makrai et al., Scientific Data 2023) — 369 ảnh, 24 loài, 56,865 khuẩn lạc (bbox + label).

> Gợi ý: Nếu download quá chậm, bạn có thể **tải dữ liệu về máy**, upload thành **Kaggle Dataset**, rồi đặt `DOWNLOAD_FROM_FIGSHARE = False` và trỏ `RAW_DIR` sang `/kaggle/input/...`.


In [None]:
# === 0) Setup ===
# Bật GPU: Settings -> Accelerator -> GPU
# Bật Internet: Settings -> Internet -> On (để download Figshare)

!pip install -q ultralytics opencv-python matplotlib pandas pyyaml requests scikit-learn tqdm

import os, re, json, zipfile, shutil
from pathlib import Path

import numpy as np
import requests
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import yaml

print("OK")


## 1) Tải dữ liệu từ Figshare (hoặc dùng Kaggle Input)

Mặc định notebook sẽ:
- Tải `annot_YOLO.zip`
- Tải toàn bộ ảnh `*.jpg`

Figshare web UI có thể chặn crawler, nên mình dùng **Figshare REST API** để lấy `download_url` trực tiếp cho từng file.


In [None]:
# === 1) Download dataset ===

# When True, the download step below fetches the dataset into RAW_DIR.
DOWNLOAD_FROM_FIGSHARE = True

# Id of the Figshare article whose files are listed and downloaded.
FIGSHARE_ARTICLE_ID = 22022540
RAW_DIR = Path("/kaggle/working/raw_figshare")  # where downloaded files are stored

# If you have already added the dataset as a Kaggle Input, set:
# DOWNLOAD_FROM_FIGSHARE = False
# RAW_DIR = Path("/kaggle/input/<your-dataset-folder>")

# Base URL that every Figshare REST API request in this cell is built from.
FIGSHARE_API = "https://api.figshare.com/v2"

def figshare_get_json(url: str, timeout: int = 60):
    """Issue a GET request to ``url`` and return the decoded JSON body.

    Args:
        url: Full URL to request.
        timeout: Timeout in seconds handed to ``requests.get``.

    Raises:
        requests.HTTPError: If the response carries an error status
            (raised by ``raise_for_status``).
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    payload = response.json()
    return payload

def figshare_list_files(article_id: int, page_size: int = 100):
    """Return the metadata entries of every file attached to a Figshare article.

    The files endpoint accepts ``page`` / ``page_size`` query parameters and
    may answer a plain request with only the first page, which would
    silently drop most files of a large article. Pages are therefore
    requested one after another until a page comes back short or adds
    nothing new.

    Args:
        article_id: Figshare article id.
        page_size: Number of entries requested per page.

    Returns:
        A list of file-metadata dicts, de-duplicated by their ``id``.
    """
    files = []
    seen_ids = set()
    page = 1
    while True:
        batch = figshare_get_json(
            f"{FIGSHARE_API}/articles/{article_id}/files?page={page}&page_size={page_size}"
        )
        # De-duplicate so a server that ignores the paging parameters (and
        # returns the full list every time) cannot cause an endless loop.
        fresh = [entry for entry in batch if entry.get("id") not in seen_ids]
        files.extend(fresh)
        seen_ids.update(entry.get("id") for entry in fresh)
        if not fresh or len(batch) < page_size:
            return files
        page += 1

def figshare_file_details(article_id: int, file_id: int):
    """Fetch the metadata record of file ``file_id`` belonging to ``article_id``."""
    endpoint = f"{FIGSHARE_API}/articles/{article_id}/files/{file_id}"
    return figshare_get_json(endpoint)

def download_url_to_path(download_url: str, out_path: Path, chunk_size: int = 1024*1024):
    """Stream ``download_url`` into ``out_path`` while showing a progress bar.

    The body is first written to a sibling ``<name>.part`` file and renamed
    to ``out_path`` only after it has been received completely. An
    interrupted download therefore never leaves a truncated file at
    ``out_path``; this matters because ``download_figshare_article`` skips
    any file that already exists.

    Args:
        download_url: Direct URL of the file.
        out_path: Destination path; parent directories are created.
        chunk_size: Number of bytes requested per streamed chunk.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = out_path.with_name(out_path.name + ".part")
    try:
        with requests.get(download_url, stream=True, timeout=120) as r:
            r.raise_for_status()
            # A missing/zero content-length becomes None so tqdm shows a
            # plain byte counter instead of a meaningless 0-byte total.
            total = int(r.headers.get("content-length", "0") or "0") or None
            with open(tmp_path, "wb") as f:
                # The context manager closes the bar even if the stream fails.
                with tqdm(total=total, unit="B", unit_scale=True, desc=out_path.name) as pbar:
                    for chunk in r.iter_content(chunk_size=chunk_size):
                        if not chunk:
                            continue
                        f.write(chunk)
                        pbar.update(len(chunk))
    except BaseException:
        # Also covers a manual notebook interrupt: drop the partial file, re-raise.
        if tmp_path.exists():
            tmp_path.unlink()
        raise
    tmp_path.replace(out_path)

def download_figshare_article(article_id: int, out_dir: Path):
    """Download the YOLO annotation zip and the JPEG images of an article.

    Files that already exist in ``out_dir`` are skipped, so the function can
    be re-run to resume.

    Args:
        article_id: Figshare article id.
        out_dir: Directory receiving the files (created if missing).

    Raises:
        RuntimeError: If a file's detail record carries no ``download_url``.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    files = figshare_list_files(article_id)

    def _is_wanted(entry):
        # Only the YOLO annotation zip and .jpg/.jpeg images are downloaded.
        entry_name = entry.get("name", "")
        return entry_name == "annot_YOLO.zip" or entry_name.lower().endswith((".jpg", ".jpeg"))

    wanted = [entry for entry in files if _is_wanted(entry)]

    print(f"Found {len(files)} files on Figshare; will download {len(wanted)} file(s).")

    for entry in wanted:
        fid = int(entry["id"])
        name = str(entry["name"])
        target = out_dir / name
        if target.exists():
            continue

        # The direct download link is read from the per-file detail record.
        download_url = figshare_file_details(article_id, fid).get("download_url")
        if not download_url:
            raise RuntimeError(f"No download_url for file {name} (id={fid})")

        print(f"Downloading: {name}")
        download_url_to_path(download_url, target)

if DOWNLOAD_FROM_FIGSHARE:
    download_figshare_article(FIGSHARE_ARTICLE_ID, RAW_DIR)

# Quick sanity checks: report the raw directory, whether the annotation zip
# is present, and how many .jpg / .JPG files sit directly inside RAW_DIR.
print("RAW_DIR:", RAW_DIR)
print("Has annot_YOLO.zip:", RAW_DIR.joinpath("annot_YOLO.zip").exists())
n_imgs = sum(len(list(RAW_DIR.glob(pattern))) for pattern in ("*.jpg", "*.JPG"))
print("Number of images:", n_imgs)


## 2) Chuẩn hoá dataset về format Ultralytics YOLO + chia train/val/test

Ultralytics YOLO cần cấu trúc kiểu:

```
dataset/
  images/train, images/val, images/test
  labels/train, labels/val, labels/test
  data.yaml
```

(Trong `data.yaml` có `path`, `train`, `val`, `test`, `names`.)


In [None]:
# === 2) Prepare dataset ===

# Root of the prepared YOLO dataset; created up front so later steps can write into it.
OUT_DIR = Path("/kaggle/working/yolo_dataset")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Fallback class names, returned by load_class_names() when the extracted
# annotations contain no usable classes.txt / obj.names file.
# NOTE(review): the list position becomes the class id in data.yaml; this
# presumably matches the dataset's own class order -- confirm against the labels.
DEFAULT_CLASS_NAMES = [
    "Actinobacillus equuli",
    "Actinobacillus pleuropneumoniae",
    "Aeromonas hydrophila",
    "Bacillus cereus",
    "Bibersteinia trehalosi",
    "Bordetella bronchiseptica",
    "Brucella ovis",
    "Clostridium perfringens",
    "Corynebacterium pseudotuberculosis",
    "Erysipelothrix rhusiopathiae",
    "Escherichia coli",
    "Glaesserella parasuis",
    "Klebsiella pneumoniae",
    "Listeria monocytogenes",
    "Paenibacillus larvae",
    "Pasteurella multocida",
    "Proteus mirabilis",
    "Pseudomonas aeruginosa",
    "Rhodococcus equi",
    "Salmonella enterica",
    "Staphylococcus aureus",
    "Staphylococcus hyicus",
    "Streptococcus agalactiae",
    "Trueperella pyogenes",
]

# File stems that start with "sp<two digits>_img<digits>" (case-insensitive).
SPECIES_RE = re.compile(r"^(sp\d{2})_img\d+", re.IGNORECASE)

def infer_species_id(filename: str):
    """Return the lower-cased ``spNN`` prefix of ``filename``'s stem.

    Names whose stem does not start with the ``spNN_img<digits>`` pattern
    yield the string ``"unknown"``.
    """
    match = SPECIES_RE.match(Path(filename).stem)
    if match is None:
        return "unknown"
    return match.group(1).lower()

def unzip(zip_path: Path, out_dir: Path):
    """Extract every member of the archive ``zip_path`` into ``out_dir``.

    ``out_dir`` (including missing parents) is created before the archive
    is opened.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, mode="r") as archive:
        archive.extractall(path=out_dir)

def find_label_files(labels_root: Path):
    """Return the sorted ``*.txt`` files found anywhere below ``labels_root``.

    Files named ``classes.txt`` or ``obj.names`` (compared case-insensitively)
    are class lists rather than per-image labels and are left out.
    """
    not_labels = {"classes.txt", "obj.names"}
    return sorted(
        candidate
        for candidate in labels_root.rglob("*.txt")
        if candidate.name.lower() not in not_labels
    )

def find_classes_file(labels_root: Path):
    """Locate a class-name list (``classes.txt`` or ``obj.names``).

    Lookup order: ``classes.txt`` then ``obj.names`` directly inside
    ``labels_root``; after that the first recursive hit for ``classes.txt``,
    then the first recursive hit for ``obj.names``.

    Returns:
        The path that was found, or ``None`` when there is none.
    """
    candidates = ("classes.txt", "obj.names")

    # Direct children win over anything nested deeper.
    for fname in candidates:
        direct = labels_root / fname
        if direct.exists():
            return direct

    for fname in candidates:
        nested = next(iter(labels_root.rglob(fname)), None)
        if nested is not None:
            return nested

    return None

def load_class_names(labels_root: Path):
    """Return the class names to use for the dataset.

    The non-blank, stripped lines of the classes file found below
    ``labels_root`` are used; when no such file exists or it holds no
    names, ``DEFAULT_CLASS_NAMES`` is returned instead.
    """
    classes_file = find_classes_file(labels_root)
    if not classes_file:
        return DEFAULT_CLASS_NAMES

    text = classes_file.read_text(encoding="utf-8", errors="ignore")
    names = [stripped for stripped in (raw.strip() for raw in text.splitlines()) if stripped]
    return names if names else DEFAULT_CLASS_NAMES

def parse_label_line(line: str):
    """Split one whitespace-separated label line into ``(cls, x, y, w, h)``.

    The first field is read as a float and truncated to ``int`` (so ``"3.0"``
    is accepted); the next four fields are converted to ``float``. Fields
    after the fifth are ignored.
    """
    fields = line.strip().split()
    class_id = int(float(fields[0]))
    x, y, w, h = (float(value) for value in fields[1:5])
    return class_id, x, y, w, h

def scan_class_range(label_files):
    """Return ``(min_class, max_class)`` over all lines of ``label_files``.

    Blank lines are skipped. When no label line is seen at all, the initial
    sentinels ``(10**9, -10**9)`` are returned unchanged.
    """
    lowest, highest = 10**9, -10**9
    for label_file in label_files:
        content = label_file.read_text(encoding="utf-8", errors="ignore")
        for raw in content.splitlines():
            entry = raw.strip()
            if entry:
                class_id = parse_label_line(entry)[0]
                lowest = min(lowest, class_id)
                highest = max(highest, class_id)
    return lowest, highest

def rewrite_labels(label_files, image_stems, out_labels_dir, class_shift: int):
    """Write normalised copies of the label files into ``out_labels_dir``.

    Only label files whose stem occurs in ``image_stems`` are processed.
    Every non-blank line is re-emitted as ``"<cls + class_shift> x y w h"``
    with the four box values formatted to six decimals.

    Returns:
        Dict mapping each processed stem to the path of its rewritten file.
    """
    out_labels_dir.mkdir(parents=True, exist_ok=True)
    written = {}
    for source in tqdm(label_files, desc="Rewrite labels"):
        stem = source.stem
        if stem not in image_stems:
            continue

        rows = []
        for raw in source.read_text(encoding="utf-8", errors="ignore").splitlines():
            entry = raw.strip()
            if not entry:
                continue
            cls, x, y, w, h = parse_label_line(entry)
            rows.append(f"{cls + class_shift} {x:.6f} {y:.6f} {w:.6f} {h:.6f}")

        target = out_labels_dir / f"{stem}.txt"
        # One newline-terminated row per box; an empty file when there are no boxes.
        target.write_text("".join(f"{row}\n" for row in rows), encoding="utf-8")
        written[stem] = target
    return written

def copy_split(image_paths, labels_by_stem, images_out, labels_out):
    """Copy one split's images and their label files into place.

    Each image is copied to ``images_out``. Its label file is looked up by
    stem in ``labels_by_stem`` and copied to ``labels_out``; an image
    without an existing label file gets an empty ``<stem>.txt`` instead.
    """
    for folder in (images_out, labels_out):
        folder.mkdir(parents=True, exist_ok=True)

    for image in tqdm(image_paths, desc=f"Copy {images_out.name}"):
        shutil.copy2(image, images_out / image.name)

        label_target = labels_out / f"{image.stem}.txt"
        label_source = labels_by_stem.get(image.stem)
        if label_source and label_source.exists():
            shutil.copy2(label_source, label_target)
        else:
            label_target.write_text("", encoding="utf-8")

def write_data_yaml(out_dir: Path, class_names):
    """Write ``data.yaml`` into ``out_dir`` and return its path.

    The file holds the resolved dataset root (``path``), the relative image
    folders of the ``train`` / ``val`` / ``test`` splits, and ``names`` as an
    index -> class-name mapping. Keys are emitted in that order.
    """
    config = {"path": str(out_dir.resolve())}
    for split in ("train", "val", "test"):
        config[split] = f"images/{split}"
    config["names"] = dict(enumerate(class_names))

    yaml_path = out_dir / "data.yaml"
    rendered = yaml.safe_dump(config, sort_keys=False, allow_unicode=True)
    yaml_path.write_text(rendered, encoding="utf-8")
    return yaml_path

# Images: look directly inside RAW_DIR first, then fall back to a recursive search.
image_paths = sorted(list(RAW_DIR.glob("*.jpg")) + list(RAW_DIR.glob("*.JPG")))
if not image_paths:
    image_paths = sorted(list(RAW_DIR.rglob("*.jpg")) + list(RAW_DIR.rglob("*.JPG")))
print("Images:", len(image_paths))
if not image_paths:
    # Fail here with a clear message instead of an obscure error from the split below.
    raise RuntimeError(f"No .jpg images found under {RAW_DIR}; download the dataset or fix RAW_DIR.")

# Labels
labels_root = OUT_DIR / "_labels_raw"
label_files = find_label_files(labels_root) if labels_root.exists() else []
if not label_files:
    # (Re-)extract whenever no label files are available. Testing only for the
    # directory is not enough: unzip() creates it before opening the archive,
    # so a failed earlier attempt leaves an empty folder behind.
    zip_path = RAW_DIR / "annot_YOLO.zip"
    if not zip_path.exists():
        raise FileNotFoundError(f"Annotation archive not found: {zip_path}")
    unzip(zip_path, labels_root)
    label_files = find_label_files(labels_root)
print("Label files:", len(label_files))
if not label_files:
    raise RuntimeError(f"No label .txt files found in {labels_root} after extracting annot_YOLO.zip.")

class_names = load_class_names(labels_root)
nc = len(class_names)

# If the class ids span exactly 1..nc they are 1-based; shift them down to 0-based.
mn, mx = scan_class_range(label_files)
class_shift = 0
if mn == 1 and mx == nc:
    class_shift = -1
print(f"class range: min={mn}, max={mx}, nc={nc}, shift={class_shift}")

labels_clean_dir = OUT_DIR / "_labels_clean"
image_stems = {p.stem for p in image_paths}
labels_by_stem = rewrite_labels(label_files, image_stems, labels_clean_dir, class_shift)

# Split (stratified by species id inferred from filename)
y = [infer_species_id(p.name) for p in image_paths]

seed = 42
test_ratio = 0.15
val_ratio = 0.15
# Validation share expressed relative to what remains after the test split is removed.
val_ratio_rel = val_ratio / (1.0 - test_ratio)

X_trainval, X_test, y_trainval, _ = train_test_split(image_paths, y, test_size=test_ratio, random_state=seed, stratify=y)
X_train, X_val, _, _ = train_test_split(X_trainval, y_trainval, test_size=val_ratio_rel, random_state=seed, stratify=y_trainval)

# Drop split folders from a previous run first: otherwise, after changing the
# seed, the ratios or the image set, an image could remain in its old split
# and end up in two splits at once (train/val/test leakage).
for stale_root in (OUT_DIR / "images", OUT_DIR / "labels"):
    shutil.rmtree(stale_root, ignore_errors=True)

for split_name, X in [("train", X_train), ("val", X_val), ("test", X_test)]:
    copy_split(X, labels_by_stem, OUT_DIR/"images"/split_name, OUT_DIR/"labels"/split_name)

DATA_YAML = write_data_yaml(OUT_DIR, class_names)

print("✅ Prepared dataset at:", OUT_DIR)
print("✅ data.yaml:", DATA_YAML)
print("Split sizes:", len(X_train), len(X_val), len(X_test))


## 3) Train YOLO (Detect + 24-class)

Vì khuẩn lạc rất nhỏ và ảnh thường độ phân giải cao, bạn nên bắt đầu với:
- `imgsz=1024` (nếu GPU yếu thì giảm xuống 640)
- `batch=4` (giảm nếu OOM)
- model nhỏ: `yolov8n.pt` hoặc `yolov8s.pt`


In [None]:
# === 3) Train ===
from ultralytics import YOLO

MODEL = "yolov8n.pt"  # try 'yolov8s.pt' if the GPU has more headroom
EPOCHS = 50
IMGSZ = 1024
BATCH = 4

model = YOLO(MODEL)

results = model.train(
    data=str(DATA_YAML),
    epochs=EPOCHS,
    imgsz=IMGSZ,
    batch=BATCH,
    project="/kaggle/working/runs/detect",
    name="bacterial_colony_24cls",
    # Reuse the same run directory when this cell is re-run. Otherwise
    # Ultralytics appends a numeric suffix to the run name once the directory
    # exists, and the hard-coded best.pt path used by the evaluation cell
    # would keep pointing at the older run.
    exist_ok=True,
)


## 4) Evaluate + Inference demo

- `model.val(...)` để đánh giá (mAP, precision/recall, v.v.)
- `model.predict(...)` để demo ảnh test


In [None]:
# === 4) Evaluate ===
best_pt = "/kaggle/working/runs/detect/bacterial_colony_24cls/weights/best.pt"
model = YOLO(best_pt)

# Validation split: the split named "val" in data.yaml.
metrics_val = model.val(data=str(DATA_YAML), split="val", imgsz=IMGSZ)
print(metrics_val)

# Held-out test split: built during dataset preparation and not used for
# training, so it gives a less optimistic estimate than the val metrics.
metrics_test = model.val(data=str(DATA_YAML), split="test", imgsz=IMGSZ)
print(metrics_test)


In [None]:
# === 5) Predict on the test images ===
# Runs inference on every image in the test split folder and saves the
# annotated outputs.
test_dir = OUT_DIR / "images" / "test"
pred = model.predict(source=str(test_dir), imgsz=IMGSZ, conf=0.25, save=True,
                     project="/kaggle/working/runs/predict", name="demo",
                     # Keep writing into ".../demo" on re-runs; without this the
                     # run name gets a numeric suffix and the path printed below
                     # would be wrong.
                     exist_ok=True)
print("Saved predictions to:", "/kaggle/working/runs/predict/demo")


## 5) Export (best.pt + ONNX)

Ultralytics hỗ trợ export sang nhiều định dạng (ONNX, TensorRT, ...).


In [None]:
# === 6) Export ===
# Copy best.pt to the working root (so it is easy to download)
import shutil
import os
shutil.copy2(best_pt, "/kaggle/working/best.pt")
print("Saved:", "/kaggle/working/best.pt")

# Export ONNX. export() returns the location of the file it wrote; print it
# so the exported model can be found without searching the run directory.
onnx_path = model.export(format="onnx", imgsz=IMGSZ)
print("Saved:", onnx_path)
print("Done export")
