# 🟢 GreenFlow Rabat — Edge AI Traffic Intelligence
## Hackathon RamadnIA 2026

**Pipeline:** CCTV → RTSP → Edge Device (YOLO11n) → JSON Webhook → n8n Orchestration

**Classes:**
| ID | Label | Description |
|----|-------|-------------|
| 0 | License Plate | Moroccan plates |
| 1 | Car | Standard vehicles |
| 2 | Grand Taxi | Mercedes W123/W124 shared taxis |
| 3 | Triporteur | 3-wheeled cargo motos |


In [4]:
# ============================================================
# CELL 1: Environment Setup & GPU Check
# ============================================================
# Run this first every session. No HuggingFace, no Colab deps.
# Works on: Local machine, Colab, Kaggle, Paperspace.
# ============================================================

import subprocess, sys

def install(pkg):
    """Quietly pip-install ``pkg`` with the interpreter running this notebook.

    Raises:
        subprocess.CalledProcessError: If pip exits with a non-zero status.
    """
    pip_command = [sys.executable, "-m", "pip", "install", "-q", pkg]
    subprocess.check_call(pip_command)

# Core dependencies: installed unconditionally on every run of this cell.
for package in ["ultralytics", "roboflow", "opencv-python-headless", "requests"]:
    install(package)

# Imported after the install loop, in the same position as before.
import platform

import torch

