In [1]:
#Środowisko gry Pacman
import gym 
from gym import spaces
import numpy as np
import pygame
from pygame.locals import *
from run import GameController
from constants import *
from pacman import Pacman
from ghost import Ghosts
from nodes import NodeGroup
from pellets import PelletGroup
from fruits import Fruits
from stable_baselines3 import DQN
from stable_baselines3 import PPO
import os

class PacmanEnv(gym.Env):
    """Gym environment that drives the project's Pacman game via ``GameController``.

    Observations are RGB frames read from the game's pygame surface, laid out as
    ``(SCREENHEIGHT, SCREENWIDTH, 3)``. Actions are the integers 0..4, which
    ``step`` shifts to the direction codes -2..2 handed to the game's Pacman.

    The class uses the classic gym API: ``reset()`` returns only the observation
    and ``step()`` returns ``(observation, reward, done, info)``.
    """

    # Previously declared on a separate, shadowed ``class PacmanEnv`` statement,
    # which meant the attribute never reached the class that was actually used.
    metadata = {"render.modes": ["human"]}

    def __init__(self, render_mode=False):
        """Create the game controller and declare the action/observation spaces.

        Args:
            render_mode: When falsy, SDL is pointed at the "dummy" video driver so
                no window is needed; when truthy, that override is removed.
        """
        super(PacmanEnv, self).__init__()
        self.render_mode = render_mode
        self.time_limit = 300  # compared against elapsed_time (seconds) in check_game_over
        self.elapsed_time = 0

        if not render_mode:
            os.environ["SDL_VIDEODRIVER"] = "dummy"
        else:
            os.environ.pop("SDL_VIDEODRIVER", None)

        # Restart pygame so the SDL_VIDEODRIVER setting above takes effect.
        pygame.quit()
        pygame.init()

        self.game = GameController(render_mode=render_mode)

        # step() expects indices 0..4 and shifts them itself, so the space must
        # start at 0. Declaring ``start=-2`` made ``action_space.sample()`` return
        # -2..2, which step() then clipped to 0..2, so two directions could never
        # be chosen by random exploration.
        # NOTE(review): checkpoints saved with the old ``start=-2`` declaration
        # may be rejected when loaded against this env - retrain or confirm.
        self.action_space = spaces.Discrete(5)
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(SCREENHEIGHT, SCREENWIDTH, 3), dtype=np.uint8
        )

    def reset(self):
        """Start a new game and return the initial observation."""
        self.elapsed_time = 0  # a reset in the middle of an episode must not inherit its clock
        self.game.startGame(3)
        return self.get_observation()

    def step(self, action):
        """Apply ``action``, advance the game and compute the reward.

        Args:
            action: Index in 0..4 (anything ``int()`` accepts); values outside
                the range are clipped.

        Returns:
            ``(observation, reward, done, info)`` where ``info`` is an empty dict.
        """
        # Map the index {0, 1, 2, 3, 4} onto the direction code {-2, -1, 0, 1, 2}.
        action = int(np.clip(int(action), 0, 4)) - 2
        if self.game.pacman.validDirection(action):
            self.game.pacman.direction = action

        pellets_before = self.game.pellets.numEaten
        lives_before = self.game.pacman.life_amount

        self.game.update()
        self.elapsed_time += self.game.clock.get_time() / 1000

        # Snap Pacman onto the target node once he has moved past it.
        if self.game.pacman.target is not None and self.game.pacman.overshotTarget():
            self.game.pacman.node = self.game.pacman.target
            self.game.pacman.setPosition()

        self.game.update()

        state = self.get_observation()

        reward = 0

        # The game is updated twice per step, so accept any positive pellet delta
        # rather than exactly one.
        pellets_eaten = self.game.pellets.numEaten - pellets_before
        if pellets_eaten > 0:
            reward += 20

        # NOTE: the original author reports that fruit scoring does not work.
        fruit = None
        if self.game.fruits is not None:
            fruit = self.game.pacman.eatFruits(self.game.fruits)
            if fruit:
                reward += 20

        if self.game.pacman.life_amount - lives_before < 0:
            reward -= 50

        # Small penalty for a step that ate nothing. Uses the same truthiness
        # test as the fruit reward above, so a falsy non-None result from
        # eatFruits also counts as "no fruit".
        if pellets_eaten <= 0 and not fruit:
            reward -= 2

        done = self.check_game_over()

        info = {}

        return state, reward, done, info

    def render(self, mode="human"):
        """Draw the game, but only when the env was created with rendering enabled."""
        if self.render_mode and mode == "human":
            self.game.render()

    def _init_pygame(self):
        """Initialise pygame if it is not initialised yet."""
        if not pygame.get_init():
            pygame.init()

    def close(self):
        """Shut pygame down."""
        pygame.quit()

    def check_game_over(self):
        """Return True when lives hit zero, pellets run out or the time limit passes.

        The elapsed-time counter is reset whenever the episode is over.
        """
        game_over = (
            self.game.pacman.life_amount == 0
            or self.game.pellets.isEmpty()
            or self.elapsed_time >= self.time_limit
        )
        if game_over:
            self.elapsed_time = 0
        return game_over

    def change_resolution(self, width, height):
        """Persist a new screen size to ``constants.py`` and resize the display.

        Args:
            width: New screen width in pixels.
            height: New screen height in pixels.
        """
        global SCREENWIDTH, SCREENHEIGHT

        # ``__file__`` is undefined inside a notebook kernel, so take the path
        # from the imported ``constants`` module instead.
        import constants as constants_module

        constants_path = constants_module.__file__
        with open(constants_path, "r") as file:
            lines = file.readlines()

        with open(constants_path, "w") as file:
            for line in lines:
                if line.startswith("SCREENWIDTH"):
                    file.write(f"SCREENWIDTH = {width}\n")
                elif line.startswith("SCREENHEIGHT"):
                    file.write(f"SCREENHEIGHT = {height}\n")
                else:
                    file.write(line)

        SCREENWIDTH, SCREENHEIGHT = width, height

        self.game.screen = pygame.display.set_mode((SCREENWIDTH, SCREENHEIGHT))
        self.game.width, self.game.height = SCREENWIDTH, SCREENHEIGHT

        # Keep the declared observation space in step with the new frame size.
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(SCREENHEIGHT, SCREENWIDTH, 3), dtype=np.uint8
        )

    def get_observation(self):
        """Return the current screen as an RGB array with the first two axes swapped.

        ``pygame.surfarray.array3d`` indexes the surface as (width, height, 3);
        the transpose yields (height, width, 3), matching ``observation_space``.
        """
        observation = pygame.surfarray.array3d(self.game.screen)
        return np.transpose(observation, (1, 0, 2))








