In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import gym
from collections import namedtuple, deque
import random
import math
import os
from itertools import count
import matplotlib.pyplot as plt
import numpy as np
from mlagents_envs.environment import UnityEnvironment, ActionTuple, BaseEnv
from typing import Dict, NamedTuple, List

env = gym.make('MountainCar-v0')
train_mode = True # Whether to run the environment in training or inference mode
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
class Experience(NamedTuple):
  """
  An experience contains the data of one Agent transition.
  - Observation
  - Action
  - Reward
  - Done flag
  - Next Observation
  """

  obs: np.ndarray
  action: np.ndarray
  reward: float
  done: bool
  next_obs: np.ndarray

# A Trajectory is an ordered sequence of Experiences
Trajectory = List[Experience]

# A Buffer is an unordered list of Experiences from multiple Trajectories
Buffer = List[Experience]

In [3]:
class model(nn.Module):
    def __init__(self, input_size, output_size):
        super(model, self).__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_size, 24),
            nn.ReLU(),
            nn.Linear(24, 12),
            nn.ReLU(),
            nn.Linear(12, output_size)
        )

    def forward(self, x):
        x = self.linear_relu_stack(x)
        return x

In [4]:
class Trainer:
    @staticmethod
    def generate_trajectories(
        env: BaseEnv, q_net: model, buffer_size: int, epsilon: float
    ):
        """
        Given a Unity Environment and a Q-Network, this method will generate a
        buffer of Experiences obtained by running the Environment with the Policy
        derived from the Q-Network.
        :param BaseEnv: The UnityEnvironment used.
        :param q_net: The Q-Network used to collect the data.
        :param buffer_size: The minimum size of the buffer this method will return.
        :param epsilon: Will add a random normal variable with standard deviation.
        epsilon to the value heads of the Q-Network to encourage exploration.
        :returns: a Tuple containing the created buffer and the average cumulative
        the Agents obtained.
        """
        # Create an empty Buffer
        buffer: Buffer = []

        # Reset the environment
        env.reset()
        # Read and store the Behavior Name of the Environment
        behavior_name = env.action_space.sample()

        # Create a Mapping from AgentId to Trajectories. This will help us create
        # trajectories for each Agents
        dict_trajectories_from_agent = self.Trajectory
        # Create a Mapping from AgentId to the last observation of the Agent
        dict_last_obs_from_agent = np.ndarray
        # Create a Mapping from AgentId to the last observation of the Agent
        dict_last_action_from_agent = np.ndarray
        # Create a Mapping from AgentId to cumulative reward (Only for reporting)
        dict_cumulative_reward_from_agent:
        # Create a list to store the cumulative rewards obtained so far
        cumulative_rewards: List[float] = []
        
        while len(buffer) < buffer_size:  # While not enough data in the buffer
            
            # Get the Decision Steps and Terminal Steps of the Agents
            observation, reward, done, _ = env.step(behavior_name)
            onState = done
            
            # For all Agents with a Terminal Step:
            if not onState: # on Terminal State

                # Create its last experience (is last because the Agent terminated)
                last_experience = Experience(
                    obs=observation,
                    reward=reward,
                    done=done,
                    action=behavior_name,
                    next_obs=terminal_steps[agent_id_terminated].obs[0],
                )

                """
                # Clear its last observation and action (Since the trajectory is over)
                dict_last_obs_from_agent.pop(agent_id_terminated)
                dict_last_action_from_agent.pop(agent_id_terminated)
                """

                # Report the cumulative reward
                cumulative_reward = (
                    dict_cumulative_reward_from_agent.pop(agent_id_terminated)
                    + terminal_steps[agent_id_terminated].reward
                )
                cumulative_rewards.append(cumulative_reward)
                # Add the Trajectory and the last experience to the buffer
                buffer.extend(dict_trajectories_from_agent.pop(agent_id_terminated))
                buffer.append(last_experience)

            # For all Agents with a Decision Step:
            else:
                # If the Agent does not have a Trajectory, create an empty one
                if agent_id_decisions not in dict_trajectories_from_agent:
                    dict_trajectories_from_agent = []
                    dict_cumulative_reward_from_agent = 0

                # Create an Experience from the last observation and the Decision Step
                exp = Experience(
                    obs=observation,
                    reward=reward,
                    done=False,
                    action=dict_last_action_from_agent[agent_id_decisions].copy(),
                    next_obs=,
                )
                # Update the Trajectory of the Agent and its cumulative reward
                dict_trajectories_from_agent.append(exp)
                dict_cumulative_reward_from_agent += (
                    reward
                )
                # Store the observation as the new "last observation"

                dict_last_obs_from_agent = observation
        
            # Compute the values for each action given the observation
            actions_values = (
                q_net(torch.from_numpy(decision_steps.obs[0])).detach().numpy()
            )
            
            if random.uniform(0, 1) <= epsilon:
                actions = np.random.randint(3, size=(decision_steps.agent_id.shape[0], 1))
            else:
                actions = np.argmax(actions_values, axis=1)
                actions.resize((len(decision_steps), 1))
            
            # Store the action that was picked, it will be put in the trajectory later
            for agent_index, agent_id in enumerate(decision_steps.agent_id):
                dict_last_action_from_agent[agent_id] = actions[agent_index]

            # Set the actions in the environment
            # Unity Environments expect ActionTuple instances.
            action_tuple = ActionTuple()
            action_tuple.add_discrete(actions)
            
            # Perform a step in the simulation
            env.set_actions(behavior_name, action_tuple)
            env.step()
        return buffer, np.mean(cumulative_rewards)

    @staticmethod
    def update_q_net(
        q_net: model,
        optimizer: torch.optim,
        buffer: Buffer,
        action_size: int
    ):
        """
        Performs an update of the Q-Network using the provided optimizer and buffer
        """
        BATCH_SIZE = 1000
        NUM_EPOCH = 3
        GAMMA = 0.9
        batch_size = min(len(buffer), BATCH_SIZE)
        random.shuffle(buffer)
        # Split the buffer into batches
        batches = [
            buffer[batch_size * start : batch_size * (start + 1)]
            for start in range(int(len(buffer) / batch_size))
        ]
        
        for _ in range(NUM_EPOCH):
            for batch in batches:
                # Create the Tensors that will be fed in the network
                obs = torch.from_numpy(np.stack([ex.obs for ex in batch]))
                reward = torch.from_numpy(
                    np.array([ex.reward for ex in batch], dtype=np.float32).reshape(-1, 1)
                )
                done = torch.from_numpy(
                    np.array([ex.done for ex in batch], dtype=np.float32).reshape(-1, 1)
                )
                action = torch.from_numpy(np.stack([ex.action for ex in batch]))
                next_obs = torch.from_numpy(np.stack([ex.next_obs for ex in batch]))
                
                # Use the Bellman equation to update the Q-Network
                target = (
                    reward
                    + (1.0 - done)
                    * GAMMA
                    * torch.max(q_net(next_obs).detach(), dim=1, keepdim=True).values
                )

                mask = torch.zeros((len(batch), action_size))
                mask.scatter_(1, action, 1)
                prediction = torch.sum(q_net(obs) * mask, dim=1, keepdim=True)
                criterion = torch.nn.SmoothL1Loss()
                loss = criterion(prediction, target)

                # Perform the backpropagation
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

