In [None]:
import copy
import numpy as np
import random
from matplotlib import pyplot as plt

import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
## FIXME :: Takes very long to execute. Or should buy graphics card eventually.

# Run-size knobs. NOTE(review): TRIALS and MAXSTEPS are not referenced by the
# visible cells below (training is started with an explicit epochs=...) - confirm
# whether they are still needed.
TRIALS = 500
MAXSTEPS = 1000

DIM = 4           # Side length of the square grid

ALPHA = 0.1       # Learning rate handed to the optimizer
EPSILON = 0.01    # Probability of taking a random action instead of the greedy one
GAMMA = 0.9       # Discount factor applied to the best Q-value of the next state

# Seed every random number generator the notebook draws from.
# `T.seed = 1337` would only overwrite torch's `seed` function with an int and
# leave PyTorch unseeded; `T.manual_seed` is the call that actually seeds it.
SEED = 1337
T.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

In [None]:
class GridWorld():
    """Square grid with one treasure ('+'), one pitfall ('-') and one player ('P').

    Modes:
        'static'         - treasure, pitfall and player start on fixed cells.
        'player-dynamic' - only the player's start cell is drawn at random.
        'dynamic'        - all three cells are drawn at random.

    The observation returned by `reset` and `step` is a flat vector of length
    3 * d * d: three one-hot planes (treasure, pitfall, player) of the grid.
    """

    def __init__(self, d=4, mode='static'):
        """Create the environment and put it into its initial state.

        Args:
            d: side length of the grid; must be at least 4 because the fixed
               start cells use row/column index 3.
            mode: one of 'static', 'player-dynamic', 'dynamic'.

        Raises:
            ValueError: if `d` is smaller than 4 or `mode` is unknown.
        """
        if d < 4:
            raise ValueError("Dimension should be at least 4.")
        if mode not in ['static', 'player-dynamic', 'dynamic']:
            raise ValueError("The mode should be either static, player-dynamic or dynamic.")

        self.mode = mode
        self.d = d
        self.maxMoves = 25

        # Each action maps to a (row delta, column delta) pair.
        self.actionSpace = {'up': (-1, 0), 'right': (0, 1), 'down': (1, 0), 'left': (0, -1)}
        self.cellTypes = [' ', '+', '-', 'P']
        self.cellTypesMap = {' ': 0, '+': 1, '-': 2, 'P': 3}
        # Reward for stepping onto a cell of the given type.
        self.rewardMap = {' ': -1, '+': 25, '-': -10}

        # Initialize the environment
        self.reset()

    def transformGrid(self):
        """Return the grid as an int8 array of shape (3, d, d).

        Plane 0 marks the treasure, plane 1 the pitfall and plane 2 the player;
        callers flatten the result into a 1-D vector for the network.
        """
        markers = self.cellTypes[1:]  # '+', '-', 'P' - the empty cell gets no plane
        return np.array([(self.grid == marker).astype(np.int8) for marker in markers])

    def reset(self):
        """Reset the move counter and the board; return the flattened observation."""
        self.moveCount = 0
        self.grid = np.full((self.d, self.d), self.cellTypes[0])

        self.treasure = np.array([0, 0])
        self.pitfall = np.array([0, 3])
        self.player = np.array([3, 2])

        if self.mode == 'player-dynamic':
            # Only the player is random in this mode, so on a collision the
            # player is re-drawn; treasure and pitfall keep their fixed cells.
            self.player = np.random.choice(self.d, size=2)
            while (self.player == self.treasure).all() or (self.player == self.pitfall).all():
                self.player = np.random.choice(self.d, size=2)

        if self.mode == 'dynamic':
            self.pitfall = np.random.choice(self.d, size=2)
            self.treasure = np.random.choice(self.d, size=2)
            self.player = np.random.choice(self.d, size=2)

        # Make sure the three objects occupy three different cells.
        while (self.treasure == self.player).all():
            self.treasure = np.random.choice(self.d, size=2)

        while (self.pitfall == self.player).all() or (self.pitfall == self.treasure).all():
            self.pitfall = np.random.choice(self.d, size=2)

        self.grid[self.treasure[0], self.treasure[1]] = self.cellTypes[1]  # Win
        self.grid[self.pitfall[0], self.pitfall[1]] = self.cellTypes[2]    # Loss
        self.grid[self.player[0], self.player[1]] = self.cellTypes[3]      # Player

        return self.transformGrid().flatten()

    def step(self, action):
        """Apply one action and report the outcome.

        Args:
            action: a key of `actionSpace` ('up', 'right', 'down', 'left').

        Returns:
            (state, reward, done, info): the flattened observation, the reward
            for this step, whether the episode is over, and a short info string.
            A move that would leave the grid keeps the player in place and
            yields reward -1. Once `maxMoves` steps have been taken the player
            no longer moves and `done` is True, so every episode terminates.

        Raises:
            KeyError: if `action` is not a key of `actionSpace`.
        """
        self.moveCount += 1
        reward = -1
        info = ''
        done = False

        # Candidate cell after the move (adding to the array creates a new one).
        x, y = self.player + self.actionSpace[action]

        if self.moveCount < self.maxMoves:
            # Ignore moves that would leave the grid
            if 0 <= x < self.d and 0 <= y < self.d:
                target = self.grid[x, y]
                reward = self.rewardMap[target]
                # Reaching the treasure or the pitfall ends the game
                done = target in ('+', '-')

                # Move the player marker to the new cell
                self.grid[self.player[0], self.player[1]] = ' '
                self.grid[x, y] = 'P'
                self.player = np.array([x, y])
        else:
            # Move budget exhausted: end the episode instead of returning
            # done=False forever, which would hang any `while not done` loop.
            done = True
            info = 'max moves reached'

        # Return the next state, reward received, is done flag and any other info
        return self.transformGrid().flatten(), reward, done, info

    def render(self):
        """Print the character grid to stdout."""
        print(self.grid)