In [None]:
#BADANIE 1

import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym  # Gymnasium zamiast gym
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3 import DQN, A2C, PPO  # Algorytmy RL

def test_model(model, env, episodes=15):
    rewards = []
    durations = []
    
    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0
        steps = 0
        done = False

        while not done:
            action, _ = model.predict(state)
            result = env.step(action)
            
            if len(result) == 5:
                state, reward, terminated, truncated, _ = result
            else: 
                state, reward, done, _ = result
                terminated = done
                truncated = False

            done = terminated or truncated
            episode_reward += reward
            steps += 1

        rewards.append(episode_reward)
        durations.append(steps)

    return rewards, durations

if __name__ == "__main__":
    def make_env():
        """Build one MsPacman environment that renders to RGB arrays."""
        env_id = "ALE/MsPacman-v5"
        return gym.make(env_id, render_mode="rgb_array")

    env = DummyVecEnv([make_env])

    # Load every checkpoint up front; all three share the same vectorized env.
    dqn_model = DQN.load("ms_pacman_dqn_gym", env=env)
    a2c_model = A2C.load("ms_pacman_a2c_gym", env=env)
    ppo_model = PPO.load("ms_pacman_ppo_gym", env=env)

    test_episodes = 15

    # Evaluate the models one after another, in the order DQN, A2C, PPO.
    outcomes = {}
    for label, agent in (("DQN", dqn_model), ("A2C", a2c_model), ("PPO", ppo_model)):
        print(f"Testing {label} model...")
        outcomes[label] = test_model(agent, env, episodes=test_episodes)

    dqn_rewards, dqn_durations = outcomes["DQN"]
    a2c_rewards, a2c_durations = outcomes["A2C"]
    ppo_rewards, ppo_durations = outcomes["PPO"]

    env.close()

    # (label, line colour, rewards, durations) for each evaluated model.
    series = [
        ("DQN", 'blue', dqn_rewards, dqn_durations),
        ("A2C", 'orange', a2c_rewards, a2c_durations),
        ("PPO", 'green', ppo_rewards, ppo_durations),
    ]

    # Summary line per model: mean score and mean episode length.
    for label, _colour, scores, lengths in series:
        print(f"{label} - Średnia liczba punktów: {np.mean(scores):.2f}, Średni czas trwania: {np.mean(lengths):.2f}")

    plt.figure(figsize=(12, 6))

    # Left panel: score per episode. Right panel: episode length in steps.
    panels = [
        (1, 2, "Liczba punktów w kolejnych epizodach", "Punkty"),
        (2, 3, "Czas trwania epizodów", "Liczba kroków"),
    ]
    for position, value_index, title, y_label in panels:
        plt.subplot(1, 2, position)
        for entry in series:
            plt.plot(entry[value_index], label=entry[0], color=entry[1])
        plt.title(title)
        plt.xlabel("Epizod")
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.show()


