<a href="https://colab.research.google.com/github/aquapathos/MLLesson/blob/master/PPO%E5%AD%A6%E7%BF%92%E7%B5%90%E6%9E%9C%E3%81%AE%E7%A2%BA%E8%AA%8D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the packages this notebook needs: pyvirtualdisplay and xvfb (used below
# to start a virtual display), python-opengl, ffmpeg, and pfrl (source of the
# Atari wrappers imported later).
# Remove " > /dev/null 2>&1" to see what is going on under the hood.
!pip install pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!pip install pfrl > /dev/null 2>&1

In [None]:
import gym
from gym import logger as gymlogger
from gym.wrappers import Monitor
gymlogger.set_level(40) #error only
import tensorflow as tf
import numpy as np
import random
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import math
import glob
import io
import base64
from IPython.display import HTML

from IPython import display as ipythondisplay

In [None]:
# Start an off-screen virtual display (1400x900, not visible).
# NOTE(review): presumably required so that env.render() works on a headless
# Colab machine -- confirm.
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()

<pyvirtualdisplay.display.Display at 0x7f212e4c0080>



> インデントされたブロック



In [None]:
def show_video():
  """Embed a recorded episode video from the ``video/`` folder in the cell output.

  Looks for ``video/*.mp4`` relative to the current working directory. When at
  least one file exists, the first one in sorted (alphabetical) order is read,
  base64-encoded and displayed as an inline HTML5 ``<video>`` element.
  Otherwise the message "Could not find video" is printed.

  Returns:
    None
  """
  # glob returns files in arbitrary order; sort so the choice is deterministic.
  mp4list = sorted(glob.glob('video/*.mp4'))
  if not mp4list:
    print("Could not find video")
    return

  # Open read-only (the file is never modified) and close the handle promptly.
  with open(mp4list[0], 'rb') as mp4_file:
    encoded = base64.b64encode(mp4_file.read())

  ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    

def wrap_env(env):
  """Wrap ``env`` in a gym ``Monitor`` that records into ``./video``.

  ``force=True`` is passed to ``Monitor``.
  NOTE(review): presumably this clears earlier recordings in that folder -- confirm
  against the gym version in use.
  """
  return Monitor(env, './video', force=True)

In [None]:
import gym
from gym import ObservationWrapper
from gym.spaces import Box

import cv2
import numpy as np
import torch

from torch import nn
from torch import optim
from torch.distributions import Categorical
from torch.nn import functional as F

from pfrl.wrappers.atari_wrappers import FrameStack,NoopResetEnv,MaxAndSkipEnv

In [None]:
class myCrop(ObservationWrapper):
    """Observation wrapper: mask ignored colors, crop margins, grayscale, resize to 84x84.

    Args:
        env: environment to wrap.
        tmgn, bmgn, lmgn, rmgn: number of pixels removed from the top, bottom,
            left and right edge of each frame.
        igcolors: list of colors; pixels exactly equal to one of them are
            blanked out before cropping.
        bgcolor: stored as ``self.bgcolors`` but not used by ``observation``.
    """

    def __init__(self, env, tmgn=0, bmgn=0,lmgn=0,rmgn=0,igcolors=[],bgcolor=[0,0,0]):
        super().__init__(env)
        self.tmgn = tmgn
        self.bmgn = bmgn
        self.lmgn = lmgn
        self.rmgn = rmgn
        self.igcolors = igcolors
        self.bgcolors = bgcolor
        self.observation_space = Box(low=0, high=255, shape=(84,84), dtype=np.uint8)

    def observation(self, obs):
        """Return the processed 84x84 single-channel version of ``obs``.

        NOTE(review): assumes ``obs`` is an (H, W, 3) RGB uint8 image -- confirm.
        """
        height, width = obs.shape[:2]

        # Accumulate a mask of every pixel that exactly matches an ignored color.
        ignore_mask = np.zeros((height, width), np.uint8)
        for color in self.igcolors:
            bound = np.array(color)
            ignore_mask = cv2.bitwise_or(ignore_mask, cv2.inRange(obs, bound, bound))

        # Combine the original image with the inverted mask (ignored pixels become 0).
        masked = cv2.bitwise_and(obs, obs, mask=255 - ignore_mask)

        cropped = masked[self.tmgn:height - self.bmgn, self.lmgn:width - self.rmgn]
        gray = cv2.cvtColor(cropped, cv2.COLOR_RGB2GRAY)
        return cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)

