## Task: Maximize the score in the ViZDoom "Defend the Center" scenario

In [None]:
from vizdoom import *
from vizdoom import DoomGame

import numpy as np
import matplotlib.pyplot as plt

import gymnasium as gym

from gymnasium import Env


from gymnasium.spaces import Box,Discrete

import cv2

import time

import os

from stable_baselines3.common.callbacks import BaseCallback

from stable_baselines3.common import env_checker


from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env

# Output locations: model checkpoints and training logs respectively.
CHECK_POINT_DIR = "./train/Centre"
LOG_DIR = "./log/Centre"

In [3]:

class TrainAndLoggingCallback(BaseCallback):
    """Periodically save the model that is being trained.

    Args:
        check_freq: Save a checkpoint every time the callback's call counter
            (``n_calls``) is a multiple of this value.
        save_path: Directory the checkpoints are written to. ``None`` disables
            checkpointing entirely.
        verbose: Forwarded to ``BaseCallback``; any truthy value also prints a
            line for each checkpoint written.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if a save path was given."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Write a checkpoint when due; always return True to keep training."""
        # Guard against save_path=None here as well: _init_callback accepts it,
        # so os.path.join must not be reached with a None directory.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, f"best_model_{self.n_calls}")
            self.model.save(model_path)
            if self.verbose:
                print(f" Model checkpoint saved at step {self.n_calls} → {model_path}")
        return True
    
# Checkpoint callback: saves into CHECK_POINT_DIR every 10000 callback calls.
callback = TrainAndLoggingCallback(
    check_freq=10000,
    save_path=CHECK_POINT_DIR,
)

