In [1]:
#Install Dependencies and Setup FastAPI Server
!pip install -q ultralytics fastapi uvicorn pyngrok python-multipart -q


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/974.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━[0m [32m870.4/974.5 kB[0m [31m26.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m974.5/974.5 kB[0m [31m15.9 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/95.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m95.2/95.2 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.3/62.3 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m72.0/72.0 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
# Upload your model
from google.colab import files

# File name that model_loader.py looks for under /content.
EXPECTED_WEIGHTS = "best_remya.pt"

print(f"Please upload your YOLOv11 model weights ({EXPECTED_WEIGHTS}):")
uploaded = files.upload()

# files.upload() returns a mapping keyed by uploaded file name; warn early if
# the weights the loader expects are not among them.
if EXPECTED_WEIGHTS not in uploaded:
    print(
        f"Warning: expected '{EXPECTED_WEIGHTS}' but received {list(uploaded)}. "
        "Rename the file or update model_path in model_loader.py."
    )


Please upload your YOLOv11 model weights (best.pt):


Saving best_remya.pt to best_remya.pt


In [None]:
%%writefile main.py
import io
import cv2
import base64
import numpy as np
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
from typing import List, Optional, Union
import tempfile
import os

# Import your model loader
# For Colab, you'll need to adapt how you load the model
from model_loader import load_model

# Load the YOLO model once at import time; both endpoints below share this
# module-level instance via `model.predict(...)`.
model = load_model()

# ASGI application object; the route decorators below register handlers on it.
app = FastAPI(title="Pothole Detection API")

# Allow CORS
# NOTE(review): wildcard origins are combined with allow_credentials=True here.
# FastAPI's CORS documentation advises against "*" values when credentials are
# allowed -- confirm against the docs and list the real front-end origin(s).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, replace with your Django domain
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class PotholeDetection(BaseModel):
    # Response schema for POST /predict-image/.

    # True when the model returned at least one segmentation mask.
    pothole_detected: bool
    # Summed pothole contour area expressed as a percentage of the image area.
    severity: float
    # JPEG-encoded image as base64 text (annotated when potholes were found).
    image_base64: Union[str, None] = None
    # One dict per pothole: pothole_number, confidence, pothole_percentage.
    detections: List[dict] = []

class VideoAnalysisResult(BaseModel):
    # Response schema for POST /process-video/.

    # Mean of the per-frame damage percentage over all frames read.
    average_severity: float
    # Accumulated pothole/road pixel ratio in percent, halved by the endpoint.
    damaged_road_percentage: float
    # Annotated output video, base64-encoded.
    video_base64: Union[str, None] = None
    # Number of distinct 10-pixel grid cells that contained a box centre.
    total_potholes_detected: int

def decode_image(image_data):
    """Turn raw image bytes or a base64 string into an OpenCV BGR image.

    Args:
        image_data: Either encoded image bytes, a bare base64 string, or a
            base64 string prefixed with a ``data:image...`` URI header.

    Returns:
        The decoded image as returned by ``cv2.imdecode`` with IMREAD_COLOR.

    Raises:
        HTTPException: status 400 for any failure while decoding.
    """
    try:
        if not isinstance(image_data, str):
            # Direct file upload: the payload is already encoded image bytes.
            raw_bytes = image_data
        elif image_data.startswith('data:image'):
            # Data URI: the base64 payload is whatever follows the first comma.
            raw_bytes = base64.b64decode(image_data.split(',')[1])
        else:
            # Plain base64 string without a URI header.
            raw_bytes = base64.b64decode(image_data)

        decoded = cv2.imdecode(np.frombuffer(raw_bytes, np.uint8), cv2.IMREAD_COLOR)
        if decoded is None:
            raise ValueError("Failed to decode image")
        return decoded
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid image data: {str(e)}")

def encode_image_to_base64(cv_image):
    """Convert OpenCV image to base64 string.

    Args:
        cv_image: Image array accepted by ``cv2.imencode``.

    Returns:
        The JPEG-encoded image as a base64 ``str``.

    Raises:
        ValueError: if ``cv2.imencode`` reports that encoding failed.
    """
    success, buffer = cv2.imencode('.jpg', cv_image)
    # imencode signals failure through its first return value; without this
    # check a failed encode would be returned as a bogus/empty payload.
    if not success:
        raise ValueError("Failed to encode image as JPEG")
    return base64.b64encode(buffer).decode('utf-8')

@app.post("/predict-image/", response_model=PotholeDetection)
async def predict_pothole(file: Union[UploadFile, None] = None, image_base64: str = Form(None)):
    """
    Process an image to detect potholes. Accepts either file upload or base64 image.

    Args:
        file: Optional uploaded image; used in preference to ``image_base64``.
        image_base64: Optional base64 image string sent as a form field.

    Returns:
        A dict matching ``PotholeDetection``: detection flag, severity
        (summed contour area as a percentage of the image), the annotated
        image as base64 JPEG and one entry per pothole.

    Raises:
        HTTPException: 400 when no input is supplied or the image cannot be
            decoded; 500 for any other processing failure.
    """
    if file is None and image_base64 is None:
        raise HTTPException(status_code=400, detail="Either file or image_base64 must be provided")

    try:
        # Process based on input type
        if file:
            contents = await file.read()
            img = decode_image(contents)
        else:
            img = decode_image(image_base64)

        # Run the segmentation model on the decoded image
        results_list = model.predict(img)

        if not results_list or results_list[0].masks is None:  # No detections
            return {
                "pothole_detected": False,
                "severity": 0.0,
                "image_base64": encode_image_to_base64(img),
                "detections": [],
            }

        results = results_list[0]
        masks = results.masks.data.cpu().numpy()
        confidences = results.boxes.conf.cpu().numpy()

        image_height, image_width, _ = img.shape
        image_area = image_height * image_width

        total_pothole_area = 0
        pothole_areas = []
        combined_mask = np.zeros((image_height, image_width), dtype=np.uint8)
        pothole_detections = []

        for pothole_number, (mask, confidence) in enumerate(zip(masks, confidences), start=1):
            binary_mask = (mask > 0).astype(np.uint8) * 255

            # The mask may not have the same size as the input image (the
            # video endpoint resizes its masks for the same reason). Bring it
            # to image size so bitwise_or and the contour coordinates line up.
            if binary_mask.shape != (image_height, image_width):
                binary_mask = cv2.resize(
                    binary_mask, (image_width, image_height), interpolation=cv2.INTER_NEAREST
                )

            combined_mask = cv2.bitwise_or(combined_mask, binary_mask)

            contours, _ = cv2.findContours(
                binary_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
            )

            if len(contours) == 0:
                print(f"Warning: No contours found for pothole {pothole_number}")
                continue  # Skip this mask

            # Only the largest contour of each mask is counted as the pothole
            contour = max(contours, key=cv2.contourArea)
            area = cv2.contourArea(contour)
            total_pothole_area += area

            pothole_percentage = (area / image_area) * 100
            pothole_areas.append(f"Pothole {pothole_number}: {pothole_percentage:.2f}%")

            pothole_detections.append({
                "pothole_number": pothole_number,
                "confidence": round(float(confidence), 2),
                "pothole_percentage": round(pothole_percentage, 2),
            })

            # Outline the pothole and write its number at the bounding-box centre
            cv2.drawContours(img, [contour], -1, (0, 255, 0), thickness=3)
            x, y, w, h = cv2.boundingRect(contour)
            cv2.putText(
                img, str(pothole_number), (x + w // 2, y + h // 2),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2
            )

        severity_percentage = (total_pothole_area / image_area) * 100 if image_area > 0 else 0

        # Blend a colour-mapped version of the combined mask over the image
        colored_mask = cv2.applyColorMap(combined_mask, cv2.COLORMAP_JET)
        masked_img = cv2.addWeighted(img, 0.6, colored_mask, 0.4, 0)

        # Summary label: thick black pass first, thinner white pass on top
        summary_text = f"Total Damage: {severity_percentage:.2f}%"
        cv2.putText(masked_img, summary_text, (15, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 3)
        cv2.putText(masked_img, summary_text, (15, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

        # Per-pothole percentages listed below the summary
        y_offset = 60
        for pothole_info in pothole_areas:
            cv2.putText(masked_img, pothole_info, (15, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 3)
            cv2.putText(masked_img, pothole_info, (15, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            y_offset += 20

        return {
            "pothole_detected": True,
            "severity": round(severity_percentage, 2),
            "image_base64": encode_image_to_base64(masked_img),
            "detections": pothole_detections,
        }

    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 400 from decode_image) intact
        # instead of rewrapping them as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}")


@app.post("/process-video/", response_model=VideoAnalysisResult)
async def process_video(file: UploadFile = File(...)):
    """
    Process a video file to detect potholes

    The upload is written to a temporary file, every frame is run through the
    model, annotated and written to a second temporary video, and the
    annotated video is returned base64-encoded together with summary figures.

    Args:
        file: Uploaded video file.

    Returns:
        A dict matching ``VideoAnalysisResult``.

    Raises:
        HTTPException: 400 when the video cannot be opened; 500 for any other
            processing failure.
    """
    # Defined before the try block so the cleanup in `finally` is always safe.
    temp_video_path = None
    processed_video_path = None
    cap = None
    out = None

    try:
        # Save uploaded video to a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_video:
            temp_video_path = temp_video.name
            temp_video.write(await file.read())

        cap = cv2.VideoCapture(temp_video_path)
        if not cap.isOpened():
            raise HTTPException(status_code=400, detail="Unable to open video file")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Unique output path per request; a fixed file name would be shared
        # (and clobbered) by concurrent requests.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_out:
            processed_video_path = temp_out.name
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(processed_video_path, fourcc, fps, (width, height))

        frame_count = 0
        total_severity = 0
        total_damage = 0  # stays 0 when no frame reaches the damage computation
        unique_potholes = set()

        global_road_mask = np.zeros((height, width), dtype=np.uint8)  # Store total visible road
        global_pothole_mask = np.zeros((height, width), dtype=np.uint8)  # Store all potholes

        orb = cv2.ORB_create(500)
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        prev_kp = None
        prev_des = None

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Features are computed once per frame and reused as the
            # "previous" features of the next iteration.
            kp, des = orb.detectAndCompute(gray_frame, None)

            # Estimate the homography from this frame to the previous one
            homography_matrix = None
            if kp and des is not None and prev_kp is not None and prev_des is not None:
                matches = bf.match(des, prev_des)
                matches = sorted(matches, key=lambda x: x.distance)[:50]

                if len(matches) > 10:
                    src_pts = np.float32([kp[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
                    dst_pts = np.float32([prev_kp[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)
                    homography_matrix, _ = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)

            prev_kp, prev_des = kp, des

            # Run YOLO segmentation model on the frame
            results_list = model.predict(frame, show=False)
            if not results_list:
                out.write(frame)
                continue

            results = results_list[0]
            masks = results.masks
            boxes = results.boxes

            binary_mask = np.zeros((frame.shape[0], frame.shape[1]), dtype=np.uint8)
            detected_potholes = []

            if boxes is not None:
                for box in boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    confidence = float(box.conf[0])

                    # Box centres are bucketed into a 10-pixel grid; each
                    # distinct cell counts as one pothole.
                    center_x = (x1 + x2) // 2
                    center_y = (y1 + y2) // 2
                    unique_potholes.add((center_x // 10, center_y // 10))

                    detected_potholes.append((x1, y1, x2, y2, confidence))

            if masks is not None and hasattr(masks, "data"):
                for mask in masks.data.cpu().numpy():
                    mask = (mask * 255).astype(np.uint8)
                    mask_resized = cv2.resize(mask, (frame.shape[1], frame.shape[0]))
                    binary_mask = cv2.bitwise_or(binary_mask, mask_resized)

            # If homography is available, align binary mask
            if homography_matrix is not None:
                binary_mask = cv2.warpPerspective(
                    binary_mask, homography_matrix, (width, height)
                )

            # Any pixel with a channel value above 50 is treated as road
            new_road_pixels = (
                (frame[:, :, 0] > 50) | (frame[:, :, 1] > 50) | (frame[:, :, 2] > 50)
            )
            global_road_mask[new_road_pixels] = 255

            # Update Global Pothole Mask
            global_pothole_mask = cv2.bitwise_or(global_pothole_mask, binary_mask)

            total_road_pixels = np.sum(global_road_mask > 0)
            total_pothole_pixels = np.sum(global_pothole_mask > 0)

            if total_road_pixels > 0:
                current_damage = (np.sum(binary_mask > 0) / total_road_pixels) * 100
                total_damage = (total_pothole_pixels / total_road_pixels) * 100
            else:
                current_damage = 0
                total_damage = 0

            total_severity += current_damage

            # Display Labels (the total is shown halved, as in the response)
            cv2.putText(
                frame,
                f"Total Damage: {((total_damage)*(1/2)):.2f}%",
                (20, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.8,
                (0, 0, 255),
                2,
            )
            cv2.putText(
                frame,
                f"Current Frame: {current_damage:.2f}%",
                (20, 60),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.8,
                (255, 0, 0),
                2,
            )

            # Draw pothole contours
            contours, _ = cv2.findContours(
                binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
            )
            for contour in contours:
                cv2.drawContours(frame, [contour], -1, (0, 255, 0), 2)

            # Overlay the mask in the red channel (BGR order)
            mask_colored = cv2.merge(
                [np.zeros_like(binary_mask), np.zeros_like(binary_mask), binary_mask]
            )
            frame = cv2.addWeighted(frame, 0.6, mask_colored, 0.4, 0)

            for x1, y1, x2, y2, confidence in detected_potholes:
                cv2.putText(
                    frame,
                    f"{confidence:.2f}",
                    (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6,
                    (0, 255, 0),
                    2,
                )

            if frame.shape != (height, width, 3):
                frame = cv2.resize(frame, (width, height))

            out.write(frame)

        # The writer must be released before the output file is read back.
        cap.release()
        out.release()

        avg_severity = total_severity / frame_count if frame_count > 0 else 0
        total_potholes_detected = len(unique_potholes)

        # Read the processed video file and convert to base64
        with open(processed_video_path, "rb") as video_file:
            video_base64 = base64.b64encode(video_file.read()).decode('utf-8')

        return {
            "average_severity": round(avg_severity, 2),
            "damaged_road_percentage": round(total_damage * (1/2), 2),
            "video_base64": video_base64,
            "total_potholes_detected": total_potholes_detected,
        }

    except HTTPException:
        # Preserve intentional HTTP errors such as the 400 raised above.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing video: {str(e)}")
    finally:
        # Release OpenCV handles and remove temp files on every exit path.
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
        for path in (temp_video_path, processed_video_path):
            if path and os.path.exists(path):
                os.remove(path)


# For running in Colab with ngrok
# Only executes when main.py is run as a script; when the app is launched with
# the `uvicorn main:app` command line this guard is skipped.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Writing main.py


In [5]:
!pip install nest_asyncio



In [32]:
import base64
import io
import os
import tempfile
from typing import List, Optional, Union

import cv2
import nest_asyncio
import numpy as np
import requests
import uvicorn
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, HttpUrl

# Apply nest_asyncio to allow running asyncio within Jupyter
nest_asyncio.apply()

# Import your model loader
# For Colab, you'll need to adapt how you load the model
# NOTE(review): model_loader.py is written by a %%writefile cell that appears
# later in this notebook; on a fresh kernel that cell must run before this one.
from model_loader import load_model

# Load the YOLO model
# The instance is module-level and shared by every request handler below.
model = load_model()

app = FastAPI(title="Pothole Detection API")

# Allow CORS
# NOTE(review): wildcard origins are combined with allow_credentials=True here.
# FastAPI's CORS documentation advises against "*" values when credentials are
# allowed -- confirm against the docs and list the real front-end origin(s).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class PotholeDetection(BaseModel):
    # Response schema for POST /predict-image/.

    # True when the model returned at least one segmentation mask.
    pothole_detected: bool
    # Summed pothole contour area expressed as a percentage of the image area.
    severity: float
    # JPEG-encoded image as base64 text (annotated when potholes were found).
    image_base64: Union[str, None] = None
    # One dict per pothole: pothole_number, confidence, pothole_percentage.
    detections: List[dict] = []

class VideoAnalysisResult(BaseModel):
    # Response schema for POST /process-video/.

    # Mean of the per-frame damage percentage over all frames read.
    average_severity: float
    # Accumulated pothole/road pixel ratio in percent, halved by the endpoint.
    damaged_road_percentage: float
    # Annotated output video, base64-encoded.
    video_base64: Union[str, None] = None
    # Number of distinct 10-pixel grid cells that contained a box centre.
    total_potholes_detected: int


class VideoURLRequest(BaseModel):
    # Request body for POST /process-video/: the URL the video is fetched from.
    video_url: HttpUrl


def decode_image(image_data):
    """Turn raw image bytes or a base64 string into an OpenCV BGR image.

    Args:
        image_data: Either encoded image bytes, a bare base64 string, or a
            base64 string prefixed with a ``data:image...`` URI header.

    Returns:
        The decoded image as returned by ``cv2.imdecode`` with IMREAD_COLOR.

    Raises:
        HTTPException: status 400 for any failure while decoding.
    """
    try:
        if not isinstance(image_data, str):
            # Direct file upload: the payload is already encoded image bytes.
            raw_bytes = image_data
        elif image_data.startswith('data:image'):
            # Data URI: the base64 payload is whatever follows the first comma.
            raw_bytes = base64.b64decode(image_data.split(',')[1])
        else:
            # Plain base64 string without a URI header.
            raw_bytes = base64.b64decode(image_data)

        decoded = cv2.imdecode(np.frombuffer(raw_bytes, np.uint8), cv2.IMREAD_COLOR)
        if decoded is None:
            raise ValueError("Failed to decode image")
        return decoded
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid image data: {str(e)}")

def encode_image_to_base64(cv_image):
    """Convert OpenCV image to base64 string.

    Args:
        cv_image: Image array accepted by ``cv2.imencode``.

    Returns:
        The JPEG-encoded image as a base64 ``str``.

    Raises:
        ValueError: if ``cv2.imencode`` reports that encoding failed.
    """
    success, buffer = cv2.imencode('.jpg', cv_image)
    # imencode signals failure through its first return value; without this
    # check a failed encode would be returned as a bogus/empty payload.
    if not success:
        raise ValueError("Failed to encode image as JPEG")
    return base64.b64encode(buffer).decode('utf-8')

def process_image_with_model(img):
    """Process image with YOLO model and return results.

    Args:
        img: OpenCV BGR image (3-channel array). It is annotated in place
            with contours and pothole numbers.

    Returns:
        A dict with the keys of ``PotholeDetection``: ``pothole_detected``,
        ``severity`` (summed largest-contour area as a percentage of the
        image area), ``image_base64`` (JPEG, base64) and ``detections``.
    """
    # Perform inference
    results_list = model.predict(img)

    # Handle case with no detections
    if not results_list or not hasattr(results_list[0], 'masks') or results_list[0].masks is None:
        return {
            "pothole_detected": False,
            "severity": 0.0,
            "image_base64": encode_image_to_base64(img),
            "detections": [],
        }

    results = results_list[0]

    # Get image dimensions
    image_height, image_width, _ = img.shape
    image_area = image_height * image_width

    # Extract masks and confidences
    masks = results.masks.data.cpu().numpy()
    confidences = results.boxes.conf.cpu().numpy()

    # Initialize processing variables
    total_pothole_area = 0
    pothole_areas = []
    combined_mask = np.zeros((image_height, image_width), dtype=np.uint8)
    pothole_detections = []

    # Process each detected pothole
    for pothole_number, (mask, confidence) in enumerate(zip(masks, confidences), start=1):
        # Convert mask to binary
        binary_mask = (mask > 0).astype(np.uint8) * 255

        # The mask may not have the same size as the input image (the video
        # endpoint resizes its masks for the same reason). Bring it to image
        # size so bitwise_or and the contour coordinates line up.
        if binary_mask.shape != (image_height, image_width):
            binary_mask = cv2.resize(
                binary_mask, (image_width, image_height), interpolation=cv2.INTER_NEAREST
            )

        combined_mask = cv2.bitwise_or(combined_mask, binary_mask)

        # Find contours in the mask
        contours, _ = cv2.findContours(binary_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

        if not contours:
            continue

        # Only the largest contour of each mask is counted as the pothole
        contour = max(contours, key=cv2.contourArea)
        area = cv2.contourArea(contour)
        total_pothole_area += area

        # Calculate pothole percentage
        pothole_percentage = (area / image_area) * 100
        pothole_areas.append(f"Pothole {pothole_number}: {pothole_percentage:.2f}%")

        # Store detection info
        pothole_detections.append({
            "pothole_number": pothole_number,
            "confidence": round(float(confidence), 2),
            "pothole_percentage": round(pothole_percentage, 2),
        })

        # Draw contours and label at the bounding-box centre
        cv2.drawContours(img, [contour], -1, (0, 255, 0), thickness=3)
        x, y, w, h = cv2.boundingRect(contour)
        cv2.putText(img, str(pothole_number), (x + w // 2, y + h // 2),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

    # Calculate overall severity
    severity_percentage = (total_pothole_area / image_area) * 100 if image_area > 0 else 0

    # Create visualization with mask overlay
    colored_mask = cv2.applyColorMap(combined_mask, cv2.COLORMAP_JET)
    masked_img = cv2.addWeighted(img, 0.6, colored_mask, 0.4, 0)

    # Add summary text: thick black pass first, thinner white pass on top
    summary_text = f"Total Damage: {severity_percentage:.2f}%"
    cv2.putText(masked_img, summary_text, (15, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 3)
    cv2.putText(masked_img, summary_text, (15, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

    # Add individual pothole percentages
    y_offset = 60
    for pothole_info in pothole_areas:
        cv2.putText(masked_img, pothole_info, (15, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 3)
        cv2.putText(masked_img, pothole_info, (15, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        y_offset += 20

    # Return results
    return {
        "pothole_detected": True,
        "severity": round(severity_percentage, 2),
        "image_base64": encode_image_to_base64(masked_img),
        "detections": pothole_detections,
    }

@app.post("/predict-image/", response_model=PotholeDetection)
async def predict_pothole(file: Union[UploadFile, None] = None, image_base64: str = Form(None)):
    """
    Process an image to detect potholes. Accepts either file upload or base64 image.

    Args:
        file: Optional uploaded image; used in preference to ``image_base64``.
        image_base64: Optional base64 image string sent as a form field.

    Returns:
        The dict produced by ``process_image_with_model``, or a JSON 500
        response carrying the error message and traceback on unexpected
        failures.

    Raises:
        HTTPException: 400 when no input is supplied or the image cannot be
            decoded.
    """
    if file is None and image_base64 is None:
        raise HTTPException(status_code=400, detail="Either file or image_base64 must be provided")

    try:
        # Process based on input type
        if file:
            contents = await file.read()
            img = decode_image(contents)
        else:
            img = decode_image(image_base64)

        # Process the image with the model
        return process_image_with_model(img)

    except HTTPException:
        # decode_image reports bad input as a 400; do not turn it into a 500.
        raise
    except Exception as e:
        import traceback
        # NOTE(review): the traceback is sent to the client; fine for notebook
        # debugging, but consider logging it instead before real deployment.
        return JSONResponse(
            status_code=500,
            content={"detail": f"Error processing image: {str(e)}",
                     "traceback": traceback.format_exc()}
        )

@app.post("/process-video/", response_model=VideoAnalysisResult)
async def process_video(request: VideoURLRequest):
    """
    Process a video file to detect potholes

    The video is downloaded from ``request.video_url`` to a temporary file,
    every frame is run through the model, annotated and written to a second
    temporary video, and the annotated video is returned base64-encoded
    together with summary figures.

    Args:
        request: JSON body carrying ``video_url``.

    Returns:
        A dict matching ``VideoAnalysisResult``, or a JSON 500 response with
        the error message and traceback on unexpected failures.

    Raises:
        HTTPException: 400 when the downloaded file cannot be opened as video.
    """
    import os  # used only for temp-file cleanup in the finally block

    video_url = request.video_url
    # Defined before the try block so the cleanup in `finally` is always safe.
    temp_video_path = None
    processed_video_path = None
    cap = None
    out = None

    try:
        # NOTE(review): the URL is caller-supplied, so the server will fetch
        # arbitrary addresses (SSRF risk); restrict allowed hosts if exposed.
        # The timeout keeps a stalled download from hanging the request.
        response = requests.get(str(video_url), timeout=60)
        response.raise_for_status()

        # Save downloaded video to a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_video:
            temp_video.write(response.content)
            temp_video_path = temp_video.name

        # Set up video capture
        cap = cv2.VideoCapture(temp_video_path)
        if not cap.isOpened():
            raise HTTPException(status_code=400, detail="Unable to open video file")

        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Reserve a unique output path; the handle is closed right away so
        # only OpenCV holds the file while writing.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_out:
            processed_video_path = temp_out.name
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(processed_video_path, fourcc, fps, (width, height))

        # Initialize video processing variables
        frame_count = 0
        total_severity = 0
        total_damage = 0  # stays 0 when no frame reaches the damage computation
        unique_potholes = set()

        # Initialize masks for road and pothole tracking
        global_road_mask = np.zeros((height, width), dtype=np.uint8)
        global_pothole_mask = np.zeros((height, width), dtype=np.uint8)

        # Set up ORB feature detector for homography
        orb = cv2.ORB_create(500)
        bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        prev_kp = None
        prev_des = None

        # Process video frame by frame
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Features are computed once per frame and reused as the
            # "previous" features of the next iteration.
            kp, des = orb.detectAndCompute(gray_frame, None)

            # Calculate homography between consecutive frames
            homography_matrix = None
            if kp and des is not None and prev_kp and prev_des is not None:
                matches = bf.match(des, prev_des)
                if matches:
                    matches = sorted(matches, key=lambda x: x.distance)[:50]

                    if len(matches) > 10:
                        src_pts = np.float32([kp[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
                        dst_pts = np.float32([prev_kp[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)
                        homography_matrix, _ = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)

            # Update previous frame data
            prev_kp, prev_des = kp, des

            # Run YOLO model on current frame
            results_list = model.predict(frame, show=False)

            # Handle empty results
            if not results_list:
                out.write(frame)
                continue

            results = results_list[0]
            binary_mask = np.zeros((frame.shape[0], frame.shape[1]), dtype=np.uint8)
            detected_potholes = []

            # Process bounding boxes
            if hasattr(results, 'boxes') and results.boxes is not None:
                for box in results.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    confidence = float(box.conf[0])

                    # Box centres are bucketed into a 10-pixel grid; each
                    # distinct cell counts as one pothole.
                    center_x = (x1 + x2) // 2
                    center_y = (y1 + y2) // 2
                    unique_potholes.add((center_x // 10, center_y // 10))

                    detected_potholes.append((x1, y1, x2, y2, confidence))

            # Process segmentation masks
            if hasattr(results, 'masks') and results.masks is not None and hasattr(results.masks, 'data'):
                for mask in results.masks.data.cpu().numpy():
                    mask = (mask * 255).astype(np.uint8)
                    mask_resized = cv2.resize(mask, (frame.shape[1], frame.shape[0]))
                    binary_mask = cv2.bitwise_or(binary_mask, mask_resized)

            # Apply homography to mask if available
            if homography_matrix is not None:
                try:
                    binary_mask = cv2.warpPerspective(binary_mask, homography_matrix, (width, height))
                except cv2.error:
                    # Best effort: keep the unaligned mask if the warp fails.
                    pass

            # Any pixel with a channel value above 50 is treated as road
            new_road_pixels = (frame[:, :, 0] > 50) | (frame[:, :, 1] > 50) | (frame[:, :, 2] > 50)
            global_road_mask[new_road_pixels] = 255
            global_pothole_mask = cv2.bitwise_or(global_pothole_mask, binary_mask)

            # Calculate damage percentages
            total_road_pixels = np.sum(global_road_mask > 0)
            total_pothole_pixels = np.sum(global_pothole_mask > 0)

            if total_road_pixels > 0:
                current_damage = (np.sum(binary_mask > 0) / total_road_pixels) * 100
                total_damage = (total_pothole_pixels / total_road_pixels) * 100
            else:
                current_damage = 0
                total_damage = 0

            # Apply damage calibration factor (preserved from original code)
            calibrated_damage = total_damage * (1/2)

            total_severity += current_damage

            # Add information to frame
            cv2.putText(frame, f"Total Damage: {calibrated_damage:.2f}%", (20, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
            cv2.putText(frame, f"Current Frame: {current_damage:.2f}%", (20, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 0, 0), 2)

            # Draw pothole contours
            contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            for contour in contours:
                cv2.drawContours(frame, [contour], -1, (0, 255, 0), 2)

            # Add mask overlay in the red channel (BGR order)
            mask_colored = cv2.merge([np.zeros_like(binary_mask), np.zeros_like(binary_mask), binary_mask])
            frame = cv2.addWeighted(frame, 0.6, mask_colored, 0.4, 0)

            # Add confidence scores
            for x1, y1, x2, y2, confidence in detected_potholes:
                cv2.putText(frame, f"{confidence:.2f}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

            # Ensure frame has correct dimensions
            if frame.shape != (height, width, 3):
                frame = cv2.resize(frame, (width, height))

            # Write frame to output video
            out.write(frame)

        # The writer must be released before the output file is read back.
        cap.release()
        out.release()

        # Calculate final statistics
        avg_severity = total_severity / frame_count if frame_count > 0 else 0
        total_potholes_detected = len(unique_potholes)

        # Convert processed video to base64
        with open(processed_video_path, "rb") as video_file:
            video_base64 = base64.b64encode(video_file.read()).decode('utf-8')

        # Return results
        return {
            "average_severity": round(avg_severity, 2),
            "damaged_road_percentage": round(total_damage * (1/2), 2),
            "video_base64": video_base64,
            "total_potholes_detected": total_potholes_detected,
        }

    except HTTPException:
        # Preserve intentional HTTP errors such as the 400 raised above.
        raise
    except Exception as e:
        import traceback
        # NOTE(review): the traceback is sent to the client; fine for notebook
        # debugging, but consider logging it instead before real deployment.
        return JSONResponse(
            status_code=500,
            content={"detail": f"Error processing video: {str(e)}",
                     "traceback": traceback.format_exc()}
        )

    finally:
        # Release OpenCV handles and remove temp files on every exit path.
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
        for path in (temp_video_path, processed_video_path):
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                except OSError:
                    # Cleanup is best effort; a leftover temp file must not
                    # replace the response.
                    pass

# For Jupyter/Colab, use this code for running the server
# First, you'll need to install nest_asyncio:


# Run the FastAPI app without blocking the notebook:
def run_server():
    """Serve `app` with uvicorn on 0.0.0.0:8000 by calling ``Server.run()``."""
    # Function-local import, kept as in the original cell.
    import uvicorn
    uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=8000)).run()



In [37]:
import threading

# Run the server in a daemon thread so the notebook cell returns immediately.
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
print("FastAPI server started in background thread on http://0.0.0.0:8000")

FastAPI server started in background thread on http://0.0.0.0:8000


In [38]:
%%writefile model_loader.py
from ultralytics import YOLO
import os

def load_model(model_path=None):
    """
    Load the YOLOv11 segmentation model

    In Colab you may need to:
    1. Install ultralytics: !pip install ultralytics
    2. Download your model weights

    Args:
        model_path: Optional path to the weights file. When omitted, the
            POTHOLE_MODEL_PATH environment variable is used if set, otherwise
            "/content/best_remya.pt" (the previous hard-coded location).

    Returns:
        The ``YOLO`` object constructed from the weights file.

    Raises:
        FileNotFoundError: if no file exists at the resolved path.
    """
    if model_path is None:
        # If your weights live in Google Drive, mount it first and point
        # POTHOLE_MODEL_PATH (or the argument) at the file.
        model_path = os.environ.get("POTHOLE_MODEL_PATH", "/content/best_remya.pt")

    # isfile (not exists) so that a directory at this path is rejected too.
    if not os.path.isfile(model_path):
        raise FileNotFoundError(f"Model file not found at {model_path}. Please upload your model weights.")

    return YOLO(model_path)

Overwriting model_loader.py


In [39]:
# Setup ngrok tunnel
import os
from getpass import getpass

from pyngrok import ngrok

# Set your ngrok auth token (needed for longer session)
# Get your auth token from https://dashboard.ngrok.com/get-started/your-authtoken
# The token is a credential: read it from the NGROK_AUTHTOKEN environment
# variable or prompt for it, and never store it in the notebook itself.
# NOTE(review): a token was previously committed in this cell; revoke it in
# the ngrok dashboard and issue a new one.
ngrok_auth_token = os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok authtoken: ")
ngrok.set_auth_token(ngrok_auth_token)


Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [40]:
# Start ngrok tunnel to port 8000
ngrok_tunnel = ngrok.connect(8000)
print("Public URL:", ngrok_tunnel.public_url)

Public URL: https://4527-104-196-44-117.ngrok-free.app


In [36]:
!pkill -f ngrok


In [None]:
# Launch the FastAPI app
!uvicorn main:app --host 0.0.0.0 --port 8000