In [11]:
import pandas as pd
import numpy as np
from keras.layers import  Dense, Input
from keras.optimizers import  Adam
from keras.models import  Sequential
from keras.losses import mean_squared_error
from tensorflow import gather_nd, GradientTape
import gym

In [12]:
# Hyperparameters of the DQN agent.
GAMMA = 0.995       # discount factor applied to the bootstrapped next-state value
TAU = 0.001         # soft-update rate: share of Q_network blended into Q_star_network per training step
EPSILON_MIN = 0.05  # floor below which the exploration rate epsilon is never decayed
ALPHA = 0.001       # learning rate handed to the Adam optimizer

In [13]:
env = gym.make("CartPole-v1", render_mode="rgb_array")

# Sizes the networks are built from: the observation shape feeds the input
# layer, the number of discrete actions sets the width of the output layer.
state_size = env.observation_space.shape
actions_size = env.action_space.n

In [14]:
def build_q_network(input_shape, n_actions):
  """Build the fully connected network that maps a state to Q-values.

  Args:
    input_shape: shape tuple of a single observation.
    n_actions: number of discrete actions; one linear output unit per action.

  Returns:
    An uncompiled keras ``Sequential`` model with freshly initialised weights.
  """
  return Sequential([
    Input(shape=input_shape),
    Dense(128,activation="relu"),
    Dense(128,activation="relu"),
    Dense(64,activation="relu"),
    Dense(64,activation="relu"),
    Dense(48,activation="relu"),
    Dense(82,activation="relu"),
    Dense(n_actions,activation="linear"),
  ])

# Online network (trained by gradient descent) and target network (used to
# compute bootstrapped targets); both share a single architecture definition.
Q_network = build_q_network(state_size, actions_size)
Q_star_network = build_q_network(state_size, actions_size)

# Start the target network as an exact copy of the online network. Without
# this the two begin from unrelated random initialisations and, with a small
# TAU, the first targets come from a network that has nothing to do with the
# one being trained.
Q_star_network.set_weights(Q_network.get_weights())

optimizer = Adam(learning_rate=ALPHA)

In [15]:
def initialze_memory():
  """Create the empty replay memory as a pandas DataFrame.

  The DataFrame layout is a readability/convenience choice made for this
  notebook; it is not meant as a recommended replay-buffer design.

  Returns:
    An empty DataFrame with columns ``states``, ``rewards``, ``actions``,
    ``dones`` and ``next_states``. The two state columns are cast to
    ``object`` dtype so each cell can hold a whole NumPy array.
  """
  column_names = ["states","rewards","actions","dones","next_states"]
  memory = pd.DataFrame(columns=column_names)
  return memory.astype({"states": object, "next_states": object})
def arrange_data(df):
  """Turn a replay-memory DataFrame into stacked NumPy arrays.

  Args:
    df: DataFrame with the columns ``states``, ``rewards``, ``actions``,
      ``dones`` and ``next_states``.

  Returns:
    Tuple ``(states, rewards, actions, dones, next_states)`` where each entry
    is ``np.stack`` applied to the corresponding column, so the first axis
    indexes the rows of ``df``.
  """
  ordered_columns = ("states", "rewards", "actions", "dones", "next_states")
  return tuple(np.stack(df[column]) for column in ordered_columns)

def take_action(Q_values, epsilon):
  """Choose an action with an epsilon-greedy policy.

  Args:
    Q_values: Q-values for a single state, either as a ``(1, n_actions)``
      batch or a flat ``(n_actions,)`` vector. Anything ``np.asarray`` can
      convert is accepted (NumPy array, TensorFlow eager tensor, ...), not
      only objects exposing ``.numpy()``.
    epsilon: probability of picking a uniformly random action.

  Returns:
    Integer index of the chosen action.

  Note:
    The greedy branch takes ``argmax`` over the flattened input, so passing
    a batch of more than one state does not yield a valid action index.
  """
  q = np.asarray(Q_values)
  if np.random.random() > epsilon:
    return np.argmax(q)
  # The last axis holds one entry per action for both accepted layouts.
  return np.random.randint(0, q.shape[-1])

def update_epsilon(epsilon,epilon_min,decrease_rate):
  """Linearly decay the exploration rate, clamped at a lower bound.

  Args:
    epsilon: current exploration rate.
    epilon_min: smallest value the result may take.
    decrease_rate: amount subtracted from ``epsilon``.

  Returns:
    ``epsilon - decrease_rate`` when that is above ``epilon_min``,
    otherwise ``epilon_min``.
  """
  decayed = epsilon - decrease_rate
  return decayed if decayed > epilon_min else epilon_min

