In [1]:
import cv2
def extract_frames(video_path, output_dir, frequency=10):
    """
    Read a video and keep every `frequency`-th frame in memory

    Args:
        video_path: Path of the video handed to cv2.VideoCapture
        output_dir: Currently unused; frames are only returned, never
            written to disk. Kept so existing calls keep working.
        frequency: Keep one frame out of every `frequency` frames read
            (must be >= 1)
    Returns:
        List of sampled frames; empty list if the video could not be opened
    Raises:
        ValueError: If frequency is smaller than 1
    """
    # Validate up front: a zero step would otherwise only surface as a
    # ZeroDivisionError from the modulo inside the read loop.
    if frequency < 1:
        raise ValueError(f"frequency must be >= 1, got {frequency}")

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        # Make sure we successfully opened the video
        if not cap.isOpened():
            print(f"Error: Could not open video at {video_path}")
            return []

        count = 0  # Number of frames read so far (kept or not)
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if count % frequency == 0:
                frames.append(frame)  # Store frame in memory

            count += 1
    finally:
        # Release the capture on every exit path, including errors
        cap.release()

    print(f"Extracted {len(frames)} frames")
    return frames

In [2]:
def preprocess_image(image):
    """
    Enhance image for better feature detection

    Args:
        image: Input image, either 3-channel BGR or already single-channel
    Returns:
        Single-channel image after contrast equalization and blurring
    """
    # Convert to grayscale if not already (a 2-D input has no channel axis
    # and would make the BGR->gray conversion fail)
    if len(image.shape) == 3:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        gray = image

    # Apply CLAHE (tile-based histogram equalization) for better contrast
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    equalized = clahe.apply(gray)

    # Apply a 5x5 Gaussian blur to reduce noise
    blurred = cv2.GaussianBlur(equalized, (5, 5), 0)

    return blurred

Image Stitching

In [3]:
import numpy as np
def detect_and_match_features(img1, img2):
    """
    Detect and match features between two images

    Args:
        img1, img2: Input images
    Returns:
        Tuple (src_pts, dst_pts, good_matches):
            src_pts: float32 array of shape (N, 1, 2) with matched keypoint
                locations in img1
            dst_pts: float32 array of shape (N, 1, 2) with the corresponding
                locations in img2
            good_matches: The N best matches (at most 50), sorted by distance
        All three are empty when either image yields no descriptors.
    """
    # Initialize ORB detector
    orb = cv2.ORB_create(nfeatures=1000)

    # Find keypoints and descriptors
    kp1, des1 = orb.detectAndCompute(img1, None)
    kp2, des2 = orb.detectAndCompute(img2, None)

    # No descriptors are produced for an image without keypoints; the matcher
    # cannot handle that, so report "no matches" instead of crashing.
    if des1 is None or des2 is None:
        return (
            np.empty((0, 1, 2), dtype=np.float32),
            np.empty((0, 1, 2), dtype=np.float32),
            [],
        )

    # Create matcher (Hamming distance suits ORB's binary descriptors)
    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

    # Match descriptors and sort by distance (best first)
    matches = sorted(bf.match(des1, des2), key=lambda m: m.distance)

    # Take only good matches
    good_matches = matches[:50]

    # Extract location of matched keypoints
    src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
    dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)

    return src_pts, dst_pts, good_matches

In [4]:
def compute_homography(src_pts, dst_pts):
    """
    Compute homography matrix between two sets of points

    Args:
        src_pts, dst_pts: Matching points in two images
    Returns:
        Tuple (H, mask): the homography matrix estimated with RANSAC
        (reprojection threshold 5.0) and the mask returned alongside it by
        cv2.findHomography. Both are None when fewer than 4 point pairs are
        supplied, since a homography cannot be estimated from fewer.
    """
    # Guard the minimum sample size instead of letting OpenCV raise
    if len(src_pts) < 4 or len(dst_pts) < 4:
        return None, None

    H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
    return H, mask