In [None]:
#BADANIE 2

import numpy as np
import matplotlib.pyplot as plt
from stable_baselines3 import DQN, PPO, A2C
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage
import gymnasium as gym
import time

# Funkcja testująca model
def evaluate_model(model, env, episodes=15):
    """Play ``episodes`` episodes with ``model`` and time each of them.

    Actions are chosen deterministically. ``reset()`` may return either the
    observation or an ``(observation, info)`` tuple, and ``step()`` may return
    either a 4-tuple or a 5-tuple.

    Returns:
        Two numpy arrays: total reward per episode and the wall-clock duration
        of each episode in seconds.
    """
    episode_totals = []
    episode_seconds = []

    for index in range(episodes):
        print(f"Epizod {index + 1}")

        first = env.reset()
        # Keep only the observation when reset() also hands back an info dict.
        observation = first[0] if isinstance(first, tuple) else first

        total = 0
        started_at = time.time()

        # The episode always runs at least one step and ends once it is flagged done.
        while True:
            action, _ = model.predict(observation, deterministic=True)
            outcome = env.step(action)

            if len(outcome) != 5:
                observation, reward, finished, _ = outcome
            else:
                observation, reward, terminated, truncated, _ = outcome
                finished = terminated or truncated

            total += reward

            if finished:
                break

        episode_seconds.append(time.time() - started_at)
        episode_totals.append(total)

    return np.array(episode_totals), np.array(episode_seconds)


def make_env():
    """Create one ``ALE/MsPacman-v5`` environment that renders to RGB arrays."""
    env_id = "ALE/MsPacman-v5"
    single_env = gym.make(env_id, render_mode="rgb_array")
    return single_env

# Vectorized env; VecTransposeImage reorders image observations for the CNN policies.
env = DummyVecEnv([make_env])
env = VecTransposeImage(env)

# Checkpoints to compare, keyed by the label used in the printouts and plots.
models = {
    "DQN": DQN.load("ms_pacman_dqn_gym", env=env),
    "PPO": PPO.load("ms_pacman_ppo_gym", env=env),
    "A2C": A2C.load("ms_pacman_a2c_gym", env=env),
}

# Evaluate every model and keep the raw series plus summary statistics.
test_results = {}
for model_name, model in models.items():
    print(f"\nTestowanie modelu: {model_name}")
    rewards, times = evaluate_model(model, env)
    test_results[model_name] = {
        "rewards": rewards,
        "times": times,
        "mean_reward": np.mean(rewards),
        "max_reward": np.max(rewards),
        "min_reward": np.min(rewards),
        "std_reward": np.std(rewards),
        "mean_time": np.mean(times),
    }

# All rollouts are finished; release the environment. It was previously never
# closed in this cell.
env.close()

