In [5]:
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

In [6]:
class DeepQNetwork(nn.Module):
    """Convolutional Q-network mapping cropped grayscale frames to action values.

    Every input is reshaped to ``(-1, 1, 185, 95)``, i.e. each 185x95 frame is
    treated as an independent single-channel image. Three conv layers are
    followed by two fully connected layers; the output has one column per
    action.

    The network also owns its optimizer (RMSprop), its loss (MSE) and the
    device it lives on, so callers can use ``net.optimizer``, ``net.loss``
    and ``net.device`` directly.

    Args:
        ALPHA: learning rate passed to the RMSprop optimizer.
        n_actions: number of Q-values produced per frame (defaults to 6,
            the value that used to be hard-coded).
    """

    # Flattened size of the conv3 output for a 185x95 input:
    # 128 channels x 19 rows x 8 columns.
    CONV_FLAT_SIZE = 128 * 19 * 8

    def __init__(self, ALPHA, n_actions=6):
        super(DeepQNetwork, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 8, stride=4, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 4, stride=2)
        self.conv3 = nn.Conv2d(64, 128, 3)
        self.fc1 = nn.Linear(self.CONV_FLAT_SIZE, 512)
        self.fc2 = nn.Linear(512, n_actions)

        self.optimizer = optim.RMSprop(self.parameters(), lr=ALPHA)
        self.loss = nn.MSELoss()
        self.device = T.device('cuda' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, observation):
        """Return Q-values of shape ``(num_frames, n_actions)``.

        Args:
            observation: a single 185x95 frame, a list/array of such frames,
                or a tensor holding them.

        Returns:
            Tensor with one row of action values per input frame.
        """
        if not isinstance(observation, T.Tensor):
            # Going through one contiguous float32 ndarray avoids the slow
            # element-by-element conversion of a Python list of arrays.
            observation = T.as_tensor(np.asarray(observation, dtype=np.float32))
        observation = observation.to(self.device, dtype=T.float32)
        # reshape (rather than view) also copes with non-contiguous input.
        observation = observation.reshape(-1, 1, 185, 95)
        observation = F.relu(self.conv1(observation))
        observation = F.relu(self.conv2(observation))
        observation = F.relu(self.conv3(observation))
        observation = observation.reshape(-1, self.CONV_FLAT_SIZE)
        observation = F.relu(self.fc1(observation))

        actions = self.fc2(observation)

        return actions

In [7]:
class Agent(object):
    """Epsilon-greedy DQN agent with a fixed-size replay memory.

    Args:
        gamma: discount factor applied to the bootstrapped next-state value.
        epsilon: initial probability of taking a random action.
        alpha: learning rate handed to both Q-networks.
        maxMemorySize: capacity of the replay memory (ring buffer).
        epsEnd: lower bound epsilon is decayed towards.
        replace: number of ``learn`` calls between copies of ``Q_eval``'s
            weights into ``Q_next``; ``None`` disables the copy, in which
            case ``Q_next`` keeps its initial weights.
        actionSpace: actions available for random exploration; defaults to
            ``[0, 1, 2, 3, 4, 5]``.
    """

    def __init__(self, gamma, epsilon, alpha, maxMemorySize, epsEnd=0.05, replace=10000, actionSpace=None):
        self.GAMMA = gamma
        self.EPSILON = epsilon
        self.EPS_END = epsEnd
        self.memSize = maxMemorySize
        # A None default avoids sharing one mutable list between agents, and
        # the attribute must exist because ChooseAction samples from it.
        self.actionSpace = list(actionSpace) if actionSpace is not None else [0, 1, 2, 3, 4, 5]

        self.steps = 0
        self.learn_step_counter = 0
        self.memory = []
        self.memCntr = 0
        self.replace_target_cnt = replace
        self.Q_eval = DeepQNetwork(alpha)
        self.Q_next = DeepQNetwork(alpha)

    def StoreTransition(self, state, action, reward, state_):
        """Record one ``[state, action, reward, state_]`` transition.

        The memory grows until it holds ``memSize`` entries; after that the
        oldest slot is overwritten. ``memCntr`` counts every call and is
        therefore allowed to exceed ``memSize``.
        """
        if (self.memCntr < self.memSize):
            self.memory.append([state, action, reward, state_])
        else:
            self.memory[self.memCntr % self.memSize] = [state, action, reward, state_]
        self.memCntr += 1

    def ChooseAction(self, observation):
        """Pick an action epsilon-greedily and count the step.

        Args:
            observation: one frame or a list of frames accepted by
                ``Q_eval.forward``.

        Returns:
            int: with probability ``1 - EPSILON`` the argmax of the Q-values
            computed for the last frame passed in, otherwise a uniformly
            random entry of ``actionSpace``.
        """
        rand = np.random.random()
        if (rand < 1 - self.EPSILON):
            # Only the greedy branch needs the network, and no gradients are
            # needed for action selection.
            with T.no_grad():
                actions = self.Q_eval.forward(observation)
            # Use the last row so a single observation works too (a fixed
            # index of 1 fails when only one frame is supplied).
            action = T.argmax(actions[-1]).item()
        else:
            action = int(np.random.choice(self.actionSpace))

        self.steps += 1

        return action

    def learn(self, batch_size):
        """Run one gradient step of Q-learning on a random minibatch.

        For every sampled transition the target equals the current prediction
        except at the action that was taken, where it is
        ``reward + GAMMA * max_a Q_next(state_, a)``. Does nothing while the
        replay memory is empty.

        Args:
            batch_size: maximum number of transitions to sample; fewer are
                used when the memory holds fewer.
        """
        if not self.memory:
            return

        self.Q_eval.optimizer.zero_grad()
        if self.replace_target_cnt is not None and \
           self.learn_step_counter % self.replace_target_cnt == 0:
            self.Q_next.load_state_dict(self.Q_eval.state_dict())

        # Sample by index from what is actually stored; memCntr keeps growing
        # past the buffer length and must not be used as an upper bound.
        sampleSize = min(batch_size, len(self.memory))
        picks = np.random.choice(len(self.memory), sampleSize, replace=False)
        miniBatch = [self.memory[k] for k in picks]

        # Build each column separately: a single np.array over mixed
        # array/scalar transitions is ragged and rejected by recent NumPy.
        states = np.array([entry[0] for entry in miniBatch], dtype=np.float32)
        nextStates = np.array([entry[3] for entry in miniBatch], dtype=np.float32)

        device = self.Q_eval.device
        actionIdx = T.tensor([int(entry[1]) for entry in miniBatch], dtype=T.long, device=device)
        rewards = T.tensor([float(entry[2]) for entry in miniBatch], dtype=T.float32, device=device)

        Qpred = self.Q_eval.forward(states).to(device)
        with T.no_grad():
            Qnext = self.Q_next.forward(nextStates).to(device)

        # The target must be a separate tensor; aliasing Qpred would make the
        # loss identically zero.
        rows = T.arange(sampleSize, device=device)
        Qtarget = Qpred.detach().clone()
        Qtarget[rows, actionIdx] = rewards + self.GAMMA * Qnext.max(dim=1)[0]

        if self.steps > 500:
            if self.EPSILON - 1e-4 > self.EPS_END:
                self.EPSILON -= 1e-4
            else:
                self.EPSILON = self.EPS_END

        loss = self.Q_eval.loss(Qpred, Qtarget)
        loss.backward()
        self.Q_eval.optimizer.step()
        self.learn_step_counter += 1

    # camelCase spellings used by the training script in this file.
    storeTransition = StoreTransition
    chooseAction = ChooseAction

