In [6]:
from custom_interfaces.msg import (
    GPS,
    AISShips,
    HelperLatLon,
    Path,
    WindSensor,
    HelperHeading,
    HelperSpeed,
)
from rclpy.impl.rcutils_logger import RcutilsLogger

import local_pathfinding.coord_systems as cs
import local_pathfinding.objectives as objectives
import local_pathfinding.ompl_path as ompl_path
from local_pathfinding.local_path import LocalPathState
from typing import List
from geopy.distance import geodesic
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from itertools import product
from sklearn.model_selection import ParameterGrid
import sys
import os
import math
from ompl import geometric as og
from dataclasses import dataclass
from ompl import base
from shapely.geometry import MultiPolygon
from local_pathfinding.objectives import get_sailing_objective

In [5]:
!pip3 install scikit-learn

Defaulting to user installation because normal site-packages is not writeable
Collecting scikit-learn
  Downloading scikit_learn-1.6.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (12.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.6/12.6 MB[0m [31m12.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting joblib>=1.2.0
  Downloading joblib-1.5.0-py3-none-any.whl (307 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m307.7/307.7 KB[0m [31m96.5 MB/s[0m eta [36m0:00:00[0m
Collecting threadpoolctl>=3.1.0
  Downloading threadpoolctl-3.6.0-py3-none-any.whl (18 kB)
Installing collected packages: threadpoolctl, joblib, scikit-learn
Successfully installed joblib-1.5.0 scikit-learn-1.6.1 threadpoolctl-3.6.0


# The code in the next block is copy pasted code from the various files to support the simulation 

In [None]:
# OMPL class simplified:



from shapely.geometry import box


BOX_BUFFER_SIZE = 1.0  # km


class OMPLPath:
    """Represents the general OMPL Path.

    Attributes
        _logger (RcutilsLogger): ROS logger of this class.
        _simple_setup (og.SimpleSetup): OMPL SimpleSetup object.
        _box_buffer (float): buffer around the sailbot position and the goal position in km
        solved (bool): True if the path is a solution to the OMPL query, else false.

    Static Attributes
        all_land_data (MultiPolygon): All land polygons along the entire global voyage
        obstacles (List[Polygon]): The list of all obstacles Sailbot is currently aware of.
                                   This is a static attribute so that OMPL can access it when
                                   accessing the is_state_valid function pointer.
    """

    all_land_data = None

    def __init__(
        self,
        max_runtime: float,
        local_path_state: LocalPathState,
    ):
        """Initialize the OMPLPath Class. Attempt to solve for a path.

        Args:
            parent_logger (RcutilsLogger): Logger of the parent class.
            max_runtime (float): Maximum amount of time in seconds to look for a solution path.
            local_path_state (LocalPathState): State of Sailbot.
        """
        self._box_buffer = BOX_BUFFER_SIZE
        self._simple_setup = self._init_simple_setup(local_path_state)  # this needs state

        self.solved = self._simple_setup.solve(time=max_runtime)  # time is in seconds

    def _init_simple_setup(self, local_path_state) -> og.SimpleSetup:
        self.state = local_path_state

        # Create buffered spaces and extract their centers
        start_position_in_xy = cs.latlon_to_xy(self.state.reference_latlon, self.state.position)
        start_box = self.create_buffer_around_position(start_position_in_xy)
        start_x = start_position_in_xy.x
        start_y = start_position_in_xy.y

        goal_position = self.state.global_path.waypoints[-1]
        goal_position_in_xy = cs.latlon_to_xy(self.state.reference_latlon, goal_position)
        goal_polygon = self.create_buffer_around_position(goal_position_in_xy)
        goal_x, goal_y = goal_position_in_xy

        # create an SE2 state space: rotation and translation in a plane
        space = base.SE2StateSpace()

        # set the bounds of the state space
        bounds = base.RealVectorBounds(dim=2)
        state_space = box(*MultiPolygon([start_box, goal_polygon]).bounds)
        x_min, y_min, x_max, y_max = state_space.bounds

        if x_max <= x_min or y_max <= y_min:
            raise ValueError(f"Invalid bounds: x=[{x_min}, {x_max}], y=[{y_min}, {y_max}]")
        bounds.setLow(0, x_min)
        bounds.setLow(1, y_min)
        bounds.setHigh(0, x_max)
        bounds.setHigh(1, y_max)
        """self._logger.debug(
            "state space bounds: "
            f"x=[{bounds.low[0]}, {bounds.high[0]}]; "
            f"y=[{bounds.low[1]}, {bounds.high[1]}]"
        )"""
        bounds.check()  # check if bounds are valid
        space.setBounds(bounds)

        OMPLPath.init_obstacles(local_path_state=local_path_state, state_space_xy=state_space)

        # create a simple setup object
        simple_setup = og.SimpleSetup(space)
        simple_setup.setStateValidityChecker(base.StateValidityCheckerFn(OMPLPath.is_state_valid))

        start = base.State(space)
        goal = base.State(space)
        start().setXY(start_x, start_y)
        goal().setXY(goal_x, goal_y)
        self._logger.debug(
            "start and goal state: "
            f"start=({start().getX()}, {start().getY()}); "
            f"goal=({goal().getX()}, {goal().getY()})"
        )
        simple_setup.setStartAndGoalStates(start, goal)

        # Constructs a space information instance for this simple setup
        space_information = simple_setup.getSpaceInformation()

        # figure this out
        self.state.planner = og.RRTstar(space_information)

        # set the optimization objective of the simple setup object
        # TODO: implement and add optimization objective here

        objective = get_sailing_objective(
            space_information,
            simple_setup,
            # This too
            self.state.heading,
            self.state.speed,
            self.state.wind_direction,
            self.state.wind_speed,
        )

        simple_setup.setOptimizationObjective(objective)

        # set the planner of the simple setup object
        simple_setup.setPlanner(og.RRTstar(space_information))

        return simple_setup

NameError: name 'og' is not defined

In [None]:
# Systematic weight optimization for local path planning using grid search


from local_pathfinding.objectives import SpeedObjective

MEAN_SPEED = HelperSpeed(speed=15.0)  # mean boat speed in kmph

START_POINT = HelperLatLon(latitude=48.46, longitude=-125.1)  # Starting location of the mock
START_HEADING = HelperHeading(heading=180.0)  # in degrees, heading of the boat

TRUE_WIND_SPEED = 7.0

calculator = geodesic()


@dataclass
class Weights:
    minimum_turning_angle_weight: int
    speed_weight: int
    distance_weight: int
    wind_weight: int


@dataclass
class TrueWind:
    direction: int
    speed: float


@dataclass
class TrueWindGPSPair:
    true_wind: TrueWind
    wind: WindSensor
    gps: GPS


@dataclass
class GPSWindSensorPath:
    gps: GPS
    wind_sensor: WindSensor
    path: Path


ais_ships = AISShips(ships=[])


# 1296  is 6^4 for weights
def get_apparent_wind(
    true_wind_direction: int, true_wind_speed: float, boat_direction: int, boat_speed: float
):
    """Calculates the apparent wind direction and speed from true wind and boat motion.

    Args:
        true_wind_direction (int): The true wind direction in degrees
        true_wind_speed (float): The true wind speed in kmph
        boat_direction (int): The boat's heading in degrees
        boat_speed (float): The boat's speed in kmph

    Returns:
        tuple: (apparent_wind_direction, apparent_wind_speed) where direction is in degrees
               with true north = 0, east = -90, west = +90
    """
    # Convert to radians
    true_wind_radians = math.radians(true_wind_direction)
    boat_direction_radians = math.radians(cs.bound_to_180(boat_direction + 180))

    # Calculate true wind components (east, north)
    true_wind_east = true_wind_speed * math.sin(true_wind_radians)
    true_wind_north = true_wind_speed * math.cos(true_wind_radians)

    # Calculate boat velocity components (east, north)
    boat_velocity_east = boat_speed * math.sin(boat_direction_radians)
    boat_velocity_north = boat_speed * math.cos(boat_direction_radians)

    # Calculate apparent wind components
    # Apparent wind = true wind - boat velocity
    apparent_wind_east = true_wind_east - boat_velocity_east
    apparent_wind_north = true_wind_north - boat_velocity_north

    # Calculate apparent wind direction and speed
    apparent_wind_direction_radians = math.atan2(apparent_wind_east, apparent_wind_north)
    apparent_wind_direction_degrees = math.degrees(apparent_wind_direction_radians)

    apparent_wind_speed = math.sqrt(apparent_wind_east**2 + apparent_wind_north**2)

    return int(apparent_wind_direction_degrees), apparent_wind_speed


def generate_true_wind(num_samples=1296) -> List[TrueWind]:
    wind_arr = []
    heading = -180
    for _ in range(num_samples):
        if heading > 180:
            break
        if heading == -180:
            wind_obj = TrueWind(0, TRUE_WIND_SPEED)
        else:
            wind_obj = TrueWind(heading, TRUE_WIND_SPEED)
        heading += (360) // num_samples
        wind_arr.append(wind_obj)
    return wind_arr


def generate_GPS_and_WindSensor(num_samples=1296) -> List[TrueWindGPSPair]:
    """Generates GPS data with a fixed speed and starting location and changing heading from -180 to 180

    Args:
        num_samples (int, optional): _description_. Defaults to 1296.

    Returns:
        List[GPS]: _description_
    """
    print("called generate_GPS_and_WindSensor")
    true_wind_arr: List[TrueWind] = generate_true_wind()
    results: List[TrueWindGPSPair] = []
    for wind_obj in true_wind_arr:
        heading = -180.0
        for heading in range(18):
            estimated_sailboat_speed = 0
            if heading == -180:
                estimated_sailboat_speed = SpeedObjective.get_sailbot_speed(
                    0, wind_obj.direction, wind_obj.speed
                )
                gps_obj = GPS(
                    lat_lon=START_POINT,
                    heading=HelperHeading(heading=0.0),
                    speed=HelperSpeed(speed=estimated_sailboat_speed),
                )
                apparent_wind = get_apparent_wind(
                    wind_obj.direction, wind_obj.speed, 0, estimated_sailboat_speed
                )
                wind_sensor = WindSensor(
                    speed=HelperSpeed(speed=apparent_wind[1]), direction=apparent_wind[0]
                )
            else:
                estimated_sailboat_speed = SpeedObjective.get_sailbot_speed(
                    heading, wind_obj.direction, wind_obj.speed
                )
                gps_obj = GPS(
                    lat_lon=START_POINT,
                    heading=HelperHeading(heading=math.radians(heading)),
                    speed=HelperSpeed(speed=estimated_sailboat_speed),
                )
                apparent_wind = get_apparent_wind(
                    wind_obj.direction, wind_obj.speed, heading, estimated_sailboat_speed
                )
                wind_sensor = WindSensor(
                    speed=HelperSpeed(speed=apparent_wind[1]), direction=apparent_wind[0]
                )

            heading += 20.0
        results.append(TrueWindGPSPair(wind_obj, wind_sensor, gps_obj))

    return results


def generate_paths(num_samples=1296, distance=1296) -> List[Path]:
    """Generates a list of paths. Each path is a list of waypoints. Since we don't need the entire array of waypoints
    because LocalPathState doesn't use it, we are going to only create the goal point and store it in the array. It
    returns a list of list of single waypoints.

    Args:
        num_samples (int, optional): _description_. Defaults to 1296.
        distance (int, optional): in km

    Returns:
        List[Path]: _description_
    """
    array_of_paths: List[Path] = []
    heading = 0
    point = (START_POINT.latitude, START_POINT.longitude)
    for _ in range(num_samples):
        destination = calculator.destination(point, bearing=heading, distance=distance)
        heading += (360) / num_samples
        # only one element in the path array since we don't need the rest for this purpose
        reference_lat_lon: HelperLatLon = HelperLatLon(
            latitude=destination.latitude, longitude=destination.longitude
        )
        array_of_paths.append(Path(waypoints=[reference_lat_lon]))

    return array_of_paths

    # generate way points starting at start point


def generate_weights(num_samples: int = 1296) -> List[Weights]:
    """Generate weights with possible values 1, 2, 4, 8, 16, 32"""
    weight_arr = []

    # Calculate how many values per parameter (assuming equal distribution)
    # For 1296 samples with 4 parameters, we need 6 values per parameter (6^4 = 1296)
    possible_values = [1, 2, 4, 8, 16, 32]

    for speed_weight in possible_values:
        for distance_weight in possible_values:
            for min_turning_angle_weight in possible_values:
                for wind_weight in possible_values:
                    weight_arr.append(
                        Weights(
                            minimum_turning_angle_weight=min_turning_angle_weight,
                            speed_weight=speed_weight,
                            distance_weight=distance_weight,
                            wind_weight=wind_weight,
                        )
                    )

    return weight_arr


def get_gps_windSensor_paths():
    print("called get_gps_windSensor_paths")
    paths_arr = generate_paths()
    gps_wind_sensor_arr = generate_GPS_and_WindSensor()
    results: List[GPSWindSensorPath] = []
    for path in paths_arr:
        for gps_wind_sensor in gps_wind_sensor_arr:
            gpsWindSensorPath = GPSWindSensorPath(
                gps=gps_wind_sensor.gps, wind_sensor=gps_wind_sensor.wind, path=path
            )
            results.append(gpsWindSensorPath)
    return results


def get_all_path_state() -> List[LocalPathState]:
    print("get all path states called")
    gps_wind_sensorPaths = get_gps_windSensor_paths()
    lps_arr: List[LocalPathState] = []
    for gps_wind_sensor in gps_wind_sensorPaths:
        lps_arr.append(
            LocalPathState(
                gps_wind_sensor.gps,
                ais_ships,
                gps_wind_sensor.path,
                gps_wind_sensor.wind_sensor,
                "rrtstar",
            )
        )
    return lps_arr


lps_arr = get_all_path_state()
print("complete")
print(len(lps_arr))

def update_if_needed(
    self,
    lps_arr: List[LocalPathState]
) -> bool:
    """Updates the OMPL path, waypoints and current state. The path is updated if a new path
        is found. Returns true if the path is updated and false otherwise.

    Args:
        gps (ci.GPS): GPS data.
        ais_ships (ci.AISShips): AIS ships data.
        global_path (ci.Path): Path to the destination.
        filtered_wind_sensor (ci.WindSensor): Wind data.
    """
    # this raises ValueError if any of the parameters are not properly initialized
    
    for lps in lps_arr:

        ompl_path = OMPLPath(
            parent_logger=self._logger,
            max_runtime=1.0,
            local_path_state=lps,
        )
    if ompl_path.solved:
        self._logger.debug("Updating local path")
        self._update(ompl_path)
        return True
    return False

def _update(self, ompl_path: OMPLPath):
    self._ompl_path = ompl_path
    self.path = self._ompl_path.get_path()

get all path states called
called get_gps_windSensor_paths
called generate_GPS_and_WindSensor
complete
1679616
