# Imports


In [1]:
#%pip install gym pygame tensorflow numpy matplotlib

In [None]:
# Env
import gym
from gym import spaces
import pygame

# DL
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers, models, optimizers

# Miscellaneous
from collections import deque
import numpy as np
import random
import math
import os
import matplotlib.pyplot as plt
import time


# Constants

### Environment Constants

In [None]:
# Global multiplier applied to the window size, cell size and obstacle coordinates.
SCALE_FACTOR = 1

# Unscaled window size in pixels (source dimensions 2616x1816 shrunk by 2.2).
ORIGINAL_SCREEN_WIDTH = int(2616 / 2.2)
ORIGINAL_SCREEN_HEIGHT = int(1816 / 2.2)

# Actual window size in pixels after scaling.
SCREEN_WIDTH = int(ORIGINAL_SCREEN_WIDTH * SCALE_FACTOR)
SCREEN_HEIGHT = int(ORIGINAL_SCREEN_HEIGHT * SCALE_FACTOR)

# Edge length of one grid cell in pixels (never smaller than one pixel).
CELL_SIZE = max(1, int(5 * SCALE_FACTOR))

# Occupancy-grid dimensions in cells.
GRID_WIDTH = SCREEN_WIDTH // CELL_SIZE
GRID_HEIGHT = SCREEN_HEIGHT // CELL_SIZE

# Agent footprint and target size, both measured in grid cells.
AGENT_SIZE = 3
TARGET_SIZE = 5

# Frame-rate cap handed to the pygame clock while rendering.
FPS = 60

### Training Constants

In [None]:
# Replay sampling
BATCH_SIZE = 32          # transitions drawn from replay memory per training step
MEMORY_SIZE = 100_000    # maximum number of transitions kept in replay memory

# Temporal-difference target
GAMMA = 0.99             # discount factor applied to the bootstrapped next-state value
TARGET_UPDATE = 100      # training steps between target-network weight syncs

# Epsilon-greedy exploration schedule (consumed by DoubleDQNAgent.select_action)
EPS_START = 1.0          # initial exploration probability
EPS_END = 0.01           # floor the exploration probability decays towards
EPS_DECAY = 0.998        # decay parameter of the schedule

# Optimiser
LEARNING_RATE = 1e-4     # Adam learning rate

### Image Paths Constants

In [None]:
# Project root: the parent of the current working directory.
ROOT_DIR = os.path.abspath(os.path.join(os.getcwd(), ".."))

# Folder that holds the sprites and the background picture.
ASSETS_DIR = os.path.join(ROOT_DIR, "images", "proj_assets")

# Logical asset name -> absolute path of the image file.
IMAGE_PATHS = {
    asset_name: os.path.join(ASSETS_DIR, file_name)
    for asset_name, file_name in (
        ("env", "env2.png"),
        ("agent", "crabs.png"),
        ("target", "moneybag.png"),
    )
}

# Environment

