In [210]:
from collections import deque
import gym
import numpy as np
import os
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from collections import deque
import traceback

### Models

In [213]:
class Neuralnetwork(nn.Module):
    def __init__(self, input_size, output_size, hidden_size = 128):
        super().__init__()
        self.sequetial_stack = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size)
        )

    def forward(self, x):
        x = self.sequetial_stack(x)
        return x

    def save(self, file_name=f'model{random.randint(0,10000)}.pth'):
        model_folder_path = './model'
        
        if not os.path.exists(model_folder_path):
            os.makedirs(model_folder_path)

        file_path = os.path.join(model_folder_path, file_name)

        torch.save(self.state_dict(), file_path)

class QTrainer:
    def __init__(self, model, lr, gamma):
        self.lr = lr
        self.gamma = gamma
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr=self.lr)
        self.criterion = nn.MSELoss()

    def train_step(self, state, action, reward, next_state, done):
        state = torch.tensor(state, dtype=torch.float)
        next_state = torch.tensor(next_state, dtype=torch.float)
        action = torch.tensor(action, dtype=torch.long)
        reward = torch.tensor(reward, dtype=torch.float)
        # (n, x)

        if len(state.shape) == 1:
            # (1, x)
            state = torch.unsqueeze(state, 0)
            next_state = torch.unsqueeze(next_state, 0)
            action = torch.unsqueeze(action, 0)
            reward = torch.unsqueeze(reward, 0)
            done = (done, )

        # 1: predicted Q values with current state
        pred = self.model(state)

        target = pred.clone()
        for idx in range(len(done)):
            q_new = reward[idx]
            if not done[idx]:
                q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))

            target[idx][torch.argmax(action[idx]).item()] = q_new
    
        # 2: Q_new = r + y * max(next_predicted Q value) -> only do this if not done
        # pred.clone()
        # preds[argmax(action)] = Q_new
        self.optimizer.zero_grad()
        loss = self.criterion(target, pred)
        loss.backward()

        self.optimizer.step()


### Plot functions

In [214]:
import matplotlib.pyplot as plt
from IPython import display

plt.ion()

def plot(scores, mean_scores):
    display.clear_output(wait=True)
    display.display(plt.gcf())
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Number of Games')
    plt.ylabel('Score')
    plt.plot(scores)
    plt.plot(mean_scores)
    plt.ylim(ymin=0)
    plt.text(len(scores)-1, scores[-1], str(scores[-1]))
    plt.text(len(mean_scores)-1, mean_scores[-1], str(mean_scores[-1]))
    plt.show(block=False)
    plt.pause(.1)

### Agent

In [229]:
MAX_MEMORY = 100_000
BATCH_SIZE = 1000
LR = 0.001

class Agent:
  
  def __init__(self, input_size, hidden_size, output_size) -> None:
    self.n_games = 0
    self.epsilon = 0 # randomness
    self.gamma = 0.9 # discount rate
    self.memory = deque(maxlen=MAX_MEMORY)
    self.model = Neuralnetwork(input_size,output_size ,hidden_size)
    self.trainer = QTrainer(self.model, lr=LR, gamma=self.gamma)

  def get_state(self, state):
    # is not need right now for our game but it is good to have it
    pass

  def get_action(self, state) -> list[int]:
    # self.epsilon = 80 - self.n_games
    self.epsilon = 90 - self.n_games*(80/200)
    self.epsilon = 10 if self.epsilon < 10 else self.epsilon
    final_move = None

    if (random.randint(0, 100) < self.epsilon):
        final_move = random.randint(0, 1)
    else:
        state0 = torch.tensor(state, dtype=torch.float)
        prediction = self.model(state0)
        final_move = torch.argmax(prediction).item()
        print(prediction)
        # print('final_model', final_move)
        
    return final_move
    
  def remember(self, state, action, reward, next_state, done) -> None:
    self.memory.append((state, action, reward, next_state, done))

  def train_short_memory(self, state, action, reward, next_state, done) -> None:
    self.trainer.train_step(state, action, reward, next_state, done)

  
  def train_long_memory(self) -> None:
      if len(self.memory) > BATCH_SIZE:
          mini_sample = random.sample(self.memory, BATCH_SIZE) # list of tuples
      else:
          mini_sample = self.memory

      states, actions, rewards, next_states, dones = zip(*mini_sample)
      self.trainer.train_step(states, actions, rewards, next_states, dones)

### main function

In [231]:
def start() -> None:
  env = gym.make('CartPole-v1', render_mode='human')
  # 0 Cart position -4.8 to 4.8
  # 1 Cart velocity -inf to inf
  # 2 Pole angle ~ -0.418 rad (-24°) to 0.418 rad (24°)
  # 3 Pole velocity at tip -inf to inf
  n_actions = env.action_space.n # output
  state, info = env.reset()
  n_observations = len(state) ## input
  total_score = 0
  record = 0
  
  agent = Agent(n_observations, 128, n_actions)

  while True:
    env.render()
    display.clear_output(wait=True)
    print(f'Game number: {agent.n_games}')
    state_old = state
    final_move = agent.get_action(state_old) # [0, 1]

    curr_state, reward, terminated, truncated, _ = env.step(final_move)
    print('Reward: ', reward)
    state = curr_state

    agent.train_short_memory(state_old, final_move, reward, state, terminated)

    agent.remember(state_old, final_move, reward, state, terminated)

    if terminated:
      env.reset()
      agent.n_games += 1
      agent.train_long_memory()

      if total_score > record:
        record = total_score
        agent.model.save()
    
if __name__ == '__main__':
  try:       
    start()
  except:
    traceback.print_exc()
    env.close()
        

Game number: 26
else 79.6
tensor([2.5756, 0.0550], grad_fn=<ViewBackward0>)
Reward:  1.0


Traceback (most recent call last):
  File "/tmp/ipykernel_588532/3192077506.py", line 41, in <module>
    start()
  File "/tmp/ipykernel_588532/3192077506.py", line 33, in start
    agent.train_long_memory()
  File "/tmp/ipykernel_588532/2785920552.py", line 51, in train_long_memory
    self.trainer.train_step(states, actions, rewards, next_states, dones)
  File "/tmp/ipykernel_588532/2898612456.py", line 65, in train_step
    loss.backward()
  File "/home/adria/.local/lib/python3.10/site-packages/torch/_tensor.py", line 522, in backward
    torch.autograd.backward(
  File "/home/adria/.local/lib/python3.10/site-packages/torch/autograd/__init__.py", line 266, in backward
    Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass
KeyboardInterrupt


In [None]:
env.close()