In [None]:
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import numpy as np

# Shared CartPole-v1 environment. Agent.take_action and the training loop both
# use this module-level `env` directly.
env = gym.make('CartPole-v1')

# Set up matplotlib: detect an inline (notebook) backend from the backend name
# and, when one is active, import IPython's display helper.
# NOTE(review): `display` is not used in the visible cells -- presumably kept for live plotting.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

# Switch matplotlib to interactive mode.
plt.ion()

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
def create_model(num_observations, num_hidden, num_actions):
    """Build the Q-network: one hidden ReLU layer followed by a linear Q-value head.

    Args:
        num_observations: Length of the observation vector fed to the network.
        num_hidden: Number of units in the single hidden layer.
        num_actions: Number of discrete actions; the network has one output per action.

    Returns:
        A `keras.Model` mapping inputs of shape `(batch, num_observations)` to
        outputs of shape `(batch, num_actions)`.
    """
    observations = layers.Input(shape=(num_observations,))
    hidden = layers.Dense(num_hidden, activation='relu')(observations)
    # The head must be linear: Agent.train_model regresses these outputs onto
    # `reward + gamma * max Q` targets, which are not confined to [0, 1] and do not
    # sum to one across actions, so a softmax head could never fit them. The greedy
    # argmax used for action selection works the same on unbounded outputs.
    q_values = layers.Dense(num_actions, activation='linear')(hidden)

    return keras.Model(inputs=observations, outputs=q_values)
    
class Agent:
    """Epsilon-greedy DQN agent with a replay buffer and a separate target network.

    The replay buffer is kept as five parallel lists (`*_history`); index `k` in
    every list refers to the same transition. Trimming the buffer to
    `max_memory_length` is left to the caller.

    Note: `take_action` steps the module-level `env`; the agent does not own an
    environment of its own.
    """

    def __init__(self):
        """Set hyperparameters, create an empty replay buffer and build both networks."""
        # Network dimensions; these have to match the observation/action spaces
        # of the environment that `take_action` steps.
        self.num_observations = 4
        self.num_hidden = 128
        self.num_actions = 2

        self.batch_size = 128
        self.max_memory_length = 10000

        # Exploration rate; multiplied by `epsilon_decay` after every environment
        # step, with no lower bound.
        self.epsilon = 0.99
        self.epsilon_decay = 0.99
        # Discount factor applied to the bootstrapped next-state value.
        self.gamma = 0.7

        # Replay buffer (parallel lists, one entry per stored transition).
        self.action_history = []
        self.state_history = []
        self.state_next_history = []
        self.rewards_history = []
        self.done_history = []
        # Per-episode bookkeeping maintained by the training loop.
        self.episode_reward_history = []
        self.running_reward = 0
        self.episode_count = 0

        self.loss_function = keras.losses.Huber()
        self.optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)

        self.policy_net = create_model(self.num_observations, self.num_hidden, self.num_actions)
        self.target_net = create_model(self.num_observations, self.num_hidden, self.num_actions)
        # Start the target network as an exact copy of the policy network so the
        # first bootstrapped targets do not come from unrelated random weights.
        self.target_net.set_weights(self.policy_net.get_weights())

    def take_action(self, state):
        """Choose an action epsilon-greedily, step `env` and record the transition.

        Args:
            state: Current observation (array-like of length `num_observations`).

        Returns:
            Tuple `(state_next, reward, done)`, where `done` is True as soon as the
            episode is over, whether the environment terminated or truncated it.
        """
        if self.epsilon > np.random.rand(1)[0]:
            # Explore: uniformly random action.
            action = np.random.choice(self.num_actions)
        else:
            # Exploit: greedy action under the current policy network.
            state_tensor = tf.expand_dims(tf.convert_to_tensor(state), 0)
            q_values = self.policy_net(state_tensor, training=False)
            action = tf.argmax(q_values[0]).numpy()

        state_next, reward, terminated, truncated, _ = env.step(action)
        state_next = np.array(state_next)

        self.epsilon *= self.epsilon_decay

        self.action_history.append(action)
        self.state_history.append(state)
        self.state_next_history.append(state_next)
        # Only genuine termination is stored as `done`: a time-limit truncation
        # is not a terminal state, so its target should still bootstrap.
        self.done_history.append(terminated)
        self.rewards_history.append(reward)

        # Report truncation to the caller as well, so it stops stepping an
        # environment that needs a reset.
        return state_next, reward, terminated or truncated

    def get_samples(self):
        """Draw `batch_size` transitions uniformly at random (with replacement).

        Returns:
            Tuple `(state_sample, state_next_sample, rewards_sample, action_sample,
            done_sample)`: the two state batches are NumPy arrays, rewards and
            actions are Python lists, and `done_sample` is a float tensor
            (1.0 where the stored transition was terminal).
        """
        # Sample from this instance's own buffer, not from a module-level `agent`.
        indices = np.random.choice(len(self.done_history), size=self.batch_size)

        state_sample = np.array([self.state_history[i] for i in indices])
        state_next_sample = np.array([self.state_next_history[i] for i in indices])
        rewards_sample = [self.rewards_history[i] for i in indices]
        action_sample = [self.action_history[i] for i in indices]
        done_sample = tf.convert_to_tensor(
            [float(self.done_history[i]) for i in indices]
        )

        return state_sample, state_next_sample, rewards_sample, action_sample, done_sample

    def train_model(self, state_sample, state_next_sample, rewards_sample, action_sample, done_sample):
        """Run one gradient step of the policy network on a sampled batch.

        Args:
            state_sample: Batch of states, one row per transition.
            state_next_sample: Batch of successor states, aligned with `state_sample`.
            rewards_sample: Reward of each transition.
            action_sample: Action index taken in each transition.
            done_sample: Float tensor, 1.0 for terminal transitions and 0.0 otherwise.
        """
        # Bootstrapped targets come from the (frozen) target network.
        # verbose=0 keeps Keras from printing a progress bar on every training step.
        future_rewards = self.target_net.predict(state_next_sample, verbose=0)
        updated_q_values = rewards_sample + self.gamma * tf.reduce_max(
            future_rewards, axis=1
        )
        # For terminal transitions the whole target is replaced by -1.
        updated_q_values = updated_q_values * (1 - done_sample) - done_sample
        # One-hot mask so the loss only touches the Q-value of the action taken.
        masks = tf.one_hot(action_sample, self.num_actions)

        with tf.GradientTape() as tape:
            q_values = self.policy_net(state_sample)
            q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
            loss = self.loss_function(updated_q_values, q_action)

        grads = tape.gradient(loss, self.policy_net.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.policy_net.trainable_variables))