In [None]:
class RobotEnv(gym.Env):
    """Grid-world navigation task rendered with pygame.

    The world is a ``GRID_WIDTH x GRID_HEIGHT`` occupancy grid (``1`` = blocked).
    The agent occupies an ``AGENT_SIZE x AGENT_SIZE`` square of cells anchored at
    ``(self.x, self.y)`` and moves one cell per step towards a fixed target.

    Observation (12 float32 values):
        ``[x / GRID_WIDTH, y / GRID_HEIGHT, target_x / GRID_WIDTH,
        target_y / GRID_HEIGHT]`` followed by 8 flags, one per compass
        direction, that are ``1.0`` when the agent footprint shifted by
        ``AGENT_SIZE`` cells in that direction would be blocked or off-grid.

    Actions (``Discrete(4)``): 0 = Up, 1 = Right, 2 = Down, 3 = Left.

    The class follows the classic gym API: ``reset()`` returns the observation
    only and ``step()`` returns ``(observation, reward, done, info)``.
    """

    # Per-action movement in grid cells, indexed by action id (Up, Right, Down, Left).
    _ACTION_DELTAS = ((0, -1), (1, 0), (0, 1), (-1, 0))

    # Directions probed for obstacles when building the observation.
    _SENSOR_DIRECTIONS = (
        (0, 1), (1, 1), (1, 0), (1, -1),
        (0, -1), (-1, -1), (-1, 0), (-1, 1),
    )

    # An episode is cut off once this many steps have been taken.
    _MAX_STEPS = 500

    def __init__(self):
        super().__init__()
        self.action_space = spaces.Discrete(4)  # Up, Right, Down, Left
        self.observation_space = spaces.Box(
            low=0, high=1, shape=(12,), dtype=np.float32
        )
        self.env = self._create_occupancy_grid()
        self._add_obstacles()

        # pygame resources are created lazily on the first render() call.
        self.screen = None
        self.clock = None
        self._font = None

        # One entry per finished episode: 1 = target reached, 0 = failure.
        self.episode_outcomes = []
        self.current_episode = 0

        self.target_image = pygame.transform.scale(
            pygame.image.load(IMAGE_PATHS["target"]),
            (TARGET_SIZE * CELL_SIZE, TARGET_SIZE * CELL_SIZE),
        )
        self.agent_image = pygame.transform.scale(
            pygame.image.load(IMAGE_PATHS["agent"]),
            (AGENT_SIZE * CELL_SIZE, AGENT_SIZE * CELL_SIZE),
        )

        self.reset()

    def _create_occupancy_grid(self):
        """Return an all-free grid indexed as ``grid[x, y]``."""
        return np.zeros((GRID_WIDTH, GRID_HEIGHT))

    def _add_circular_obstacle(self, center, radius):
        """Mark every cell whose centre lies within ``radius`` of ``center`` as blocked.

        ``center`` and ``radius`` are multiplied by ``SCALE_FACTOR`` and
        truncated to integers before use.
        """
        cx, cy = int(center[0] * SCALE_FACTOR), int(center[1] * SCALE_FACTOR)
        scaled_radius = int(radius * SCALE_FACTOR)

        # Offsets of each cell centre from the circle centre, per axis.
        dx = np.arange(GRID_WIDTH) + 0.5 - cx
        dy = np.arange(GRID_HEIGHT) + 0.5 - cy

        # Broadcast to a (GRID_WIDTH, GRID_HEIGHT) mask instead of looping over
        # every cell in Python.
        inside = dx[:, None] ** 2 + dy[None, :] ** 2 <= scaled_radius ** 2
        self.env[inside] = 1

    def _add_rectangular_obstacle(self, top_left, width, height):
        """Mark the ``width x height`` rectangle starting at ``top_left`` as blocked.

        All arguments are multiplied by ``SCALE_FACTOR`` and truncated to
        integers. The rectangle is clipped to the grid, so parts that fall
        outside are ignored rather than wrapping around via negative indices.
        """
        start_x, start_y = int(top_left[0] * SCALE_FACTOR), int(top_left[1] * SCALE_FACTOR)
        scaled_width, scaled_height = int(width * SCALE_FACTOR), int(height * SCALE_FACTOR)

        x0 = min(max(start_x, 0), GRID_WIDTH)
        x1 = min(max(start_x + scaled_width, 0), GRID_WIDTH)
        y0 = min(max(start_y, 0), GRID_HEIGHT)
        y1 = min(max(start_y + scaled_height, 0), GRID_HEIGHT)

        # Empty slices (x1 <= x0 or y1 <= y0) are a no-op.
        self.env[x0:x1, y0:y1] = 1

    def _add_obstacles(self):
        """Populate the occupancy grid with the hand-placed obstacle layout."""
        # Frame
        self._add_rectangular_obstacle((0, 1), GRID_WIDTH, 15)
        self._add_rectangular_obstacle((0, 152), GRID_WIDTH, 15)
        self._add_rectangular_obstacle((0, 1), 17, GRID_HEIGHT)
        self._add_rectangular_obstacle((222, 1), 17, GRID_HEIGHT)

        # Dining room (Tables then Barrels)
        self._add_circular_obstacle((146, 133), 10)
        self._add_circular_obstacle((74, 133), 10)
        self._add_circular_obstacle((76, 102), 9)
        self._add_circular_obstacle((38, 118), 11)
        self._add_circular_obstacle((187, 118), 12)
        self._add_circular_obstacle((157, 91), 10)

        self._add_circular_obstacle((90, 115), 3)
        self._add_circular_obstacle((162, 112), 3)
        self._add_circular_obstacle((139, 100), 3)
        self._add_circular_obstacle((181, 96), 3)
        self._add_circular_obstacle((210, 123), 3)
        self._add_circular_obstacle((55, 122), 3)
        self._add_circular_obstacle((24, 93), 3)
        self._add_circular_obstacle((24, 137), 3)
        self._add_circular_obstacle((81, 145), 3)

        self._add_rectangular_obstacle((101, 70), 28, 18)
        self._add_circular_obstacle((115, 90), 14)

        # Inner walls
        self._add_rectangular_obstacle((0, 68), 33, 5)
        self._add_rectangular_obstacle((46, 68), 89, 5)
        self._add_rectangular_obstacle((153, 68), 38, 5)
        self._add_rectangular_obstacle((208, 68), 38, 5)

        self._add_rectangular_obstacle((68, 1), 5, 42)
        self._add_rectangular_obstacle((68, 61), 5, 10)
        self._add_rectangular_obstacle((183, 1), 5, 70)

        # Kitchen area
        self._add_rectangular_obstacle((70, 1), 12, 42)
        self._add_rectangular_obstacle((104, 1), 23, 26)
        self._add_rectangular_obstacle((101, 57), 27, 18)
        self._add_rectangular_obstacle((81, 57), 20, 14)
        self._add_rectangular_obstacle((177, 36), 8, 20)

        self._add_circular_obstacle((135, 21), 3)
        self._add_circular_obstacle((143, 21), 3)

        # Crabs room
        self._add_rectangular_obstacle((35, 32), 19, 12)
        self._add_rectangular_obstacle((38, 24), 12, 12)

        self._add_circular_obstacle((33, 52), 3)
        self._add_circular_obstacle((53, 53), 3)

        # Bathroom
        self._add_rectangular_obstacle((185, 21), 14, 30)
        self._add_rectangular_obstacle((185, 55), 10, 8)

    def _initialize_pygame(self, title="Double DQN Agent Simulation"):
        """Initialise pygame, open the window and return its display surface."""
        pygame.init()
        screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
        pygame.display.set_caption(title)
        return screen

    def _load_and_scale_image(self):
        """Load the background picture and stretch it to the window size."""
        image = pygame.image.load(IMAGE_PATHS["env"])
        return pygame.transform.scale(image, (SCREEN_WIDTH, SCREEN_HEIGHT))

    def _draw_occupancy_grid(self):
        """Paint every blocked cell as a grey square (debug overlay)."""
        # .tolist() yields plain Python ints for the pygame rect tuple.
        for x, y in np.argwhere(self.env == 1).tolist():
            pygame.draw.rect(
                self.screen,
                (128, 128, 128),
                (x * CELL_SIZE, y * CELL_SIZE, CELL_SIZE, CELL_SIZE),
            )

    def _draw_image(self, screen, image, grid_x, grid_y, cell_size):
        """Blit ``image`` so it is centred on grid cell ``(grid_x, grid_y)``."""
        # Pixel coordinates of the centre of the grid cell
        center_x = grid_x * cell_size + cell_size // 2
        center_y = grid_y * cell_size + cell_size // 2

        # Top-left corner that centres the image on that point
        top_left_x = center_x - image.get_width() // 2
        top_left_y = center_y - image.get_height() // 2

        screen.blit(image, (top_left_x, top_left_y))

    def _is_valid_position(self, x, y, size):
        """Return True when the ``size x size`` square anchored at ``(x, y)`` is
        fully inside the grid and touches no blocked cell."""
        for dx in range(size):
            nx = x + dx
            if nx < 0 or nx >= GRID_WIDTH:
                return False
            for dy in range(size):
                ny = y + dy
                if ny < 0 or ny >= GRID_HEIGHT or self.env[nx, ny] == 1:
                    return False
        return True

    def _distance_to_target(self, x, y):
        """Euclidean distance, in cells, from ``(x, y)`` to the target."""
        return math.sqrt((x - self.target_x) ** 2 + (y - self.target_y) ** 2)

    def _get_state(self):
        """Build the 12-value observation described in the class docstring."""
        blocked_flags = [
            0.0
            if self._is_valid_position(
                self.x + dx * AGENT_SIZE, self.y + dy * AGENT_SIZE, AGENT_SIZE
            )
            else 1.0
            for dx, dy in self._SENSOR_DIRECTIONS
        ]
        position = [
            self.x / GRID_WIDTH, self.y / GRID_HEIGHT,
            self.target_x / GRID_WIDTH, self.target_y / GRID_HEIGHT,
        ]
        return np.array(position + blocked_flags, dtype=np.float32)

    def get_episode_outcomes(self):
        """Return the list of per-episode outcomes (1 = success, 0 = failure)."""
        return self.episode_outcomes

    def step(self, action):
        """Advance the simulation by one move.

        Args:
            action: index into (Up, Right, Down, Left).

        Returns:
            ``(observation, reward, done, info)`` where ``info`` contains the
            boolean key ``'target_reached'``.

        Rewards:
            * ``-5.0`` and episode end when the move would collide or leave the grid
              (the agent stays where it was).
            * ``+50.0`` and episode end when the agent lands within
              ``TARGET_SIZE / 2`` cells of the target on both axes.
            * otherwise the decrease in distance to the target, with an extra
              ``-0.2`` when the distance did not shrink, minus ``0.01`` per step.

        The episode is also ended (as a failure) after 500 steps. Exactly one
        entry is appended to ``episode_outcomes`` per finished episode.
        """
        self.steps += 1
        dx, dy = self._ACTION_DELTAS[action]
        new_x, new_y = self.x + dx, self.y + dy
        info = {'target_reached': False}

        # Collision: terminate immediately. Returning here ensures the failure is
        # recorded once even when the collision happens on the final allowed step.
        if not self._is_valid_position(new_x, new_y, AGENT_SIZE):
            self.episode_outcomes.append(0)
            return self._get_state(), -5.0, True, info

        self.x, self.y = new_x, new_y

        # Reaching target
        half_target = TARGET_SIZE / 2
        if (abs(self.x - self.target_x) <= half_target
                and abs(self.y - self.target_y) <= half_target):
            info['target_reached'] = True
            self.episode_outcomes.append(1)
            return self._get_state(), 50.0, True, info

        # Distance-based shaping: reward progress, penalise moving away or sideways.
        new_distance = self._distance_to_target(new_x, new_y)
        distance_delta = self.current_distance - new_distance
        reward = distance_delta if distance_delta > 0 else distance_delta - 0.2
        reward -= 0.01  # small per-step cost
        self.current_distance = new_distance

        # Step budget exhausted without reaching the target.
        done = self.steps >= self._MAX_STEPS
        if done:
            self.episode_outcomes.append(0)

        return self._get_state(), reward, done, info

    def reset(self):
        """Put agent and target back at their fixed start cells and return the
        initial observation. Also increments ``current_episode``."""
        self.x = 50
        self.y = 20
        self.target_x = 145
        self.target_y = 115
        self.steps = 0

        # Reference distance for the distance-based reward
        self.initial_distance = self._distance_to_target(self.x, self.y)
        self.current_distance = self.initial_distance

        # Keep track of episodes
        self.current_episode += 1

        return self._get_state()

    def render(self, reward=None, episode=None, draw_obstacles=None):
        """Draw the current frame and cap the frame rate at ``FPS``.

        Args:
            reward: value shown in the HUD; the HUD is drawn only when both
                ``reward`` and ``episode`` are given.
            episode: episode number shown in the HUD.
            draw_obstacles: when truthy, overlay the occupancy grid.
        """
        if self.screen is None:
            self.screen = self._initialize_pygame()
            self.background_image = self._load_and_scale_image()
            self.clock = pygame.time.Clock()

        # Fill the screen with the background image
        self.screen.blit(self.background_image, (0, 0))

        # Truthiness check so that draw_obstacles=False really disables the overlay.
        if draw_obstacles:
            self._draw_occupancy_grid()

        # Both sprites are positioned on the same grid, so both use CELL_SIZE.
        self._draw_image(self.screen, self.target_image, self.target_x, self.target_y, CELL_SIZE)
        self._draw_image(self.screen, self.agent_image, self.x, self.y, CELL_SIZE)

        # Render reward and episode number
        if reward is not None and episode is not None:
            if self._font is None:
                # Created once and reused instead of allocating a font per frame.
                self._font = pygame.font.Font(None, 36)
            text = self._font.render(
                f'Episode: {episode}  Reward: {reward:.2f}', True, (255, 255, 255)
            )
            self.screen.blit(text, (450, 25))

        pygame.display.flip()
        self.clock.tick(FPS)

    def close(self):
        """Shut pygame down if a window was opened."""
        if self.screen is not None:
            pygame.quit()
            self.screen = None
            # Font objects do not survive pygame.quit(); recreate on next render.
            self._font = None