In [24]:
class VizDoomEnv(Env):
    """Gymnasium environment wrapping a ViZDoom game with three discrete actions.

    Observations are single-channel ``uint8`` frames of shape ``(100, 160, 1)``.
    The reward returned by ``step`` is the game's own reward plus shaping terms:
    an optional penalty for action 2, a health-change term and a small bonus
    for every step that does not end the episode.

    Args:
        render: Whether the game window is visible.
        config: Path to the ViZDoom scenario ``.cfg`` file to load.
        tics: Number of game tics each chosen action is applied for.
        punishment: Value added to the reward whenever action 2 is chosen
            (negative values penalise it).
    """

    def __init__(self, render=False, config="github/ViZDoom/scenarios/defend_the_center.cfg", tics=2, punishment=-0.1):
        super().__init__()
        self.game = DoomGame()
        self.game.load_config(config)
        self.game.set_window_visible(render)
        self.game.init()

        self.tics = tics
        self.punish = punishment

        self.observation_space = Box(low=0, high=255, shape=(100, 160, 1), dtype=np.uint8)
        self.action_space = Discrete(3)

        # One-hot button vectors, one row per discrete action. Built once here
        # instead of on every call to step().
        self._actions = np.identity(3, dtype=np.uint8)

    def step(self, action):
        """Apply ``action`` for ``self.tics`` tics and return the transition.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``info`` holds the current ``"ammo"`` value (0 once the episode
            has finished) and ``truncated`` is always ``False``.
        """
        # game_variables layout is taken to be [AMMO2, HEALTH], as supplied by
        # the scenario config — confirm if a different config is loaded.
        state_before = self.game.get_state()
        prev_health = state_before.game_variables[1] if state_before is not None else 100

        reward = 0
        if action == 2:  # Shoot
            reward += self.punish  # Penalty for shooting

        reward += self.game.make_action(self._actions[action], self.tics)

        # Fetch the post-action state once; it is None once the episode is over.
        state_after = self.game.get_state()
        if state_after is not None:
            new_health = state_after.game_variables[1]
        else:
            # No state available: treat health as unchanged so the term is zero.
            new_health = prev_health

        reward += (new_health - prev_health) * 0.05  # Reward for health change

        terminated = self.game.is_episode_finished()
        truncated = False
        if not terminated:
            reward += 0.05  # Small reward for staying alive

        if state_after is not None:
            observation = self.greyscale(state_after.screen_buffer)
            info = {"ammo": state_after.game_variables[0]}
        else:
            # Blank frame keeps the observation inside observation_space.
            observation = np.zeros(self.observation_space.shape, dtype=np.uint8)
            info = {"ammo": 0}

        return observation, reward, terminated, truncated, info

    def render(self):
        """No-op; window visibility is chosen in the constructor."""
        pass

    def greyscale(self, observation):
        """Convert a screen buffer to a ``(100, 160, 1)`` greyscale frame."""
        # Move the first axis last so OpenCV sees a channels-last image.
        # NOTE(review): COLOR_BGR2GRAY assumes BGR channel order — confirm
        # against the screen_format set in the scenario config.
        grey = cv2.cvtColor(np.moveaxis(observation, 0, -1), cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height), giving a 100x160 array.
        resize = cv2.resize(grey, (160, 100), interpolation=cv2.INTER_CUBIC)
        state = np.reshape(resize, (100, 160, 1))
        return state

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``."""
        super().reset(seed=seed)  # Let Gymnasium handle seeding
        if seed is not None:
            np.random.seed(seed)  # Also seeds NumPy's global RNG

        self.game.new_episode()
        observation = self.greyscale(self.game.get_state().screen_buffer)

        info = {}
        return observation, info

    def close(self):
        """Shut down the underlying game instance."""
        self.game.close()

In [25]:
# Validate the environment against the Gymnasium API, then release the game
# instance so it does not stay open when a later cell creates a new one.
env = VizDoomEnv(render=True)
try:
    env_checker.check_env(env, warn=True)
finally:
    env.close()

In [None]:
# Smoke test: repeat action 0 until the episode ends, printing each reward.
env = VizDoomEnv(render=True)
try:
    env.reset()  # Gymnasium requires reset() before the first step()
    terminated = truncated = False
    while not (terminated or truncated):
        state, reward, terminated, truncated, info = env.step(0)
        print(reward)
        time.sleep(0.05)
finally:
    # Always release the game instance, even if stepping raises.
    env.close()

In [None]:
# Four parallel copies of the environment, with no shooting penalty.
vec_env = make_vec_env(lambda: VizDoomEnv(render=False, punishment=0, tics=3), n_envs=4)

# NOTE: PPO's constructor does not accept a `callback` argument (passing one
# raises TypeError). Supply the checkpoint callback when training instead:
#     model.learn(total_timesteps=..., callback=callback)
model = PPO(
    "CnnPolicy",
    vec_env,
    verbose=1,
    tensorboard_log=LOG_DIR,
    n_steps=2048,
    batch_size=256,
    learning_rate=1e-5,
)


In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
import time

def play(model_path="train/Centre/best_model_250000.zip", render=True, num_episodes=5, sleep_time=0.03):
    """
    Run a trained PPO agent on ViZDoom for visualization.

    Args:
        model_path (str): Path to the saved PPO model.
        render (bool): Whether to show the game window and pace the frames.
        num_episodes (int): How many episodes to run.
        sleep_time (float): Time to wait between frames (for FPS control).
    """
    # Load the model first so a bad path fails before a game instance is opened.
    model = PPO.load(model_path)

    def make_env():
        return VizDoomEnv(render=render, punishment=0, tics=3)

    # Single environment wrapped in a vectorized env.
    env = DummyVecEnv([make_env])

    try:
        for ep in range(num_episodes):
            obs = env.reset()
            done = False
            total_reward = 0
            step = 0

            while not done:
                action, _ = model.predict(obs, deterministic=True)
                obs, rewards, dones, _infos = env.step(action)
                # The vectorized env returns one entry per environment; there
                # is exactly one environment here.
                total_reward += rewards[0]
                done = bool(dones[0])
                step += 1

                if render:
                    time.sleep(sleep_time)

            print(f"Episode {ep+1} finished in {step} steps with total reward: {total_reward:.2f}")
    finally:
        # Release the game instance even if prediction or stepping raises.
        env.close()