In [None]:
agent = Agent()

# Training-loop settings.
NUM_EPISODES = 2000             # upper bound on the number of episodes
MAX_STEPS_PER_EPISODE = 100     # an episode is cut off after this many steps
TARGET_UPDATE_INTERVAL = 10000  # environment steps between target-network syncs
REWARD_WINDOW = 100             # number of recent episodes averaged into running_reward
SOLVED_RUNNING_REWARD = 40      # stop once the running reward exceeds this value

# Environment steps taken across ALL episodes. The target-network sync has to be
# driven by this counter: the per-episode `timestep` never exceeds
# MAX_STEPS_PER_EPISODE - 1, so `timestep % 10000 == 0` was only ever true at
# timestep 0 and the intended 10000-step period was never reached.
frame_count = 0

for i in range(NUM_EPISODES):  # `i` is the episode index; the body does not read it

    state = np.array(env.reset()[0])
    episode_reward = 0

    for timestep in range(MAX_STEPS_PER_EPISODE):
        state, reward, done = agent.take_action(state)
        episode_reward += reward

        # Start learning once the buffer holds more than one batch of transitions.
        if len(agent.done_history) > agent.batch_size:
            state_sample, state_next_sample, rewards_sample, action_sample, done_sample = agent.get_samples()
            agent.train_model(state_sample, state_next_sample, rewards_sample, action_sample, done_sample)

        # Sync the target network (and log progress) every TARGET_UPDATE_INTERVAL
        # steps. The very first step syncs too, so the target network starts out
        # equal to the policy network before any training happens.
        if frame_count % TARGET_UPDATE_INTERVAL == 0:
            agent.target_net.set_weights(agent.policy_net.get_weights())
            template = "running reward: {:.2f} at episode {}"
            print(template.format(agent.running_reward, agent.episode_count))
        frame_count += 1

        # Keep the replay buffer bounded: drop the oldest transition from every
        # parallel list so that indices stay aligned.
        if len(agent.rewards_history) > agent.max_memory_length:
            del agent.rewards_history[:1]
            del agent.state_history[:1]
            del agent.state_next_history[:1]
            del agent.action_history[:1]
            del agent.done_history[:1]

        if done:
            break

    # Running reward = mean episode reward over the last REWARD_WINDOW episodes.
    agent.episode_reward_history.append(episode_reward)
    if len(agent.episode_reward_history) > REWARD_WINDOW:
        del agent.episode_reward_history[:1]
    agent.running_reward = np.mean(agent.episode_reward_history)

    agent.episode_count += 1

    if agent.running_reward > SOLVED_RUNNING_REWARD:
        print("Solved at episode {}!".format(agent.episode_count))
        break