# Agent

In [None]:
class DoubleDQNAgent:
    """Double DQN agent with experience replay and a periodically synced target network.

    Args:
        state_size: length of the observation vector fed to the networks.
        action_size: number of discrete actions (width of the output layer).
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size

        # Experience replay memory; oldest transitions are dropped when full.
        self.memory = deque(maxlen=MEMORY_SIZE)

        # Online (policy) network and target network start with identical weights.
        self.policy_net = self._build_network()
        self.target_net = self._build_network()
        self.target_net.set_weights(self.policy_net.get_weights())

        # steps_done counts training-mode action selections (drives epsilon);
        # train_step_counter counts gradient updates (drives target syncs).
        self.steps_done = 0
        self.train_step_counter = tf.Variable(0)

        # Optimizer used by the custom training step.
        self.optimizer = optimizers.Adam(learning_rate=LEARNING_RATE)

    def _build_network(self):
        """Return an MLP (3 x 64 ReLU units) mapping a state to one Q-value per action."""
        model = Sequential()
        model.add(Dense(64, input_dim=self.state_size, activation='relu'))
        model.add(Dense(64, activation='relu'))
        model.add(Dense(64, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=LEARNING_RATE))
        return model

    def store_transition(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` tuple to replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def _current_epsilon(self):
        """Exploration probability for the current value of ``steps_done``.

        ``EPS_DECAY`` in (0, 1) is treated as a per-step multiplicative factor.
        Dividing the step count by such a value, as an exponential time
        constant, would drop epsilon to ``EPS_END`` within roughly ten steps.
        Values >= 1 are still interpreted as a time constant in steps.
        """
        if 0 < EPS_DECAY < 1:
            decay = EPS_DECAY ** self.steps_done
        else:
            decay = math.exp(-1. * self.steps_done / EPS_DECAY)
        return EPS_END + (EPS_START - EPS_END) * decay

    def _greedy_action(self, state):
        """Return the action index with the highest policy-network Q-value."""
        state_tensor = tf.expand_dims(tf.convert_to_tensor(state), 0)
        action_values = self.policy_net(state_tensor, training=False)
        return int(tf.argmax(action_values[0]).numpy())

    def select_action(self, state, training=True):
        """Choose an action for ``state``.

        With ``training=True`` the choice is epsilon-greedy and ``steps_done``
        is incremented; with ``training=False`` it is purely greedy.
        """
        if not training:
            return self._greedy_action(state)

        eps_threshold = self._current_epsilon()
        self.steps_done += 1

        if random.random() > eps_threshold:
            return self._greedy_action(state)
        return random.randrange(self.action_size)

    @tf.function
    def _train_step(self, states, actions, rewards, next_states, dones):
        """Run one gradient update on the policy network and return the MSE loss."""
        # Double DQN target: the policy net picks the next action, the target
        # net evaluates it. Computed outside the tape because no gradient
        # should flow through the target.
        next_q_values_policy = self.policy_net(next_states, training=False)
        next_actions = tf.argmax(next_q_values_policy, axis=1)
        next_q_values_target = self.target_net(next_states, training=False)
        max_next_q = tf.reduce_sum(
            next_q_values_target * tf.one_hot(next_actions, self.action_size), axis=1
        )
        # Terminal transitions (done == 1) do not bootstrap.
        target_q = rewards + (1 - tf.cast(dones, tf.float32)) * GAMMA * max_next_q

        with tf.GradientTape() as tape:
            # Q-value of the action that was actually taken
            current_q_values = self.policy_net(states, training=True)
            current_q = tf.reduce_sum(
                current_q_values * tf.one_hot(actions, self.action_size), axis=1
            )
            loss = tf.reduce_mean(tf.square(target_q - current_q))

        # Compute gradients, clip element-wise to [-1, 1], and update weights
        gradients = tape.gradient(loss, self.policy_net.trainable_variables)
        gradients = [tf.clip_by_value(grad, -1.0, 1.0) for grad in gradients]
        self.optimizer.apply_gradients(zip(gradients, self.policy_net.trainable_variables))

        return loss

    def train(self):
        """Sample a minibatch from replay memory and perform one update.

        Returns:
            ``0`` when fewer than ``BATCH_SIZE`` transitions are stored,
            otherwise the loss of the update as a NumPy scalar.
        """
        if len(self.memory) < BATCH_SIZE:
            return 0

        # Sample batch
        batch = random.sample(self.memory, BATCH_SIZE)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Cast in NumPy first so the tensor conversion never has to change dtype.
        states = tf.convert_to_tensor(np.asarray(states, dtype=np.float32))
        actions = tf.convert_to_tensor(np.asarray(actions, dtype=np.int32))
        rewards = tf.convert_to_tensor(np.asarray(rewards, dtype=np.float32))
        next_states = tf.convert_to_tensor(np.asarray(next_states, dtype=np.float32))
        dones = tf.convert_to_tensor(np.asarray(dones, dtype=np.float32))

        # Perform training step
        loss = self._train_step(states, actions, rewards, next_states, dones)

        # Sync the target network every TARGET_UPDATE updates
        self.train_step_counter.assign_add(1)
        if int(self.train_step_counter.numpy()) % TARGET_UPDATE == 0:
            self.update_target_network()

        return loss.numpy()

    def update_target_network(self):
        """Copy the policy network's weights into the target network."""
        self.target_net.set_weights(self.policy_net.get_weights())

    def save_model(self, filename):
        """Write both networks' weights to ``<filename>_policy.weights.h5`` and
        ``<filename>_target.weights.h5``."""
        policy_filename = f"{filename}_policy.weights.h5"
        target_filename = f"{filename}_target.weights.h5"

        self.policy_net.save_weights(policy_filename)
        self.target_net.save_weights(target_filename)

    def load_model(self, filename):
        """Load weights previously written by ``save_model(filename)``.

        Missing files do not raise, but a warning is emitted so that running
        with unintended weights does not go unnoticed. When only the policy
        weights exist, the target network is synced from the policy network.
        """
        import warnings

        policy_path = f"{filename}_policy.weights.h5"
        target_path = f"{filename}_target.weights.h5"

        policy_loaded = os.path.exists(policy_path)
        if policy_loaded:
            self.policy_net.load_weights(policy_path)
        else:
            warnings.warn(
                f"Policy weights not found at '{policy_path}'; "
                "keeping the current policy network weights."
            )

        if os.path.exists(target_path):
            self.target_net.load_weights(target_path)
        elif policy_loaded:
            # Keep the two networks consistent when only policy weights exist.
            self.update_target_network()
        else:
            warnings.warn(
                f"Target weights not found at '{target_path}'; "
                "keeping the current target network weights."
            )

