<a href="https://colab.research.google.com/github/ccasanoval/RLtests/blob/master/DoomV4b.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

DoomV4b

RL  = Stable-Baselines3 : PPO

ENV = Gymnasium + VizDoom

URL = https://github.com/AKapich/Reinforcement_Learning_Doom

URL = https://stable-baselines3.readthedocs.io/en/master/

In [25]:
!pip install vizdoom
!pip install stable_baselines3



In [26]:
############### GYM ENV == VIZ DOOM ###########################################
from vizdoom import DoomGame
from vizdoom import GameVariable
import numpy as np
from gymnasium import Env
from gymnasium.spaces import Discrete, Box
import cv2

class MyWayHomeGym(Env):
    """Gymnasium environment around a ViZDoom scenario with a shaped reward.

    Observations are single-channel ``uint8`` frames of shape ``(100, 160, 1)``;
    actions are ``Discrete(number_of_actions)`` and are sent to the game as
    one-hot button vectors.

    On top of the reward returned by ``DoomGame.make_action`` each step adds:

    * a "same place" penalty in ``[-1, 0]`` based on the distance to an older
      position stored in a ring buffer of ``pos_history_length`` entries,
    * a movement bonus of ``0.005 * distance`` travelled since the last step,
    * a bonus derived from the amount of green detected in the frame.
    """

    def __init__(self, scenario, render=True, number_of_actions=3):
        """Load ``<scenario>.cfg``, start the game and declare the spaces.

        Args:
            scenario: Path/name of the ViZDoom config without the ``.cfg`` suffix.
            render: Whether the ViZDoom window is visible.
            number_of_actions: Size of the discrete action space; action ``k``
                is sent as row ``k`` of an identity matrix of this size.
        """
        self.game = DoomGame()
        self.game.load_config(f"{scenario}.cfg")

        # Player coordinates are appended to the game variables so that step()
        # can compute the position-based reward terms.
        self.game.add_available_game_variable(GameVariable.POSITION_X)
        self.game.add_available_game_variable(GameVariable.POSITION_Y)
        self.game.add_available_game_variable(GameVariable.POSITION_Z)

        self.pos = None  # position seen on the previous step of this episode

        self.game.set_window_visible(render)
        self.game.init()

        # Ring buffer of recent positions and the running step counter that
        # indexes it.
        self.pos_history_length = 200
        self.position_history = [None] * self.pos_history_length
        self.i = 0

        self.observation_space = Box(
            low=0, high=255, shape=(100, 160, 1), dtype=np.uint8
        )
        self.number_of_actions = number_of_actions
        self.action_space = Discrete(number_of_actions)
        # One-hot rows, built once instead of on every step.
        self._action_table = np.identity(number_of_actions)

    def _blank_observation(self):
        """Return an all-zero frame matching the observation space dtype/shape."""
        return np.zeros(
            self.observation_space.shape, dtype=self.observation_space.dtype
        )

    def _same_place_penalty(self, pos):
        """Store ``pos`` in the ring buffer and return the revisit penalty.

        The current position is compared with the entry at the mirrored index
        ``pos_history_length - cur_index - 1``. Returns 0 when that slot is
        still empty, -1 when the positions are identical, and otherwise
        ``-0.5 / distance`` clamped to a minimum of -1.
        """
        cur_index = self.i % self.pos_history_length
        self.position_history[cur_index] = pos
        prev_pos = self.position_history[self.pos_history_length - cur_index - 1]
        self.i += 1

        if prev_pos is None:
            return 0
        if np.array_equal(pos, prev_pos):
            return -1
        distance = np.sqrt(np.sum((pos - prev_pos) ** 2))
        return max(-1, -0.5 / distance)

    def step(self, action):
        """Apply ``action`` and return the Gymnasium 5-tuple.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``terminated`` is ``DoomGame.is_episode_finished()``, ``truncated``
            is ``DoomGame.is_player_dead()`` and ``info`` is
            ``{"info": <first game variable>}`` (0 when no state is available).
        """
        # Second argument of make_action is 4, as in the original code.
        reward = self.game.make_action(self._action_table[action], 4)

        # Query the state once; it is None when the game has no current state.
        game_state = self.game.get_state()
        if game_state is not None:
            # Expects exactly four game variables: one from the scenario config
            # followed by the three position variables added in __init__.
            _, pos_x, pos_y, pos_z = game_state.game_variables
            pos = np.array([pos_x, pos_y, pos_z])

            reward += self._same_place_penalty(pos)

            if self.pos is not None:
                dist = np.sqrt(np.sum((pos - self.pos) ** 2))
                reward += dist * 0.005
            self.pos = pos

            frame = game_state.screen_buffer
            # Both helpers want channels-last input; the buffer is moved from
            # channels-first here and inside grayscale().
            reward += self.get_green_reward(np.moveaxis(frame, 0, -1))

            state = self.grayscale(frame)
            # First game variable (labelled "ammo" by the original author;
            # the actual meaning depends on the scenario config).
            info = game_state.game_variables[0]
        else:
            # uint8 zeros so the observation matches observation_space.
            state = self._blank_observation()
            info = 0

        info = {"info": info}
        terminated = self.game.is_episode_finished()
        truncated = self.game.is_player_dead()

        return state, reward, terminated, truncated, info

    def reset(self, seed=0, options=None):
        """Start a new episode and return ``(observation, info)``.

        ``seed`` and ``options`` are accepted for Gymnasium API compatibility
        and are not used. Per-episode position tracking is cleared so the
        first step of the new episode is not rewarded or penalised relative
        to positions from the previous episode.
        """
        self.game.new_episode()

        self.pos = None
        self.position_history = [None] * self.pos_history_length
        self.i = 0

        game_state = self.game.get_state()
        if game_state is None:
            return (self._blank_observation(), {"ammo": 0})

        info = game_state.game_variables[0]  # first game variable
        return (self.grayscale(game_state.screen_buffer), {"ammo": info})

    def grayscale(self, observation):
        """Convert a channels-first frame to a ``(100, 160, 1)`` gray image."""
        # NOTE(review): COLOR_BGR2GRAY assumes BGR channel order; confirm this
        # matches the screen_format set in the scenario config.
        gray = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height).
        resize = cv2.resize(gray, (160, 100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize, (100, 160, 1))
        return state

    def get_green_reward(self, observation):
        """Return a bonus based on how much green the channels-last frame shows.

        Pixels whose HSV value falls inside ``(36, 25, 25)..(70, 255, 255)`` are
        kept, and the non-zero entries of the masked image are counted (this
        counts channel values, not pixels). A count strictly between 800 and
        3000 yields 0.3; any other count yields ``count / 10**6``.
        """
        # NOTE(review): COLOR_BGR2HSV assumes BGR channel order; confirm against
        # the scenario's screen_format.
        hsv = cv2.cvtColor(observation, cv2.COLOR_BGR2HSV)
        mask_green = cv2.inRange(hsv, (36, 25, 25), (70, 255, 255))
        imask_green = mask_green > 0
        green = np.zeros_like(observation, np.uint8)
        green[imask_green] = observation[imask_green]

        green_px_count = np.count_nonzero(green)

        # Mid-sized green area (presumably the target vest in view).
        if 800 < green_px_count < 3000:
            return 0.3

        return green_px_count / 10**6

    def close(self):
        """Shut down the underlying ViZDoom game."""
        self.game.close()


In [27]:

##################### SB3 : CALLBACK ##########################################
from stable_baselines3.common.callbacks import BaseCallback
class TrainAndLoggingCallback(BaseCallback):
    """Periodically checkpoint the model being trained.

    Every ``check_freq`` invocations of the callback the model is saved to
    ``"<name>_<n_calls>"``.
    """

    def __init__(self, check_freq, verbose=1, name="?"):
        super().__init__(verbose)
        self.name = name
        self.check_freq = check_freq

    def _on_step(self):
        checkpoint_due = (self.n_calls % self.check_freq) == 0
        if checkpoint_due:
            self.model.save(f"{self.name}_{self.n_calls}")
        # Always True: this callback never asks training to stop.
        return True



In [28]:


# Run configuration. The trailing "# @param" markers are Colab form
# annotations and must stay on the same line as their assignment.

# Algorithm tag; in this cell it is only used to build modelName.
modelType = "PPO"     # @param {type:"string"}
# Name passed to PPO.load and used as the prefix of checkpoint files.
modelName = "DoomHome"+modelType # @param {type:"string"}
# True: build a fresh PPO model. False: load modelName from disk.
modelNew = False       # @param {type:"boolean"}
# True: run model.learn in the training cell.
modelTrain = True     # @param {type:"boolean"}

print(modelName)

DoomHomePPO


In [None]:

###################### TRAIN == SB3 ###########################################
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3 import PPO


# Checkpoints are written as "<modelName>_<n_calls>" every 10000 callback calls.
callback = TrainAndLoggingCallback(check_freq=10000, name=modelName)
# Headless game instance used for training.
env = MyWayHomeGym(render=False, scenario="my_way_home")

if modelNew:
    model = PPO(
        "CnnPolicy",
        env,
        verbose=1,
        seed=0,
        learning_rate=0.0001,
        n_steps=2048,
    )
else:
    # Resume from the model previously saved under modelName.
    model = PPO.load(modelName, env=env)

# TRAIN
if modelTrain:
    model.learn(
        total_timesteps=200000,
        callback=callback,
    )
    # Persist the final weights under modelName: the callback only writes
    # "<modelName>_<n_calls>" checkpoints, while later PPO.load(modelName)
    # calls expect this exact file to hold the latest model.
    model.save(modelName)



Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 525      |
|    ep_rew_mean     | 53.3     |
| time/              |          |
|    fps             | 84       |
|    iterations      | 1        |
|    time_elapsed    | 24       |
|    total_timesteps | 2048     |
---------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 525        |
|    ep_rew_mean          | 73.3       |
| time/                   |            |
|    fps                  | 94         |
|    iterations           | 2          |
|    time_elapsed         | 43         |
|    total_timesteps      | 4096       |
| train/                  |            |
|    approx_kl            | 0.02588726 |
|    clip_fraction        | 0.212      |
|    clip_range           | 0.2        |
|    entr

In [None]:
###################### TEST == SB3 ########################################
# Shut down the game instance left over from an earlier cell (if any) so that
# only one Doom process is alive while evaluating.
if "env" in globals():
    env.close()

# Create the rendered evaluation env first, then attach the loaded model to it.
env = MyWayHomeGym(render=True, scenario="my_way_home")
model = PPO.load(modelName, env=env)
try:
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
finally:
    # Always release the game window, even if evaluation raises.
    env.close()

print(f"mean_reward:{mean_reward:.2f}")
print(f"std_reward:{std_reward:.2f}")
