In [95]:
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from stable_baselines3 import A2C
from stable_baselines3.common.callbacks import BaseCallback
import time

In [96]:
def generate_random_coordinates(x_min, x_max, y_min, y_max, n=1, threshold=0.08):
    """
    Generate random (x, y) coordinates within specified bounds, ensuring no two points are closer than the threshold.

    Parameters:
    - n: int, number of coordinates to generate
    - x_min: float, minimum value for x coordinates
    - x_max: float, maximum value for x coordinates
    - y_min: float, minimum value for y coordinates
    - y_max: float, maximum value for y coordinates
    - threshold: float, minimum Euclidean distance between any two points

    Returns:
    - coordinates: array of shape (n, 2) containing generated (x, y) coordinates
    """
    coordinates = []

    while len(coordinates) < n:
        # Generate a new candidate point
        x_new = np.random.uniform(x_min, x_max)
        y_new = np.random.uniform(y_min, y_max)
        new_point = np.array([x_new, y_new])

        # Check distance to all existing points
        if all(
            np.linalg.norm(new_point - existing_point) > threshold
            for existing_point in coordinates
        ):
            coordinates.append(new_point)

    return np.array(coordinates)

In [97]:
def calculate_distance(point1, point2):
    # Convert points to NumPy arrays
    point1 = np.array(point1)
    point2 = np.array(point2)

    # Calculate the distance
    distance = np.linalg.norm(point1 - point2)

    return np.abs(distance)


pass

In [98]:
def calculate_movement(v1, v2, L, delta_t):
    # Calculate forward distance
    d = (v1 + v2) / 2 * delta_t
    
    # Calculate angular velocity
    omega = (v2 - v1) / L
    
    # Calculate change in orientation
    delta_theta = omega * delta_t
    
    return d, delta_theta



In [99]:
def get_alignment_to_target(orientation,current_coordinate, destination_coordinate):
    theta = np.arctan2(
        destination_coordinate[1] - current_coordinate[1],
        destination_coordinate[0] - current_coordinate[0],
    )
    
    return np.degrees(theta) - orientation



In [100]:
class RobotBase:

    position: list[float]
    orientation: float = 0
    speed: float = 1
    angular_speed = 10
    obstacle_threshold = 1
    id: int

    def __init__(
        self,
        id: int,
        initial_position,
        orientation=0,
        speed: float = 1,
        angular_speed=10,
        obstacle_threshold=1,
    ):
        self.id = id
        self.position = initial_position
        self.orientation = orientation
        self.speed = speed
        self.angular_speed = angular_speed
        self.obstacle_threshold = obstacle_threshold

    @property
    def x(self):
        return self.position[0]

    @property
    def y(self):
        return self.position[1]

    @x.setter
    def x(self, v):
        self.position[0] = v

    @y.setter
    def y(self, v):
        self.position[1] = v

    def forward(self):
        pass

    def stop(self):
        pass

    def turn_left(self):
        pass

    def turn_right(self):
        pass

    def detect_obstacles(self):
        pass

    def move(self, left_velocity, right_velocity):
        pass

    def is_out_of_range(self):
        pass


pass

In [101]:
class AgentBase(gym.Env):

    def reset(self, *args, **kwargs):
        pass

    def render(self):
        pass

    def close(self):
        pass

    def step(self, action):
        pass


pass

In [102]:
def plot_env(
    grid_size,
    robot: RobotBase,
    target,
    ax
):
    if ax is None:
        fig = plt.figure()
        ax = fig.add_subplot(1, 1, 1)
    ax.set_xlim(0, grid_size[0])
    ax.set_ylim(0, grid_size[1])
    ax.grid(True)

    if target is not None:
        ax.plot(
            target[0],
            target[1],
            "gx",
            label="Targets",
            markersize=10,
        )

    if robot is not None:

        length = 1
        O_radians = np.deg2rad(robot.orientation)
        x, y = robot.x, robot.y
        x_end = x + length * np.cos(O_radians)
        y_end = y + length * np.sin(O_radians)

        ax.plot(
            x,
            y,
            "bo",
            label="Robots",
            markersize=10,
        )

        # Plot the arrow indicating the direction
        ax.quiver(
            x,
            y,
            x_end - x,
            y_end - y,
            angles="xy",
            scale_units="xy",
            scale=1,
            color="r",
        )
    ax.set_aspect("equal")
    ax.legend()
    return ax


pass

In [103]:
class RobotContinuous(RobotBase):
    occupancy_grid: np.array
    grid_size: list[float]
    _is_out_of_range=False

    def __init__(
        self,
        id: int,
        grid_size: list[float],
        initial_position,
        orientation=0,
        speed=1,
        angular_speed=10,
        obstacle_threshold=1,
        aperture=120,
        axial_distance=1,
        basic_timestep=1,
    ):
        super().__init__(
            id,
            initial_position,
            orientation,
            speed,
            angular_speed=angular_speed,
        )
        self.grid_size = grid_size
        self.obstacle_threshold = obstacle_threshold
        self.aperture = aperture
        self.axial_distance = axial_distance
        self.basic_timestep = basic_timestep

    def detect_obstacles(self):
        pass
    
    def is_out_of_range(self):
        return self._is_out_of_range

    def move(self, left, right):
        left_velocity, right_velocity = self.speed * left, self.speed * right

        distance, orientation_change = calculate_movement(
            left_velocity,
            right_velocity,
            self.axial_distance,
            self.basic_timestep,
        )

        O_radians = orientation_change + np.radians(self.orientation)
        
        x_new = self.x + distance * np.cos(O_radians)
        y_new = self.y + distance * np.sin(O_radians)

        self.x = x_new
        self.y = y_new
        self.orientation = np.degrees(O_radians)
        if (
            x_new >= self.grid_size[0]
            or y_new >= self.grid_size[1]
            or x_new < 0
            or y_new < 0
        ):
            # print(f"Robot {self.id} out of bounds")
            self._is_out_of_range=True
            return False
        
        self._is_out_of_range=False

