In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym
import math
import random
from collections import namedtuple
from itertools import count

In [2]:
env = gym.make('MountainCar-v0')
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Linear') != -1:
        nn.init.normal_(m.weight, 0, 1)

class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.state_space = env.observation_space.shape[0]
        self.action_space = env.action_space.n
        self.hidden = 200
        self.l1 = nn.Linear(self.state_space, self.hidden, bias=False)
        self.l2 = nn.Linear(self.hidden, self.action_space, bias=False)
    
    def forward(self, x):    
        model = torch.nn.Sequential(
            self.l1,
            self.l2,
        )
        return model(x)

In [3]:
Transition = namedtuple('Transition',
                        ('state', 'next_s1', 'next_s2', 'next_s3', 'reward_1', 'reward_2', 'reward_3'))
class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [4]:
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10
possible_actions = [0, 1, 2]

policy_net = Policy()
target_net = Policy()
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)
loss_fn = nn.MSELoss().type(torch.FloatTensor)

In [5]:
steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            Q = policy_net(state)
            _, action = torch.max(Q, -1)
            return torch.tensor(action)
    else:
        return torch.tensor(random.randrange(env.action_space.n))

In [6]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))
    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask_1 = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_s1)), dtype=torch.uint8)
    non_final_next_states_1 = torch.stack([s for s in batch.next_s1
                                                if s is not None], dim = 0)
    non_final_mask_2 = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_s2)), dtype=torch.uint8)
    non_final_next_states_2 = torch.stack([s for s in batch.next_s2
                                                if s is not None], dim = 0)
    non_final_mask_3 = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_s3)), dtype=torch.uint8)
    non_final_next_states_3 = torch.stack([s for s in batch.next_s3
                                                if s is not None], dim = 0)
    
    state_batch = torch.stack(batch.state, dim = 0)
    reward_batch_1 = torch.cat(batch.reward_1)
    reward_batch_2 = torch.cat(batch.reward_2)
    reward_batch_3 = torch.cat(batch.reward_3)
    
    state_batch.requires_grad_(True)
    prediction = policy_net(state_batch)
    
    next_state_values_1 = torch.zeros(BATCH_SIZE)
    next_state_values_1[non_final_mask_1] = target_net(non_final_next_states_1).max(1)[0].detach()
    
    next_state_values_2 = torch.zeros(BATCH_SIZE)
    next_state_values_2[non_final_mask_2] = target_net(non_final_next_states_2).max(1)[0].detach()
    
    next_state_values_3 = torch.zeros(BATCH_SIZE)
    next_state_values_3[non_final_mask_3] = target_net(non_final_next_states_3).max(1)[0].detach()
    
    next_state_values = torch.stack([next_state_values_1, next_state_values_2,
                                     next_state_values_3], dim = 1)
    reward_batch = torch.stack([reward_batch_1, reward_batch_2,
                                     reward_batch_3], dim = 1)
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch
    loss = loss_fn(expected_state_action_values, prediction)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [7]:
num_episodes = 50
for i_episode in range(num_episodes):
    # Initialize the environment and state
    state = torch.from_numpy(env.reset()).type(torch.FloatTensor)
    for t in count():
        
        next_state_1, reward_1, done_1, _ = env.step(0)
        #reward_1 = next_state_1[0] + 0.5
        # Adjust reward for task completion
        #if next_state_1[0] >= 0.5:
        #    reward_1 += 1
        
        reward_1 = torch.tensor([reward_1])
        if not done_1:
            next_state_1 = torch.from_numpy(next_state_1).type(torch.FloatTensor)
        else:
            next_state_1 = None
        env.state = (state[0], state[1])
        
        next_state_2, reward_2, done_2, _ = env.step(1)
        #reward_2 = next_state_2[0] + 0.5
        # Adjust reward for task completion
        #if next_state_2[0] >= 0.5:
        #    reward_2 += 1
        
        reward_2 = torch.tensor([reward_2])
        if not done_2:
            next_state_2 = torch.from_numpy(next_state_2).type(torch.FloatTensor)
        else:
            next_state_2 = None
        env.state = (state[0], state[1])
        
        next_state_3, reward_3, done_3, _ = env.step(2)
        #reward_3 = next_state_3[0] + 0.5
        # Adjust reward for task completion
        #if next_state_3[0] >= 0.5:
        #    reward_3 += 1
        
        reward_3 = torch.tensor([reward_3])
        if not done_3:
            next_state_3 = torch.from_numpy(next_state_3).type(torch.FloatTensor)
        else:
            next_state_3 = None
        env.state = (state[0], state[1])
        
        # Select and perform an action
        action = select_action(state)
        next_state, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward])
        
        if not done:
            next_state = torch.from_numpy(next_state).type(torch.FloatTensor)
        else:
            next_state = None
        
        # Store the transition in memory
        memory.push(state, next_state_1, next_state_2, next_state_3, 
                    reward_1, reward_2, reward_3)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the target network)
        optimize_model()
        if done:
            break
    # Update the target network, copying all weights and biases in DQN
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())
print('Complete')
env.close()



Complete


In [8]:
successes = 0
for i_episode in range(num_episodes):
    # Initialize the environment and state
    state = torch.from_numpy(env.reset()).type(torch.FloatTensor)
    for t in count():
        # Select and perform an action
        env.render()
        action = select_action(state)
        next_state, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward])
        
        next_state = torch.from_numpy(next_state).type(torch.FloatTensor)
        state = next_state
        
        if done:
            print(state)
            if state[0] >= 0.5:
                successes += 1
            break
print(successes/num_episodes*100)
print('Complete')
env.close()



tensor([0.5027, 0.0282])
tensor([0.5077, 0.0311])
tensor([0.5121, 0.0298])
tensor([0.5190, 0.0491])
tensor([0.5264, 0.0298])
tensor([0.5402, 0.0465])
tensor([0.5136, 0.0144])
tensor([0.5369, 0.0500])
tensor([0.5240, 0.0307])
tensor([0.5020, 0.0338])
tensor([0.5369, 0.0500])
tensor([0.5393, 0.0412])
tensor([0.5345, 0.0455])
tensor([0.5054, 0.0117])
tensor([0.5162, 0.0300])
tensor([0.5391, 0.0470])
tensor([0.5369, 0.0500])
tensor([0.5069, 0.0245])
tensor([0.5357, 0.0488])
tensor([0.5246, 0.0444])
tensor([0.5373, 0.0457])
tensor([0.5325, 0.0432])
tensor([0.5369, 0.0500])
tensor([0.5290, 0.0327])
tensor([0.5369, 0.0500])
tensor([0.5010, 0.0475])
tensor([0.5342, 0.0427])
tensor([0.5369, 0.0500])
tensor([0.5212, 0.0485])
tensor([0.5064, 0.0340])
tensor([0.5246, 0.0347])
tensor([0.5343, 0.0370])
tensor([0.5104, 0.0279])
tensor([0.5464, 0.0485])
tensor([0.5162, 0.0402])
tensor([0.5165, 0.0412])
tensor([0.5366, 0.0437])
tensor([0.5331, 0.0470])
tensor([0.5160, 0.0447])
tensor([0.5191, 0.0231])