In [5]:
# -----------------
# This code is used to close an env that might not have been closed before
try:
    env.close()
except:
    pass
# -----------------

# Create the GridWorld Environment from the registry
env.reset()

num_actions = 3
num_obs = 9
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200

try:
    # Create a new Q-Network.
    qnet = model(num_obs, num_actions)

    experiences: Buffer = []
    optim = torch.optim.Adam(qnet.parameters(), lr= 0.001)

    cumulative_rewards: List[float] = []

    # The number of training steps that will be performed
    NUM_TRAINING_STEPS = int(os.getenv('QLEARNING_NUM_TRAINING_STEPS', 30))
    # The number of experiences to collect per training step
    NUM_NEW_EXP = int(os.getenv('QLEARNING_NUM_NEW_EXP', 10000))
    # The maximum size of the Buffer
    BUFFER_SIZE = int(os.getenv('QLEARNING_BUFFER_SIZE', 10000))

    for n in range(NUM_TRAINING_STEPS):
        eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * n / EPS_DECAY)
        new_exp,_ = Trainer.generate_trajectories(env, qnet, NUM_NEW_EXP, epsilon=eps_threshold)
        random.shuffle(experiences)
        if len(experiences) > BUFFER_SIZE:
            experiences = experiences[:BUFFER_SIZE]
        experiences.extend(new_exp)
        Trainer.update_q_net(qnet, optim, experiences, num_actions)
        _, rewards = Trainer.generate_trajectories(env, qnet, 1000, epsilon=0)
        cumulative_rewards.append(rewards)
        print("Training step ", n+1, "\treward ", rewards)
except KeyboardInterrupt:
    print("\nTraining interrupted, continue to next cell to save to save the model.")
finally:
    env.close()

# Show the training graph
try:
    plt.plot(range(NUM_TRAINING_STEPS), cumulative_rewards)
except ValueError:
    print("\nPlot failed on interrupted training.")

UnityCommunicatorStoppedException: Communicator has exited.

In [1]:
import gym
env = gym.make('CartPole-v0')
env.reset()
for _ in range(10):
    env.render()
    observation, reward, done, info = env.step(env.action_space.sample()) # take a random action
    print(env.action_space(0))
    
env.close()

TypeError: 'Discrete' object is not callable