# Text summary per model.
# NOTE(review): "mean_time" averages the wall-clock duration of whole episodes
# as measured by evaluate_model, although the labels below call it a decision time.
for model_name, result in test_results.items():
    print(f"\nModel: {model_name}")
    print(f"Średnia nagroda: {result['mean_reward']:.2f}")
    print(f"Maksymalna nagroda: {result['max_reward']:.2f}")
    print(f"Minimalna nagroda: {result['min_reward']:.2f}")
    print(f"Odchylenie standardowe nagród: {result['std_reward']:.2f}")
    print(f"Średni czas decyzji: {result['mean_time']:.4f} sekundy")


# Reward per episode, one line per model.
plt.figure(figsize=(10, 6))
for model_name, result in test_results.items():
    plt.plot(result["rewards"], label=f'{model_name} - nagrody')
plt.xlabel("Epizod")
plt.ylabel("Nagroda")
plt.title("Krzywa nagród modeli Pacmana")
plt.legend()
plt.show()


# Bar chart of mean rewards.
plt.figure(figsize=(10, 6))
model_names = list(test_results.keys())
mean_rewards = [test_results[model]["mean_reward"] for model in model_names]
plt.bar(model_names, mean_rewards, color=['red', 'green', 'blue'])
plt.xlabel("Model")
plt.ylabel("Średnia nagroda")
plt.title("Porównanie średnich nagród modeli Pacmana")
plt.show()


# Bar chart of mean measured times.
plt.figure(figsize=(10, 6))
mean_times = [test_results[model]["mean_time"] for model in model_names]
plt.bar(model_names, mean_times, color=['cyan', 'magenta', 'yellow'])
plt.xlabel("Model")
plt.ylabel("Średni czas decyzji (s)")
plt.title("Porównanie średnich czasów decyzji modeli Pacmana")
plt.show()


# One reward histogram per model.
for model_name, result in test_results.items():
    plt.figure(figsize=(10, 6))
    plt.hist(result["rewards"], bins=20, alpha=0.7, color='orange')
    plt.xlabel("Nagroda")
    plt.ylabel("Liczba epizodów")
    plt.title(f"Histogram nagród dla modelu {model_name}")
    plt.show()


In [None]:
#Nasze środowisko, uczenie modelu DQN
if __name__ == "__main__":
    from stable_baselines3 import DQN

    # True: train a new model and save it; False: load the saved checkpoint.
    train = True

    if train:
        # The headless env and a fresh model are only needed for training, so
        # they are no longer built when a saved checkpoint is loaded instead.
        env = PacmanEnv(render_mode=False)
        model = DQN(
            "CnnPolicy",
            env,
            verbose=1,
            buffer_size=1000,
            # "auto" picks the GPU when one is available and the CPU otherwise,
            # instead of hard-coding "cuda".
            device="auto",
        )

        print("Training the model...")
        try:
            model.learn(total_timesteps=50000)
            model.save("pacman_dqn_model")
        finally:
            # Release pygame even if training is interrupted.
            env.close()

    # Test setup shared by both paths (it used to be duplicated in each branch).
    print("Switching to testing mode...")
    env = PacmanEnv(render_mode=True)
    if not train:
        model = DQN.load("pacman_dqn_model", env=env, device="auto")
    state = env.reset()
    rewardMain = 0
    print("Trained model:")

    try:
        # Play at most 1000 steps, stopping early when the episode ends.
        for _ in range(1000):
            action, _states = model.predict(state)

            state, reward, done, info = env.step(action)

            env.render()
            rewardMain += reward

            if done:
                print("Game Over")
                break

        print(f"Total reward during testing: {rewardMain}")
    finally:
        env.close()


CUDA (GPU) is not available. Make sure your NVIDIA drivers and CUDA toolkit are properly installed.
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.




Training the model...
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 204      |
|    ep_rew_mean      | 564      |
|    exploration_rate | 0.845    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 0        |
|    time_elapsed     | 1826     |
|    total_timesteps  | 817      |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 5.09     |
|    n_updates        | 179      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 174      |
|    ep_rew_mean      | 637      |
|    exploration_rate | 0.735    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 0        |
|    time_elapsed     | 3024     |
|    total_timesteps  | 1395     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 7.41     

