In [1]:
import numpy as np
import sys
import gym
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
#from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.legacy import Adam

from collections import deque
import random




# ***Policy Gradient***

In [55]:
class PolicyGradientAgent:
    """Monte-Carlo policy-gradient (REINFORCE-style) agent.

    The policy is a Keras network with two hidden ReLU layers of 128 units
    and a softmax output over ``actions`` discrete actions. Actions are
    sampled from that distribution, and the network is updated once per
    episode from the discounted returns of the collected trajectory.
    """

    def __init__(self, states, actions):
        """Store the problem dimensions and build the policy network.

        Args:
            states: Number of features in one observation (network input size).
            actions: Number of discrete actions (network output size).
        """
        # To see Cartpole learning, change to True
        self.render = False
        self.load_model = False

        # Define size of state and actions
        self.states = states
        self.actions = actions

        # Hyperparameters for the policy-gradient update
        self.learning_rate = 0.01
        self.gamma = 0.99    # discount factor applied to future rewards
        self.eps = 0.0001    # added to the std so normalisation never divides by zero

        self.model = self.build_model()

    def build_model(self):
        """Build and compile the softmax policy network.

        Returns:
            A compiled ``Sequential`` model mapping a batch of states to a
            batch of action-probability vectors.
        """
        model = Sequential()
        model.add(Dense(128, input_dim=self.states, activation='relu', kernel_initializer='he_normal'))
        model.add(Dense(128, activation='relu', kernel_initializer='he_normal'))
        model.add(Dense(self.actions, activation='softmax', kernel_initializer='he_normal'))
        # train() computes its own loss under a GradientTape; compiling is what
        # attaches the Adam optimizer that train() reuses via model.optimizer.
        model.compile(optimizer=Adam(learning_rate=self.learning_rate),
                    loss='categorical_crossentropy')
        #model.summary()
        return model

    def print_Initial_W(self):
        """Print the current weights of every layer of the policy network."""
        print("Initial Model Weights:")
        for layer in self.model.layers:
            print(layer.get_weights())

    def choose_action(self, state):
        """Sample an action index from the current policy.

        Args:
            state: A single observation shaped ``(1, states)``; only the first
                row of the prediction is used.

        Returns:
            The sampled action index in ``range(self.actions)``.
        """
        probabilities = self.model.predict(state, verbose=0)[0]
        probabilities = np.asarray(probabilities, dtype=np.float64)
        # Clip so no action has exactly zero (or NaN-prone) probability ...
        probabilities = np.clip(probabilities, 1e-10, 1.0 - 1e-10)
        # ... then renormalise, because clipping breaks the sum-to-one
        # property that np.random.choice validates.
        probabilities = probabilities / probabilities.sum()

        action = np.random.choice(self.actions, p=probabilities)
        return action

    def discount_rewards(self, rewards, normalize=True):
        """Compute the discounted return for every step of one episode.

        ``G[t] = rewards[t] + gamma * G[t + 1]``, evaluated backwards from the
        final step.

        Args:
            rewards: 1-D sequence of per-step rewards for a single episode.
            normalize: When True, standardise the returns to zero mean and
                (approximately) unit standard deviation.

        Returns:
            A float32 array with the same length as ``rewards``.
        """
        rewards = np.asarray(rewards, dtype=np.float32)
        discounted_rewards = np.zeros_like(rewards, dtype=np.float32)
        if rewards.size == 0:
            # Nothing to discount; also avoids mean/std of an empty array.
            return discounted_rewards

        running_add = 0.0
        for t in reversed(range(len(rewards))):
            # Carry the accumulated return backwards through the episode.
            running_add = float(rewards[t]) + self.gamma * running_add
            discounted_rewards[t] = running_add

        if normalize:
            centered = discounted_rewards - np.mean(discounted_rewards)
            discounted_rewards = centered / (np.std(discounted_rewards) + self.eps)
        return discounted_rewards.astype(np.float32)

    def train(self, states, actions, rewards):
        """Apply one policy-gradient update from a complete episode.

        Args:
            states: Array of visited states, one row per step.
            actions: Array of the action index taken at each step.
            rewards: Array of the reward received at each step.
        """
        if len(rewards) == 0:
            return  # empty trajectory: no gradient to apply

        discounted_rewards = tf.convert_to_tensor(
            self.discount_rewards(rewards), dtype=tf.float32)
        states = tf.convert_to_tensor(states, dtype=tf.float32)
        actions_one_hot = tf.one_hot(actions, self.actions, dtype=tf.float32)

        with tf.GradientTape() as tape:
            action_probabilities = self.model(states, training=True)
            # The one-hot mask picks out the probability of the action taken.
            chosen_action_probabilities = tf.reduce_sum(
                actions_one_hot * action_probabilities, axis=1)
            # Guard against log(0) when the softmax output underflows.
            chosen_action_probabilities = tf.clip_by_value(
                chosen_action_probabilities, 1e-10, 1.0)
            neg_log_probabilities = -tf.math.log(chosen_action_probabilities)
            # Loss: negative log-likelihood weighted by the discounted returns
            loss = tf.reduce_sum(neg_log_probabilities * discounted_rewards)

        # Update the model using the gradient of the loss with respect to the model parameters
        gradients = tape.gradient(loss, self.model.trainable_variables)
        self.model.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))



