In [1]:
import json
import os

# =============================================================================
# STEP 1: Define Parking Zones (Manual Annotation Based on Aerial Image)
# =============================================================================

# Where the zone configuration is written; the folder is created on demand.
output_dir = "../data"
os.makedirs(output_dir, exist_ok=True)
output_json = os.path.join(output_dir, "parking_zones.json")

# Hand-annotated corner coordinates ([x, y] pairs) of each parking bay,
# taken from the annotated aerial photo. Red = occupied, Green = free.
_spot_corners = {
    "spot_1": [[930, 120], [970, 130], [960, 200], [920, 190]],
    "spot_2": [[970, 130], [1010, 140], [1000, 210], [960, 200]],
    "spot_3": [[1010, 140], [1050, 150], [1040, 220], [1000, 210]],
    "spot_4": [[1050, 150], [1090, 160], [1080, 230], [1040, 220]],
}

# Every bay starts out as "free"; one record per bay, in annotation order.
parking_zones = [
    {"zone_id": spot_name, "status": "free", "points": corners}
    for spot_name, corners in _spot_corners.items()
]

# Persist the zone list as pretty-printed JSON.
with open(output_json, "w") as f:
    f.write(json.dumps(parking_zones, indent=2))

print(f"✅ Parking zone configuration saved to {output_json}")


✅ Parking zone configuration saved to ../data/parking_zones.json


In [2]:
# =============================================================================
# STEP 4 (full, fixed version): YOLOv8 parking management visualization
# =============================================================================

import cv2
import json
import numpy as np
from ultralytics import YOLO
from datetime import datetime
import os


