In [2]:
import googlemaps
import requests
import polyline
import math
import os
import time
import geopy.distance
import numpy as np
from io import BytesIO
import cv2
from PIL import Image
import torch
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from pyproj import Geod
import folium
from folium.plugins import HeatMap, MeasureControl
from folium.features import GeoJson, GeoJsonPopup
from shapely.geometry import Point, LineString
from shapely.ops import unary_union
from dotenv import load_dotenv

# Load environment variables at the very beginning
load_dotenv()

# API key for Google Maps (replace with your own key)
key = os.getenv("GOOGLE_API_KEY")

# Route endpoints
start_point = "1233 Lawrence Ave W, Toronto, Ontario"
end_point = "1242 Lawrence Ave W, North York, ON M6L 1A3"

# Camera parameters
pitch_values = [0, 30, 60]  # Vertical tilt in degrees
fov_def = [55]  # Default horizontal field of view in degrees
fov_special = [40]  # Special FOV for diagonal angles
relative_angles_right = [90, 45, 135]  # Right-side angles relative to road heading
relative_angles_left = [270, 310, 230]  # Left-side angles relative to road heading
camera_altitude = 2.5  # Camera height above ground in meters (typical for Street View)

# Output directory
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

# Create subfolders for images
dropped_dir = os.path.join(output_dir, "dropped")
kept_dir = os.path.join(output_dir, "kept")
os.makedirs(dropped_dir, exist_ok=True)
os.makedirs(kept_dir, exist_ok=True)

# Path to the YOLOv5 model
model_path = 'C:/Users/Daniel Roberts/Downloads/HydroOne/transmission_model.pt'

# Load YOLOv5 model
if not os.path.exists(model_path):
    print(f"Error: Model file not found at {model_path}. Falling back to YOLOv5s pre-trained model.")
    model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True, trust_repo=True)
else:
    model = torch.hub.load('ultralytics/yolov5', 'custom', path=model_path, trust_repo=True)
    print(f"Loaded custom model from {model_path}")

# Initialize Geod object for WGS84 ellipsoid
geod = Geod(ellps='WGS84')

### Utility Functions

