# **PAC-MAN, PROGETTO ML :)**

***INTRODUZIONE DEL PROGETTO:***


# ***FASE 1:*** configurazione e analisi dell'ambiente di lavoro

in questa prima parte andremo a configurare correttamente l'ambiente di gioco scelto.
<br> è stato scelto come ambiente quello legato al Berkeley Pacman Project che però non è disponibile come pacchetto PyPI installabile direttamente; --> si deve cercare una open source scritta in python 3 (o tradotta da python 2) su github per poi clonarla e configurarla correttamente su questo colab.
l'ambiente di lavoro è preso dal seguente link di github:
<br> https://github.com/jeff-hykin/berkeley_pacman.git

- ***perché dobbiamo clonare il pacman project?***
<br> l'ambiente di gioco di pacman non è incluso in python né in colab --> non si può fare semplicemente import pacman come con altre librerie. il Berkeley Pacman Project è un ambiente didattico sviluppato dalla university of caliornia per progetti dove si legano i concetti di ML e reinforcement learning.
<br> questo ambiente è comodo perché ci fornisce già
* il gioco implementato in python
* struttura che si può usare per definire gli agenti intelligenti
* layout dei labirinti
* funzioni di scoring e reward etc.
* la struttura dei fantasmi

il link sopra messo porta alla repo di github dove sono contenute delle cartelle con i vari codici o documentazioni necessarie



---




di seguito andiamo a clonare la repo e fare un po' di considerazioni sull'ambiente di lavoro

In [None]:
# cloniamo la repo del progetto di pacman da github e scarichiamolo

!git clone https://github.com/jeff-hykin/berkeley_pacman.git

# entriamo nella cartella del progetto

%cd berkeley_pacman

