In [None]:
"""uncomment it in case gynmasium is not present"""

# %pip install gymnasium[all]
# %pip install shimmy
# %pip install "gymnasium[atari, accept-rom-license]"
# %pip install ale_py

"""uncomment it to check if Pong-v0 and others are present or not"""
# # To Check if all modules are working fine 
# import ale_py
# import gymnasium
# import shimmy

# import gymnasium as gym

# req = set(['MountainCar-v0','Pong-v0','CartPole-v0','LunarLander-v2'])
# keys = set(gym.envs.registry.keys())
# for key in req:

#   if(keys.intersection({key}) == set({})):
#       print(key +' not found')   


import gymnasium as gym
import numpy as np
import shimmy #it is compulsory to import
# import ale_py #it's too

env_1 = gym.make("MountainCar-v0")

print('\n Environment : MountainCar-v0')
 # Print state and action space
print("State space:", env_1.observation_space)
print("Action space:", env_1.action_space)
state, _ = env_1.reset()
done = False
total_reward = 0
step_count = 0
rew_1 = {}
print("Running random agent...")


while not done:
    action = env_1.action_space.sample()  # Random action
    state, reward, done, _, _ = env_1.step(action)
    total_reward += reward
    if reward not in rew_1:
        rew_1[reward] = 0
    rew_1[reward] += 1
    step_count += 1

print(f"Total reward after {step_count} steps: {total_reward}\n")

# Close the environment
env_1.close()
print(f'These are the Rewards(and respective frequencies) after experienting with Random Agent : \n {rew_1}')



 Environment : MountainCar-v0
State space: Box([-1.2  -0.07], [0.6  0.07], (2,), float32)
Action space: Discrete(3)
Running random agent...
Total reward after 21342 steps: -21342.0

These are the Rewards(and respective frequencies) after experienting with Random Agent : 
 {-1.0: 21342}


In [7]:
import cv2

# import time
import json
import random
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm

from collections import deque

ENVIRONMENT = "MountainCar-v0"

# DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DEVICE = 'cpu'

SAVE_MODELS = True  # Save models to file so you can test later
MODEL_PATH = "./MountainCar-"  # Models path for saving or loading
SAVE_MODEL_INTERVAL = 200 # Save models at every X epoch
TRAIN_MODEL = True  # Train model while playing (Make it False when testing a model)

LOAD_MODEL_FROM_FILE = False  # Load model from file
LOAD_FILE_EPISODE = 0  # Load Xth episode from file

BATCH_SIZE = 64  # Minibatch size that select randomly from mem for train nets
MAX_EPISODE = 10000  # Max episode
MAX_STEP = 1000  # Max step size for one episode

MAX_MEMORY_LEN = 100000  # Max memory len
MIN_MEMORY_LEN = 10000  # Min memory len before start train 

GAMMA = 0.999  # Discount rate
ALPHA = 0.005  # Learning rate
EPSILON_DECAY = 0.999  # Epsilon decay rate by step


RENDER_GAME_WINDOW = False # Opens a new window to render the game (Won't work on colab default)

In [8]:
class DQNMountain(nn.Module):
    def __init__(self,action_space):
        super(DQNMountain,self).__init__()
        self.fun = nn.Sequential(
            nn.Linear(2,16),
            nn.SiLU(),
            nn.Linear(16,action_space)
        )
    def forward(self,x):
        return self.fun(x)


