In [None]:
# Mount Google Drive at /content/drive so the paths used by the later cells
# resolve. The google.colab package means this cell only runs inside Colab.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# This is MuZero!
# https://arxiv.org/abs/1911.08265
# It works, but only for CartPole.
# It would be great if it also worked for Follower, LunarLander, or Acrobot.
# Also, although MCTS is implemented, the training loop does not use it yet.
# To use it, replace naive_search (which tries n**K possibilities) with mcts_search.

In [None]:
%cd drive/MyDrive/github

In [None]:
# Clone github repository setup
# import join used to join ROOT path and MY_GOOGLE_DRIVE_PATH
from os.path import join  

# VM root path
ROOT = "/content/drive"
# Google Drive path to the project
MY_GOOGLE_DRIVE_PATH = "MyDrive/github" 
# Github username 
GIT_USERNAME = "antferdom" 
# definitely replace with your
GIT_TOKEN = "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"  
# Github repository name to clone
GIT_REPOSITORY = "ai-notebooks" 

PROJECT_PATH = join(ROOT, MY_GOOGLE_DRIVE_PATH)

# Guarantee the path leading to the project is well formed
print("PROJECT_PATH: ", PROJECT_PATH)   

# In case we haven't created the folder already; we will create a folder in the project path 
!mkdir "{PROJECT_PATH}"    

#GIT_PATH = "https://{GIT_TOKEN}@github.com/{GIT_USERNAME}/{GIT_REPOSITORY}.git"
GIT_PATH = "https://" + GIT_TOKEN + "@github.com/" + GIT_USERNAME + "/" + GIT_REPOSITORY + ".git"
print("GIT_PATH: ", GIT_PATH)

