In [1]:
# Standard
from collections import namedtuple, deque
import random

# Third-party
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import trange
import gc
# knister api
from api import KnisterGame

In [2]:
device = torch.device("cpu")
print(device)

cpu


## Q network

In [5]:
class QNet(nn.Module):
    # Policy Network
    def __init__(self, n_state_vars, n_actions, dim_hidden=256):
        super(QNet, self).__init__()

        self.fc = nn.Sequential(
            nn.Linear(n_state_vars, dim_hidden),
            nn.ReLU(),
            nn.Linear(dim_hidden, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions)
        )

    def forward(self, x):
        # Passes the input through the network layers to output Q-values
        return self.fc(x)

##  Replay Buffer

In [7]:
class ReplayBuffer:
    def __init__(self, n_actions, memory_size, batch_size):
        # Initialize actions, batch and experience template
        self.n_actions = n_actions
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
        # Initialize the memory
        self.memory = deque(maxlen=memory_size)  

    def __len__(self):
        return len(self.memory)

    def add(self, state, action, reward, next_state, done):
        # Store experience in memory
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)

    def sample(self):
        # Sample a batch of experiences
        experiences = random.sample(self.memory, k=self.batch_size)  

        # Convert to tensors for training
        states = torch.from_numpy(
            np.vstack([e.state for e in experiences if e is not None])
        ).float().to(device)

        actions = torch.from_numpy(
            np.vstack([e.action for e in experiences if e is not None])
        ).long().to(device)

        rewards = torch.from_numpy(
            np.vstack([e.reward for e in experiences if e is not None])
        ).float().to(device)

        next_states = torch.from_numpy(
            np.vstack([e.next_state for e in experiences if e is not None])
        ).float().to(device)

        dones = torch.from_numpy(
            np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)
        ).float().to(device)

        # Return the tuple with all tensors
        return (states,actions,rewards,next_states,dones)

## DQN

In [8]:
class DQN:
    def __init__(
        self, n_states, n_actions, batch_size=64, learning_rate=1e-4,
        learn_step=5, gamma=0.99, mem_size=int(1e5), tau=1e-3
    ):
        # Core parameters for learning and updating the Q-network
        self.n_states = n_states
        self.n_actions = n_actions
        self.batch_size = batch_size
        self.gamma = gamma  # Discount factor for future rewards
        self.learn_step = learn_step  # Frequency of learning steps
        self.tau = tau  # Rate for soft updating the target network

        # Initialize the policy network (net_eval) and target network (net_target)
        self.net_eval = QNet(n_states,n_actions).to(device)  
        self.net_target = QNet(n_states,n_actions).to(device)  
        self.optimizer = optim.Adam(self.net_eval.parameters(),lr=learning_rate) 
        self.criterion =  nn.MSELoss() 

        # Initialize memory for experience replay
        self.memory = ReplayBuffer(n_actions,mem_size,batch_size)  
        self.counter = 0  # Tracks learning steps for periodic updates

    def getAction(self, state,available_actions, epsilon):
        # Select action using an epsilon-greedy strategy to balance exploration and exploitation
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)

        self.net_eval.eval()  # Set network to evaluation mode
        with torch.no_grad():
            action_values = self.net_eval(state)
        self.net_eval.train()  # Return to training mode

        # Choose random action with probability epsilon, otherwise choose best predicted action
        if random.random() < epsilon:
            action = random.choice(available_actions)
        else:
            # Calculation of q_values
            q_values = action_values.cpu().data.numpy().squeeze()
            # create a mask with -infinity
            mask = np.full(q_values.shape, -1e5)
            # coping only the valid actions in the mask
            mask[available_actions] = q_values[available_actions]
            # Argmax on the mask will always choose a valid action
            action = np.argmax(mask)
        
        return action

    def save2Memory(self, state, action, reward, next_state, done):
        # Save experience to memory and, if ready, sample from memory and
        #  update the network
        self.memory.add(state, action, reward, next_state, done)
        self.counter += 1  # Increment step counter

        # Perform learning every 'learn_step' steps if enough experiences are in memory
        if (self.counter % self.learn_step == 0 and len(self.memory) >= self.batch_size): 
            experiences= self.memory.sample() 
            self.learn(experiences)

    def learn(self, experiences):
        # Perform a learning step by minimizing the difference between
        #  predicted and target Q-values
        states, actions, rewards, next_states, dones = experiences
        
        ## ADDING MASK FOR INVALID ACTIONS ##
        q_target_predictions = self.net_target(next_states).detach()
        # retrieve the grids of the next states
        next_states_grids = next_states[:, :25]
        # Create the mask for valid actions
        invalid_mask = (next_states_grids != 0)
        # Set Q-values of invalid actions to -inf
        q_target_predictions[invalid_mask] = -1e5

        # Compute target Q-values from net_target for stability in training
        q_target = q_target_predictions.max(1)[0].unsqueeze(1)
       
        y_j = rewards + (self.gamma * q_target * (1 - dones))
            # Bellman equation for target Q-value
        q_eval = self.net_eval(states).gather(1, actions)
            # Q-value predictions from policy network

        # Compute loss and backpropagate to update net_eval
        loss = self.criterion(q_eval, y_j)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Update target network with soft update for smooth learning
        self.targetUpdate()

    def targetUpdate(self):
        # Soft update to gradually shift target network parameters toward
        #  policy network parameters
        params = zip(self.net_eval.parameters(), self.net_target.parameters())
        for eval_param, target_param in params:
            target_param.data.copy_(
                self.tau  * eval_param.data + (1.0-self.tau) * target_param.data # Hint: use the `self.tau` rate for soft updating
            )

