In [1]:
! pip install gym==0.21.0
! pip install gym gym-retro 
! pip install pygame
! pip install tensorflow



In [2]:
# Installs the CUDA 11.3 (cu113) builds of torch, torchvision and torchaudio from the
# PyTorch wheel index given with -f.
# NOTE(review): torch is not imported in any of the visible cells — confirm this install is needed.
! pip install torch==1.10.2+cu113 torchvision==0.11.3+cu113 torchaudio===0.10.2+cu113 -f https://download.pytorch.org/whl/cu113/torch_stable.html


Looking in links: https://download.pytorch.org/whl/cu113/torch_stable.html


In [3]:
import retro # The main library
import time # For timing learning, if needed
import pygame # For rendering the game
# NOTE(review): gymgrid2 is not imported in any of the visible cells, and this install
# runs after the imports above — confirm it is needed.
! pip install gymgrid2

pygame 2.5.0 (SDL 2.28.0, Python 3.6.13)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [4]:
retro.data.list_games()
# Gym Retro provides many games: this command lets you check them out!

['1942-Nes',
 '1943-Nes',
 '3NinjasKickBack-Genesis',
 '8Eyes-Nes',
 'AaahhRealMonsters-Genesis',
 'AbadoxTheDeadlyInnerWar-Nes',
 'AcceleBrid-Snes',
 'ActRaiser2-Snes',
 'ActionPachio-Snes',
 'AddamsFamily-GameBoy',
 'AddamsFamily-Genesis',
 'AddamsFamily-Nes',
 'AddamsFamily-Sms',
 'AddamsFamily-Snes',
 'AddamsFamilyPugsleysScavengerHunt-Nes',
 'AddamsFamilyPugsleysScavengerHunt-Snes',
 'AdvancedBusterhawkGleylancer-Genesis',
 'Adventure-Atari2600',
 'AdventureIsland-GameBoy',
 'AdventureIsland3-Nes',
 'AdventureIslandII-Nes',
 'AdventuresOfBatmanAndRobin-Genesis',
 'AdventuresOfBayouBilly-Nes',
 'AdventuresOfDinoRiki-Nes',
 'AdventuresOfDrFranken-Snes',
 'AdventuresOfKidKleets-Snes',
 'AdventuresOfMightyMax-Genesis',
 'AdventuresOfMightyMax-Snes',
 'AdventuresOfRockyAndBullwinkleAndFriends-Genesis',
 'AdventuresOfRockyAndBullwinkleAndFriends-Nes',
 'AdventuresOfRockyAndBullwinkleAndFriends-Snes',
 'AdventuresOfStarSaver-GameBoy',
 'AdventuresOfYogiBear-Snes',
 'AeroFighters-Snes',
 

In [5]:
# Create the Street Fighter II environment, asking for RAM observations.
env = retro.make(
    game="StreetFighterIISpecialChampionEdition-Genesis",
    scenario='scenario',
    obs_type=retro.Observations.RAM,
)

In [6]:
import tensorflow as tf
from tensorflow import keras
from collections import deque
from keras.models import Sequential 
from keras.layers import Dense, InputLayer
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard, CSVLogger
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
import os
import numpy as np
import random

In [10]:
class DQNAgent:
    """Deep Q-learning agent with an epsilon-greedy policy and a replay memory.

    The Q-function is a small dense Keras network. Transitions are stored with
    ``memorize`` and replayed one sample at a time by ``replay``, which fits the
    network towards the one-step target ``reward + gamma * max_a Q(next_state, a)``.
    """

    def __init__(self, state_size, action_size):
        """Set the hyper-parameters, the Keras callbacks and build the network.

        Args:
            state_size: Length of a flat state vector (an int), or a shape tuple.
            action_size: Number of discrete actions, i.e. network outputs.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=10000)  # bounded buffer; oldest transitions fall out first
        self.gamma = 0.95    # discount rate
        self.epsilon = 1  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        # NOTE(review): 0.1 is two orders of magnitude above Adam's documented default
        # (0.001) — confirm this is intended.
        self.learning_rate = 0.1
        # Not read anywhere in this class; the TensorBoard callback below uses its own path.
        self.log_dir='Graph'
        # Weight file names embed the *initial* hyper-parameter values.
        self.filepath_best = "model_weights_{}_{}_{}_best.hdf5".format(str(self.gamma), str(self.epsilon), str(self.learning_rate))
        self.filepath_last = "model_weights_{}_{}_{}_last.hdf5".format(str(self.gamma), str(self.epsilon), str(self.learning_rate))
        self.tb_call_back = TensorBoard(log_dir="logs/fit(ad)", histogram_freq=1, write_graph=True, write_images=False)
        self.checkpoint = ModelCheckpoint(
            self.filepath_best,
            monitor='loss',
            verbose=0,
            save_best_only=True,
            save_weights_only=True,
            save_freq=10,
            mode='min'
        )
        self.reduce_lr = ReduceLROnPlateau(
            monitor='loss',
            factor=0.1,
            patience=10,
            verbose=1,
            mode='auto',
            min_delta=0.0001,
            cooldown=0,
            min_lr=0
        )
        self.model = self._build_model()

    @staticmethod
    def _input_shape(state_size):
        """Return ``state_size`` as a shape tuple; a scalar ``n`` becomes ``(n,)``."""
        if np.ndim(state_size) == 0:
            return (int(state_size),)
        return tuple(int(dim) for dim in state_size)

    def _build_model(self):
        """Build and compile the Q-network, loading ``filepath_best`` if it exists.

        Returns:
            The compiled Keras ``Sequential`` model.
        """
        # Neural Net for Deep-Q learning Model
        model = Sequential()
        # InputLayer needs an iterable shape; passing the bare int state size fails.
        model.add(InputLayer(input_shape=self._input_shape(self.state_size)))
        model.add(Dense(64, activation='relu',kernel_initializer='RandomNormal'))
        model.add(Dense(64, activation='relu',kernel_initializer='RandomNormal'))
        model.add(Dense(self.action_size, activation='linear',kernel_initializer='RandomNormal'))
        # `learning_rate` is the supported keyword; `lr` is a deprecated alias.
        model.compile(loss='mse',
                      optimizer=Adam(learning_rate=self.learning_rate))
        if os.path.isfile(self.filepath_best):
            print("Loaded model weights from {}".format(self.filepath_best))
            model.load_weights(self.filepath_best)
        else:
            print("Initialized empty model")
        return model

    def memorize(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition to memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action index for ``state`` with an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the argmax of the network's prediction for ``state``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])  # returns action

    def replay(self, batch_size):
        """Fit the network on up to ``batch_size`` randomly sampled transitions.

        Each sampled transition is fitted individually (one ``fit`` call per
        sample). Afterwards ``epsilon`` is decayed once if it is still above
        ``epsilon_min``. Does nothing when the memory is empty.
        """
        if not self.memory:
            return
        # random.sample raises ValueError when asked for more items than exist.
        sample_size = min(batch_size, len(self.memory))
        minibatch = random.sample(self.memory, sample_size)
        callbacks = [self.checkpoint, self.tb_call_back, self.reduce_lr]
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # One-step bootstrap from the best predicted value of the next state.
                target = reward + self.gamma * np.amax(self.model.predict(next_state)[0])
            # Only the taken action's output is moved; the others keep their prediction.
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0, callbacks=callbacks)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        """Load network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self):
        """Write the current network weights to ``filepath_last``."""
        self.model.save_weights(self.filepath_last)