# Environment banner. The em dashes in the two messages below had been
# mis-encoded (UTF-8 read as cp1252) and are restored here.
print("=" * 50)
print("GreenFlow Rabat — Environment Check")
print("=" * 50)
print(f"Python:   {platform.python_version()}")
print(f"PyTorch:  {torch.__version__}")
print(f"CUDA:     {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # Only the first visible CUDA device (index 0) is reported.
    print(f"GPU:      {torch.cuda.get_device_name(0)}")
else:
    print("GPU:      None (CPU mode — OK for data prep, slow for training)")
print(f"Platform: {platform.system()} {platform.machine()}")
print("=" * 50)

GreenFlow Rabat â€” Environment Check
Python:   3.12.12
PyTorch:  2.9.0+cu128
CUDA:     True
GPU:      Tesla T4
Platform: Linux x86_64


In [5]:
# ============================================================
# CELL 2: Frame Extraction from Local Rabat Driving Video
# ============================================================
# Extracts 1 frame every 5 seconds from a 40-min 1080p video.
# ~480 frames → sent to Roboflow Label Assist for annotation.
# ============================================================

import cv2
import os
from pathlib import Path

# ---- CONFIGURATION ----
# Source video to sample. The value below is a placeholder: the execute step
# at the end of this cell only runs the extraction if this path exists.
VIDEO_PATH = r"path\to\YTDown.com_YouTube_Driving-in-Rabat.mp4"  # <-- UPDATE THIS
# Directory that receives the extracted .jpg frames (relative path).
OUTPUT_DIR = Path("extracted_frames")
# Sampling period in seconds; passed to extract_frames() as interval_sec.
EXTRACT_EVERY_N_SECONDS = 5
# ------------------------

def extract_frames(video_path: str, output_dir: Path, interval_sec: int = 5) -> int:
    """Extract frames from a video at fixed time intervals.

    Every frame is decoded in order; frames whose index is a multiple of
    ``fps * interval_sec`` are written as ``rabat_<frame index>.jpg``.

    Args:
        video_path:   Path to the source .mp4 file.
        output_dir:   Directory to save extracted .jpg frames (created if missing).
        interval_sec: Seconds between each captured frame; must be positive.

    Returns:
        Number of frames successfully written to ``output_dir``.

    Raises:
        ValueError: If ``interval_sec`` is not positive, or the video reports
            no usable frame rate.
        FileNotFoundError: If OpenCV cannot open ``video_path``.
    """
    # Validate before touching the filesystem or OpenCV.
    if interval_sec <= 0:
        raise ValueError(f"interval_sec must be positive, got {interval_sec}")

    output_dir.mkdir(parents=True, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise FileNotFoundError(f"Cannot open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Some containers report 0 (or NaN) FPS; without this guard the
        # divisions below fail with an unhelpful ZeroDivisionError.
        if not fps > 0:
            raise ValueError(f"Video reports an invalid frame rate ({fps}): {video_path}")

        duration_sec = total_frames / fps
        # Clamp to 1 so a sub-frame interval never produces a modulo by zero.
        frame_interval = max(1, int(fps * interval_sec))

        print(f"Video: {fps:.1f} FPS | {total_frames} frames | {duration_sec/60:.1f} min")
        print(f"Extracting 1 frame every {interval_sec}s (every {frame_interval} frames)...")

        saved = 0
        frame_idx = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                filename = output_dir / f"rabat_{frame_idx:06d}.jpg"
                # imwrite reports failure through its return value, not an exception.
                if cv2.imwrite(str(filename), frame):
                    saved += 1
                else:
                    print(f"Warning: could not write {filename}")
            frame_idx += 1
    finally:
        # Release the capture handle even when an error interrupts extraction.
        cap.release()

    print(f"✓ Extracted {saved} frames → {output_dir}/")
    return saved

# Execute: run the extraction only when the configured video exists, so a
# placeholder VIDEO_PATH produces a hint instead of an exception.
if os.path.exists(VIDEO_PATH):
    n = extract_frames(VIDEO_PATH, OUTPUT_DIR, EXTRACT_EVERY_N_SECONDS)
else:
    # The warning sign had been mis-encoded (UTF-8 read as cp1252); restored.
    print(f"⚠ Video not found at: {VIDEO_PATH}")
    print("  Update VIDEO_PATH above, then re-run this cell.")

âš  Video not found at: path\to\YTDown.com_YouTube_Driving-in-Rabat.mp4
  Update VIDEO_PATH above, then re-run this cell.


In [6]:
# ============================================================
# CELL 3: Download Base Dataset from Roboflow
# ============================================================
# Source: CCTV.v9i.yolov8 — pre-labeled CCTV footage.
# Merged with our Rabat frames after Label Assist annotation.
# ============================================================

from roboflow import Roboflow

import os

# ---- CONFIGURATION ----
# The API key is read from the ROBOFLOW_API_KEY environment variable when it
# is set, so the real key does not have to be saved inside the notebook.
# The placeholder remains the fallback and must be edited otherwise.
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY", "YOUR_API_KEY")  # <-- or UPDATE THIS (Settings > API Key)
WORKSPACE = "YOUR_WORKSPACE"       # <-- UPDATE THIS
PROJECT   = "YOUR_PROJECT"         # <-- UPDATE THIS (e.g., "cctv-xxxxx")
VERSION   = 9                      # <-- Dataset version number
# ------------------------

# Resolve workspace -> project -> version, then download in "yolov8" format.
rf = Roboflow(api_key=ROBOFLOW_API_KEY)
project = rf.workspace(WORKSPACE).project(PROJECT)
dataset = project.version(VERSION).download("yolov8")

# The check mark below had been mis-encoded (UTF-8 read as cp1252); restored.
print(f"\n✓ Dataset downloaded to: {dataset.location}")
print(f"  Train: {dataset.location}/train/")
print(f"  Valid: {dataset.location}/valid/")
print(f"  Test:  {dataset.location}/test/")

upload and label your dataset, and get an API KEY here: https://app.roboflow.com/?model=undefined&ref=undefined
loading Roboflow workspace...


RoboflowError: {"error":{"message":"Unsupported get request. Workspace with ID \"YOUR_WORKSPACE\" does not exist or cannot be loaded due to missing permissions.","status":404,"type":"GraphMethodException","hint":"You can see your available workspaces by issuing a GET request to /workspaces"}}

In [None]:
# ============================================================
# CELL 4: Train YOLO11n on Merged Dataset
# ============================================================
# YOLO11n chosen over YOLOv8n for:
#   - C2PSA attention → better small object detection
#   - 20% fewer FLOPs → faster on RPi5/Jetson
#   - Same Ultralytics API → zero migration cost
# ============================================================

from ultralytics import YOLO
import torch

# ---- CONFIGURATION ----
DATA_YAML = "path/to/data.yaml"  # <-- UPDATE: Roboflow dataset.location + "/data.yaml"
EPOCHS = 100
IMG_SIZE = 640                   # training image size passed as imgsz
BATCH_SIZE = 16                  # drop to 8 on out-of-memory errors (free Colab)
MODEL_BASE = "yolo11n.pt"        # nano checkpoint, chosen for edge deployment
PROJECT_NAME = "greenflow_rabat"
RUN_NAME = "v1_cctv_rabat"
# ------------------------

model = YOLO(MODEL_BASE)

# Augmentations aimed at small objects and rare classes (e.g. Triporteur).
_augmentation = {
    "mosaic": 1.0,       # mosaic augmentation
    "scale": 0.9,        # aggressive scale jitter to mimic far/near objects
    "mixup": 0.15,       # light mixup
    "copy_paste": 0.1,   # copy-paste augmentation
}

# Stopping, checkpointing and learning-rate schedule.
_schedule = {
    "patience": 15,      # early-stopping patience in epochs
    "save_period": 10,   # checkpoint every 10 epochs
    "cos_lr": True,      # cosine learning-rate scheduler
    "lr0": 0.01,         # initial learning rate
    "lrf": 0.01,         # final learning-rate factor
}

# Use the first GPU when CUDA is available, otherwise fall back to the CPU.
_hardware = {
    "device": 0 if torch.cuda.is_available() else "cpu",
    "workers": 4,
    "amp": True,         # mixed-precision training
}

results = model.train(
    data=DATA_YAML,
    epochs=EPOCHS,
    imgsz=IMG_SIZE,
    batch=BATCH_SIZE,
    project=PROJECT_NAME,
    name=RUN_NAME,
    **_augmentation,
    **_schedule,
    **_hardware,
    verbose=True,
)

_rule = "=" * 50
print("\n" + _rule)
print("Training complete!")
print(f"  Best weights: {results.save_dir}/weights/best.pt")
print(_rule)

In [None]:
# ============================================================
# CELL 5: Validate & Visualize Results
# ============================================================

from ultralytics import YOLO
from pathlib import Path

# ---- CONFIGURATION ----
BEST_WEIGHTS = "greenflow_rabat/v1_cctv_rabat/weights/best.pt"  # <-- auto from training
# ------------------------

model = YOLO(BEST_WEIGHTS)

# Run validation with the library defaults.
# NOTE(review): no split is passed, so this presumably evaluates the dataset's
# validation split rather than the test set; pass split="test" if the test
# set is what should be reported — confirm against the Ultralytics docs.
metrics = model.val()

print("=" * 50)
print("Validation Results")
print("=" * 50)
print(f"  mAP@0.5:      {metrics.box.map50:.4f}")
print(f"  mAP@0.5:0.95: {metrics.box.map:.4f}")
print(f"  Precision:     {metrics.box.mp:.4f}")
print(f"  Recall:        {metrics.box.mr:.4f}")

# Per-class breakdown. `metrics.box.maps` holds one mAP@0.5:0.95 value per
# class (not mAP@0.5), so the heading is labelled accordingly.
# NOTE(review): the names below assume the trained model uses this exact
# class order — verify against the dataset's data.yaml.
class_names = ["License Plate", "Car", "Grand Taxi", "Triporteur"]
print("\nPer-Class mAP@0.5:0.95:")
for i, name in enumerate(class_names):
    # Guard against a model trained with fewer classes than listed here.
    if i < len(metrics.box.maps):
        print(f"  {name:15s}: {metrics.box.maps[i]:.4f}")
print("=" * 50)

In [None]:
# ============================================================
# CELL 6: Export Model for Edge Deployment
# ============================================================
# Export to ONNX (universal) + NCNN (Raspberry Pi optimized)
# or TensorRT (Jetson optimized). Choose based on your HW.
# ============================================================

from ultralytics import YOLO

# Weights produced by the training cell.
BEST_WEIGHTS = "greenflow_rabat/v1_cctv_rabat/weights/best.pt"
model = YOLO(BEST_WEIGHTS)

# NOTE: the arrows, dashes and check marks in this cell had been mis-encoded
# (UTF-8 read as cp1252) and are restored to the intended characters.

# --- Option A: ONNX (Universal — works on RPi5 & Jetson) ---
onnx_path = model.export(format="onnx", imgsz=640, half=False, simplify=True)
print(f"✓ ONNX exported: {onnx_path}")

# --- Option B: NCNN (Best for Raspberry Pi 5 CPU) ---
# Uncomment below for RPi5 deployment:
# ncnn_path = model.export(format="ncnn", imgsz=640, half=True)
# print(f"✓ NCNN exported: {ncnn_path}")

# --- Option C: TensorRT (Best for Jetson with GPU) ---
# Uncomment below for Jetson deployment:
# engine_path = model.export(format="engine", imgsz=640, half=True)
# print(f"✓ TensorRT exported: {engine_path}")

print("\nDeployment Guide:")
print("  RPi5 (CPU):     Use NCNN export → ~10-14 FPS @ 640px")
print("  Jetson Nano:    Use TensorRT FP16 → ~30-40 FPS @ 640px")
print("  Jetson Orin:    Use TensorRT FP16 → ~80+ FPS @ 640px")

In [None]:
# ============================================================
# CELL 7: Real-Time RTSP Inference + n8n Webhook Dispatch
# ============================================================
# THIS IS THE EDGE DEPLOYMENT SCRIPT.
# Runs on RPi5 / Jetson connected to CCTV via RTSP.
# Sends lightweight JSON to n8n — NEVER sends images.
# ============================================================

import cv2
import json
import time
import requests
from datetime import datetime
from collections import defaultdict
from ultralytics import YOLO

# ---- CONFIGURATION ----
# NOTE(review): the RTSP URL below embeds a username and password in plain
# text — consider supplying it from the environment before deploying.
RTSP_URL     = "rtsp://admin:password@192.168.1.100:554/stream1"  # <-- CCTV RTSP URL
MODEL_PATH   = "greenflow_rabat/v1_cctv_rabat/weights/best.onnx"  # <-- Exported model
N8N_WEBHOOK  = "http://localhost:5678/webhook/greenflow-traffic"   # <-- n8n webhook URL
CONFIDENCE   = 0.4          # Minimum detection confidence
REPORT_INTERVAL = 30        # Send report to n8n every N seconds
# build_traffic_payload() grades congestion relative to this value:
# >= 0.5x is MODERATE, >= 1x is HIGH, >= 2x is CRITICAL, otherwise LOW.
CONGESTION_THRESHOLD = 15   # Vehicle count that triggers "congested" alert
# Class ID -> payload key. NOTE(review): assumes these IDs match the class
# order of the trained model — verify against the dataset's data.yaml.
CLASS_NAMES  = {0: "license_plate", 1: "car", 2: "grand_taxi", 3: "triporteur"}
# ------------------------

def build_traffic_payload(detections: list, fps: float, camera_id: str = "CAM-RABAT-01") -> dict:
    """Summarise one frame's detections as a small JSON-serialisable dict.

    Privacy-by-Design: the payload carries NO images and NO coordinates —
    only per-class counts, mean confidences and derived traffic metadata.

    Args:
        detections: Iterable of detection objects exposing ``cls`` and ``conf``.
        fps:        Current processing rate, rounded to one decimal in the payload.
        camera_id:  Identifier of the reporting camera.

    Returns:
        Dict with camera id, timestamp, fps, vehicle total, congestion level,
        advised green-light duration, per-class counts, per-class average
        confidence and the number of license plates detected.
    """
    # Group confidences by label; the per-class count is the group size.
    scores_by_label = {}
    for box in detections:
        class_id = int(box.cls)
        label = CLASS_NAMES.get(class_id, f"unknown_{class_id}")
        scores_by_label.setdefault(label, []).append(float(box.conf))

    tally = {label: len(scores) for label, scores in scores_by_label.items()}

    # License plates are counted separately and are not vehicles.
    vehicle_total = sum(tally.get(kind, 0) for kind in ("car", "grand_taxi", "triporteur"))

    # Congestion level and the advisory green duration (consultative only),
    # checked from the quietest band upwards.
    if vehicle_total < CONGESTION_THRESHOLD * 0.5:
        level, advised_green = "LOW", 20         # short green, low traffic
    elif vehicle_total < CONGESTION_THRESHOLD:
        level, advised_green = "MODERATE", 45    # standard duration
    elif vehicle_total < CONGESTION_THRESHOLD * 2:
        level, advised_green = "HIGH", 90        # extend green to flush queue
    else:
        level, advised_green = "CRITICAL", 90    # extend green to flush queue

    mean_confidence = {
        label: round(sum(scores) / len(scores), 3)
        for label, scores in scores_by_label.items()
    }

    return {
        "camera_id": camera_id,
        "timestamp": datetime.now().isoformat(),
        "fps": round(fps, 1),
        "total_vehicles": vehicle_total,
        "congestion_level": level,
        "green_light_advice_seconds": advised_green,
        "vehicle_counts": tally,
        "avg_confidence": mean_confidence,
        "license_plates_detected": tally.get("license_plate", 0),
    }


def send_to_n8n(payload: dict, webhook_url: str) -> bool:
    """POST ``payload`` as JSON to an n8n webhook.

    Best-effort delivery: request errors are swallowed so the caller's
    inference loop keeps running when n8n is unreachable.

    Args:
        payload:     JSON-serialisable dict to send.
        webhook_url: Full URL of the n8n webhook.

    Returns:
        True when the webhook answered with any 2xx status; False for any
        other status or when the request itself failed (connection error,
        timeout after 5 seconds, ...).
    """
    try:
        resp = requests.post(webhook_url, json=payload, timeout=5)
    except requests.RequestException:
        return False
    # Accept the whole 2xx range: a webhook configured to answer 201/202/204
    # has still accepted the payload.
    return 200 <= resp.status_code < 300


def run_edge_inference(rtsp_url: str, model_path: str, webhook_url: str):
    """Main inference loop for edge deployment.

    Reads frames from the RTSP stream and runs the detector on each one.
    Every ``REPORT_INTERVAL`` seconds it builds a payload from the most
    recent frame's detections and posts it to the n8n webhook. Runs until
    interrupted with Ctrl+C.

    Args:
        rtsp_url:    RTSP URL of the camera stream.
        model_path:  Path to the exported detection model.
        webhook_url: n8n webhook that receives the JSON reports.
    """
    model = YOLO(model_path, task="detect")
    cap = cv2.VideoCapture(rtsp_url)

    if not cap.isOpened():
        print(f"ERROR: Cannot connect to RTSP stream: {rtsp_url}")
        cap.release()
        return

    print(f"Connected to {rtsp_url}")
    print(f"Sending reports to n8n every {REPORT_INTERVAL}s")
    print("Press Ctrl+C to stop.\n")

    last_report_time = time.time()
    frame_count = 0
    fps_start = time.time()

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Stream interrupted. Reconnecting in 5s...")
                # Release the dead capture before opening a new one; otherwise
                # every reconnect leaves an unreleased capture object behind.
                cap.release()
                time.sleep(5)
                cap = cv2.VideoCapture(rtsp_url)
                continue

            # Run inference
            results = model(frame, conf=CONFIDENCE, verbose=False)
            frame_count += 1

            # FPS over the current reporting window
            elapsed = time.time() - fps_start
            current_fps = frame_count / elapsed if elapsed > 0 else 0

            # Periodic n8n report
            if time.time() - last_report_time >= REPORT_INTERVAL:
                # Report even when nothing was detected: an empty road is a
                # valid LOW reading, and skipping it would leave the last
                # congestion level standing downstream.
                boxes = results[0].boxes
                detections = boxes if boxes is not None else []
                payload = build_traffic_payload(detections, current_fps)
                sent = send_to_n8n(payload, webhook_url)
                # NOTE(review): nothing is actually queued on failure — the
                # payload is dropped; the label is kept for log compatibility.
                status = "SENT" if sent else "QUEUED (n8n offline)"
                print(f"[{payload['timestamp']}] {payload['congestion_level']:8s} | "
                      f"Vehicles: {payload['total_vehicles']:3d} | "
                      f"FPS: {current_fps:.1f} | "
                      f"Green Advice: {payload['green_light_advice_seconds']}s | {status}")

                # Start a fresh reporting / FPS window.
                last_report_time = time.time()
                frame_count = 0
                fps_start = time.time()

    except KeyboardInterrupt:
        print("\nStopped by user.")
    finally:
        cap.release()

# ---- To start streaming, uncomment the call below on the edge device ----
# run_edge_inference(RTSP_URL, MODEL_PATH, N8N_WEBHOOK)
print("\n".join((
    "Edge inference script ready.",
    "Uncomment the last line and run on your RPi5/Jetson.",
)))

In [None]:
# ============================================================
# CELL 8: Simulate Edge Inference on a Test Image/Video
# ============================================================
# Use this to demo the pipeline without an RTSP camera.
# Feed it a local video or image and see the JSON output.
# ============================================================

from ultralytics import YOLO
from collections import defaultdict
from datetime import datetime
import cv2, json

BEST_WEIGHTS = "greenflow_rabat/v1_cctv_rabat/weights/best.pt"
TEST_SOURCE  = "path/to/test_video_or_image.mp4"  # <-- UPDATE THIS
CLASS_NAMES  = {0: "license_plate", 1: "car", 2: "grand_taxi", 3: "triporteur"}

model = YOLO(BEST_WEIGHTS)
# save=True writes annotated output; stream=False returns the results as a list.
results = model(TEST_SOURCE, conf=0.4, save=True, stream=False)

if results:
    # Build the sample payload from the first result only.
    first = results[0]

    counts = defaultdict(int)
    if first.boxes is not None:
        for box in first.boxes:
            cls_name = CLASS_NAMES.get(int(box.cls), "unknown")
            counts[cls_name] += 1

    # License plates are not vehicles. Congestion is derived from the vehicle
    # total so it agrees with "total_vehicles" in the same payload; previously
    # every detection, plates included, was counted.
    total_vehicles = counts.get("car", 0) + counts.get("grand_taxi", 0) + counts.get("triporteur", 0)
    is_busy = total_vehicles > 10

    sample_payload = {
        "camera_id": "DEMO-01",
        "timestamp": datetime.now().isoformat(),
        "total_vehicles": total_vehicles,
        "vehicle_counts": dict(counts),
        "congestion_level": "HIGH" if is_busy else "LOW",
        "green_light_advice_seconds": 90 if is_busy else 30,
    }

    print("Sample n8n Webhook Payload:")
    print(json.dumps(sample_payload, indent=2))

    print(f"\nAnnotated results saved to: {first.save_dir}")
else:
    # An empty result list previously crashed on results[0].
    print("No results were produced for TEST_SOURCE.")