In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import gym  # Assuming you have a gym-like environment
from gym import spaces
import random
from collections import deque
import matplotlib.pyplot as plt
import io
import imageio.v2 as imageio
import torch.nn.functional as F
from matplotlib.patches import FancyArrow
from shapely.geometry import Point, LineString, box
import os
from datetime import datetime


In [None]:
class ContinuousRobotNavigationEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    def is_goal_behind_wall(self, goal_position):
        # Check if goal_position is behind any wall
        for wall in self.maze_walls:
            x, y, width, height = wall
            if goal_position[0] >= x and goal_position[0] <= x + width \
                    and goal_position[1] >= y and goal_position[1] <= y + height:
                return True
        return False
    def __init__(self):
        super(ContinuousRobotNavigationEnv, self).__init__()
        self.grid_size = 10
        self.observation_space = spaces.Box(low=0, high=self.grid_size, shape=(2,), dtype=np.float32)
        self.action_space = spaces.Box(low=-1, high=1, shape=(2,), dtype=np.float32)  # Continuous action space

        self.velocity_scale = 0.3
        self.lidar_range = 3.0

        self.num_humans = 5  # Number of humans
        self.human_positions = [self._random_position() for _ in range(self.num_humans)]
        self.human_velocities = [self._random_velocity() for _ in range(self.num_humans)]
        self.predicted_human_positions = [[] for _ in range(self.num_humans)]
        
        # self.num_obstacles = 3  # Number of static obstacles
        # self.obstacle_positions = [self._random_position() for _ in range(self.num_obstacles)]

        self.maze_walls = [
            (5, 8, 0.3, 1.5),   # Example wall 1
            (0, 7, 1.5, 0.3),   # Example wall 2
            (6.5, 6.5, 1.5, 0.3),   # Example wall 3
            (4, 4.5, 0.3, 1.5),   # Example wall 4
            (1.5, 2, 0.3, 1.5), #5th wall
            (5, 2.5, 1.5, 0.3),  #6th wall
            (8, 4, 1.5, 0.3),    #7th wall
            (7, 0, 0.3, 1.5)   #8th wall    
        ]
        # self.maze_walls = [
        #     (0, 9.5, 5, 0.5),    # Top horizontal wall
        #     (5.5, 9.5, 4.5, 0.5), # Top-right horizontal wall
        #     (9.5, 6, 0.5, 3.5),   # Right vertical wall
        #     (0, 0, 0.5, 5),       # Left vertical wall
        #     (0, 5.5, 0.5, 4.5),   # Left vertical wall 2
        #     (3, 0, 7, 0.5),       # Bottom horizontal wall
        #     (2, 4, 0.5, 5),       # Vertical wall in center-left
        #     (3, 2, 4, 0.5),       # Horizontal wall in center-bottom
        #     (6, 3, 0.5, 3),       # Vertical wall in center-right
        #     (6, 6, 4, 0.5)        # Horizontal wall in center-right
        # ]

        
        
        self.goal_position = (0, 0)

        self.frames = []  # Initialize the frames list

        

        self.reset()

    def _random_position(self):
        return [np.random.uniform(0, self.grid_size - 1), np.random.uniform(0, self.grid_size - 1)]

    def _random_velocity(self):
        # return [np.random.uniform(-1, 1), np.random.uniform(-1, 1)]
        return [np.random.uniform(-1, 1) * self.velocity_scale, np.random.uniform(-1, 1) * self.velocity_scale]

    def reset(self):
        self.robot_position = [0, 0]
        self.robot_orientation = np.pi/4  #looking diagonally up
        self.human_positions = [self._random_position() for _ in range(self.num_humans)]
        self.human_velocities = [self._random_velocity() for _ in range(self.num_humans)]
        self.predicted_human_positions = [[] for _ in range(self.num_humans)]
        # self.obstacle_positions = [self._random_position() for _ in range(self.num_obstacles)]
        self.state = np.array(self.robot_position, dtype=np.float32)
        # self.goal_position = self._random_position()
        while True:
            self.goal_position = self._random_position()
            if not self.is_goal_behind_wall(self.goal_position):
                break
        self.frames = []  # Reset frames list at the start of each episode
        
        return self.state

    def check_wall_collision(self, position):
        for wall in self.maze_walls:
            x, y, width, height = wall
            if position[0] >= (x-0.1) and position[0] <= x + width and position[1] >= (y-0.1) and position[1] <= y + height:
                return True
        return False
        
    def predict_human_positions(self):
        self.predicted_human_positions = [[] for _ in range(self.num_humans)]  # Reset predicted positions
        num_predictions = 5  # Number of future positions to predict
        
        for i in range(self.num_humans):
            future_positions = []
            future_position = np.array(self.human_positions[i])
            for _ in range(num_predictions):
                future_position += np.array(self.human_velocities[i])
                future_position[0] = np.clip(future_position[0], 0, self.grid_size - 1)
                future_position[1] = np.clip(future_position[1], 0, self.grid_size - 1)
                future_positions.append(future_position.copy())
        
            # Update current position
            self.human_positions[i][0] += self.human_velocities[i][0]
            self.human_positions[i][1] += self.human_velocities[i][1]
        
            # Check for boundary collisions and reverse velocity if necessary
            if self.human_positions[i][0] < 0 or self.human_positions[i][0] >= self.grid_size:
                self.human_velocities[i][0] *= -1
            if self.human_positions[i][1] < 0 or self.human_positions[i][1] >= self.grid_size:
                self.human_velocities[i][1] *= -1

            if self.check_wall_collision(self.human_positions[i]):
                self.human_velocities[i][0] *= -1
                self.human_velocities[i][1] *= -1
        
            # Update future positions if within lidar range
            if np.linalg.norm(np.array(self.robot_position) - np.array(self.human_positions[i])) <= self.lidar_range:
                self.predicted_human_positions[i] = future_positions
        


    def check_collision(self, position):
        # Check collision with humans
        for human_pos in self.human_positions:
            if np.linalg.norm(np.array(self.robot_position) - np.array(human_pos)) < 1.0:  # Collision threshold
                return True
        
        # Check collision with obstacles
        # for obstacle_pos in self.obstacle_positions:
        #     if np.linalg.norm(np.array(self.robot_position) - np.array(obstacle_pos)) < 1.0:  # Collision threshold
        #         return True

        for wall in self.maze_walls:
            x, y, width, height = wall
            if position[0] >= (x-0.3) and position[0] <= x + width and position[1] >= (y-0.3) and position[1] <= y + height:
                return True
        
        return False

        
    def step(self, action):
        self.state = np.array(self.robot_position, dtype=np.float32)
        # Apply scaling to the action to reduce velocity
        scaled_action = np.array(action) * self.velocity_scale
    
        # Calculate the new robot position
        new_robot_position = [
            self.robot_position[0] + scaled_action[0],
            self.robot_position[1] + scaled_action[1]
        ]
    
        # Clip the new position to stay within the environment bounds
        new_robot_position = [
            np.clip(new_robot_position[0], 0, self.grid_size - 1),
            np.clip(new_robot_position[1], 0, self.grid_size - 1)
        ]
    
        # Check if the new position collides with maze walls
        if not self.check_collision(new_robot_position):
            self.robot_position = new_robot_position
        else:
            done = True
            return self.state, -10, done, {}
    
        if np.linalg.norm(scaled_action) > 0:
            self.robot_orientation = np.arctan2(scaled_action[1], scaled_action[0])
    
        self.predict_human_positions()
        
        goal_threshold = 0.7
        distance_to_goal = np.linalg.norm(np.array(self.robot_position) - np.array(self.goal_position))

        reward = -distance_to_goal

        done = False

        if distance_to_goal < 0.7: 
            reward += 100  
            done = True
        else:
            collision = self.check_collision(self.robot_position)
            if collision:
                reward -= 10  
                done = True
            else:
                done = False
    
        # if done:
        #     self.reset()  
        
        self.state = np.array(self.robot_position, dtype=np.float32)
        # print(self.state)
        info = {}  # Add any additional info you want to pass
    
        return self.state, reward, done, info
    def render(self, mode='human'):
        plt.figure(figsize=(6, 6))
        ax = plt.gca()
        ax.set_xlim(0, self.grid_size)
        ax.set_ylim(0, self.grid_size)
        ax.set_aspect('equal')
    
        # Draw the goal position
        goal = plt.Circle(self.goal_position, 0.5, color='green')
        ax.add_artist(goal)
    
        # Draw the robot
        robot = plt.Circle(self.robot_position, 0.5, color='red')
        ax.add_artist(robot)
    
        # Draw the humans
        for human_pos in self.human_positions:
            human = plt.Circle(human_pos, 0.5, color='blue')
            ax.add_artist(human)

        
        #static obstacles
        # for obstacle_pos in self.obstacle_positions:
        #     obstacle = plt.Circle(obstacle_pos, 0.5, color='black')
        #     ax.add_artist(obstacle)

        #walls
        for wall in self.maze_walls:
            x, y, width, height = wall
            rect = plt.Rectangle((x, y), width, height, color='black')
            ax.add_patch(rect)
    
        
        #Orientation
        arrow_length = 0.7
        dx = arrow_length * np.cos(self.robot_orientation)
        dy = arrow_length * np.sin(self.robot_orientation)
        arrow = FancyArrow(self.robot_position[0], self.robot_position[1], dx, dy, color='black', width=0.1)
        ax.add_patch(arrow)

        #Lidar range
        # lidar_circle = plt.Circle(self.robot_position, self.lidar_range, color='gray', alpha=0.2)
        # ax.add_artist(lidar_circle)

        self.render_lidar(ax)
        # Plot observation range, ensuring it doesn't pass through walls
    
        # def intersects_wall(start, end):
        #     line = LineString([start, end])
        #     for wall in self.maze_walls:
        #         wall_box = box(wall[0], wall[1], wall[0] + wall[2], wall[1] + wall[3])
        #         if line.intersects(wall_box):
        #             return True
        #     return False
    
        # lidar_R = self.lidar_range
        # angles = np.linspace(0, 2 * np.pi, 360)
        # x0, y0 = self.robot_position
    
        # for angle in angles:
        #     x1 = x0 + lidar_R * np.cos(angle)
        #     y1 = y0 + lidar_R * np.sin(angle)
        #     if not intersects_wall((x0, y0), (x1, y1)):
        #         line = plt.Line2D((x0, x1), (y0, y1), color='gray', alpha=0.2)
        #         ax.add_line(line)


        

        #Trajectory prediction
        for predicted_positions in self.predicted_human_positions:
            for future_pos in predicted_positions:
                future_circle = plt.Circle(future_pos, 0.2, color='pink', alpha=0.3)
                ax.add_artist(future_circle)

        
    
        plt.axis('off')
    
        # Save frame to list
        buf = io.BytesIO()
        plt.savefig(buf, format='png')
        buf.seek(0)
        frame = imageio.imread(buf)
        self.frames.append(frame)
        buf.close()

        # Show the plot in real-time
        # plt.imshow(frame)
        # plt.axis('off')
        # plt.pause(0.1)
        # plt.clf()

    def render_lidar(self, ax):
        lidar_segments = 36
        angle_increment = 2 * np.pi / lidar_segments
        for i in range(lidar_segments):
            angle = i * angle_increment
            end_point = self.robot_position + np.array([np.cos(angle), np.sin(angle)]) * self.lidar_range
            ray_end_point = self.get_ray_end_point(self.robot_position, end_point)
            ax.plot([self.robot_position[0], ray_end_point[0]], [self.robot_position[1], ray_end_point[1]], color='dimgray', alpha=0.2)

    def get_ray_end_point(self, start, end):
        for wall in self.maze_walls:
            x, y, width, height = wall
            if self.line_intersects_rect(start, end, x, y, width, height):
                return self.get_intersection_point(start, end, x, y, width, height)
        return end

    def line_intersects_rect(self, start, end, x, y, width, height):
        def ccw(A, B, C):
            return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])

        A = start
        B = end
        C1 = [x, y]
        C2 = [x + width, y]
        C3 = [x, y + height]
        C4 = [x + width, y + height]
        return (ccw(A, C1, C2) != ccw(B, C1, C2) and ccw(A, B, C1) != ccw(A, B, C2)) or \
               (ccw(A, C3, C4) != ccw(B, C3, C4) and ccw(A, B, C3) != ccw(A, B, C4)) or \
               (ccw(A, C1, C3) != ccw(B, C1, C3) and ccw(A, B, C1) != ccw(A, B, C3)) or \
               (ccw(A, C2, C4) != ccw(B, C2, C4) and ccw(A, B, C2) != ccw(A, B, C4))

    def get_intersection_point(self, start, end, x, y, width, height):
        def line(p1, p2):
            A = (p1[1] - p2[1])
            B = (p2[0] - p1[0])
            C = (p1[0] * p2[1] - p2[0] * p1[1])
            return A, B, -C

        def intersection(L1, L2):
            D = L1[0] * L2[1] - L1[1] * L2[0]
            Dx = L1[2] * L2[1] - L1[1] * L2[2]
            Dy = L1[0] * L2[2] - L1[2] * L2[0]
            if D != 0:
                return Dx / D, Dy / D
            else:
                return False

        walls = [
            ([x, y], [x + width, y]),
            ([x, y], [x, y + height]),
            ([x + width, y], [x + width, y + height]),
            ([x, y + height], [x + width, y + height])
        ]
        closest_point = end
        min_distance = np.linalg.norm(np.array(start) - np.array(end))
        for wall in walls:
            L1 = line(start, end)
            L2 = line(wall[0], wall[1])
            R = intersection(L1, L2)
            if R:
                dist = np.linalg.norm(np.array(start) - np.array(R))
                if dist < min_distance:
                    min_distance = dist
                    closest_point = R
        return closest_point
    

