In [None]:
%pip install gymnasium[classic-control]
%pip install tensorflow

import numpy as np
import tensorflow as tf
import gymnasium as gym
import os
import random
import matplotlib.pyplot as plt
from keras import regularizers
from keras.optimizers import Adam
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Dense
from collections import deque

In [None]:
class nnq:
    """Epsilon-greedy Q-learning agent backed by a small dense Keras network.

    The network maps one observation to one Q-value per discrete action.
    Actions are chosen epsilon-greedily (`action`) and the network is fitted on
    minibatches sampled from a buffer of stored transitions (`replay_buffer`).

    Attributes:
        env: environment the network was sized for.
        epsilon: current probability of taking a random action.
        learning_rate: initial learning rate handed to the decay schedule.
        learning: learning rate last read back from the optimizer; equals
            `learning_rate` until the first call to `replay_buffer`.
        gamma: discount factor used when bootstrapping targets.
        n_actions: number of discrete actions (size of the output layer).
        net: the compiled Keras model.
    """

    def __init__(self, env, learning_rate, epsilon, gamma=0.99):
        """Store the hyperparameters and build the Q-network.

        Args:
            env: environment providing `observation_space.shape` (network
                input shape) and `action_space.n` (number of outputs).
            learning_rate: initial learning rate of the optimizer schedule.
            epsilon: initial exploration probability.
            gamma: discount factor for the next-state value (default 0.99).
        """
        # Keep the environment on the instance so `network` does not depend on
        # a notebook-global variable that happens to be called `env`.
        self.env = env
        self.epsilon = epsilon
        self.learning_rate = learning_rate
        # Defined up front so it can be read before any training step ran.
        self.learning = learning_rate
        self.gamma = gamma
        self.n_actions = int(env.action_space.n)
        self.net = self.network()

    def network(self):
        """Create and compile the dense Q-network.

        Returns:
            A compiled `Sequential` model with three hidden ReLU layers
            (24, 36, 24 units) and a linear output of `n_actions` units,
            trained with MSE loss and Adam on an exponentially decaying
            learning rate.
        """
        model = Sequential()

        # Input shape comes from the environment (for MountainCar-v0 it is
        # (2,): position and speed).
        model.add(Dense(24, input_shape=self.env.observation_space.shape, activation='relu', kernel_initializer='he_uniform'))
        model.add(Dense(36, activation='relu', kernel_initializer='he_uniform'))
        model.add(Dense(24, activation='relu', kernel_initializer='he_uniform'))
        model.add(Dense(self.n_actions, activation='linear', kernel_initializer='he_uniform'))

        schedule = tf.keras.optimizers.schedules.ExponentialDecay(
            self.learning_rate, decay_steps=300, decay_rate=0.96, staircase=True)
        opt = tf.optimizers.Adam(learning_rate=schedule)
        model.compile(optimizer=opt, loss='mse', metrics=["mse"])

        return model

    def best_action(self, state):
        """Return the index of the action with the highest predicted Q-value.

        Args:
            state: a single observation (not batched).
        """
        # The network expects a batch, so wrap the single state in one.
        q_values = self.net(np.array([state]))
        return np.argmax(q_values[0], axis=0)

    def action(self, state):
        """Pick an action epsilon-greedily.

        With probability `epsilon` a uniformly random action is returned,
        otherwise the greedy action from `best_action`.
        """
        if random.random() > self.epsilon:
            return self.best_action(state)
        return np.random.choice(self.n_actions)

    def replay_buffer(self, buffer, batch):
        """Fit the network for one epoch on a random minibatch of transitions.

        Args:
            buffer: sequence of `(state, action, next_state, reward,
                terminate)` tuples; must hold at least `batch` items.
            batch: number of transitions to sample.

        Raises:
            ValueError: from `random.sample` if `batch` exceeds `len(buffer)`.
        """
        samples = random.sample(buffer, batch)

        state = np.array([t[0] for t in samples])
        action = np.array([t[1] for t in samples])
        next_state = np.array([t[2] for t in samples])
        reward = np.array([t[3] for t in samples])
        terminate = np.array([t[4] for t in samples])

        # Start from the current predictions so that only the Q-value of the
        # action actually taken contributes to the loss.
        target = np.array(self.net(state))

        # Best predicted value reachable from each next state.
        max_next_reward = np.amax(np.asarray(self.net(next_state)), axis=1)

        # Terminal transitions do not bootstrap from the next state.
        not_done = 1.0 - terminate.astype(np.float32)
        rows = np.arange(state.shape[0])
        target[rows, action] = reward + self.gamma * not_done * max_next_reward

        self.net.fit(x=state, y=target, epochs=1, verbose=0)
        # Remember the optimizer's current (decayed) learning rate for logging.
        self.learning = self.net.optimizer.learning_rate.numpy()

    def decresing_epsilon(self):
        """Decay `epsilon` by 5% while it is above 0.3, by 0.5% afterwards."""
        if self.epsilon > 0.3:
            self.epsilon = 95 * self.epsilon / 100
        else:
            self.epsilon = 995 * self.epsilon / 1000

        # Keep a bit of randomness.
        # NOTE(review): the reset value (0.009) is above the 0.001 threshold
        # that triggers it, so epsilon jumps back up and decays again instead
        # of being clamped - confirm this is intended.
        if self.epsilon < 0.001:
            self.epsilon = 0.009

    def save(self, episode):
        """Save the network to `./nn/nn-{episode}.h5`."""
        self.net.save(f'./nn/nn-{episode}.h5')

    def load(self, episode):
        """Replace the network with the one stored in `./nn/nn-{episode}.h5`."""
        self.net = load_model(f'./nn/nn-{episode}.h5')


