In [None]:
# Necessary Imports
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import time

# Define Custom Warehouse Environment in OpenAI gym
class WarehouseEnv(gym.Env):
    """Grid-world warehouse in which an agent walks from the top-left cell to a goal.

    Observations are the flattened grid as ``float32`` with three cell values:
    0 = empty, 0.5 = goal, 1 = agent. Every step costs -1; stepping onto the
    goal pays +10 and ends the episode. Moves that would leave the grid keep
    the agent in place (and still cost -1).

    ``reset`` returns only the observation and ``step`` returns the 4-tuple
    ``(observation, reward, done, info)``.

    Args:
        shape: ``(rows, cols)`` of the grid. Defaults to the 5x5 "easy" layout;
            pass e.g. ``(10, 10)`` for a harder task.
        goal: ``(row, col)`` of the goal cell. Defaults to the bottom-right
            corner of the grid.

    Raises:
        ValueError: if ``goal`` lies outside the grid.
    """

    # (row, col) offset applied to the agent for each discrete action.
    _MOVES = {
        0: (-1, 0),  # Up
        1: (1, 0),  # Down
        2: (0, -1),  # Left
        3: (0, 1),  # Right
    }

    def __init__(self, shape=(5, 5), goal=None):
        super().__init__()
        self.shape = tuple(shape)
        if goal is None:
            goal = (self.shape[0] - 1, self.shape[1] - 1)
        self.goal = tuple(goal)
        if not self._in_bounds(self.goal):
            raise ValueError(f"goal {self.goal} is outside grid of shape {self.shape}")

        # Four moves: Up, Down, Left, Right.
        self.action_space = gym.spaces.Discrete(4)
        # reset()/step() hand out the *flattened* grid, so the declared space is
        # 1-D float32 to match what callers actually receive.
        self.observation_space = gym.spaces.Box(
            low=0,
            high=1,
            shape=(self.shape[0] * self.shape[1],),
            dtype=np.float32,
        )
        self.reset()

    def _in_bounds(self, position):
        """Return True when ``position`` (row, col) lies inside the grid."""
        return 0 <= position[0] < self.shape[0] and 0 <= position[1] < self.shape[1]

    def _observation(self):
        """Rebuild ``self.state`` from the agent/goal positions and return it flattened."""
        grid = np.zeros(self.shape, dtype=np.float32)
        grid[self.agent_position[0], self.agent_position[1]] = 1
        # The goal is written last, so the terminal observation shows 0.5 at the
        # goal cell even though the agent is standing on it (render() likewise
        # prints "G" there).
        grid[self.goal[0], self.goal[1]] = 0.5
        self.state = grid
        return grid.flatten()

    def reset(self):
        """Put the agent back at (0, 0) and return the initial flattened observation."""
        self.agent_position = [0, 0]
        return self._observation()

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, done, info)``.

        Args:
            action: one of 0 (Up), 1 (Down), 2 (Left), 3 (Right).

        Returns:
            The flattened float32 grid, the reward (-1 per step, +10 on reaching
            the goal), whether the episode has ended, and an empty info dict.

        Raises:
            KeyError: if ``action`` is not one of the four valid actions.
        """
        d_row, d_col = self._MOVES[action]
        candidate = [self.agent_position[0] + d_row, self.agent_position[1] + d_col]
        # Moves off the grid are ignored: the agent stays where it is.
        if self._in_bounds(candidate):
            self.agent_position = candidate

        observation = self._observation()

        if self.agent_position == list(self.goal):
            return observation, 10, True, {}
        return observation, -1, False, {}

    def render(self, mode="human"):
        """Print the grid as text: ``G`` goal, ``A`` agent, ``.`` empty cell."""
        for i in range(self.shape[0]):
            row = ""
            for j in range(self.shape[1]):
                if (i, j) == self.goal:
                    row += "G "  # Goal
                elif [i, j] == self.agent_position:
                    row += "A "  # Agent
                else:
                    row += ". "  # Empty space
            print(row)
        print()  # Blank line at the end for readability


# PPO Agent - holds the two models: the actor (policy, chooses actions) and the critic (value estimate)
class PPOAgent:
    """Actor-critic agent trained with the PPO clipped surrogate objective.

    The actor maps a state to a softmax distribution over actions; the critic
    maps a state to a scalar value estimate. ``train`` performs one update of
    each network from a single episode of experience.

    Args:
        state_shape: shape of one (flattened) state, e.g. ``(25,)``.
        n_actions: number of discrete actions.
    """

    def __init__(self, state_shape, n_actions):
        self.state_shape = state_shape
        self.n_actions = n_actions
        self.gamma = 0.99  # discount factor for future rewards
        # PPO clipping parameter: the new/old probability ratio is clipped to
        # [1 - clip_ratio, 1 + clip_ratio] in the surrogate loss.
        self.clip_ratio = 0.2
        self.learning_rate = 0.001

        self.actor = self.build_actor()
        self.critic = self.build_critic()

    # Actor Model = Policy Network = taking actions
    def build_actor(self):
        """Build the policy network: two 128-unit ReLU layers and a softmax head."""
        inputs = layers.Input(shape=self.state_shape)
        x = layers.Dense(128, activation="relu")(inputs)
        x = layers.Dense(128, activation="relu")(x)
        outputs = layers.Dense(self.n_actions, activation="softmax")(x)
        model = tf.keras.Model(inputs, outputs)
        # Only the optimizer is attached; the PPO loss is computed by hand in train().
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
        )
        return model

    # Critic Model = Value Network = estimated return from a state, fitted via MSE
    def build_critic(self):
        """Build the value network: two 128-unit ReLU layers and a linear scalar head."""
        inputs = layers.Input(shape=self.state_shape)
        x = layers.Dense(128, activation="relu")(inputs)
        x = layers.Dense(128, activation="relu")(x)
        outputs = layers.Dense(1, activation="linear")(x)
        model = tf.keras.Model(inputs, outputs)
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
            loss="mean_squared_error",
        )
        return model

    def _discounted_returns(self, rewards):
        """Return G_t = r_t + gamma * G_{t+1} for every step, as a float32 array.

        The return after the last step is taken to be 0, i.e. the sequence is
        treated as ending in a terminal state.
        """
        returns = np.zeros(len(rewards), dtype=np.float32)
        running = 0.0
        for t in range(len(rewards) - 1, -1, -1):
            running = rewards[t] + self.gamma * running
            returns[t] = running
        return returns

    # Advantage for Training the Agent
    def compute_advantage(self, rewards, values):
        """Return per-step advantages ``G_t - V(s_t)`` as a float32 array.

        Args:
            rewards: per-step rewards of one episode.
            values: critic estimates for the same steps (same length as ``rewards``).

        The value baseline is subtracted only after the discounted return has
        been accumulated, so one step's baseline never leaks into the returns
        of earlier steps.
        """
        returns = self._discounted_returns(rewards)
        return returns - np.asarray(values, dtype=np.float32)

    # Train our A+C Networks
    def train(self, states, actions, rewards, values, old_probs):
        """Run one actor update and one critic update on a single episode.

        Args:
            states: array of shape (steps, state_dim).
            actions: integer action taken at each step.
            rewards: reward received at each step.
            values: critic estimate recorded at each step during the rollout.
            old_probs: probability the rollout policy assigned to each taken action.
        """
        # Fix dtypes up front so NumPy float64 arrays are not mixed with the
        # float32 tensors produced by the networks.
        states = np.asarray(states, dtype=np.float32)
        old_probs = np.asarray(old_probs, dtype=np.float32)
        returns = self._discounted_returns(rewards)
        advantages = returns - np.asarray(values, dtype=np.float32)
        actions_one_hot = tf.keras.utils.to_categorical(actions, self.n_actions)

        # Update Actor: maximise the clipped surrogate objective.
        with tf.GradientTape() as tape:
            probs = self.actor(states)
            # Probability the current policy gives to the action actually taken.
            new_probs = tf.reduce_sum(probs * actions_one_hot, axis=1)
            ratio = new_probs / (old_probs + 1e-10)  # epsilon avoids division by zero
            clip_advantage = (
                tf.clip_by_value(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio)
                * advantages
            )
            # Negated because the optimizer minimises.
            loss = -tf.reduce_mean(tf.minimum(ratio * advantages, clip_advantage))
        grads = tape.gradient(loss, self.actor.trainable_variables)
        self.actor.optimizer.apply_gradients(zip(grads, self.actor.trainable_variables))

        # Update Critic: regress V(s_t) onto the discounted return, not the
        # immediate reward. Targets are reshaped to match the (steps, 1) output.
        self.critic.train_on_batch(states, returns.reshape(-1, 1))


# Training Loop
def train_agent(
    env,
    agent,
    episodes=500,
    render_interval=100,
    render_delay=0.5,
    max_steps=None,
):
    """Roll out one episode at a time and update the agent after each episode.

    Args:
        env: environment exposing ``reset() -> state``,
            ``step(action) -> (state, reward, done, info)`` and ``render()``.
        agent: object exposing ``actor(state)`` and ``critic(state)`` (both
            returning something with ``.numpy()``) and
            ``train(states, actions, rewards, values, old_probs)``.
        episodes: number of episodes to run.
        render_interval: render every N-th episode (starting with episode 0).
            ``0`` or ``None`` disables rendering.
        render_delay: seconds to pause after each rendered frame.
        max_steps: optional cap on steps per episode. ``None`` (the default)
            runs until the environment reports ``done``. An episode cut off by
            the cap is still passed to ``agent.train`` as collected.

    Returns:
        List with the total reward of each episode.
    """
    reward_history = []
    rendering_enabled = bool(render_interval) and render_interval > 0

    for episode in range(episodes):
        state = env.reset()
        done = False
        show_episode = rendering_enabled and episode % render_interval == 0
        states, actions, rewards, values, old_probs = [], [], [], [], []

        steps = 0
        while not done and (max_steps is None or steps < max_steps):
            # Networks expect a float32 batch of shape (1, state_dim).
            state = np.asarray(state, dtype=np.float32).reshape(1, -1)
            action_probs = agent.actor(state).numpy()[0]  # policy distribution
            value = agent.critic(state).numpy()[0][0]  # state-value estimate

            # Sample from the policy rather than taking the argmax, so the agent explores.
            action = np.random.choice(len(action_probs), p=action_probs)
            next_state, reward, done, _ = env.step(action)

            # Store experience
            states.append(state.flatten())
            actions.append(action)
            rewards.append(reward)
            values.append(value)
            old_probs.append(action_probs[action])  # needed for the PPO probability ratio

            if show_episode:
                env.render()
                if render_delay:
                    time.sleep(render_delay)  # slow down so the frames are readable

            state = next_state
            steps += 1

        # Nothing to learn from if the step cap was 0 and no experience was collected.
        if states:
            agent.train(
                np.array(states),
                np.array(actions),
                np.array(rewards),
                np.array(values),
                np.array(old_probs),
            )
        reward_history.append(sum(rewards))

        # Report the running average every 50 episodes.
        if (episode + 1) % 50 == 0:
            print(
                f"Episode {episode + 1}: Average Reward: {np.mean(reward_history[-50:])}"
            )
    return reward_history


# Main function
def main():
    """Train a PPO agent on the warehouse grid and plot the reward per episode."""
    # Environment first: the agent's network sizes are derived from it.
    warehouse = WarehouseEnv()
    # The networks consume the flattened grid, so the input shape is 1-D.
    flat_shape = warehouse.state.flatten().shape
    action_count = warehouse.action_space.n

    ppo = PPOAgent(flat_shape, action_count)
    episode_rewards = train_agent(warehouse, ppo)

    # Learning curve: total reward collected in each episode.
    plt.plot(episode_rewards)
    plt.xlabel("Episode")
    plt.ylabel("Total Reward")
    plt.title("PPO Training Performance")
    plt.show()


# Entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()


A . . . . 
. . . . . 
. . . . . 
. . . . . 
. . . . G 

. A . . . 
. . . . . 
. . . . . 
. . . . . 
. . . . G 

. . A . . 
. . . . . 
. . . . . 
. . . . . 
. . . . G 

. . . A . 
. . . . . 
. . . . . 
. . . . . 
. . . . G 

. . . . A 
. . . . . 
. . . . . 
. . . . . 
. . . . G 

. . . . . 
. . . . A 
. . . . . 
. . . . . 
. . . . G 

. . . . . 
. . . . . 
. . . . A 
. . . . . 
. . . . G 



KeyboardInterrupt: 