In [None]:
class Attention(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(Attention, self).__init__()
        self.query_layer = nn.Linear(input_dim, hidden_dim)
        self.key_layer = nn.Linear(input_dim, hidden_dim)
        self.value_layer = nn.Linear(input_dim, hidden_dim)
        self.output_layer = nn.Linear(hidden_dim, input_dim)  # Project back to the original dimension
        self.softmax = nn.Softmax(dim=-1)
        
    def forward(self, x):
        queries = self.query_layer(x)
        keys = self.key_layer(x)
        values = self.value_layer(x)
        
        scores = torch.matmul(queries, keys.transpose(-2, -1)) / np.sqrt(queries.size(-1))
        attention_weights = self.softmax(scores)
        
        attended_values = torch.matmul(attention_weights, values)
        attended_values = self.output_layer(attended_values)  # Project back to the original dimension
        return attended_values, attention_weights

In [None]:
class Actor(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=128):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.tanh(self.fc3(x))  # Output action is in range [-1, 1]
        return x

class Critic(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=128):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_size + action_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 1)
    
    def forward(self, state, action):
        x = torch.cat((state, action), dim=-1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [None]:


class DDPGAgent:
    def __init__(self, state_size, action_size, buffer_size, batch_size, lr_actor=1e-3, lr_critic=1e-3, gamma=0.99, tau=1e-3):
        self.actor = Actor(state_size, action_size)
        self.actor_target = Actor(state_size, action_size)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr_actor)
        
        self.critic = Critic(state_size, action_size)
        self.critic_target = Critic(state_size, action_size)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr_critic)
        
        self.replay_buffer = ReplayBuffer(buffer_size, batch_size, state_size, action_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau
        
        # Initialize target networks with same weights as original networks
        self._update_targets(self.actor_target, self.actor)
        self._update_targets(self.critic_target, self.critic)
    
    def _update_targets(self, target_model, model):
        for target_param, param in zip(target_model.parameters(), model.parameters()):
            target_param.data.copy_(param.data)
            
    def act(self, state):
        state = torch.FloatTensor(state)
        return self.actor(state).detach().numpy()
        
    #Test
    def remember(self, state, action, reward, next_state, done):
        self.replay_buffer.add(state, action, reward, next_state, done)
        
    #Learn
    def learn(self):
        if len(self.replay_buffer) < self.batch_size:
            return
        
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)
        states = torch.FloatTensor(states)
        actions = torch.FloatTensor(actions)
        rewards = torch.FloatTensor(rewards).unsqueeze(1)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones).unsqueeze(1)
        
        # Critic update
        Q_targets_next = self.critic_target(next_states, self.actor_target(next_states))
        Q_targets = rewards + (self.gamma * Q_targets_next * (1 - dones))
        Q_expected = self.critic(states, actions)
        critic_loss = F.mse_loss(Q_expected, Q_targets.detach())
        
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        
        # Actor update
        actor_loss = -self.critic(states, self.actor(states)).mean()
        
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        
        # Update target networks
        self._soft_update_target(self.actor_target, self.actor, self.tau)
        self._soft_update_target(self.critic_target, self.critic, self.tau)
    
    def _soft_update_target(self, target_model, model, tau):
        for target_param, param in zip(target_model.parameters(), model.parameters()):
            target_param.data.copy_(tau * param.data + (1.0 - tau) * target_param.data)