In [9]:
class Agent:
    def __init__(self, environment):
        """
        Hyperparameters definition for Agent
        """

        # Activation size for breakout env. Used as output size in network
        self.action_size = environment.action_space.n

        # Trust rate to our experiences
        self.gamma = GAMMA  # Discount coef for future predictions
        self.alpha = ALPHA  # Learning Rate

        self.epsilon = 1  # Explore or Exploit
        self.epsilon_decay = EPSILON_DECAY  # Adaptive Epsilon Decay Rate
        self.epsilon_minimum = 0.05  # Minimum for Explore

        self.memory = deque(maxlen=MAX_MEMORY_LEN)

        # Create two model for DDQN algorithm
        self.online_model = DQNMountain(self.action_size).to(DEVICE)
        self.target_model = DQNMountain(self.action_size).to(DEVICE) 
        self.target_model.load_state_dict(self.online_model.state_dict())
        self.target_model.eval()

        # Adam used as optimizer
        self.optimizer = optim.Adam(self.online_model.parameters(),lr=self.alpha,betas=(0.9, 0.999),weight_decay=1e-5)


    def act(self, state):
       

        act_protocol = 'Explore' if random.uniform(0, 1) <= self.epsilon else 'Exploit'

        if act_protocol == 'Explore':
            action = random.randrange(self.action_size)
        else:
            with torch.no_grad():
                state = torch.tensor(state, dtype=torch.float, device=DEVICE).unsqueeze(0)
                q_values = self.online_model.forward(state)  # (1, action_size)
                action = torch.argmax(q_values).item()  # Returns the indices of the maximum value of all elements

        return action

    def train(self):
        """
        Train neural nets with replay memory
        returns loss and max_q val predicted from online_net
        """
        if len(self.memory) < MIN_MEMORY_LEN:
            loss, max_q = [0, 0]
            return loss, max_q
        # We get out minibatch and turn it to numpy array
        state, action, reward, next_state, done = zip(*random.sample(self.memory, BATCH_SIZE))

        # Concat batches in one array
        # (np.arr, np.arr) ==> np.BIGarr
        state = np.concatenate(state)
        next_state = np.concatenate(next_state)

        # Convert them to tensors
        state = torch.tensor(state, dtype=torch.float, device=DEVICE)
        next_state = torch.tensor(next_state, dtype=torch.float, device=DEVICE)
        action = torch.tensor(action, dtype=torch.long, device=DEVICE)
        reward = torch.tensor(reward, dtype=torch.float, device=DEVICE)
        done = torch.tensor(done, dtype=torch.float, device=DEVICE)
        # print(state.shape)
        # Make predictions
        state_q_values = self.online_model(state)
        next_states_q_values = self.online_model(next_state)
        next_states_target_q_values = self.target_model(next_state)

        # Find selected action's q_value
        selected_q_value = state_q_values.gather(1, action.unsqueeze(1)).squeeze(1)
        next_states_target_q_value = next_states_target_q_values.gather(1, next_states_q_values.max(1)[1].unsqueeze(1)).squeeze(1)
        # Use Bellman function to find expected q value
        expected_q_value = reward + self.gamma * next_states_target_q_value * (1 - done)
        criterion = torch.nn.MSELoss()
        loss = criterion(selected_q_value, expected_q_value)


        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss, torch.max(state_q_values).item()

    def storeResults(self, state, action, reward, nextState, done):
        """
        Store every result to memory
        """
        self.memory.append([state[None, :], action, reward, nextState[None, :], done])

    def adaptiveEpsilon(self):
        """
        Adaptive Epsilon means every step
        we decrease the epsilon so we do less Explore
        """
        if self.epsilon > self.epsilon_minimum:
            self.epsilon *= self.epsilon_decay

In [10]:
import matplotlib.pyplot as plt
environment = gym.make(ENVIRONMENT)  # Get env
agent = Agent(environment)  # Create Agent
l = []
if LOAD_MODEL_FROM_FILE:
    agent.online_model.load_state_dict(torch.load(MODEL_PATH+str(LOAD_FILE_EPISODE)+".pth"))

    with open(MODEL_PATH+str(LOAD_FILE_EPISODE)+'.json') as outfile:
        param = json.load(outfile)
        agent.epsilon = param.get('epsilon')

    startEpisode = LOAD_FILE_EPISODE + 1

else:
    startEpisode = 1

last_100_ep_reward = deque(maxlen=100)  
total_step = 1
avg_reward_list = []
avg_reward = 0
best_avg_reward=-1000 

