In [1]:
# Import os for Save Path
import os
#Import time to slow down
import time
# Import Training algo PPO from stablebaseline3 for RL
from stable_baselines3 import PPO
#Import environment base class for a wrapper
from gym import Env
#Import the space shapes for the environment
from gym.spaces import MultiBinary,Box
#Import numpy to calculate from delta
import numpy as np 
#Import opencv for grayscaling
import cv2
#Import retro 
import retro
# Monitor Wrapper from sb3 helps to extract reward easily from vectorized env
from stable_baselines3.common.monitor import Monitor
# Import vectorize wrapper from sb3
from stable_baselines3.common.vec_env import DummyVecEnv,VecFrameStack

In [2]:
#Create custom Env
class StreetFighter(Env):
    """Gym environment wrapping retro's Street Fighter II with a shaped reward.

    Observations are 84x84 single-channel uint8 frames and actions are a
    12-element MultiBinary vector. The reward returned by ``step`` replaces
    the emulator's own reward and is built from per-step changes in the
    ``health``, ``enemy_health``, ``matches_won`` and ``enemy_matches_won``
    entries of the emulator's ``info`` dict.
    """

    def __init__(self) -> None:
        """Declare the spaces, initialise reward baselines and start the game."""
        super().__init__()
        #Initialise the reward baselines so step() cannot fail on missing
        #attributes if it is called before reset()
        self.resetroundvalues()
        self.resetmatchvalues()
        #Specify observation space
        self.observation_space = Box(low=0,high=255,shape=(84,84,1),dtype=np.uint8)
        #Specify action space
        self.action_space = MultiBinary(12)
        #Start Game Instance
        self.game = retro.make(game='StreetFighterIISpecialChampionEdition-Genesis',
                               use_restricted_actions=retro.Actions.FILTERED)

    def step(self, action) -> tuple:
        """Advance the game by one step and compute the shaped reward.

        Args:
            action: Action forwarded unchanged to ``self.game.step``.

        Returns:
            Tuple ``(obs, reward, done, info)`` where ``obs`` is the
            preprocessed frame, ``reward`` is
            ``health_delta + enemy_health_delta + 100 * matches_won_delta
            + 100 * enemy_matches_won_delta`` and ``done``/``info`` come
            straight from the emulator. The enemy deltas are sign-flipped so
            that enemy losses count positively and enemy gains negatively.
        """
        #Take a step; the emulator's own reward is discarded in favour of the shaped one
        obs,_,done,info = self.game.step(action)
        obs = self.preprocess(obs)

        #Reset match start values when a stored counter of 2 rolls over to 0.
        #The stored counters are copies of the info values, so the enemy side
        #is compared against 2 exactly like the player side.
        if ((self.matches_won==2 and info["matches_won"]==0) or
             (self.enemy_matches_won==2 and info["enemy_matches_won"]==0)):
            self.resetmatchvalues()

        #Health delta
        health_delta = info["health"]-self.health
        self.health = info["health"]
        #Enemy health delta (positive when the enemy loses health)
        enemy_health_delta = self.enemy_health-info["enemy_health"]
        self.enemy_health = info["enemy_health"]
        #Matches won delta
        matches_won_delta = info["matches_won"]-self.matches_won
        self.matches_won = info["matches_won"]
        #Enemy matches won delta (negative when the enemy gains a win)
        enemy_matches_won_delta = self.enemy_matches_won-info["enemy_matches_won"]
        self.enemy_matches_won = info["enemy_matches_won"]

        #Reset round end values whenever either win counter changed.
        #NOTE(review): the recorded run prints an equal and opposite reward
        #pair right after each +/-100, which looks like the effect of zeroing
        #the health baselines here - confirm this is intended.
        if matches_won_delta!=0 or enemy_matches_won_delta!=0:
            self.resetroundvalues()

        #Calculate the reward function
        reward = (
                  health_delta+
                  enemy_health_delta+
                  matches_won_delta*100+
                  enemy_matches_won_delta*100
                  )

        #Return state-reward-matchfinish-info tuple
        return obs,reward,done,info

    def render(self, *args,**kwargs) ->None:
        """Render the game window; extra arguments are accepted and ignored."""
        self.game.render()

    def reset(self) -> np.ndarray:
        """Reset the reward baselines and the game, returning the first frame.

        Returns:
            The preprocessed observation produced by ``self.game.reset()``.
        """
        #Reset the per-round baselines
        self.resetroundvalues()
        #Reset the per-match baselines
        self.resetmatchvalues()
        #Reset the game and preprocess its first frame
        obs = self.game.reset()
        obs = self.preprocess(obs)
        return obs

    def resetroundvalues(self):
        """Zero the stored health baselines used for the health deltas."""
        self.health = 0 #seen from info
        self.enemy_health = 0 #seen from info

    def resetmatchvalues(self):
        """Zero the stored win counters used for the matches-won deltas."""
        self.matches_won = 0 #seen from info
        self.enemy_matches_won = 0 #seen from info

    def preprocess(self,observation) -> np.ndarray:
        """Convert a colour frame to an 84x84x1 grayscale array.

        Args:
            observation: Colour frame as returned by the game.

        Returns:
            The grayscaled frame resized to 84x84 with a trailing channel axis.
        """
        #Grayscale the frame
        #NOTE(review): retro frames are presumably RGB, in which case
        #COLOR_RGB2GRAY would be the matching flag; BGR2GRAY is kept because
        #the saved checkpoints were presumably trained on it - confirm.
        gray = cv2.cvtColor(observation,cv2.COLOR_BGR2GRAY)
        #Resize the frame
        resize = cv2.resize(gray,(84,84),interpolation=cv2.INTER_CUBIC)
        #Add the channels value
        channels = np.reshape(resize,(84,84,1))
        return channels

    def close(self) -> None:
        """Close the underlying retro game instance."""
        self.game.close()