In [None]:


class ReplayBuffer:
    def __init__(self, max_size, batch_size, state_size, action_size):
        self.max_size = max_size
        self.batch_size = batch_size
        self.state_size = state_size
        self.action_size = action_size
        self.buffer = []
        self.idx = 0
        
    def add(self, state, action, reward, next_state, done):
        experience = (state, action, reward, next_state, done)
        if len(self.buffer) < self.max_size:
            self.buffer.append(experience)
        else:
            self.buffer[self.idx] = experience
        self.idx = (self.idx + 1) % self.max_size
        
    def sample(self, batch_size):
        batch = np.random.choice(len(self.buffer), batch_size, replace=False)
        states, actions, rewards, next_states, dones = zip(*[self.buffer[i] for i in batch])
        return states, actions, rewards, next_states, dones
    
    def __len__(self):
        return len(self.buffer)

In [None]:
def save_checkpoint(actor, critic, actor_target, critic_target, actor_optimizer, critic_optimizer, filepath):
    checkpoint = {
        'actor_state_dict': actor.state_dict(),
        'critic_state_dict': critic.state_dict(),
        'actor_target_state_dict': actor_target.state_dict(),
        'critic_target_state_dict': critic_target.state_dict(),
        'actor_optimizer_state_dict': actor_optimizer.state_dict(),
        'critic_optimizer_state_dict': critic_optimizer.state_dict()
    }
    torch.save(checkpoint, filepath)
    print(f"Checkpoint saved to {filepath}")

