<a href="https://colab.research.google.com/github/hariharan2302/RL-Project/blob/main/RVO2_Environment_for_Simulation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!/usr/bin/env python
import gymnasium as gym
from gym import spaces
import rvo2
import numpy as np
import matplotlib.pyplot as plt
import torch

def point_to_segment_dist(x1, y1, x2, y2, x3, y3):
    """
    Calculate the closest distance between point(x3, y3) and a line segment with two endpoints (x1, y1), (x2, y2)

    """
    px = x2 - x1
    py = y2 - y1

    if px == 0 and py == 0:
        return np.linalg.norm((x3-x1, y3-y1))

    u = ((x3 - x1) * px + (y3 - y1) * py) / (px * px + py * py)

    if u > 1:
        u = 1
    elif u < 0:
        u = 0

    # (x, y) is the closest point to (x3, y3) on the line segment
    x = x1 + u * px
    y = y1 + u * py

    return np.linalg.norm((x - x3, y-y3))

class humans():
    def __init__(self) -> None:
        pass

class Robot():
    def __init__(self) -> None:
        self.initial_pos = (5, 0)
        self.goal_pos = (5, 10)
        self.current_pos = self.initial_pos
        self.velocity = (0, 0)
        # self.get_current_pos()

    def get_current_pos(self, velocity):
        if self.current_pos is None:
            self.current_pos = self.initial_pos
        else:
            ix, iy = self.current_pos
            vx, vy = velocity
            self.velocity = velocity
            x, y = (ix + min(vx,5)*0.1, iy + min(vy,5)*0.1)
            clipped_x = max(0, min(x, 10))
            clipped_y = max(0, min(y, 10))
            self.current_pos = (clipped_x, clipped_y)

        self.v_pref = np.array(self.goal_pos) - np.array(self.current_pos)
        return self.current_pos


