## 1. Introduction & Objectives

Camera traps are motion- or heat-triggered cameras placed in the wild that capture images whenever something moves through the scene. They produce huge numbers of images, many of which are empty; the valuable ones contain animals (sometimes multiple species at once). Object detection models add bounding boxes and class labels to each animal instance, which is more informative than image-level classification: you can count individuals, know where they are, and trigger actions only when animals appear.

YOLOv8 is a modern one-stage detector that is fast, compact, and friendly to edge devices. In this notebook you'll build an end-to-end pipeline for a real camera-trap detection dataset (ENA24: Eastern North America) that includes bounding boxes and species labels. You'll learn **what** happens at every step and **why** it matters, from data cleaning through training, evaluation, and exporting models for lightweight edge deployments (e.g., a Raspberry Pi or Jetson that records clips only when wildlife is detected).

**What we'll do**
- Set up a Colab GPU environment with YOLOv8.
- Download and explore a wildlife detection dataset with bounding boxes.
- Clean annotations, convert COCO → YOLO format, and make a train/val split.
- Train a small YOLOv8 model (transfer learning from COCO weights) suitable for Colab Free.
- Evaluate with metrics (mAP, precision, recall) and visual inspection.
- Run inference on sample images and discuss how to test empty frames.
- Export to ONNX/TorchScript for edge-device deployment.

Throughout, you'll see why each step exists (e.g., cleaning boxes to avoid bad labels) and how it connects to an edge workflow that saves video clips only when animals enter the frame.

## 2. Setup and Dependencies

- Colab GPUs (T4/P100) dramatically speed up training vs CPU.
- YOLOv8 is chosen because it offers small, fast variants (`yolov8n`, `yolov8s`) that run well on edge hardware while keeping accuracy competitive.
- We'll install only the essentials: YOLOv8 (`ultralytics`), Torch, COCO helpers, and plotting utilities.

In [None]:
# Check hardware and install dependencies
import subprocess, sys

gpu_info = subprocess.run("nvidia-smi -L", shell=True, capture_output=True, text=True)
if gpu_info.returncode == 0 and gpu_info.stdout.strip():
    print("GPU detected:\n", gpu_info.stdout)
else:
    print("No GPU detected. Switch Colab to a GPU runtime for much faster training.")

packages = [
    "ultralytics==8.1.34",  # YOLOv8
    "opencv-python",
    "matplotlib",
    "seaborn",
    "pyyaml",
    "pycocotools",
    "pandas",
    "requests"
]
print("Installing packages ... this takes ~1 minute on Colab")
subprocess.run(f"pip install -q {' '.join(packages)}", shell=True, check=True)
print("Done.")


In [None]:
# Imports, paths, and basic config
import json, random, shutil, zipfile, tarfile, os
from pathlib import Path
from collections import defaultdict, Counter

import cv2
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import torch
import yaml
from IPython.display import Image, display
from ultralytics import YOLO

sns.set_style("whitegrid")
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

# Base working folders (Colab uses /content by default)
BASE_DIR = Path("/content/wildlife_yolov8")
DATA_ROOT = BASE_DIR / "data"
RAW_DIR = DATA_ROOT / "raw"
PROC_DIR = DATA_ROOT / "processed"
YOLO_DIR = DATA_ROOT / "wildlife_yolo"
for d in [BASE_DIR, DATA_ROOT, RAW_DIR, PROC_DIR, YOLO_DIR]:
    d.mkdir(parents=True, exist_ok=True)

print(f"Base directory: {BASE_DIR}")
print(f"Data root: {DATA_ROOT}")
print(f"CUDA available: {torch.cuda.is_available()}")


## 3. Downloading and Exploring the Dataset

We use **ENA24** (Eastern North America) with COCO-style annotations. Two download options:
- **Full zip**: grabs the entire dataset (~1–2 GB). Good if you want everything offline.
- **Lightweight subset (metadata-driven)**: pulls the COCO JSON and downloads only a manageable number of annotated images (e.g., 500–1200) via HTTP. This is much faster on Colab Free.

Camera-trap quirks to remember: many empty frames, night IR lighting, class imbalance, and occasional bad boxes. We'll clean and filter in later steps.
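
To make the class-imbalance and empty-frame quirks concrete, here is a minimal sketch that counts boxes per class and images without any annotation in a COCO-style dictionary. The helper name `summarize_coco` and the toy data are illustrative assumptions, not part of ENA24; a dataset may also mark empty frames with a dedicated category, which this sketch would not catch.

```python
from collections import Counter

def summarize_coco(coco: dict) -> dict:
    """Count boxes per class name and images that have no annotation at all."""
    id_to_name = {c["id"]: c["name"] for c in coco["categories"]}
    boxes_per_class = Counter(
        id_to_name.get(a["category_id"], "unknown") for a in coco["annotations"]
    )
    annotated_ids = {a["image_id"] for a in coco["annotations"]}
    empty_images = sum(1 for img in coco["images"] if img["id"] not in annotated_ids)
    return {"boxes_per_class": boxes_per_class, "empty_images": empty_images}

# Tiny hand-made COCO dict (hypothetical values, not taken from ENA24)
toy = {
    "images": [{"id": 1}, {"id": 2}, {"id": 3}],
    "categories": [{"id": 10, "name": "deer"}, {"id": 11, "name": "fox"}],
    "annotations": [
        {"image_id": 1, "category_id": 10},
        {"image_id": 1, "category_id": 10},
        {"image_id": 2, "category_id": 11},
    ],
}
summary = summarize_coco(toy)
print(summary["boxes_per_class"].most_common())
print("empty images:", summary["empty_images"])
```

Running the same function on the loaded annotation dictionary shows how skewed the species counts are before any filtering.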

In [None]:
# Download or locate the ENA24 detection dataset (COCO-style)
import subprocess, requests

# Choose download mode: "full_zip" (all data) or "subset" (faster, downloads only N annotated images)
DOWNLOAD_MODE = "subset"  # options: "subset" or "full_zip"
NUM_IMAGES_TO_DOWNLOAD = 800  # used only in subset mode; adjust for speed/coverage

# Drive toggle if you already have the data there
USE_DRIVE = False

RAW_EXTRACT_DIR = RAW_DIR / "ena24"
RAW_EXTRACT_DIR.mkdir(parents=True, exist_ok=True)

if USE_DRIVE:
    from google.colab import drive
    drive.mount('/content/drive')
    RAW_EXTRACT_DIR = Path('/content/drive/MyDrive/ena24_detection')
    RAW_EXTRACT_DIR.mkdir(parents=True, exist_ok=True)

# Updated metadata and image URLs (with fallbacks across GCP/Azure/AWS)
METADATA_URLS = [
    "https://lilawildlife.blob.core.windows.net/lila-wildlife/ena24/ena24.json",
    "https://storage.googleapis.com/public-datasets-lila/ena24/ena24.json",
    "http://us-west-2.opendata.source.coop.s3.amazonaws.com/agentmorris/lila-wildlife/ena24/ena24.json",
]
IMAGE_ZIP_URLS = [
    "https://storage.googleapis.com/public-datasets-lila/ena24/ena24.zip",
    "https://lilawildlife.blob.core.windows.net/lila-wildlife/ena24/ena24.zip",
    "http://us-west-2.opendata.source.coop.s3.amazonaws.com/agentmorris/lila-wildlife/ena24/ena24.zip",
]
# Base URL list for per-image downloads in subset mode
IMAGE_BASE_URLS = [
    "https://lilawildlife.blob.core.windows.net/lila-wildlife/ena24/",
    "https://storage.googleapis.com/public-datasets-lila/ena24/",
    "http://us-west-2.opendata.source.coop.s3.amazonaws.com/agentmorris/lila-wildlife/ena24/",
]

METADATA_PATH = RAW_EXTRACT_DIR / "ena24_metadata.json"
ZIP_PATH = RAW_DIR / "ena24_detection.zip"

def smart_extract(archive_path: Path, dest: Path):
    """Extract .zip or .tar.gz archives to dest."""
    if archive_path.suffix == '.zip' and zipfile.is_zipfile(archive_path):
        with zipfile.ZipFile(archive_path, 'r') as zf:
            zf.extractall(dest)
    elif archive_path.suffixes[-2:] == ['.tar', '.gz']:
        with tarfile.open(archive_path, 'r:gz') as tf:
            tf.extractall(dest)
    else:
        raise ValueError(f"Unsupported archive type: {archive_path}")

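def download_with_fallback(urls, dest):
    """Try each URL in turn and return the first one that downloads successfully."""
    for url in urls:
        print(f"Trying {url} ...")
        # Pass arguments as a list so the path and URL are not re-parsed by a shell
        result = subprocess.run(["wget", "-O", str(dest), url])
        if result.returncode == 0:
            print(f"Success: {url}")
            return url
        # wget -O leaves an empty or partial file behind on failure; remove it so
        # a later "file exists" check does not mistake it for a finished download
        Path(dest).unlink(missing_ok=True)
    raise RuntimeError("All download URLs failed. Please update the URL list or mount the data from Drive.")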

if DOWNLOAD_MODE == "full_zip":
    if not any(RAW_EXTRACT_DIR.iterdir()):
        if not ZIP_PATH.exists():
            print("Downloading ENA24 full zip with fallbacks...")
            download_with_fallback(IMAGE_ZIP_URLS, ZIP_PATH)
        print("Extracting dataset ...")
        smart_extract(ZIP_PATH, RAW_EXTRACT_DIR)
    else:
        print("Dataset already present, skipping download/extract.")

    image_dirs = sorted([p for p in RAW_EXTRACT_DIR.rglob('images') if p.is_dir()])
    annotation_files = sorted([p for p in RAW_EXTRACT_DIR.rglob('*.json')])
    if len(image_dirs) == 0:
        raise FileNotFoundError("Could not find an 'images' directory. Inspect RAW_EXTRACT_DIR and adjust paths.")
    if len(annotation_files) == 0:
        raise FileNotFoundError("Could not find any JSON annotation file. Please verify the dataset download.")
    IMAGES_ROOT = image_dirs[0]
    ANNOTATION_PATH = annotation_files[0]

else:  # subset mode: download metadata + only N annotated images
    if not METADATA_PATH.exists():
        print("Downloading metadata JSON (fallbacks)...")
        success = False
        for url in METADATA_URLS:
            try:
                resp = requests.get(url, timeout=30)
                resp.raise_for_status()
                METADATA_PATH.write_bytes(resp.content)
                print("Metadata saved to", METADATA_PATH)
                success = True
                break
            except Exception as e:
                print(f"Failed metadata URL {url}: {e}")
        if not success:
            raise RuntimeError("All metadata URLs failed. Update METADATA_URLS or mount the file from Drive.")
    else:
        print("Metadata already present.")

    with open(METADATA_PATH, 'r') as f:
        meta_coco = json.load(f)

    annotated_image_ids = set(ann['image_id'] for ann in meta_coco['annotations'])
    valid_images = [img for img in meta_coco['images'] if img['id'] in annotated_image_ids]
    random.shuffle(valid_images)
    subset = valid_images[:min(NUM_IMAGES_TO_DOWNLOAD, len(valid_images))]
    print(f"Downloading {len(subset)} annotated images (subset mode)...")

    IMAGES_ROOT = RAW_EXTRACT_DIR / "images_subset"
    IMAGES_ROOT.mkdir(parents=True, exist_ok=True)

    for img in subset:
        rel_path = Path(img['file_name'])
        dest_path = IMAGES_ROOT / rel_path
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        if dest_path.exists():
            continue
        downloaded = False
        for base in IMAGE_BASE_URLS:
            url = f"{base}{img['file_name']}"
            try:
                r = requests.get(url, timeout=30)  # r.content reads the whole body, so streaming adds nothing
                if r.status_code == 200:
                    with open(dest_path, 'wb') as f_out:
                        f_out.write(r.content)
                    downloaded = True
                    break
            except Exception as e:
                print(f"Failed {url}: {e}")
        if not downloaded:
            print(f"Failed to download image {img['file_name']} from all bases.")

    ANNOTATION_PATH = METADATA_PATH

print(f"Using images from: {IMAGES_ROOT}")
print(f"Using annotations: {ANNOTATION_PATH}")


In [None]:
# Explore the raw data: inspect COCO structure and visualize a few examples

with open(ANNOTATION_PATH, 'r') as f:
    coco = json.load(f)

print(f"Images listed in COCO: {len(coco['images'])}")
print(f"Annotations: {len(coco['annotations'])}")
print(f"Categories: {len(coco['categories'])}")

category_id_to_name = {c['id']: c['name'] for c in coco['categories']}
print("Sample categories:", list(category_id_to_name.values())[:10])

def resolve_image_path(file_name: str) -> Path:
    """Best-effort resolver for file paths regardless of stored prefix."""
    direct = IMAGES_ROOT / file_name
    if direct.exists():
        return direct
    base = IMAGES_ROOT / Path(file_name).name
    if base.exists():
        return base
    return direct  # fallback (even if missing)

def draw_coco_example(image_meta, annotations):
    """Load one image and draw its bounding boxes with labels."""
    img_path = resolve_image_path(image_meta['file_name'])
    if not img_path.exists():
        print(f"Missing image: {img_path}")
        return
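    img_bgr = cv2.imread(str(img_path))
    if img_bgr is None:  # file exists but could not be decoded as an image
        print(f"Unreadable image: {img_path}")
        return
    img = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)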
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.imshow(img)
    ax.axis('off')
    for ann in annotations:
        x, y, w, h = ann['bbox']
        rect = plt.Rectangle((x, y), w, h, fill=False, color='lime', linewidth=2)
        ax.add_patch(rect)
        label = category_id_to_name.get(ann['category_id'], 'unknown')
        ax.text(x, y - 2, label, color='yellow', fontsize=10, bbox=dict(facecolor='black', alpha=0.5, pad=1))
    plt.show()

# Pick a few images that actually have annotations (skip empty frames)
ann_by_image = defaultdict(list)
for ann in coco['annotations']:
    ann_by_image[ann['image_id']].append(ann)

# Only keep images with at least one annotation
image_with_boxes = [img for img in coco['images'] if len(ann_by_image[img['id']]) > 0]
for sample in random.sample(image_with_boxes, min(3, len(image_with_boxes))):
    draw_coco_example(sample, ann_by_image[sample['id']])


## 4. Cleaning and Transforming the Data

Why clean?
- Broken images or impossible bounding boxes (negative sizes) harm training.
- Camera traps have many empty frames and long-tail classes; we'll keep the top-N most frequent species and optionally cap the number of images to stay within Colab Free limits.
- YOLO needs one text file per image (`class_id x_center y_center width height`, normalized 0–1). We'll convert from COCO's pixel-space format.

Controls in the next cell:
- `TOP_K_CLASSES`: keep the most common classes.
- `MAX_IMAGES`: limit total images for a quick run.
- `MIN_BOX_AREA`: drop tiny or zero-area boxes.
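
As a worked example of the format change, the sketch below converts a single COCO box (`[x_min, y_min, width, height]` in pixels) into a YOLO label tuple. The function name `coco_box_to_yolo` is an illustrative assumption; it clips the box corners to the image before normalizing, which is one reasonable way to handle boxes that spill over the edge.

```python
def coco_box_to_yolo(bbox, img_w, img_h):
    """Convert a COCO [x_min, y_min, width, height] pixel box to YOLO
    (x_center, y_center, width, height), each normalized to the 0-1 range.
    Returns None when no part of the box lies inside the image."""
    x, y, w, h = bbox
    # Clip the corners to the image bounds first
    x1, y1 = max(x, 0.0), max(y, 0.0)
    x2, y2 = min(x + w, img_w), min(y + h, img_h)
    if x2 <= x1 or y2 <= y1:
        return None
    return (
        (x1 + x2) / 2 / img_w,
        (y1 + y2) / 2 / img_h,
        (x2 - x1) / img_w,
        (y2 - y1) / img_h,
    )

# A 200x100 box whose top-left corner is at (100, 50) in an 800x400 image
box = coco_box_to_yolo([100, 50, 200, 100], img_w=800, img_h=400)
print(" ".join(f"{v:.6f}" for v in box))
```

The printed line is what goes after the class ID in a YOLO label file.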

In [None]:
# Clean annotations and convert COCO → YOLO format
TOP_K_CLASSES = 6         # keep the most common classes
MAX_IMAGES = 1200         # None for full data, but 1.2k keeps Colab runs reasonable
MIN_BOX_AREA = 16         # drop boxes that are too small to be reliable

image_id_to_meta = {img['id']: img for img in coco['images']}
ann_by_image = defaultdict(list)
for ann in coco['annotations']:
    ann_by_image[ann['image_id']].append(ann)

# Remove invalid boxes and collect basic stats
valid_annotations = []
for ann in coco['annotations']:
    img_meta = image_id_to_meta.get(ann['image_id'])
    if img_meta is None:
        continue
    x, y, w, h = ann['bbox']
    if w <= 0 or h <= 0 or w * h < MIN_BOX_AREA:
        continue
    valid_annotations.append(ann)

present_class_counts = Counter([
    a['category_id']
    for a in valid_annotations
    if resolve_image_path(image_id_to_meta[a['image_id']]['file_name']).exists()
])
keep_categories = [cid for cid, _ in present_class_counts.most_common(TOP_K_CLASSES)]
print("Top classes (after presence check):")
for cid in keep_categories:
    print(f"  {cid}: {category_id_to_name[cid]} (count={present_class_counts[cid]})")

if not keep_categories:
    raise ValueError('No categories found after filtering. Adjust TOP_K_CLASSES, MAX_IMAGES, or ensure images downloaded.')
filtered_annotations = [
    a for a in valid_annotations
    if a['category_id'] in keep_categories
    and resolve_image_path(image_id_to_meta[a['image_id']]['file_name']).exists()
]
images_with_kept_classes = sorted({a['image_id'] for a in filtered_annotations})

# Keep only those whose image files actually exist (important in subset mode)
existing_images = []
for img_id in images_with_kept_classes:
    meta = image_id_to_meta[img_id]
    img_path = resolve_image_path(meta['file_name'])
    if img_path.exists():
        existing_images.append(img_id)

if MAX_IMAGES is not None:
    random.shuffle(existing_images)
    existing_images = existing_images[:MAX_IMAGES]
print(f"Images kept after filtering & existence check: {len(existing_images)}")

# Remap category IDs to 0..N-1 in a stable order
sorted_cats = sorted(keep_categories)
cat_id_to_yolo = {cid: idx for idx, cid in enumerate(sorted_cats)}
class_names = [category_id_to_name[cid] for cid in sorted_cats]

labels_dir = PROC_DIR / "labels"
images_dir = PROC_DIR / "images"
labels_dir.mkdir(parents=True, exist_ok=True)
images_dir.mkdir(parents=True, exist_ok=True)

converted = []  # list of {image_path, label_path}
skipped_missing = 0

for img_id in existing_images:
    meta = image_id_to_meta[img_id]
    src_img_path = resolve_image_path(meta['file_name'])
    if not src_img_path.exists():
        skipped_missing += 1
        continue
    stem = Path(meta['file_name']).stem
    ext = Path(meta['file_name']).suffix or '.jpg'
    dst_img_path = images_dir / f"{stem}{ext}"
    dst_label_path = labels_dir / f"{stem}.txt"

    # Copy image into the processed folder (small subset keeps storage reasonable)
    if not dst_img_path.exists():
        shutil.copy2(src_img_path, dst_img_path)

    lines = []
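    for ann in ann_by_image[img_id]:
        if ann['category_id'] not in keep_categories:
            continue
        x, y, w, h = ann['bbox']
        # ann_by_image holds the raw annotations, so re-apply the validity filter here
        if w <= 0 or h <= 0 or w * h < MIN_BOX_AREA:
            continue
        # Clip the box corners to the image, then convert to normalized center/size.
        # Clamping center and size separately could still leave a box that extends past the edge.
        x1, y1 = max(x, 0.0), max(y, 0.0)
        x2, y2 = min(x + w, meta['width']), min(y + h, meta['height'])
        if x2 <= x1 or y2 <= y1:
            continue
        x_c = (x1 + x2) / 2 / meta['width']
        y_c = (y1 + y2) / 2 / meta['height']
        w_n = (x2 - x1) / meta['width']
        h_n = (y2 - y1) / meta['height']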
        cls = cat_id_to_yolo[ann['category_id']]
        lines.append(f"{cls} {x_c:.6f} {y_c:.6f} {w_n:.6f} {h_n:.6f}")

    if lines:
        with open(dst_label_path, 'w') as f:
            f.write("\n".join(lines))
        converted.append({"image_path": dst_img_path, "label_path": dst_label_path})

if not converted:
    raise ValueError('No images were converted. Check download mode, paths, or filtering settings.')
print(f"Converted {len(converted)} images. Missing images skipped: {skipped_missing}")
print(f"Class names (YOLO order): {class_names}")


## 5. Preparing YOLOv8 Dataset Structure

YOLO expects:
```
data/wildlife_yolo/
  images/train, images/val
  labels/train, labels/val  (same filenames, .txt)
wildlife.yaml  (paths + class names)
```
- We'll split train/val (80/20), copy files into the YOLO layout, and write a YAML config.
- The order of `names` in the YAML **must** match the numeric class IDs we wrote.
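
A quick sanity check can catch a mismatch between label files and the class list before training starts. The sketch below validates YOLO label lines against a class count; `check_label_lines` and the sample class list are hypothetical names used only for illustration.

```python
def check_label_lines(lines, num_classes):
    """Return a list of problems found in YOLO label lines (empty list means OK)."""
    problems = []
    for i, line in enumerate(lines, start=1):
        parts = line.split()
        if len(parts) != 5:
            problems.append(f"line {i}: expected 5 fields, got {len(parts)}")
            continue
        cls = int(parts[0])
        coords = [float(p) for p in parts[1:]]
        if not 0 <= cls < num_classes:
            problems.append(f"line {i}: class id {cls} outside 0..{num_classes - 1}")
        if any(c < 0.0 or c > 1.0 for c in coords):
            problems.append(f"line {i}: coordinate outside [0, 1]")
    return problems

names = ["deer", "fox"]  # hypothetical class list
good = ["0 0.5 0.5 0.2 0.2", "1 0.1 0.1 0.05 0.05"]
bad = ["2 0.5 0.5 0.2 0.2", "0 1.2 0.5 0.2 0.2"]
print(check_label_lines(good, len(names)))
print(check_label_lines(bad, len(names)))
```

To apply it to the real dataset, read each `.txt` file under the labels folder and pass `path.read_text().splitlines()` together with `len(class_names)`.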

In [None]:
# Create YOLO directory tree and train/val split
train_ratio = 0.8
random.shuffle(converted)
split_idx = int(len(converted) * train_ratio)
train_items = converted[:split_idx]
val_items = converted[split_idx:]
if len(val_items) == 0 and len(train_items) > 1:
    val_items.append(train_items.pop())  # ensure we have a val set for metrics
elif len(val_items) == 0:
    raise ValueError('Not enough images to create a validation split. Reduce filtering or MAX_IMAGES.')

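for subset, items in [("train", train_items), ("val", val_items)]:
    for kind in ("images", "labels"):
        split_dir = YOLO_DIR / kind / subset
        # Start from an empty folder so files from an earlier, different split
        # cannot leak between train and val when this cell is re-run
        shutil.rmtree(split_dir, ignore_errors=True)
        split_dir.mkdir(parents=True, exist_ok=True)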
    for entry in items:
        dst_img = YOLO_DIR / "images" / subset / entry['image_path'].name
        dst_lbl = YOLO_DIR / "labels" / subset / entry['label_path'].name
        if not dst_img.exists():
            shutil.copy2(entry['image_path'], dst_img)
        if not dst_lbl.exists():
            shutil.copy2(entry['label_path'], dst_lbl)

yaml_path = YOLO_DIR / "wildlife.yaml"
yaml_dict = {
    "path": str(YOLO_DIR),
    "train": "images/train",
    "val": "images/val",
    "names": class_names
}
with open(yaml_path, 'w') as f:
    yaml.safe_dump(yaml_dict, f)

print(f"Train images: {len(train_items)}, Val images: {len(val_items)}")
print(f"Wrote dataset YAML to: {yaml_path}")


## 6. Training YOLOv8

Key ideas:
- **Transfer learning**: start from a COCO-pretrained checkpoint (`yolov8n.pt`) so we learn faster with fewer epochs.
- **Hyperparameters**:
  - `epochs`: passes over the data (20–40 is fine for this subset).
  - `batch`: images per step (raise it until you hit GPU out-of-memory errors, then back off).
  - `imgsz`: resize dimension; 640 balances detail vs speed.
  - `patience`: number of epochs without improvement before training stops early.
- **Model size trade-off**: `yolov8n` (nano) is fastest/lightest; `yolov8s` is a bit slower but more accurate. We'll use nano for Colab Free.
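
To illustrate what `patience` does, here is a simplified sketch of patience-based early stopping on a list of per-epoch validation scores. It is not the Ultralytics implementation (which tracks its own fitness value internally); the function name and the score values are made up for the example.

```python
def early_stop_epoch(scores, patience):
    """Return the 1-based epoch at which training would stop, or None if it
    runs to the end. Training stops once `patience` epochs have passed
    without a new best score."""
    best, best_epoch = float("-inf"), 0
    for epoch, score in enumerate(scores, start=1):
        if score > best:
            best, best_epoch = score, epoch
        elif epoch - best_epoch >= patience:
            return epoch
    return None

# Hypothetical validation mAP per epoch: it peaks at epoch 3, then plateaus
val_map = [0.10, 0.25, 0.31, 0.30, 0.29, 0.31, 0.30, 0.28]
print(early_stop_epoch(val_map, patience=5))
print(early_stop_epoch(val_map, patience=3))
```

A smaller `patience` stops sooner on a plateau but risks cutting off a run that would have improved later.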

In [None]:
# Train a YOLOv8 nano model
batch_size = 16 if torch.cuda.is_available() else 4

model = YOLO('yolov8n.pt')
train_results = model.train(
    data=str(yaml_path),
    epochs=30,
    imgsz=640,
    batch=batch_size,
    patience=5,
    name='ena24_yolov8n',
    project=str(BASE_DIR / 'runs'),
    pretrained=True
)

print("Training complete. Best weights saved under:", Path(model.trainer.save_dir) / 'weights')


In [None]:
# Visualize training curves (loss, mAP, precision/recall)
results_png = Path(model.trainer.save_dir) / "results.png"
if results_png.exists():
    display(Image(filename=str(results_png)))
else:
    print("results.png not found; verify training run path:", model.trainer.save_dir)


## 7. Evaluating the Model

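- **mAP (mean Average Precision)**: for each class, average precision (AP) is the area under the precision–recall curve at a given IoU threshold; mAP is the mean of AP over classes. Ultralytics reports mAP50 (IoU threshold 0.5) and mAP50-95 (averaged over IoU thresholds from 0.5 to 0.95). Higher is better.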
- **Precision**: of the predicted boxes, how many are correct (controls false positives).
- **Recall**: of the ground-truth boxes, how many did we find (controls false negatives).
- Visual inspection matters: numbers can look fine even if the model systematically confuses similar species. We'll inspect a few val images.
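
The definitions above can be made concrete with a small single-class sketch: compute IoU between boxes, greedily match predictions to ground truth in order of confidence, and derive precision and recall at one IoU threshold. This is a teaching simplification with illustrative function names, not the evaluation code Ultralytics runs.

```python
def iou(a, b):
    """Intersection over union of two boxes given as (x1, y1, x2, y2)."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    return inter / union if union > 0 else 0.0

def precision_recall(preds, truths, iou_thr=0.5):
    """preds: list of (confidence, box); truths: list of boxes (one class only).
    Each ground-truth box can be matched by at most one prediction."""
    matched = set()
    tp = 0
    for _, box in sorted(preds, key=lambda p: p[0], reverse=True):
        best_j, best_iou = None, iou_thr
        for j, gt in enumerate(truths):
            if j in matched:
                continue
            score = iou(box, gt)
            if score >= best_iou:
                best_j, best_iou = j, score
        if best_j is not None:
            matched.add(best_j)
            tp += 1
    precision = tp / len(preds) if preds else 0.0
    recall = tp / len(truths) if truths else 0.0
    return precision, recall

# Two animals in the frame; one is found, one prediction is a false positive
truths = [(0, 0, 10, 10), (20, 20, 30, 30)]
preds = [(0.9, (1, 1, 10, 10)), (0.6, (50, 50, 60, 60))]
print(precision_recall(preds, truths))
```

Sweeping the confidence threshold and recording precision and recall at each step traces out the precision–recall curve whose area is AP.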

In [None]:
# Quantitative validation
best_weights = Path(model.trainer.save_dir) / 'weights' / 'best.pt'
if not best_weights.exists():
    raise FileNotFoundError('best.pt not found. Check training run or path.')
val_model = YOLO(best_weights)
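val_results = val_model.val(data=str(yaml_path))
# Print the headline numbers instead of dumping the whole metrics object
print(f"mAP50-95: {val_results.box.map:.3f}")
print(f"mAP50:    {val_results.box.map50:.3f}")
print(f"Mean precision: {val_results.box.mp:.3f}, mean recall: {val_results.box.mr:.3f}")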

# Visualize a handful of predictions on the val set
sample_paths = [entry['image_path'] for entry in random.sample(val_items, min(3, len(val_items)))]
if sample_paths:
    preds = val_model.predict(sample_paths, imgsz=640, conf=0.25)
    for res in preds:
        plotted = res.plot()  # BGR image with boxes drawn
        plt.figure(figsize=(8, 6))
        plt.imshow(cv2.cvtColor(plotted, cv2.COLOR_BGR2RGB))
        plt.axis('off')
        plt.show()
else:
    print('No validation images available for visualization.')


## 8. Running Inference on Sample Images

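We'll build a small helper that takes an image path, runs the model, and plots the detections. Try both a crowded scene and a sparse one. Note that the conversion step only keeps images with at least one labelled box, so to check false positives on truly empty frames you need to pass in unannotated images of your own.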

On an edge device, replace the file path with frames from a webcam or video stream; the rest (model call + thresholding) is the same.
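
On a device, the thresholding step usually needs a little smoothing so that a single noisy frame does not start or stop a recording. The sketch below shows one way to do that with consecutive-hit and consecutive-miss counters. It works on a plain list of per-frame confidences; in practice those would come from the model call on each frame. The function name and its parameters are illustrative assumptions.

```python
def recording_flags(frame_confidences, conf_thr=0.25, start_after=2, stop_after=3):
    """Given the best detection confidence per frame (0.0 when nothing is
    detected), return one boolean per frame: should the recorder be on?
    Recording starts after `start_after` consecutive hits and stops after
    `stop_after` consecutive misses."""
    recording, hits, misses = False, 0, 0
    flags = []
    for conf in frame_confidences:
        if conf >= conf_thr:
            hits, misses = hits + 1, 0
        else:
            hits, misses = 0, misses + 1
        if not recording and hits >= start_after:
            recording = True
        elif recording and misses >= stop_after:
            recording = False
        flags.append(recording)
    return flags

# Hypothetical confidences: one isolated blip, then an animal passing through
confs = [0.0, 0.6, 0.0, 0.7, 0.8, 0.1, 0.9, 0.0, 0.0, 0.0]
print(recording_flags(confs))
```

The isolated hit in the second frame does not trigger a recording, and a single missed frame in the middle of the pass does not end it.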

In [None]:
def run_inference(image_path, model, conf=0.25):
    """Run YOLO on one image and display results."""
    res = model.predict(image_path, imgsz=640, conf=conf)[0]
    plotted = res.plot()
    plt.figure(figsize=(8, 6))
    plt.imshow(cv2.cvtColor(plotted, cv2.COLOR_BGR2RGB))
    plt.axis('off')
    plt.show()
    return res

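# Choose two examples: the val images with the most and the fewest labelled boxes.
# Every image in val_items has at least one box, so the second is low-activity, not truly empty.
if len(val_items) >= 2:
    by_box_count = sorted(val_items, key=lambda e: len(e['label_path'].read_text().splitlines()))
    crowded = by_box_count[-1]['image_path']
    low_activity = by_box_count[0]['image_path']
    print("Crowded example:")
    _ = run_inference(crowded, val_model, conf=0.25)
    print("Low-activity example (lower threshold to surface weak detections):")
    _ = run_inference(low_activity, val_model, conf=0.10)
else:
    print("Not enough validation images to demo inference.")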


## 9. Exporting the Model for Edge Deployment

- Find `best.pt` in the run folder.
- Export to **ONNX** (works in many runtimes, C++/Rust) and **TorchScript** (for PyTorch mobile/edge).
- Smaller models (`yolov8n`) or INT8 quantization are preferred on Raspberry Pi/Jetson. Use lower `imgsz` (e.g., 416) to trade detail for speed if needed.
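
When you run an exported model in your own runtime, resizing the frame and mapping boxes back to the original image become your responsibility. The sketch below shows the geometry of a letterbox resize (scale to fit, pad the rest). It assumes the runtime feeds a square, evenly padded input and returns boxes in that input's pixel coordinates; the function names are hypothetical.

```python
def letterbox_params(img_w, img_h, size=640):
    """Scale and padding that fit an img_w x img_h image into a size x size
    square while keeping the aspect ratio (padding split evenly on both sides)."""
    scale = min(size / img_w, size / img_h)
    new_w, new_h = round(img_w * scale), round(img_h * scale)
    pad_x, pad_y = (size - new_w) / 2, (size - new_h) / 2
    return scale, pad_x, pad_y

def to_original(box, scale, pad_x, pad_y):
    """Map an (x1, y1, x2, y2) box from model-input pixels back to the original image."""
    x1, y1, x2, y2 = box
    return (
        (x1 - pad_x) / scale,
        (y1 - pad_y) / scale,
        (x2 - pad_x) / scale,
        (y2 - pad_y) / scale,
    )

# A 1920x1080 frame squeezed into a 640x640 input
scale, pad_x, pad_y = letterbox_params(1920, 1080, size=640)
print(scale, pad_x, pad_y)
print(to_original((320, 320, 480, 410), scale, pad_x, pad_y))
```

Lowering `size` shrinks the input and speeds up inference, at the cost of fewer pixels per animal.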

In [None]:
# Export best weights to ONNX and TorchScript
export_model = YOLO(best_weights)
onnx_path = export_model.export(format='onnx', imgsz=640, opset=12, dynamic=True)
ts_path = export_model.export(format='torchscript', imgsz=640)

print("Exported:")
print("  ONNX:", onnx_path)
print("  TorchScript:", ts_path)

print("Files in weights folder:")
for p in (Path(best_weights).parent).glob('*'):
    print(" -", p.name)


## 10. Next Steps and Extensions

- Train on a larger mix (e.g., Caltech Camera Traps, Snapshot Serengeti, WCS) or your own camera data; merge datasets by harmonizing class names.
- Group rare species into broader buckets (e.g., "bird", "deer", "predator") to reduce class imbalance.
- Add **temporal cues**: run detection over short clips or consecutive frames to suppress false positives on empty frames.
- Experiment with **augmentation** (mixup, mosaic, random brightness) to improve low-light/night robustness.
- Deploy to edge: quantize the ONNX model (e.g., with ONNX Runtime or TensorRT), run inference in a loop, and trigger video recording or SMS alerts only when confidence crosses a threshold.
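
The class-grouping idea can be sketched as a remapping from fine-grained class IDs to coarse ones, which you would then apply to the first field of every label line. The species names and groups below are hypothetical examples, not the ENA24 category list.

```python
def group_classes(names, groups, default="other"):
    """Map fine-grained class names to coarse groups.
    Returns (coarse_names, old_id_to_new_id)."""
    coarse_names = sorted({groups.get(n, default) for n in names})
    new_index = {g: i for i, g in enumerate(coarse_names)}
    old_to_new = {old: new_index[groups.get(n, default)] for old, n in enumerate(names)}
    return coarse_names, old_to_new

# Hypothetical species list and grouping, for illustration only
names = ["white_tailed_deer", "red_fox", "coyote", "wild_turkey", "crow"]
groups = {
    "white_tailed_deer": "deer",
    "red_fox": "predator",
    "coyote": "predator",
    "wild_turkey": "bird",
    "crow": "bird",
}
coarse, mapping = group_classes(names, groups)
print(coarse)
print(mapping)
```

The coarse name list then replaces the class list in the dataset YAML, so the names and the rewritten IDs stay in step.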
