In [7]:
pip install gymnasium


[33mDEPRECATION: Loading egg at /Users/apple/ilgc/lib/python3.11/site-packages/pyrvo2-0.0.0-py3.11-macosx-11.1-arm64.egg is deprecated. pip 23.3 will enforce this behaviour change. A possible replacement is to use pip for package installation..[0m[33m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
#custom files
import gridworld_converter
import robo_nav_algo as robo
import free_space_finder
import utils
import obstacle_generator as obs
import baseline_robo_nav as baseline



#standard files
%matplotlib notebook
import cv2
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import matplotlib.cm as cm
import os
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env


# Preparation of the grid world

In [3]:
# Load the Image
image = cv2.imread('example6.png', cv2.IMREAD_COLOR)

utils.display_image('Original Image', cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

<IPython.core.display.Javascript object>

In [4]:
# 2d matrix representation of the above blueprint
# value of scale can be used to control how much you want to zoom into the grid world
gridworld = gridworld_converter.grid_convert('example6.png',scale = 3)
utils.display_image('Cleaned Gridworld Matrix', gridworld, cmap='gray', axis_switch = 'on')

print(f"Number of rows: {gridworld.shape[0]}\nNumber of columns: {gridworld.shape[1]}")

<IPython.core.display.Javascript object>

Number of rows: 528
Number of columns: 590


# Define Parameters

In [6]:
number_of_obstacles = 2
episode_length = 500
total_timesteps = 100000  # Total training steps

# Dynamic Obstacle Generator

In [8]:

# Initialize the pathfinding class
pathfinder = obs.Pathfinding(gridworld)

# Generate paths for obstacles
obstacle_paths = pathfinder.generate_paths(number_of_obstacles, episode_length)

#contains a dictionary of occupied coordinate as keys at different time step as value
'only one object can occupy a coordinate at a particular time stamp'
occupied_time_steps = pathfinder.occupied_time_steps

#contains a list of start or end coordinates of each dynamic obstacle
obstacle_occupied_points = pathfinder.obstacle_occupied_points
'''
Help:
You can see the obstacle path by obstacle_paths['1'] or obstacle_paths['2']
'''

"\nHelp:\nYou can see the obstacle path by obstacle_paths['1'] or obstacle_paths['2']\n"


# EXTRA CODE:
visualisation code for only obstacles path


In [7]:


# Function to generate distinct colors for the obstacles
def generate_colors(num_colors):
    colormap = cm.get_cmap('hsv', num_colors + 10)
    return [colormap(i+1) for i in range(num_colors)]

# Create a figure for the live plot
fig, ax = plt.subplots(figsize=(10, 10))
ax.imshow(gridworld, cmap='gray')

# Initialize scatter plots for the objects' current positions
scatters = []
colors = generate_colors(number_of_obstacles)  # Different colors for different obstacles

for idx, key in enumerate(obstacle_paths.keys()):
    scatter = ax.scatter([], [], color=colors[idx % len(colors)], s=50, label=f'Obstacle {key}')
    scatters.append(scatter)

# Initialize the scatter points for start and end
for key, path in obstacle_paths.items():
    start_point = path[0]
    end_point = path[-1]
    ax.scatter([start_point[1]], [start_point[0]], c='green', marker='x')
    ax.scatter([end_point[1]], [end_point[0]], c='red', marker='x')

ax.legend()

# Initialize a text annotation for the step counter
step_text = ax.text(0.05, 0.95, '', transform=ax.transAxes, fontsize=12, verticalalignment='top',color='white')

# Function to update the scatter plots and step counter for each frame in the animation
def update_scatters(num, obstacle_paths, scatters):
    for idx, key in enumerate(obstacle_paths.keys()):
        path = obstacle_paths[key]
        if num < len(path):
            scatters[idx].set_offsets([path[num][1], path[num][0]])
    step_text.set_text(f'Step: {num + 1}')
    return scatters + [step_text]

# Calculate the maximum path length to set the number of frames in the animation
max_path_length = max(len(path) for path in obstacle_paths.values())

# Create the animation
ani = animation.FuncAnimation(fig, update_scatters, frames=episode_length, fargs=[obstacle_paths, scatters], interval=1, blit=True)

# Show the live plot
plt.title('Live Path Animation')
plt.show()


<IPython.core.display.Javascript object>

  colormap = cm.get_cmap('hsv', num_colors + 10)


In [10]:
import os
import wandb
import obstacle_generator as obs
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback

import heapq
import numpy as np
from sklearn.cluster import DBSCAN
import matplotlib.pyplot as plt
from IPython.display import display, clear_output

class DGSN_Env(gym.Env):
    def __init__(self, gridworld, obstacle_paths, occupied_time_steps, obstacle_occupied_points, number_of_obstacles, episode_length):
        super(DGSN_Env, self).__init__()
        print("Environment initialization begins!")
        self.gridworld = gridworld
        self.grid_size = gridworld.shape[0]

        self.obstacle_paths = obstacle_paths
        self.occupied_time_steps = occupied_time_steps
        self.obstacle_occupied_points = obstacle_occupied_points
        self.episode_length = episode_length

        self.boundary_threshold = 0.5
        self.max_dynamic_obstacles = number_of_obstacles

        self.num_static_obstacles = 0
        self.number_of_obstacles = number_of_obstacles
        self.num_dynamic_obstacles = number_of_obstacles

        self.robot_position = None
        self.goal_position = None

        self.global_obstacle_position = []

        self.close_call, self.discomfort, self.current_step, self.ep_no = 0, 0, 1, 0
        self.dist_factor = 4
        self.thresh = 1.5

        self.max_steps = episode_length
        self.total_reward = 0

        self.robot_path = []

        self.action_space = spaces.Discrete(9)  # 9 possible actions including no movement
        self.observation_space = spaces.Dict({
            'agent': spaces.Box(low=0, high=max(gridworld.shape), shape=(2,), dtype=np.int32),
            'dyn_obs': spaces.Box(low=0, high=max(gridworld.shape), shape=(number_of_obstacles, 2), dtype=np.int32),
            'sta_obs': spaces.Box(low=0, high=self.grid_size, shape=(self.max_dynamic_obstacles, 2), dtype=np.float64)
        })

        self._initialize_robot_and_goal()
        self.dynamic_obstacles = self._init_dynamic_obstacles(obstacle_paths)
        self.static_obstacles = self._init_static_obstacles()

        wandb.login()
        wandb.init(project='DGSN_Finale')

        self.safe = True

        # Evaluation Metrics
        self.Avg_Success_Rate = 0
        self.Avg_Collision_Rate, self.ep_collision_rate, self.collision_count = 0, 0, 0
        self.Avg_Min_Time_To_Collision, self.min_time_to_collision = 0, 0
        self.Avg_Wall_Collision_Rate, self.ep_wall_collision_rate, self.wall_collision_count = 0, 0, 0
        self.Avg_Obstacle_Collision_Rate, self.ep_obstacle_collision_rate, self.obstacle_collision_count = 0, 0, 0
        self.Avg_Human_Collision_Rate, self.ep_human_collision_rate, self.human_collision_count = 0, 0, 0

        self.Avg_Timeout = 0
        self.Avg_Path_Length = 0
        self.Avg_Stalled_Time, self.stalled_time = 0, 0

        self.Avg_Group_Inhibition_Rate, self.ep_group_inhibition_rate, self.group_inhibition = 0, 0, 0
        self.Avg_Discomfort, self.ep_discomfort = 0, 0
        self.Avg_Human_Distance, self.ep_human_distance, self.human_distance = 0, 0, 0
        self.Avg_Closest_Human_Distance, self.closest_human_distance = 0, 0
        self.Min_Closest_Human_Distance = 0
        self.goal_reached = 0
        self.timeout = 0

    def _initialize_robot_and_goal(self):
        # Find a free space for the robot start point
        self.robot_position = self._find_free_space()
        self.agent_pos = self.robot_position
        # Find a free space for the goal point
        self.goal_position = self._find_free_space()
        self.goal_pos = self.goal_position

        self.robot_path.append(self.robot_position)

    def _find_free_space(self):
        free_spaces = np.argwhere(self.gridworld == 0)
        while True:
            candidate = tuple(free_spaces[np.random.choice(len(free_spaces))])
            if self._is_free_space(candidate):
                return candidate

    def _is_free_space(self, position):
        # Check if the position is occupied by any obstacle at the current time step
        if position in self.occupied_time_steps and self.occupied_time_steps[position] == self.current_step:
            return False
        # Check if the position is occupied by the robot or goal
        if position == self.robot_position or position == self.goal_position:
            return False
        return True

    def _init_static_obstacles(self):
        obstacles = []
        for _ in range(self.num_static_obstacles):
            obstacles.append((np.array(np.random.randint(1, self.grid_size, size=2))).astype('float'))
        return obstacles

    def _init_dynamic_obstacles(self, obstacle_paths):
        obstacles = []
        for i in range(self.number_of_obstacles):
            start_pos = obstacle_paths[str(i+1)][0]
            end_pos = obstacle_paths[str(i+1)][-1]
            path = obstacle_paths[str(i+1)]
            distance = np.linalg.norm(self.agent_pos - start_pos)
            obstacles.append({
                'start': start_pos,
                'end': end_pos,
                'current': start_pos.copy(),
                'angle': np.rad2deg(np.arctan2(self.agent_pos[1] - start_pos[1], self.agent_pos[0] - start_pos[0])).astype('float'),
                'distance': int(distance),
                'path': path if path else []
            })
        return obstacles

    def _get_obstacles_positions(self):
        obstacles_positions = {}
        for key, path in self.obstacle_paths.items():
            if self.current_step < len(path):
                obstacles_positions[key] = path[self.current_step]
        return obstacles_positions

    def _move(self, position, action):
        directions = [
            (0, 1), (1, 0), (0, -1), (-1, 0),  # Right, Down, Left, Up
            (-1, 1), (-1, -1), (1, 1), (1, -1), (0, 0)  # Top-Right, Top-Left, Bottom-Right, Bottom-Left, No movement
        ]
        direction = directions[action]
        new_position = (position[0] + direction[0], position[1] + direction[1])
        return new_position

    def step(self, action):
        previous_agent_pos = self.robot_position
        new_position = self._move(self.robot_position, action)
        collision = not self._is_valid_move(new_position)
        goal_reached = self._check_goal_reached(new_position)

        if not collision:
            self.robot_position = new_position
            self.agent_pos = new_position

            # Update obstacles positions for the current time step
            obstacles_positions = self._get_obstacles_positions()
            self.global_obstacle_position = obstacles_positions

            # Increment the current step
            self.current_step += 1
            self.robot_path.append(new_position)  # Store the new position in the path
        else:
            # Updating the environment with old values if there is collision
            new_position = previous_agent_pos
            obstacles_positions = self.global_obstacle_position

        obs = self._get_obs()
        truncated = self.current_step >= self.episode_length
        done = goal_reached or self.current_step >= self.episode_length

        print(f"Agent's Current Position {previous_agent_pos}")
        print(f"Goal Position : {self.goal_pos}")
        print(f"Agent's New Position {self.agent_pos}")
        print(f"Steps taken : {self.current_step}")

        reward = self._compute_reward(previous_agent_pos)
        print(f"Reward Obtained : {reward}")

        self.total_reward += reward
        wandb.log({"Reward": reward, "Total_Reward": self.total_reward})

        if self.current_step >= self.max_steps:
            done = True
            reward -= 2500
            self.timeout += 1
            wandb.log({"TimeOut": self.timeout})

        return obs, reward, done, truncated, {}

    def _compute_reward(self, previous_agent_pos):
        reward_c = 0

        if self.safe:
            self.min_time_to_collision += 1

        previous_dist_to_goal = np.linalg.norm(previous_agent_pos - self.goal_pos)
        current_dist_to_goal = np.linalg.norm(self.agent_pos - self.goal_pos)
        del_distance = current_dist_to_goal - previous_dist_to_goal

        if del_distance > 0:
            reward_c -= del_distance * self.dist_factor
        elif del_distance == 0:
            reward_c = -2
        else:
            reward_c += (-del_distance) * self.dist_factor * 2

        for i, obs in enumerate(self.dynamic_obstacles):
            distance = np.linalg.norm(obs['current'] - self.agent_pos)
            self.human_distance += distance

            if distance < 3 and distance != 0:
                self.closest_human_distance = min(self.closest_human_distance, distance)
                self.close_call += 1
                pen_1 = (10 / distance) * 2
                reward_c -= pen_1

        grouped_obstacles = self._group_dynamic_obstacles()
        for j, group in enumerate(grouped_obstacles):
            group_center = np.mean(group, axis=0)
            if np.linalg.norm(self.agent_pos - group_center) < self.thresh:
                reward_c -= 50
                self.group_inhibition += 1
                self.close_call += 1
                wandb.log({"Group_Inhibition": self.group_inhibition})

        if any(np.array_equal(self.agent_pos, obs['current']) for obs in self.dynamic_obstacles):
            self.collision_count += 1
            self.human_collision_count += 1
            self.safe = False
            reward_c -= 30

        if any(np.array_equal(self.agent_pos, obs) for obs in self.static_obstacles):
            self.collision_count += 1
            self.obstacle_collision_count += 1
            self.safe = False
            reward_c -= 20

        if np.any(self.agent_pos <= self.boundary_threshold) or np.any(self.agent_pos >= self.grid_size - self.boundary_threshold):
            reward_c -= 15

        if np.any(self.agent_pos == 0) or np.any(self.agent_pos >= self.grid_size):
            self.collision_count += 1
            self.wall_collision_count += 1
            self.safe = False
            reward_c -= 20

        reward_c -= 1

        if self._is_done():
            reward_c += 3000
            self.goal_reached += 1
            wandb.log({'Goal_Reached': self.goal_reached})

        return reward_c

    def _is_done(self):
        return np.array_equal(self.agent_pos, self.goal_pos)

    def _get_obs(self):
        agent_state = np.array([self.agent_pos[0], self.agent_pos[1]], dtype=np.float32)
        dynamic_obstacle_states = np.array([(np.rad2deg(np.arctan2(self.agent_pos[1] - ob['current'][1], self.agent_pos[0] - ob['current'][0])), np.linalg.norm(self.agent_pos - ob['current'])) for ob in self.dynamic_obstacles] +
                                           [np.zeros(2) for _ in range(self.num_dynamic_obstacles, self.max_dynamic_obstacles)], dtype=np.float32)
        static_obstacle_states = np.array([ob[:2] for ob in self.static_obstacles] +
                                          [np.zeros(2) for _ in range(self.num_static_obstacles, self.max_dynamic_obstacles)], dtype=np.float32)
        return {
            'agent': agent_state,
            'dyn_obs': dynamic_obstacle_states,
            'sta_obs': static_obstacle_states
        }

    def _group_dynamic_obstacles(self):
        positions = np.array([obs['current'] for obs in self.dynamic_obstacles])
        clustering = DBSCAN(eps=0.3, min_samples=2).fit(positions)
        labels = clustering.labels_

        grouped_obstacles = []
        for label in set(labels):
            if label != -1:
                group = positions[labels == label]
                grouped_obstacles.append(group)
        return grouped_obstacles

    def render(self, mode='human'):
        pass

    def reset(self, **kwargs):
        print("PRINTING & LOGGING!!!")

        wandb.log({"Episode": self.ep_no})
        self.ep_no += 1

        self.ep_human_distance = self.human_distance / (self.num_dynamic_obstacles * self.current_step)
        self.ep_discomfort = self.close_call / self.current_step
        self.ep_collision_rate = self.collision_count / self.current_step
        self.ep_wall_collision_rate = self.wall_collision_count / self.current_step
        self.ep_obstacle_collision_rate = self.obstacle_collision_count / self.current_step
        self.ep_human_collision_rate = self.human_collision_count / self.current_step
        self.ep_group_inhibition_rate = self.group_inhibition / self.current_step

        self.Avg_Collision_Rate = ((self.ep_no - 1) * self.Avg_Collision_Rate + self.ep_collision_rate) / self.ep_no
        self.Avg_Min_Time_To_Collision = ((self.ep_no - 1) * self.Avg_Min_Time_To_Collision + self.min_time_to_collision) / self.ep_no
        self.Avg_Wall_Collision_Rate = ((self.ep_no - 1) * self.Avg_Wall_Collision_Rate + self.ep_wall_collision_rate) / self.ep_no
        self.Avg_Obstacle_Collision_Rate = ((self.ep_no - 1) * self.Avg_Obstacle_Collision_Rate + self.ep_obstacle_collision_rate) / self.ep_no
        self.Avg_Human_Collision_Rate = ((self.ep_no - 1) * self.Avg_Human_Collision_Rate + self.ep_human_collision_rate) / self.ep_no
        self.Avg_Path_Length = ((self.ep_no - 1) * self.Avg_Path_Length + self.current_step) / self.ep_no
        self.Avg_Stalled_Time = ((self.ep_no - 1) * self.Avg_Stalled_Time + self.stalled_time) / self.ep_no
        self.Avg_Discomfort = ((self.ep_no - 1) * self.Avg_Discomfort + self.ep_discomfort) / self.ep_no
        self.Avg_Human_Distance = ((self.ep_no - 1) * self.Avg_Human_Distance + self.ep_human_distance) / self.ep_no
        self.Avg_Closest_Human_Distance = ((self.ep_no - 1) * self.Avg_Closest_Human_Distance + self.closest_human_distance) / self.ep_no
        self.Avg_Group_Inhibition_Rate = ((self.ep_no - 1) * self.Avg_Group_Inhibition_Rate + self.ep_group_inhibition_rate) / self.ep_no

        wandb.log({"Ep_Total_Reward": self.total_reward,
                   "Ep_Collision_Count": self.collision_count, "Ep_Min_Time_To_Collision": self.min_time_to_collision,
                   "Ep_Wall_Collision_Count": self.wall_collision_count, "Ep_Obstacle_Collision_Count": self.obstacle_collision_count,
                   "Ep_Human_Collision_Count": self.human_collision_count, "Ep_Collision_Rate": self.ep_collision_rate,
                   "Ep_Wall_Collision_Rate": self.ep_wall_collision_rate, "Ep_Obstacle_Collision_Rate": self.ep_obstacle_collision_rate,
                   "Ep_Human_Collision_Rate": self.ep_human_collision_rate, "Ep_Path_Length": self.current_step,
                   "Ep_Stalled_Time": self.stalled_time, "Ep_Discomfort": self.ep_discomfort,
                   "Ep_Avg_Human_Distance": self.ep_human_distance, "Ep_Closest_Human_Distance": self.closest_human_distance,
                   "Ep_Close_Calls": self.close_call, "Ep_Group_Inhibition": self.ep_group_inhibition_rate,
                   "Avg_Collision_Rate": self.Avg_Collision_Rate, "Avg_Min_Time_To_Collision": self.Avg_Min_Time_To_Collision,
                   "Avg_Wall_Collision_Rate": self.Avg_Wall_Collision_Rate, "Avg_Obstacle_Collision_Rate": self.Avg_Obstacle_Collision_Rate,
                   "Avg_Human_Collision_Rate": self.Avg_Human_Collision_Rate, "Avg_Path_Length": self.Avg_Path_Length,
                   "Avg_Stalled_Time": self.Avg_Stalled_Time, "Avg_Discomfort": self.Avg_Discomfort,
                   "Avg_Human_Distance": self.Avg_Human_Distance, "Avg_Closest_Human_Distance": self.Avg_Closest_Human_Distance,
                   "Avg_Group_Inhibition_Rate": self.Avg_Group_Inhibition_Rate
                   })

        print(f"$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
        print(f"RUN DETAILS!!! \n")
        print(f"Ep_Total_Reward : {self.total_reward} \n")
        print(f"Ep_Collision_Count : {self.collision_count}")
        print(f"Ep_Wall_Collision_Count : {self.wall_collision_count}")
        print(f"Ep_Obstacle_Collision_Count : {self.obstacle_collision_count}")
        print(f"Ep_Human_Collision_Count : {self.human_collision_count}")
        print(f"Ep_Min_Time_To_Collision : {self.min_time_to_collision}")
        print(f"Ep_Stalled_Time : {self.stalled_time}")
        print(f"Ep_Avg_Human_Distance : {self.Avg_Human_Distance}")
        print(f"$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
        print("Creating the new Episode")

        if 'seed' in kwargs:
            np.random.seed(kwargs['seed'])

        # Initialize the pathfinding class
        pathfinder = obs.Pathfinding(self.gridworld)

        # Generate new paths for obstacles again
        self.obstacle_paths = pathfinder.generate_paths(self.number_of_obstacles, self.episode_length)

        self.occupied_time_steps = pathfinder.occupied_time_steps
        self.obstacle_occupied_points = pathfinder.obstacle_occupied_points
        self.robot_position = None
        self.goal_position = None
        self.current_step = 0
        self.global_obstacle_position = []
        self.robot_path = []

        self._initialize_robot_and_goal()

        self.agent_pos = self.robot_position
        self.goal_pos = self.goal_position
        self.num_static_obstacles = 0
        self.num_dynamic_obstacles = len(obstacle_paths)

        self.dynamic_obstacles = self._init_dynamic_obstacles(obstacle_paths)
        self.static_obstacles = self._init_static_obstacles()
        self.safe = True
        self.done = False

        self.Avg_Success_Rate = 0
        self.ep_collision_rate, self.collision_count = 0, 0
        self.min_time_to_collision = 0
        self.ep_wall_collision_rate, self.wall_collision_count = 0, 0
        self.ep_obstacle_collision_rate, self.obstacle_collision_count = 0, 0
        self.ep_human_collision_rate, self.human_collision_count = 0, 0

        self.Avg_Timeout = 0
        self.Avg_Path_Length = 0
        self.stalled_time = 0

        self.ep_group_inhibition_rate, self.group_inhibition = 0, 0
        self.ep_discomfort = 0
        self.ep_human_distance, self.human_distance = 0, 0
        self.closest_human_distance = 0
        self.Min_Closest_Human_Distance = 0

        self.close_call, self.discomfort, self.current_step = 0, 0, 1
        self.total_reward = 0

        print("*******************************************************************\n")
        print("Initialized the environment with the following")
        print("Agent's Initial Position :", self.agent_pos)
        print("Goal Position :", self.goal_pos)
        print("Number of Static Obstacles :", self.num_static_obstacles)
        print("Static Obstacle Positions :", self.static_obstacles)
        print("Number of Dynamic Obstacles :", self.num_dynamic_obstacles)
        print("Dynamic Obstacle theta & dist :", self.dynamic_obstacles, "\n")
        print("*******************************************************************")

        return self._get_obs(), {}

class CustomCallback(BaseCallback):
    def __init__(self, env, render_freq=1, save_freq=100, save_path=None, verbose=0):
        super(CustomCallback, self).__init__(verbose)
        self.env = env
        self.render_freq = render_freq
        self.save_freq = save_freq
        self.save_path = save_path

    def _init_callback(self) -> None:
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self) -> bool:
        if self.n_calls % self.render_freq == 0:
            self.env.render()
        
        if self.n_calls % self.save_freq == 0:
            save_file = os.path.join(self.save_path, f"model_step_{self.n_calls}.zip")
            self.model.save(save_file)
            if self.verbose > 0:
                print(f"Model saved at step {self.n_calls} to {save_file}")
        
        return True


env = DGSN_Env(gridworld, obstacle_paths, occupied_time_steps, obstacle_occupied_points, number_of_obstacles, episode_length)
obs = env.reset()

log_path = os.path.join('Train', 'Logs')
save_path = os.path.join('Train', 'Saved_Models', 'PPO_1')

combined_callback = CustomCallback(env=env, render_freq=1, save_freq=100, save_path=save_path, verbose=1)
model = PPO("MultiInputPolicy", env, verbose=1, tensorboard_log=log_path, device='cuda' if torch.cuda.is_available() else 'cpu', n_epochs=1000)

model.learn(total_timesteps=10000000, callback=combined_callback)


Environment initialization begins!


KeyError: '0'

# ROBOT Training using Standard BaseLine Model 3 Library

In [6]:


# Create the RL environment
env = baseline.RLEnvironment(gridworld, obstacle_paths, occupied_time_steps, obstacle_occupied_points, number_of_obstacles, episode_length)

# Check the environment
check_env(env, warn=True)

# Ensure the log directory exists
log_path = "./ppo_logs/"
os.makedirs(log_path, exist_ok=True)

# Create the PPO model
# device = 'cuda' if torch.cuda.is_available() else 'cpu'
device = 'cpu'
model = PPO("MultiInputPolicy", env, verbose=1, tensorboard_log=log_path, device=device, n_epochs=100)

# Train the model
model.learn(total_timesteps=total_timesteps)

# Save the model
model.save("ppo_robot_env")

# Load the model
model = PPO.load("ppo_robot_env")

# Enjoy trained agent
obs = env.reset()
for _ in range(1000):
    action, _states = model.predict(obs)
    obs, rewards, dones, info = env.step(action)
    
    # Print statements for debugging and tracking
    print(f"Step: {step}, Action: {action}")
    print(f"Robot Position: {env.robot_position}, Goal Position: {env.goal_position}")
    print(f"Collision: {info.get('collision', False)}, Goal Reached: {info.get('goal_reached', False)}")
    
    
    env.render()
    if dones:
        break

robotPath = env.robot_path  # Get the path taken by the robot





Step: 1, Action: 1, Position: (186, 433), Collision: False, Goal Reached: False
Obstacle Positions: {'1': (356, 536), '2': (425, 470)}
Step: 1, Action: 5, Position: (345, 290), Collision: False, Goal Reached: False
Obstacle Positions: {'1': (196, 448), '2': (384, 291)}
Step: 2, Action: 8, Position: (345, 290), Collision: False, Goal Reached: False
Obstacle Positions: {'1': (195, 447), '2': (385, 290)}
Step: 3, Action: 7, Position: (346, 289), Collision: False, Goal Reached: False
Obstacle Positions: {'1': (194, 446), '2': (386, 289)}
Step: 4, Action: 0, Position: (346, 290), Collision: False, Goal Reached: False
Obstacle Positions: {'1': (193, 445), '2': (387, 288)}
Step: 5, Action: 4, Position: (345, 291), Collision: False, Goal Reached: False
Obstacle Positions: {'1': (192, 444), '2': (388, 287)}
Step: 6, Action: 0, Position: (345, 292), Collision: False, Goal Reached: False
Obstacle Positions: {'1': (191, 443), '2': (389, 286)}
Step: 7, Action: 2, Position: (345, 291), Collision: Fa

IndexError: list index out of range

# Visualization Code

In [None]:
%matplotlib notebook

# Function to generate distinct colors for the obstacles
def generate_colors(num_colors):
    colormap = cm.get_cmap('hsv', num_colors + 10)
    return [colormap(i + 1) for i in range(num_colors)]

# Create a figure for the live plot
fig, ax = plt.subplots(figsize=(10, 10))
ax.imshow(gridworld, cmap='gray')

# Initialize scatter plots for the objects' current positions
scatters = []
colors = generate_colors(number_of_obstacles)  # Different colors for different obstacles

for idx, key in enumerate(obstacle_paths.keys()):
    scatter = ax.scatter([], [], color=colors[idx % len(colors)], s=50, label=f'Obstacle {key}')
    scatters.append(scatter)

# Initialize the scatter points for the robot's start and goal positions
robot_start = robotPath[0]
robot_goal = env.goal_position
# ax.scatter([robot_start[1]], [robot_start[0]], c='green', marker='^', label='Robot Start')
ax.scatter([robot_goal[1]], [robot_goal[0]], c='red', marker='x', label='Robot Goal')

# Initialize the scatter points for start and end of obstacles
for key, path in obstacle_paths.items():
    start_point = path[0]
    end_point = path[-1]
#     ax.scatter([start_point[1]], [start_point[0]], c='green', marker='x')
#     ax.scatter([end_point[1]], [end_point[0]], c='red', marker='x')

# Initialize a scatter plot for the robot's current position
robot_scatter = ax.scatter([], [], color='white', s=50, marker='*', label='Robot')
scatters.append(robot_scatter)

ax.legend()

# Initialize a text annotation for the step counter
step_text = ax.text(0.05, 0.95, '', transform=ax.transAxes, fontsize=12, verticalalignment='top', color='white')

# Function to update the scatter plots and step counter for each frame in the animation
def update_scatters(num, obstacle_paths, scatters, robot_path):
    for idx, key in enumerate(obstacle_paths.keys()):
        path = obstacle_paths[key]
        if num < len(path):
            scatters[idx].set_offsets([path[num][1], path[num][0]])
    
    if num < len(robot_path):
        scatters[-1].set_offsets([robot_path[num][1], robot_path[num][0]])
    
    step_text.set_text(f'Step: {num + 1}')
    return scatters + [step_text]

# Calculate the maximum path length to set the number of frames in the animation
max_path_length = max(len(path) for path in obstacle_paths.values())
max_path_length = max(max_path_length, len(robotPath))

# Create the animation
ani = animation.FuncAnimation(fig, update_scatters, frames=max_path_length, fargs=[obstacle_paths, scatters, robotPath], interval=1, blit=True)

# Show the live plot
plt.title('Live Path Animation')
plt.show()

# ROBOT Training using an algo from scratch

In [35]:

# Function to generate distinct colors for the obstacles
def generate_colors(num_colors):
    colormap = cm.get_cmap('hsv', num_colors + 10)
    return [colormap(i + 1) for i in range(num_colors)]


# Create the RL environment
env = robo.RLEnvironment(gridworld, obstacle_paths, occupied_time_steps, obstacle_occupied_points, number_of_obstacles, episode_length)

#storing the robot start position
robotPath = [env.robot_position]

# Example step through the environment
actions = [0, 1, 2, 3, 4, 5, 6, 7, 8]  # Possible actions
action = 3
episode = 2

for i in range(episode):
    print(f"=================####################episode number: {i}")
    if i > 0:
        
        
        print("resetting environment")
        env.reset()
        robotPath = [env.robot_position]
        action = 3

    for _ in range(episode_length):

        new_position, obstacles_positions, collision, goal_reached = env.step(action)

        #random alog movement
        if collision:
            action =  np.random.choice(actions)

        print(f"Step:{env.current_step}, Position:{new_position}, Collision:{collision}, Goal Reached:{goal_reached}, Current Step:{env.current_step}, Goal coordinate:{env.goal_position}")
        print(f"        Position of Obstacles: {obstacles_positions}")
        print()
        robotPath.append(new_position)
        
        '''
        Write your own custom RL algo and train it from scratch.
        
        '''

        if goal_reached:
            print("Goal reached!")
            break
    
[(12,3),(12,5)]

action taken: 3
-----------checking if valid move---(544, 1723)-------
Step:1, Position:(544, 1723), Collision:False, Goal Reached:False, Current Step:1, Goal coordinate:(160, 398)
        Position of Obstacles: {'1': (1279, 54), '2': (741, 1263)}

action taken: 3
-----------checking if valid move---(543, 1723)-------
Step:2, Position:(543, 1723), Collision:False, Goal Reached:False, Current Step:2, Goal coordinate:(160, 398)
        Position of Obstacles: {'1': (1278, 55), '2': (740, 1264)}

action taken: 3
-----------checking if valid move---(542, 1723)-------
Step:3, Position:(542, 1723), Collision:False, Goal Reached:False, Current Step:3, Goal coordinate:(160, 398)
        Position of Obstacles: {'1': (1277, 56), '2': (739, 1265)}

action taken: 3
-----------checking if valid move---(541, 1723)-------
Step:4, Position:(541, 1723), Collision:False, Goal Reached:False, Current Step:4, Goal coordinate:(160, 398)
        Position of Obstacles: {'1': (1276, 57), '2': (738, 1266)}

acti

# Code for live visualisation:

Visualization with the robot and dynamic obstacles.


In [36]:
%matplotlib notebook


# Assuming gridworld, obstacle_paths, number_of_obstacles, episode_length, and robotPath are already defined

# Function to generate distinct colors for the obstacles
def generate_colors(num_colors):
    colormap = cm.get_cmap('hsv', num_colors + 10)
    return [colormap(i + 1) for i in range(num_colors)]

# Create a figure for the live plot
fig, ax = plt.subplots(figsize=(10, 10))
ax.imshow(gridworld, cmap='gray')

# Initialize scatter plots for the objects' current positions
scatters = []
colors = generate_colors(number_of_obstacles)  # Different colors for different obstacles

for idx, key in enumerate(obstacle_paths.keys()):
    scatter = ax.scatter([], [], color=colors[idx % len(colors)], s=50, label=f'Obstacle {key}')
    scatters.append(scatter)

# Initialize the scatter points for the robot's start and goal positions
robot_start = robotPath[0]
robot_goal = env.goal_position
# ax.scatter([robot_start[1]], [robot_start[0]], c='green', marker='^', label='Robot Start')
ax.scatter([robot_goal[1]], [robot_goal[0]], c='red', marker='x', label='Robot Goal')

# Initialize the scatter points for start and end of obstacles
for key, path in obstacle_paths.items():
    start_point = path[0]
    end_point = path[-1]
#     ax.scatter([start_point[1]], [start_point[0]], c='green', marker='x')
#     ax.scatter([end_point[1]], [end_point[0]], c='red', marker='x')

# Initialize a scatter plot for the robot's current position
robot_scatter = ax.scatter([], [], color='white', s=50, marker='*', label='Robot')
scatters.append(robot_scatter)

ax.legend()

# Initialize a text annotation for the step counter
step_text = ax.text(0.05, 0.95, '', transform=ax.transAxes, fontsize=12, verticalalignment='top', color='white')

# Function to update the scatter plots and step counter for each frame in the animation
def update_scatters(num, obstacle_paths, scatters, robot_path):
    for idx, key in enumerate(obstacle_paths.keys()):
        path = obstacle_paths[key]
        if num < len(path):
            scatters[idx].set_offsets([path[num][1], path[num][0]])
    
    if num < len(robot_path):
        scatters[-1].set_offsets([robot_path[num][1], robot_path[num][0]])
    
    step_text.set_text(f'Step: {num + 1}')
    return scatters + [step_text]

# Calculate the maximum path length to set the number of frames in the animation
max_path_length = max(len(path) for path in obstacle_paths.values())
max_path_length = max(max_path_length, len(robotPath))

# Create the animation
ani = animation.FuncAnimation(fig, update_scatters, frames=max_path_length, fargs=[obstacle_paths, scatters, robotPath], interval=1, blit=True)

# Show the live plot
plt.title('Live Path Animation')
plt.show()

<IPython.core.display.Javascript object>

  colormap = cm.get_cmap('hsv', num_colors + 10)
