In [17]:
import gymnasium as gym
import numpy as np
from torch import nn
import torch
from collections import deque
import random

In [18]:
torch.manual_seed(42)
np.random.seed(42)


In [19]:
class DQN(nn.Module):
    def __init__(self, state,action):
        super(DQN,self).__init__()
        self.input=nn.Linear(state,128)
        self.hidden=nn.Linear(128,128)
        self.output=nn.Linear(128,action)

    def forward(self,x):
        x=self.input(x)
        x=torch.relu(x)
        x=self.hidden(x)
        x=torch.relu(x)
        x=self.output(x)
        return x

In [20]:
class Replaymemory:
    def __init__(self,maxlen):
        self.maxlen=maxlen
        self.memory=deque([],maxlen=self.maxlen)
    
    def append(self,transition):
        self.memory.append(transition)
    
    def sample(self,sample_size):
        return random.sample(self.memory,sample_size)
    
    def __len__(self):
        return len(self.memory)


In [21]:
class Hyperparameters:
    epsilon=1.0
    epsilon_end=0.01
    epsilon_dec=0.992

    batch_size=64
    memory_size=10000
    target_update_freq=1000

    learning_rate=0.005
    discount_factor=0.99


In [22]:
class Agent:

    def __init__(self,is_train=True,DDQN=True):
        if is_train:
            hp=Hyperparameters()
            self.epsilon=hp.epsilon
            self.epsilon_dec=hp.epsilon_dec
            self.epsilon_end=hp.epsilon_end
            self.batch_size=hp.batch_size
            self.memory_size=hp.memory_size
            self.target_update_freq=hp.target_update_freq
            self.lossfn=nn.SmoothL1Loss()
            self.optimizer=None
            self.learning_rate=hp.learning_rate
            self.discount_factor=hp.discount_factor

            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

            self.env = gym.make("LunarLander-v3")
            obs_shape = self.env.observation_space.shape[0]
            n_actions = self.env.action_space.n

            self.policy_net = DQN(obs_shape, n_actions).to(self.device)
            self.target_net = DQN(obs_shape, n_actions).to(self.device)
            self.target_net.load_state_dict(self.policy_net.state_dict())
            self.target_net.eval()

            self.memory = Replaymemory(self.memory_size)
            self.DDQN=DDQN

    def run(self,is_train=True,render=False,episodes=100,DDQN=True):
        env=gym.make("LunarLander-v3",render_mode="human" if render else None)
        #env=gym.make("CartPole-v1",render_mode="human")

        policy_net=self.policy_net

        if is_train:
            memory=self.memory
            Target_net=self.target_net
            Target_net.load_state_dict(policy_net.state_dict())
            self.optimizer=torch.optim.Adam(policy_net.parameters(),lr=self.learning_rate)
            stepcount=0

        reward_list=[]

        for i in range(episodes):
            state_main=env.reset()[0]
            reward_ep=0
            while True:
                term=False
                state=torch.tensor(state_main,dtype=torch.float32,device=self.device).unsqueeze(0)

                if np.random.random()<self.epsilon and is_train:
                    action=env.action_space.sample()    
                else:
                    with torch.no_grad():
                        action=policy_net(state).argmax().item()

                new_state,reward_move,term,trunc,_=env.step(action)
                reward_ep+=reward_move
                done=term or trunc
                state_main=new_state

                if is_train:
                    action_tens=torch.tensor([action],dtype=torch.int64,device=self.device)
                    new_state_tens=torch.tensor(new_state,dtype=torch.float32,device=self.device).unsqueeze(0)
                    reward_tens=torch.tensor(reward_move,dtype=torch.float32,device=self.device)
                    term_tens=torch.tensor(term,dtype=torch.float32,device=self.device)
                    memory.append((state,action_tens,new_state_tens,reward_tens,term_tens))
                    stepcount+=1


                if is_train and len(memory)>=self.batch_size:
                    batch=memory.sample(self.batch_size)
                    self.optimize(batch,policy_net,Target_net,self.DDQN)
                    if stepcount%self.target_update_freq==0:
                        Target_net.load_state_dict(policy_net.state_dict())
                        stepcount=0

                if done :
                    break

            if self.epsilon>self.epsilon_end and is_train:
                self.epsilon*=self.epsilon_dec
            else:
                self.epsilon=self.epsilon_end
            reward_list.append(reward_ep)
            
            print(f"Episode: {i+1}, Reward: {reward_ep}, Epsilon: {self.epsilon}")
        return reward_list
    
    def optimize(self,batch,policy_net,Target_net,DDQN=True):

        states,actions,new_state,reward,term=zip(*batch)

        states=torch.cat(states)
        actions=torch.stack(actions).view(-1,1).long()
        new_state=torch.cat(new_state)
        reward=torch.stack(reward).unsqueeze(1)
        term=torch.tensor(term,dtype=torch.float32).unsqueeze(1)

        curr_q=policy_net(states).gather(1,actions)
        
        if DDQN:
            with torch.no_grad():
                best_nxt_act=policy_net(new_state).argmax(1).unsqueeze(1)
                tar_q_val=Target_net(new_state).gather(1,best_nxt_act).squeeze(1)
                tar_q=reward+(1-term)*self.discount_factor*tar_q_val
        else:
            with torch.no_grad():
                tar_q_val=Target_net(new_state).max(1)[0]
                tar_q=reward+(1-term)*self.discount_factor*tar_q_val.unsqueeze(1)

        loss=self.lossfn(curr_q,tar_q)

        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 1.0)
        self.optimizer.step()


In [23]:
my_agent=Agent(is_train=True) 

train_rewards=my_agent.run(is_train=True,render=False,episodes=500,DDQN=False)

Episode: 1, Reward: -357.94041736143106, Epsilon: 0.992
Episode: 2, Reward: -189.69902276709928, Epsilon: 0.9840639999999999
Episode: 3, Reward: -201.48500362980647, Epsilon: 0.9761914879999999
Episode: 4, Reward: -66.1271266161129, Epsilon: 0.9683819560959999
Episode: 5, Reward: -202.43978479832901, Epsilon: 0.9606349004472319
Episode: 6, Reward: -240.04715886525332, Epsilon: 0.952949821243654
Episode: 7, Reward: -135.5073724811097, Epsilon: 0.9453262226737048
Episode: 8, Reward: -116.87531463991535, Epsilon: 0.9377636128923151
Episode: 9, Reward: -86.15990272265552, Epsilon: 0.9302615039891766
Episode: 10, Reward: -88.34747320042256, Epsilon: 0.9228194119572632
Episode: 11, Reward: -130.2701038070507, Epsilon: 0.9154368566616051
Episode: 12, Reward: -72.46089854024613, Epsilon: 0.9081133618083123
Episode: 13, Reward: -173.49400826912412, Epsilon: 0.9008484549138458
Episode: 14, Reward: -165.79386429498663, Epsilon: 0.893641667274535
Episode: 15, Reward: -276.67923515253017, Epsilon: 

In [24]:
test_rewards=my_agent.run(is_train=False,render=True,episodes=10)

Episode: 1, Reward: -36.28427794377227, Epsilon: 0.01
Episode: 2, Reward: -39.310861964776635, Epsilon: 0.01
Episode: 3, Reward: -202.94240142334243, Epsilon: 0.01
Episode: 4, Reward: -33.028158457987416, Epsilon: 0.01


KeyboardInterrupt: 

In [None]:
torch.save(my_agent.policy_net.state_dict(),"lunarlander_dqn.pt")