In [5]:
def stitch_images_custom(images):
    """
    Report feature matches between consecutive images, then stitch them

    The ORB match counts are diagnostic output only; the panorama itself is
    produced by OpenCV's built-in stitcher from the unmodified input images.

    Args:
        images: List of input images, in the order they should be matched
    Returns:
        Stitched panorama, or None if the stitcher reports a failure
    """
    # Convert to grayscale for feature detection; images that are already
    # single-channel are used as they are.
    gray_images = [
        cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if len(img.shape) == 3 else img
        for img in images
    ]

    # Create feature detector
    orb = cv2.ORB_create(nfeatures=2000)

    # Only the descriptors are needed for the diagnostic match counts
    descriptors = [orb.detectAndCompute(img, None)[1] for img in gray_images]

    # Create matches between consecutive images (one matcher is enough)
    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
    matches_list = []
    for des_a, des_b in zip(descriptors, descriptors[1:]):
        # An image without keypoints has no descriptors and cannot be matched
        if des_a is None or des_b is None:
            matches_list.append([])
            continue
        matches = sorted(bf.match(des_a, des_b), key=lambda m: m.distance)
        matches_list.append(matches[:100])  # Take top 100 matches

    # Print diagnostic information
    for i, matches in enumerate(matches_list):
        print(f"Image pair {i}-{i+1}: {len(matches)} good matches")

    # Delegate the actual stitching to OpenCV's stitcher
    stitcher = cv2.Stitcher_create()
    status, panorama = stitcher.stitch(images)

    if status == cv2.Stitcher_OK:
        return panorama

    print(f"Stitching failed with status code: {status}")
    return None

Obstacle Detection

In [6]:
def detect_obstacles(image):
    """
    Build an obstacle mask for a terrain image

    A pixel is a candidate obstacle when it is flagged by either the inverted
    adaptive threshold or the Canny edge detector; the combined result is then
    cleaned up with a morphological closing.

    Args:
        image: Input terrain image (3-dimensional arrays are converted from
            BGR to grayscale, anything else is used as given)
    Returns:
        Binary mask with obstacles marked as white (255)
    """
    has_channel_axis = len(image.shape) == 3
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if has_channel_axis else image

    # Inverted Gaussian adaptive threshold (block size 11, constant 2)
    local_thresh = cv2.adaptiveThreshold(
        gray,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV,
        11,
        2,
    )

    # Canny edges with hysteresis thresholds 100 / 200
    edge_map = cv2.Canny(gray, 100, 200)

    # Union of both detectors
    candidates = cv2.bitwise_or(local_thresh, edge_map)

    # Closing with a 5x5 kernel to clean up the mask
    closing_kernel = np.ones((5, 5), np.uint8)
    return cv2.morphologyEx(candidates, cv2.MORPH_CLOSE, closing_kernel)

In [7]:
def classify_terrain(image, obstacle_mask):
    """
    Classify terrain into traversable and non-traversable areas

    Args:
        image: Original terrain image (BGR, or single-channel grayscale)
        obstacle_mask: Binary mask with obstacles (non-zero = obstacle),
            matching the image's height and width
    Returns:
        BGR visualization with obstacles tinted red and traversable areas
        tinted green; the input image is not modified
    """
    # The overlays are 3-channel colors, so promote grayscale input to BGR
    if len(image.shape) == 2:
        base = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    else:
        base = image

    # Create a colored overlay for obstacles (red)
    red_mask = np.zeros_like(base)
    red_mask[obstacle_mask > 0] = [0, 0, 255]  # BGR format

    # Create a colored overlay for safe areas (green)
    green_mask = np.zeros_like(base)
    green_mask[obstacle_mask == 0] = [0, 255, 0]  # BGR format

    # Blend with original image; addWeighted returns a new array, so no
    # defensive copy of the input is needed.
    alpha = 0.3  # Transparency factor
    visualization = cv2.addWeighted(base, 1, red_mask, alpha, 0)
    visualization = cv2.addWeighted(visualization, 1, green_mask, alpha * 0.5, 0)

    return visualization

Path Planning

