# PPO Implementation for the 15 Puzzle Game

This notebook implements a Proximal Policy Optimization (PPO) agent to solve the 15 Puzzle Game.

In [None]:
!pip install gym torch numpy

In [None]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import random
from collections import deque


## Define the 15 Puzzle Environment

In [None]:
class FifteenPuzzleEnv:
    """Sliding-tile 15 puzzle on a 4x4 board stored as a flat array.

    The board is a length-16 NumPy array in row-major order where ``0`` marks
    the blank. The goal configuration is ``[0, 1, ..., 15]`` (blank in the
    top-left corner). Actions are the strings ``'up'``, ``'down'``,
    ``'left'`` and ``'right'`` and name the direction in which the blank moves.
    """

    def __init__(self):
        self.size = 4
        self.reset()

    def reset(self):
        """Shuffle the board into a random *solvable* position.

        Returns:
            A copy of the new board.
        """
        self.state = np.arange(self.size * self.size)
        np.random.shuffle(self.state)
        if not self._is_solvable():
            # Exchanging two numbered tiles flips the permutation parity
            # without moving the blank, which turns an unreachable board
            # into a reachable one.
            first, second = np.flatnonzero(self.state != 0)[:2]
            self.state[[first, second]] = self.state[[second, first]]
        return self.get_state()

    def _is_solvable(self):
        """Return True if the goal board can be reached from the current one.

        Every move swaps the blank with a neighbour: it flips the parity of
        the permutation and changes the blank's Manhattan distance from the
        top-left corner by one. The sum of both is therefore invariant and
        is even for the goal board.
        """
        tiles = self.state.tolist()
        inversions = sum(
            1
            for index, tile in enumerate(tiles)
            for later in tiles[index + 1:]
            if tile > later
        )
        blank_row, blank_col = divmod(tiles.index(0), self.size)
        return (inversions + blank_row + blank_col) % 2 == 0

    def get_state(self):
        """Return a copy of the board so callers cannot mutate the env."""
        return self.state.copy()

    def is_done(self):
        """Return True when the board equals the goal ``[0, 1, ..., 15]``."""
        return np.array_equal(self.state, np.arange(self.size * self.size))

    def get_possible_actions(self):
        """Return the list of moves that keep the blank on the board."""
        empty_pos = np.where(self.state == 0)[0][0]
        x, y = divmod(empty_pos, self.size)
        actions = []
        if x > 0: actions.append('up')
        if x < self.size - 1: actions.append('down')
        if y > 0: actions.append('left')
        if y < self.size - 1: actions.append('right')
        return actions

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, done)``.

        An action that is not currently legal (it would push the blank off
        the board, or it is not a known action name) leaves the board
        unchanged instead of wrapping around a row edge or indexing outside
        the array. The reward is ``1`` when the board is solved after the
        step and ``-0.1`` otherwise.
        """
        if action in self.get_possible_actions():
            offsets = {
                'up': -self.size,
                'down': self.size,
                'left': -1,
                'right': 1,
            }
            empty_pos = np.where(self.state == 0)[0][0]
            new_pos = empty_pos + offsets[action]
            self.state[empty_pos], self.state[new_pos] = (
                self.state[new_pos],
                self.state[empty_pos],
            )
        done = self.is_done()
        reward = 1 if done else -0.1
        return self.get_state(), reward, done


## Define the PPO Agent

In [None]:
class ActorCritic(nn.Module):
    """Two-headed network sharing one hidden layer.

    A single ``Linear -> ReLU`` layer of width 128 feeds two linear heads:
    ``policy_head`` produces one logit per action and ``value_head``
    produces a single scalar.
    """

    def __init__(self, input_dim, action_dim):
        super().__init__()
        hidden_dim = 128
        shared_layers = [nn.Linear(input_dim, hidden_dim), nn.ReLU()]
        self.fc = nn.Sequential(*shared_layers)
        self.policy_head = nn.Linear(hidden_dim, action_dim)
        self.value_head = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        """Return ``(action_logits, state_value)`` for input ``x``."""
        features = self.fc(x)
        action_logits = self.policy_head(features)
        state_value = self.value_head(features)
        return action_logits, state_value


## Training Loop (To Be Completed)

In [None]:
# Placeholder cell: the training loops (rollout collection, return/advantage
# estimation, and policy/value network updates) are implemented in the
# later sections of this notebook.

## PPO Utilities

In [None]:
def compute_returns(rewards, dones, values, gamma=0.99):
    """Compute discounted returns for a rollout.

    Args:
        rewards: Sequence of per-step rewards.
        dones: Sequence of per-step flags; a truthy flag marks the last step
            of an episode, so the running return restarts from zero there.
        values: Unused; kept so existing callers keep working.
        gamma: Discount factor.

    Returns:
        A 1-D ``float32`` tensor with one return per step, in rollout order.
    """
    discounted = []
    running = 0.0
    for reward, done in zip(reversed(rewards), reversed(dones)):
        if done:
            # Nothing after a terminal step contributes to its return.
            running = 0.0
        running = reward + gamma * running
        discounted.append(running)
    # Built back-to-front with O(1) appends; restore rollout order once.
    discounted.reverse()
    return torch.tensor(discounted, dtype=torch.float32)


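## Baseline Actor-Critic Training Loop

This first loop is a plain policy-gradient (actor-critic) update with one gradient step per episode and no PPO ratio clipping.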

In [None]:
# Shared objects used by the baseline training cell.
env = FifteenPuzzleEnv()
action_space = ['up', 'down', 'left', 'right']
input_dim = env.size ** 2  # one input feature per board cell
action_dim = len(action_space)
model = ActorCritic(input_dim, action_dim)
optimizer = optim.Adam(params=model.parameters(), lr=3e-4)

def select_action(state):
    """Sample an action for ``state`` from the module-level ``model``.

    No legality mask is applied here; all outputs of the policy head can be
    sampled.

    Returns:
        ``(action_index, log_prob, value)`` where ``action_index`` is a
        Python int and the other two are tensors produced by ``model``.
    """
    observation = torch.tensor(state, dtype=torch.float32)
    policy_logits, state_value = model(observation)
    policy = Categorical(policy_logits.softmax(dim=-1))
    chosen = policy.sample()
    chosen_log_prob = policy.log_prob(chosen)
    return chosen.item(), chosen_log_prob, state_value

# Hyperparameters
epochs = 1000
gamma = 0.99
# An untrained policy essentially never solves the puzzle by chance, so each
# rollout is truncated after this many moves instead of running until `done`.
max_steps_per_episode = 200

for epoch in range(epochs):
    state = env.reset()
    log_probs = []
    values = []
    rewards = []
    dones = []
    actions_taken = []
    states = []
    done = False
    total_reward = 0

    # Collect one (possibly truncated) episode.
    while not done and len(rewards) < max_steps_per_episode:
        states.append(state)
        available_actions = env.get_possible_actions()
        action_mask = [1 if a in available_actions else 0 for a in action_space]
        logits, value = model(torch.tensor(state, dtype=torch.float32))
        # log(1) = 0 keeps a legal logit, log(0) = -inf removes an illegal one.
        masked_logits = logits + torch.tensor(action_mask, dtype=torch.float32).log()
        dist = Categorical(logits=masked_logits)
        action = dist.sample()
        next_state, reward, done = env.step(action_space[action.item()])

        log_probs.append(dist.log_prob(action))
        values.append(value)
        rewards.append(reward)
        dones.append(done)
        actions_taken.append(action.item())
        state = next_state
        total_reward += reward

    # A truncated episode is treated as if it ended with zero future return
    # (no value bootstrap from the last state).
    returns = compute_returns(rewards, dones, values, gamma)
    log_probs = torch.stack(log_probs)
    values = torch.cat(values).squeeze()
    advantage = returns - values

    # The advantage is detached in the policy term so that only the value
    # loss trains the value head.
    policy_loss = -(log_probs * advantage.detach()).mean()
    value_loss = advantage.pow(2).mean()
    loss = policy_loss + 0.5 * value_loss

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Total Reward: {total_reward:.2f}, Loss: {loss.item():.4f}")

## Complete PPO Training Loop

In [None]:

class PPOAgent:
    """Actor-critic agent trained with the PPO clipped surrogate objective.

    Holds an ``ActorCritic`` network, its Adam optimizer and the loss
    hyperparameters (clip range, discount, entropy and value coefficients).
    """

    def __init__(self, input_dim, action_dim, action_space):
        self.model = ActorCritic(input_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=3e-4)
        self.action_space = action_space
        self.eps_clip = 0.2
        self.gamma = 0.99
        self.entropy_coef = 0.01
        self.value_coef = 0.5

    def select_action(self, state):
        """Sample an action for a single state.

        Args:
            state: 1-D board encoding (tensor or array-like); it is converted
                to ``float32`` and given a leading batch dimension of 1.

        Returns:
            ``(action_index, log_prob, entropy, value)``; ``action_index`` is
            a Python int, the rest are tensors with the batch dimension.
        """
        state = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
        logits, value = self.model(state)
        # Building the distribution from logits avoids a softmax -> log
        # round trip.
        dist = Categorical(logits=logits)
        action = dist.sample()
        return action.item(), dist.log_prob(action), dist.entropy(), value

    def compute_returns(self, rewards, dones, values, next_value):
        """Compute discounted returns bootstrapped from ``next_value``.

        Args:
            rewards: Per-step rewards.
            dones: Per-step terminal flags; a truthy flag stops the return
                of that step from including anything that follows it.
            values: Unused; kept so existing callers keep working.
            next_value: Value estimate of the state after the last step.

        Returns:
            A 1-D ``float32`` tensor with one return per step.
        """
        returns = []
        R = next_value
        for reward, done in zip(reversed(rewards), reversed(dones)):
            R = reward + self.gamma * R * (0.0 if done else 1.0)
            returns.append(R)
        # Built back-to-front with O(1) appends; restore rollout order once.
        returns.reverse()
        return torch.tensor(returns, dtype=torch.float32)

    def update(self, trajectories, epochs=4):
        """Run ``epochs`` clipped-PPO gradient steps on one rollout.

        Args:
            trajectories: Dict with the keys ``'states'``, ``'actions'``,
                ``'log_probs'``, ``'returns'`` and ``'values'`` holding one
                entry per collected step.
            epochs: Number of optimisation passes over the rollout.
        """
        if not trajectories['states']:
            return  # nothing was collected

        states = torch.stack(trajectories['states'])
        actions = torch.as_tensor(trajectories['actions'], dtype=torch.long)
        # The rollout-time log-probs and values are constants of the
        # objective. They must be detached (their autograd graph is freed
        # by the first backward pass) and flattened to (T,) so they line up
        # element-wise with the per-step tensors below instead of
        # broadcasting to a (T, T) matrix.
        old_log_probs = torch.stack(trajectories['log_probs']).detach().reshape(-1)
        values = torch.stack(trajectories['values']).detach().reshape(-1)
        returns = torch.as_tensor(
            trajectories['returns'], dtype=torch.float32
        ).detach().reshape(-1)

        advantages = returns - values

        for _ in range(epochs):
            logits, new_values = self.model(states)
            dist = Categorical(logits=logits)
            new_log_probs = dist.log_prob(actions)
            entropy = dist.entropy().mean()

            # Probability ratio between the current and the rollout policy.
            ratios = torch.exp(new_log_probs - old_log_probs)
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages

            policy_loss = -torch.min(surr1, surr2).mean()
            value_loss = (returns - new_values.reshape(-1)).pow(2).mean()
            loss = policy_loss + self.value_coef * value_loss - self.entropy_coef * entropy

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

# Train PPO on 15 Puzzle
env = FifteenPuzzleEnv()
agent = PPOAgent(input_dim=16, action_dim=4, action_space=['up', 'down', 'left', 'right'])

for episode in range(1000):
    state = torch.FloatTensor(env.reset())
    trajectories = {
        key: []
        for key in ('states', 'actions', 'log_probs', 'rewards', 'dones', 'values', 'returns')
    }

    total_reward = 0
    # Each episode is limited to 200 moves.
    for t in range(200):
        possible_actions = env.get_possible_actions()
        action_map = ['up', 'down', 'left', 'right']
        action_mask = [int(name in possible_actions) for name in action_map]

        # Rejection sampling: keep drawing from the policy until the drawn
        # action is legal for the current board.
        while True:
            action, log_prob, entropy, value = agent.select_action(state)
            if action_mask[action]:
                break

        next_state, reward, done = env.step(action_map[action])
        next_state = torch.FloatTensor(next_state)

        # Record the transition, one entry per trajectory field.
        for field, item in (
            ('states', state),
            ('actions', action),
            ('log_probs', log_prob),
            ('rewards', reward),
            ('dones', done),
            ('values', value),
        ):
            trajectories[field].append(item)

        total_reward += reward
        state = next_state

        if done:
            break

    # Value estimate of the last state, used to bootstrap the returns.
    _, bootstrap_value = agent.model(state.unsqueeze(0))
    next_value = bootstrap_value.detach().item()
    trajectories['returns'] = agent.compute_returns(
        trajectories['rewards'],
        trajectories['dones'],
        trajectories['values'],
        next_value,
    )

    agent.update(trajectories)

    if episode % 50 == 0:
        print(f"Episode {episode}, Total Reward: {total_reward}")


## Evaluation and Visualization

In [None]:

import matplotlib.pyplot as plt

# Evaluate the trained agent
def evaluate_agent(agent, episodes=10):
    """Roll out ``agent`` on the module-level ``env`` and report results.

    Each episode starts from ``env.reset()`` and runs for at most 200 moves.
    Actions are sampled from the policy restricted to the currently legal
    moves: illegal moves are masked out of the logits before sampling, so
    action selection always terminates even when the policy puts almost all
    of its probability on illegal moves.

    Args:
        agent: Object exposing ``model``, a callable that returns
            ``(logits, value)`` for a batch of states.
        episodes: Number of evaluation episodes.

    Returns:
        ``(rewards, steps)``: two lists with the total reward and the number
        of moves of every episode.
    """
    action_map = ['up', 'down', 'left', 'right']
    rewards = []
    steps = []
    for ep in range(episodes):
        state = torch.FloatTensor(env.reset())
        total_reward = 0
        step_count = 0
        for _ in range(200):
            possible_actions = env.get_possible_actions()
            legal = torch.tensor([name in possible_actions for name in action_map])

            with torch.no_grad():
                logits, _value = agent.model(state.unsqueeze(0))
            # -inf logits get probability zero, which renormalises the
            # policy over the legal moves only.
            masked_logits = logits.squeeze(0).masked_fill(~legal, float('-inf'))
            action = Categorical(logits=masked_logits).sample().item()

            next_state, reward, done = env.step(action_map[action])
            state = torch.FloatTensor(next_state)
            total_reward += reward
            step_count += 1
            if done:
                break
        rewards.append(total_reward)
        steps.append(step_count)
        print(f"Evaluation Episode {ep+1}: Reward = {total_reward}, Steps = {step_count}")
    return rewards, steps

# Run evaluation
rewards, steps = evaluate_agent(agent, episodes=20)

# Plot results: one figure per metric, in the same order as before.
for series, title, y_label in (
    (rewards, "Rewards over Evaluation Episodes", "Total Reward"),
    (steps, "Steps to Solve Puzzle per Evaluation Episode", "Steps"),
):
    plt.figure()
    plt.plot(series)
    plt.title(title)
    plt.xlabel("Episode")
    plt.ylabel(y_label)
    plt.grid(True)
    plt.show()