def calculate_bearing(lat1, lon1, lat2, lon2):
    """Calculate the bearing between two points in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlon = lon2 - lon1
    y = math.sin(dlon) * math.cos(lat2)
    x = math.cos(lat1) * math.sin(lat2) - math.sin(lat1) * math.cos(lat2) * math.cos(dlon)
    initial_bearing = math.atan2(y, x)
    compass_bearing = (math.degrees(initial_bearing) + 360) % 360
    return compass_bearing

def get_route_coordinates(start_location, end_location):
    """Retrieve route coordinates from Google Directions API."""
    base_url = "https://maps.googleapis.com/maps/api/directions/json"
    params = {
        'origin': start_location,
        'destination': end_location,
        'key': key,
    }
    response = requests.get(base_url, params=params)
    data = response.json()
    if not data.get('routes'):
        print("No routes found in the Directions API response.")
        return []
    coordinates = []
    for leg in data['routes'][0]['legs']:
        for step in leg['steps']:
            points = step['polyline']['points']
            decoded_points = polyline.decode(points)
            coordinates.extend(decoded_points)
    return coordinates

def haversine(lat1, lon1, lat2, lon2):
    """Calculate Haversine distance between two points in kilometers."""
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    delta_lat = lat2 - lat1
    delta_lon = lon2 - lon1
    a = math.sin(delta_lat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(delta_lon / 2)**2
    c = 2 * math.asin(math.sqrt(a))
    r = 6371  # Earth's radius in kilometers
    return r * c

def interpolate_route(snapshot_points, interval=20):
    """Interpolate points along a route at a specified interval in meters."""
    new_points = [snapshot_points[0]]
    for i in range(len(snapshot_points) - 1):
        lat1, lon1 = snapshot_points[i]
        lat2, lon2 = snapshot_points[i + 1]
        distance = haversine(lat1, lon1, lat2, lon2) * 1000  # Convert to meters
        if distance < interval:
            continue
        bearing = calculate_bearing(lat1, lon1, lat2, lon2)
        num_steps = int(distance // interval)
        for step in range(1, num_steps + 1):
            new_point = geopy.distance.distance(meters=interval * step).destination((lat1, lon1), bearing)
            new_points.append((new_point.latitude, new_point.longitude))
    new_points.append(snapshot_points[-1])
    return new_points

def get_streetview_image(lat, lng, heading, pitch, fov):
    """Fetch a Street View image from Google Street View API."""
    base_url = "https://maps.googleapis.com/maps/api/streetview"
    params = {
        "size": "1028x1028",
        "location": f"{lat},{lng}",
        "heading": heading,
        "pitch": pitch,
        "fov": fov,
        "key": key,
    }
    response = requests.get(base_url, params=params)
    if response.status_code == 200:
        return Image.open(BytesIO(response.content))
    else:
        print(f"Failed to get image at ({lat}, {lng}) with heading {heading}, pitch {pitch}, fov {fov}")
        return None

def has_transmission_lines(image):
    """Detect transmission lines in an image using YOLOv5."""
    results = model(image)
    return len(results.xyxy[0]) > 0  # True if transmission lines detected

### Improvement Functions

def calculate_powerline_points(interpolated_points, bearings, offset_distance, powerline_side):
    """Calculate powerline positions with precise geodetic offsets."""
    powerline_points = []
    for i, (lat, lon) in enumerate(interpolated_points):
        bearing = bearings[i] if i < len(bearings) else bearings[-1]
        perp_bearing = (bearing + 90) % 360 if powerline_side == "right" else (bearing + 270) % 360
        lon_pl, lat_pl, _ = geod.fwd(lon, lat, perp_bearing, offset_distance)
        powerline_points.append((lat_pl, lon_pl))
    return powerline_points

def calculate_coverage_range(camera_pos, powerline_pos, fov_h, fov_v, pitch, altitude):
    """Calculate horizontal and vertical coverage range based on camera parameters."""
    # 2D distance in degrees (approximation for small distances)
    distance = haversine(camera_pos[0], camera_pos[1], powerline_pos[0], powerline_pos[1]) * 1000  # meters
    if distance == 0:
        return 0, 0  # Avoid division by zero
    # Horizontal coverage
    horizontal_range = 2 * distance * math.tan(math.radians(fov_h / 2))
    # Vertical coverage adjusted by pitch
    pitch_rad = math.radians(pitch)
    try:
        vertical_range = 2 * altitude * math.tan(math.radians(fov_v / 2)) / math.cos(pitch_rad)
    except ZeroDivisionError:
        vertical_range = float('inf') if pitch == 90 or pitch == -90 else 0
    return horizontal_range, vertical_range

def generate_coverage_map(powerline_points, coverage_data, output_file="coverage_map.html"):
    """Generate an enhanced Folium map showing powerline path, coverage, and detailed info."""
    # Create map centered on the first powerline point
    m = folium.Map(
        location=[powerline_points[0][0], powerline_points[0][1]],
        zoom_start=15,
        tiles="CartoDB positron"
    )

    # Calculate coverage buffers using Shapely for accurate percentage
    coverage_buffers = []
    avg_lat = np.mean([p[0] for p in powerline_points])
    meters_per_deg_lon = 111000 * np.cos(np.radians(avg_lat))  # Approximate meters per degree longitude
    for key, (h_range, v_range) in coverage_data.items():
        # Parse the string key back into lat and lon
        lat, lon = map(float, key.split(','))
        camera_point = Point(lon, lat)
        radius_deg = h_range / meters_per_deg_lon  # Radius in degrees
        buffer = camera_point.buffer(radius_deg)
        coverage_buffers.append(buffer)

    # Determine which powerline points are covered
    covered_powerline_points = []
    for plat, plon in powerline_points:
        point = Point(plon, plat)
        is_covered = any(buffer.contains(point) for buffer in coverage_buffers)
        covered_powerline_points.append(is_covered)

    # Add powerline path with coverage-based coloring
    for i in range(len(powerline_points) - 1):
        p1 = powerline_points[i]
        p2 = powerline_points[i + 1]
        color = "green" if covered_powerline_points[i] or covered_powerline_points[i + 1] else "red"
        folium.PolyLine(
            [p1, p2],
            color=color,
            weight=2.5,
            opacity=0.8,
            tooltip="Powerline Segment"
        ).add_to(m)

    # Add distance markers between consecutive points
    for i in range(len(powerline_points) - 1):
        lat1, lon1 = powerline_points[i]
        lat2, lon2 = powerline_points[i + 1]
        distance = haversine(lat1, lon1, lat2, lon2) * 1000  # meters
        mid_point = [(lat1 + lat2) / 2, (lon1 + lon2) / 2]
        folium.Marker(
            location=mid_point,
            icon=folium.DivIcon(html=f'<div style="font-size: 10pt; color: gray">{distance:.0f}m</div>'),
            tooltip=f"Segment {i+1} Distance"
        ).add_to(m)

    # Add start and end markers
    folium.Marker(
        location=powerline_points[0],
        icon=folium.Icon(color="red", icon="play", prefix="fa"),
        popup=f"Start: Lat={powerline_points[0][0]:.6f}, Lon={powerline_points[0][1]:.6f}",
        tooltip="Start Point"
    ).add_to(m)
    folium.Marker(
        location=powerline_points[-1],
        icon=folium.Icon(color="red", icon="stop", prefix="fa"),
        popup=f"End: Lat={powerline_points[-1][0]:.6f}, Lon={powerline_points[-1][1]:.6f}",
        tooltip="End Point"
    ).add_to(m)

    # Add coverage areas as circles and camera markers
    heat_data = []
    for key, (h_range, v_range) in coverage_data.items():
        # Parse the string key back into lat and lon
        lat, lon = map(float, key.split(','))
        radius = min(h_range, v_range) / 2  # meters
        folium.Circle(
            location=[lat, lon],
            radius=radius,
            color="green",
            fill=True,
            fill_opacity=0.1,
            tooltip=f"Coverage: H={h_range:.1f}m, V={v_range:.1f}m\nLat={lat:.6f}, Lon={lon:.6f}"
        ).add_to(m)
        folium.Marker(
            location=[lat, lon],
            icon=folium.Icon(color="green", icon="camera", prefix="fa"),
            popup=f"Camera: Lat={lat:.6f}, Lon={lon:.6f}",
            tooltip="Camera with Coverage"
        ).add_to(m)
        heat_data.append([lat, lon, h_range / 100])  # Weight by h_range, scaled down

    # Add heatmap if enough data
    if len(heat_data) > 5:
        HeatMap(
            heat_data,
            radius=8,
            blur=4,
            gradient={"0.4": 'lightgreen', "0.6": 'lime', "0.8": 'green', "1": 'darkgreen'}

        ).add_to(m)

    # Add GeoJSON layer for clickable powerline points
    powerline_geojson = {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "geometry": {"type": "Point", "coordinates": [lon, lat]},
                "properties": {"coordinates": f"Lat: {lat:.6f}, Lon: {lon:.6f}"}
            } for lat, lon in powerline_points
        ]
    }
    GeoJson(powerline_geojson, popup=GeoJsonPopup(fields=["coordinates"])).add_to(m)

    # Add legend
    legend_html = """
    <div style="position: fixed; bottom: 50px; left: 50px; width: 180px; height: 160px; 
                background-color: white; opacity: 0.7; border:2px solid grey; z-index:9999; font-size:12px;">
      <b>Legend</b> <br>
      Green Line: Covered Powerline <br>
      Red Line: Uncovered Powerline <br>
      Green Circles: Coverage Areas <br>
      Green Camera: Camera Positions <br>
      Red Icons: Start/End Points <br>
      Green Heatmap: Coverage Density <br>
    </div>
    """
    m.get_root().html.add_child(folium.Element(legend_html))

    # Add scale and measurement tool
    MeasureControl(position='bottomleft', primary_length_unit='meters').add_to(m)
    folium.plugins.Fullscreen(position='topright').add_to(m)

    # Calculate and add coverage statistics
    covered_points = sum(covered_powerline_points)
    total_points = len(powerline_points)
    total_length = (total_points - 1) * 1  # meters, assuming 1-meter intervals
    coverage_percentage = (covered_points / total_points) * 100 if total_points > 0 else 0
    coverage_stats = (f"Coverage: {covered_points} of {total_points} points<br>"
                      f"Length: {covered_points} of {total_length} m<br>"
                      f"Percentage: {coverage_percentage:.1f}%")
    folium.Marker(
        location=[powerline_points[0][0] + 0.0005, powerline_points[0][1]],  # Offset slightly
        icon=folium.DivIcon(html=f'<div style="font-size: 12pt; color: black">{coverage_stats}</div>'),
        tooltip="Coverage Summary"
    ).add_to(m)

    # Save the map
    m.save(output_file)
    print(f"Enhanced coverage map saved to: {output_file}")

    return coverage_percentage  # Optional return for further useOptional return for further use

def cleanup_dropped_images(dropped_images, output_dir, move_instead=False):
    """Clean up dropped images by deleting or moving them."""
    dropped_dir = os.path.join(output_dir, "dropped")
    if move_instead:
        os.makedirs(dropped_dir, exist_ok=True)
    for image_filename in dropped_images:
        image_path = os.path.join(output_dir, "kept", image_filename) if os.path.exists(os.path.join(output_dir, "kept", image_filename)) else os.path.join(output_dir, "dropped", image_filename)
        if os.path.exists(image_path):
            if move_instead:
                os.rename(image_path, os.path.join(dropped_dir, os.path.basename(image_filename)))
            else:
                os.remove(image_path)

### Capture and Processing Functions

def capture_right_side_images(lat, lon, road_heading, index, interpolated_powerline_points, coverage_data, lock):
    """Capture right-side images and calculate coverage."""
    has_lines = False
    dropped_images = []
    camera_pos = (lat, lon, camera_altitude)
    for angle in relative_angles_right:
        fov_values = fov_special if angle in [45, 135] else fov_def
        transmission_heading = (road_heading + angle) % 360
        for pitch in pitch_values:
            for fov in fov_values:
                image = get_streetview_image(lat, lon, transmission_heading, pitch, fov)
                if image:
                    filename = f"snapshot_right_{index}_heading_{angle}_pitch_{pitch}_fov_{fov}.jpg"
                    image_path = os.path.join(output_dir, filename)
                    image.save(image_path)
                    print(f"Saved {filename}")
                    if has_transmission_lines(image):
                        has_lines = True
                        # Calculate coverage for each powerline point
                        for powerline_pos in interpolated_powerline_points:
                            h_range, v_range = calculate_coverage_range(
                                camera_pos, powerline_pos, fov, fov, pitch, camera_altitude
                            )
                            with lock:
                                # Use string keys instead of tuple keys
                                coverage_data[f"{lat},{lon}"] = (h_range, v_range)
                    else:
                        dropped_images.append(filename)
                time.sleep(0.5)
    return has_lines, dropped_images

def capture_left_side_images(lat, lon, road_heading, index, interpolated_powerline_points, coverage_data, lock):
    """Capture left-side images and calculate coverage."""
    has_lines = False
    dropped_images = []
    camera_pos = (lat, lon, camera_altitude)
    for angle in relative_angles_left:
        fov_values = fov_special if angle in [310, 230] else fov_def
        transmission_heading = (road_heading + angle) % 360
        for pitch in pitch_values:
            for fov in fov_values:
                image = get_streetview_image(lat, lon, transmission_heading, pitch, fov)
                if image:
                    filename = f"snapshot_left_{index}_heading_{angle}_pitch_{pitch}_fov_{fov}.jpg"
                    image_path = os.path.join(output_dir, filename)
                    image.save(image_path)
                    print(f"Saved {filename}")
                    if has_transmission_lines(image):
                        has_lines = True
                        # Calculate coverage for each powerline point
                        for powerline_pos in interpolated_powerline_points:
                            h_range, v_range = calculate_coverage_range(
                                camera_pos, powerline_pos, fov, fov, pitch, camera_altitude
                            )
                            with lock:
                                # Use string keys instead of tuple keys
                                coverage_data[f"{lat},{lon}"] = (h_range, v_range)
                    else:
                        dropped_images.append(filename)
                time.sleep(0.5)
    return has_lines, dropped_images

def process_point(i, lat, lng, road_heading, interpolated_powerline_points, coverage_data, lock, dropped_images_list, dropped_images_lock):
    """Process a single point along the route."""
    print(f"Processing point {i + 1} of {len(interpolated_points)}: ({lat}, {lng})")
    print(f"Road Heading: {road_heading}")
    has_lines_right, dropped_right = capture_right_side_images(
        lat, lng, road_heading, i, interpolated_powerline_points, coverage_data, lock
    )
    has_lines_left, dropped_left = capture_left_side_images(
        lat, lng, road_heading, i, interpolated_powerline_points, coverage_data, lock
    )
    with dropped_images_lock:
        dropped_images_list.extend(dropped_right + dropped_left)

### Main Execution

# Get route coordinates
snapshot_points = get_route_coordinates(start_point, end_point)
if not snapshot_points:
    print("No route points available.")
    exit()

# Interpolate road points at 20-meter intervals
interpolated_points = interpolate_route(snapshot_points, interval=20)
print(f"Total interpolated road points: {len(interpolated_points)}")

# Calculate bearings for road points
bearings = []
for i in range(len(interpolated_points) - 1):
    lat1, lon1 = interpolated_points[i]
    lat2, lon2 = interpolated_points[i + 1]
    road_heading = calculate_bearing(lat1, lon1, lat2, lon2)
    bearings.append(road_heading)

# Calculate powerline points
offset_distance = 10  # meters
powerline_side = "right"
powerline_points = calculate_powerline_points(interpolated_points, bearings, offset_distance, powerline_side)

# Interpolate powerline points at 1-meter intervals
interpolated_powerline_points = interpolate_route(powerline_points, interval=1)
print(f"Total interpolated powerline points: {len(interpolated_powerline_points)}")

# Initialize shared data structures
coverage_data = {}  # Dictionary to store coverage ranges for each camera position
lock = Lock()
dropped_images_list = []
dropped_images_lock = Lock()

# Process points in parallel
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    for i, (lat, lng) in enumerate(interpolated_points):
        road_heading = bearings[i] if i < len(bearings) else bearings[-1]
        futures.append(executor.submit(
            process_point, i, lat, lng, road_heading, interpolated_powerline_points,
            coverage_data, lock, dropped_images_list, dropped_images_lock
        ))
    for future in as_completed(futures):
        future.result()

# Generate enhanced coverage map
generate_coverage_map(interpolated_powerline_points, coverage_data, os.path.join(output_dir, "coverage_map.html"))
print("Enhanced coverage map generated: coverage_map.html")

# Clean up dropped images (optional, already moved to dropped folder during capture)
cleanup_dropped_images(dropped_images_list, output_dir, move_instead=False)
print("Dropped images cleanup completed (if any).")

# Calculate approximate coverage percentage (simplified)
total_length = len(interpolated_powerline_points) - 1  # meters (simplified)
covered_points = len(coverage_data)  # Number of camera positions with coverage
coverage_percentage = (covered_points / len(interpolated_points)) * 100 if interpolated_points else 0
print(f"Total powerline length: {total_length:.2f} meters")
print(f"Number of covered camera positions: {covered_points}")
print(f"Approximate coverage percentage: {coverage_percentage:.2f}%")
print("Processing completed!")

Using cache found in C:\Users\Daniel Roberts/.cache\torch\hub\ultralytics_yolov5_master
YOLOv5  2025-2-20 Python-3.12.7 torch-2.6.0+cpu CPU



Error: Model file not found at C:/Users/Daniel Roberts/Downloads/HydroOne/transmission_model.pt. Falling back to YOLOv5s pre-trained model.


Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 


Total interpolated road points: 18
Total interpolated powerline points: 562
Processing point 1 of 18: (43.71078, -79.46943)
Road Heading: 72.235484704881
Processing point 2 of 18: (43.71083474612659, -79.46919360005764)
Road Heading: 73.52568851975809
Processing point 3 of 18: (43.71096985188514, -79.4685615506176)
Road Heading: 69.95752147875834
Processing point 4 of 18: (43.71109230351391, -79.46809718197335)
Road Heading: 72.34626334111942
Processing point 5 of 18: (43.71121082924881, -79.46758194213427)
Road Heading: 73.8557807502537
Saved snapshot_right_1_heading_90_pitch_0_fov_55.jpg
Saved snapshot_right_2_heading_90_pitch_0_fov_55.jpg
Saved snapshot_right_4_heading_90_pitch_0_fov_55.jpg
Saved snapshot_right_3_heading_90_pitch_0_fov_55.jpg
Saved snapshot_right_0_heading_90_pitch_0_fov_55.jpg
Saved snapshot_right_4_heading_90_pitch_30_fov_55.jpg
Saved snapshot_right_0_heading_90_pitch_30_fov_55.jpg
Saved snapshot_right_1_heading_90_pitch_30_fov_55.jpg
Saved snapshot_right_3_headin