#### requirements

In [None]:

!pip3 install gymnasium
!pip3 install tensorflow
!pip3 install keras

#### imports

In [None]:
#imports
from collections import deque
import numpy as np
import random
import gymnasium as gym
import tensorflow as tf
from tensorflow.keras.models import Sequential , load_model
from tensorflow.keras.layers import Dense , Flatten
from tensorflow.keras.optimizers import Adam

#### Environment

In [None]:
# Create the CartPole-v1 environment; later cells read its spaces and step it during training.
env = gym.make("CartPole-v1")

In [None]:
# Show the observation and action spaces (one per line), then keep the length
# of the observation vector so the network input can be sized from it.
print(env.observation_space, env.action_space, sep="\n")
obs_shape = env.observation_space.shape[0]

In [None]:
# Action ids the agent can choose from, derived from the environment's discrete
# action space instead of being hard-coded (CartPole-v1 yields [0, 1]).
action_space = list(range(int(env.action_space.n)))

#### Build Model

In [None]:
def Model():
  """Build and compile a fresh Q-network.

  The network takes an observation of length `obs_shape` and outputs one
  linear Q-value per entry of `action_space`. It is compiled with Adam
  (learning rate 0.01) and a mean-squared-error loss.

  Returns:
    The compiled Keras Sequential model.
  """
  q_net = Sequential()
  # Input is the flat observation vector:
  # [cart position, cart velocity, pole angle, pole angular velocity]
  q_net.add(Flatten(input_shape=(obs_shape,)))
  q_net.add(Dense(64, activation="relu"))
  q_net.add(Dense(24, activation="relu"))
  # One unbounded output per action.
  q_net.add(Dense(len(action_space), activation="linear"))

  q_net.compile(optimizer=Adam(learning_rate=0.01), loss="mean_squared_error")
  return q_net

# Create the online network and the target network.
model = Model()
target_model = Model()
# Start the target network as an exact copy of the online network; otherwise the
# first bootstrapped targets come from an unrelated random initialisation.
target_model.set_weights(model.get_weights())

In [None]:
def predict_action(state):
    """Return the greedy action for `state` according to the module-level `model`.

    The state is given a leading batch axis, passed through `model` in inference
    mode, and the action whose Q-value is largest is looked up in `action_space`.
    """
    batched_state = np.expand_dims(state, axis=0)
    # Row 0 holds the Q-value of every action for this single state.
    q_values = model(batched_state, training=False)[0]
    best_index = tf.argmax(q_values).numpy()
    return action_space[best_index]

#### Training

In [None]:
# Hyper-parameters shared by the training cells below
MEMORY_SIZE = 50_000    # default capacity of the replay buffer
GAMMA = 0.9             # discount factor applied to future rewards
EPSILON_MAX = 1.0       # exploration rate at the start of training
EPSILON_MIN = 0.1       # exploration rate never decays below this floor
EPSILON_DECAY = 3e-4    # amount subtracted from epsilon after every step
BATCH_SIZE = 32         # number of transitions sampled per update

In [None]:
# function updates/fits the model to simulate updates in Q - values
def update_q_vals(model , replay):
  """Fit `model` for one epoch on a random minibatch drawn from `replay`.

  Every replay entry is a `(state, action, next_state, reward, done)` tuple.
  For each sampled transition the training target of the action that was taken
  is `reward` when `done` is set, and
  `reward + GAMMA * max(target_model(next_state))` otherwise. The targets of the
  other actions are the model's own current predictions, so they add no error.

  Both networks are evaluated once on the whole minibatch instead of once per
  transition.

  Args:
    model: the network being trained; it is fitted in place.
    replay: the replay memory; it must hold at least BATCH_SIZE transitions
      because `random.sample` raises ValueError otherwise.

  Returns:
    The same `model` object, after the fit.

  NOTE: the bootstrap values come from the module-level `target_model`.
  """
  # Take random samples from the memory and split them into per-field columns.
  samples = random.sample(replay, BATCH_SIZE)
  states, actions, next_states, rewards, dones = zip(*samples)

  states = np.array(states)
  next_states = np.array(next_states)
  actions = np.array(actions)
  rewards = np.array(rewards, dtype=np.float64)
  dones = np.array(dones, dtype=bool)

  # Current predictions double as the training targets for the untouched actions.
  target_q_values = model(states, training=False).numpy()

  # Best next-state value according to the target network, one per transition.
  best_future = target_model(next_states, training=False).numpy().max(axis=1)

  # Finished transitions get the plain reward; the rest are bootstrapped.
  updated = np.where(dones, rewards, rewards + GAMMA * best_future)
  target_q_values[np.arange(len(samples)), actions] = updated

  # Fit the model to predict the updated Q-values for the sampled states.
  model.fit(states, target_q_values, epochs=1, verbose=0)

  return model