### Function that calculate the enviroment state

In [9]:
def get_obs_state(env):
    # --- 1. NORMALIZED GRID ---
    # 25 values float between 0.0 and 1.0 (or 0 and 12/12)
    grid = env.get_grid().flatten() / 12.0
    
    # --- 2. DICE (One-Hot Encoded) ---
    # The dice ranges from 2 to 12 (11 possible values)
    dice_val = int(env.get_current_roll())
    dice_one_hot = np.zeros(11, dtype=np.float32)
    
    # If 2 comes out -> index 0, If 12 comes out -> index 10
    if 2 <= dice_val <= 12:
        dice_one_hot[dice_val - 2] = 1.0
        
    # --- 3. CONCATENATION ---
    # 25 (grid) + 11 (dice) = 36 total inputs
    obs = np.concatenate([grid, dice_one_hot])
    
    return obs.astype(np.float32)

In [None]:
CHECKPOINT_NAME = 'checkpoint.pth'
RECENT_EPISODES = 100  # Number of episodes for average score in early stopping
MIN_EPISODES_FOR_STOP = 100  # Ensures enough episodes before evaluating target

def train(
        env, agent, n_episodes, max_steps,
        eps_start, eps_end, eps_decay,
        target_score, do_store_checkpoint
):
    # Initialize score history and epsilon (exploration rate)
    game_score_hist = []
    epsilon_hist = []
    epsilon = eps_start

    # Progress bar format for tracking training progress
    bar_format = '{l_bar}{bar:10}| {n:4}/{total_fmt}'\
                 ' [{elapsed:>7}<{remaining:>7}, {rate_fmt}{postfix}]'
    pbar = trange(n_episodes, unit="ep", bar_format=bar_format, ascii=True)

    for idx_epi in pbar:
        # Reset the environment for a new episode
        env.new_game()
        state = get_obs_state(env)
        score = 0.0

        for idx_step in range(max_steps):
            # Select an action based on the current policy (epsilon-greedy)
            action = agent.getAction(state,env.get_available_actions(), epsilon)  
            
            # Execute the chosen action in the environment            
            env.choose_action(action)

            #change of state
            next_state = get_obs_state(env)
            
            #Total reward
            reward = env.get_last_reward() 
            
            # Check if the episode is finished
            done = env.has_finished()
            
            # Store experience in memory and update the agent
            agent.save2Memory(state,action,reward,next_state,done)  
            
            # Move to the next state
            state = next_state  
            
            score += reward 
            # Check if the episode is finished
            if done:
                break
        
        # Track scores and decay epsilon for less exploration over time
        # Saves the local score of each episode to see it in the graph
        game_score = env.get_total_reward()
        game_score_hist.append(game_score)
        epsilon_hist.append(epsilon)
        score_avg = np.mean(game_score_hist[-RECENT_EPISODES:])
        epsilon =max(epsilon * eps_decay,eps_end)

        # Update the progress bar with the current score and average
        pbar.set_postfix_str(
            f"Score: {score: 7.2f}, 100 score avg: {score_avg: 7.2f}, Eps: {epsilon: .4f}"
        )
        pbar.update(0)

        # Clear GPU memory periodically to prevent memory issues
        if idx_epi % 1000 == 0:
            torch.cuda.empty_cache()
            gc.collect()
            
        # Early stopping condition if target score is achieved
        if len(game_score_hist) >= 100 and score_avg >= target_score:
            print("\nTarget Reached!")
            break

    # Print completion message based on early stopping or max episodes
    if (idx_epi + 1) < n_episodes:
        print("\nTraining complete - target reached!")
    else:
        print("\nTraining complete - maximum episodes reached.")

    # Save the trained model if specified
    if do_store_checkpoint:
        torch.save(agent.net_eval.state_dict(), CHECKPOINT_NAME)

    return game_score_hist, epsilon_hist