def load_checkpoint(filepath, actor, critic, actor_target, critic_target, actor_optimizer, critic_optimizer):
    checkpoint = torch.load(filepath)
    actor.load_state_dict(checkpoint['actor_state_dict'])
    critic.load_state_dict(checkpoint['critic_state_dict'])
    actor_target.load_state_dict(checkpoint['actor_target_state_dict'])
    critic_target.load_state_dict(checkpoint['critic_target_state_dict'])
    actor_optimizer.load_state_dict(checkpoint['actor_optimizer_state_dict'])
    critic_optimizer.load_state_dict(checkpoint['critic_optimizer_state_dict'])
    print(f"Checkpoint loaded from {filepath}")
    

In [None]:
def main():
    env = ContinuousRobotNavigationEnv()
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]
    buffer_size = 10000
    batch_size = 64
    lr_actor = 1e-3
    lr_critic = 1e-3
    gamma = 0.99
    tau = 1e-3

    ddpg_agent = DDPGAgent(state_dim, action_dim, buffer_size, batch_size, lr_actor, lr_critic, gamma, tau)
    replay_buffer = ReplayBuffer(max_size=buffer_size, batch_size=batch_size, state_size=state_dim, action_size=action_dim)

    checkpoint_dir = 'checkpoints'
    os.makedirs(checkpoint_dir, exist_ok=True)
    
    # Define checkpoint filename with timestamp
    checkpoint_filename = f"ddpg_checkpoint_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pth"
    checkpoint_filepath = os.path.join(checkpoint_dir, checkpoint_filename)

    # if os.path.exists(checkpoint_filepath):
    #     load_checkpoint(checkpoint_filepath, ddpg_agent.actor, ddpg_agent.critic, ddpg_agent.actor_target,
    #                     ddpg_agent.critic_target, ddpg_agent.actor_optimizer, ddpg_agent.critic_optimizer)
    
    checkpoint_files = sorted([f for f in os.listdir(checkpoint_dir) if f.endswith('.pth')], reverse=True)
    latest_checkpoint = os.path.join(checkpoint_dir, checkpoint_files[0]) if checkpoint_files else None

    if latest_checkpoint:
        print(f"Loading checkpoint {latest_checkpoint}")
        load_checkpoint(latest_checkpoint, ddpg_agent.actor, ddpg_agent.critic, ddpg_agent.actor_target,
                        ddpg_agent.critic_target, ddpg_agent.actor_optimizer, ddpg_agent.critic_optimizer)

    # Training loop
    num_episodes = 150
    episode_rewards = []

    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        env.frames = []  # Clear frames for the new episode
        for t in range(100):
            action = ddpg_agent.act(state)
            next_state, reward, done, _ = env.step(action)
            ddpg_agent.remember(state, action, reward, next_state, done)  # Remember the experience
            state = next_state
            episode_reward += reward
            env.render()  
            if done:
                break

        if episode % 50 == 0:  # Save checkpoint every 100 episodes
            save_checkpoint(ddpg_agent.actor, ddpg_agent.critic, ddpg_agent.actor_target, 
                            ddpg_agent.critic_target, ddpg_agent.actor_optimizer, ddpg_agent.critic_optimizer, checkpoint_filepath)
            print(f'Saved checkpoint at episode {episode}')
        
        # Learn from the experiences in the replay buffer
        ddpg_agent.learn()

        episode_rewards.append(episode_reward)
        print(f"Episode {episode}, Reward: {episode_reward}")

        # Save GIF
        imageio.mimsave(f'episode_{episode}.gif', env.frames, fps=10)

    plt.plot(range(num_episodes), episode_rewards)
    plt.xlabel('Episode')
    plt.ylabel('Cumulative Reward')
    plt.title('Cumulative Reward over Episodes')
    plt.show()

if __name__ == "__main__":
    main()