# In [4]:
import tensorflow as tf
import numpy as np
import gym

# Create the CartPole environment, then read off the two sizes the networks
# are built from: the length of one observation vector and the number of
# discrete actions.
env = gym.make('CartPole-v0')
state_dim, action_dim = env.observation_space.shape[0], env.action_space.n

class Actor:
    """Softmax policy network trained with an advantage-weighted log-likelihood.

    The network maps a batch of state vectors to a probability distribution
    over ``action_dim`` discrete actions.
    """

    # Lower bound applied to probabilities before taking the log, so that a
    # probability of exactly 0 cannot produce -inf / NaN in the loss.
    _MIN_PROBABILITY = 1e-8

    def __init__(self, state_dim, action_dim, learning_rate):
        """Build the policy network and its Adam optimizer.

        Args:
            state_dim: Number of features in one state vector.
            action_dim: Number of discrete actions (width of the softmax).
            learning_rate: Learning rate passed to the Adam optimizer.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.learning_rate = learning_rate

        self.model = self.create_model()

        self.optimizer = tf.keras.optimizers.Adam(self.learning_rate)

    def create_model(self):
        """Return an MLP with two 24-unit ReLU layers and a softmax output."""
        return tf.keras.Sequential([
            tf.keras.layers.Dense(24, activation='relu', input_shape=(self.state_dim,)),
            tf.keras.layers.Dense(24, activation='relu'),
            tf.keras.layers.Dense(self.action_dim, activation='softmax')
        ])

    def predict(self, state):
        """Return the action probabilities for a batch of states.

        Args:
            state: Array-like of shape ``(batch, state_dim)``.

        Returns:
            NumPy array of shape ``(batch, action_dim)``.
        """
        # Calling the model directly skips the per-call setup (and progress
        # output) of ``Model.predict``, which matters when this runs once per
        # environment step on a single sample.
        batch = tf.cast(state, tf.float32)
        return self.model(batch, training=False).numpy()

    def train(self, states, actions, advantages):
        """Apply one policy-gradient step and return the loss.

        Args:
            states: Array-like of shape ``(batch, state_dim)``.
            actions: Integer action indices; a scalar or any shape holding
                ``batch`` entries.
            advantages: Advantage estimates; a scalar or any shape holding
                ``batch`` entries (e.g. ``(batch,)`` or ``(batch, 1)``).

        Returns:
            Scalar tensor with the loss ``-mean(log pi(a|s) * advantage)``.
        """
        states = tf.cast(states, tf.float32)
        # Flatten both per-sample inputs so they line up element-wise with the
        # (batch,) log-probabilities. Without this, (batch, 1) advantages
        # broadcast against (batch,) log-probs into a (batch, batch) matrix.
        actions = tf.reshape(tf.cast(actions, tf.int32), [-1])
        advantages = tf.reshape(tf.cast(advantages, tf.float32), [-1])

        with tf.GradientTape() as tape:
            probabilities = self.model(states, training=True)
            actions_one_hot = tf.one_hot(actions, depth=self.action_dim)
            # Probability the policy assigned to the action actually taken.
            action_probabilities = tf.reduce_sum(actions_one_hot * probabilities, axis=1)
            log_probabilities = tf.math.log(
                tf.clip_by_value(action_probabilities, self._MIN_PROBABILITY, 1.0)
            )
            loss = -tf.reduce_mean(log_probabilities * advantages)

        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss

class Critic:
    """State-value network trained by regression onto TD targets."""

    def __init__(self, state_dim, learning_rate):
        """Build the value network and its Adam optimizer.

        Args:
            state_dim: Number of features in one state vector.
            learning_rate: Learning rate passed to the Adam optimizer.
        """
        self.state_dim = state_dim
        self.learning_rate = learning_rate

        self.model = self.create_model()

        self.optimizer = tf.keras.optimizers.Adam(self.learning_rate)

    def create_model(self):
        """Return an MLP with two 24-unit ReLU layers and one linear output."""
        return tf.keras.Sequential([
            tf.keras.layers.Dense(24, activation='relu', input_shape=(self.state_dim,)),
            tf.keras.layers.Dense(24, activation='relu'),
            tf.keras.layers.Dense(1)
        ])

    def predict(self, state):
        """Return the value estimates for a batch of states.

        Args:
            state: Array-like of shape ``(batch, state_dim)``.

        Returns:
            NumPy array of shape ``(batch, 1)``.
        """
        # Calling the model directly skips the per-call setup (and progress
        # output) of ``Model.predict``, which matters when this runs several
        # times per environment step on a single sample.
        batch = tf.cast(state, tf.float32)
        return self.model(batch, training=False).numpy()

    def train(self, states, td_targets):
        """Apply one mean-squared-error step towards ``td_targets``.

        Args:
            states: Array-like of shape ``(batch, state_dim)``.
            td_targets: Regression targets; a scalar or any shape holding
                ``batch`` entries (e.g. ``(batch,)`` or ``(batch, 1)``).

        Returns:
            Scalar tensor with the mean squared error.
        """
        states = tf.cast(states, tf.float32)
        td_targets = tf.cast(td_targets, tf.float32)

        with tf.GradientTape() as tape:
            values = self.model(states, training=True)
            # Give the targets the exact shape of the predictions so the
            # subtraction is element-wise; (batch,) minus (batch, 1) would
            # otherwise broadcast to (batch, batch).
            targets = tf.reshape(td_targets, tf.shape(values))
            loss = tf.reduce_mean(tf.square(targets - values))

        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss

class DDPG:
    """One-step advantage actor-critic agent for discrete action spaces.

    NOTE: despite the class name, the code below samples actions from a
    softmax policy and weights log-probabilities by a TD-error advantage;
    it does not implement Deep Deterministic Policy Gradient. The name is
    kept so existing callers keep working.
    """

    def __init__(self, state_dim, action_dim):
        """Create the actor and critic with fixed hyper-parameters.

        Args:
            state_dim: Number of features in one state vector.
            action_dim: Number of discrete actions.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim

        self.actor_learning_rate = 0.001
        self.critic_learning_rate = 0.01
        self.gamma = 0.99  # discount factor used in the TD target

        self.actor = Actor(self.state_dim, self.action_dim, self.actor_learning_rate)
        self.critic = Critic(self.state_dim, self.critic_learning_rate)

    def get_action(self, state):
        """Sample an action index from the actor's distribution.

        Args:
            state: Array-like of shape ``(1, state_dim)``; only the first row
                of the actor's output is used.

        Returns:
            Integer in ``[0, action_dim)``.
        """
        prob = self.actor.predict(state)[0]
        return np.random.choice(self.action_dim, p=prob)

    def train(self, states, actions, rewards, next_states, dones):
        """Run one critic update and one actor update on a batch of transitions.

        Args:
            states: Array-like of shape ``(batch, state_dim)``.
            actions: Action indices; a scalar or ``batch`` entries.
            rewards: Rewards; a scalar or ``batch`` entries.
            next_states: Array-like of shape ``(batch, state_dim)``.
            dones: Episode-end flags; a scalar or ``batch`` entries.

        Returns:
            Tuple ``(actor_loss, critic_loss)`` as returned by the actor's and
            critic's ``train`` methods.
        """
        # Put every per-sample quantity in a (batch, 1) float32 column so the
        # TD arithmetic is element-wise and stays in the networks' dtype.
        values = np.asarray(self.critic.predict(states), dtype=np.float32).reshape(-1, 1)
        next_values = np.asarray(
            self.critic.predict(next_states), dtype=np.float32
        ).reshape(-1, 1)
        rewards = np.asarray(rewards, dtype=np.float32).reshape(-1, 1)
        dones = np.asarray(dones, dtype=np.float32).reshape(-1, 1)

        # No bootstrapping from the next state once the episode has ended.
        td_targets = rewards + np.float32(self.gamma) * next_values * (1.0 - dones)

        # The actor works with one advantage per sample, so hand it a flat
        # (batch,) vector; the critic gets targets shaped like its output.
        advantages = (td_targets - values).reshape(-1)
        actions = np.asarray(actions).reshape(-1)

        actor_loss = self.actor.train(states, actions, advantages)
        critic_loss = self.critic.train(states, td_targets)
        return actor_loss, critic_loss

