In [1]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

In [2]:
def create_network(input_dim, output_dim, seed=0):
    """Build a seeded fully connected network with two hidden layers.

    The architecture is ``input_dim -> 36 -> 48 -> output_dim`` with a ReLU
    after each hidden layer and no activation on the output.

    Args:
        input_dim: number of input features.
        output_dim: number of output units.
        seed: value used to reseed the ``random``, NumPy and torch global
            RNGs before the layers are created, so that two calls with the
            same seed yield identically initialised networks.

    Returns:
        An ``nn.Sequential`` holding the layers described above.
    """
    # Reseed every global RNG (same order as always: random, numpy, torch).
    for reseed in (random.seed, np.random.seed, torch.manual_seed):
        reseed(seed)

    hidden_widths = (36, 48)

    # Chain the hidden layers, each followed by a ReLU, then the output layer.
    layers = []
    in_features = input_dim
    for width in hidden_widths:
        layers.append(nn.Linear(in_features, width))
        layers.append(nn.ReLU())
        in_features = width
    layers.append(nn.Linear(in_features, output_dim))

    return nn.Sequential(*layers)

In [3]:
# Training environment (no rendering while training).
env = gym.make('MountainCar-v0', render_mode=None)

# Discount factor applied to the bootstrapped next-state value in train().
gamma = 0.99

# Network sizes are derived from the environment's spaces.
input_dim = env.observation_space.shape[0]
output_dim = env.action_space.n

# Epsilon-greedy exploration schedule: epsilon starts at 1, is multiplied by
# epsilon_decay after every episode, and get_action() never lets it fall
# below epsilon_min.
epsilon = 1.0
epsilon_decay = 0.995
epsilon_min = 0.01

# Adam step size.
learning_rate = 0.001
# Backward-compatible alias for the original (misspelled) name.
learing_rate = learning_rate

# Replay memory of (state, action, reward, next_state, done) transitions;
# the oldest entries are dropped once 10000 are stored.
replay_buffer = deque(maxlen=10000)

# Number of training episodes.
n_episodes = 1200

# Upper bound on the number of steps taken per training episode.
n_steps = 200

# Minibatch size sampled from the replay buffer for each gradient step.
batch_size = 50

# Online network (optimised) and target network (used for TD targets).
# Both are built with create_network's default seed; the explicit copy below
# guarantees they start from identical weights.
train_network = create_network(input_dim, output_dim)
target_network = create_network(input_dim, output_dim)

target_network.load_state_dict(train_network.state_dict())

criterion = nn.MSELoss()
optimizer = optim.Adam(train_network.parameters(), lr=learning_rate)

In [None]:
def get_action(states, seed=None):
    """Pick action(s) epsilon-greedily using ``train_network``.

    Args:
        states: a single observation (1-D) or a batch of observations
            (2-D, one observation per row), as a NumPy array or tensor.
        seed: when not ``None``, the ``random``, NumPy and torch global RNGs
            are reseeded with this value before any sampling takes place.

    Returns:
        For a 1-D input, a single integer action. For a 2-D input, an int64
        NumPy array with one action per row.

    Side effects:
        Clamps the global ``epsilon`` so it is never below ``epsilon_min``.
    """
    global epsilon

    # set the seed
    if seed is not None:
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)

    # Never explore less than the configured floor.
    epsilon = max(epsilon_min, epsilon)

    # Single observation.
    if states.ndim == 1:
        if np.random.rand() < epsilon:
            return np.random.randint(0, output_dim)
        # Action selection needs no gradients.
        with torch.no_grad():
            q_values = train_network(torch.as_tensor(states, dtype=torch.float32))
        return torch.argmax(q_values).item()

    # Batch of observations: decide per row whether to explore or exploit.
    n_rows = states.shape[0]
    explore = np.random.rand(n_rows) < epsilon
    greedy = ~explore

    # Actions are discrete indices, so keep them in an integer array.
    actions = np.zeros(n_rows, dtype=np.int64)
    actions[explore] = np.random.randint(0, output_dim, size=explore.sum())

    if greedy.any():
        with torch.no_grad():
            batch = torch.as_tensor(states, dtype=torch.float32)
            q_values = train_network(batch[torch.from_numpy(greedy)])
        # argmax along dim=1 yields one best action per row; without `dim`
        # argmax would return a single index into the flattened Q matrix.
        actions[greedy] = q_values.argmax(dim=1).cpu().numpy()

    return actions

In [None]:
def train():
    """Perform one gradient step of the online network on a replay minibatch.

    Returns immediately (doing nothing) while the replay buffer holds fewer
    than ``batch_size`` transitions. Otherwise it samples ``batch_size``
    transitions, regresses ``train_network``'s Q(s, a) onto the one-step
    target ``r + gamma * max_a' Q_target(s', a') * (1 - done)`` with
    ``criterion``, and applies one ``optimizer`` step.
    """
    # Not enough experience collected yet.
    if len(replay_buffer) < batch_size:
        return

    batch = random.sample(replay_buffer, batch_size)

    def column(index, shape):
        """Stack field `index` of every transition into a float32 tensor."""
        # Going through a NumPy array first avoids the slow list-to-tensor path.
        values = [transition[index] for transition in batch]
        return torch.tensor(np.array(values, dtype=np.float32).reshape(shape))

    # Transition layout: (state, action, reward, next_state, done).
    states = column(0, (batch_size, input_dim))
    actions = column(1, (batch_size, 1))
    rewards = column(2, (batch_size,))
    next_states = column(3, (batch_size, input_dim))
    dones = column(4, (batch_size,))

    # Q(s, a) of the actions that were actually taken.
    predicted = train_network(states).gather(1, actions.long()).reshape(batch_size)

    # max_a' Q_target(s', a'), cut off from the autograd graph.
    best_next = target_network(next_states).max(dim=1).values.detach()

    # One-step TD target; the bootstrap term vanishes where done == 1.
    td_target = rewards + gamma * best_next * (1 - dones)

    # Gradient step on the regression loss.
    loss = criterion(predicted, td_target)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [None]:
def original_try(state, episode):
    """Run one training episode, learning after every environment step.

    Args:
        state: the initial observation, or the ``(observation, info)`` tuple
            exactly as returned by ``env.reset``.
        episode: zero-based episode index; used to derive the per-step RNG
            seed passed to ``get_action`` and in the progress message.

    Side effects:
        Steps ``env``, appends transitions to ``replay_buffer``, calls
        ``train()`` once per step, copies ``train_network`` into
        ``target_network`` at the end of the episode and decays the global
        ``epsilon`` by ``epsilon_decay``.
    """
    global epsilon

    # env.reset() returns (observation, info); accept either form.
    if isinstance(state, tuple):
        state = state[0]

    cumulative_reward = 0

    for step in range(n_steps):
        # A distinct seed per (episode, step) keeps action sampling reproducible.
        action = get_action(state, seed=episode * n_steps + step)

        next_state, reward, terminated, truncated, _ = env.step(action)

        # Store `terminated`, not `terminated or truncated`: hitting the time
        # limit does not make next_state terminal, so the TD target in
        # train() must still bootstrap from it.
        replay_buffer.append((state, action, reward, next_state, terminated))

        train()

        cumulative_reward += reward

        # The episode itself ends on either signal.
        if terminated or truncated:
            # `step` is a zero-based index, so step + 1 steps were taken.
            print(f"Episode {episode} finished after {step + 1} steps with cumulative reward {cumulative_reward}")
            break

        state = next_state

    # update the target network after every episode
    target_network.load_state_dict(train_network.state_dict())

    # decay the epsilon
    epsilon *= epsilon_decay

In [None]:
def train_model():
    """Train for ``n_episodes`` episodes, one ``original_try`` call each."""
    for episode_index in range(n_episodes):
        # Seed each reset with the episode index so every episode's starting
        # state is reproducible; the raw reset result is handed on as-is.
        initial = env.reset(seed=episode_index)
        original_try(initial, episode_index)

In [None]:
# Run the whole training loop: n_episodes episodes, with a gradient step after
# every environment step once the replay buffer holds batch_size transitions.
train_model()

In [5]:
# Load the saved weights. Despite its name, `model` holds a state_dict (it is
# passed to load_state_dict), not a network object.
# weights_only=True restricts unpickling to tensors and plain containers, so a
# tampered checkpoint file cannot execute arbitrary code; map_location="cpu"
# lets a checkpoint saved on another device load here.
model = torch.load("mountain_car_model_36_48.pth", map_location="cpu", weights_only=True)

In [6]:
# Copy the loaded weights into target_network, which is the network the
# evaluation cell queries. This overwrites whatever weights it held before.
target_network.load_state_dict(model)

<All keys matched successfully>

In [None]:
# Greedy evaluation of target_network over 100 seeded episodes.
env_test = gym.make('MountainCar-v0', render_mode=None) # render_mode='human' for visualization

seeds = [42 + i for i in range(100)]

# Upper bound on the number of steps per evaluation episode.
max_steps = 200

steps = []
fails = 0

try:
    for seed in seeds:
        observation, _ = env_test.reset(seed=seed)
        total_reward = 0
        terminated = truncated = False
        steps_taken = 0

        # No explicit env_test.render() call: with render_mode=None there is
        # nothing to draw, and with render_mode='human' Gymnasium renders
        # during step() on its own.
        for i in range(max_steps):
            # Pure inference, so skip autograd bookkeeping.
            with torch.no_grad():
                q_values = target_network(torch.as_tensor(observation, dtype=torch.float32))
            action = torch.argmax(q_values).item()
            observation, reward, terminated, truncated, info = env_test.step(action)
            total_reward += reward
            # `i` is a zero-based index, so i + 1 steps have been taken.
            steps_taken = i + 1
            if terminated or truncated:
                break

        print(f"Episode with seed={seed} finished after {steps_taken} steps and {total_reward} reward")

        # An episode fails when it ends without the environment reporting
        # termination. Checking the flag (rather than the step index) counts
        # a goal reached on the very last allowed step as a success.
        if not terminated:
            fails += 1

        steps.append(steps_taken)
finally:
    # Release the environment even if an episode raised.
    env_test.close()

print(f"\nAverage steps: {np.mean(steps)}")
print(f"Fails: {fails}")

In [None]:
# Save target_network's weights (state_dict only, not the module object).
# NOTE(review): this writes to the same path the earlier load cell reads from,
# and that load cell overwrites target_network before this cell runs in
# top-to-bottom order -- confirm which weights are meant to be persisted.
torch.save(target_network.state_dict(), "mountain_car_model_36_48.pth")