In [None]:
# Training cell: run epsilon-greedy episodes on MountainCar-v0, store every
# transition in a replay buffer and periodically fit the agent on a minibatch.

episodes = 3500
learning_rate = 0.01
epsilon = 1.0
batch = 64

# Rolling window over the last 100 episode results; its mean is what gets
# logged and plotted as "Average Reward".
reward_single_episode = deque(maxlen=100)
win_episode = 0  # not updated in this cell
episode_result = []
reward_result = []

os.makedirs("nn/", exist_ok=True)

env = gym.make('MountainCar-v0')
nnq_class = nnq(env, learning_rate, epsilon)

#print(env.action_space)  ->  Discrete(3)
#print(env.observation_space)  ->   Box([-1.2  -0.07], [0.6  0.07], (2,), float32)
#print(env.observation_space.shape)  ->  (2,)

buffer = deque(maxlen=20000)


# Inclusive upper bound so that exactly `episodes` episodes are played.
for episode in range(1, episodes + 1):
    state, _ = env.reset()
    terminate, truncate = False, False

    for step in range(1, 201):  # at most 200 steps per episode

        action = nnq_class.action(state)
        next_state, reward, terminate, truncate, _ = env.step(action)

        # Only `terminate` is stored: it is the flag that disables
        # bootstrapping from the next state in `replay_buffer`.
        buffer.append((state, action, next_state, reward, terminate))
        state = next_state

        # Train every 20 steps once the buffer holds more than one batch.
        if step % 20 == 0 and len(buffer) > batch:
            nnq_class.replay_buffer(buffer, batch)

        if terminate or truncate:

            # The episode result is recorded as minus the number of steps taken.
            reward_single_episode.append(step * -1)
            average_reward = sum(reward_single_episode) / len(reward_single_episode)
            episode_result.append(episode)
            reward_result.append(average_reward)

            # Report the optimizer's current (decayed) learning rate; fall back
            # to the configured initial rate until the agent has recorded one.
            current_lr = getattr(nnq_class, "learning", nnq_class.learning_rate)
            print(f"Episode: {episode} - Average Reward: {average_reward:.4f}   ----   epsilon(randomness): {nnq_class.epsilon:.4f} - learning rate: {current_lr:.4f}")
            break

    # Checkpoint and plot progress every 200 episodes.
    if episode % 200 == 0:
        nnq_class.save(episode)

        plt.plot(episode_result, reward_result)
        plt.xlabel('Episodes')
        plt.ylabel('Average Reward')
        plt.ylim(-200, None)
        plt.show()

    # Shrink the minibatch as training progresses.
    if episode == 1200:
        batch = 32
    if episode == 2500:
        batch = 16

    if episode % 5 == 0:
        nnq_class.decresing_epsilon()

# The last periodic checkpoint may predate the end of training, so make sure
# the final weights are written as well.
if episodes % 200 != 0:
    nnq_class.save(episodes)

env.close()


In [None]:
# Evaluation cell: play greedy episodes with a saved network and report the
# mean episode reward and the fraction of episodes that reached the goal.

env = gym.make('MountainCar-v0')
load_file = "200"
average_reward, win_episode = [], 0
learning_rate=0.01
epsilon= 1.0  # not used below: evaluation always takes the greedy action
episodes = 20

nnq_class = nnq(env, learning_rate, epsilon)

# Prefer the hand-picked model; if it is not present, fall back to the
# checkpoint written by the training cell for episode `load_file`, so the
# notebook still runs from a fresh kernel.
model_path = './nn_model/nn7.h5'
if not os.path.exists(model_path):
    model_path = f'./nn/nn-{load_file}.h5'
nnq_class.net = load_model(model_path)


for episode in range(episodes):

    state, _ = env.reset()
    terminate, truncate, episode_reward = False, False, 0.0

    while not terminate and not truncate:
        action = nnq_class.best_action(state)

        next_state, reward, terminate, truncate, _ = env.step(action)
        episode_reward += reward
        state = next_state

    # Count a win once per episode, using the environment's own termination
    # flag rather than re-checking the observed position on every step.
    if terminate:
        win_episode += 1

    average_reward.append(episode_reward)
    print(f"Episode: {episode} - Episode reward: {episode_reward:.2f}")

env.close()

mean = sum(average_reward) / len(average_reward)
accuracy = win_episode / episodes

print(f"\n\nAverage Reward: {mean:.2f}, Accuracy {accuracy:.2f}\n")