In [None]:
class TorchQAgent():
    """Q-learning agent whose Q-function is a small fully connected network.

    The network maps the flattened GridWorld observation (three one-hot planes
    of the grid) to one Q-value per available action.
    """

    def __init__(self, envDimension=4, learningRate=1e-3, gamma=0.9, maxEpsilon = 1, minEpsilon=0.1):
        """Build the network, optimizer and loss.

        Args:
            envDimension: side length of the square grid the agent plays on.
            learningRate: learning rate of the Adam optimizer.
            gamma: discount factor applied to the best next-state Q-value.
            maxEpsilon: exploration probability at the start of training.
            minEpsilon: floor below which epsilon is not decayed.
        """
        self.envDimension = envDimension
        self.learningRate = learningRate
        self.gamma = gamma
        self.maxEpsilon = maxEpsilon
        self.minEpsilon = minEpsilon

        self.availableActions = list(GridWorld(self.envDimension, mode='static').actionSpace.keys())
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')

        # The observation is three planes of envDimension x envDimension cells.
        # (`envDimension * 4 * 3` only matched this for a 4x4 grid.)
        linQLayerIn = self.envDimension * self.envDimension * 3
        linqLayerOut = len(self.availableActions)

        self.model = nn.Sequential(
            nn.Linear(linQLayerIn, 288),
            nn.ReLU(),
            nn.Linear(288, 144),
            nn.ReLU(),
            nn.Linear(144, linqLayerOut)
        ).to(self.device)

        self.optimizer = optim.Adam(self.model.parameters(), lr = self.learningRate)
        self.loss = nn.MSELoss()

    def train(self, mode='static', epochs=50):
        """Train with one-step Q-learning, then plot and save the reward curve.

        Args:
            mode: GridWorld mode used for every training episode.
            epochs: number of episodes to play.

        Side effects:
            Updates the network weights, prints progress every 100 episodes,
            plots the per-episode reward totals and saves the figure as a PNG
            in the current working directory.
        """
        model = self.model
        optimizer = self.optimizer
        lossFn = self.loss

        epsilon = self.maxEpsilon
        rewards = []

        for epoch in range(epochs):
            env = GridWorld(self.envDimension, mode=mode)
            state = T.from_numpy(env.reset()).to(self.device).float()
            episodeRewards = []
            done = False
            steps = 0
            # Cap the episode at the environment's move budget so the loop
            # always terminates, even if the environment never reports done.
            while not done and steps < env.maxMoves:
                steps += 1
                Q = model(state)
                Q_ = Q.detach().cpu().numpy()

                # Epsilon-greedy action selection
                if np.random.random() < epsilon:
                    action = np.random.choice(len(self.availableActions))
                else:
                    action = np.argmax(Q_)

                nextState_, reward, done, _ = env.step(self.availableActions[action])
                nextState = T.from_numpy(nextState_).to(self.device).float()
                episodeRewards.append(reward)

                # TD target: the reward alone on a terminal step, otherwise the
                # reward plus the discounted best Q-value of the next state.
                # The bootstrap value is a constant, so no gradient is tracked.
                if done:
                    target = float(reward)
                else:
                    with T.no_grad():
                        target = reward + self.gamma * model(nextState).max().item()

                # 0-d target so it matches the 0-d prediction (no broadcasting)
                YHat = T.tensor(target, dtype=T.float32, device=self.device)
                Y = Q.squeeze()[action]

                loss = lossFn(Y, YHat)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                state = nextState

            rewards.append(np.sum(episodeRewards))

            # Decay epsilon, but never past the configured floor
            if epsilon > self.minEpsilon:
                epsilon = max(self.minEpsilon, epsilon - 0.01)

            if epoch % 100 == 0:
                print("Processing epoch {}".format(epoch))

        # Label the plot with this agent's own settings and the training mode
        modeLabel = mode.capitalize()
        fig, ax = plt.subplots()
        ax.set_title('TorchQAgent Training - {} Gridworld (A:{}, G:{}, E:{})'.format(
            modeLabel, self.learningRate, self.gamma, self.maxEpsilon))
        ax.set_xlabel("Trials")
        ax.set_ylabel("Scores")
        ax.plot(rewards)

        picname = 'TorchQAgent_{}x{}_{}_Alp{}_Gam{}_Eps{}.png'.format(
            self.envDimension, self.envDimension, modeLabel,
            self.learningRate, self.gamma, self.maxEpsilon)
        fig.savefig(picname)

    def test(self, numGames=10, mode='static', display=False):
        """Play `numGames` greedy games and print a win/loss summary.

        Args:
            numGames: number of games to play.
            mode: GridWorld mode used for every game.
            display: when True, print the grid after every step.

        Each game is limited to 10 moves; a game that ends without reaching the
        treasure or the pitfall counts as neither a win nor a loss.
        """
        numWins = 0
        numLoss = 0
        movesPerGame = 10

        for game in range(numGames):
            testEnv = GridWorld(self.envDimension, mode=mode)
            state = T.from_numpy(testEnv.reset()).to(self.device).float()
            winReward = testEnv.rewardMap['+']
            lossReward = testEnv.rewardMap['-']

            for m in range(1, movesPerGame + 1):
                # Pure evaluation: no gradients needed
                with T.no_grad():
                    Q_ = self.model(state).cpu().numpy()

                action = self.availableActions[np.argmax(Q_)]

                nextState_, reward, done, _ = testEnv.step(action)
                state = T.from_numpy(nextState_).to(self.device).float()

                print('Game {}, Step {}, Action {}, Reward {}'.format(game, m, action, reward))
                if display:
                    testEnv.render()

                if done:
                    if reward == winReward:
                        numWins += 1
                    if reward == lossReward:
                        numLoss += 1
                    break

        print('Total Games={}. Wins={}, Losses={}'.format(numGames, numWins, numLoss))

In [None]:
# Build the agent from the notebook-level hyperparameters: ALPHA becomes the
# optimizer's learning rate, GAMMA the discount factor and EPSILON the starting
# exploration probability.
# NOTE(review): EPSILON (0.01) is below the agent's default minEpsilon (0.1), so
# the epsilon-decay branch in train() never fires - confirm this is intended.
agent = TorchQAgent(learningRate=ALPHA, gamma=GAMMA, maxEpsilon=EPSILON)

In [None]:
# Train for 100 episodes on the default 'static' layout; train() also plots the
# per-episode reward totals and saves the figure as a PNG.
# NOTE(review): the TRIALS constant from the config cell is not used here - confirm
# whether epochs should come from it.
agent.train(epochs=100)

In [None]:
# Test on static mode: 50 games on the fixed layout, always taking the
# highest-valued action; display=True prints the grid after every step.
agent.test(numGames=50, mode='static', display=True)

In [None]:
# Test on player-dynamic mode: same evaluation, but the player's start cell is
# drawn at random on every reset.
agent.test(numGames=50, mode='player-dynamic', display=True)

In [None]:
# Test on dynamic mode: treasure, pitfall and player are all placed at random
# on every reset.
agent.test(numGames=50, mode='dynamic', display=True)