KeyboardInterrupt: 

In [None]:

#Nasze środowisko, uczenie modelu PPO
if __name__ == "__main__":
    from stable_baselines3 import PPO
    from stable_baselines3.common.env_util import make_vec_env

    # --- Training on a headless environment ---
    ppo_settings = dict(verbose=1, n_steps=256, batch_size=64, ent_coef=0.01)
    checkpoint_name = "pacman_ppo_model"

    env = PacmanEnv(render_mode=False)
    model = PPO("CnnPolicy", env, **ppo_settings)

    print("Training the PPO model...")
    model.learn(total_timesteps=100)
    model.save(checkpoint_name)
    env.close()

    # --- Evaluation with rendering, using the checkpoint just written ---
    print("Switching to testing mode...")
    env = PacmanEnv(render_mode=True)
    model = PPO.load(checkpoint_name, env=env)

    rewardMain = 0
    state = env.reset()
    print("Trained PPO model:")

    # Play at most 1000 steps; leave early when the episode is over.
    max_test_steps = 1000
    for _step in range(max_test_steps):
        action, _states = model.predict(state)
        state, reward, done, info = env.step(action)

        env.render()
        rewardMain = rewardMain + reward

        if not done:
            continue
        print("Game Over")
        break

    print(f"Total reward during testing: {rewardMain}")

    env.close()


In [None]:

#Nasze środowisko, uczenie modelu A2C
if __name__ == "__main__":
    from stable_baselines3 import A2C
    from stable_baselines3.common.env_util import make_vec_env

    # Phase 1: train A2C without a window and store the checkpoint.
    env = PacmanEnv(render_mode=False)

    model = A2C(
        "CnnPolicy",
        env,
        verbose=1,
        n_steps=5,
        ent_coef=0.01,
        learning_rate=0.0007,
        gamma=0.99,
    )

    print("Training the A2C model...")
    model.learn(total_timesteps=100)
    model.save("pacman_a2c_model")
    env.close()

    # Phase 2: reload the checkpoint against a rendering env and play it.
    print("Switching to testing mode...")
    env = PacmanEnv(render_mode=True)

    model = A2C.load("pacman_a2c_model", env=env)

    state, rewardMain = env.reset(), 0
    print("Trained A2C model:")

    # Up to 1000 steps, ending as soon as the env reports the episode finished.
    remaining = 1000
    while remaining > 0:
        remaining -= 1

        action, _states = model.predict(state)
        state, reward, done, info = env.step(action)

        env.render()
        rewardMain += reward

        if done:
            print("Game Over")
            break

    print(f"Total reward during testing: {rewardMain}")

    env.close()


In [None]:
#Środowisko gry Pacman z OpenAI Gym
#Aby wytrenować pozostałe modele, należy zmienić nazwę na odpowiedni model DQN/A2C/PPO i wziąć parametry takie same jak przy trenowaniu modelu przy zaimplementowanym naszym środowisku
import gym
import ale_py
import gym.envs
import gym.envs.registration
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv


def make_env():
    """Factory handed to ``DummyVecEnv``: builds one ``ALE/MsPacman-v5`` env."""
    game_id = "ALE/MsPacman-v5"
    return gym.make(game_id)

# Train DQN on the ALE MsPacman environment and save the checkpoint.
env = DummyVecEnv([make_env])


model = DQN('CnnPolicy', env, verbose=1, learning_rate=1e-4, buffer_size=50000)
model.learn(total_timesteps=50000)
model.save("ms_pacman_dqn_gym")

# Close the training environment before replacing it; previously the variable
# was simply rebound and the first env was never closed.
env.close()

# Fresh environment for a single evaluation episode.
env = DummyVecEnv([make_env])
obs = env.reset()
done = False
rewardMain = 0

# `done` and `reward` come back from the vectorized env as one-element arrays.
while not done:
    action, _states = model.predict(obs)
    obs, reward, done, info = env.step(action)
    rewardMain += reward
    env.render()

print(f"Total reward during testing: {rewardMain}")

env.close()