In [8]:
def create_navigation_graph(obstacle_mask, grid_size=20):
    """
    Create navigation graph from obstacle mask

    The mask is divided into grid_size x grid_size cells. Every cell whose
    mean mask value is below 127 becomes a node located at the cell's nominal
    center. Each node is linked to those of its 8 surrounding cells that are
    also nodes, provided fewer than 30% of the pixels on the straight line
    between the two centers are obstacle pixels.

    Args:
        obstacle_mask: 2-D binary mask with obstacles (non-zero = obstacle)
        grid_size: Size of grid cells in pixels
    Returns:
        Tuple (nodes, node_coords):
            nodes: Dict mapping node ID to a list of (neighbor ID, distance)
            node_coords: Dict mapping node ID to its (x, y) center
    """
    height, width = obstacle_mask.shape
    half = grid_size // 2

    # Create grid; cell_to_node lets neighbors be found by grid position
    # instead of comparing every node against every other node.
    nodes = {}
    node_coords = {}
    cell_to_node = {}
    node_id = 0

    for row, y in enumerate(range(0, height, grid_size)):
        for col, x in enumerate(range(0, width, grid_size)):
            # Check if grid cell is mostly obstacle-free
            cell = obstacle_mask[y:y + grid_size, x:x + grid_size]
            if cell.size > 0 and np.mean(cell) < 127:  # Mostly traversable
                nodes[node_id] = []
                node_coords[node_id] = (x + half, y + half)
                cell_to_node[(row, col)] = node_id
                node_id += 1

    # (d_row, d_col) of the 8 surrounding cells in row-major order, so each
    # adjacency list ends up sorted by ascending neighbor ID. Only these cells
    # can lie within 1.5 * grid_size of a node center (distance is grid_size
    # for straight and about 1.41 * grid_size for diagonal neighbors).
    neighbor_offsets = (
        (-1, -1), (-1, 0), (-1, 1),
        (0, -1), (0, 1),
        (1, -1), (1, 0), (1, 1),
    )

    # Centers of adjacent cells are exactly grid_size apart along each axis
    # they differ in, so the connecting line is horizontal, vertical or a
    # 45-degree diagonal and can be walked in unit steps.
    steps = np.arange(grid_size + 1)

    # Connect neighboring nodes
    for (row, col), node in cell_to_node.items():
        x1, y1 = node_coords[node]
        for d_row, d_col in neighbor_offsets:
            other_node = cell_to_node.get((row + d_row, col + d_col))
            if other_node is None:
                continue

            x2, y2 = node_coords[other_node]
            dist = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)

            # Pixels on the line between the two centers; centers of border
            # cells may fall outside the mask, so keep only in-bounds pixels.
            xs = x1 + d_col * steps
            ys = y1 + d_row * steps
            inside = (xs >= 0) & (xs < width) & (ys >= 0) & (ys < height)
            line_length = int(np.count_nonzero(inside))
            blocked = int(np.count_nonzero(obstacle_mask[ys[inside], xs[inside]] > 0))

            if blocked < 0.3 * line_length:
                # Connection is mostly obstacle-free
                nodes[node].append((other_node, dist))

    return nodes, node_coords

In [9]:
import heapq

def a_star(graph, start, goal, node_coords):
    """
    Find shortest path using A* algorithm

    Args:
        graph: Navigation graph mapping node ID to a list of
            (neighbor ID, distance) tuples
        start: Start node ID
        goal: Goal node ID
        node_coords: Dictionary mapping node IDs to coordinates
    Returns:
        Tuple (path, total_distance): path as list of node IDs from start to
        goal and its total distance, or (None, inf) if the goal cannot be
        reached or start/goal is not a node of the graph
    """
    unreachable = float('inf')

    if start not in graph or goal not in graph:
        return None, unreachable

    goal_x, goal_y = node_coords[goal]

    # Heuristic: straight-line (Euclidean) distance to the goal
    def heuristic(node):
        x, y = node_coords[node]
        return np.sqrt((goal_x - x) ** 2 + (goal_y - y) ** 2)

    # Cheapest known cost from start to each node reached so far
    g_score = {start: 0}

    # Best known estimate of the total cost through each node; used to
    # recognise outdated heap entries.
    f_score = {start: heuristic(start)}

    # For reconstructing the path
    came_from = {}

    # Priority queue of (f_score, node). A node is pushed again whenever a
    # cheaper route to it is found, so it may appear several times; entries
    # that are no longer the best one are skipped when popped. Leaving an
    # improved node in the queue under its old priority instead can let the
    # goal be popped first and a longer path be returned.
    open_set = [(f_score[start], start)]

    while open_set:
        popped_f, current = heapq.heappop(open_set)

        if popped_f > f_score.get(current, unreachable):
            continue  # Outdated entry; a cheaper route was queued later

        if current == goal:
            # Reconstruct path by walking the predecessor links backwards
            path = [current]
            while current in came_from:
                current = came_from[current]
                path.append(current)
            path.reverse()

            return path, g_score[goal]

        for neighbor, distance in graph.get(current, ()):
            tentative_g = g_score[current] + distance

            if tentative_g < g_score.get(neighbor, unreachable):
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g
                f_score[neighbor] = tentative_g + heuristic(neighbor)
                heapq.heappush(open_set, (f_score[neighbor], neighbor))

    return None, unreachable  # No path found

