<a href="https://colab.research.google.com/github/henry-bokyum-kim/NNStudy/blob/master/GymDisplay.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Turn on virtual display

In [0]:
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!apt-get install x11-utils

from pyvirtualdisplay import Display
from IPython import display as ipythondisplay

v_display = Display(visible=0, size=(1400,900),)
v_display.start()

# Show frame using HTML player

In [1]:
!pip install JSAnimation
from matplotlib import animation
from JSAnimation.IPython_display import display_animation
from IPython.display import display
from IPython.display import HTML
import matplotlib.pyplot as plt

# Imports specifically so we can render outputs in Colab.
def display_frames_as_gif(frame, intv=30):
    """Displays a list of frames as a gif, with controls."""
    fig = plt.figure()
    patch = plt.imshow(frame[0].astype(int))
    def animate(i):
        patch.set_data(frame[i].astype(int))
    anim = animation.FuncAnimation(
        fig, animate, frames=len(frame), interval=intv, blit=False
    )
    #display(display_animation(anim, default_mode='loop'))
    # Set up formatting for the movie files
    display(HTML(data=anim.to_html5_video()))
    #FFwriter = animation.FFMpegWriter()
    #anim.save('basic_animation.mp4', writer = FFwriter)
    #show_video()
# display 

display_frames_as_gif(frame)

# Show frame using gym.Monitor

In [5]:
def show_video():
    mp4list = glob.glob('video/*.mp4')
    if len(mp4list) > 0:
        for mp4 in mp4list:
            video = io.open(mp4, 'r+b').read()
            print(mp4)
            encoded = base64.b64encode(video)
            ipythondisplay.display(HTML(data='''<video alt="test" autoplay loop controls style="height: 300px;">
                                      <source src="data:video/mp4;base64,{0}" type="video/mp4"/>
                                      </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

# Replay Memory

In [14]:
import collections
step_data = collections.namedtuple("step", ("state", "action_value", "action", "next_state", "reward", "done"))

class Replay:
    def __init__(self, size):
        self.memory = collections.deque(maxlen = size)
        
    def push(self, data):
        self.memory.append(data)
        
    def prepare(self, env):
        pass
        
    def sample(self, size):
        if len(self.memory) >= size:
            return random.sample(self.memory, size)

# Agent

In [13]:
import gym
import torch
from collections import namedtuple

class Agent:
    def __init__(self, env, actor, noise, rend_wait = -1, rend_interval = -1 \
                 , frame = None, max_step = None, device = None):
        self.env = env
        if max_step is not None:
            self.env._max_episode_steps = max_step
        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")            
        self.actor = actor
        self.noise = noise
        self.device = device
        
        self.wait = rend_wait
        self.interval = rend_interval
        self.frame = frame
        
    def reset(self):
        self.env.close()
        self.env.reset()
        
    def render(self, epoch):
        if self.wait >= 0 and epoch < self.wait:
            return
        if self.interval >= 0 and epoch % self.interval == 0:
            rend = self.env.render("rgb_array")
            if self.frame is not None:
                self.frame.append(self.env.render("rgb_array"))
        
    def episode(self, epoch):
        self.obs = self.env.reset()
        self.render(epoch)
            
        while True:
            with torch.no_grad():
                act_v = self.actor(torch.FloatTensor(self.obs).to(self.device)).cpu().numpy()
                act_v += self.noise.get_noise()
                if self.env.action_space.shape:
                    act_v = act_v.clip(self.env.action_space.low, self.env.action_space.high)
                act = self.actor.get_action(act_v)

            next_obs, rew, done, etc = self.env.step(act)
            self.render(epoch)

            obs = self.obs
            self.obs = next_obs

            yield obs, act_v, act, next_obs, rew, done, etc
            if done:
                break

# NoiseMaker

In [47]:
import numpy as np
import math

class NoiseMaker():
    def __init__(self, action_size, n_type = None, param = None, decay = False):
        self.action_size = action_size
        self.state = np.zeros(action_size, dtype=np.float32)
        self.count = 0
        self.decay = decay
        if n_type is None:
            n_type = "normal"
        self.type = n_type
        
        if param is None:
            self.param = {
                "start": 0.9,
                "end":0.02,
                "decay": 2000
            }
            if n_type =="ou":
                self.param["ou_mu"] = 0.0
                self.param["ou_th"] = 0.15
                self.param["ou_sig"] = 0.2
        else:
            self.param = param
            
    def get_noise(self):
        eps = self.param["end"] + (self.param["start"] - self.param["end"]) \
                * math.exp(-1*self.count/ self.param["decay"])
        
        noise = np.random.normal(size=self.action_size)
        if self.type == "ou":
            self.state += self.param["ou_th"] * (self.param["ou_mu"] - self.state) \
                        + self.param["ou_sig"] * noise
            noise = self.state
        if not self.decay:
            eps = 1
        self.count += 1
            
        return noise * eps

# Box2d Install

In [None]:
!apt-get install swig
!pip3 install box2d box2d-kengz

# GDrive Save

In [None]:
from google.colab import drive 
drive.mount('/content/gdrive/')

import pickle
with open('/content/gdrive/My Drive/noise','rb') as f:
    noise = pickle.load(f)