In [None]:
import numpy as np
import pandas as pd
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import DQN, PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import CheckpointCallback

# Load Sudoku dataset
# The 'quizzes' and 'solutions' columns are consumed character by character
# further down, so read every column as text: this keeps the digit strings
# intact (including any leading zeros, which stand for empty cells).
file_path = '/kaggle/input/drl-proj/sudoku.csv'
data = pd.read_csv(file_path, dtype=str)

# Custom Sudoku Environment
class SudokuEnv(gym.Env):
    """Gymnasium environment in which an agent fills in a Sudoku puzzle.

    Observation: a length-81 ``int32`` vector (the board flattened row by
    row); ``0`` marks an empty cell.

    Action: an integer in ``[0, 729)``. ``action // 9`` selects the cell and
    ``action % 9 + 1`` the digit to write into it.

    Episode end:
      * ``terminated`` when the board equals the stored solution, or as soon
        as a wrong digit is proposed for an empty cell;
      * ``truncated`` once ``max_steps`` steps have been taken without
        termination (writing to an already filled cell is a no-op that never
        terminates, so without a cap an episode could run forever).

    Puzzles are drawn from the module-level ``data`` frame, using its
    ``'quizzes'`` and ``'solutions'`` columns.

    Args:
        max_steps: Step budget per episode before it is truncated. Pass
            ``None`` to disable truncation.
    """

    def __init__(self, max_steps=81 * 9):
        super().__init__()
        self.action_space = spaces.Discrete(81 * 9)
        self.observation_space = spaces.Box(0, 9, shape=(81,), dtype=np.int32)
        # Per-episode step budget (None = unlimited).
        self.max_steps = max_steps
        # Working board and reference solution; populated by reset().
        self.current_puzzle = None
        self.solution = None
        # Number of step() calls since the last reset().
        self._steps_taken = 0

    @staticmethod
    def _parse_board(digits):
        """Convert an 81-character digit string into a flat int32 array."""
        # zfill restores leading zeros in case the value was parsed as a number.
        text = str(digits).zfill(81)
        return np.array([int(ch) for ch in text], dtype=np.int32).reshape(81)

    def reset(self, seed=None, options=None):
        """Start a new episode on a randomly chosen puzzle.

        Args:
            seed: Optional seed forwarded to Gymnasium's RNG; because the
                puzzle index is drawn from ``self.np_random``, the same seed
                selects the same puzzle.
            options: Unused; accepted for API compatibility.

        Returns:
            ``(observation, info)`` where the observation is a copy of the
            freshly loaded board and ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        # Use the environment's own generator so `seed` actually takes effect.
        index = int(self.np_random.integers(len(data)))
        record = data.iloc[index]
        self.current_puzzle = self._parse_board(record['quizzes'])
        self.solution = self._parse_board(record['solutions'])
        self._steps_taken = 0
        # Hand out a copy: step() mutates the board in place.
        return self.current_puzzle.copy(), {}

    def step(self, action):
        """Apply one move and return the Gymnasium 5-tuple.

        Args:
            action: Integer (or integer-like scalar) in ``[0, 729)``.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
        """
        cell, value = divmod(int(action), 9)
        value += 1
        self._steps_taken += 1

        # NOTE(review): every step starts from +1 and the progress bonus below
        # is non-negative, so wrong moves and no-op moves can still net a
        # positive reward. Magnitudes are kept as-is; revisit the shaping.
        reward = 1
        terminated = False

        if self.current_puzzle[cell] == 0:
            if self.solution[cell] == value:
                self.current_puzzle[cell] = value
                reward += 10  # Larger reward for correct moves

                # Additional reward for completing rows, columns, or boxes
                row, col = divmod(cell, 9)
                board = self.current_puzzle.reshape(9, 9)
                target = self.solution.reshape(9, 9)
                if np.array_equal(board[row], target[row]):
                    reward += 5
                if np.array_equal(board[:, col], target[:, col]):
                    reward += 5

                # Slice the same 3x3 block out of both boards so the shapes
                # agree; comparing against a contiguous run of the flat
                # solution can never match.
                top, left = 3 * (row // 3), 3 * (col // 3)
                if np.array_equal(board[top:top + 3, left:left + 3],
                                  target[top:top + 3, left:left + 3]):
                    reward += 5

                terminated = bool(np.array_equal(self.current_puzzle, self.solution))
            else:
                reward -= 2  # Smaller penalty for incorrect moves
                terminated = True
        else:
            reward -= 1  # Penalty for trying to fill a non-empty cell

        # Bonus for overall progress
        correct_cells = int(np.sum(self.current_puzzle == self.solution))
        reward += correct_cells * 0.05  # Small reward for every correct cell

        # Cut the episode once the step budget is spent.
        truncated = (
            not terminated
            and self.max_steps is not None
            and self._steps_taken >= self.max_steps
        )

        return self.current_puzzle.copy(), float(reward), terminated, truncated, {}

# Build the environment and run Stable-Baselines3's environment checker on it
env = SudokuEnv()
check_env(env)

# ---- DQN: hyperparameters, model, checkpointing, training ----
dqn_hyperparams = dict(
    learning_rate=0.0001,
    batch_size=64,
    buffer_size=100000,
    exploration_fraction=0.1,
    gradient_steps=1,
    target_update_interval=1000,
    policy_kwargs={"net_arch": [128, 128]},  # two hidden layers of 128 units
)
model_dqn = DQN('MlpPolicy', env, verbose=1, **dqn_hyperparams)

# Save a checkpoint every 1000 calls of the callback
checkpoint_callback_dqn = CheckpointCallback(save_freq=1000, save_path='./models/dqn/')
model_dqn.learn(total_timesteps=10000, callback=checkpoint_callback_dqn)

# ---- PPO: hyperparameters, model, checkpointing, training ----
ppo_hyperparams = dict(
    learning_rate=0.0003,
    n_steps=2048,
    batch_size=64,
    ent_coef=0.01,
    clip_range=0.2,
    policy_kwargs={"net_arch": [128, 128]},  # same architecture as the DQN model
)
model_ppo = PPO('MlpPolicy', env, verbose=1, **ppo_hyperparams)

checkpoint_callback_ppo = CheckpointCallback(save_freq=1000, save_path='./models/ppo/')
model_ppo.learn(total_timesteps=10000, callback=checkpoint_callback_ppo)

# Evaluate Models
def _episode_solved(env, terminated, total_reward):
    """Return True when a finished episode counts as a solved puzzle."""
    # An episode that was cut short (truncated / step cap) is never a success.
    if not terminated:
        return False
    board = getattr(env, "current_puzzle", None)
    target = getattr(env, "solution", None)
    if board is not None and target is not None:
        # The env exposes its board: success means it matches the solution.
        return bool(np.array_equal(board, target))
    # Fallback for environments that do not expose a board/solution pair.
    return total_reward > 0


def evaluate_model(model, env, num_episodes=100, max_steps=1000, deterministic=False):
    """Roll out ``model`` on ``env`` and summarise its performance.

    Args:
        model: Object with a ``predict(obs, deterministic=...)`` method that
            returns ``(action, state)``.
        env: Environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step(action)`` returns
            ``(obs, reward, terminated, truncated, info)``.
        num_episodes: Number of episodes to run; must be positive.
        max_steps: Hard cap on steps per episode, so a policy that never
            reaches a terminal state cannot hang the evaluation.
        deterministic: Forwarded to ``model.predict``.

    Returns:
        ``(mean_reward, success_rate)``. An episode is a success when it
        terminated and the environment's ``current_puzzle`` equals its
        ``solution``; if the environment lacks those attributes, a terminated
        episode with positive total reward counts as a success.

    Raises:
        ValueError: If ``num_episodes`` is not positive.
    """
    if num_episodes <= 0:
        raise ValueError("num_episodes must be a positive integer")

    rewards = []
    successes = 0
    for _ in range(num_episodes):
        obs, _info = env.reset()
        total_reward = 0
        terminated = False
        truncated = False
        steps = 0
        # Stop on either end-of-episode flag, or when the step cap is hit.
        while not (terminated or truncated) and steps < max_steps:
            action, _state = model.predict(obs, deterministic=deterministic)
            obs, reward, terminated, truncated, _info = env.step(action)
            total_reward += reward
            steps += 1
        rewards.append(total_reward)
        if _episode_solved(env, terminated, total_reward):
            successes += 1
    success_rate = successes / num_episodes
    return np.mean(rewards), success_rate

# Score each trained agent with the shared evaluation routine
dqn_scores = evaluate_model(model_dqn, env)
ppo_scores = evaluate_model(model_ppo, env)
reward_dqn, success_rate_dqn = dqn_scores
reward_ppo, success_rate_ppo = ppo_scores

# Compare results
summary_lines = [
    f"Average Reward for DQN: {reward_dqn}",
    f"Success Rate for DQN: {success_rate_dqn * 100:.2f}%",
    f"Average Reward for PPO: {reward_ppo}",
    f"Success Rate for PPO: {success_rate_ppo * 100:.2f}%",
]
for summary_line in summary_lines:
    print(summary_line)

# Determine which algorithm is better (by mean episode reward)
if reward_dqn > reward_ppo:
    verdict = "DQN performed better than PPO."
elif reward_ppo > reward_dqn:
    verdict = "PPO performed better than DQN."
else:
    verdict = "Both algorithms performed equally well."
print(verdict)

# Print final rewards and success rates for both algorithms
def print_final_results():
    """Print the mean reward and success rate of both agents.

    Reads the module-level ``reward_dqn``, ``success_rate_dqn``,
    ``reward_ppo`` and ``success_rate_ppo`` values; returns nothing.
    """
    report = (
        "Final Results:",
        f"DQN Final Average Reward: {reward_dqn}",
        f"DQN Success Rate: {success_rate_dqn * 100:.2f}%",
        f"PPO Final Average Reward: {reward_ppo}",
        f"PPO Success Rate: {success_rate_ppo * 100:.2f}%",
    )
    # One write, one line per entry.
    print("\n".join(report))

# Show the summary
print_final_results()


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 2.25     |
|    ep_rew_mean      | 2.82     |
|    exploration_rate | 0.991    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 3000     |
|    time_elapsed     | 0        |
|    total_timesteps  | 9        |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 1.62     |
|    ep_rew_mean      | 1.78     |
|    exploration_rate | 0.988    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 2036     |
|    time_elapsed     | 0        |
|    total_timesteps  | 13       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 1.5      |
|    ep_rew_mean      | 2.48  

In [None]:
def display_final_solution(model, env, max_steps=1000):
    """Run one episode and print the puzzle, the agent's result and the answer.

    Args:
        model: Object with a ``predict(obs)`` method returning
            ``(action, state)``.
        env: Environment exposing ``current_puzzle`` and ``solution`` as
            81-element arrays, with Gymnasium-style ``reset``/``step``.
        max_steps: Hard cap on the number of steps, so an episode that never
            terminates or truncates cannot hang the cell.

    Prints the initial puzzle, the board as the agent left it, and the
    reference solution, each as a 9x9 grid.
    """
    obs, _info = env.reset()
    print("Initial Puzzle:")
    print(env.current_puzzle.reshape(9, 9))  # Display the initial puzzle

    for _step in range(max_steps):
        action, _state = model.predict(obs)
        obs, _reward, terminated, truncated, _info = env.step(action)
        # Either flag ends the episode.
        if terminated or truncated:
            break

    # Show what the agent actually produced, not only the expected answer.
    print("\nAgent's Final Board:")
    print(env.current_puzzle.reshape(9, 9))
    print("\nFinal Solution:")
    print(env.solution.reshape(9, 9))  # Display the correct solution for comparison

display_final_solution(model_dqn, env)


Initial Puzzle:
[[2 0 6 0 3 0 0 9 1]
 [0 0 3 0 0 0 8 4 0]
 [0 1 0 7 0 0 0 0 6]
 [4 0 5 0 0 9 0 6 8]
 [0 2 0 0 0 5 0 3 0]
 [0 0 1 0 4 0 0 0 0]
 [1 9 0 6 0 0 0 2 7]
 [0 7 0 4 0 1 0 0 5]
 [0 0 4 2 8 0 0 0 0]]

Final Solution:
[[2 4 6 5 3 8 7 9 1]
 [7 5 3 9 1 6 8 4 2]
 [8 1 9 7 2 4 3 5 6]
 [4 3 5 1 7 9 2 6 8]
 [9 2 7 8 6 5 1 3 4]
 [6 8 1 3 4 2 5 7 9]
 [1 9 8 6 5 3 4 2 7]
 [3 7 2 4 9 1 6 8 5]
 [5 6 4 2 8 7 9 1 3]]


  and should_run_async(code)
