In [266]:
# Length of the one-hot action vector and size of the Q-network output layer.
nActions = 7
# Number of state features fed to the Q-network (pSet, aAct, aTarget).
nInput = 3
# Offset (in area units) of the two markers drawn around the target in Game.draw.
rewardRange = 50
# 1 -> Agent loads weights from ./model/model.pth when it is constructed.
loadModel = 1
# 1 -> area computed without noise, 2 -> Gaussian noise term added (see Game.getArea).
difficulty = 1

In [267]:
import random
from enum import Enum
from collections import namedtuple
import numpy as np
import pygame
import time

# RGB colour tuples for the game window.
# NOTE(review): WHITE and GREEN are not referenced by the active code in the cells shown here.
WHITE = (255, 255, 255)
GRAY = (200,200,200)
RED = (200,0,0)
BLUE1 = (0, 100, 255)
BLACK = (0,0,0)
GREEN = (0,200,0)


class Game:
    """Pygame-rendered 1-D control task: move ``pSet`` so that ``aAct`` tracks ``aTarget``.

    The area ``aAct`` is a linear function of the set-point ``pSet``
    (``(m * pSet * k + c + delta) * 1000``).  On every step the caller picks
    one of the increments in ``self.actions`` through a one-hot vector and
    receives the negative absolute distance between ``aAct`` and ``aTarget``
    as reward.

    Reads the module-level settings ``difficulty`` (noise off/on) and
    ``rewardRange`` (marker positions in the visualisation only).
    """

    def __init__(self):
        """Open the display window, set the model constants and start an episode."""
        self.w = 550
        self.h = 50
        self.display = pygame.display.set_mode((self.w, self.h))
        pygame.display.set_caption('Area')
        self.clock = pygame.time.Clock()
        self.inMaColor = RED
        # Set-point increments; the position of the 1 in the action vector selects one.
        self.actions = [-10,-5,-2,0,2,5,10]
        # An episode is reported as done once frame_iteration exceeds this value.
        self.tracksNumber = 300

        # Slope and intercept of the linear pSet -> area model.
        self.m = 0.003162
        self.c = -0.112

        self.reset()

    def getArea(self):
        """Return the area for the current ``pSet`` as a scalar.

        ``difficulty == 1`` uses no noise term; ``difficulty == 2`` adds one
        standard-normal sample divided by 100 before the final scaling.

        Raises:
            ValueError: if ``difficulty`` is neither 1 nor 2.
        """
        if difficulty == 1:
            delta = 0
        elif difficulty == 2:
            # [0] unwraps the 1-element sample so the area (and everything
            # derived from it: state, reward, rectangles) stays a scalar.
            delta = np.random.normal(0, 1, 1)[0] / 100
        else:
            raise ValueError(
                "difficulty must be 1 (no delta) or 2 (with delta), got %r" % (difficulty,)
            )
        self.k = 1
        return (self.m * self.pSet * self.k + self.c + delta) * 1000

    def reset(self):
        """Start a new episode: reset the set-point, draw a new target and redraw."""
        self.pSet = 350
        self.score = 0
        self.frame_iteration = 0
        self.aTarget = (1000 + np.random.normal(0,1,1)*200)[0]
        self.aAct = self.getArea()
        self.startingPoint = self.aAct
        self.draw()

    def _decode_action(self, action):
        """Return the index of the first entry equal to 1 in the one-hot ``action``."""
        for idx, flag in enumerate(action):
            if flag == 1:
                if idx >= len(self.actions):
                    break
                return idx
        raise ValueError(
            "action must be a one-hot vector selecting one of %d increments, got %r"
            % (len(self.actions), action)
        )

    def play_step(self, action):
        """Apply one action and return ``(reward, done, score)``.

        Args:
            action: one-hot sequence; the index of its first 1 selects the
                increment from ``self.actions`` that is added to ``pSet``.

        Returns:
            ``reward`` is ``-abs(aAct - aTarget)`` after the move, ``done`` is
            True once ``frame_iteration`` exceeds ``tracksNumber`` and
            ``score`` equals ``reward`` for this step.

        Raises:
            ValueError: if ``action`` does not select a valid increment.
        """
        # Validate before touching any state so a bad action leaves the game unchanged.
        idx = self._decode_action(action)

        self.frame_iteration += 1
        self.inMaColor = RED
        self.pSet += self.actions[idx]
        self.aAct = self.getArea()

        reward = -1 * abs(self.aAct - self.aTarget)
        self.score = reward
        # POSSIBLY TODO: change the aTarget once we hit the spot
        donema = self.frame_iteration > self.tracksNumber

        self.draw()
        return reward, donema, self.score

    def draw(self):
        """Redraw the target bar, the actual bar and the markers, then flip the display."""
        # Nothing else in this class services the event queue; pumping it here
        # lets pygame handle window-system messages between frames.
        pygame.event.pump()
        self.display.fill(GRAY)
        # All horizontal positions are area values scaled by 1/3.
        pygame.draw.rect(self.display, BLUE1, pygame.Rect(0, 10, 1/3*self.aTarget, 3))
        pygame.draw.rect(self.display, BLACK, pygame.Rect(0, 15, 1/3*self.aAct, 3))
        pygame.draw.rect(self.display, self.inMaColor, pygame.Rect(1/3*(self.aTarget-rewardRange),5,3,3))
        pygame.draw.rect(self.display, self.inMaColor, pygame.Rect(1/3*(self.aTarget+rewardRange),5,3,3))
        pygame.draw.rect(self.display, BLUE1, pygame.Rect(1/3*self.startingPoint, 15, 3, 3))
        pygame.display.update()


