# Kiti Autonomous Vehicle - Complete Pipeline

This notebook combines **Area Marking** and **Optical Flow** analysis for a complete autonomous vehicle computer vision pipeline.

## Features
- **Part 1: Area Marking** - Define region of interest (ROI) for obstacle detection
- **Part 2: Object Detection** - YOLO-based object detection
- **Part 3: Optical Flow Analysis** - Motion detection and trajectory prediction
- **Part 4: Trajectory Prediction** - Linear Regression and Kalman Filter methods

## Project Structure
The production-ready Python modules are available in the `src/` directory:
- `src/main.py` - Main pipeline orchestration
- `src/video_processor.py` - Video frame extraction
- `src/object_detection.py` - YOLO object detection
- `src/optical_flow.py` - Optical flow analysis
- `config/settings.py` - Centralized configuration

---
# Part 1: Setup and Dependencies

In [None]:
# Install required dependencies
!pip install ultralytics opencv-python scikit-learn matplotlib numpy

In [None]:
import os
from glob import glob
import re
import numpy as np
import cv2
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from collections import deque

%matplotlib inline

In [None]:
# Mount Google Drive (for Colab usage)
from google.colab import drive
drive.mount("/content/drive")

In [None]:
# Define the folder path for input videos
data_path = "/content/drive/MyDrive/Video"

# List video files in the folder
video_paths = [os.path.join(data_path, file) for file in os.listdir(data_path) if file.lower().endswith(".mp4")]
print("Available videos:", video_paths)

---
# Part 2: Area Marking

Define the central 40% region of interest (ROI) for obstacle detection. Objects within this area are prioritized for autonomous vehicle response.

In [None]:
def mark_central_area(video_path, output_path="marked_video.mp4"):
    """
    Draw a green box over the central 40% of the video frame.
    This defines the region of interest for obstacle detection.
    """
    cap = cv2.VideoCapture(video_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (frame_width, frame_height))

    start_x = int(0.3 * frame_width)
    end_x = int(0.7 * frame_width)

    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        cv2.rectangle(frame, (start_x, 0), (end_x, frame_height), (0, 255, 0), 3)
        out.write(frame)
        frame_count += 1

    cap.release()
    out.release()
    print(f"Processed {frame_count} frames. Saved to: {output_path}")
    return output_path

---
# Part 3: Motion-Based Obstacle Detection

Use background subtraction to detect moving objects within the ROI.