In [None]:
def train(model=model,
          target_model=target_model,
          starting_state=0,
          epsilon=EPSILON_MAX,
          decay_rate = EPSILON_DECAY,
          starting_episode = 0,
          save_model=50,
          render = False,
          memory_size = MEMORY_SIZE,
          MAX_EPISODE = 300
          ):
  """Train `model` on `env` with epsilon-greedy exploration and experience replay.

  Progress is logged to `outputs/output_<starting_episode>.txt` and checkpoints
  are written to `models/trained_<episode>.h5`; both directories are created if
  they do not exist.

  Args:
    model: the network to train.
    target_model: network that receives `model`'s weights every 20 episodes.
    starting_state: number of environment steps already taken (for resuming).
    epsilon: initial probability of choosing a random action.
    decay_rate: amount subtracted from epsilon after every step; epsilon is
      never allowed below EPSILON_MIN.
    starting_episode: index of the first episode to run.
    save_model: a checkpoint is saved every `save_model` episodes.
    render: when true, `env.render()` is called on every step.
    memory_size: maximum number of transitions kept in the replay buffer.
    MAX_EPISODE: episode index at which training stops (exclusive).

  Returns:
    The trained model.

  NOTE: `predict_action` and `update_q_vals` read the module-level `model` /
  `target_model`, so those globals should be the same objects passed in here.
  """
  import os  # local import so this cell does not depend on the imports cell changing

  # Make sure the log and checkpoint directories exist before writing to them.
  os.makedirs("outputs", exist_ok=True)
  os.makedirs("models", exist_ok=True)

  # Experience replay buffer - memory
  replay = deque(maxlen = memory_size)

  # number of steps taken already - default 0
  state_count = starting_state

  # final rewards of the last 100 episodes (older entries drop out automatically)
  episode_reward_history = deque(maxlen = 100)

  update_after_actions = 4    # update Q-values after n steps
  update_target_network = 20  # update target model after n episodes

  # The context manager closes the log file even if training is interrupted.
  with open(f"outputs/output_{starting_episode}.txt" , "w") as file:
    file.write("Start training\n")

    for episode in range(starting_episode , MAX_EPISODE):
      # initial state
      state = np.array(env.reset()[0])
      episode_reward = 0
      episode_over = False

      while not episode_over:  # run until the episode is finished

        # increase steps taken count
        state_count += 1

        if render:
          env.render()

        # choose the action: randomly with probability epsilon, else greedily
        if np.random.rand() < epsilon:
          action = random.choice(action_space)
        else:
          action = predict_action(state)

        # take the action
        next_state, reward, terminated, truncated, _ = env.step(action)

        # The episode ends on failure (terminated) or when the environment
        # cuts it off (truncated); ignoring truncation would keep stepping an
        # environment that has already signalled the end of the episode.
        episode_over = terminated or truncated

        # extra reward for the pole angle being within about ±6 degrees
        if -0.1048 <= next_state[2] <= 0.1048:
          reward += 0.2*reward
        episode_reward += reward

        # decay the probability of taking a random action
        epsilon = max(epsilon - decay_rate, EPSILON_MIN)

        # Store only `terminated` as the done flag: a truncated episode is not
        # a failure state, so its next state should still be bootstrapped.
        replay.append((state, action, next_state, reward, terminated))

        # perform replay / update the model with new Q-values
        if state_count % update_after_actions == 0 and len(replay) > BATCH_SIZE:
          model = update_q_vals(model , replay)

        state = next_state

      # log the final episode reward
      file.write(f"episode_{episode} : {episode_reward}\n")
      file.flush()

      # running reward = mean over the last 100 episodes
      episode_reward_history.append(episode_reward)
      running_reward = np.mean(episode_reward_history)

      # update the target network with the new weights
      if episode % update_target_network == 0:
        target_model.set_weights(model.get_weights())
        # Log details
        template = "running reward: {:.2f} at episode {}, state count {}\n"
        file.write(template.format(running_reward, episode, state_count))

      # save model together with the values needed to resume from this point
      if episode % save_model == 0:
        # Log the current episode (not the fixed starting episode).
        file.write(f"epsilon = {epsilon} , state_count = {state_count} , episode_count = {episode}\n")
        model.save('models/trained_{}.h5'.format(episode))

  return model

In [None]:
# Set to True (and fill in the path) to continue training a saved model.
loaded_model = False
# e.g. models/trained_600.h5
load_model_path = ""

if not loaded_model:
  # train the freshly built networks
  model = train(model=model, target_model=target_model, MAX_EPISODE=300)

else:
  # load the checkpoint into both networks and continue training from it
  model = load_model(load_model_path)
  target_model = load_model(load_model_path)
  epsi = 1  # epsilon
  sc = 0    # state_count
  epc = 0   # episode_count
  model = train(
      model=model,
      target_model=target_model,
      epsilon=epsi,
      starting_state=sc,
      starting_episode=epc,
      MAX_EPISODE=epc+300,
  )