In [268]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import os

class Linear_QNet(nn.Module):
    """Fully connected Q-network: nInput -> hidden -> hidden -> nActions with ReLU in between."""

    def __init__(self, hidden_size):
        """Create the three linear layers; ``hidden_size`` is the width of both hidden layers."""
        super().__init__()
        self.linear1 = nn.Linear(nInput, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, nActions)

    def forward(self, x):
        """Return the raw (un-activated) output of the last layer for input ``x``."""
        first_hidden = torch.relu(self.linear1(x))
        second_hidden = torch.relu(self.linear2(first_hidden))
        return self.linear3(second_hidden)

    def save(self, file_name='model.pth'):
        """Write the state dict to ``./model/<file_name>``, creating the folder if missing."""
        target_dir = './model'
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

        torch.save(self.state_dict(), os.path.join(target_dir, file_name))


In [269]:
        
class QTrainer:
    """One-step Q-learning trainer: Adam optimiser with an MSE loss on the taken action's Q-value."""

    def __init__(self, model, lr, gamma):
        """Store the hyper-parameters and build the optimiser for ``model``.

        Args:
            model: network mapping a state batch to one Q-value per action.
            lr: learning rate handed to Adam.
            gamma: discount factor applied to the next state's best Q-value.
        """
        self.lr = lr
        self.gamma = gamma
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr = self.lr)
        self.criterion = nn.MSELoss()

    def train_step(self, state, action, reward, next_state, done):
        """Run one optimisation step on a single transition or a batch of them.

        Args:
            state, next_state: one state vector or a sequence of them.
            action: one-hot action vector(s); the arg-max marks the action taken.
            reward: scalar reward or a sequence of rewards.
            done: bool or a sequence of bools; True disables bootstrapping.

        Returns:
            The MSE loss of this step as a Python float.
        """
        # Going through one ndarray avoids the slow element-wise conversion of
        # a tuple of arrays that the replay batch would otherwise trigger.
        state = torch.as_tensor(np.asarray(state), dtype=torch.float)
        next_state = torch.as_tensor(np.asarray(next_state), dtype=torch.float)
        action = torch.as_tensor(np.asarray(action), dtype=torch.long)
        reward = torch.as_tensor(np.asarray(reward), dtype=torch.float)

        if state.dim() == 1:
            # Single transition: promote everything to a batch of one.
            state = state.unsqueeze(0)
            next_state = next_state.unsqueeze(0)
            action = action.unsqueeze(0)
            done = (done, )

        # One reward / done flag per row, regardless of how they were passed in.
        reward = reward.reshape(-1)
        done_mask = torch.as_tensor(np.asarray(done, dtype=bool)).reshape(-1)

        pred = self.model(state)

        # The target is a constant for this update: building it without
        # autograd keeps the loss from back-propagating through the
        # next-state Q-values.
        with torch.no_grad():
            best_next_q = self.model(next_state).max(dim=1).values
            q_new = torch.where(done_mask, reward, reward + self.gamma * best_next_q)
            target = pred.clone()
            rows = torch.arange(target.shape[0])
            # Only the entry of the action actually taken differs from pred.
            target[rows, action.argmax(dim=1)] = q_new

        self.optimizer.zero_grad()
        loss = self.criterion(target, pred)
        loss.backward()

        self.optimizer.step()
        return loss.item()
        
        

In [270]:

from collections import deque

# Capacity of the Agent's replay deque (used as its maxlen).
MAX_MEMORY = 100_000
# Upper bound on the number of transitions used by one train_long_memory call.
BATCH_SIZE = 1000
# Learning rate handed to QTrainer (and from there to Adam).
LR = 0.0001