class ParkingVisualizer:
    """Annotate a video with per-zone parking occupancy using a YOLO model.

    Every frame is run through the detector. A zone is reported as
    ``"occupied"`` when the centre of any detected box lies inside (or on the
    edge of) the zone polygon, otherwise ``"free"``. Detections are not
    filtered by class, so any detected object can occupy a zone.
    """

    # Frame rate written to the output file when the capture backend does not
    # report a usable (positive) FPS value.
    FALLBACK_FPS = 30.0

    def __init__(self, model_path, region_json, video_path, output_path, summary_json=None):
        """
        YOLOv8-based Parking Visualization System

        Args:
            model_path: Path to the YOLO weights handed to ``YOLO(...)``.
            region_json: Path to a JSON file that is either a list of zone
                dicts or a dict with a ``"parking_zones"`` list. Each zone
                may carry ``"zone_id"`` and ``"points"`` (list of [x, y]).
            video_path: Path of the video to analyse.
            output_path: Path of the annotated video to write.
            summary_json: Optional path for the per-zone status of the last
                processed frame; nothing is written when it is falsy.

        Raises:
            ValueError: If the JSON is neither a list nor a dict containing
                a ``"parking_zones"`` key.
        """
        self.model = YOLO(model_path)
        self.video_path = video_path
        self.output_path = output_path
        self.summary_json = summary_json

        # Load regions from JSON
        with open(region_json, "r", encoding="utf-8") as f:
            data = json.load(f)

        # Accept both dict and list JSON formats
        if isinstance(data, dict) and "parking_zones" in data:
            self.regions = data["parking_zones"]
        elif isinstance(data, list):
            self.regions = data
        else:
            raise ValueError("❌ Invalid JSON structure. Expected a list or dict with 'parking_zones' key.")

        # Colors for each state (BGR, as used by OpenCV drawing calls)
        self.colors = {
            "occupied": (0, 0, 255),  # Red
            "free": (0, 255, 0),      # Green
            "available": (0, 255, 0)
        }

    @staticmethod
    def _zone_key(region, index):
        """Return the identifier of a zone, falling back to ``spot_<index+1>``.

        Used for both the status dict and the drawing code so the two can
        never disagree about the key of a zone that has no ``"zone_id"``.
        """
        return region.get("zone_id", f"spot_{index + 1}")

    @staticmethod
    def _occupancy_stats(region_status, total_spots):
        """Return ``(occupied, available, occupancy_rate_percent)``.

        The rate is 0 when there are no spots, avoiding a division by zero.
        """
        occupied = list(region_status.values()).count("occupied")
        available = total_spots - occupied
        occupancy_rate = (occupied / total_spots) * 100 if total_spots else 0
        return occupied, available, occupancy_rate

    def is_inside_region(self, point, polygon):
        """Check if an object's center point lies inside a polygon region.

        Points exactly on the polygon edge count as inside (``>= 0``).
        """
        contour = np.asarray(polygon, dtype=np.int32)
        # Plain Python floats are accepted by every OpenCV binding version.
        pt = (float(point[0]), float(point[1]))
        return cv2.pointPolygonTest(contour, pt, False) >= 0

    def _draw_zones(self, frame, zones, region_status):
        """Outline every zone on ``frame`` and label it with its status."""
        for zone_id, polygon in zones:
            if polygon is None:
                # Nothing to draw for a zone without points.
                continue
            status = region_status[zone_id]
            color = self.colors.get(status, (255, 255, 255))

            cv2.polylines(frame, [polygon], True, color, 2)
            cv2.putText(frame, f"{zone_id}: {status.upper()}",
                        (int(polygon[0][0]), int(polygon[0][1]) - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    @staticmethod
    def _draw_info_box(frame, total_spots, available, occupied, occupancy_rate):
        """Draw the summary panel (totals and occupancy rate) on ``frame``."""
        cv2.rectangle(frame, (10, 10), (400, 90), (50, 50, 50), -1)
        cv2.putText(frame, f"Total Spots: {total_spots}", (20, 35),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        cv2.putText(frame, f"Available: {available}  Occupied: {occupied}", (20, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
        cv2.putText(frame, f"Occupancy: {occupancy_rate:.1f}%", (220, 85),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

    def run(self, show_live=False):
        """Run visualization and save annotated video + summary JSON.

        Args:
            show_live: When true, also show each annotated frame in a window;
                pressing ``q`` stops processing. If the OpenCV build has no
                GUI support the live view is switched off and processing
                continues.

        Returns:
            None. Problems opening the input video or the output writer are
            reported with a printed message and an early return.
        """
        cap = cv2.VideoCapture(self.video_path)
        if not cap.isOpened():
            print("❌ Error: Unable to open video file.")
            return

        total_spots = len(self.regions)

        # Convert each zone's points to a contour once instead of once per
        # detection per frame; zones without points keep ``None``.
        zones = []
        for index, region in enumerate(self.regions):
            points = region.get("points") or []
            contour = np.array(points, np.int32) if points else None
            zones.append((self._zone_key(region, index), contour))

        # Defined before the loop so the summary below is valid even when
        # the video yields no frames at all.
        region_status = {zone_id: "free" for zone_id, _ in zones}
        occupied, available, occupancy_rate = self._occupancy_stats(region_status, total_spots)
        frame_count = 0
        out = None

        try:
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:
                # Some backends report 0 (or NaN) for FPS.
                fps = self.FALLBACK_FPS

            # A bare filename has an empty dirname, which makedirs rejects.
            output_dir = os.path.dirname(self.output_path)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)

            out = cv2.VideoWriter(
                self.output_path,
                cv2.VideoWriter_fourcc(*"mp4v"),
                fps,
                (frame_width, frame_height),
            )
            if not out.isOpened():
                print("❌ Error: Unable to open output video for writing.")
                return

            print("🚗 Processing video frames for parking visualization...")

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                detections = self.model(frame, verbose=False)
                boxes = detections[0].boxes.xyxy.cpu().numpy() if detections and detections[0].boxes else []

                # Status is recomputed from scratch for every frame.
                region_status = {zone_id: "free" for zone_id, _ in zones}

                # --- Check detections ---
                for box in boxes:
                    x1, y1, x2, y2 = map(int, box[:4])
                    center = (int((x1 + x2) / 2), int((y1 + y2) / 2))

                    for zone_id, contour in zones:
                        if contour is not None and self.is_inside_region(center, contour):
                            region_status[zone_id] = "occupied"

                # --- Draw polygons and info box ---
                self._draw_zones(frame, zones, region_status)
                occupied, available, occupancy_rate = self._occupancy_stats(region_status, total_spots)
                self._draw_info_box(frame, total_spots, available, occupied, occupancy_rate)

                out.write(frame)
                frame_count += 1

                if show_live:
                    try:
                        cv2.imshow("Smart Parking Visualization", frame)
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break
                    except cv2.error:
                        print("⚠️ GUI not supported — skipping live display.")
                        show_live = False
        finally:
            # --- Cleanup --- (runs on early return and on exceptions too)
            cap.release()
            if out is not None:
                out.release()
            if show_live:
                try:
                    cv2.destroyAllWindows()
                except cv2.error:
                    # Best effort: headless OpenCV builds cannot close windows.
                    pass

        if frame_count == 0:
            print("⚠️ No frames could be read from the video; all zones are reported as free.")

        print(f"✅ Saved to {self.output_path}")
        print(f"📊 Final Occupancy Rate: {occupancy_rate:.1f}% ({occupied}/{total_spots})")

        # Save JSON summary (status of the last processed frame)
        if self.summary_json:
            summary_dir = os.path.dirname(self.summary_json)
            if summary_dir:
                os.makedirs(summary_dir, exist_ok=True)
            with open(self.summary_json, "w") as f:
                json.dump(region_status, f, indent=4)
            print(f"🧾 Summary JSON saved to {self.summary_json}")


# =============================================================================
# Example Usage
# =============================================================================

# Inputs: detector weights, zone definitions and the video to analyse.
model_path = "../models/yolov8n.pt"
region_json = "../data/parking_zones.json"
video_path = "../data/sample_video.mp4"

# Outputs: annotated video and the per-zone status summary.
output_path = "../output/parking_visual.mp4"
summary_json = "../output/final_parking_status.json"

# Build the visualizer with explicit keyword arguments, then process the
# whole video without opening a preview window.
visualizer = ParkingVisualizer(
    model_path=model_path,
    region_json=region_json,
    video_path=video_path,
    output_path=output_path,
    summary_json=summary_json,
)
visualizer.run(show_live=False)


🚗 Processing video frames for parking visualization...
✅ Saved to ../output/parking_visual.mp4
📊 Final Occupancy Rate: 0.0% (0/4)
🧾 Summary JSON saved to ../output/final_parking_status.json