In [11]:
def plotScore(scores):
    # Plot the agent's score history to visualize learning progress
    plt.figure()
    plt.plot(scores)
    plt.title("Score History")
    plt.xlabel("Episodes")
    plt.ylabel("Score")
    plt.show()

In [12]:
def plotEpsilon(epsilons):
    # Plot the agent's epsilon history to visualize exploration rate changes
    plt.figure()
    plt.plot(epsilons)
    plt.title("Epsilon History")
    plt.xlabel("Episodes")
    plt.ylabel("Epsilon")
    plt.show()

In [None]:
def plotAll(scores, epsilons):
    # Plot  score, and epsilon histories in a single figure with subplots
    plt.figure(figsize=(12, 8))

    plt.subplot(2, 1, 1)
    plt.plot(scores)
    plt.title("Score History")
    plt.xlabel("Episodes")
    plt.ylabel("Score")

    plt.subplot(2, 1, 2)
    plt.plot(epsilons)
    plt.title("Epsilon History")
    plt.xlabel("Episodes")
    plt.ylabel("Epsilon")
    
    plt.tight_layout()
    plt.show()

## **Section 4 - Time to learn (training)**

In [14]:
BATCH_SIZE = 64         # Number of experiences sampled per learning step
LR = 5e-4                # Learning rate for optimizer
EPISODES = 500000        # Maximum number of episodes to train (~1 or 2 minute for 100 episodes)
TARGET_SCORE = 65        # Early stop if average score reaches this value
GAMMA = 0.99             # Discount factor for future rewards
MEMORY_SIZE = 100000      # Maximum capacity of replay memory
LEARN_STEP = 10          # Frequency (in steps) of learning updates
TAU = 1e-3               # Soft update rate for the target network
SAVE_CHKPT = True       # Option to save trained model checkpoint
#Exploration parameters
MAX_STEPS = 25           # Maximum steps per episode
EPS_START = 1             # Initial epsilon for exploration (100% exploration at start)
EPS_END = 0.20            # Minimum epsilon (final exploration rate)
# Epsilon decay rate (controls exploration reduction)
EPS_DECAY = 0.99995 # after 80000 it goes to EPS_END

In [15]:
# 1.Create the Knister game environment

env = KnisterGame()

# 2. Define state and action dimensions
# Input: 25 squares + 11 dice = 36
n_states = 36 
# Output: 25 possible positions to write
n_actions = 25 

# 3. Initialize the Agent
agent = DQN(
    n_states=n_states,
    n_actions=n_actions,
    batch_size=BATCH_SIZE,
    learning_rate=LR, 
    mem_size=MEMORY_SIZE,
    gamma=GAMMA,
    learn_step=LEARN_STEP,
    tau=TAU
)

# Training

In [None]:
# 4. Start the Training (passing the correct linear decay)
game_score_hist,epsilon_hist = train(env, agent, n_episodes=EPISODES, max_steps=MAX_STEPS,
                   eps_start=EPS_START, eps_end=EPS_END, eps_decay=EPS_DECAY,
                   target_score=TARGET_SCORE, do_store_checkpoint=SAVE_CHKPT)

# Plot of the results

In [None]:
# 5. Plot the results
plotAll(game_score_hist, epsilon_hist)
# Free up GPU memory if using CUDA
if str(device) == "cuda":
    torch.cuda.empty_cache()

Fast test to see the average done by the agent