class myFrameStack(FrameStack):
    """Frame stack whose ``reset``/``step`` return network-ready torch tensors.

    Builds on pfrl's ``FrameStack`` (channel order "chw"). The stacked frames
    are converted to a float32 tensor scaled by 1/255 with a leading batch
    dimension of 1, placed on the module-level ``device``.

    Args:
        env: environment to wrap.
        k: number of frames kept in the stack.
        deadloss: accepted for interface compatibility; not used by this class.
    """

    def __init__(self, env, k=4, deadloss=0):
        super(myFrameStack, self).__init__(env, k=k, channel_order="chw")

    def reset(self):
        """Reset the wrapped env and fill the stack with ``k`` copies of the first frame."""
        ob = self.env.reset()
        for _ in range(self.k):
            self.frames.append(ob)
        # Remember the life count reported by the emulator at episode start.
        self.lives = self.env.unwrapped.ale.lives()
        return self.obs_to_torch(np.array(list(self.frames)))

    def step(self, action):
        """Step the wrapped env, push the new frame and return the stacked tensor."""
        ob, reward, done, info = self.env.step(action)
        self.frames.append(ob)
        return self.obs_to_torch(np.array(list(self.frames))), reward, done, info

    @staticmethod
    def obs_to_torch(obs: np.ndarray) -> torch.Tensor:
        """Convert a stacked-frame array to a float32 tensor in [0, 1] with a batch axis.

        Uses ``torch.as_tensor`` on the array and then adds the batch dimension,
        instead of ``torch.tensor([obs])``: building a tensor from a Python list
        of ndarrays is slow and makes PyTorch emit a warning. The resulting
        values and shape ``(1, *obs.shape)`` are the same.
        """
        frames = torch.as_tensor(obs, dtype=torch.float32, device=device)
        return frames.unsqueeze(0) / 255.

def mkenv(envname,k=8,skip=2,tmgn=0,bmgn=0,lmgn=0,rmgn=0,igcolors=[],deadloss=0,noop_max=30):
  """Create ``envname`` and apply the preprocessing wrapper chain.

  Order of wrappers: ``NoopResetEnv`` (only when ``noop_max > 0``),
  ``MaxAndSkipEnv`` with ``skip``, ``myCrop`` with the given margins and
  ignored colors, and finally ``myFrameStack`` with ``k`` frames.

  Returns:
    The fully wrapped environment.
  """
  base = gym.make(envname)
  wrapped = NoopResetEnv(base, noop_max=noop_max) if noop_max > 0 else base
  wrapped = MaxAndSkipEnv(wrapped, skip=skip)
  cropped = myCrop(
      wrapped,
      tmgn=tmgn,
      bmgn=bmgn,
      lmgn=lmgn,
      rmgn=rmgn,
      igcolors=igcolors,
  )
  return myFrameStack(cropped, k=k, deadloss=deadloss)

In [None]:
import random
from datetime import datetime
# Seed Python's RNG from the wall clock so each session draws a different
# default environment seed. random.seed() only supports None/int/float/str/
# bytes/bytearray (other types are deprecated since Python 3.9 and raise
# TypeError on 3.11+), so pass the POSIX timestamp rather than the datetime.
random.seed(datetime.now().timestamp())
DEFAULTSEED = random.randint(1, 10000)
def Game(seed=DEFAULTSEED,k=8,skip=1,deadloss=0):
    """Build the preprocessed, seeded 'SpaceInvadersNoFrameskip-v4' environment.

    Args:
        seed: value passed to ``env.seed``.
        k: number of stacked frames.
        skip: frame-skip value forwarded to ``mkenv``.
        deadloss: forwarded to ``mkenv``.

    Returns:
        The wrapped environment produced by ``mkenv``.
    """
    # Pixels trimmed from the top / bottom / left / right of every frame.
    margins = dict(tmgn=20, bmgn=12, lmgn=8, rmgn=8)
    # Colors to be treated the same as the background; none are ignored here.
    # (An earlier variant used [[162,134,56]].)
    ignored_colors = []

    env = mkenv(
        'SpaceInvadersNoFrameskip-v4',
        k=k,
        skip=skip,
        igcolors=ignored_colors,
        deadloss=deadloss,
        **margins,
    )
    env.seed(seed)
    return env

