In [1]:
import numpy as np
import random
from collections import deque
import gymnasium as gym
import tensorflow as tf
print(tf.__version__)

from tensorflow.keras.models import Sequential, clone_model
from tensorflow.keras.layers import Dense, Activation
from tensorflow.keras.optimizers import Adam

2.14.0


In [2]:
env_name = 'CartPole-v1'
env = gym.make(env_name)

In [3]:
num_observations = env.observation_space.shape[0]
num_actions = env.action_space.n

In [4]:
model = Sequential(
    [Dense(16, input_shape=(1,num_observations)),
    Activation('relu'),
    Dense(32),
    Activation('relu'),
    Dense(num_actions),
    Activation('linear')]
)
print(model.summary())

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 1, 16)             80        
                                                                 
 activation (Activation)     (None, 1, 16)             0         
                                                                 
 dense_1 (Dense)             (None, 1, 32)             544       
                                                                 
 activation_1 (Activation)   (None, 1, 32)             0         
                                                                 
 dense_2 (Dense)             (None, 1, 2)              66        
                                                                 
 activation_2 (Activation)   (None, 1, 2)              0         
                                                                 
Total params: 690 (2.70 KB)
Trainable params: 690 (2.70 

In [5]:
target_model = clone_model(model)

In [6]:
#declare hyperparameters
EPISODES = 1000 
LEARNING_RATE = 0.001 #LEARNING RATE FOR MODEL
GAMMA = 0.95 #DISCOUNT RATE

epsilon = 1.0 #greedy-epsilon
EPSILON_REDUCE = 0.995

replay_buffer = deque(maxlen=20000)
update_target_model = 10

In [7]:
#action selection
def epsilon_greedy_action_selection(model, epsilon, observation):
    
    #Exploitation
    if np.random.random() > epsilon:
        prediction = model.predict(observation,verbose=0)
        action = np.argmax(prediction)
        
    #Exploration
    else:
        action = env.action_space.sample()
        
    return action
    
def replay(replay_buffer,batch_size, model, target_model):
    if len(replay_buffer)< batch_size:
        return
    
    samples = random.sample(replay_buffer, batch_size)
    
    target_batch=[]
    
    zipped_samples = list(zip(*samples))
    states, actions ,rewards, new_states, dones, truncated = zipped_samples
    
    targets = target_model.predict(np.array(states), verbose=0)
    
    q_values = model.predict( np.array(new_states),verbose=0)
    
    for i in range(batch_size):
        q_value = max(q_values[i][0])
    
        target = targets[i].copy()
        if dones[i] or truncated[i]:
            target[0][actions[i]] = rewards[i]
        else:
            target[0][actions[i]] =rewards[i]+q_value * GAMMA
        target_batch.append(target)
    
    model.fit(np.array(states),np.array(target_batch),verbose=False)
# reduce epsilon
def reduce_epsilon(epsilon,epoch):
    return min_epsilon + (max_epsilon-min_epsilon)*np.exp(-decay_rate * epoch)

In [8]:
def update_model_handler(epoch,update_target_model, model, target_model):
    if epoch > 0 and epoch % update_target_model == 0:
        target_model.set_weights(model.get_weights())

In [9]:
model.compile(loss="mse", optimizer=Adam(learning_rate=LEARNING_RATE))



In [10]:
best_so_far = 0

for episode in range(EPISODES):
    observation, _ = env.reset()
    
    observation = observation.reshape(1,4)
    done = False
    truncated = False
    
    points = 0
    
    while not (done or truncated):
        action = epsilon_greedy_action_selection(model,epsilon,observation)
    
        next_observation, reward, done, truncated, info = env.step(action)
        next_observation = next_observation.reshape(1,4)
        
        replay_buffer.append((observation, action, reward, next_observation,done,truncated))
        
        observation = next_observation
        points += 1
        
        replay(replay_buffer,32, model, target_model)
        
    epsilon *= EPSILON_REDUCE
    
    update_model_handler(episode, update_target_model, model,target_model)
    
    if points > best_so_far:
        best_so_far= points
    if episode %25 == 0:
        print("Episode {0:} , Best so far: {1:}".format(episode,best_so_far))
        

Episode 0 , Best so far: 20
Episode 25 , Best so far: 57
Episode 50 , Best so far: 69
Episode 75 , Best so far: 97
Episode 100 , Best so far: 121
Episode 125 , Best so far: 126
Episode 150 , Best so far: 260
Episode 175 , Best so far: 329
Episode 200 , Best so far: 329
Episode 225 , Best so far: 329
Episode 250 , Best so far: 467
Episode 275 , Best so far: 467
Episode 300 , Best so far: 500
Episode 325 , Best so far: 500
Episode 350 , Best so far: 500
Episode 375 , Best so far: 500
Episode 400 , Best so far: 500


KeyboardInterrupt: 

In [28]:
test_env =gym.make("CartPole-v1",render_mode="human")
state, _ = test_env.reset()
point = 0
done = False
truncated = False
for steps in range(600):
    action = np.argmax(model.predict(state.reshape([1,4])))
    
    state, reward, done, truncated , info = test_env.step(action)
    point+= 1
    if done or truncated: 
        print("done")
        print("reward")
        break
        
env.close()







done
reward


In [29]:
print(point)
#maximum point = 500, acheived maximum point

500