In [17]:
def test_knister_agent_fast(env, agent,num_tests, max_steps):
    print("--- INIZIO PARTITA DI TEST ---")
    
    step = 1
    average_score = []
    for i in range(num_tests):
        env.new_game()
        state = get_obs_state(env)  
        for step in range(max_steps):#Episode steps
            # No casual moves only what it has learned
            action = agent.getAction(state,env.get_available_actions(), epsilon=0.0)
            # Execute the move
            env.choose_action(action)
            # Change state
            state = get_obs_state(env)
            # Check if game is finished
            done = env.has_finished()
            step += 1
            if done:
                break    
        average_score.append(env.get_total_reward())
        #print(f"Final Score: {env.get_total_reward()}")
    print(f"Average Score: {sum(average_score) / len(average_score) if average_score else 0}")

In [18]:
NUM_TEST_EPISODES = 500   # Number of episodes to test the agent
MAX_STEPS_TEST = 25    # Maximum steps per test episode
if SAVE_CHKPT:
    agent.net_eval.load_state_dict(torch.load(CHECKPOINT_NAME))
test_knister_agent_fast(env, agent,NUM_TEST_EPISODES, MAX_STEPS_TEST)

--- INIZIO PARTITA DI TEST ---
Average Score: 42.74


Test con passi per vedere cosa fa il modello

In [18]:
def test_knister_agent(env, agent,num_tests, max_steps):
    print("--- INIZIO PARTITA DI TEST ---")
    
    step = 1
    average_score = []
    for i in range(num_tests):
        env.new_game()
        state = get_obs_state(env)  
        for step in range(max_steps):#episode steps
            # No casual moves only what it has learned
            action = agent.getAction(state,env.get_available_actions(), epsilon=0.0)
            # Execute the move
            env.choose_action(action)
            # Change state
            state = get_obs_state(env)
            # Check if game is finished
            done = env.has_finished()
            
            # Retrieve info from the real game for display
            current_grid = env.get_grid()
            dice_value = env.get_current_roll()
            score = env.get_total_reward()
            
            print(f"\nStep {step}")
            print(f"Dice rolled: {dice_value}")
            print(f"The agent chose position: {action} (Row {action//5}, Col {action%5})")
            print("Current grid:")
            print(current_grid)
            print(f"Current score: {score}")
            
            step += 1
            if done:
                break    
        average_score.append(env.get_total_reward())
        #print(f"Final Score: {env.get_total_reward()}")
    print(f"Average Score: {sum(average_score) / len(average_score) if average_score else 0}")

In [39]:
NUM_TEST_EPISODES = 1   # Number of episodes to test the agent
MAX_STEPS_TEST = 25    # Maximum steps per test episode
if SAVE_CHKPT:
    agent.net_eval.load_state_dict(torch.load(CHECKPOINT_NAME))
test_knister_agent(env, agent,NUM_TEST_EPISODES, MAX_STEPS_TEST)

--- INIZIO PARTITA DI TEST ---

Step 0
Dice rolled: 7
The agent chose position: 4 (Row 0, Col 4)
Current grid:
[[0 0 0 0 6]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
Current score: 0

Step 1
Dice rolled: 4
The agent chose position: 12 (Row 2, Col 2)
Current grid:
[[0 0 0 0 6]
 [0 0 0 0 0]
 [0 0 7 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
Current score: 0

Step 2
Dice rolled: 5
The agent chose position: 15 (Row 3, Col 0)
Current grid:
[[0 0 0 0 6]
 [0 0 0 0 0]
 [0 0 7 0 0]
 [4 0 0 0 0]
 [0 0 0 0 0]]
Current score: 0

Step 3
Dice rolled: 7
The agent chose position: 23 (Row 4, Col 3)
Current grid:
[[0 0 0 0 6]
 [0 0 0 0 0]
 [0 0 7 0 0]
 [4 0 0 0 0]
 [0 0 0 5 0]]
Current score: 0

Step 4
Dice rolled: 7
The agent chose position: 20 (Row 4, Col 0)
Current grid:
[[0 0 0 0 6]
 [0 0 0 0 0]
 [0 0 7 0 0]
 [4 0 0 0 0]
 [7 0 0 5 0]]
Current score: 2

Step 5
Dice rolled: 8
The agent chose position: 0 (Row 0, Col 0)
Current grid:
[[7 0 0 0 6]
 [0 0 0 0 0]
 [0 0 7 0 0]
 [4 0 0 0 0]
 [7 0 0 5 0]]
Curr