In [11]:
def get_reward(enemy_health, last_enemy_health, own_health, last_own_health, base_health=None):
    """Compute the step reward as damage inflicted minus damage received.

    Args:
        enemy_health: Enemy health after the step.
        last_enemy_health: Enemy health before the step.
        own_health: Own health after the step.
        last_own_health: Own health before the step.
        base_health: Health value treated as "full". When omitted, the
            module-level ``base_health`` is used if it is defined, else 176.

    Returns:
        0 when neither health changed or when both fighters are at
        ``base_health``; otherwise the enemy's health loss (never negative)
        plus the signed change of the own health.
    """
    if base_health is None:
        # Fall back to the notebook-level constant so existing callers keep working
        # even though it is defined in a different cell.
        base_health = globals().get("base_health", 176)

    health_changed = enemy_health != last_enemy_health or own_health != last_own_health
    both_at_base = enemy_health == base_health and own_health == base_health
    if not health_changed or both_at_base:
        return 0

    # Only a drop in enemy health counts as inflicted damage.
    inflicted_damage_reward = max(last_enemy_health - enemy_health, 0)
    # Negative when own health dropped.
    received_damage_penalty = own_health - last_own_health

    # our reward is defined by 'damage I inflict - damage I receive'
    reward = inflicted_damage_reward + received_damage_penalty
    if reward != 0:
        print("Hit enemy for {} reward".format(reward))
    return reward
