In [17]:
# Necessary Imports
import os
import random
import numpy as np
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

In [13]:
# DQN pytorch based model
class Network(nn.Module):

    def __init__(self,state_size,action_size,seed = 42):
        super(Network,self).__init__()
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64,64)
        self.fc3 = nn.Linear(64,action_size)

    def forward(self,state):
        x = self.fc1(state)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        return self.fc3(x)




In [6]:
#importing environment from openAI
env = gym.make('LunarLander-v3')
# print('State',env.observation_space)
# print('action',env.action_space)
state_shape = env.observation_space.shape
state_size = env.observation_space.shape[0]
number_actions = env.action_space.n

In [46]:
learning_rate = 5e-4
minibatch_size = 100
discount_factor = 0.985
replay_buffer_size = int(1e5)
interpolation_parameter = 1e-3


In [47]:
# Replay Buffer model - first step
# used self.device so that i can run it on my laptop and on google colab both
class ReplayMemory(object):

    def __init__(self,capacity):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.capacity = capacity
        self.memory = []

    def push(self,event):
        self.memory.append(event)
        if len(self.memory) > self.capacity:
            del self.memory[0]

    def sample(self, batch_size):
        experiences = random.sample(self.memory, k=batch_size)

        states = torch.from_numpy(np.vstack([e[0] for e in experiences if e is not None])).float().to(self.device)
        actions = torch.from_numpy(np.vstack([e[1] for e in experiences if e is not None])).long().to(self.device)
        rewards = torch.from_numpy(np.vstack([e[2] for e in experiences if e is not None])).float().to(self.device)
        next_states = torch.from_numpy(np.vstack([e[3] for e in experiences if e is not None])).float().to(self.device)
        dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8)).float().to(self.device)
        return states, next_states, actions, rewards, dones



In [51]:
class Agent():

    def __init__(self,state_size,action_size):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.state_size = state_size
        self.action_size = action_size
        self.local_qnetwork = Network(state_size,action_size).to(self.device)
        self.target_qnetwork = Network(state_size,action_size).to(self.device)
        self.optimizer = optim.Adam(self.local_qnetwork.parameters(),lr = learning_rate)
        self.memory = ReplayMemory(replay_buffer_size)
        self.t_step = 0

    def step(self,state,action,reward,next_state,done):
        self.memory.push((state,action,reward,next_state,done))
        self.t_step = (self.t_step+1)%4
        if self.t_step == 0:
            if len(self.memory.memory) > minibatch_size:
                experiences = self.memory.sample(100)
                self.learn(experiences,discount_factor)
    def act(self,state,epsilon = 0.):
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        self.local_qnetwork.eval()
        with torch.no_grad():
            action_values = self.local_qnetwork(state)
            self.local_qnetwork.train()
            if random.random() > epsilon:
                return np.argmax(action_values.cpu().data.numpy())
            else:
                return random.choice(np.arange(self.action_size))
    def learn(self,experiences,discount_factor):
        states,next_states,actions,rewards,dones = experiences
        next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
        q_targets = rewards + discount_factor*next_q_targets*(1-dones)
        q_expected = self.local_qnetwork(states).gather(1,actions)
        loss = F.mse_loss(q_expected, q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.soft_update(self.local_qnetwork,self.target_qnetwork,interpolation_parameter)
    def soft_update(self,local_model,target_model,interpolation_parameter):
        for target_param,local_param in zip(target_model.parameters(),local_model.parameters()):
            target_param.data.copy_(interpolation_parameter * local_param.data + (1.0 - interpolation_parameter) * target_param.data)



In [52]:
agent = Agent(state_size,number_actions)
number_episodes = 2000
maximum_number_timesteps_per_episode = 1000
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995
epsilon = epsilon_starting_value
scores_on_100_episodes = deque(maxlen = 100)

In [None]:
for episode in range(1,number_episodes+1):
    state,_ = env.reset()
    score = 0
    for t in range(maximum_number_timesteps_per_episode):
        action = agent.act(state,epsilon)
        next_state,reward,done,_,_ = env.step(action)
        agent.step(state,action,reward,next_state,done)
        state = next_state
        score += reward
        if done:
            break
        scores_on_100_episodes.append(score)
        epsilon = max(epsilon_ending_value,epsilon_decay_value*epsilon)
        if episode % 100 == 0:
            print('\rEpisode {} - Average Score: {:.2f}'.format(episode,np.mean(scores_on_100_episodes)),end = '')