for episode in tqdm(range(startEpisode, MAX_EPISODE),colour='green'):

    state,_ = environment.reset()  # Reset env
   

    total_max_q_val = 0
    total_reward = 0 
    # total_loss = 0  
    done = False
    while not done and total_reward>-1000:

        if RENDER_GAME_WINDOW:
            environment.render()

        # Select and perform an action
        action = agent.act(state)  # Act
        next_state, reward, done,_,_ = environment.step(action)  
        
        
        agent.storeResults(state, action, reward, next_state, done)  

        # Move to the next state
        state = next_state  # Update state

        if TRAIN_MODEL:
            loss, max_q_val = agent.train()  
        else:
            loss, max_q_val = [0, 0]

        # total_loss += loss
        total_max_q_val += max_q_val
        total_reward += reward
        total_step += 1
        if total_step % 1000 == 0:
            agent.adaptiveEpsilon()  

        if SAVE_MODELS and episode % SAVE_MODEL_INTERVAL == 0: 
            weightsPath = MODEL_PATH + str(episode) + '.pth'
            epsilonPath = MODEL_PATH + str(episode) + '.json'

            torch.save(agent.online_model.state_dict(), weightsPath)
            with open(epsilonPath, 'w') as outfile:
                json.dump({'epsilon': agent.epsilon}, outfile)

        if TRAIN_MODEL and total_step%10000==0:
            agent.target_model.load_state_dict(agent.online_model.state_dict())  

        last_100_ep_reward.append(total_reward)

        # if(episode%100==0):
        
        avg_max_q_val = total_max_q_val
        
    l.append(total_reward)

    avg_reward_list.append(np.mean(np.array(list(last_100_ep_reward))))

    if episode % 100 == 0:
        plt.plot(l)
        plt.xlabel('Episode')
        plt.ylabel('Tot_rew')
        plt.title(f'Plot at Episode {episode}')
        plt.savefig(f'Mountain_carv0.png') 
        plt.close()

        plt.plot(avg_reward_list)
        plt.xlabel('Episode')
        plt.ylabel('Tot_rew')
        plt.title(f'best avg {episode}')
        plt.savefig(f'Mountain_carv0_best_avg.png') 
        plt.close()

        
            

  1%|[32m          [0m| 68/9999 [01:24<3:25:56,  1.24s/it]


KeyboardInterrupt: 

In [None]:

env = gym.make('MountainCar-v0',render_mode = 'human')
state,_ = env.reset()
model = DQNMountain(3)
model.load_state_dict(torch.load('MountainCar-6000.pth'))
model.eval()
tore = 0
while not done:
    action = model(torch.FloatTensor(state)).argmax().item()  # Random action
    state, reward, done, _, _ = env.step(action)
    tore += reward
env.close()

    

  model.load_state_dict(torch.load('MountainCar-6000.pth'))


In [None]:
env2 = gym.make('MountainCar-v0', render_mode='human')

test_dqn = DQNMountain(3) 
test_dqn.load_state_dict(torch.load(r"moun_best\MountainCar-6000.pth"))
test_dqn.eval() 


for i in range(5):
    state = env2.reset()[0]  # Initialize to state 0
    terminated = False      # True when agent falls in hole or reached goal
    truncated = False       # True when agent takes more than 200 actions            

    rewards = 0

    # Agent navigates map until it falls into a hole (terminated), reaches goal (terminated), or has taken 200 actions (truncated).
    while(not terminated and rewards>-300):  
        # Select best action   
        with torch.no_grad():
            action = test_dqn(torch.tensor(state)).argmax().item()

        # Execute action
        state,reward,terminated,truncated,_ = env2.step(action)
        rewards += reward

    print(rewards)

env2.close()

  test_dqn.load_state_dict(torch.load(r"moun_best\MountainCar-6000.pth"))


-176.0
-164.0
-250.0
-259.0
-251.0