In [3]:
# Output locations under "train": logs, optimization config and saved models
LOG_DIR, OPT_DIR, CHECKPOINT_DIR = (
    os.path.join("train", leaf) for leaf in ("logs", "config", "model")
)

In [4]:
#Create environment
def make_env():
    """Build the monitored Street Fighter environment for the vectorized wrapper."""
    #A named factory replaces `lambda: env`, which looked up the global `env`
    #at call time even though that same name was being rebound to the wrappers
    return Monitor(StreetFighter(), LOG_DIR)

#Vectorize the single environment and stack 4 frames with channels last
env = VecFrameStack(DummyVecEnv([make_env]), 4, channels_order='last')

In [5]:
#Load the trained PPO checkpoint from the shared checkpoint directory
#instead of repeating the "train/model" path as a literal
MODEL_PATH = os.path.join(CHECKPOINT_DIR, "best_model_4100000.zip")
model = PPO.load(MODEL_PATH)

In [7]:
#Play the loaded policy in the environment and print every non-zero reward
episodes = 1
for episode_index in range(episodes):
    obs = env.reset()
    done = False
    while not done:
        env.render()
        #predict returns (action, state); only the action is fed to the env
        action, _state = model.predict(obs)
        obs, reward, done, info = env.step(action)
        #Short pause so the rendered game is watchable
        time.sleep(0.002)
        if reward != 0:
            print(reward)

[7.]
[41.]
[29.]
[-42.]
[28.]
[36.]
[3.]
[100.]
[102.]
[-102.]
[-35.]
[-23.]
[37.]
[-6.]
[36.]
[40.]
[-31.]
[29.]
[35.]
[100.]
[82.]
[-82.]
[35.]
[-44.]
[-21.]
[-12.]
[-33.]
[-12.]
[-13.]
[-42.]
[-100.]
[-142.]
[142.]
[24.]
[37.]
[-44.]
[47.]
[26.]
[-10.]
[-25.]
[5.]
[-54.]
[7.]
[-30.]
[-100.]
[-17.]
[17.]


In [7]:
#Shut the environment down now that evaluation is finished
env.close()