In [56]:
if __name__ == "__main__":
    # Train the policy-gradient agent on CartPole, one update per episode.
    #env = gym.make('CartPole-v1', render_mode='human')
    env = gym.make('CartPole-v1')
    states = env.observation_space.shape[0]
    actions = env.action_space.n

    EPISODES = 300

    agent = PolicyGradientAgent(states, actions)

    scores, episodes = [], []

    # try/finally so the environment is released even if training is
    # interrupted or raises part-way through.
    try:
        for e in range(EPISODES):
            done = False
            score = 0
            state, _ = env.reset()
            state = np.reshape(state, [1, states])

            states_batch, actions_batch, rewards_batch = [], [], []

            #agent.print_Initial_W()

            while not done:
                #if agent.render:
                #    env.render()

                action = agent.choose_action(state)
                # The 5-tuple step API reports termination and truncation
                # separately; the episode is over when either flag is set.
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                next_state = np.reshape(next_state, [1, states])

                states_batch.append(state)
                actions_batch.append(action)
                rewards_batch.append(reward)

                score += reward
                state = next_state

            # Train the model at the end of the episode
            states_batch = np.vstack(states_batch)
            actions_batch = np.array(actions_batch)
            rewards_batch = np.array(rewards_batch)
            agent.train(states_batch, actions_batch, rewards_batch)

            scores.append(score)
            episodes.append(e)

            print("Episode:", e, "|  Score:", score)
    finally:
        env.close()

Episode: 0 |  Score: 14.0
Episode: 1 |  Score: 30.0
Episode: 2 |  Score: 11.0
Episode: 3 |  Score: 12.0
Episode: 4 |  Score: 19.0
Episode: 5 |  Score: 16.0
Episode: 6 |  Score: 16.0
Episode: 7 |  Score: 11.0
Episode: 8 |  Score: 10.0
Episode: 9 |  Score: 14.0
Episode: 10 |  Score: 14.0
Episode: 11 |  Score: 9.0
Episode: 12 |  Score: 8.0
Episode: 13 |  Score: 33.0
Episode: 14 |  Score: 10.0
Episode: 15 |  Score: 16.0
Episode: 16 |  Score: 15.0
Episode: 17 |  Score: 13.0
Episode: 18 |  Score: 10.0
Episode: 19 |  Score: 12.0
Episode: 20 |  Score: 13.0
Episode: 21 |  Score: 13.0
Episode: 22 |  Score: 12.0
Episode: 23 |  Score: 16.0
Episode: 24 |  Score: 21.0
Episode: 25 |  Score: 12.0
Episode: 26 |  Score: 10.0
Episode: 27 |  Score: 11.0
Episode: 28 |  Score: 9.0
Episode: 29 |  Score: 13.0
Episode: 30 |  Score: 10.0
Episode: 31 |  Score: 35.0
Episode: 32 |  Score: 15.0
Episode: 33 |  Score: 28.0
Episode: 34 |  Score: 21.0
Episode: 35 |  Score: 10.0
Episode: 36 |  Score: 12.0
Episode: 37 | 

In [10]:
    # Repeats the env.close() call that ends the training cell.
    # NOTE(review): this cell's execution count (10) is out of order and the
    # line is indented with no enclosing block — looks like a stale leftover
    # cell; confirm before keeping it.
    env.close()