In [104]:
class RenderCallback(BaseCallback):
    def __init__(self, env, verbose=0):
        super(RenderCallback, self).__init__(verbose)
        self.env = env

    def _on_step(self) -> bool:
        self.env.render()
        return True
    
    def _on_training_end(self)->None:
        self.env.close()



In [175]:
class MoveToTargetAgent(AgentBase):

    robot: RobotBase
    target_pos: list = [0, 0]
    initial_position = [0, 0]
    initial_orientation = 0
    plot = None
    agent_type = None

    def __init__(
        self,
        robot: RobotBase,
        target_pos: list,
        grid_size,
        should_render=True,
    ):
        self.robot = robot
        self.should_render = should_render

        self.target_pos = target_pos

        self.initial_position = [robot.x, robot.y]
        self.initial_orientation = robot.orientation
        self.grid_size = grid_size

        self.fig = plt.figure()
        self.ax = self.fig.add_subplot(1, 1, 1)
        self.screen = None

        self.observation_space = gym.spaces.Box(
            low=np.array([-1]),
            high=np.array([1]),
            shape=(1,),
            dtype=np.float32,
        )

        self.action_space = gym.spaces.Box(
            low=np.array([0,0]),
            high=np.array([1,1]),
            shape=(2,),
            dtype=np.float32,
        )
        
        self.render_tag = ""

    def reset(self, *args, **kwargs):
        training_angles = [0, 45, 90, 135, 180, 180 + 45, 270, 270 + 45]
        self.initial_orientation = training_angles[
            np.random.randint(0, len(training_angles))
        ]

        self.robot.x = self.initial_position[0]
        self.robot.y = self.initial_position[1]
        self.robot.orientation = self.initial_orientation

        observation = self.get_obs()

        info = {}

        return observation, info

    def render(self):
        if self.should_render == False:
            return
        self.ax.clear()
        plot_env(
            self.grid_size,
            self.robot,
            self.target_pos,
            ax=self.ax,
        )

        self.fig.savefig(
            f"renders/render_{self.render_tag}.png",
            format="png",
            bbox_inches="tight",
            pad_inches=0,
        )

    def close(self):
        pass

    def get_alignment(self):
        v = get_alignment_to_target(
            self.robot.orientation,
            [
                self.robot.x,
                self.robot.y,
            ],
            self.target_pos,
        )
        v = v % 360
        if v > 180:
            v = -(180 - (v - 180))

        return v / 180

    def get_obstacles(self):
        obs = self.robot.detect_obstacles()
        return obs / self.robot.obstacle_threshold

    def get_distance_to_target(self):
        travel_distance = calculate_distance(self.initial_position, self.target_pos)
        current_distance = calculate_distance(
            (self.robot.x, self.robot.y), self.target_pos
        )
        d = (travel_distance - current_distance) / travel_distance
        if d < -1:
            d = -1
        if d > 1:
            d = 1

        return d

    def get_obs(self):
        return [
            self.get_alignment(),
        ]

    def step(self, action):
        o1 = self.get_obs()[0]

        self.robot.move(action[0], action[1])

        o2 = self.get_obs()[0]

        degree_thres = 3/180
        distance_thres = 0.95
        reward = 0

        if np.abs(o2) > degree_thres:
            reward += -np.abs(o2)
        
        distance = self.get_distance_to_target()

        print(reward, distance)

        observation = [o2]
        terminated = np.abs(distance) >= distance_thres or self.robot.is_out_of_range()
        truncated = False
        info = {}
        return observation, reward, terminated, truncated, info

    def learn(
        self,
        total_timesteps=100,
    ):
        self.initial_position = [10, 5]
        self.target_pos = [10, 15]
        np.random.seed(7)
        self.reset()

        self.max_time_allowed = 0
        model = A2C("MlpPolicy", self, verbose=1)
        callback = RenderCallback(
            self,
        )
        model.learn(
            total_timesteps=total_timesteps,
            log_interval=5,
            callback=callback,
        )

        model.save(f"models/a2c-v3-{total_timesteps}")

    def predict(self, dir=""):

        model = A2C.load(dir)
        observation, _ = self.reset()
        while True:
            action, _ = model.predict(observation)
            observation, _, terminated, truncated, info = self.step(action)
            self.render()
            if terminated or truncated:
                break

In [None]:
grid_size = [20, 20]

r1 = RobotContinuous(
    grid_size=grid_size,
    initial_position=[17, 2],
    id=1,
    speed=0.1,
    obstacle_threshold=3,
    aperture=120,
    angular_speed=10,
    orientation=0,
    basic_timestep=1,
    axial_distance=1,
)

a1 = MoveToTargetAgent(
    robot=r1,
    grid_size=grid_size,
    target_pos=[0,0],
)
a1.should_render=False
a1.learn(total_timesteps=30000)


In [None]:
a2 = MoveToTargetAgent(
    robot=r1,
    grid_size=grid_size,
    target_pos=[0,0],
)
a2.initial_position=[15,4]
a2.target_pos=[12,18]
a2.reset()
a2.predict(dir="models/a2c-v3-30000")