class Environment_Sim(gym.Env):
    def __init__(self,number):
        super(Environment_Sim, self).__init__()
        self.humans_size = number
        self.humans_pos = np.random.randint(1, 11, size=(self.humans_size, 2))
        self.goal_pos = np.random.randint(1, 11, size=(self.humans_size, 2))
        self.sim = rvo2.PyRVOSimulator(1/15, 1.5, 10, 1.5, 2, 0.4, 2)
        self.humans = []
        self.agents = [self.sim.addAgent(tuple(pos)) for pos in self.humans_pos]
        self.fig, self.ax = plt.subplots()

        self.robot = Robot()
        self.dones = np.array([False]*(self.humans_size+1))
        self.time_step = 1/20

        #action space
        self.max_steps = 100
        self.action_space = spaces.Discrete(9)
        self.action_space_map = {
            0: [0, 0],   # Stop moving
            1: [1.7, 0],   # Right
            2: [0, 1.7],   # Up
            3: [-1.7, 0],  # Left
            4: [0, -1.7],  # Down
            5: [1.7, 1.7],   # Up-Right (Diagonal)
            6: [-1.7, 1.7],  # Up-Left (Diagonal)
            7: [1.7, -1.7],  # Down-Right (Diagonal)
            8: [-1.7, -1.7]  # Down-Left (Diagonal)
        }


    def reward_system(self, robot, humans, action):
        collision_penalty = -20
        success_reward = 500
        discomfort_penalty = -10

        #Max steps reached
        if self.max_steps:
            self.max_steps -= 1
        else :
            self.max_steps = 100
            self.dones[0] = True
            return 0, True, "Limit"

        # # Collision detection between robot and humans
        # collision = False
        # for human in humans:
        #     if self.check_collision(robot, human, action):
        #         collision = True
        #         break

        # Collision detection between robot and humans - 2
        dmin = float('inf')
        gdmin = 10
        collision = False
        for human in humans:
            closest_dist = self.collision(robot, human, action)
            if closest_dist < 0:
                collision = True
            elif closest_dist < dmin:
                dmin = closest_dist

        # Check for goal reaching
        reached, distance, reward = self.is_goal_reached(robot, action)
        if reached:
            self.dones[0] = True
            # return (success_reward/(100 - self.max_steps))**2, True, "Reached Goal"
            return success_reward, True, "Reached Goal"
        # Check for collisions
        if collision:
            self.dones[0] = True
            return collision_penalty, True, "Collision"

        if dmin < 0.2:
            return -10, False, "Risk for Collision"

        # if distance < gdmin:
        #     reward = (10 - distance)/10
        #     gdmin = distance
        # else:

        # x,y = self.robot.current_pos
        # x1,y1 = self.robot.goal_pos
        # reward = -np.sqrt((x-x1)**2+(y-y1)**2)
        reward = -1*(distance)
        return reward, False, "Nothing"

    def check_collision(self, robot, human, action):
        next_robot_position = np.array(robot.current_pos) + np.array(action) * self.time_step
        next_human_position = np.array(self.sim.getAgentPosition(human)) + np.array(self.sim.getAgentVelocity(human)) * self.time_step
        distance = np.linalg.norm(next_robot_position - next_human_position)
        return distance < (0.4 + 0.4)

    def collision(self, robot, human, action):
        hpx, hpy = self.sim.getAgentPosition(human)
        rpx, rpy = robot.current_pos
        px = hpx - rpx
        py = hpy - rpy

        hvx, hvy = self.sim.getAgentVelocity(human)
        rvx, rvy = action
        vx = hvx - rvx
        vy = hvy - rvy

        ex = px + vx * self.time_step
        ey = py + vy * self.time_step
        # closest distance between boundaries of two agents
        closest_dist = point_to_segment_dist(px, py, ex, ey, 0, 0) - 0.8
        return closest_dist


    def is_goal_reached(self, robot, action):
        next_position = np.array(robot.current_pos) + np.array(action) * self.time_step
        distance = np.linalg.norm(next_position - np.array(self.robot.goal_pos))
        reward = np.linalg.norm(np.array(robot.current_pos) - np.array(self.robot.goal_pos))
        return distance < 0.75, distance, reward

    def obtain_joinSate(self):
        # robot_state []
        robot_pos = list(self.robot.current_pos)
        robot_goal = list(self.robot.goal_pos)
        robot_vpref = (robot_goal[0] - robot_pos[0], robot_goal[1] - robot_pos[1])
        rvx, rvy = self.robot.velocity
        # robot_state = [robot_pos[0], robot_pos[1], rvx, rvy, 0.4, robot_goal[0], robot_goal[1], robot_vpref[0], robot_vpref[1]]
        robot_state = [robot_pos[0], robot_pos[1], rvx, rvy, robot_goal[0], robot_goal[1]]
        # robot_state = torch.tensor(robot_state)

        # human_state []
        human_states = []
        for human_idx in self.agents:
            human_posX, human_posY = self.sim.getAgentPosition(human_idx)
            human_goal = self.goal_pos[human_idx]
            human_vpref = list(self.sim.getAgentPrefVelocity(human_idx))
            hvx, hvy = human_vpref
            # human_state = [human_posX, human_posY, hvx, hvy, 0.4, human_goal[0], human_goal[1], human_vpref[0], human_vpref[1]]
            human_state = [human_posX, human_posY, hvx, hvy,human_goal[0], human_goal[1]]
            human_states.append(human_state)

        human_states.append(robot_state)
        join_state = torch.cat([torch.tensor(state) for state in human_states])
        return join_state.float()

    def reset(self):
        self.humans_pos = np.random.randint(1, 11, size=(self.humans_size, 2))
        self.goal_pos = np.random.randint(1, 11, size=(self.humans_size, 2))
        self.humans = []
        self.sim = rvo2.PyRVOSimulator(1/15, 1.5, 10, 1.5, 2, 0.4, 2)
        self.agents = [self.sim.addAgent(tuple(pos)) for pos in self.humans_pos]

        self.robot = Robot()
        self.dones = np.array([False]*(self.humans_size+1))

        #action space
        self.max_steps = 100
        state = self.obtain_joinSate()
        return state, {}

    def step(self, action):
        for agent, goal in zip(self.agents, self.goal_pos):
            px, py = self.sim.getAgentPosition(agent)
            velocity = (goal[0] - px, goal[1] - py)
            self.sim.setAgentPrefVelocity(agent, velocity)
            px, py = self.sim.getAgentPosition(agent)
            gx, gy = self.goal_pos[agent]
            if (gx-px) < 0.05 and (gy-py) < 0.05:
                self.dones[agent+1] = True

        self.sim.doStep()
        reward, done = 0, 0

        # print()
        # self.render()

        # print(self.reward_system(self.robot, self.agents, action))
        action = tuple(self.action_space_map[action.item()])
        reward, done, info = self.reward_system(self.robot, self.agents, action)
        self.robot.get_current_pos(action)
        state = self.obtain_joinSate()
        # done = np.all(self.dones)
        return state, reward, done, done, {}

    def render(self, mode='human'):
        self.ax.clear()
        self.ax.set_xlim(-1, 11)
        self.ax.set_ylim(-1, 11)
        for pos in [self.sim.getAgentPosition(agent) for agent in self.agents]:
            self.ax.plot(pos[0], pos[1], 'bo')
        # robot_posX, robot_posY = self.robot.initial_pos
        # self.ax.plot(robot_posX, robot_posY, 'ro')
        goal_posX, goal_posY = self.robot.goal_pos
        self.ax.plot(goal_posX, goal_posY, 'yo', marker="*",markersize=9)

        # d = self.sim.getAgentPosition(self.r)
        d = self.robot.current_pos
        self.ax.plot(d[0], d[1], 'ro')
        plt.pause(0.02)

# if __name__ == "__main__":
#     env = Environment_Sim(4)
#     env.reset()
#     done = False
#     i = 0
#     while not done:
#         state, reward, done, done, info = env.step(2)
#         i += 1
#         env.render()
#     print(i)
