In [None]:
import gym
import random
import torch
from torch import nn
from torch.optim import Adam
import numpy as np
from collections import deque
from tqdm import tqdm

# One seed shared by every RNG this notebook draws from, so a
# Restart & Run All repeats the same exploration and initial weights.
# NOTE(review): the environment's own start-state RNG is not seeded here;
# pass seed= to env.reset() if fully repeatable rollouts are required.
SEED = 10

env = gym.make('MountainCar-v0',render_mode='human')
np.random.seed(SEED)      # np.random.rand() drives the epsilon-greedy coin flip
random.seed(SEED)         # random.randrange / random.sample: random actions, replay minibatches
torch.manual_seed(SEED)   # torch RNG: initial network weights


In [None]:
class Agent(nn.Module):
    """Small fully connected Q-network.

    Maps a state vector of size ``state_space`` to one value per action:
    Linear(state_space, 64) -> ReLU -> Linear(64, 32) -> ReLU -> Linear(32, action_space).
    """

    def __init__(self,state_space,action_space):
        super().__init__()
        wide, narrow = 64, 32
        # Kept as a single nn.Sequential named `layers` so parameter names
        # (layers.0.*, layers.2.*, layers.4.*) match existing checkpoints.
        stack = [
            nn.Linear(state_space, wide),
            nn.ReLU(),
            nn.Linear(wide, narrow),
            nn.ReLU(),
            nn.Linear(narrow, action_space),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self,x):
        """Return the network output for input tensor ``x``."""
        return self.layers(x)
        

In [None]:
class DQN():
    """Deep Q-learning helper: epsilon-greedy acting plus experience replay.

    Wraps a Q-network (``agent``) together with a bounded replay memory, the
    exploration schedule and the optimizer used to fit the network.

    Args:
        state_space: Number of features in one observation.
        action_space: Number of discrete actions.
        agent: ``nn.Module`` mapping a ``(batch, state_space)`` float tensor to
            ``(batch, action_space)`` Q-values.

    Note:
        The optimizer is created once, from ``self.lr`` at construction time,
        so Adam's running moment estimates persist across ``replay`` calls.
        Changing ``self.lr`` afterwards does not affect ``self.optimizer``.
    """

    def __init__(self,state_space, action_space,agent):
        self.state_space = state_space
        self.action_space = action_space
        self.gamma = 0.95          # discount factor
        self.batch_size = 64
        self.lr = 0.001
        self.eps = 1.0             # current exploration probability
        self.eps_decay = 0.995     # multiplicative decay applied after each replay update
        self.eps_min = 0.001       # decay stops once eps is at or below this value
        self.memory = deque(maxlen=100000)
        self.agent = agent
        self.loss_fn = nn.MSELoss()
        self.optimizer = Adam(params=self.agent.parameters(), lr=self.lr)

    def remember(self,state,action,reward,next_state,done):
        """Append one transition to the replay memory."""
        self.memory.append((state,action,reward,next_state,done))

    def act(self,state):
        """Pick an action epsilon-greedily.

        With probability ``self.eps`` a uniformly random action is returned;
        otherwise the action with the highest predicted Q-value. ``state`` may
        be a single observation of shape ``(state_space,)`` or a batch such as
        ``(1, state_space)``; for a batch, the first row decides.

        Returns:
            int: index of the chosen action.
        """
        if np.random.rand() <= self.eps:    ### pick random action
            return random.randrange(self.action_space)
        with torch.no_grad():
            q_values = self.agent(torch.as_tensor(state, dtype=torch.float32))
        if q_values.dim() > 1:
            q_values = q_values[0]
        return int(torch.argmax(q_values).item())

    def train_step(self,states,targets_full,loss_fn=None,optimizer=None):
        """Run one gradient step fitting the network output on ``states`` to ``targets_full``.

        Args:
            states: Array or tensor of shape ``(batch, state_space)``.
            targets_full: Array or tensor of target Q-values, ``(batch, action_space)``.
                It is treated as a constant: no gradient flows through it.
            loss_fn: Loss callable; defaults to ``self.loss_fn``.
            optimizer: Optimizer to step; defaults to ``self.optimizer``.

        Returns:
            float: the loss value before the parameter update.
        """
        loss_fn = self.loss_fn if loss_fn is None else loss_fn
        optimizer = self.optimizer if optimizer is None else optimizer
        inputs = torch.as_tensor(states, dtype=torch.float32)
        targets = torch.as_tensor(targets_full, dtype=torch.float32).detach()

        self.agent.train()
        optimizer.zero_grad()
        logits = self.agent(inputs)
        loss = loss_fn(logits, targets)
        loss.backward()
        optimizer.step()
        return loss.item()

    def replay(self):
        """Sample a minibatch from memory and perform one Q-learning update.

        Does nothing until the memory holds at least ``batch_size`` transitions.
        After the update, ``eps`` is decayed while it is above ``eps_min``.
        """
        if len(self.memory) < self.batch_size:
            return
        minibatch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Flatten each stored observation to one row; unlike np.squeeze this
        # keeps the batch axis even when batch_size is 1.
        states = torch.as_tensor(
            np.asarray(states, dtype=np.float32).reshape(self.batch_size, -1))
        next_states = torch.as_tensor(
            np.asarray(next_states, dtype=np.float32).reshape(self.batch_size, -1))
        actions = torch.as_tensor(np.asarray(actions), dtype=torch.long)
        rewards = torch.as_tensor(np.asarray(rewards, dtype=np.float32))
        dones = torch.as_tensor(np.asarray(dones, dtype=np.float32))

        # Targets are constants for the regression, so build them without autograd.
        with torch.no_grad():
            ### expected rewards for future actions: r + gamma * max_a' Q(s', a'),
            ### with no bootstrapping from terminal transitions
            best_next_q = self.agent(next_states).max(dim=1).values
            targets = rewards + self.gamma * best_next_q * (1.0 - dones)

            ### all possible actions and their rewards
            targets_full = self.agent(states).clone()

            ### update only the taken actions' values
            rows = torch.arange(self.batch_size)
            targets_full[rows, actions] = targets

        ### train to optimize expected rewards
        self.train_step(states, targets_full, self.loss_fn, self.optimizer)
        if self.eps > self.eps_min:
            self.eps *= self.eps_decay
        
        