# Plotting

### Plot Reward per Episode

In [None]:
def plot_training_progress(episode_rewards, title='Training Progress - Total Reward per Episode'):
    """Plot the total reward obtained in each episode.

    Args:
        episode_rewards: sequence with one total-reward value per episode.
        title: figure title. The default keeps the original training caption;
            pass another string when plotting rewards from an evaluation run.
    """
    plt.plot(range(len(episode_rewards)), episode_rewards)
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.title(title)
    plt.show()

### Plot Successful Episode Over Time

In [None]:
def plot_episode_outcomes(episode_outcomes):
    """Plot per-episode outcomes as a 0/1 series (1 = success, 0 = failure)."""
    marker_style = {'marker': 'o', 'linestyle': '-', 'color': 'b', 'markersize': 4}
    plt.plot(episode_outcomes, **marker_style)

    # Axis labelling: the y axis only carries the two outcome categories.
    plt.xlabel('Episode')
    plt.ylabel('Outcome (1=Success, 0=Failure)')
    plt.yticks([0, 1], ['Failure', 'Success'])

    plt.title('Episode Outcomes: Success or Failure')
    plt.grid(True)
    plt.show()

# Training

In [None]:
def train(num_episodes=1000, model_prefix='robot_model', render=True):
    """Train a Double DQN agent in ``RobotEnv``.

    Args:
        num_episodes: number of episodes to run.
        model_prefix: filename prefix for saved weights. The best-scoring
            episode is saved as ``<prefix>_best`` and a snapshot is written
            every 100 episodes as ``<prefix>_ep<episode>``.
        render: when True (default) every step is drawn, which caps the loop
            at ``FPS`` steps per second, and closing the window stops training
            early. Pass False to train without a window.

    Returns:
        ``(episode_rewards, episode_outcomes, episode_losses, episode_times)``.
        The same 4-tuple, holding whatever was collected so far, is returned
        when the window is closed mid-run.
    """
    env = RobotEnv()
    agent = DoubleDQNAgent(
        state_size=env.observation_space.shape[0],
        action_size=env.action_space.n,
    )

    episode_rewards = []
    episode_losses = []
    episode_times = []
    best_reward = float('-inf')

    for episode in range(num_episodes):
        start_time = time.time()

        state = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)

            agent.store_transition(state, action, reward, next_state, done)

            if len(agent.memory) >= BATCH_SIZE:
                episode_losses.append(agent.train())

            total_reward += reward
            state = next_state

            if render:
                env.render(reward=total_reward, episode=episode)

                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        env.close()
                        # Return the partial results in the usual shape so that
                        # callers unpacking four values keep working.
                        return (episode_rewards, env.get_episode_outcomes(),
                                episode_losses, episode_times)

        elapsed_time = time.time() - start_time
        episode_times.append(elapsed_time)
        episode_rewards.append(total_reward)

        # Save best model
        if total_reward > best_reward:
            best_reward = total_reward
            agent.save_model(f"{model_prefix}_best")

        # Episodic save
        if episode % 100 == 0:
            agent.save_model(f"{model_prefix}_ep{episode}")

        # Mean over the last (up to) 10 episodes, refreshed every episode so the
        # printed value is never stale and short histories are not under-weighted.
        recent_rewards = episode_rewards[-10:]
        avg_reward = sum(recent_rewards) / len(recent_rewards)

        print(f'Episode {episode}: Total Reward = {total_reward:.2f}, Avg Reward (10 ep) = {avg_reward:.2f}, Elapsed Time = {elapsed_time:.2f} seconds')

    episode_outcomes = env.get_episode_outcomes()
    env.close()

    return episode_rewards, episode_outcomes, episode_losses, episode_times