class Agent:
    """Epsilon-greedy DQN agent with a bounded replay memory."""

    def __init__(self):
        """Build the Q-network (optionally loading ./model/model.pth) and its trainer."""
        self.n_games = 0
        self.epsilon = 0
        self.gamma = 0.9
        self.memory = deque(maxlen=MAX_MEMORY)
        self.model = Linear_QNet(64)
        if loadModel == 1:
            self.model.load_state_dict(torch.load("./model/model.pth"))

        self.trainer = QTrainer(self.model, lr=LR, gamma = self.gamma)

    def get_state(self, game):
        """Return ``[pSet, aAct, aTarget]`` of ``game`` as an integer array (values are truncated)."""
        return np.array([game.pSet, game.aAct, game.aTarget], dtype=int)

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def train_long_memory(self):
        """Train on up to ``BATCH_SIZE`` transitions drawn from the replay memory.

        Does nothing while the memory is empty (unpacking an empty batch
        would otherwise fail).
        """
        if not self.memory:
            return
        if len(self.memory) > BATCH_SIZE:
            mini_sample = random.sample(self.memory, BATCH_SIZE)
        else:
            mini_sample = self.memory
        states, actions, rewards, next_states, dones = zip(*mini_sample)
        self.trainer.train_step(states, actions, rewards, next_states, dones)

    def train_short_memory(self, state, action, reward, next_state, done):
        """Train on the single transition that was just observed."""
        self.trainer.train_step(state, action, reward, next_state, done)

    def get_action(self, state):
        """Return a one-hot action of length ``nActions`` for ``state``.

        ``epsilon`` is ``150 - n_games``; a random action is taken whenever a
        uniform integer in [0, 200] falls below it, otherwise the action with
        the highest predicted Q-value is chosen.
        """
        self.epsilon = 150 - self.n_games
        final_move = [0] * nActions
        if random.randint(0,200) < self.epsilon:
            move = random.randint(0,nActions-1)
        else:
            state0 = torch.tensor(state, dtype = torch.float)
            # Pure inference: no autograd graph is needed for picking the arg-max.
            with torch.no_grad():
                prediction = self.model(state0)
            move = torch.argmax(prediction).item()
        final_move[move] = 1
        return final_move
def train(max_games=1000, initial_record=-500):
    """Train a fresh ``Agent`` on a fresh ``Game`` and save the model.

    Every step is trained on immediately and stored in the replay memory; at
    the end of each game a replay batch is trained on as well.  The model is
    saved whenever the end-of-game score beats the record and once more when
    training stops.

    Args:
        max_games: number of games after which training stops.
        initial_record: score a game has to beat to count as the first record.
    """
    record = initial_record
    agent = Agent()
    game = Game()
    while True:
        state_old = agent.get_state(game)
        final_move = agent.get_action(state_old)

        reward, done, score = game.play_step(final_move)
        state_new = agent.get_state(game)

        agent.train_short_memory(state_old, final_move, reward, state_new, done)
        agent.remember(state_old, final_move, reward, state_new, done)

        if not done:
            continue

        game.reset()
        agent.n_games += 1
        agent.train_long_memory()

        new_record = score > record
        if new_record:
            record = score
            agent.model.save()

        # One status line per record and per 10th game (not two when both coincide).
        if new_record or agent.n_games % 10 == 0:
            print('Game', agent.n_games, 'Score', score, 'Record:', record)

        if agent.n_games >= max_games:
            print("Done")
            agent.model.save()
            return

def test(max_games=None):
    """Run the agent's current model greedily and print the score every 10 games.

    Actions are taken as the arg-max of the model output instead of going
    through ``Agent.get_action``: that method explores randomly depending on
    ``n_games``, which would mix random moves into the evaluation.

    Args:
        max_games: stop after this many games; ``None`` (the default) keeps
            running until interrupted.
    """
    record = -10
    agent = Agent()
    game = Game()
    while max_games is None or agent.n_games < max_games:
        state = agent.get_state(game)
        with torch.no_grad():
            q_values = agent.model(torch.tensor(state, dtype=torch.float))
        final_move = [0] * nActions
        final_move[int(torch.argmax(q_values))] = 1

        _, done, score = game.play_step(final_move)
        # Short pause so the rendered bars can be followed on screen.
        time.sleep(0.001)
        if not done:
            continue

        game.reset()
        agent.n_games += 1
        if score > record:
            record = score

        if agent.n_games % 10 == 0:
            print('Game', agent.n_games, 'Score', score, 'Record:', record)


In [271]:
# Start the evaluation loop. Called without arguments it does not return on
# its own; the run recorded below was stopped with a KeyboardInterrupt.
test()

Game 10 Score -5.532559123446845 Record: -2.0163802666877473
Game 20 Score -12.466503467028815 Record: -0.05432438676325546
Game 30 Score -16.995044551872752 Record: -0.020248993961331507
Game 40 Score -1.5271699408428958 Record: -0.020248993961331507
Game 50 Score -31.81349562916307 Record: -0.020248993961331507
Game 60 Score -1.7812811090918785 Record: -0.020248993961331507


KeyboardInterrupt: 

In [None]:
# difficulty setting: 1 = no noise term (delta = 0), 2 = Gaussian noise term delta added to the area