In [170]:
%matplotlib inline
import numpy as np
import random
import matplotlib.pyplot as plt
import seaborn as sns

In [171]:
import numpy as np

import gym
from gym import spaces

import pygame

class RCMazeEnv(gym.Env):
    def __init__(self, maze_size_x=12, maze_size_y=12):
        self.maze_size_x = maze_size_x
        self.maze_size_y = maze_size_y
        self.maze = self.generate_maze()
        self.car_position = (1, 1)
        self.possible_actions = range(3)
        self.car_orientation = 'N'
        self.sensor_readings = {'front': 0, 'left': 0, 'right': 0}
        self.steps = 0
        self.previous_distance = 0
        self.goal = (10, 10)
        self.previous_steps = 0
        self.reset()

            
    def generate_maze(self):
        # For simplicity, create a static maze with walls
        # '1' represents a wall, and '0' represents an open path
        maze = np.zeros((self.maze_size_y, self.maze_size_x), dtype=int)
        # Add walls to the maze (this can be customized)

        layout = [
                [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
                [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
                [1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1],
                [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
                [1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1],
                [1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1],
                [1, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1],
                [1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 1],
                [1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1],
                [1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1],
                [1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1],
                [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]
        
     
        maze = np.array(layout)

        return maze

    def reset(self):
        self.car_position = (1, 1)
        self.car_orientation = 'N'
        self.update_sensor_readings()
        self.steps = 0
        self.previous_distance = 0
        self.previous_steps = 0
        return self.get_state()

    def step(self, action):
        if action == 0:
            self.move_forward()
        elif action == 1:
            self.turn_left()
        elif action == 2:
            self.turn_right()
        self.update_sensor_readings()
        reward = self.compute_reward()
        self.steps += 1
        done = self.is_done()
        return self.get_state(), reward, done

    
    def move_forward(self):
        x, y = self.car_position
        if self.car_orientation == 'N' and y > 0 and self.maze[y - 1][x] != 1:
            self.car_position = (x, y - 1)
        elif self.car_orientation == 'S' and y < self.maze_size_y - 1 and self.maze[y + 1][x] != 1:
            self.car_position = (x, y + 1)
        elif self.car_orientation == 'E' and x < self.maze_size_x - 1 and self.maze[y][x + 1] != 1:
            self.car_position = (x + 1, y)
        elif self.car_orientation == 'W' and x > 0 and self.maze[y][x - 1] != 1:
            self.car_position = (x - 1, y)
        

    def turn_left(self):
        orientations = ['N', 'W', 'S', 'E']
        idx = orientations.index(self.car_orientation)
        self.car_orientation = orientations[(idx + 1) % 4]

    def turn_right(self):
        orientations = ['N', 'E', 'S', 'W']
        idx = orientations.index(self.car_orientation)
        self.car_orientation = orientations[(idx + 1) % 4]

    def update_sensor_readings(self):
        # Simple sensor implementation: counts steps to the nearest wall
        self.sensor_readings['front'] = self.distance_to_wall('front')
        self.sensor_readings['left'] = self.distance_to_wall('left')
        self.sensor_readings['right'] = self.distance_to_wall('right')

    def distance_to_wall(self, direction):
        x, y = self.car_position
        distance = 0
        if direction == 'front':
            if self.car_orientation == 'N':
                while y - distance >= 0 and self.maze[y - distance][x] != 1:
                    distance += 1
            elif self.car_orientation == 'S':
                while y + distance < self.maze_size_y and self.maze[y + distance][x] != 1:
                    distance += 1
            elif self.car_orientation == 'E':
                while x + distance < self.maze_size_x and self.maze[y][x + distance] != 1:
                    distance += 1
            elif self.car_orientation == 'W':
                while x - distance >= 0 and self.maze[y][x - distance] != 1:
                    distance += 1
        elif direction == 'left':
            if self.car_orientation == 'N':
                while x - distance >= 0 and self.maze[y][x - distance] != 1:
                    distance += 1
            elif self.car_orientation == 'S':
                while x + distance < self.maze_size_x and self.maze[y][x + distance] != 1:
                    distance += 1
            elif self.car_orientation == 'E':
                while y - distance >= 0 and self.maze[y - distance][x] != 1:
                    distance += 1
            elif self.car_orientation == 'W':
                while y + distance < self.maze_size_y and self.maze[y + distance][x] != 1:
                    distance += 1
        elif direction == 'right':
            if self.car_orientation == 'N':
                while x + distance < self.maze_size_x and self.maze[y][x + distance] != 1:
                    distance += 1
            elif self.car_orientation == 'S':
                while x - distance >= 0 and self.maze[y][x - distance] != 1:
                    distance += 1
            elif self.car_orientation == 'E':
                while y + distance < self.maze_size_y and self.maze[y + distance][x] != 1:
                    distance += 1
            elif self.car_orientation == 'W':
                while y - distance >= 0 and self.maze[y - distance][x] != 1:
                    distance += 1
        
        return distance

    def compute_reward(self):
        reward = 0

        # Penalty for hitting walls or going out of bounds
        if self.sensor_readings['front'] == 0 or self.sensor_readings['left'] == 0 or self.sensor_readings['right'] == 0:
            reward -= 100

        # Reward for reaching the goal
        if self.car_position == self.goal:
            reward += 1000
            return reward  # Return immediately as this is the terminal state

        # Calculate reward based on reduced distance to goal
        x, y = self.car_position
        goal_x, goal_y = self.goal
        distance = abs(x - goal_x) + abs(y - goal_y)
        

        # Assuming previous_distance is stored after each move
        if distance < self.previous_distance:
            reward += 300  # Positive reward for moving closer to the goal
        elif distance > self.previous_distance:
            reward -= 150   # Negative reward for moving farther from the goal
            
        if distance < 0.5:
            reward += 200
            
            
        if self.steps > 800:
            reward -= 100

        # Update previous_distance for the next step
        self.previous_distance = distance

        return reward

        

    def is_done(self):
        # Define when the episode ends
        # ends when the car reaches the goal or it takes more than 100 steps 
        return self.car_position == self.goal or self.steps > 3000
        
        
    # def state_to_tuple(self, state):
      
    #   #((0, 0), 'N', {'front': 1, 'left': 0, 'right': 0})
    #   # if like this convert to ((0, 0), 'N', (1, 0, 0))
    #   if not isinstance(state[2], dict):
    #      # print(state)
    #      # print(state[2])
    #      #take state[2] and make it from this (1, 0, 0) to this {'front': 1, 'left': 0, 'right': 0}
    #      newState = {'front': state[2][0], 'left': state[2][1], 'right': state[2][2]}
    #      # print(newState)
    #      #create a new state with the [2] being the new dictionary
    #      state = (state[0], state[1], newState)
         
    #   # Convert the state dictionary to a hashable tuple
    #   # Adjust this based on the specific format of your state
    #   position, orientation, sensor_readings = state
    #   sensor_readings_tuple = tuple(sensor_readings.values())
    #   return (position, orientation, sensor_readings_tuple)    
    
    def get_state(self):
        car_position = [float(coord) for coord in self.car_position]
        sensor_readings = [float(value) for value in self.sensor_readings.values()]
        
        state = car_position + [self.car_orientation] + sensor_readings
        
        # cast state to this ['1.0' '1.0' 'N' '1.0' '1.0' '10.0']
        state = np.array(state, dtype=str)
        
        #get the orientation and convert do label encoding
        if state[2] == 'N':
            state[2] = 0
        elif state[2] == 'E':
            state[2] = 1
        elif state[2] == 'S':
            state[2] = 2
        elif state[2] == 'W':
            state[2] = 3
            
        state = np.array(state, dtype=float)
        print('get state:',state)
        
        return state
        
        

    # def render(self):
    #     rendered_maze = np.array(self.maze, dtype=str)
    #     x, y = self.car_position
    #     rendered_maze[y][x] = 'C'  # Representing the car
        
    #     #print array
    #     print(rendered_maze, '\n') 

    
    def init_pygame(self):
        # Initialize Pygame and set up the display
        pygame.init()
        self.cell_size = 40  # Size of each cell in pixels
        self.maze_size_x = 12  # Assuming the maze size_x is 12
        self.maze_size_y = 12  # Assuming the maze size_y is 12
        self.width = 600
        self.height = 600
        self.screen = pygame.display.set_mode((self.width, self.height))
        self.clock = pygame.time.Clock()

    def render(self):
        # Render the environment using Pygame
        for y in range(self.maze_size_y):
            for x in range(self.maze_size_x):
                rect = pygame.Rect(x * self.cell_size, y * self.cell_size, self.cell_size, self.cell_size)
                if (x, y) == self.goal:  # Goal position
                    color = (0, 255, 0)  # Green color for the goal
                elif self.maze[y][x] == 0:
                    color = (255, 255, 255)  # White color for empty space
                else:
                    color = (0, 0, 0)  # Black color for walls
                pygame.draw.rect(self.screen, color, rect)

        # Draw the car
        car_x, car_y = self.car_position
        car_rect = pygame.Rect(car_x * self.cell_size, car_y * self.cell_size, self.cell_size, self.cell_size)
        pygame.draw.rect(self.screen, (255, 0, 0), car_rect)  # Red color for the car

        pygame.display.flip()
        self.clock.tick(60)  # Limit the frame rate to 60 FPS


    def close_pygame(self):
        # Close the Pygame window
        pygame.quit()

In [172]:
import tensorflow as tf
import numpy as np
import gym


In [186]:
class PPOAgent:
    def __init__(self, action_dim, observation_dim, env=RCMazeEnv()):
        self.action_dim = action_dim
        self.observation_dim = observation_dim
        self.policy_network = self.build_policy_network()
        self.value_network = self.build_value_network()
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        self.clip_epsilon = 0.2
        self.env = env

    def build_policy_network(self):
        # Create a policy network appropriate for your custom environment
        policy_network = tf.keras.Sequential([
            tf.keras.layers.Input(shape=(6,)),  # Adjust the input shape to (None, 6)
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(self.action_dim, activation='softmax')
        ])
        return policy_network

    def build_value_network(self):
        # Create a value network appropriate for your custom environment
        value_network = tf.keras.Sequential([
            tf.keras.layers.Input(shape=(6,)),  # Adjust the input shape to (None, 6)
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(self.action_dim, activation='linear')
        ])
        return value_network

    def compute_loss(self, observations, actions, advantages, old_probabilities):
        # Compute the surrogate loss for PPO
        new_probabilities = self.policy_network(observations)
        action_masks = tf.one_hot(actions, self.action_dim, dtype=tf.float32)
        new_action_probabilities = tf.reduce_sum(action_masks * new_probabilities, axis=1)
        old_action_probabilities = tf.reduce_sum(action_masks * old_probabilities, axis=1)

        ratio = new_action_probabilities / (old_action_probabilities + 1e-10)
        surrogate_objective = tf.minimum(ratio * advantages,
                                         tf.clip_by_value(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages)
        loss = -tf.reduce_mean(surrogate_objective)
        return loss

    def train_step(self, observations, actions, advantages, old_probabilities):
        with tf.GradientTape() as tape:
            loss = self.compute_loss(observations, actions, advantages, old_probabilities)
        grads = tape.gradient(loss, self.policy_network.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.policy_network.trainable_variables))
        return loss

    def train(self, num_episodes, max_steps_per_episode):
        for episode in range(num_episodes):
            observations = []
            actions = []
            rewards = []
            old_probabilities = []

            observation = self.env.reset()
            for step in range(max_steps_per_episode):
                flattened_observation = np.array([])
                for element in observation:
                    if isinstance(element, tuple):
                        flattened_observation = np.concatenate((flattened_observation, np.array(element)))
                    else:
                        flattened_observation = np.concatenate((flattened_observation, np.array([element])))
                
                print('flattened_observation:',flattened_observation)
                action_probabilities = self.policy_network.predict(np.expand_dims(flattened_observation, axis=0))
                action = np.random.choice(self.action_dim, p=action_probabilities.ravel())
                next_observation, reward, done = self.env.step(action)
                observation = next_observation  # Update observation for the next step


                observations.append(observation)
                actions.append(action)
                rewards.append(reward)
                old_probabilities.append(action_probabilities.ravel())

                observation = next_observation

                if done:
                    break

            discounted_rewards = self.compute_discounted_rewards(rewards)
            
            print('discounted_rewards:',discounted_rewards)
            advantages = self.compute_advantages(discounted_rewards, observations)

            observations = np.vstack(observations)
            actions = np.array(actions)
            old_probabilities = np.vstack(old_probabilities)

            for _ in range(10):  # PPO optimization steps
                loss = self.train_step(observations, actions, advantages, old_probabilities)

            print(f"Episode {episode + 1}/{num_episodes}, Total Reward: {sum(rewards)}")

    def compute_discounted_rewards(self, rewards, gamma=0.99):
        discounted_rewards = np.zeros_like(rewards, dtype=np.float32)
        running_add = 0
        for t in reversed(range(len(rewards))):
            running_add = running_add * gamma + rewards[t]
            discounted_rewards[t] = running_add
        mean = np.mean(discounted_rewards)
        std = np.std(discounted_rewards)
        discounted_rewards = (discounted_rewards - mean) / (std + 1e-8)
        return discounted_rewards

    def compute_advantages(self, discounted_rewards, observations):
        try:
            values = self.value_network.predict(observations)
            print('values:',values)
            advantages = discounted_rewards - values.ravel()
            print('advantages:',advantages)
            return advantages
        except:
            return discounted_rewards
    

get state: [ 1.  1.  0.  1.  1. 10.]


In [187]:
def state_to_tuple( state):
      
      #((0, 0), 'N', {'front': 1, 'left': 0, 'right': 0})
      # if like this convert to ((0, 0), 'N', (1, 0, 0))
      if not isinstance(state[2], dict):
         # print(state)
         # print(state[2])
         #take state[2] and make it from this (1, 0, 0) to this {'front': 1, 'left': 0, 'right': 0}
         newState = {'front': state[2][0], 'left': state[2][1], 'right': state[2][2]}
         # print(newState)
         #create a new state with the [2] being the new dictionary
         state = (state[0], state[1], newState)
         
      # Convert the state dictionary to a hashable tuple
      # Adjust this based on the specific format of your state
      position, orientation, sensor_readings = state
      sensor_readings_tuple = tuple(sensor_readings.values())
      return (position, orientation, sensor_readings_tuple)

In [188]:
env = RCMazeEnv()  # Create your custom environment

observation_dim = env.reset().shape[0]  # Adjust this based on your custom environment's state space
action_dim = 3  # Adjust this based on your custom environment's action space
ppo_agent = PPOAgent(action_dim, observation_dim,env)

ppo_agent.train(num_episodes=100, max_steps_per_episode=200)


get state: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  0.  1.  1. 10.]
flattened_observation: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  1. 10.  1. 10.]
flattened_observation: [ 1.  1.  1. 10.  1. 10.]
get state: [ 1.  1.  2. 10. 10.  1.]
flattened_observation: [ 1.  1.  2. 10. 10.  1.]
get state: [ 1.  1.  3.  1. 10.  1.]
flattened_observation: [ 1.  1.  3.  1. 10.  1.]
get state: [ 1.  1.  0.  1.  1. 10.]
flattened_observation: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  1. 10.  1. 10.]
flattened_observation: [ 1.  1.  1. 10.  1. 10.]
get state: [ 1.  1.  2. 10. 10.  1.]
flattened_observation: [ 1.  1.  2. 10. 10.  1.]
get state: [ 1.  1.  3.  1. 10.  1.]
flattened_observation: [ 1.  1.  3.  1. 10.  1.]
get state: [ 1.  1.  0.  1.  1. 10.]
flattened_observation: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  1. 10.  1. 10.]
flattened_observation: [ 1.  1.  1. 10.  1. 10.]
get state: [ 1.  1.  0.  1.  1. 10.]
flattened_observation: [ 1.  

2024-01-02 18:32:34.522342: I external/local_xla/xla/service/service.cc:168] XLA service 0xed303a0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2024-01-02 18:32:34.522358: I external/local_xla/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 4080, Compute Capability 8.9
2024-01-02 18:32:34.528085: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:269] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
2024-01-02 18:32:34.537910: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:454] Loaded cuDNN version 8902
I0000 00:00:1704216754.582293   16029 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