Cloning into 'berkeley_pacman'...
remote: Enumerating objects: 3567, done.[K
remote: Counting objects: 100% (154/154), done.[K
remote: Compressing objects: 100% (112/112), done.[K
remote: Total 3567 (delta 77), reused 90 (delta 37), pack-reused 3413 (from 1)[K
Receiving objects: 100% (3567/3567), 1.55 MiB | 20.03 MiB/s, done.
Resolving deltas: 100% (2288/2288), done.
/content/berkeley_pacman


In [None]:
# vediamo dove siamo
!pwd

# spostiamoci nella cartella main
%cd main

/content/berkeley_pacman
/content/berkeley_pacman/main


In [None]:
# librerie necessarie
import pandas as pd
from layout import Layout, get_layout
from util import manhattan_distance

from game import Agent
import random
import util


from pacman import ClassicGameRules
from ghost_agents import DirectionalGhost, RandomGhost

# ***FASE 2:*** analisi dell'ambiente di lavoro



---


qui di seguito una piccola analisi specifica sul labirinto small_classic

* **!cat** --> comando shell che permette di stampare il contenuto del file di testo del layout
* **get_layout()** --> restituisce un oggetto Layout del progetto di berkeley, con strutture interne che poi sono chiamate dopo nei vari print

In [None]:
print("---- visualizzazione del labirinto SMALL ----")
!cat layouts/small_classic.lay
print()
print("-"*15)
# Carichiamo il layout e stampa info

layout = get_layout("small_classic")

print(f"Dimensioni: {layout.width} x {layout.height}")
print(f"Num. food: {layout.food.count()}")
print(f"Num. capsule: {len(layout.capsules)}")
print(f"Pacman start: {layout.agent_positions[0]}")
print(f"Ghost starts: {[pos for pos in layout.agent_positions[1:]]}")

# Calcoliamo "densità" del labirinto
total_cells = layout.width * layout.height
wall_cells = sum(1 for x in range(layout.width)
                  for y in range(layout.height)
                  if layout.walls[x][y])
open_cells = total_cells - wall_cells

print(f"Celle totali: {total_cells}")
print(f"Celle muro: {wall_cells}")
print(f"Celle libere: {open_cells}")
print(f"Wall occupancy (%): {wall_cells/total_cells:.1%}")




---- visualizzazione del labirinto SMALL ----
%%%%%%%%%%%%%%%%%%%%
%......%G  G%......%
%.%%...%%  %%...%%.%
%.%o.%........%.o%.%
%.%%.%.%%%%%%.%.%%.%
%........P.........%
%%%%%%%%%%%%%%%%%%%%

---------------
Dimensioni: 20 x 7
Num. food: 55
Num. capsule: 2
Pacman start: (True, (9, 1))
Ghost starts: [(False, (8, 5)), (False, (11, 5))]
Celle totali: 140
Celle muro: 76
Celle libere: 64
Wall occupancy (%): 54.3%




---

di seguito un analisi più completa e comparativa sui 3 labirinti classici, che useremo nel training/test
* small_classic
* medium_classic
* original_classic

* **df[...].to_string()** --> estrazione di righe o colonne con un metodo di stampa (opportuno del DataFrame) in modo che sia ben formattata e leggibile sulla console (senza troncamenti)
* **.loc[row_label, column_label], .iloc[]** --> metodo di accesso label based ai dati del DataFrame, si utilizzano nel primo caso le etichette (nomi), il secondo invece usa i numeri interi

In [None]:
# restituisce un dizionario di statistiche strutturali e spaziali
# in forma di funzione almeno si può chiamare su più layout
def analyze_layout(layout_name):

    layout = get_layout(layout_name)

    # Dimensioni base
    total_cells = layout.width * layout.height
    wall_cells = sum(1 for x in range(layout.width)
                      for y in range(layout.height)
                      if layout.walls[x][y])
    open_cells = total_cells - wall_cells

    # Posizioni
    pacman_pos = layout.agent_positions[0][1]
    ghost_positions = [pos[1] for pos in layout.agent_positions[1:]]
    food_positions = layout.food.as_list()

    # Distanze fantasmi
    ghost_distances = [manhattan_distance(pacman_pos, g) for g in ghost_positions]

    # Distanze cibo
    food_distances = [manhattan_distance(pacman_pos, f) for f in food_positions] if food_positions else [0]

    return {
        "Layout": layout_name,
        "Width": layout.width,
        "Height": layout.height,
        "Celle_Totali": total_cells,
        "Celle_Libere": open_cells,
        "Celle_Muro": wall_cells,
        "Wall_occupancy_%": round(wall_cells / total_cells * 100, 1),
        "Food": layout.food.count(),
        "Capsule": len(layout.capsules),
        "n_Fantasmi": len(ghost_positions),
        "Cibo_%": round(layout.food.count() / open_cells * 100, 1) if open_cells > 0 else 0,
        "Ghost_Dist_Min": min(ghost_distances),
        "Ghost_Dist_Max": max(ghost_distances),
        "Ghost_Dist_Avg": round(sum(ghost_distances) / len(ghost_distances), 1),
        "Food_Dist_Min": min(food_distances),
        "Food_Dist_Max": max(food_distances),
        "Food_Dist_Avg": round(sum(food_distances) / len(food_distances), 1)
    }

# data una lista di nomi e si costruisce un DataFrame
def create_layout_comparison(layout_names):
    data = [analyze_layout(name) for name in layout_names]
    df = pd.DataFrame(data)
    df.set_index("Layout", inplace=True)

    return df

def display_analysis(layout_names):
    # DataFrame principale
    df = create_layout_comparison(layout_names)

    print("="*80)
    print("ANALISI COMPARATIVA LAYOUT")
    print("="*80)
    print()

    # Tabella 1: Dimensioni e struttura
    print("DIMENSIONI E STRUTTURA")
    print("-"*80)
    cols_dim = ["Width", "Height", "Celle_Totali", "Celle_Libere", "Celle_Muro", "Wall_occupancy_%"]
    print(df[cols_dim].to_string())
    print()

    # Tabella 2: Risorse
    print("CIBO E AGENTI")
    print("-"*80)
    cols_res = ["Food", "Capsule", "n_Fantasmi", "Cibo_%"]
    print(df[cols_res].to_string())
    print()

    # Tabella 3: Distanze
    print("DISTANZE INIZIALI")
    print("-"*80)
    cols_dist = ["Ghost_Dist_Min", "Ghost_Dist_Avg", "Ghost_Dist_Max",
                 "Food_Dist_Min", "Food_Dist_Avg", "Food_Dist_Max"]
    print(df[cols_dist].to_string())
    print()

    # Statistiche riassuntive
    print("STATISTICHE")
    print("-"*80)

    # Confronto dimensioni
    print(f"\nRapporto dimensioni:")
    base_size = df["Celle_Totali"].iloc[0]
    for layout in df.index:
        ratio = df.loc[layout, "Celle_Totali"] / base_size
        print(f"  {layout}: {ratio:.1f}x rispetto a {df.index[0]}")

    # Difficoltà stimata
    print(f"\nDifficoltà stimata (basata su dimensione e spawn):")
    df_sorted = df.sort_values("Celle_Totali")
    for i, layout in enumerate(df_sorted.index, 1):
        size = "piccolo" if df.loc[layout, "Celle_Totali"] < 250 else \
               "medio" if df.loc[layout, "Celle_Totali"] < 500 else "grande"
        spawn = "sicuro" if df.loc[layout, "Ghost_Dist_Min"] > 5 else "pericoloso"
        print(f"  {i}. {layout}: {size}, spawn {spawn}")

    return df

In [None]:
# Layout da analizzare
layouts = ["small_classic", "medium_classic", "original_classic"]

# Analisi completa
df = display_analysis(layouts)



ANALISI COMPARATIVA LAYOUT

DIMENSIONI E STRUTTURA
--------------------------------------------------------------------------------
                  Width  Height  Celle_Totali  Celle_Libere  Celle_Muro  Wall_occupancy_%
Layout                                                                                   
small_classic        20       7           140            64          76              54.3
medium_classic       20      11           220           106         114              51.8
original_classic     28      27           756           294         462              61.1

CIBO E AGENTI
--------------------------------------------------------------------------------
                  Food  Capsule  n_Fantasmi  Cibo_%
Layout                                             
small_classic       55        2           2    85.9
medium_classic      97        2           2    91.5
original_classic   229        4           4    77.9

DISTANZE INIZIALI
-------------------------------------------

# ***FASE 3:*** progettazione dell'agente Q-learning tabellare

in questa fase viene definito il primo modello di agente che imparerà a giocare a pacman.
<br> si baserà sul Q-learning e quindi sul reinforcement learning

- **cosa è il reinforcement learning?**
<br> è un ramo dell'apprendimento automatico in cui un agente impara a compiere azioni in un ambiente per massimizzare una ricompensa. tutto viene formalizzato tramite una catena di markov con le seguenti componenti:
* S = insieme degli stati
* A = insieme delle azioni possibili
* R(s,a) = ricompensa per aver fatto l'azione a in s
* gamma = discount factor: quanto varrà in futuro

in questo caso abbiamo
* pacman come agente
* ogni configurazione (pos, cibo, fantasmi, etc) rappresenta uno stato
* le azioni (nord, sud, est, ovest)
* la ricompensa positiva è quella di mangiare il cibo e quella negativa è di morire o perdere tempo



- **cosa è il Q-learning?**
<br> è un algoritmo del RL off-policy che insegna ad un agente a comportarsi in un ambiente andando a massimizzare una ricompensa comulativa. è inoltre un algoritmo model free --> la dinamica dell'ambiente non viene resa nota all'agente a priori.
<br> il Q-learning in particolare si basa sull'equazione di bellman che è quella che andrà massimizzata

$$
Q(s,a) \leftarrow (1 - \alpha)\, Q(s,a) + \alpha \left[ r + \gamma \max_{a'} Q(s', a') \right]
$$





---


bene ora più o meno siamo pronti per iniziare a definire la classe del nostro agente e successivamente definiremo anche la funzione di training.

nel repo di github non sono presenti agenti costruiti basati sul Q-learning o RL quindi dovremo implementare il tutto da 0.





---
classe agente tabellare:
<br> vengono usate funzioni derivate dalla classe Agent definita nel rispettivo file della repo


* **alpha: learning rate** = quanto velocemente l'agente aggiorna i Q-values dopo aver osservato una transizione. più alto = più "apprende"
* **gamma: discount factor** = quanto valore assegna alle ricompense future rispetto a quelle immediate (vicino a 0: reward immediato)
* **epsilon: esplorazione** = probabilità con cui l'agente fa una mossa casuale (ε-greedy). Serve per evitare di fissarsi troppo presto su una policy non ottimale.
* **q_values** = dizionario che memorizza Q(s,a) per ogni coppia stato-azione osservata. Usa Counter per gestire valori di default a 0.


la tupla che ritorna la funzione _key() è importante perché è una tupla che restituisce:
* in quali direzioni per avvicinarsi al cibo
* in quali direzioni ci sono i pericoli dei fantasmi
* quanto si è lontani dal fantasma più vicino

è una rappresentazione che riduce lo stato continuo ad uno stato discreto in modo da poterlo utilizzare dopo per apprendere i Q-values

* **util.flip_coin()** --> funzione della libreria util che ritorna True con probabilità epsilon e False con 1-epsilon


In [None]:

# ---- IMPLEMENTAZIONE AGENTE Q-LEARNING TABELLARE ----

class QLearningAgent1(Agent):
    def __init__(self, alpha=0.5, gamma=0.99, epsilon=0.1, verbose= False):
        self.q_values = util.Counter()
        self.alpha = float(alpha)
        self.gamma = float(gamma)
        self.epsilon = float(epsilon)
        self.episode_count = 0
        self.verbose = verbose

    def _key(self, state):
        # posizione pacman, cibo e fantasmi
        pacman_pos = state.get_pacman_position()
        food = state.get_food().as_list()
        ghosts = state.get_ghost_positions()
        legal = state.get_legal_actions(0)

        # Trova cibo più vicino
        closest_food_dir = (0, 0, 0, 0)  # N, E, S, W
        if food:
            target_food = min(food, key=lambda f: manhattan_distance(pacman_pos, f))
            closest_food_dir = self._get_direction_vector(pacman_pos, target_food)

        # fantasmi
        ghost_danger = [0, 0, 0, 0]  # pericolo in ogni direzione
        closest_ghost_dist = 999

        for ghost_pos in ghosts:
            dist = manhattan_distance(pacman_pos, ghost_pos)
            closest_ghost_dist = min(closest_ghost_dist, dist)

            # Se il fantasma è vicino (entro 3 celle), marca le direzioni pericolose
            if dist <= 3:
                directions = ['North', 'East', 'South', 'West']
                for i, dir in enumerate(directions):
                    if dir not in legal:
                        continue
                    successor = state.generate_successor(0, dir) # simuliamo la mossa
                    new_pos = successor.get_pacman_position()
                    new_dist = manhattan_distance(new_pos, ghost_pos)

                    # se con la mossa ci avviciniamo al fantasma, aumenta il pericolo
                    if new_dist < dist:
                        ghost_danger[i] = 1

        # discretizzazione della distanza dal fantasma più vicino
        ghost_dist_bucket = 0
        if closest_ghost_dist <= 1:
            ghost_dist_bucket = 0  # pericolo immediato
        elif closest_ghost_dist <= 3:
            ghost_dist_bucket = 1  # vicino
        elif closest_ghost_dist <= 5:
            ghost_dist_bucket = 2  # medio
        else:
            ghost_dist_bucket = 3  # lontano/sicuro

        return (closest_food_dir, tuple(ghost_danger), ghost_dist_bucket)

# Ritorna (N, E, S, W) dove 1 indica che andare in quella direzione avvicina al target
    def _get_direction_vector(self, from_pos, to_pos):
        dx = to_pos[0] - from_pos[0]
        dy = to_pos[1] - from_pos[1]

        north = 1 if dy > 0 else 0
        south = 1 if dy < 0 else 0
        east = 1 if dx > 0 else 0
        west = 1 if dx < 0 else 0

        return (north, east, south, west)
# Restituisce Q(state, action)
    def getQValue(self, state, action):
        return self.q_values[(self._key(state), action)]
# Restituisce max_a Q(state, a)
    def getValue(self, state):
        legal_actions = state.get_legal_actions(0)
        if not legal_actions:
            return 0.0
        return max([self.getQValue(state, action) for action in legal_actions])
# Restituisce la migliore azione secondo la policy corrente
# dato uno stato restituisce l'azione con il valore Q più alto
    def getPolicy(self, state):
        legal = state.get_legal_actions(0) # azioni legali
        if not legal:
            return None

        q_vals = [(a, self.getQValue(state, a)) for a in legal] # valore Q(s,a)
        best = max(v for _, v in q_vals)
        best_actions = [a for a, v in q_vals if v == best]
        return random.choice(best_actions)
# sceglie effettivamente l'azione da fare nel gioco
# implementata con la greedy policy però in modo da evitare mosse suicide
    def get_action(self, state):
        legal = state.get_legal_actions(0)
        if not legal:
            return None

        # euristica di sopravvivenza: esclude mosse suicide
        safe_actions = self._get_safe_actions(state, legal)

        # Se non ci sono mosse sicure, usiamo tutte le legali (siamo disperati qui)
        if not safe_actions:
            safe_actions = legal

        # Esplorazione solo tra mosse sicure
        if util.flip_coin(self.epsilon):
            return random.choice(safe_actions)

        # Exploitation: scegli la migliore tra le mosse sicure
        q_vals = [(a, self.getQValue(state, a)) for a in safe_actions]
        best = max(v for _, v in q_vals)
        best_actions = [a for a, v in q_vals if v == best]
        return random.choice(best_actions)
# Filtra le azioni, euristica difensiva
    def _get_safe_actions(self, state, legal_actions):
        pacman_pos = state.get_pacman_position()
        ghosts = state.get_ghost_positions()
        safe = []

        for action in legal_actions:
            successor = state.generate_successor(0, action)
            new_pos = successor.get_pacman_position()

            # Controlliamo se la nuova posizione è troppo vicina a un fantasma
            is_safe = True
            for ghost_pos in ghosts:
                if manhattan_distance(new_pos, ghost_pos) <= 1:
                    is_safe = False
                    break

            if is_safe:
                safe.append(action)

        return safe
# Aggiorna Q(state, action) usando l'equazione di Bellman standard
# verrà chiamata ogni volta che avviene una transizione
    def update(self, state, action, nextState, reward):
        # Bellman update standard - niente reward shaping qui
        next_legal = nextState.get_legal_actions(0)
        next_best = 0.0 if not next_legal else max(
            self.getQValue(nextState, a) for a in next_legal
        )
        target = reward + self.gamma * next_best

        old = self.getQValue(state, action)
        self.q_values[(self._key(state), action)] = old + self.alpha * (target - old)
# funzione di interfaccia tra il framework e l’agente
    def observeTransition(self, state, action, nextState, deltaReward):
        self.update(state, action, nextState, deltaReward)

# Chiamata all'inizio di ogni episodio
    def startEpisode(self):
        self.episode_count += 1
        if self.verbose and self.episode_count % 10 == 0:
            print(f"\n=== EPISODE {self.episode_count} ===")
# Chiamata alla fine di ogni episodio
    def stopEpisode(self):
        if self.verbose and self.episode_count % 10 == 0:
            print(f"Episode {self.episode_count} ended - Q-values learned: {len(self.q_values)}")
# Aggiorna epsilon durante il training
    def set_epsilon(self, eps):
        self.epsilon = float(eps)
# Aggiorna learning rate durante il training
    def set_alpha(self, alpha):
        self.alpha = float(alpha)



---

di seguito vengono definite le funzioni di training e di test

In [None]:
# ---- FUNZIONE DI TRAINING ----

def train_with_progress1(
    agent,
    num_episodes=100,
    layout_name="small_classic",
    n_ghosts=1,
    ghost_cls=DirectionalGhost,
    epsilon_schedule=None,
    seed=None,
    print_every=10,
    max_steps=1000,
    use_powerup_rewards=False
):

# # impostiamo il seed per la riproducibilità, considerando le scelte casuali
    if seed is not None:
        import random
        random.seed(seed)
# carichiamo il layout e le regole di gioco
    layout = get_layout(layout_name)
    rules = ClassicGameRules(timeout=0)
# inizializziamo
    wins = 0
    deaths = 0
    results = []
    recent_wins = []
    recent_scores = []
    recent_steps = []

# ogni iterazione indica un episodio completo
    for ep in range(1, num_episodes + 1):
        agent.startEpisode()

        # Epsilon decay automatico se non viene specificato
        if epsilon_schedule is not None:
            agent.set_epsilon(epsilon_schedule(ep))
        elif ep > 1:  # Decay lineare automatico
            agent.set_epsilon(max(0.05, agent.epsilon * 0.995))

        ghosts = [ghost_cls(i+1) for i in range(n_ghosts)]
        game = rules.new_game(layout, agent, ghosts, display=None, quiet=True, catch_exceptions=False)
        state = game.state
        steps = 0
# fino a quando una delle condizioni non interrompe l'episodio
        while not (state.is_win() or state.is_lose()) and steps < max_steps:
            steps += 1

            # Turno Pacman
            old_score = state.get_score()
            old_food_count = state.get_food().count()

            action = agent.get_action(state)
            next_state = state.generate_successor(0, action)

            # reward shaping
            new_score = next_state.get_score()
            new_food_count = next_state.get_food().count()

            reward = new_score - old_score  # Base reward

            if not next_state.is_lose():
                reward += 2.0  # sopravvivenza

            if next_state.is_lose():
                reward -= 300  # morte

            if next_state.is_win():
                reward += 500  # vittoria

            if old_food_count == new_food_count:
                reward -= 0.5  # efficienza


            if use_powerup_rewards: # solo se sono attivi
                # BONUS: se mangia power-up
                if len(state.get_capsules()) > len(next_state.get_capsules()):
                    reward += 50

                # BONUS: se mangia fantasma spaventato
                if any(gs.scared_timer > 0 for gs in next_state.get_ghost_states()):
                    # Controlliamo se score aumenta molto (200 punti = fantasma mangiato)
                    if new_score - old_score >= 200:
                        reward += 100
# chiamiamo observeTransition() per aggiornare i Q-values (caso tabellare)
            agent.observeTransition(state, action, next_state, reward)
            state = next_state

            if state.is_win() or state.is_lose():
                break

            # Turno fantasmi
            for g_idx, ghost in enumerate(ghosts, start=1):
                g_action = ghost.get_action(state)
                state = state.generate_successor(g_idx, g_action)
                if state.is_win() or state.is_lose():
                    break

        # Statistiche finali episodio
        final_score = state.get_score()
        win = state.is_win()
        lose = state.is_lose()

        wins += 1 if win else 0
        deaths += 1 if lose else 0
        win_rate = wins / ep
        death_rate = deaths / ep
        # ci serve hasattr per poter riutilizzare la func anche con il modello neurale
        q_values = len(agent.q_values) if hasattr(agent, "q_values") else None
        results.append({
            "episode": ep,
            "score": final_score,
            "win": win,
            "lose": lose,
            "steps": steps,
            "epsilon": agent.epsilon,
            "q_values_count": q_values
        })

        recent_wins.append(1 if win else 0)
        recent_scores.append(final_score)
        recent_steps.append(steps)

        # statistiche locali
        if len(recent_wins) > print_every:
            recent_wins.pop(0)
            recent_scores.pop(0)
            recent_steps.pop(0)

        agent.stopEpisode()

        # Stampa statistiche locali
        if ep % print_every == 0:
            local_win_rate = sum(recent_wins) / len(recent_wins)
            avg_score = sum(recent_scores) / len(recent_scores)
            avg_steps = sum(recent_steps) / len(recent_steps)

            print(f"\n{'='*60}")
            print(f"Episode {ep}/{num_episodes}")
            print(f"  Win rate (ultimi {print_every}): {local_win_rate:.2%}")
            print(f"  Win rate (globale): {win_rate:.2%}")
            print(f"  Avg score: {avg_score:.1f} | Avg steps: {avg_steps:.1f}")
            print(f"  Epsilon: {agent.epsilon:.3f}")
            print(f"  Q-values appresi: {q_values}")
            print(f"{'='*60}")

    # statistiche finali
    print(f"\n{'='*60}")
    print(f"TRAINING COMPLETATO")
    print(f"{'='*60}")
    print(f"Episodi totali: {num_episodes}")
    print(f"Vittorie: {wins} ({win_rate:.2%})")
    print(f"Sconfitte: {deaths} ({death_rate:.2%})")
    print(f"Q-values finali: {q_values}")
    print(f"Epsilon finale: {agent.epsilon:.3f}")
    print(f"{'='*60}\n")

    return results


In [None]:
# ---- FUNZIONE DI UTILITÀ PER TESTARE L'AGENTE ADDESTRATO ----

def test_agent(agent, num_episodes=10, layout_name="small_classic", n_ghosts=1, ghost_cls=RandomGhost):
    original_epsilon = agent.epsilon
    agent.set_epsilon(0.0)  # nessuna exploration durante il test

    layout = get_layout(layout_name)
    rules = ClassicGameRules(timeout=0)

    wins = 0
    total_score = 0

    print(f"\n{'='*60}")
    print(f"TESTING AGENT (epsilon=0, greedy policy)")
    print(f"{'='*60}\n")

    for ep in range(1, num_episodes + 1):
        ghosts = [ghost_cls(i+1) for i in range(n_ghosts)]
        game = rules.new_game(layout, agent, ghosts, display=None, quiet=True, catch_exceptions=False)
        state = game.state
        steps = 0

        while not (state.is_win() or state.is_lose()) and steps < 1000:
            steps += 1
            # turno pacman
            action = agent.get_action(state)
            state = state.generate_successor(0, action)

            if state.is_win() or state.is_lose():
                break
            # turno fantasmi
            for g_idx, ghost in enumerate(ghosts, start=1):
                g_action = ghost.get_action(state)
                state = state.generate_successor(g_idx, g_action)
                if state.is_win() or state.is_lose():
                    break

        win = state.is_win()
        score = state.get_score()
        wins += 1 if win else 0
        total_score += score
        #statistica del singolo episodio
        result = "WIN" if win else "LOSE"
        print(f"Test {ep}/{num_episodes}: {result} | Score: {score} | Steps: {steps}")
    # statistiche totali
    win_rate = wins / num_episodes
    avg_score = total_score / num_episodes

    print(f"\n{'='*60}")
    print(f"TEST RESULTS")
    print(f"  Win rate: {win_rate:.2%} ({wins}/{num_episodes})")
    print(f"  Avg score: {avg_score:.1f}")
    print(f"{'='*60}\n")

    agent.set_epsilon(original_epsilon)  # Ripristina epsilon originale
    return win_rate, avg_score

In [None]:
# Creiamo l'agente
agent = QLearningAgent1(alpha=0.3, gamma=0.9, epsilon=0.3, verbose=False)

# training
results = train_with_progress1(
    agent,
    num_episodes=300,
    n_ghosts=1,
    ghost_cls=RandomGhost,
    print_every=50
)


Episode 50/300
  Win rate (ultimi 50): 72.00%
  Win rate (globale): 72.00%
  Avg score: 435.1 | Avg steps: 284.7
  Epsilon: 0.235
  Q-values appresi: 536

Episode 100/300
  Win rate (ultimi 50): 86.00%
  Win rate (globale): 79.00%
  Avg score: 613.8 | Avg steps: 296.2
  Epsilon: 0.183
  Q-values appresi: 546

Episode 150/300
  Win rate (ultimi 50): 92.00%
  Win rate (globale): 83.33%
  Avg score: 726.7 | Avg steps: 239.9
  Epsilon: 0.142
  Q-values appresi: 548

Episode 200/300
  Win rate (ultimi 50): 92.00%
  Win rate (globale): 85.50%
  Avg score: 759.1 | Avg steps: 202.1
  Epsilon: 0.111
  Q-values appresi: 552

Episode 250/300
  Win rate (ultimi 50): 88.00%
  Win rate (globale): 86.00%
  Avg score: 675.3 | Avg steps: 259.3
  Epsilon: 0.086
  Q-values appresi: 552

Episode 300/300
  Win rate (ultimi 50): 94.00%
  Win rate (globale): 87.33%
  Avg score: 840.5 | Avg steps: 134.5
  Epsilon: 0.067
  Q-values appresi: 552

TRAINING COMPLETATO
Episodi totali: 300
Vittorie: 262 (87.33%)
S

**NOTA:** se volessimo considerare dei risultati con meno varianza si potrebbe condurre più chiamate dei test e fare poi una media tra i win rate ottenuti

In [None]:
# test con 1 fantasma random ghost in small_classic
test_agent(agent, num_episodes=30,
           n_ghosts=1, ghost_cls=RandomGhost,
           layout_name="small_classic")


TESTING AGENT (epsilon=0, greedy policy)

Test 1/30: LOSE | Score: -510.0 | Steps: 1000
Test 2/30: WIN | Score: 917.0 | Steps: 133
Test 3/30: LOSE | Score: -375.0 | Steps: 165
Test 4/30: WIN | Score: 931.0 | Steps: 119
Test 5/30: WIN | Score: 211.0 | Steps: 839
Test 6/30: WIN | Score: 926.0 | Steps: 124
Test 7/30: WIN | Score: 720.0 | Steps: 330
Test 8/30: WIN | Score: 915.0 | Steps: 135
Test 9/30: WIN | Score: 947.0 | Steps: 103
Test 10/30: LOSE | Score: -443.0 | Steps: 183
Test 11/30: WIN | Score: 305.0 | Steps: 745
Test 12/30: WIN | Score: 606.0 | Steps: 444
Test 13/30: WIN | Score: 683.0 | Steps: 367
Test 14/30: WIN | Score: 712.0 | Steps: 338
Test 15/30: WIN | Score: 931.0 | Steps: 119
Test 16/30: WIN | Score: 880.0 | Steps: 170
Test 17/30: WIN | Score: 966.0 | Steps: 84
Test 18/30: WIN | Score: 524.0 | Steps: 526
Test 19/30: WIN | Score: 933.0 | Steps: 117
Test 20/30: WIN | Score: 869.0 | Steps: 181
Test 21/30: LOSE | Score: -591.0 | Steps: 331
Test 22/30: WIN | Score: 970.0 | S

(0.8666666666666667, 620.0)



---
di seguito vedremo:
* caso con un 1 directional ghost su small_classic
* caso con un 1 random ghost ma su un altro labirinto (es. medium_classic) per vedere come variano le statistiche



In [None]:
# Creiamo un nuovo agente
agent = QLearningAgent1(alpha=0.3, gamma=0.9, epsilon=0.3, verbose=False)
print("agente caricato con successo")

agente caricato con successo


In [None]:
# FASE 1: Pre-training con RandomGhost
print("\n" + "="*70)
print("FASE 1: Pre-training con RandomGhost")
print("="*70)

results_R = train_with_progress1(agent,
                     num_episodes=300,
                     n_ghosts=1,
                     ghost_cls=RandomGhost,
                     layout_name="small_classic",
                     print_every=50)



FASE 1: Pre-training con RandomGhost

Episode 50/300
  Win rate (ultimi 50): 70.00%
  Win rate (globale): 70.00%
  Avg score: 390.3 | Avg steps: 356.3
  Epsilon: 0.235
  Q-values appresi: 533

Episode 100/300
  Win rate (ultimi 50): 90.00%
  Win rate (globale): 80.00%
  Avg score: 619.2 | Avg steps: 331.6
  Epsilon: 0.183
  Q-values appresi: 541

Episode 150/300
  Win rate (ultimi 50): 82.00%
  Win rate (globale): 80.67%
  Avg score: 610.7 | Avg steps: 254.7
  Epsilon: 0.142
  Q-values appresi: 543

Episode 200/300
  Win rate (ultimi 50): 90.00%
  Win rate (globale): 83.00%
  Avg score: 707.2 | Avg steps: 235.2
  Epsilon: 0.111
  Q-values appresi: 554

Episode 250/300
  Win rate (ultimi 50): 80.00%
  Win rate (globale): 82.40%
  Avg score: 642.5 | Avg steps: 183.1
  Epsilon: 0.086
  Q-values appresi: 554

Episode 300/300
  Win rate (ultimi 50): 94.00%
  Win rate (globale): 84.33%
  Avg score: 840.8 | Avg steps: 146.4
  Epsilon: 0.067
  Q-values appresi: 554

TRAINING COMPLETATO
Episod

In [None]:
# FASE 2: 1 DirectionalGhost
print("\n" + "="*70)
print("FASE 2: 1 DirectionalGhost")
print("="*70)

results_D= train_with_progress1(agent,
                     num_episodes=500,
                     n_ghosts=1,
                     ghost_cls=DirectionalGhost,
                     layout_name="small_classic",
                     print_every=50)


FASE 2: 1 DirectionalGhost

Episode 50/500
  Win rate (ultimi 50): 74.00%
  Win rate (globale): 74.00%
  Avg score: 617.1 | Avg steps: 129.3
  Epsilon: 0.050
  Q-values appresi: 558

Episode 100/500
  Win rate (ultimi 50): 76.00%
  Win rate (globale): 75.00%
  Avg score: 653.6 | Avg steps: 121.8
  Epsilon: 0.050
  Q-values appresi: 558

Episode 150/500
  Win rate (ultimi 50): 72.00%
  Win rate (globale): 74.00%
  Avg score: 575.3 | Avg steps: 142.7
  Epsilon: 0.050
  Q-values appresi: 558

Episode 200/500
  Win rate (ultimi 50): 76.00%
  Win rate (globale): 74.50%
  Avg score: 618.0 | Avg steps: 142.4
  Epsilon: 0.050
  Q-values appresi: 558

Episode 250/500
  Win rate (ultimi 50): 82.00%
  Win rate (globale): 76.00%
  Avg score: 701.2 | Avg steps: 134.6
  Epsilon: 0.050
  Q-values appresi: 558

Episode 300/500
  Win rate (ultimi 50): 78.00%
  Win rate (globale): 76.33%
  Avg score: 672.6 | Avg steps: 120.8
  Epsilon: 0.050
  Q-values appresi: 558

Episode 350/500
  Win rate (ultimi 5

In [None]:

# TEST con directional ghost
print("\n" + "="*70)
print("TEST: 1 DirectionalGhost")
print("="*70)
test1_wr, test1_score = test_agent(agent,
                                    num_episodes=50,
                                    n_ghosts=1,
                                    ghost_cls=DirectionalGhost,
                                    layout_name="small_classic")


# statistiche finali
print("\n" + "="*70)
print("statistiche finali")
print("="*70)
print(f"Q-values appresi: {len(agent.q_values)}")
print(f"Epsilon finale: {agent.epsilon:.3f}")
print(f"\nRisultati Test:")
print(f"  1 DirectionalGhost: {test1_wr:.1%} win rate, {test1_score:.1f} avg score")




TEST: 1 DirectionalGhost

TESTING AGENT (epsilon=0, greedy policy)

Test 1/50: WIN | Score: 961.0 | Steps: 89
Test 2/50: WIN | Score: 901.0 | Steps: 149
Test 3/50: WIN | Score: 954.0 | Steps: 96
Test 4/50: WIN | Score: 950.0 | Steps: 100
Test 5/50: WIN | Score: 973.0 | Steps: 77
Test 6/50: WIN | Score: 942.0 | Steps: 108
Test 7/50: WIN | Score: 932.0 | Steps: 118
Test 8/50: WIN | Score: 925.0 | Steps: 125
Test 9/50: WIN | Score: 737.0 | Steps: 313
Test 10/50: WIN | Score: 870.0 | Steps: 180
Test 11/50: WIN | Score: 942.0 | Steps: 108
Test 12/50: WIN | Score: 871.0 | Steps: 179
Test 13/50: WIN | Score: 944.0 | Steps: 106
Test 14/50: WIN | Score: 959.0 | Steps: 91
Test 15/50: WIN | Score: 924.0 | Steps: 126
Test 16/50: WIN | Score: 937.0 | Steps: 113
Test 17/50: LOSE | Score: -81.0 | Steps: 111
Test 18/50: WIN | Score: 915.0 | Steps: 135
Test 19/50: WIN | Score: 973.0 | Steps: 77
Test 20/50: WIN | Score: 906.0 | Steps: 144
Test 21/50: WIN | Score: 976.0 | Steps: 74
Test 22/50: WIN | Sco



---

**NOTA:** solo per indagine statistica e per vericare la stabilità del modello testiamo lo stesso agente già addestrato su più seed diversi

In [None]:
# Testiamo lo stesso agente con configurazioni iniziali diverse
import random

for seed in [42, 123, 456]:
    random.seed(seed)  # Cambia spawn iniziali
    wr, _ = test_agent(agent, num_episodes=50,
                       ghost_cls=DirectionalGhost)
    print(f"Seed {seed}: {wr:.1%}")


TESTING AGENT (epsilon=0, greedy policy)

Test 1/50: WIN | Score: 946.0 | Steps: 104
Test 2/50: WIN | Score: 956.0 | Steps: 94
Test 3/50: WIN | Score: 957.0 | Steps: 93
Test 4/50: LOSE | Score: -500.0 | Steps: 1000
Test 5/50: WIN | Score: 916.0 | Steps: 134
Test 6/50: LOSE | Score: -199.0 | Steps: 59
Test 7/50: WIN | Score: 968.0 | Steps: 82
Test 8/50: WIN | Score: 949.0 | Steps: 101
Test 9/50: WIN | Score: 983.0 | Steps: 67
Test 10/50: WIN | Score: 865.0 | Steps: 185
Test 11/50: WIN | Score: 921.0 | Steps: 129
Test 12/50: WIN | Score: 930.0 | Steps: 120
Test 13/50: WIN | Score: 916.0 | Steps: 134
Test 14/50: WIN | Score: 911.0 | Steps: 139
Test 15/50: WIN | Score: 884.0 | Steps: 166
Test 16/50: WIN | Score: 942.0 | Steps: 108
Test 17/50: WIN | Score: 927.0 | Steps: 123
Test 18/50: WIN | Score: 924.0 | Steps: 126
Test 19/50: WIN | Score: 894.0 | Steps: 156
Test 20/50: WIN | Score: 924.0 | Steps: 126
Test 21/50: WIN | Score: 937.0 | Steps: 113
Test 22/50: WIN | Score: 936.0 | Steps: 11



---

testiamo il nostro agente su di un layout diverso (solo con il random).
<br> vedremo:
* utilizziamo lo stesso agente --> testiamo la capacità di generalizzazione, quindi facciamo un test sulla robustesta

In [None]:
# --- stesso agente ---
print("\n" + "="*70)
print("TEST: 1 random ghost, medium_classic")
print("="*70)
test2_wr, test2_score = test_agent(agent,
                                    num_episodes=50,
                                    n_ghosts=1,
                                    ghost_cls=DirectionalGhost,
                                    layout_name="medium_classic")


TEST: 1 random ghost, medium_classic

TESTING AGENT (epsilon=0, greedy policy)

Test 1/50: WIN | Score: 1289.0 | Steps: 181
Test 2/50: WIN | Score: 781.0 | Steps: 689
Test 3/50: LOSE | Score: 173.0 | Steps: 197
Test 4/50: WIN | Score: 1300.0 | Steps: 170
Test 5/50: WIN | Score: 1213.0 | Steps: 257
Test 6/50: WIN | Score: 1262.0 | Steps: 208
Test 7/50: WIN | Score: 1019.0 | Steps: 451
Test 8/50: LOSE | Score: -115.0 | Steps: 145
Test 9/50: WIN | Score: 1239.0 | Steps: 231
Test 10/50: WIN | Score: 1041.0 | Steps: 429
Test 11/50: WIN | Score: 1223.0 | Steps: 247
Test 12/50: WIN | Score: 1110.0 | Steps: 360
Test 13/50: WIN | Score: 1227.0 | Steps: 243
Test 14/50: WIN | Score: 1240.0 | Steps: 230
Test 15/50: WIN | Score: 811.0 | Steps: 659
Test 16/50: WIN | Score: 1131.0 | Steps: 339
Test 17/50: WIN | Score: 1035.0 | Steps: 435
Test 18/50: WIN | Score: 840.0 | Steps: 630
Test 19/50: WIN | Score: 1233.0 | Steps: 237
Test 20/50: WIN | Score: 1291.0 | Steps: 179
Test 21/50: WIN | Score: 1152.

In [None]:
# --- stesso agente ---
print("\n" + "="*70)
print("TEST: 1 random ghost, original_classic")
print("="*70)
test3_wr, test3_score = test_agent(agent,
                                    num_episodes=50,
                                    n_ghosts=1,
                                    ghost_cls=DirectionalGhost,
                                    layout_name="original_classic")


TEST: 1 random ghost, original_classic

TESTING AGENT (epsilon=0, greedy policy)

Test 1/50: LOSE | Score: 1060.0 | Steps: 1000
Test 2/50: WIN | Score: 1936.0 | Steps: 854
Test 3/50: LOSE | Score: 650.0 | Steps: 1000
Test 4/50: LOSE | Score: 1110.0 | Steps: 1000
Test 5/50: LOSE | Score: 630.0 | Steps: 1000
Test 6/50: LOSE | Score: 682.0 | Steps: 448
Test 7/50: LOSE | Score: 790.0 | Steps: 480
Test 8/50: LOSE | Score: 550.0 | Steps: 580
Test 9/50: LOSE | Score: 1140.0 | Steps: 1000
Test 10/50: LOSE | Score: 1030.0 | Steps: 1000
Test 11/50: LOSE | Score: 630.0 | Steps: 1000
Test 12/50: LOSE | Score: 373.0 | Steps: 297
Test 13/50: LOSE | Score: 1100.0 | Steps: 1000
Test 14/50: LOSE | Score: 1100.0 | Steps: 1000
Test 15/50: LOSE | Score: 512.0 | Steps: 568
Test 16/50: LOSE | Score: 630.0 | Steps: 1000
Test 17/50: WIN | Score: 1962.0 | Steps: 828
Test 18/50: LOSE | Score: 630.0 | Steps: 1000
Test 19/50: WIN | Score: 2282.0 | Steps: 508
Test 20/50: WIN | Score: 2239.0 | Steps: 551
Test 21/5



---

questo training sotto (e relativo test) è solo per provare come sarebbe il modello se venissero introdotti due fatasmi nella partita

In [None]:
print("\nFASE 3: Training con 2 DirectionalGhost")
results_2D = train_with_progress1(agent,
                     num_episodes=500,
                     n_ghosts=2,
                     ghost_cls=DirectionalGhost,
                     layout_name="small_classic",
                     print_every=50)

# Test con seed diversi
for seed in [42, 128, 456]:
    import random
    random.seed(seed)
    wr, _ = test_agent(agent, num_episodes=50,
                       n_ghosts=2, ghost_cls=DirectionalGhost)
    print(f"Seed {seed}: {wr:.1%}")


FASE 3: Training con 2 DirectionalGhost

Episode 50/500
  Win rate (ultimi 50): 16.00%
  Win rate (globale): 16.00%
  Avg score: -93.8 | Avg steps: 59.6
  Epsilon: 0.050
  Q-values appresi: 763

Episode 100/500
  Win rate (ultimi 50): 6.00%
  Win rate (globale): 11.00%
  Avg score: -218.3 | Avg steps: 46.3
  Epsilon: 0.050
  Q-values appresi: 825

Episode 150/500
  Win rate (ultimi 50): 12.00%
  Win rate (globale): 11.33%
  Avg score: -124.7 | Avg steps: 60.3
  Epsilon: 0.050
  Q-values appresi: 880

Episode 200/500
  Win rate (ultimi 50): 6.00%
  Win rate (globale): 10.00%
  Avg score: -237.5 | Avg steps: 47.9
  Epsilon: 0.050
  Q-values appresi: 904

Episode 250/500
  Win rate (ultimi 50): 4.00%
  Win rate (globale): 8.80%
  Avg score: -249.5 | Avg steps: 50.3
  Epsilon: 0.050
  Q-values appresi: 914

Episode 300/500
  Win rate (ultimi 50): 10.00%
  Win rate (globale): 9.00%
  Avg score: -176.8 | Avg steps: 49.4
  Epsilon: 0.050
  Q-values appresi: 925

Episode 350/500
  Win rate (u

# ***FASE 4:*** PROGETTAZIONE AGENTE Q-LEARNING MIGLIORATO -> **DQN**

In [None]:
# ---- LIBRERIE NECESSARIE ----
import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim
import numpy as np
import random
from collections import deque, namedtuple

In [None]:
# ---- DEFINIZIONE DELLA DQN PER PACMAN ----
# struttura MLP profonda con 3 hidden layer: 128 --> 128 --> 64 -> 32 -> 4 e layernorm dopo ogni layer
# ho scelto layernorm invece di batchnorm perché è più stabile con piccoli batch
# inizializzazione con xavier che di solito viene usata insieme alla relu
# di default ho messo che lo state è un vettore di 33 dimensioni
# è la rete neurale vera e propria, prende in input una rappresentazione numerica dello stato di pacman del labirinto
# restituisce i Q-values per ciascuna delle azioni possibili
# q-values = vettore di 4 elementi uno per ogni azioni e rappresenta quanto è "buono" prendere quella direzione (nord, sud, est, ovest)
class PacmanDQN(nn.Module):

    def __init__(self, state_size=33, action_size=4, hidden_size=128):
        super().__init__()

        self.fc1 = nn.Linear(state_size, hidden_size) # 33 -> 128
        self.fc2 = nn.Linear(hidden_size, hidden_size) # 128 -> 128
        self.fc3 = nn.Linear(hidden_size, hidden_size // 2 ) # 128 ->64
        self.fc4 = nn.Linear(hidden_size//2, hidden_size // 4 ) # 64 -> 32
        self.fc5 = nn.Linear(hidden_size // 4, action_size) # 32 -> 4

        # BatchNorm invece di Dropout (più stabile per DQN)
        self.bn1 = nn.LayerNorm(hidden_size)
        self.bn2 = nn.LayerNorm(hidden_size)
        self.bn3 = nn.LayerNorm(hidden_size // 2)
        self.bn4 = nn.LayerNorm(hidden_size // 4)

        self._initialize_weights()

# inizializzazione Xavier per convergenza più veloce
    def _initialize_weights(self):
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0.0)


    def forward(self, state):

        # Auto-reshape per singolo stato
        if state.dim() == 1:
            state = state.unsqueeze(0)
            squeeze = True
        else:
            squeeze = False

        # Forward pass
        x = self.bn1(F.relu(self.fc1(state)))
        x = self.bn2(F.relu(self.fc2(x)))
        x = self.bn3(F.relu(self.fc3(x)))
        x = self.bn4(F.relu(self.fc4(x)))
        q_values = self.fc5(x)

        # Rimuoviamo batch dim se era singolo stato
        if squeeze:
            q_values = q_values.squeeze(0)

        return q_values



# ---- REPLAY BUFFER ----

# è importante che done_batch vada a gestire correttamente i futuri reward altrimenti il modello potrebbe apprendere stati che non ci sono più
# la capacità del buffer messa per default a 10k, si potrebbe pensare di ampliarla a 50k dipende anche dagli step medi che fa
# è una struttura di memoria, serve a salvare e riutilizzare esperienze passate, ovvero le transizioni che pacman ha vissuto durante il gioco
# una transition è una tupla che ci dice "ero nello stato s, ho fatto l’azione a, sono finito nello stato s', ho ricevuto il reward r, e magari il gioco è finito (done=True)"
# ci serve per far sì che pacman non usi esperienze molto correlate tra loro (magari facenti parte dello stesso ep) perché causerebbe instabilità e inefficienza per vari motivi
# con sample(batch) si prendono esperienze da momenti differenti
# la grandezza del batch è un iperparamentro che influenza la stabilità e la velocità di apprendimento. il batch_size è il numero di transizioni che vengono prelevate dal replay buffer per ogni aggiornamento della rete
# ogni batch è un mini-dataset diciamo, dove viene calcolata la loss e vengono aggiornati i pesi
# valori tipici 32, 64, 128 (in questo caso la scelta è fra i primi due)
# diciamo che possiamo vederlo come la "memoria dell'agente"
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))

class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)
        self._capacity = capacity


    def capacity(self):
        return self._capacity

    def push(self, state, action, next_state, reward, done):
        self.buffer.append(Transition(state, action, next_state, reward, done))
# preparazione dei dati per aggiornare la rete neurale
    def sample(self, batch_size):
      # selezioniamo batch_size transizioni a caso dal buffer
        transitions = random.sample(self.buffer, batch_size)
        # definizione di un oggetto transition
        batch = Transition(*zip(*transitions))

        # Stack in tensori
        state_batch = torch.stack(batch.state)
        action_batch = torch.tensor(batch.action, dtype=torch.long)
        next_state_batch = torch.stack(batch.next_state)
        reward_batch = torch.tensor(batch.reward, dtype=torch.float32)
        done_batch = torch.tensor(batch.done, dtype=torch.float32)

        return state_batch, action_batch, next_state_batch, reward_batch, done_batch

    def __len__(self):
        return len(self.buffer)

In [None]:
# ---- AGENTE Q-LEARNING DQN ----
class DQNAgent(Agent):

# costruttore, inizializza i paramentri di apprendimento, crea la rete neurale (policy + target), inizializza il replay buffer, parametri ausiliari etc
# viene usato adam come ottimizzatore, classico nelle DQN
# distinzione tra policy_net e target_net permette l'uso di una double dqn
# loss scelta: SmoothL1Loss (rispetto ad mse)
# valori proposti:
      # learning rate: sotto 0.0003, anche se per ora 0.0005 è una soluzione ragionevole --> quanto velocemente la rete modifica i propri pesi dopo ogni aggiornamento
      # tau al posto del target update con valori da provare 0.005 (è quello standard e consigliato), 0.001 molta stabilità ma molto lento
      # epsilon decay per ora ha decadimento esponenziale, si può pensare di cambiarlo in modo lineare --> quanto velocemente passa da esplorazione casuale a sfruttamento
      # si consiglia 0.99 o 0.985 e decadimento lineare
    def __init__(self, state_size=33, action_size=4,
                 alpha=0.0005,  # learning_rate
                 gamma=0.99,
                 epsilon=1.0,
                 epsilon_end=0.01,
                 epsilon_decay=0.995,
                 buffer_size=10000,
                 batch_size=64,
                 tau = 0.005,
                 device=None,
                 verbose=False
                ):

        # Parametri base
        self.gamma = float(gamma)
        self.epsilon_end = float(epsilon_end)
        self.epsilon_decay = float(epsilon_decay)
        self.epsilon = float(epsilon)
        self.verbose = verbose
        self.episode_count = 0


        # Parametri DQN
        self.state_size = state_size
        self.action_size = action_size
        self.batch_size = batch_size
        self.tau = tau
        self.learning_rate = alpha

        # Device
        self.device = device if device else torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Reti neurali
        self.policy_net = PacmanDQN(state_size, action_size).to(self.device)
        self.target_net = PacmanDQN(state_size, action_size).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval() # la target_net non deve mai essere allenata, quindi è corretto tenerla sempre in .eval().

        # Optimizer
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.learning_rate)
        self.criterion = nn.SmoothL1Loss() # CAMBIATA LA LOSS PRIMA ERA  nn.MSELoss()

        # Replay buffer
        self.memory = ReplayBuffer(buffer_size)

        # Contatori
        self.steps_done = 0

        # altro che serve
        self.last_scared_timer = None
        self.ate_in_this_scared = False
        self.power_streak = 0

        # Mapping azioni (Berkeley framework)
        self.action_to_idx = {'North': 0, 'East': 1, 'South': 2, 'West': 3}
        self.idx_to_action = {v: k for k, v in self.action_to_idx.items()}

        # Info layout (per normalizzazione)
        self.initial_food_count = None
        self.layout_width = None
        self.layout_height = None

    # ---- METODI COPIATI DA Q-LEARNING TABELLARE ----
# codifica la direzione verso un target (food, capsule) in 4 valori binari (N, E, S, W)
# otteniamo un'info orientata sul labirinto
    def _get_direction_vector(self, from_pos, to_pos):
        dx = to_pos[0] - from_pos[0]
        dy = to_pos[1] - from_pos[1]

        north = 1 if dy > 0 else 0
        south = 1 if dy < 0 else 0
        east = 1 if dx > 0 else 0
        west = 1 if dx < 0 else 0

        return (north, east, south, west)

# evitare mosse suicide filtrando quelle troppo vicine ai fantasmi, è utile perché stabilizza l'apprendimento

    def _get_safe_actions(self, state, legal_actions):
        ghost_states = state.get_ghost_states()
        safe = []

        for action in legal_actions:
            successor = state.generate_successor(0, action)
            new_pos = successor.get_pacman_position()

            is_safe = True
            for ghost_state in ghost_states:
                ghost_pos = ghost_state.configuration.pos  # estraiamo posizione

                # Fantasmi spaventati non sono pericolosi
                if ghost_state.scared_timer > 0:
                    continue

                # Evitiamo celle adiacenti a un fantasma attivo
                if manhattan_distance(new_pos, ghost_pos) <= 1:
                    is_safe = False
                    break

            if is_safe:
                safe.append(action)

        return safe

    # ---- METODI DQN SPECIFICI ----
# info sulla mappa, viene chiamato una volta per episodio
# riporta informazioni statiche sul labirinto
    def _init_layout_info(self, state):
        if self.initial_food_count is None:
            self.initial_food_count = state.get_food().count()

            self.initial_capsule_count = len(state.get_capsules()) # capsule iniziali che il fantasma può magiare

            food_list = state.get_food().as_list()
            if food_list:
                self.layout_width = max(f[0] for f in food_list) + 1
                self.layout_height = max(f[1] for f in food_list) + 1
            else:
                self.layout_width = 20
                self.layout_height = 20

# quando non ci sono azioni sicure, entro un raggio di pericolo di fantasmi attivi
    def _fallback_maximize_distance(self, state, legal_actions, active_ghosts):
        if not active_ghosts:
            return legal_actions
# posizione corrente per ogni fantasma attivo
        ghost_positions = [gs.configuration.pos for gs in active_ghosts]
        best = []
        best_dist = -1

        for action in legal_actions:
            successor = state.generate_successor(0, action)
            new_pos = successor.get_pacman_position()

            dmin = min(manhattan_distance(new_pos, gp) for gp in ghost_positions)

            if dmin > best_dist:
                best_dist = dmin
                best = [action]
            elif dmin == best_dist:
                best.append(action)

        return best

    def get_action(self, state):
        legal = state.get_legal_actions(0)
        if not legal:
            return None

        safe_actions = self._get_safe_actions(state, legal)
        # fallback: nessuna azione sicura
        if not safe_actions:
            safe_actions = legal

         # 1) Epsilon-greedy
        if random.random() < self.epsilon:
            return random.choice(safe_actions)

        # 2) Exploitation
        # estraiamo il vettore feature
        state_features = self.extract_features(state)
        with torch.no_grad():
            self.policy_net.eval()
            state_tensor = torch.FloatTensor(state_features).unsqueeze(0).to(self.device)
            q_values = self.policy_net(state_tensor).cpu().numpy()[0]
            self.policy_net.train()

        # 3) Scegliamo tra le sole safe actions il Q massimo
        best_value = -1e9
        best_actions = []
        for action in safe_actions:
            idx = self.action_to_idx.get(action, 0)
            if idx < len(q_values):
                val = q_values[idx]
                if val > best_value:
                    best_value = val
                    best_actions = [action]
                elif val == best_value:
                    best_actions.append(action)
 # fallback extra: se per qualche motivo non c’è best
        if best_actions:
            return random.choice(best_actions)

        return random.choice(safe_actions)

# estrae le feature, le converte in tensori e le salva nel buffer
    def store_transition(self, state, action, next_state, reward, done):
      # entrambi gli stati vengono trasformati in vettori numerici
      # il buffer conterrà dati già pre-processati
        state_features = self.extract_features(state)
        next_state_features = self.extract_features(next_state)
        # l'azione viene convertita in un indice 0-3
        action_idx = self.action_to_idx.get(action, 0)

        # Clip reward per stabilità
        reward = np.clip(reward, -100, 100)

        # Convertiamo in tensori torch
        state_tensor = torch.FloatTensor(state_features)
        next_state_tensor = torch.FloatTensor(next_state_features)
        # salviamo tutto nel buffer
        self.memory.push(state_tensor, action_idx, next_state_tensor, reward, done)

# semplicemente ci serve per l'ambiente di lavoro e lancia il trainstep()
    def observeTransition(self, state, action, nextState, deltaReward):
        done = nextState.is_win() or nextState.is_lose()
        self.store_transition(state, action, nextState, deltaReward, done)

        # Training step
        return self.train_step()

# estrae un batch dal replay buffer e applica il training step
    def train_step(self):
        # Il buffer deve essere già pien
        if len(self.memory) < self.batch_size:
            return None
        # 1) Estraiamo un batch
        state_batch, action_batch, next_state_batch, reward_batch, done_batch = self.memory.sample(self.batch_size)
        # 2) Spostiamo tutto sul device (tutti tensori pytorch)
        state_batch = state_batch.to(self.device)                   # stati correnti [B, state_size]
        next_state_batch = next_state_batch.to(self.device)         # stati successivi [B, state_size]
        action_batch = action_batch.to(self.device).unsqueeze(1)    # azioni fatte [B, 1]
        reward_batch = reward_batch.to(self.device)                 # reward [B]
        done_batch = done_batch.to(self.device)                     # segnala il fine episodio [B]

        # 3) Q(s,a) corrente
        # shape: [B, 4] -> gather -> [B,1]
        current_q = self.policy_net(state_batch).gather(1, action_batch)
        # 4) Double DQN: Q_next = r + gamma * Q_target(s', argmax_a Q_policy(s'))
        with torch.no_grad():
            # azione migliore secondo policy_net
            next_q_policy = self.policy_net(next_state_batch)
            best_actions = next_q_policy.argmax(1, keepdim=True)  # shape: [batch_size, 1]

            # valutiamo quelle azioni con target_net
            next_q_target = self.target_net(next_state_batch).gather(1, best_actions).squeeze(1)

            # gamma * Q_target * (1 - done), (secondo bellman)
            target_q = reward_batch + (1 - done_batch) * self.gamma * next_q_target


        # 5) Loss Huber (SmoothL1)
        loss = self.criterion(current_q.view(-1), target_q)

        # 6) Backprop
        self.optimizer.zero_grad()
        loss.backward()

        #  clipping gradiente per stabilità
        torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), 1.0)
        self.optimizer.step()

        # 7) Soft update della target_net
        self.soft_update(tau=0.005)
        self.steps_done += 1


        return loss.item()

# copia i pesi della policy nella rete target
    def soft_update(self, tau=0.005):
        for target_param, policy_param in zip(self.target_net.parameters(), self.policy_net.parameters()):
            target_param.data.copy_(tau * policy_param.data + (1.0 - tau) * target_param.data)
# Aggiorniamo epsilon durante il training
    def set_epsilon(self, eps):
        self.epsilon = float(eps)

# semplicemente per il tracciamento interno
# chiamata di inizio episodio
    def startEpisode(self):
        self.episode_count += 1
        self.last_scared_timer = None
        self.ate_in_this_scared = False
        self.power_streak = 0
        if self.verbose and self.episode_count % 10 == 0:
            print(f"\n=== EPISODE {self.episode_count} (DQN) ===")
# chiamata di fine episodio
    def stopEpisode(self):
        self.ate_in_this_scared = False
        self.power_streak = 0
        self.last_scared_timer = None
        # il decadimento dell'epsilon qui non c'è perché è nel training
        if self.verbose and self.episode_count % 10 == 0:
            print(f"Episode {self.episode_count} ended | "
                  f"Buffer: {len(self.memory)} | "
                  f"Epsilon: {self.epsilon:.3f}")

# salvano e ripristinano lo stato del modello
    def save(self, filepath):
        torch.save({
            'policy_net': self.policy_net.state_dict(),
            'target_net': self.target_net.state_dict(),
            'optimizer': self.optimizer.state_dict(),
            'epsilon': self.epsilon,
            'episode_count': self.episode_count,
            'steps_done': self.steps_done
        }, filepath)

    def load(self, filepath):
        checkpoint = torch.load(filepath, map_location=self.device)
        self.policy_net.load_state_dict(checkpoint['policy_net'])
        self.target_net.load_state_dict(checkpoint['target_net'])
        self.optimizer.load_state_dict(checkpoint['optimizer'])
        self.epsilon = checkpoint['epsilon']
        self.episode_count = checkpoint['episode_count']
        self.steps_done = checkpoint['steps_done']



---

di seguito la classe che ci permette di estrarre le feature -> trasforma lo stato di pacman (posizioni, fantasmi, cibo, capsule, muri) in un vettore numerico continuo di dimensione fissa (33) che la rete neurale può usare.
<br> stiamo descrivendo ciò che è "decision-relevant" per pacman e ogni feature cerca di rappresentare la risposta alla domanda "quale informazione aiuta a scegliere la prossima azione?"

all'inizio vengono inizializzate le dimensioni del layout e i conteggi iniziali (una sola volta). Subito dopo vengono estratti tutti i “pezzi grezzi” dello stato: posizione di Pacman, lista del cibo, stati dei fantasmi, azioni legali, capsule e muri, così da non dover richiamare continuamente lo state
<br> le varie feature sono estratte in "blocchi"
* **food (4 feature)** --> dice alla rete dove andare per fare progresso, le quattro feature direzionali (N, E, S, W) non indicano se puoi andare lì, ma se andare in quella direzione ti avvicina al cibo più vicino, mentre la quinta feature è la distanza normalizzata dal cibo più vicino.
* **ghost primary threat (10 feature)**--> solo le feature dedicate alla presenza del fantasma nel labirinto, servono per mettere in atto una strategia difensiva; viene costruita una lista, ordinata per distanza, arricchita con distanza, posizione, stato scared. le prime 8 feature ci dicono per ogni direzione legale “se vado lì, mi avvicino a un fantasma?”. Inoltre vengono separati i fantasmi normali e scared, cosa cruciale per non confondere fuga e caccia. le ultime due feature descrivono situazioni del tipo "sono vicino a un fantasma non scared e andando a est peggioro la situazione".
* **dead-end detection (2 feature)** --> riguarda la struttura del labirinto, non dinamica. insegna alla rete che determinate posizioni a prescindere dai fantasmi sono "pericolose".
* **capsule info (5 feature)** --> la rete impara dove sono le capsule, quanto sono lontane e in che direzioni si trovano. È importante che capsule e food abbiano feature simili ma semantica diversa, perché poi il reward e il contesto decidono quando una capsula è desiderabile.
* **power-up state (5 feature)** -->  queste feature descrivono non dove pacman si trova, ma in che fase del gioco si trova. stessa posizione + stessa geometria ≠ stessa decisione, se cambia lo stato dei fantasmi.
    * scared_ratio --> quanto il gioco è "sbilanciato" a favore di pacman, vale 0 se nessun fantasma è scared, 1 se lo sono tutti (può assumere anche valori intermedi). serve per far capire alla rete che le regole del gioco sono cambiate e quindi lo sono anche le priorità
    * scared_time_norm --> quanto durerà ancora il vantaggio, se restano molti step allora si può pensare ad una strategia offensiva altrimenti di difesa; viene normalizzato su 40 step
    * hunt_opportunity --> c'è un fantasma scared e ci dice quanto è conveniente attarccarlo in quel momento e combina la distanza, stato scared e prossimità reale
    * capsule_urgency --> "prendere subito una capsula o rimandare?" unendo la minaccia di un fantasma vicino non scarede e la raggiungibiità della capsula
    * capsules_eaten_ratio --> memoria globale dello stato delle risorse
* **global contex (6 feature)** --> abbiamo 6 feature che insieme, permettono alla rete di distinguere stati che geometricamente sembrano simili ma strategicamente sono opposti.
    * food_eaten_ratio --> misura il progresso reale verso la vittoria serve a distinguere inizio partita da fine partita
    * score_norm --> Il punteggio incorpora molte informazioni: capsule prese, fantasmi mangiati, tempo. Normalizzandolo, diamo alla rete una traccia di “come sto andando” senza doverle ricostruire tutto. È utile anche per stabilizzare le policy: evita che la rete faccia mosse suicide quando è già in netto vantaggio
    * urgency --> quanti fantasmi attivi sono molto vicini; A differenza del ghost danger direzionale, qui non stiamo dicendo da dove arriva il pericolo, ma quanto è concentrato.
    * safety --> quanto "spazio di manovra" ha pacman;
    * pressure_norm --> quanti fantasmi sono entro un raggio di 5, normalizzato, è una misura di pressione ambientale più ampia rispetto a urgency (utile soprattutto quando si hanno più fantasmi)
    * escape_difficulty --> è legata al numero di azioni legali disponibili. Un corridoio con una sola uscita è fragile anche se in questo istante non c’è pericolo. È una feature strutturale che migliora moltissimo la stabilità del comportamento.



l'assert len(features) == 33 --> è un controllo logico: serve per evitare (nel caso peggiore in assoluto) di allenare un modello con feature sbagliate e senza accorgersene

In [None]:

class ScalableFeatureExtractor:
    def __init__(self, agent):
        self.agent = agent

    def extract_features(self, state):

        self.agent._init_layout_info(state)
        # prendiamo tutto quello che ci serve
        pacman_pos = state.get_pacman_position()
        food = state.get_food().as_list()
        ghost_states = state.get_ghost_states()
        ghosts = [g.get_position() for g in ghost_states]
        legal = state.get_legal_actions(0)
        capsules = state.get_capsules()
        walls = state.get_walls()

        features = [] # le feature verranno inserite in ordine fisso

        # ---- FOOD FEATURES (5) ----
        if food:
          # prendere il cibo più vicino
            closest_food = min(food, key=lambda f: manhattan_distance(pacman_pos, f))
            food_dist = manhattan_distance(pacman_pos, closest_food)

          #  (4) valori (N,E,S,W) che codificano la direzione verso il target.
            features.extend(self.agent._get_direction_vector(pacman_pos, closest_food))

            # Distanza normalizzata, clippata a 1 (1)
            food_dist_norm = min(food_dist / (self.agent.layout_width + self.agent.layout_height), 1.0)
            features.append(food_dist_norm)
        else:
            features.extend([0, 0, 0, 0, 1.0])

        # ---- GHOST PRIMARY THREAT (10) ----

        # Creiamo una lista ordinata di fantasmi per distanza
        ghost_distances = []
        for i, ghost_pos in enumerate(ghosts):
          # distanza pacman-fantasma
            dist = manhattan_distance(pacman_pos, ghost_pos)
            # flag
            is_scared = ghost_states[i].scared_timer > 0 if i < len(ghost_states) else False
            ghost_distances.append((dist, ghost_pos, is_scared, i))
        # ghost_distances[0] è il fantasma più vicino.
        ghost_distances.sort(key=lambda x: x[0])  # Ordina per distanza

        # inizializziamo danger vectors
        ghost_danger_normal = [0.0, 0.0, 0.0, 0.0]  # N, E, S, W
        ghost_danger_scared = [0.0, 0.0, 0.0, 0.0]

        # Analizziamo PRIMARY ghost (più vicino)
        if ghost_distances:
            closest_dist, closest_pos, closest_scared, _ = ghost_distances[0]

            # Calcoliamo danger per direzione (solo se vicino)
            if closest_dist <= 4:
                directions = ['North', 'East', 'South', 'West']
                for j, dir_name in enumerate(directions):
                    if dir_name not in legal:
                        continue

                    successor = state.generate_successor(0, dir_name)
                    new_pos = successor.get_pacman_position()
                    new_dist = manhattan_distance(new_pos, closest_pos)

                    if new_dist < closest_dist:
                        danger_score = 1.0 - (new_dist / 5.0) # cresce se pacman è vicino
                        if closest_scared:
                            ghost_danger_scared[j] = danger_score
                        else:
                            ghost_danger_normal[j] = danger_score

            features.extend(ghost_danger_normal)  # (4)
            features.extend(ghost_danger_scared)  # (4)

            # Distanza normalizzata (1)
            ghost_dist_norm = min(closest_dist / 10.0, 1.0)
            features.append(ghost_dist_norm)

            # Flag scared (1)
            features.append(1.0 if closest_scared else 0.0)
        else:
            # Nessun fantasma (caso improbabile, non dovrebbe accadere)
            features.extend([0.0] * 10)

        # ---- DEAD-END DETECTION (2) ----
        num_legal_moves = len([a for a in legal if a != 'Stop']) # quante mosse reali si hanno senza stop
        is_dead_end = 1.0 if num_legal_moves == 1 else 0.0 # corridoio chiuso (1)
        is_junction = 1.0 if num_legal_moves >= 3 else 0.0 # incrocio dove si hanno almeno tre scelte (1)
        features.extend([is_dead_end, is_junction])

        # ---- CAPSULE INFO (5) ----
        if capsules:
            closest_capsule = min(capsules, key=lambda c: manhattan_distance(pacman_pos, c))
            capsule_dist = manhattan_distance(pacman_pos, closest_capsule)

            # Direzione capsule (4)
            features.extend(self.agent._get_direction_vector(pacman_pos, closest_capsule))

            # Distanza normalizzata (1)
            capsule_dist_norm = min(capsule_dist / (self.agent.layout_width + self.agent.layout_height), 1.0)
            features.append(capsule_dist_norm)
        else:
            features.extend([0, 0, 0, 0, 1.0])

        # ---- POWER-UP STATE (5) ----
        # Quanti fantasmi sono scared?
        num_scared = sum(1 for gs in ghost_states if gs.scared_timer > 0)
        scared_ratio = num_scared / max(len(ghost_states), 1) # (1)
        features.append(scared_ratio)

        # Tempo rimanente (max tra tutti i fantasmi)
        max_scared_timer = max((gs.scared_timer for gs in ghost_states), default=0)
        scared_time_norm = max_scared_timer / 40.0 # (1)
        features.append(scared_time_norm)

        # "Opportunità di caccia" - scared ghost vicino?
        hunt_opportunity = 0.0
        for i, gs in enumerate(ghost_states):
            if gs.scared_timer > 0:
                ghost_pos = ghosts[i]
                dist = manhattan_distance(pacman_pos, ghost_pos)
                if dist <= 5:
                    hunt_opportunity = max(hunt_opportunity, 1.0 - (dist / 5.0)) # (1)
        features.append(hunt_opportunity)

        # "Urgenza capsule" - fantasma vicino E capsule disponibile?
        capsule_urgency = 0.0
        if capsules and ghost_distances:
            # Usiamo la distanza del fantasma normale più vicino
            closest_normal = next((g for g in ghost_distances if not g[2]), None)
            if closest_normal:
                closest_dist_normal = closest_normal[0]
                if closest_dist_normal <= 3:
                    closest_capsule = min(capsules, key=lambda c: manhattan_distance(pacman_pos, c))
                    capsule_dist = manhattan_distance(pacman_pos, closest_capsule)

                    ghost_urgency = 1.0 - (closest_dist_normal / 3.0)
                    capsule_reachability = 1.0 - min(capsule_dist / 10.0, 1.0)

                    capsule_urgency = ghost_urgency * capsule_reachability # (1)
        features.append(capsule_urgency)

        # progresso capsule (quante sono già state consumate)
        capsules_eaten_ratio = 1.0 - (len(capsules) / max(self.agent.initial_capsule_count, 1)) # (1)
        features.append(capsules_eaten_ratio)

        # ---- GLOBAL CONTEXT (6) ----
        # progresso cibo (quanto cibo è stato mangiato)
        food_eaten_ratio = 1.0 - (len(food) / max(self.agent.initial_food_count, 1)) # (1)
        features.append(food_eaten_ratio)

        # Score normalizzato
        score_norm = np.clip(state.get_score() / 1000.0, -1.0, 1.0) # (1)
        features.append(score_norm)

        # Urgency (numero fantasmi attivi vicini)
        ghosts_nearby = sum(1 for g in ghost_distances if g[0] <= 2 and not g[2])
        urgency = min(ghosts_nearby / max(len(ghosts), 1), 1.0) # (1)
        features.append(urgency)

        # Safety (spazio libero da fantasmi attivi)
        safe_neighbors = 0
        # guardiamo le 4 celle adiacenti
        for dx, dy in [(0,1), (1,0), (0,-1), (-1,0)]:
            nx, ny = pacman_pos[0] + dx, pacman_pos[1] + dy
            if 0 <= nx < self.agent.layout_width and 0 <= ny < self.agent.layout_height:
                if not walls[nx][ny]:
                    safe_from_all = True
                    for dist, g_pos, is_scared, _ in ghost_distances:
                        if not is_scared:  # Solo fantasmi attivi
                            if manhattan_distance((nx, ny), g_pos) <= 2:
                                safe_from_all = False
                                break
                    if safe_from_all:
                        safe_neighbors += 1
        safety = safe_neighbors / 4.0 # (1)
        features.append(safety)

        # quanti fantasmi entro raggio 5?
        pressure = sum(1 for g in ghost_distances if g[0] <= 5)
        pressure_norm = min(pressure / max(len(ghosts), 1), 1.0) # (1)
        features.append(pressure_norm)

        # poche mosse disponibili di "fuga"
        escape_routes = num_legal_moves
        escape_difficulty = 1.0 - (escape_routes / 4.0) # (1)
        features.append(escape_difficulty)

        assert len(features) == 33, f"Feature count mismatch: {len(features)} != 33"
        return np.array(features, dtype=np.float32)


# ==========================================
# INTEGRAZIONE CON DQNAgent
# ==========================================
class DQNAgentScalable(DQNAgent):

    def __init__(self, state_size=33, **kwargs):
        super().__init__(state_size=state_size, **kwargs)
        self.feature_extractor = ScalableFeatureExtractor(self)

    def extract_features(self, state):
        return self.feature_extractor.extract_features(state)

nota da ricordarsi
if getattr(self.agent, "use_33", False)

In [None]:
# ---- FUNZIONE DI TRAINING ----
# decay_episodes = numero di episodi in cui il valore di epsilon passa da epsilon start a end quindi man mano che si avvicina al quel numero, esplora sempre meno
# di solito viene scelto così decay_episodes ≈ ⅓ – ½ di num_episodes

def train_with_progress2(
    agent,
    num_episodes=100,
    layout_name="small_classic",
    n_ghosts=1,
    ghost_cls=DirectionalGhost,
    epsilon_schedule=None,
    seed=None,
    print_every=10,
    max_steps=1000,
    use_powerup_rewards=False,
    epsilon_start=1.0,
    epsilon_end=0.05,
    decay_episodes=300
):

    if seed is not None: # per la riproducibilità dei vari training
        # Python standard
        random.seed(seed)
        # NumPy
        np.random.seed(seed)
        # Torch CPU
        torch.manual_seed(seed)
        # Torch GPU
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)
    # set up ambiente
    layout = get_layout(layout_name)
    rules = ClassicGameRules(timeout=0)
    # definizione delle variabili
    wins = 0
    deaths = 0
    timeouts = 0

    results = []
    recent_wins = []
    recent_scores = []
    recent_steps = []
    losses = deque(maxlen=1000) # teniamo solo le ultime loss
# per ogni episodio
    for ep in range(1, num_episodes + 1):
        agent.startEpisode()
        # reset interno dell'agente
        agent.ate_in_this_scared = False
        agent.power_streak = 0
        ghosts_eaten_total = 0

        if epsilon_schedule is not None:
            agent.set_epsilon(epsilon_schedule(ep)) #strategia esterna
        else: # decadimento lineare
            if ep <= decay_episodes:
                new_eps = epsilon_start - (epsilon_start - epsilon_end) * (ep - 1) / (decay_episodes - 1)
            else:
                new_eps = epsilon_end
            agent.set_epsilon(new_eps)
        # inizializziamo i fantasmi, un nuovo gioco berkeley e lo stato iniziale
        ghosts = [ghost_cls(i+1) for i in range(n_ghosts)]
        game = rules.new_game(layout, agent, ghosts, display=None, quiet=True, catch_exceptions=False)
        state = game.state
        steps = 0
        capsules_wasted = 0

        # loop temporale dell'episodio
        while not (state.is_win() or state.is_lose()) and steps < max_steps:
            steps += 1

            # ---- Turno Pacman ----
            # ci servono dopo per il calcolo del reward
            old_score = state.get_score()
            old_food_count = state.get_food().count()
            old_caps = len(state.get_capsules())
            old_ghost_states = state.get_ghost_states()
            old_scared_count = sum(1 for gs in old_ghost_states if gs.scared_timer > 0)
            old_scared_active = old_scared_count > 0

            #azione di pacman
            action = agent.get_action(state)
            next_state = state.generate_successor(0, action)


            # aggiornamento post azione
            new_score = next_state.get_score()
            new_food_count = next_state.get_food().count()

            new_caps = len(next_state.get_capsules())
            new_ghost_states = next_state.get_ghost_states()
            new_scared_count = sum(1 for gs in new_ghost_states if gs.scared_timer > 0)
            new_scared_active = new_scared_count > 0
            # calcolo reward
            reward = 0.0

            # Base reward (delta score normalizzato)
            score_delta = new_score - old_score
            reward += score_delta / 10.0

            if next_state.is_win():
                reward += 25.0
            elif next_state.is_lose():
                reward -= 18.0
            elif new_food_count < old_food_count:
                reward += 2.0
            elif steps > 500:
                penalty_factor = 1 + (steps - 500) / 200.0
                reward -= 0.005 * penalty_factor  # Cresce gradualmente, efficienza, non dove vagare a vuoto
            else:

                reward -= 0.01

            if use_powerup_rewards: # se abbiamo i power up attivi
                pac_pos = next_state.get_pacman_position()
                ghosts_pos = [g.get_position() for g in new_ghost_states]
                # Coefficienti
                R_capsule_base  = +3.0
                R_capsule_waste = -1.0
                r_scared_step   = +0.20
                R_ghost_base    = +8.0
                R_waste_scared  = -2.0

                # Capsule prese
                if new_caps < old_caps:
                    # Calcola urgenza
                    non_scared_pos = [g.get_position() for g in old_ghost_states if g.scared_timer == 0]
                    if not non_scared_pos:
                        non_scared_pos = ghosts_pos  # Fallback

                    if non_scared_pos:
                        dmin = min(manhattan_distance(pac_pos, g) for g in non_scared_pos)
                        urgenza = max(0.0, 1.0 - dmin / 4.0)
                    else:
                        urgenza = 0.0

                    reward += R_capsule_base + 0.6 * urgenza  # Normalizzato

                    # pacman viene penalizzato solo quando quell'azione di prendere la capsula non vede vicino di almeno 8 celle il fantasma
                    max_threat_radius = 5 if layout_name == "small_classic" else 6 # perché small_classic è 7x7
                    any_ghost_nearby = any(manhattan_distance(pac_pos, g) <= max_threat_radius for g in ghosts_pos)

                    if not new_scared_active and not any_ghost_nearby:
                        reward += R_capsule_waste * 0.5
                        capsules_wasted += 1

                # Piccolo bonus per ogni step con scared attivo
                if new_scared_active:
                    reward += r_scared_step

                # Ghost eating
                ghosts_eaten = old_scared_count - new_scared_count

                # Fallback: se score salta molto ma conteggio non cambia
                if ghosts_eaten == 0 and score_delta >= 190:
                    if score_delta >= 390:
                        ghosts_eaten = 2
                    else:
                        ghosts_eaten = 1

                if ghosts_eaten > 0:
                    agent.ate_in_this_scared = True
                    ghosts_eaten_total += ghosts_eaten
                    for _ in range(ghosts_eaten):
                        combo_mult = 2 ** agent.power_streak
                        reward += R_ghost_base * combo_mult #per aver mangiato i fantasmi
                        agent.power_streak += 1

            if next_state.is_win() or next_state.is_lose(): # vince o muore prima che i fantasmi si muovano
                reward = np.clip(reward, -20.0, 20.0)
                loss = agent.observeTransition(state, action, next_state, reward)
                if loss is not None:
                    losses.append(loss)
                state = next_state
                break

            # ---- turno fantasmi ----
            ghost_turn_state = next_state
            for g_idx, ghost in enumerate(ghosts, start=1):
                g_action = ghost.get_action(ghost_turn_state)
                ghost_turn_state = ghost_turn_state.generate_successor(g_idx, g_action)
                if ghost_turn_state.is_win() or ghost_turn_state.is_lose():
                    break #se uccide pacman

            # score dopo che  i fantasmi si sono mossi
            post_score = ghost_turn_state.get_score()
            reward += (post_score - new_score) / 10.0

            # muore o vince a causa dei fantasmi
            if ghost_turn_state.is_win():
                reward += 25.0
            elif ghost_turn_state.is_lose():
                reward -= 18.0

            # caso con i power up attivi
            if use_powerup_rewards and not (ghost_turn_state.is_win() or ghost_turn_state.is_lose()):
                post_ghost_states = ghost_turn_state.get_ghost_states()
                now_scared_active = any(gs.scared_timer > 0 for gs in post_ghost_states) #c'è almeno un fantasma ancora scared?

                if now_scared_active:
                    agent.last_scared_timer = max(gs.scared_timer for gs in post_ghost_states)
                else:
                    if agent.last_scared_timer is not None and agent.last_scared_timer <= 2: # se si era in fase scared

                        if not agent.ate_in_this_scared: # è stata "sprecata" l'occasione
                            reward += (-2.0)
                        # reset interno
                        agent.ate_in_this_scared = False
                        agent.power_streak = 0
                    agent.last_scared_timer = None

            # clip e salvataggio transizione
            reward = np.clip(reward, -20.0, 20.0)
            loss = agent.observeTransition(state, action, ghost_turn_state, reward)
            if loss is not None:
                losses.append(loss)

            # prepariamo il prossimo ciclo
            state = ghost_turn_state

        # Statistiche finali episodio
        final_score = state.get_score()
        win = state.is_win()
        lose = state.is_lose()


        wins += 1 if win else 0
        deaths += 1 if lose else 0
        timeouts += 1 if (steps >= max_steps) and not (win or lose) else 0
        win_rate = wins / ep
        death_rate = deaths / ep

        results.append({
            "episode": ep,
            "score": final_score,
            "win": win,
            "lose": lose,
            "steps": steps,
            "epsilon": agent.epsilon,
            "buffer_size": len(agent.memory),
            "timeout": timeouts,
            "ghosts_eaten": ghosts_eaten_total,
            "capsules_wasted": capsules_wasted,

        })

        recent_wins.append(1 if win else 0)
        recent_scores.append(final_score)
        recent_steps.append(steps)

        # ultimi print_every risultati
        if len(recent_wins) > print_every:
            recent_wins.pop(0)
            recent_scores.pop(0)
            recent_steps.pop(0)

        agent.stopEpisode()

        # statistiche "locali"
        if ep % print_every == 0:
            local_win_rate = sum(recent_wins) / len(recent_wins)
            avg_score = sum(recent_scores) / len(recent_scores)
            avg_steps = sum(recent_steps) / len(recent_steps)

            # Calcoliamo la media loss degli ultimi print_every episodi
            losses_list = list(losses)
            if len(losses_list) > 0:
                recent_losses = losses_list[-print_every:] if len(losses_list) >= print_every else losses_list
                avg_loss = sum(recent_losses) / len(recent_losses)
            else:
                avg_loss = 0.0

            print(f"\n{'='*60}")
            print(f"Episode {ep}/{num_episodes}")
            print(f"  Win rate (ultimi {print_every}): {local_win_rate:.2%}")
            print(f"  Win rate (globale): {win_rate:.2%}")
            print(f"  Avg score: {avg_score:.1f} | Avg steps: {avg_steps:.1f}")
            print(f"  Epsilon: {agent.epsilon:.3f}")
            print(f"  avg loss: {avg_loss}")
            print(f"{'='*60}")

    # statistiche finali
    print(f"\n{'='*60}")
    print(f"TRAINING COMPLETATO")
    print(f"{'='*60}")
    print(f"Episodi totali: {num_episodes}")
    print(f"Vittorie: {wins} ({win_rate:.2%})")
    print(f"Sconfitte: {deaths} ({death_rate:.2%})")
    print(f"timeouts: {timeouts} ")
    print(f"Buffer finale: {len(agent.memory)}")
    print(f"Epsilon finale: {agent.epsilon:.3f}")
    print(f"{'='*60}\n")

    return results



In [None]:
def test_dqn_agent(agent, num_episodes=10, layout_name="small_classic", n_ghosts=1,
                   track_powerups=False, ghost_cls=RandomGhost):

    # Salviamo e impostiamo epsilon a 0
    original_epsilon = agent.epsilon
    agent.set_epsilon(0.0) # niente esplorazione
    agent.policy_net.eval() # no dropout etc

    layout = get_layout(layout_name)
    rules = ClassicGameRules(timeout=0)

    wins = 0
    total_score = 0
    total_steps = 0
    total_capsules_eaten = 0
    total_ghosts_eaten = 0

    print(f"\n{'='*60}")
    print(f"TESTING DQN AGENT (epsilon=0, greedy policy)")
    print(f"{'='*60}\n")

    for ep in range(1, num_episodes + 1):
      # inizializzazione episodio
        ghosts = [ghost_cls(i + 1) for i in range(n_ghosts)]
        game = rules.new_game(layout, agent, ghosts, display=None, quiet=True, catch_exceptions=False)
        state = game.state

        steps = 0
        prev_capsules = len(state.get_capsules())
        prev_score = state.get_score()
        prev_scared_active = any(gs.scared_timer > 0 for gs in state.get_ghost_states())
        ghosts_eaten_in_ep = 0

        while not (state.is_win() or state.is_lose()) and steps < 1000:
            steps += 1

            # turno pacman
            action = agent.get_action(state)
            state = state.generate_successor(0, action)

            if track_powerups:
                curr_capsules = len(state.get_capsules())
                if curr_capsules < prev_capsules:
                    total_capsules_eaten += 1
                prev_capsules = curr_capsules

                new_score = state.get_score()
                delta = new_score - prev_score
                curr_scared_active = any(gs.scared_timer > 0 for gs in state.get_ghost_states())

                # Contiamo i fantasmi mangiati
                if delta >= 190:
                    ghosts_in_step = 0
                    if delta >= 390:
                        ghosts_in_step = 2
                    elif delta >= 190:
                        ghosts_in_step = 1

                    if ghosts_in_step > 0 and (curr_scared_active or prev_scared_active):
                        ghosts_eaten_in_ep += ghosts_in_step
                        total_ghosts_eaten += ghosts_in_step

                prev_score = new_score
                prev_scared_active = curr_scared_active

            if state.is_win() or state.is_lose():
                break

            # turno fantasmi
            for g_idx, ghost in enumerate(ghosts, start=1):
                g_action = ghost.get_action(state)
                state = state.generate_successor(g_idx, g_action)
                if state.is_win() or state.is_lose():
                    break

        # Fine episodio
        win = state.is_win()
        score = state.get_score()
        if win:
            wins += 1
        total_score += score
        total_steps += steps

        result = "WIN" if win else "LOSE"
        print(f"Test {ep}/{num_episodes}: {result} | Score: {score:.1f} | Steps: {steps} | "
              f"Ghosts eaten: {ghosts_eaten_in_ep}")

    # statistiche varie
    win_rate = wins / num_episodes
    avg_score = total_score / num_episodes
    avg_steps = total_steps / num_episodes

    print(f"\n{'='*60}")
    print(f"TEST RESULTS")
    print(f"  Win rate: {win_rate:.2%} ({wins}/{num_episodes})")
    print(f"  Avg score: {avg_score:.1f}")
    print(f"  Avg steps: {avg_steps:.1f}")
    if track_powerups:
        print(f"  Avg capsules eaten: {total_capsules_eaten / num_episodes:.2f}")
        print(f"  Avg ghosts eaten (scared): {total_ghosts_eaten / num_episodes:.2f}")
    print(f"{'='*60}\n")

    # Ripristino stato agente
    agent.set_epsilon(original_epsilon)
    agent.policy_net.train()

    if track_powerups:
        return win_rate, avg_score, avg_steps, total_capsules_eaten / num_episodes, total_ghosts_eaten / num_episodes
    else:
        return win_rate, avg_score, avg_steps


**TRAINING E TEST: 1 random ghost, small_classic (no power up)**

In [None]:
# FASE 1: impara i "fondamentali" (senza power-up)

agent = DQNAgentScalable(
    state_size=33,
    action_size=4,
    alpha=4e-5,
    gamma=0.99,
    epsilon=1.0,
    epsilon_end=0.09,
    buffer_size=30000,
    batch_size=64,
    tau=0.002,
    verbose=True

)


print("\nFASE 1: Training con 1 RandomGhost")
results_A = train_with_progress2(
    agent,
    num_episodes=90,
    layout_name="small_classic",
    n_ghosts=1,
    ghost_cls=RandomGhost,
    epsilon_start=1.0,
    epsilon_end=0.5,
    decay_episodes=60,
    use_powerup_rewards=False,
    print_every=50,
    seed=42
)

print("\nFASE 1B: Training con 1 RandomGhost")
agent.epsilon = 0.9
results_B = train_with_progress2(
    agent,
    num_episodes=150,
    layout_name="small_classic",
    n_ghosts=1,
    ghost_cls=RandomGhost,
    epsilon_start=0.9,
    epsilon_end=0.09,
    decay_episodes=100,
    use_powerup_rewards=False,
    print_every=50,
    seed=42
)


FASE 1: Training con 1 RandomGhost

=== EPISODE 10 (DQN) ===
Episode 10 ended | Buffer: 5255 | Epsilon: 0.924

=== EPISODE 20 (DQN) ===
Episode 20 ended | Buffer: 9966 | Epsilon: 0.839

=== EPISODE 30 (DQN) ===
Episode 30 ended | Buffer: 13408 | Epsilon: 0.754

=== EPISODE 40 (DQN) ===
Episode 40 ended | Buffer: 16357 | Epsilon: 0.669

=== EPISODE 50 (DQN) ===
Episode 50 ended | Buffer: 18895 | Epsilon: 0.585

Episode 50/90
  Win rate (ultimi 50): 66.00%
  Win rate (globale): 66.00%
  Avg score: 403.3 | Avg steps: 377.9
  Epsilon: 0.585
  avg loss: 0.6051684737205505

=== EPISODE 60 (DQN) ===
Episode 60 ended | Buffer: 20764 | Epsilon: 0.500

=== EPISODE 70 (DQN) ===
Episode 70 ended | Buffer: 23341 | Epsilon: 0.500

=== EPISODE 80 (DQN) ===
Episode 80 ended | Buffer: 25102 | Epsilon: 0.500

=== EPISODE 90 (DQN) ===
Episode 90 ended | Buffer: 27473 | Epsilon: 0.500

TRAINING COMPLETATO
Episodi totali: 90
Vittorie: 66 (73.33%)
Sconfitte: 24 (26.67%)
timeouts: 0 
Buffer finale: 27473
Ep

In [None]:
test_dqn_agent(agent, num_episodes=30, layout_name="small_classic", n_ghosts=1, ghost_cls=RandomGhost)


TESTING DQN AGENT (epsilon=0, greedy policy)

Test 1/30: WIN | Score: 1312.0 | Steps: 138 | Ghosts eaten: 0
Test 2/30: WIN | Score: 954.0 | Steps: 96 | Ghosts eaten: 0
Test 3/30: WIN | Score: 910.0 | Steps: 140 | Ghosts eaten: 0
Test 4/30: WIN | Score: 972.0 | Steps: 78 | Ghosts eaten: 0
Test 5/30: WIN | Score: 1077.0 | Steps: 173 | Ghosts eaten: 0
Test 6/30: WIN | Score: 953.0 | Steps: 97 | Ghosts eaten: 0
Test 7/30: LOSE | Score: -355.0 | Steps: 275 | Ghosts eaten: 0
Test 8/30: WIN | Score: 963.0 | Steps: 87 | Ghosts eaten: 0
Test 9/30: WIN | Score: 1061.0 | Steps: 189 | Ghosts eaten: 0
Test 10/30: WIN | Score: 868.0 | Steps: 182 | Ghosts eaten: 0
Test 11/30: WIN | Score: 964.0 | Steps: 86 | Ghosts eaten: 0
Test 12/30: WIN | Score: 653.0 | Steps: 397 | Ghosts eaten: 0
Test 13/30: WIN | Score: 936.0 | Steps: 114 | Ghosts eaten: 0
Test 14/30: WIN | Score: 963.0 | Steps: 87 | Ghosts eaten: 0
Test 15/30: WIN | Score: 967.0 | Steps: 83 | Ghosts eaten: 0
Test 16/30: WIN | Score: 1152.0 | 

(0.9333333333333333, 860.1, 167.9)

**TRAINING E TEST: 1 directional ghost, small_classic (no power up)**

In [None]:
print("\nFASE 2A: Training con 1 DirectionalGhost")
# manteniamo una % del buffer
n_keep = int(len(agent.memory.buffer) * 0.2)
agent.memory.buffer = deque(list(agent.memory.buffer)[-n_keep:], maxlen=50000)
# reset epsilon + tau leggermente più alto
agent.epsilon = 1.0
agent.tau = 0.004  # più aggressivo per DirectionalGhost
# Riduciamo il  learning rate
agent.learning_rate = 3e-5
agent.optimizer = optim.Adam(agent.policy_net.parameters(), lr=3e-5)
agent.episode_count = 0

results_A = train_with_progress2(
    agent,
    num_episodes=150,
    layout_name="small_classic",
    n_ghosts=1,
    ghost_cls=DirectionalGhost,
    epsilon_start=1.0,           #
    epsilon_end=0.5,
    decay_episodes=90,
    use_powerup_rewards=False,
    print_every=50,
    max_steps=950,
    seed = 42
)

agent.tau = 0.004
agent.epsilon = 0.7
results_B = train_with_progress2(
    agent,
    num_episodes=300,
    layout_name="small_classic",
    n_ghosts=1,
    ghost_cls=DirectionalGhost,
    epsilon_start=0.7,
    epsilon_end=0.05,
    decay_episodes=300,
    use_powerup_rewards=False,
    print_every=50,
    max_steps=950,
    seed = 42
)

agent.tau = 0.003
agent.epsilon = 0.3
results_C = train_with_progress2(
    agent,
    num_episodes=250,
    layout_name="small_classic",
    n_ghosts=1,
    ghost_cls=DirectionalGhost,
    epsilon_start=0.3,
    epsilon_end=0.02,
    decay_episodes=150,
    use_powerup_rewards=False,
    print_every=50,
    seed = 42
)



FASE 2A: Training con 1 DirectionalGhost

=== EPISODE 10 (DQN) ===
Episode 10 ended | Buffer: 7482 | Epsilon: 0.949

=== EPISODE 20 (DQN) ===
Episode 20 ended | Buffer: 8898 | Epsilon: 0.893

=== EPISODE 30 (DQN) ===
Episode 30 ended | Buffer: 10369 | Epsilon: 0.837

=== EPISODE 40 (DQN) ===
Episode 40 ended | Buffer: 11982 | Epsilon: 0.781

=== EPISODE 50 (DQN) ===
Episode 50 ended | Buffer: 13335 | Epsilon: 0.725

Episode 50/150
  Win rate (ultimi 50): 12.00%
  Win rate (globale): 12.00%
  Avg score: -59.5 | Avg steps: 146.7
  Epsilon: 0.725
  avg loss: 2.4036529588699342

=== EPISODE 60 (DQN) ===
Episode 60 ended | Buffer: 14417 | Epsilon: 0.669

=== EPISODE 70 (DQN) ===
Episode 70 ended | Buffer: 15658 | Epsilon: 0.612

=== EPISODE 80 (DQN) ===
Episode 80 ended | Buffer: 17097 | Epsilon: 0.556

=== EPISODE 90 (DQN) ===
Episode 90 ended | Buffer: 18349 | Epsilon: 0.500

=== EPISODE 100 (DQN) ===
Episode 100 ended | Buffer: 19627 | Epsilon: 0.500

Episode 100/150
  Win rate (ultimi 

In [None]:
test_dqn_agent(agent, num_episodes=50, layout_name="small_classic", n_ghosts=1, ghost_cls=DirectionalGhost)


TESTING DQN AGENT (epsilon=0, greedy policy)

Test 1/50: LOSE | Score: -39.0 | Steps: 159 | Ghosts eaten: 0
Test 2/50: WIN | Score: 1173.0 | Steps: 77 | Ghosts eaten: 0
Test 3/50: LOSE | Score: 19.0 | Steps: 61 | Ghosts eaten: 0
Test 4/50: WIN | Score: 1365.0 | Steps: 85 | Ghosts eaten: 0
Test 5/50: WIN | Score: 1296.0 | Steps: 154 | Ghosts eaten: 0
Test 6/50: WIN | Score: 789.0 | Steps: 261 | Ghosts eaten: 0
Test 7/50: WIN | Score: 1337.0 | Steps: 113 | Ghosts eaten: 0
Test 8/50: WIN | Score: 1053.0 | Steps: 197 | Ghosts eaten: 0
Test 9/50: WIN | Score: 1173.0 | Steps: 77 | Ghosts eaten: 0
Test 10/50: WIN | Score: 1327.0 | Steps: 123 | Ghosts eaten: 0
Test 11/50: WIN | Score: 937.0 | Steps: 113 | Ghosts eaten: 0
Test 12/50: WIN | Score: 1142.0 | Steps: 108 | Ghosts eaten: 0
Test 13/50: WIN | Score: 915.0 | Steps: 135 | Ghosts eaten: 0
Test 14/50: WIN | Score: 931.0 | Steps: 119 | Ghosts eaten: 0
Test 15/50: WIN | Score: 1070.0 | Steps: 180 | Ghosts eaten: 0
Test 16/50: WIN | Score: 8

(0.9, 1016.42, 131.38)

# **FASE 5:** USO DEI POWER UP

**TRAINING E TEST: 1 directional ghost, small_classic**

In [None]:
agent3 = DQNAgentScalable(
    state_size=33,
    action_size=4,
    alpha=2e-5,
    gamma=0.99,
    epsilon=1.0,
    epsilon_end=0.7,
    buffer_size=20000,
    batch_size=64,
    tau=0.003,
    verbose=True
)

print("agente ricaricato con successo!")

agente ricaricato con successo!


In [None]:


print("\nFASE 3A: Training con 1 DirectionalGhost, con POWER UP")

print("\n FASE 1 ESPLORATIVA")
results_S = train_with_progress2(
    agent3,
    num_episodes=250,
    layout_name="small_classic",
    n_ghosts=1,
    ghost_cls=DirectionalGhost,
    epsilon_start=1.0,      # esplorazione totale
    epsilon_end=0.7,        # ancora esplorativo
    decay_episodes=200,
    use_powerup_rewards=True,
    print_every=50,
    seed=42
)

print("\n FASE 2 ESPLORATIVA")
# settiamo di nuovo i parametri che ci interessano
agent3.epsilon = 0.8
agent3.tau = 0.001
agent3.gamma = 0.99
agent3.epsilon_end = 0.2

results_M = train_with_progress2(
    agent3,
    num_episodes=400,
    layout_name="small_classic",
    n_ghosts=1,
    ghost_cls=DirectionalGhost,
    epsilon_start=0.8,
    epsilon_end=0.2,
    decay_episodes=320,
    use_powerup_rewards=True,
    print_every=50,
    seed=42
)


FASE 3A: Training con 1 DirectionalGhost, con POWER UP

 FASE 1 ESPLORATIVA

=== EPISODE 10 (DQN) ===
Episode 10 ended | Buffer: 1506 | Epsilon: 0.986

=== EPISODE 20 (DQN) ===
Episode 20 ended | Buffer: 3000 | Epsilon: 0.971

=== EPISODE 30 (DQN) ===
Episode 30 ended | Buffer: 4788 | Epsilon: 0.956

=== EPISODE 40 (DQN) ===
Episode 40 ended | Buffer: 6155 | Epsilon: 0.941

=== EPISODE 50 (DQN) ===
Episode 50 ended | Buffer: 7103 | Epsilon: 0.926

Episode 50/250
  Win rate (ultimi 50): 16.00%
  Win rate (globale): 16.00%
  Avg score: -36.1 | Avg steps: 142.1
  Epsilon: 0.926
  avg loss: 0.9246874403953552

=== EPISODE 60 (DQN) ===
Episode 60 ended | Buffer: 8716 | Epsilon: 0.911

=== EPISODE 70 (DQN) ===
Episode 70 ended | Buffer: 10174 | Epsilon: 0.896

=== EPISODE 80 (DQN) ===
Episode 80 ended | Buffer: 12139 | Epsilon: 0.881

=== EPISODE 90 (DQN) ===
Episode 90 ended | Buffer: 13866 | Epsilon: 0.866

=== EPISODE 100 (DQN) ===
Episode 100 ended | Buffer: 15603 | Epsilon: 0.851

Epis

In [None]:
test_dqn_agent(agent3, num_episodes=50, layout_name="small_classic", n_ghosts=1, ghost_cls=DirectionalGhost, track_powerups=True )


TESTING DQN AGENT (epsilon=0, greedy policy)

Test 1/50: WIN | Score: 1171.0 | Steps: 79 | Ghosts eaten: 1
Test 2/50: LOSE | Score: 27.0 | Steps: 213 | Ghosts eaten: 1
Test 3/50: WIN | Score: 1116.0 | Steps: 134 | Ghosts eaten: 1
Test 4/50: WIN | Score: 1115.0 | Steps: 135 | Ghosts eaten: 1
Test 5/50: WIN | Score: 1171.0 | Steps: 79 | Ghosts eaten: 1
Test 6/50: WIN | Score: 1375.0 | Steps: 75 | Ghosts eaten: 2
Test 7/50: WIN | Score: 1136.0 | Steps: 114 | Ghosts eaten: 1
Test 8/50: WIN | Score: 1155.0 | Steps: 95 | Ghosts eaten: 1
Test 9/50: WIN | Score: 1155.0 | Steps: 95 | Ghosts eaten: 1
Test 10/50: WIN | Score: 1165.0 | Steps: 85 | Ghosts eaten: 1
Test 11/50: WIN | Score: 967.0 | Steps: 83 | Ghosts eaten: 0
Test 12/50: WIN | Score: 1149.0 | Steps: 101 | Ghosts eaten: 1
Test 13/50: WIN | Score: 1339.0 | Steps: 111 | Ghosts eaten: 2
Test 14/50: WIN | Score: 1149.0 | Steps: 101 | Ghosts eaten: 3
Test 15/50: WIN | Score: 914.0 | Steps: 136 | Ghosts eaten: 0
Test 16/50: WIN | Score: 96

(0.94, 1038.36, 103.84, 1.08, 1.04)