In [None]:
class Model(nn.Module):
    """Convolutional actor-critic network with a policy head and a value head.

    Args:
        nframes: number of input channels (stacked frames).
        n_actions: number of discrete actions produced by the policy head.
            Defaults to 4, the value that was previously hard-coded, so
            existing checkpoints and callers keep working.

    The forward pass expects inputs whose spatial size is 84x84; the layer
    shapes noted below follow from that.
    """

    def __init__(self, nframes=8, n_actions=4):
        super().__init__()

        # nframes x 84 x 84 -> 32 x 20 x 20
        self.conv1 = nn.Conv2d(in_channels=nframes, out_channels=32, kernel_size=8, stride=4)
        # 32 x 20 x 20 -> 64 x 9 x 9
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)
        # 64 x 9 x 9 -> 64 x 7 x 7
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)
        # 64 x 7 x 7 -> 512
        self.lin = nn.Linear(in_features=7 * 7 * 64, out_features=512)
        # 512 -> n_actions logits (policy head, used to choose the action)
        self.pi_logits = nn.Linear(in_features=512, out_features=n_actions)
        # 512 -> 1 scalar value estimate (value head)
        self.value = nn.Linear(in_features=512, out_features=1)

    def forward(self, obs: torch.Tensor):
        """Run the network on a batch of observations.

        Args:
            obs: float tensor of shape (batch, nframes, 84, 84).

        Returns:
            A tuple ``(pi, value)`` where ``pi`` is a ``Categorical``
            distribution built from the policy logits and ``value`` is a
            1-D tensor with one value estimate per batch element.
        """
        h = F.relu(self.conv1(obs))
        h = F.relu(self.conv2(h))
        h = F.relu(self.conv3(h))
        # Flatten the 64 x 7 x 7 feature map for the fully connected layer.
        h = h.reshape((-1, 7 * 7 * 64))

        h = F.relu(self.lin(h))

        pi = Categorical(logits=self.pi_logits(h))
        value = self.value(h).reshape(-1)

        return pi, value

In [None]:
# Mount Google Drive at /content/drive; SAVEFOLDER, defined in a later cell,
# is a path under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
# Project name; used as the last component of the save folder path.
PROJECTNAME = 'ppo'
# Google Drive folder from which the saved model file is loaded.
SAVEFOLDER = '/content/drive/MyDrive/M/{}'.format(PROJECTNAME)
import torch
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Show the selected device, both as its repr and as its plain string form.
print(repr(device))
print(device)

device(type='cpu')
cpu


# NFRAME を 12 としています。学習時に使用したフレーム数に合わせて適宜修正してください。

In [None]:
# Number of stacked frames fed to the network; it sets the input channel count
# of the first convolution, so it must match the saved weights.
NFRAME=12
model = Model(NFRAME).to(device)
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU can still be loaded on a CPU-only runtime.
model.load_state_dict(torch.load(SAVEFOLDER+'/model', map_location=device))

<All keys matched successfully>

In [None]:
# Draw a fresh environment seed for this run. random.seed() does not accept a
# datetime object (TypeError on Python 3.11+), so seed with its timestamp.
random.seed(datetime.now().timestamp())
DEFAULTSEED = random.randint(1, 10000)
game = wrap_env(Game(DEFAULTSEED,NFRAME))

try:
    observation = game.reset()
    done = False
    while not done:
        game.render()

        # Inference only: skip autograd bookkeeping.
        with torch.no_grad():
            pi, v = model(observation)
        # Sample an action from the policy and convert the one-element tensor
        # to a plain Python int before handing it to the environment.
        action = pi.sample().item()

        observation, reward, done, info = game.step(action)
finally:
    # Always close the environment, even if the rollout fails part-way.
    game.close()
show_video()