Episode 1/100, Total Reward: 300
get state: [ 1.  1.  0.  1.  1. 10.]
flattened_observation: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  3.  1. 10.  1.]
flattened_observation: [ 1.  1.  3.  1. 10.  1.]
get state: [ 1.  1.  0.  1.  1. 10.]
flattened_observation: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  3.  1. 10.  1.]
flattened_observation: [ 1.  1.  3.  1. 10.  1.]
get state: [ 1.  1.  0.  1.  1. 10.]
flattened_observation: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  0.  1.  1. 10.]
flattened_observation: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  1. 10.  1. 10.]
flattened_observation: [ 1.  1.  1. 10.  1. 10.]
get state: [ 1.  1.  0.  1.  1. 10.]
flattened_observation: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  3.  1. 10.  1.]
flattened_observation: [ 1.  1.  3.  1. 10.  1.]
get state: [ 1.  1.  0.  1.  1. 10.]
flattened_observation: [ 1.  1.  0.  1.  1. 10.]
get state: [ 1.  1.  1. 10.  1. 10.]
flattened_observation: [ 1.  1.  1. 10.  1. 10.]
get state: [2. 1. 1. 

In [189]:
observation = env.reset()
total_reward = 0
while True:
    action_probabilities = agent.policy_network.predict(np.expand_dims(observation, axis=0))
    action = np.random.choice(agent.action_dim, p=action_probabilities.ravel())
    observation, reward, done, _ = env.step(action)
    total_reward += reward
    if done:
        break
print(f"Total Reward: {total_reward}")


get state: [ 1.  1.  0.  1.  1. 10.]


AttributeError: 'PPOAgent' object has no attribute 'policy_network'