In [None]:
import gym
# NOTE(review): this rebinds DeepQNetwork/Agent to the versions in model.py,
# shadowing any classes of the same name defined in earlier cells - confirm
# that is intended.
from model import DeepQNetwork, Agent

from utils import plotLearning
import numpy as np


def cropToGray(frame):
    """Crop a frame to rows 15:200, columns 30:125 and average over axis 2."""
    return np.mean(frame[15:200, 30:125], axis=2)


if __name__ == '__main__':
    env = gym.make('SpaceInvaders-v0')
    brain = Agent(gamma=0.95, epsilon=1.0, alpha=0.003, maxMemorySize=5000, replace=None)

    # Phase 1: fill the replay memory by playing random actions.
    while brain.memCntr < brain.memSize:
        observation = env.reset()
        done = False

        while not done:
            # 0 no action, 1 fire, 2 move right, 3 move left, 4 move right fire, 5 move left fire
            action = env.action_space.sample()
            observation_, reward, done, info = env.step(action)
            # Losing the last life is stored with a large negative reward.
            if done and info['ale.lives'] == 0:
                reward = -100

            brain.storeTransition(cropToGray(observation), action, reward, cropToGray(observation_))
            observation = observation_
    print('done initializing memory')

    # Phase 2: play numGames games, learning after every environment step.
    scores = []
    epsHistory = []
    numGames = 50
    batch_size = 32

    for i in range(numGames):
        print('starting game', i+1, 'epsilon : %.4f' %brain.EPSILON)
        # One epsilon value per game so the history lines up with scores.
        epsHistory.append(brain.EPSILON)

        done = False
        observation = env.reset()

        # Frames handed to the agent use the same preprocessing as the
        # transitions stored for learning.
        frames = [cropToGray(observation)]

        score = 0
        lastAction = 0

        while not done:
            # A new action is chosen once three frames are collected; in
            # between, the previous action is repeated.
            if len(frames) == 3:
                action = brain.chooseAction(frames)
                frames = []
            else:
                action = lastAction

            observation_, reward, done, info = env.step(action)

            # The reported score uses the raw game reward, before the
            # end-of-game penalty below is applied.
            score += reward
            nextFrame = cropToGray(observation_)
            # Collect the frame produced by this step, not the previous one.
            frames.append(nextFrame)

            if done and info['ale.lives'] == 0:
                reward = -100
            brain.storeTransition(cropToGray(observation), action, reward, nextFrame)

            observation = observation_
            brain.learn(batch_size)
            lastAction = action
            #env.render()

        # Record one score per finished game.
        scores.append(score)
        print('score: ', score)

    x = [i+1 for i in range(numGames)]
    fileName = 'test' + str(numGames) + '.png'

    plotLearning(x, scores, epsHistory, fileName)
                