In [None]:
def detect_obstacles_motion(video_path, output_path="obstacle_detected.mp4"):
    """
    Detect obstacles using background subtraction.
    """
    cap = cv2.VideoCapture(video_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (frame_width, frame_height))

    start_x = int(0.3 * frame_width)
    end_x = int(0.7 * frame_width)

    fgbg = cv2.createBackgroundSubtractorMOG2(history=100, varThreshold=50)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        fgmask = fgbg.apply(frame)
        _, thresh = cv2.threshold(fgmask, 200, 255, cv2.THRESH_BINARY)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8))

        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        cv2.rectangle(frame, (start_x, 0), (end_x, frame_height), (0, 255, 0), 3)

        for cnt in contours:
            x, y, w, h = cv2.boundingRect(cnt)
            if w * h < 500:
                continue
            center_x = x + w // 2
            if start_x <= center_x <= end_x:
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
                cv2.putText(frame, "Obstacle Detected", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

        out.write(frame)

    cap.release()
    out.release()
    print(f"Obstacle detection complete. Saved to: {output_path}")

---
# Part 4: YOLO Object Detection

Use YOLOv5/YOLOv8 for accurate object detection with class labels.

In [None]:
import torch

def detect_objects_yolo(video_path, output_path="yolo_annotated.mp4", confidence=0.3):
    """
    Detect objects using YOLOv5 model.
    """
    model = torch.hub.load("ultralytics/yolov5", "yolov5s", pretrained=True)
    model.conf = confidence

    cap = cv2.VideoCapture(video_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (frame_width, frame_height))

    start_x = int(0.3 * frame_width)
    end_x = int(0.7 * frame_width)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        results = model(frame[..., ::-1])
        detections = results.xyxy[0]

        cv2.rectangle(frame, (start_x, 0), (end_x, frame_height), (0, 255, 0), 3)

        for *box, conf, cls in detections:
            x1, y1, x2, y2 = map(int, box)
            class_name = model.names[int(cls)]
            center_x = (x1 + x2) // 2
            if start_x <= center_x <= end_x:
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                label = f"{class_name} ({conf:.2f})"
                cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

        out.write(frame)

    cap.release()
    out.release()
    print(f"YOLO detection complete. Saved to: {output_path}")

---
# Part 5: Optical Flow Analysis

Track motion between frames using Farneback optical flow algorithm.

In [None]:
def analyze_optical_flow(video_path, output_path="optical_flow.mp4"):
    """
    Analyze optical flow to track motion and detect moving objects.
    """
    cap = cv2.VideoCapture(video_path)

    ret, first_frame = cap.read()
    prev_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)
    h, w = first_frame.shape[:2]

    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), 20, (w, h))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        motion_mask = (mag > 1.0).astype(np.uint8) * 255

        contours, _ = cv2.findContours(motion_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        largest = None
        max_area = 0
        for cnt in contours:
            if cv2.contourArea(cnt) > 500 and cv2.contourArea(cnt) > max_area:
                max_area = cv2.contourArea(cnt)
                largest = cnt

        if largest is not None:
            x, y, w_box, h_box = cv2.boundingRect(largest)
            cx, cy = x + w_box // 2, y + h_box // 2
            cv2.rectangle(frame, (x, y), (x + w_box, y + h_box), (0, 255, 0), 2)
            cv2.circle(frame, (cx, cy), 4, (0, 0, 255), -1)

        prev_gray = gray
        out.write(frame)

    cap.release()
    out.release()
    print(f"Optical flow analysis complete. Saved to: {output_path}")

---
# Part 6: Trajectory Prediction

Predict future object positions using Linear Regression or Kalman Filter.

In [None]:
def predict_trajectory_linear(video_path, output_path="predicted_path.mp4"):
    """
    Predict trajectory using Linear Regression based on motion history.
    """
    cap = cv2.VideoCapture(video_path)

    ret, first_frame = cap.read()
    prev_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)
    h, w = first_frame.shape[:2]

    trajectory = deque(maxlen=30)
    frame_count = 0

    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), 20, (w, h))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        motion_mask = (mag > 1.0).astype(np.uint8) * 255

        contours, _ = cv2.findContours(motion_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        largest = None
        max_area = 0

        for cnt in contours:
            if cv2.contourArea(cnt) > 500 and cv2.contourArea(cnt) > max_area:
                max_area = cv2.contourArea(cnt)
                largest = cnt

        if largest is not None:
            x, y, w_box, h_box = cv2.boundingRect(largest)
            cx, cy = x + w_box // 2, y + h_box // 2

            trajectory.append((frame_count, cx, cy))
            cv2.rectangle(frame, (x, y), (x + w_box, y + h_box), (0, 255, 0), 2)
            cv2.circle(frame, (cx, cy), 4, (0, 0, 255), -1)

        if len(trajectory) >= 10:
            arr = np.array(trajectory)
            times = arr[:, 0].reshape(-1, 1)
            xs, ys = arr[:, 1], arr[:, 2]

            model_x = LinearRegression().fit(times, xs)
            model_y = LinearRegression().fit(times, ys)

            future_times = np.array([[frame_count + i] for i in range(1, 31)])
            pred_xs = model_x.predict(future_times).astype(int)
            pred_ys = model_y.predict(future_times).astype(int)

            for px, py in zip(pred_xs, pred_ys):
                if 0 <= px < w and 0 <= py < h:
                    cv2.circle(frame, (px, py), 2, (255, 0, 0), -1)

        prev_gray = gray
        frame_count += 1
        out.write(frame)

    cap.release()
    out.release()
    print(f"Trajectory prediction complete. Saved to: {output_path}")

In [None]:
def predict_trajectory_kalman(video_path, output_path="kalman_path.mp4"):
    """
    Predict trajectory using Kalman Filter for smoothed predictions.
    """
    kf = cv2.KalmanFilter(4, 2)
    kf.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)
    kf.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32)
    kf.processNoiseCov = np.eye(4, dtype=np.float32) * 0.03
    kf.measurementNoiseCov = np.eye(2, dtype=np.float32) * 1

    cap = cv2.VideoCapture(video_path)
    ret, first_frame = cap.read()
    if not ret:
        raise RuntimeError("Failed to read video")

    prev_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)
    h, w = first_frame.shape[:2]

    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), 20, (w, h))
    predicted_points = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        motion_mask = (mag > 1.0).astype(np.uint8) * 255

        contours, _ = cv2.findContours(motion_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        largest = None
        max_area = 0

        for cnt in contours:
            if cv2.contourArea(cnt) > 500 and cv2.contourArea(cnt) > max_area:
                max_area = cv2.contourArea(cnt)
                largest = cnt

        if largest is not None:
            x, y, w_box, h_box = cv2.boundingRect(largest)
            cx, cy = x + w_box // 2, y + h_box // 2

            measurement = np.array([[np.float32(cx)], [np.float32(cy)]])
            kf.correct(measurement)

            prediction = kf.predict()
            px, py = int(prediction[0][0]), int(prediction[1][0])

            predicted_points.append((px, py))

            cv2.circle(frame, (cx, cy), 5, (0, 0, 255), -1)
            cv2.circle(frame, (px, py), 5, (255, 0, 0), -1)

            for i in range(1, len(predicted_points)):
                cv2.line(frame, predicted_points[i - 1], predicted_points[i], (255, 0, 0), 2)

        prev_gray = gray
        out.write(frame)

    cap.release()
    out.release()
    print(f"Kalman prediction complete. Saved to: {output_path}")

---
# Part 7: Complete Pipeline

Run the full autonomous vehicle vision pipeline.

In [None]:
def run_complete_pipeline(video_path):
    """
    Run the complete autonomous vehicle vision pipeline.
    """
    print("=" * 60)
    print("Kiti Autonomous Vehicle - Vision Pipeline")
    print("=" * 60)

    print("\n[1/5] Area Marking...")
    mark_central_area(video_path, "step1_marked.mp4")

    print("\n[2/5] Motion-Based Obstacle Detection...")
    detect_obstacles_motion(video_path, "step2_motion_detection.mp4")

    print("\n[3/5] YOLO Object Detection...")
    detect_objects_yolo(video_path, "step3_yolo_detection.mp4")

    print("\n[4/5] Optical Flow Analysis...")
    analyze_optical_flow(video_path, "step4_optical_flow.mp4")

    print("\n[5/5] Trajectory Prediction (Linear + Kalman)...")
    predict_trajectory_linear(video_path, "step5a_linear_prediction.mp4")
    predict_trajectory_kalman(video_path, "step5b_kalman_prediction.mp4")

    print("\n" + "=" * 60)
    print("Pipeline Complete!")
    print("=" * 60)

In [None]:
# Example: Run the pipeline on a video
# video_path = "/content/drive/MyDrive/Video/clip 2.mp4"
# run_complete_pipeline(video_path)

---
# Download Results

Download the processed videos.

In [None]:
from google.colab import files

# Uncomment to download specific outputs
# files.download("step1_marked.mp4")
# files.download("step3_yolo_detection.mp4")
# files.download("step5b_kalman_prediction.mp4")