# Testing

In [None]:
def test_agent(num_episodes=100, model_prefix='robot_model_best', render=True):
    """Run a saved agent greedily (no exploration) and report its rewards.

    Args:
        num_episodes: number of evaluation episodes.
        model_prefix: filename prefix passed to ``DoubleDQNAgent.load_model``.
        render: when True (default) every step is drawn and closing the window
            stops the evaluation early. Pass False to evaluate without a window.

    Returns:
        ``(episode_rewards, episode_outcomes, episode_times)``; on early exit
        the lists hold the episodes completed so far.
    """
    env = RobotEnv()
    agent = DoubleDQNAgent(
        state_size=env.observation_space.shape[0],
        action_size=env.action_space.n,
    )

    # Load the trained model
    agent.load_model(model_prefix)

    episode_rewards = []
    episode_times = []

    for episode in range(num_episodes):
        start_time = time.time()

        state = env.reset()
        total_reward = 0
        done = False

        while not done:
            # Select action using the trained policy (no exploration)
            action = agent.select_action(state, training=False)
            next_state, reward, done, _ = env.step(action)

            total_reward += reward
            state = next_state

            if render:
                env.render(reward=total_reward, episode=episode)

                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        env.close()
                        return episode_rewards, env.get_episode_outcomes(), episode_times

        elapsed_time = time.time() - start_time
        episode_times.append(elapsed_time)

        episode_rewards.append(total_reward)
        print(f"Episode {episode + 1}/{num_episodes}: Total Reward = {total_reward:.2f}, Elapsed Time = {elapsed_time:.2f} seconds")

    # Summary statistics; skipped when no episode ran (e.g. num_episodes == 0),
    # which would otherwise divide by zero and call max()/min() on an empty list.
    if episode_rewards:
        avg_reward = sum(episode_rewards) / len(episode_rewards)
        max_reward = max(episode_rewards)
        min_reward = min(episode_rewards)

        print(f"Test Completed: Avg Reward = {avg_reward:.2f}, Max Reward = {max_reward:.2f}, Min Reward = {min_reward:.2f}")

    # Collect episode outcomes
    episode_outcomes = env.get_episode_outcomes()

    env.close()
    return episode_rewards, episode_outcomes, episode_times

# Run Training

In [None]:
# Train for 130 episodes; the loss and timing histories returned by train()
# are discarded here. The printed figure is the mean total reward per episode.
episode_rewards, episode_outcomes, _, _ = train(130)
print("Training completed! Average reward:", sum(episode_rewards) / len(episode_rewards))

In [None]:
# Plot the total reward of each training episode.
plot_training_progress(episode_rewards)

In [None]:
# Plot success/failure of each training episode.
plot_episode_outcomes(episode_outcomes)

# Load Model

### Initialize env and then load pre-trained model

In [None]:
# Evaluate for 10 episodes using test_agent's default model prefix; the timing
# list is discarded. Note: this overwrites episode_rewards / episode_outcomes
# from the training cell above.
episode_rewards, episode_outcomes, _ = test_agent(10)
print("Testing completed! Average reward:", sum(episode_rewards) / len(episode_rewards))

In [None]:
# Reuse the reward plot for the test episodes (the figure title will still
# read "Training Progress").
plot_training_progress(episode_rewards)

In [None]:
# Plot success/failure of each test episode.
plot_episode_outcomes(episode_outcomes)