def compute_loss(df, gamma,Q_network, Q_star_network):
  """Mean-squared TD error of ``Q_network`` on a sample of transitions.

  Args:
    df: replay-memory sample; unpacked with ``arrange_data``.
    gamma: discount factor applied to the next-state value.
    Q_network: network being trained; evaluated on ``states``.
    Q_star_network: target network; evaluated on ``next_states``.

  Returns:
    The result of ``mean_squared_error`` between the targets
    ``r + gamma * max_a' Q*(s', a') * (1 - done)`` and the Q-values that
    ``Q_network`` assigns to the actions actually taken.
  """
  states,rewards,actions,dones,next_states = arrange_data(df)

  # Bootstrapped targets: the (1 - dones) factor zeroes the future value for
  # transitions flagged as done. The max is taken with NumPy.
  next_Q = Q_star_network(next_states)
  greedy_next_value = np.amax(next_Q,axis=-1)
  keep_future = 1 - dones
  y_target = rewards + gamma  * greedy_next_value * keep_future

  # Online estimates, then select Q(s, a) for the stored action of every row
  # through (row, action) index pairs.
  predicted_Q = Q_network(states)
  row_ids = np.arange(actions.shape[0])
  row_action_pairs = np.stack([row_ids,actions],axis=1)
  taken_Q = gather_nd(predicted_Q, row_action_pairs)

  return mean_squared_error(y_target, taken_Q)

def train_agent(memory_sample, gamma, tau, Q_network, Q_star_network, optimizer):
  """Run one gradient step on ``Q_network`` and soft-update the target network.

  Args:
    memory_sample: sample of transitions forwarded to ``compute_loss``.
    gamma: discount factor forwarded to ``compute_loss``.
    tau: soft-update rate; each target weight becomes
      ``tau * online + (1 - tau) * target``.
    Q_network: online network whose trainable variables are optimised.
    Q_star_network: target network, updated in place.
    optimizer: optimizer used to apply the gradients.

  Returns:
    None. Both networks are modified in place.
  """
  # Only the forward pass has to be recorded; differentiating and updating
  # weights inside the tape context would do that work while the tape is
  # still active for no benefit.
  with GradientTape() as tape:
    loss = compute_loss(memory_sample, gamma, Q_network, Q_star_network)

  gradients = tape.gradient(loss, Q_network.trainable_variables)
  optimizer.apply_gradients(zip(gradients, Q_network.trainable_variables))

  # Polyak averaging: move the target network a small step towards the
  # freshly updated online network.
  for target_weights, q_net_weights in zip(
      Q_star_network.weights, Q_network.weights):
    target_weights.assign(tau * q_net_weights + (1.0 - tau) * target_weights)

In [16]:
# Run configuration for the training loop.
episodes = range(1, 2001)   # episode numbers, 1-based so they can be printed directly
episode_max_size = 1000     # upper bound on environment steps per episode
epsilon = 1                 # initial exploration rate (fully random at the start)
update_rate = 4             # train once every `update_rate` stored transitions
sample_size = 64            # number of transitions sampled per training step

scores = []                 # total reward of each finished episode
memory = initialze_memory() # replay memory, one row per transition
memory_index = 0            # row label for the next transition written to `memory`

In [17]:
# Main training loop: act epsilon-greedily, store every transition in the
# replay memory, train every `update_rate` stored transitions, and decay
# epsilon once per episode.
for episode in episodes:
  state, _ = env.reset()
  score = 0
  for j in range(episode_max_size):
    Q_values = Q_network(np.expand_dims(state,axis=0))
    action = take_action(Q_values, epsilon)
    # step() returns separate `terminated` (the MDP really ended) and
    # `truncated` (cut off, e.g. by a time limit) flags.
    next_state, reward, terminated, truncated, _ = env.step(action)
    score+=reward

    # Store only `terminated` as the done flag: it is what masks the
    # bootstrapped value in compute_loss, and a truncated episode still has
    # future value from its last state.
    memory.loc[memory_index] = [None,reward,action,terminated,None]
    memory.at[memory_index, "states"] = np.array(state)
    memory.at[memory_index, "next_states"] = np.array(next_state)
    memory_index+=1

    state = next_state.copy()
    if memory_index % update_rate == 0 and len(memory) >= sample_size:
      memory_sample = memory.sample(sample_size)
      train_agent(memory_sample, GAMMA, TAU, Q_network, Q_star_network, optimizer)

    # End the episode on either signal; stepping an environment after
    # truncation without reset() is not valid. The step cap itself is
    # enforced by range(episode_max_size).
    done = terminated or truncated
    if done:
      break

  scores.append(score)
  # Linear epsilon decay of .005 per episode, floored at EPSILON_MIN.
  epsilon = update_epsilon(epsilon,EPSILON_MIN, .005)

  if episode % 100 == 0:
    print(f"episode:{episode} average score:{np.average(scores[-100:])}")

  if not isinstance(terminated, (bool, np.bool8)):


episode:100 average score:17.69
episode:200 average score:12.77
episode:300 average score:12.6
episode:400 average score:12.12
episode:500 average score:11.38
episode:600 average score:11.45
episode:700 average score:11.41
episode:800 average score:12.98
episode:900 average score:12.19
episode:1000 average score:15.34
episode:1100 average score:28.37
episode:1200 average score:73.08
episode:1300 average score:186.02
episode:1400 average score:275.26
episode:1500 average score:230.0
episode:1600 average score:172.38
episode:1700 average score:183.15
episode:1800 average score:205.44


KeyboardInterrupt: 

In [None]:
# create_video("./cart-pole.mp4",env, Q_network,5,60)