In [None]:
# Clone the repository using the authenticated URL and sync the project
# folder into the current working directory.
# NOTE(review): `git clone` is given no target directory, so the checkout is
# created under the repository name in the current directory, not in ./temp.
# Nothing in this cell writes into ./temp, so the `mv` below appears to have
# nothing to move - confirm whether the clone was meant to target ./temp.
# A later cell does `%cd ai-notebooks`, which relies on the current layout.
!mkdir ./temp
!git clone "{GIT_PATH}"
!mv ./temp/* "{PROJECT_PATH}"
!rm -rf ./temp
# Copy everything under PROJECT_PATH into the current directory, skipping data/.
!rsync -aP --exclude=data/ "{PROJECT_PATH}"/*  ./

In [None]:
%load_ext autoreload
%autoreload 2
%pylab inline
import tensorflow as tf
import numpy as np
import gym
from tqdm import tqdm, trange
import os,sys
sys.path.append(os.getcwd())

In [None]:
%%bash

# install required system dependencies
apt-get install -y xvfb x11-utils

# install required python dependencies (might need to install additional gym extras depending)
# Each specifier is quoted so bash hands `[box2d]` and `*` to pip literally
# instead of treating them as filename glob patterns.
pip install 'gym[box2d]==0.17.*' 'pyvirtualdisplay==0.2.*' 'PyOpenGL==3.1.*' 'PyOpenGL-accelerate==3.1.*'

In [None]:
import pyvirtualdisplay


# Start a 1400x900 virtual display; visible=False is the setting to use with
# Xvfb. The value returned by start() is bound to `_` and otherwise ignored.
_display = pyvirtualdisplay.Display(size=(1400, 900), visible=False)
_ = _display.start()

In [None]:
# Echo $DISPLAY through the shell to check which display is configured.
!echo $DISPLAY

In [None]:
# Enter the cloned repository so the `muzero` package can be imported.
# NOTE(review): the path is relative, so this cell only works once per
# session; re-running it looks for ai-notebooks inside ai-notebooks.
%cd ai-notebooks

In [None]:
# Make Follower work! Will give interview to anyone who does.
from muzero.follower import Follower
from muzero.model import MuModel
from muzero.game import Game, ReplayBuffer
from muzero.mcts import naive_search, mcts_search

# Pick the environment to train on; the alternatives are kept as toggles.
#env = Follower()
env = gym.make("CartPole-v0")
#env = gym.make("MountainCar-v0")
#env = gym.make("LunarLander-v2")
#env = gym.make("Acrobot-v1")

# Observe the environment
print(env.reset())
# gym environments expose `observation_space` (as the MuModel call below
# already assumes); the former `env.obs_space` lookup raised AttributeError.
print(env.observation_space)
print(env.observation_space.shape, env.action_space.n)

# Build the model from the observation shape and the number of discrete actions.
m = MuModel(env.observation_space.shape, env.action_space.n, s_dim=128, K=3, lr=0.001)

# NOTE(review): the meaning of 50 and 128 depends on ReplayBuffer's signature,
# which is not visible here (presumably buffer and batch sizes) - confirm.
replay_buffer = ReplayBuffer(50, 128, m.K)
# Total reward of every game played so far; appended to by the training loop.
rews = []

In [None]:
def play_game(env, m):
  """Play one game in `env` with model `m` and return the finished Game.

  The environment is rendered at every step. With probability 0.05 the step
  uses a uniform policy over the `m.a_dim` actions; otherwise it uses the
  policy returned by `naive_search(m, observation, T=1)`.
  """
  from random import random as draw_uniform
  exploration_rate = 0.05
  episode = Game(env, discount=0.997)
  while not episode.terminal():
    episode.env.render()
    explore = draw_uniform() < exploration_rate
    if explore:
      uniform_prob = 1/m.a_dim
      step_policy = [uniform_prob]*m.a_dim
    else:
      step_policy = naive_search(m, episode.observation, T=1)
    episode.act_with_policy(step_policy)
  return episode

In [None]:
# !pip3 install 'gym[all]'

In [None]:
# video_every = 100
# env = gym.wrappers.Monitor(env, "../video", video_callable=lambda episode_id: (episode_id%video_every)==0, force=True)

In [None]:
from muzero.model import reformat_batch
import collections

# Train for 30 games: play one game, store it in the replay buffer, run 20
# training batches, then log the game's length, total reward, a count of the
# entries in its history, and the first component of the latest loss entry.
for episode in range(30):
  game = play_game(env, m)
  replay_buffer.save_game(game)
  for _step in range(20):
    batch = replay_buffer.sample_batch()
    m.train_on_batch(batch)
  rew = sum(game.rewards)
  rews.append(rew)
  history_counts = collections.Counter(game.history)
  latest_loss = m.losses[-1][0]
  print(len(game.history), rew, history_counts, latest_loss)

In [None]:
# First figure: total reward of each played game.
plt.plot(rews)
# Second figure (log y-axis): components 0, 1 and -3 of every entry in m.losses.
plt.figure()
plt.yscale('log')
for component in (0, 1):
  plt.plot([entry[component] for entry in m.losses])
plt.plot([entry[-3] for entry in m.losses])

In [None]:
# can act?
# Roll out the trained model for at most 2000 steps, sampling every action
# from the policy returned by mcts_search and printing the chosen action, the
# model's value estimate, the reward and the policy.
state = env.reset()
for sn in range(2000):
  # Only the mcts_search policy is used to act. The naive_search call that
  # used to precede it was overwritten immediately, so that redundant search
  # was dropped.
  # NOTE(review): 50 is presumably the number of MCTS simulations - confirm
  # against mcts_search's signature.
  p_0, _ = mcts_search(m, state, 50)
  # Passing an int to np.random.choice samples from range(len(p_0)).
  a_1 = np.random.choice(len(p_0), p=p_0)
  _, v_0 = m.ft(m.ht(state))

  env.render()
  state,r,done,_ = env.step(a_1)
  print(a_1, v_0, r, p_0)
  if done:
    print("DONE", sn)
    break

In [None]:
import typing

import gym
import matplotlib.pyplot as plt
import numpy as np
from IPython import display


# Type aliases used by the annotations in this cell:
# a state is a numpy array and an action is a plain int.
State = np.ndarray
Action = int

# An agent is just a function that maps a State to an Action.
Agent = typing.Callable[[State], Action]


def uniform_random_policy(state: State,
                          number_actions: int,
                          random_state: np.random.RandomState) -> Action:
    """Draw one action uniformly at random from ``0 .. number_actions - 1``.

    Args:
        state: Current state; accepted but not used by this policy.
        number_actions: Number of feasible actions.
        random_state: Random number generator the draw is taken from.

    Returns:
        The sampled action index.
    """
    equal_weights = np.ones(number_actions) / number_actions
    candidates = np.arange(number_actions)
    return random_state.choice(candidates, p=equal_weights)


def make_random_agent(number_actions: int,
                      random_state: np.random.RandomState = None) -> Agent:
    """Build an Agent that picks uniformly among ``number_actions`` actions.

    Args:
        number_actions: Number of feasible actions.
        random_state: Generator used for every draw; a fresh
            ``np.random.RandomState()`` is created when this is None.

    Returns:
        A callable that takes a state and returns the sampled action.
    """
    if random_state is None:
        generator = np.random.RandomState()
    else:
        generator = random_state

    def agent(state):
        return uniform_random_policy(state, number_actions, generator)

    return agent


def simulate(agent: Agent, env: gym.Env, ax: plt.Axes) -> None:
    """Run one episode of ``agent`` in ``env``, drawing every frame on ``ax``.

    Args:
        agent: Callable that maps the current state to an action.
        env: Environment to reset, step and render in 'rgb_array' mode.
        ax: Axes the frames are drawn on; its figure is re-displayed each step.

    The environment is closed when the episode ends, and also when the loop
    is left through an exception or a notebook interrupt.
    """
    try:
        state = env.reset()
        img = ax.imshow(env.render(mode='rgb_array'))
        # Hiding the axis decorations only needs to happen once.
        ax.axis('off')
        done = False
        while not done:
            action = agent(state)
            img.set_data(env.render(mode='rgb_array'))
            # Display the figure that owns `ax`, which need not be the
            # current pyplot figure.
            display.display(ax.figure)
            display.clear_output(wait=True)
            state, _reward, done, _ = env.step(action)
    finally:
        env.close()
    
# Seed shared by the environment and the agent. The agent previously drew
# from an unseeded generator, so its action sequence changed on every run
# even though the environment itself was seeded.
LUNAR_LANDER_SEED = 42

# create the Gym environment
lunar_lander_v2 = gym.make('LunarLander-v2')
_ = lunar_lander_v2.seed(LUNAR_LANDER_SEED)

# create an agent whose random draws are repeatable
random_agent = make_random_agent(
    lunar_lander_v2.action_space.n,
    random_state=np.random.RandomState(LUNAR_LANDER_SEED),
)

# simulate agent interacting with the environment
_, ax = plt.subplots(1, 1)
simulate(random_agent, lunar_lander_v2, ax)