def action_to_array(action, n):
    """Return a length-``n`` array of zeros with index ``action`` set to 1.

    A negative ``action`` yields the all-zero array.
    """
    one_hot = np.zeros(n)
    if action >= 0:
        one_hot[action] = 1
    return one_hot

In [None]:
# Health value both fighters are reset to when a round starts (see the wait loop below).
base_health = 176
EPISODES = 500
if __name__ == "__main__":
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = DQNAgent(state_size, action_size)
    batch_size = 32
    frameskip = 7  # extra no-op steps taken after every agent action
    noop = np.zeros(action_size)
    # Copy rather than alias: with `start = noop` both names referred to the same
    # array, so setting start[5] also turned every "no-op" into a button press.
    start = noop.copy()
    start[5] = 1
    for e in range(EPISODES):
        state = env.reset()
        done = False
        state = np.reshape(state, [1, state_size])
        last_enemy_health = base_health
        last_own_health = base_health
        episode_reward = 0
        while not done:
            action = agent.act(state)
            next_state, _, done, info = env.step(action_to_array(action, action_size))
            # Let the action play out for a few frames before observing the result.
            for _ in range(frameskip):
                if done:
                    break
                next_state, _, done, info = env.step(noop)
            enemy_health = info['enemy_health']
            own_health = info['health']
            reward = get_reward(enemy_health, last_enemy_health, own_health, last_own_health)
            if own_health <= -1 or enemy_health <= -1:  # this means the round is over
                for _ in range(5):  # skip some frames so that scores come in
                    if done:
                        break  # never step an environment that already finished
                    _, _, done, info = env.step(start)
                print("Round over, {}:{} got {} reward without KO".format(
                            info['matches_won'],
                            info['enemy_matches_won'],
                            episode_reward
                        ))
                # Keep pressing `start` until both health bars are back at base_health.
                while not done:
                    _, _, done, info = env.step(start)
                    if info['enemy_health'] == base_health and info['health'] == base_health:
                        break
                last_enemy_health = base_health
                last_own_health = base_health
            else:
                last_enemy_health = enemy_health
                last_own_health = own_health
            next_state = np.reshape(next_state, [1, state_size])
            agent.memorize(state, action, reward, next_state, done)
            state = next_state
            episode_reward += reward
        print("Episode {}# Reward: {}".format(e, episode_reward))
        print("Training...")
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)
        print("Done!")
        if e % 10 == 0:
            agent.save()
    # The environment is intentionally left open here: later cells in this notebook
    # call env.reset()/env.step() again and close it themselves.

In [None]:
# This is the basic loop to be run. Here the sampling is random.
# Watch this run carefully. Do you realise that finishing the first level barely requires any skill?
# The juicy victory won by smashing keys randomly is an incentive to bring in more players to the arcade!
print(env.observation_space.shape[0])
for game in range(1):
    # Reset at the start of every game. The previous `if done: env.reset()` sat inside
    # `while not done` and could never run, so a second game would not have started.
    obs = env.reset()
    done = False
    while not done:
        env.render()
        action = env.action_space.sample()
        obs, reward, done, info = env.step(action)
        time.sleep(0.01)  # slow the loop down so the rendering can be watched
        


In [None]:
# Print the size of the observation space's first dimension, then close the environment.
print(env.observation_space.shape[0])
env.close()

In [12]:
%load_ext tensorboard
%tensorboard --logdir=logs/fit

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


Reusing TensorBoard on port 6006 (pid 37832), started 2 days, 12:10:07 ago. (Use '!kill 37832' to kill it.)