In [None]:
def get_reward(state):  ### custom reward strategy
    """Compute a shaped reward from an observation.

    Only ``state[0]`` is read:
      * ``state[0] >= 0.5``                        -> 10
      * ``state[0] > -0.4`` or ``state[0] < -0.8`` -> ``-(1 + state[0]) ** 2``
      * otherwise                                  -> -1
    """
    position = state[0]
    if position >= 0.5:
        return 10
    outside_band = position > -0.4 or position < -0.8
    return -((1 + position) ** 2) if outside_band else -1

def trainer(episodes,patience=3,resume_training=False,path=None):
    """Train a DQN agent on the module-level ``env`` and save its weights.

    Args:
        episodes: Number of episodes to run.
        patience: Not used by the current implementation; accepted so existing
            calls keep working.
        resume_training: When True, initialise the network from ``path``.
        path: File holding a ``state_dict`` to load; only read when
            ``resume_training`` is True.

    Returns:
        tuple: ``(loss, successes)`` where ``loss`` is the list of per-episode
        total rewards (one entry per episode) and ``successes`` maps the index
        of every episode the environment reported as terminated to its score.

    Raises:
        ValueError: if ``resume_training`` is True but ``path`` is None.

    Side effects:
        Writes the final weights to ``checkpoint_{episodes}.pt``.
    """
    if resume_training and path is None:
        raise ValueError("resume_training=True requires `path` to a saved state_dict")

    successes = {}
    loss = []
    state_space = env.observation_space.shape[0]
    action_space = env.action_space.n
    agent = Agent(state_space, action_space)
    if resume_training:
        agent.load_state_dict(torch.load(path))
        agent.eval()
    dqn = DQN(state_space,action_space,agent)

    max_steps = 1000   # hard cap per episode, in case the env never ends it
    for e in range(episodes):
        state = env.reset()[0]
        state = np.reshape(state,(1,state_space))
        score = 0
        for _ in range(max_steps):
            action = dqn.act(state)
            env.render()
            next_state, reward, terminated, truncated, _ = env.step(action)
            #reward = get_reward(next_state)    ### uncomment to use custom reward strategy
            score += reward
            next_state = np.reshape(next_state,(1,state_space))
            # Store only `terminated` as the done flag: a truncated episode
            # (e.g. a time limit) did not reach a terminal state, so the
            # learner should still bootstrap from next_state.
            dqn.remember(state,action,reward,next_state,terminated)
            state = next_state
            dqn.replay()
            if terminated:
                successes[e] = score
                print(f'Episode {e}/{episodes}, score:{score}')
            if terminated or truncated:
                # The env must be reset after either flag; stepping further
                # is outside the Gym API contract.
                break
        loss.append(score)
    torch.save(agent.state_dict(),f'checkpoint_{episodes}.pt')
    return loss, successes

def random_policy(episode, step):
    """Run the module-level ``env`` with uniformly random actions.

    Args:
        episode: Number of episodes to run, or an iterable yielding one item
            per episode (e.g. ``range(5)``).
        step: Maximum number of steps per episode.

    Prints the number of steps taken whenever an episode ends (terminated or
    truncated) before ``step`` steps have elapsed.
    """
    # Accept a plain count as well as an iterable of episodes.
    episodes = range(episode) if isinstance(episode, (int, np.integer)) else episode
    for _ in episodes:
        env.reset()
        for t in range(step):
            env.render()
            action = env.action_space.sample()
            state, reward, terminated, truncated, info = env.step(action)
            if terminated or truncated:
                # t is zero-based, so t + 1 steps have been taken.
                print(f"Episode finished after {t + 1} timesteps")
                break
        
    
    

In [None]:

# Start a fresh training run. `path` is only read when resume_training=True.
# `loss` receives the per-episode scores; `succ` maps episode index -> score
# for the episodes recorded as successes by trainer().
episodes = 1000
loss, succ = trainer(
    episodes,
    resume_training=False,
    path='checkpoint_1000.pt',
)
