In [1]:
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from torch.distributions import Categorical
import torchvision
import torchvision.transforms as transforms
import numpy as np
from Simulation import Level
import matplotlib.pyplot as plt
import math

# L1 and L2 are simple and complex examples we will use to test policy
# learning
L1 = ["11111",
     "12221",
      "12221",
      "12221",
     "12121",
     "12121",
     "11X11"]

L2 = ["11111",
     "00001",
     "00011",
     "21111",
     "11111",
     "12000",
     "22000",
     "11011",
     "12111",
     "11111",
     "11X11"]

simple_level = Level(L1)
complex_level = Level(L2)

In [2]:
print(simple_level)
print(complex_level)

11111
12221
12221
12221
12121
12121
11X11

11111
00001
00011
21111
11111
12000
22000
11011
12111
11111
11X11



In [3]:
# Hyperparameters
agent_view = 5*5*3
agent_choices = 6
learning_rate = 0.001
gamma = 0.01
hidden_size = 128
dropout_prob = 0
epsilon = 0.1
episodeNumber = 0

class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.state_space = agent_view # Input vector
        self.action_space = agent_choices # Number of choices
        
        # Neural Net architecture
        self.l1 = nn.Linear(self.state_space, hidden_size, bias=True)
        self.l2 = nn.Linear(hidden_size, hidden_size, bias=True)
        self.l3 = nn.Linear(hidden_size, self.action_space, bias=False)
        
        self.gamma = gamma
        
        # Episode policy and reward history
        self.policy_history = Variable(torch.Tensor())
        self.reward_episode = []
        # Overall reward and loss history
        self.reward_history = []
        self.loss_history = []
    
    def forward(self, x):
        model = torch.nn.Sequential(
            self.l1,
            nn.Dropout(p=dropout_prob),
            nn.SELU(),
            self.l2,
            nn.Dropout(p=dropout_prob),
            nn.SELU(),
            self.l3,
            nn.Softmax(dim=-1)
        )
        return model(x)
        

In [4]:
policy = Policy()
optimizer = optim.Adam(policy.parameters(), lr=learning_rate)

In [5]:
# Implement select_action here
def select_action(state):
    state = torch.from_numpy(state).type(torch.FloatTensor)
    choices = policy(Variable(state))
    c = Categorical(choices)
    action = c.sample()
    
    if(random.random() < epsilon):
        tempArray = np.array([0.166,0.166,0.166,0.166,0.166,0.167])
        choices2 = torch.Tensor(tempArray)
        c2 = Categorical(choices2)
        action = c2.sample()
    
    if policy.policy_history.nelement() == 0:
        policy.policy_history = torch.stack([c.log_prob(action)])
    else:
        policy.policy_history = torch.cat([policy.policy_history, torch.stack([c.log_prob(action)])])

    return int(action)

In [6]:
# We apply Monte-Carlo Policy Gradient to improve out policy according
# to the equation
def update_policy():
    
    R = 0
    rewards = []

    # Discount future rewards back to the present using gamma
    for r in policy.reward_episode[::-1]:
        R = r + policy.gamma * R
        rewards.insert(0,R)

    # Scale rewards
    rewards = torch.FloatTensor(rewards)
    rewards = (rewards - rewards.mean()) / (rewards.std() + np.finfo(np.float64).eps)

    # Calculate loss
    loss = (torch.sum(torch.mul(policy.policy_history, Variable(rewards)).mul(-1), -1))

    # Update network weights
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    policy.loss_history.append(loss.data.item())
    
    #Save and intialize episode history counter
    policy.reward_history.append(np.sum(policy.reward_episode))
    policy.policy_history = Variable(torch.Tensor())
    policy.reward_episode = []

In [7]:
def rfunc0(x,steps,done):
    reward = 0
    if done:
        if(x <= 0):
            reward = 100000000000 - (20*steps)
        else:
            reward = 0
    else:
        reward = 3000 - (200*x) - (100*steps)
    #reward -= steps/4
    return reward


def rfunc1(x,steps,done):
    reward = 20 - (x*3)
    if done:
        if(x <= 0):
            reward += 40 - steps
        else:
            reward -= 40 + steps
    #reward -= steps/4
    return reward

def rfunc2(x,steps,done):
    return 1/(x+2)

def rfunc3(x, steps, done):
    return random.random()

def rfunc4(x, steps, done):
    return 5-x

level = complex_level
max_reward = 1

ActDictionary = {-1:"IN",
                 0:"SU",
                 1:"SL",
                 2:"SR",
                 3:"JU",
                 4:"JL",
                 5:"JR"}

def main(episodes):
    global episodeNumber
    episodeNumber = 0
    global epsilon
    truMinX = 200
    storeSteps = None
    for episode in range(episodes):
        minX = 200
        episodeNumber += 1
        epsilon = 2/(math.log(episodeNumber+2,2)+1)
        done = False     
        level.Reset()
        stps = []
        x,steps,done = level.getLivingRewardState()
        prevR = rfunc0(x, steps, done)
        while not done:
            state = np.asarray(level.getVector())
            action = select_action(state)
            x,steps,done = level.Act(action)
            reward = rfunc0(x,steps,done)
            stps += [(ActDictionary[action], reward - prevR)]
            if(x < minX):
                minX = x
            policy.reward_episode.append(reward - prevR)
            prevR = reward
            if x <= 0:
                print("\nReached the end!",end=" ")
                print(episodeNumber)
                print(stps)
            if done:
                break 
        update_policy()
        #print("Episode Done!")
        if(minX < truMinX):
            storeSteps = stps
    print(storeSteps)

In [8]:
main(1000)


Reached the end! 849
[('JU', 300), ('JL', -100), ('SU', 100), ('SU', 100), ('SU', 100), ('SU', 100), ('JR', -100), ('JR', -100), ('SU', 100), ('SU', 100), ('SU', 100), ('SU', 99999998060)]
[('JU', 300), ('SR', -100), ('SU', 100), ('SU', -1300)]