In [10]:
def visualize_path(image, path, node_coords):
    """
    Draw the optimal path on terrain image

    Args:
        image: Terrain image (not modified; drawing happens on a copy)
        path: List of node IDs forming the path; may be None or empty when
            no path was found
        node_coords: Dictionary mapping node IDs to coordinates
    Returns:
        Copy of the image with the path drawn as a yellow line, the start
        marked in blue and the end in red; an unmarked copy if there is no
        path to draw
    """
    result = image.copy()

    # Nothing to draw when the search found no route
    if not path:
        return result

    # Draw path segment by segment
    for node_a, node_b in zip(path, path[1:]):
        cv2.line(result, node_coords[node_a], node_coords[node_b],
                 (0, 255, 255), 3)  # Yellow line

    # Mark start and end points
    cv2.circle(result, node_coords[path[0]], 10, (255, 0, 0), -1)  # Blue start
    cv2.circle(result, node_coords[path[-1]], 10, (0, 0, 255), -1)  # Red end

    return result

In [11]:
def calculate_path_distance(path, node_coords):
    """
    Sum the straight-line lengths of all segments of a path

    Args:
        path: List of node IDs forming the path
        node_coords: Dictionary mapping node IDs to coordinates
    Returns:
        Total distance in pixels (0 for paths with fewer than two nodes)
    """
    def segment_length(node_a, node_b):
        ax, ay = node_coords[node_a]
        bx, by = node_coords[node_b]
        return np.sqrt((bx - ax)**2 + (by - ay)**2)

    # Pair every node with its successor and add up the segment lengths
    return sum(segment_length(a, b) for a, b in zip(path, path[1:]))

In [None]:
def process_drone_imagery(video_path, start_coords, end_coords):
    """
    Full processing pipeline from drone video to optimal path

    Args:
        video_path: Path to drone video
        start_coords: Starting coordinates (x, y) on the stitched terrain map
        end_coords: Ending coordinates (x, y) on the stitched terrain map
    Returns:
        Final visualization with path. If no route exists, the classified
        terrain is returned with a "no path" caption instead. Returns None
        when no frames could be read, stitching failed, or the map contains
        no traversable cell.
    """
    # 1. Extract frames (every 10th). Sampling is driven by the helper's
    #    frame counter; keying it on the number of frames kept so far would
    #    stop after the very first frame.
    frames = extract_frames(video_path, None, frequency=10)
    if not frames:
        print("Pipeline aborted: no frames could be extracted")
        return None

    # 2. Preprocess frames. The enhanced frames are single-channel, while
    #    stitching and the colored overlays work on BGR images, so convert
    #    them back to three channels.
    processed_frames = []
    for frame in frames:
        enhanced = preprocess_image(frame)
        if len(enhanced.shape) == 2:
            enhanced = cv2.cvtColor(enhanced, cv2.COLOR_GRAY2BGR)
        processed_frames.append(enhanced)

    # 3. Stitch images
    terrain_map = stitch_images_custom(processed_frames)
    if terrain_map is None:
        print("Pipeline aborted: frames could not be stitched")
        return None

    # 4. Detect obstacles
    obstacle_mask = detect_obstacles(terrain_map)

    # 5. Create navigation graph
    graph, node_coords = create_navigation_graph(obstacle_mask)
    if not node_coords:
        print("Pipeline aborted: terrain map has no traversable cells")
        return None

    # 6. Find closest nodes to start and end coordinates
    def closest_node(target):
        return min(node_coords,
                   key=lambda n: ((node_coords[n][0] - target[0])**2 +
                                  (node_coords[n][1] - target[1])**2))

    start_node = closest_node(start_coords)
    end_node = closest_node(end_coords)

    # 7. Find optimal path
    path, distance = a_star(graph, start_node, end_node, node_coords)

    # 8. Visualize results
    terrain_with_obstacles = classify_terrain(terrain_map, obstacle_mask)
    if path is None:
        # No route between the two nodes: show the classified terrain only
        final_visualization = terrain_with_obstacles
        text = "No traversable path found"
    else:
        final_visualization = visualize_path(terrain_with_obstacles, path, node_coords)
        text = f"Optimal path distance: {distance:.2f} units"

    # 9. Add distance information
    cv2.putText(final_visualization, text, (20, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

    return final_visualization

In [14]:
# Run the full pipeline on the sample clip; the two tuples are the start and
# end (x, y) coordinates that get snapped to the nearest navigation node.
process_drone_imagery('/content/a-flight-over-the-woods-SBV-337898150-preview.mp4',(100,100),(200,200))

Extracting frames from video...

Extracted 166 frames

First frame shape: (1080, 1920, 3), dtype: uint8

Stitching images...

Optimal path distance: 564.4m