# Train the agent online: one actor/critic update per environment step.
# NOTE: this loop is written against the classic Gym API, in which reset()
# returns the observation and step() returns a 4-tuple.
ddpg = DDPG(state_dim=state_dim, action_dim=action_dim)

num_episodes = 1000
total_rewards = []

for episode in range(num_episodes):
    state = env.reset()
    done = False
    episode_reward = 0
    while not done:
        # The agent expects a leading batch axis, hence reshape(1, -1).
        action = ddpg.get_action(state.reshape(1, -1))
        next_state, reward, done, _ = env.step(action)
        ddpg.train(state.reshape(1, -1), action, reward, next_state.reshape(1, -1), done)
        episode_reward += reward
        state = next_state
    total_rewards.append(episode_reward)
    print('Episode = ', episode, ' | Reward = ', episode_reward)

import matplotlib.pyplot as plt

plt.plot(total_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
# Save before show(): once show() has displayed the figure it may already be
# closed, in which case a later savefig() writes an empty image.
plt.savefig('DDPG_Cartpole.png')
plt.show()

# (leftover notebook warning output) and should_run_async(code)


# In [5]:
# Run the environment for at least 10 episodes using only greedy actions from
# the learned policy and plot the total reward per episode.
# NOTE: like the training loop, this uses the classic Gym API (reset() returns
# the observation, step() returns a 4-tuple).
num_episodes_greedy = 10
total_rewards_greedy = []

for episode in range(num_episodes_greedy):
    state = env.reset()
    done = False
    episode_reward = 0
    while not done:
        # Greedy policy: always take the most probable action, no sampling.
        prob = ddpg.actor.predict(state.reshape(1, -1))[0]
        action = np.argmax(prob)
        next_state, reward, done, _ = env.step(action)
        episode_reward += reward
        state = next_state
    total_rewards_greedy.append(episode_reward)

plt.plot(total_rewards_greedy)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
# Save before show(): once show() has displayed the figure it may already be
# closed, in which case a later savefig() writes an empty image.
plt.savefig('DDPG_Cartpole_Testing.png')
plt.show()