# Introduction

This notebook takes an RL approach using Deep Q Networks to solve the challenge. <br>
So far I have not had much luck: the agent only occasionally finds better solutions for wreaths, and it finds no solutions for any other puzzle type. <br>

I am relatively new to RL and wanted to explore it via this challenge, so any feedback and comments are much appreciated. <br>
This is also my first public Kaggle Notebook, so any advice on Kaggle structure and conventions is also welcome!

# Hyperparameters and Constants

In [None]:
# Locations of the three competition CSV files inside the Kaggle input mount.
puzzle_info_path, puzzles_path, sample_submission_path = (
    f'/kaggle/input/santa-2023/{stem}.csv'
    for stem in ('puzzle_info', 'puzzles', 'sample_submission')
)

In [None]:
import os

# --- Episode schedule --------------------------------------------------------
NUM_EPISODES = 1       # default number of training episodes per agent
MAX_STEPS = 1000       # lower bound on the per-episode step cap used by the env
CHECKPOINT_AT = 25     # not referenced by the code in this notebook

# --- DQN optimisation --------------------------------------------------------
BATCH_SIZE = 32        # transitions sampled from replay memory per update
GAMMA = 0.995          # discount handed to env.step() by default
GAMMA_LARGE = 0.9995   # discount for environments whose name contains 'large'
EPS_START = 0.99       # exploration rate at step 0 of an episode
EPS_END = 0.001        # asymptotic exploration rate
EPS_DECAY = 1000       # not referenced by the code in this notebook
TAU = 0.1              # soft-update weight of the policy net into the target net
LR = 1e-4              # AdamW learning rate

# --- Run control -------------------------------------------------------------
TRAIN = True           # when False the training cell is skipped
CONTINUED_RUN = False  # not referenced by the code in this notebook
BASE_DIR = os.getcwd()

EXP_PATH = os.path.join(BASE_DIR, 'experiments')

# Utilities

In [None]:
import ast
import math
import random
import re

from collections import deque, namedtuple

import numpy as np
import pandas as pd

In [None]:
# Load the competition tables: per-type move definitions, the puzzles
# themselves, and the sample submission (used as the baseline solution).
pz_info = pd.read_csv(puzzle_info_path)
puzzles = pd.read_csv(puzzles_path)
ss = pd.read_csv(sample_submission_path)

ss['puzzle_type'] = puzzles['puzzle_type']

# States are stored as ';'-separated labels; moves as '.'-separated names.
puzzles['initial_state'] = puzzles['initial_state'].apply(lambda x: x.split(';'))
puzzles['solution_state'] = puzzles['solution_state'].apply(lambda x: x.split(';'))
puzzles['moves_in_ss'] = ss['moves'].apply(lambda x: len(x.split('.')))

# legal_moves[puzzle_type][move_name] -> index permutation.
# Every forward move 'm' also gets its inverse '-m' (argsort of the permutation).
legal_moves = {}
for puzzle_type in pz_info['puzzle_type'].unique():
    raw_moves = pz_info[pz_info['puzzle_type'] == puzzle_type]['allowed_moves'].values[0]
    # literal_eval only accepts Python literals, so text read from the CSV
    # cannot execute arbitrary code the way eval() would.
    forward_moves = ast.literal_eval(raw_moves)
    moves = dict(forward_moves)
    for m, d in forward_moves.items():
        moves[f'-{m}'] = list(np.argsort(d))
    legal_moves[puzzle_type] = moves

# Sorted list of every move name that appears in any puzzle type.
all_moves = sorted({name for type_moves in legal_moves.values() for name in type_moves})

# ACTION_SPACE_SIZE = 268
# len(all_moves)

# OBS_SPACE_SIZE = 6534
# max(len(list(legal_moves[puzzle_type].values())[0]) for puzzle_type in legal_moves.keys())

# Derived from the data instead of hard-coded (398 for the competition file).
NUM_PUZZLES = puzzles.shape[0]

In [None]:
# One step of experience as stored in the replay buffer.
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])


class ReplayMemory:
    """Bounded FIFO buffer of ``Transition`` records."""

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry once ``capacity`` is hit.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Wrap ``args`` in a ``Transition`` and append it to the buffer."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` stored transitions drawn without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently held."""
        return len(self.memory)

In [None]:
def epsilon_decay(step, max_step):
    """Return the exploration rate for ``step`` out of ``max_step``.

    Decays exponentially from ``EPS_START`` (at step 0) towards ``EPS_END``,
    with ``max_step`` acting as the decay time constant.
    """
    span = EPS_START - EPS_END
    decay = math.exp(-1. * step / max_step)
    return EPS_END + span * decay

## Dividing the puzzles

I divide the puzzles into 5 categories. Each category is trained and tested in its own environment, giving 5 environments in total. <br>
The 5 categories are: small cubes, large cubes, small globes, large globes, and wreaths.

In [None]:
def divide_puzzles(puzzles):
    """Split ``puzzles`` into the five groups that get their own environment.

    Args:
        puzzles: DataFrame with a string ``puzzle_type`` column.

    Returns:
        A list of five DataFrames, in this order: small cubes, large cubes,
        small globes, large globes, wreaths.
    """
    types = puzzles['puzzle_type']

    def contains_any(names):
        # OR together one substring match per listed type name.
        mask = types.str.contains(names[0])
        for name in names[1:]:
            mask = mask | types.str.contains(name)
        return mask

    is_small_cube = contains_any(['cube_2/2/2', 'cube_3/3/3', 'cube_4/4/4', 'cube_5/5/5'])
    # Any cube that is not explicitly listed as small counts as large.
    is_large_cube = types.str.contains('cube') & ~is_small_cube

    is_small_globe = contains_any(['globe_1/8', 'globe_2/6', 'globe_3/4', 'globe_1/16'])
    is_large_globe = types.str.contains('globe') & ~is_small_globe

    is_wreath = types.str.contains('wreath')

    group_masks = (is_small_cube, is_large_cube, is_small_globe, is_large_globe, is_wreath)
    return [puzzles[mask] for mask in group_masks]

def combine_solutions(*sols):
    """Concatenate per-environment solution frames and sort them by index.

    Accepts either a single collection of DataFrames
    (``combine_solutions([a, b])``) or the frames as separate positional
    arguments (``combine_solutions(a, b)``).

    Returns:
        One DataFrame containing every row of the inputs, ordered by index.
    """
    if len(sols) == 1 and not isinstance(sols[0], (pd.DataFrame, pd.Series)):
        # A single collection was passed: hand it to concat unchanged.
        frames = sols[0]
    else:
        # Frames were passed individually; unpacking them into pd.concat would
        # bind the second frame to ``axis``, so collect them into a list.
        frames = list(sols)
    return pd.concat(frames).sort_index()

# Environment

In [None]:
import string
import logging

from collections import defaultdict

import numpy as np
import pandas as pd

import gymnasium as gym
from gymnasium import spaces

## Reward function

In [None]:
def reward_function(curr_state, prev_state, solution, num_wildcards, moves_played, moves_in_ss, solving_penalty=0.005):
    """Score the current state by how many positions match ``solution``.

    The match count is scaled by
    ``100 * moves_in_ss / (len(solution) * moves_played)``. A state that
    matches in at least ``len(solution) - num_wildcards`` positions earns the
    full scaled count; otherwise the better of the current and previous match
    counts is used, damped by ``solving_penalty``.

    Returns:
        The reward rounded to 6 decimals.
    """
    n_cells = len(solution)
    scale = (100 * moves_in_ss) / (n_cells * moves_played)
    matches_now = np.sum(curr_state == solution)

    solved = matches_now >= n_cells - num_wildcards
    if not solved:
        matches_before = np.sum(prev_state == solution)
        best_matches = max(matches_now, matches_before)
        return np.round(best_matches * scale * solving_penalty, 6)

    return np.round(matches_now * scale, 6)

In [None]:
def apply_move(state, move, puzzle_type):
    """Return ``state`` reordered by the named move of ``puzzle_type``.

    The move is looked up in the module-level ``legal_moves`` table and used
    to index ``state``.
    """
    permutation = legal_moves[puzzle_type][move]
    return state[permutation]

def is_solution(state, solution, num_wc):
    """Return True when ``state`` differs from ``solution`` in at most ``num_wc`` positions."""
    excess_mismatches = np.sum(state != solution) - num_wc
    return bool(excess_mismatches <= 0)

def rNone():
    """Zero-argument factory that always produces ``None``.

    Used as the ``default_factory`` of ``SantaEnv.solution_dict``.
    """
    return

In [None]:
class SantaEnv(gym.Env):
    """Gymnasium environment over one group of permutation puzzles.

    Each episode plays a single row of ``puzzles``. An action applies one
    named move (a permutation from the module-level ``legal_moves`` table) to
    the current state; the episode terminates once the state matches the
    solution up to the puzzle's wildcard allowance, and is truncated once the
    step cap is exceeded.

    The shortest solution found so far for each puzzle is kept in
    ``solution_dict``, keyed by puzzle id.
    """

    def __init__(self,
                 puzzles,
                 env_name = None,
                 render_mode=None):
        """Build the action/observation spaces for a group of puzzles.

        Args:
            puzzles: DataFrame of puzzles; the columns read here and in
                ``reset`` are ``id``, ``puzzle_type``, ``initial_state``,
                ``solution_state``, ``num_wildcards`` and ``moves_in_ss``.
            env_name: Label used in log messages (``'not_named'`` if omitted).
            render_mode: Stored on the instance; no rendering is implemented.
        """
        super().__init__()
        self.render_mode = render_mode
        self.alphabet = {k: i for i, k in enumerate(string.ascii_letters)}

        # Union of the move names of every puzzle type in this group. Sorted so
        # that the action-index -> move mapping is the same on every run (set
        # iteration order of strings is not stable across interpreter runs).
        move_names = set()
        for pzt in puzzles.puzzle_type.unique():
            move_names.update(legal_moves[pzt].keys())
        self.legal_moveset = sorted(move_names)

        self.action_to_move = dict(enumerate(self.legal_moveset))

        self.testing = False

        self.puzzles = puzzles
        self.env_name = env_name if env_name is not None else 'not_named'

        self.state_size = max(puzzles.initial_state.apply(len))
        self.action_size = len(self.legal_moveset)
        # Observations are float vectors: -1 marks padding, letters map to
        # 0..51 and 'N<k>'-style labels map to 52 + k (see state_to_numeric).
        self.observation_space = spaces.Box(
            low=-1,
            high=len(string.ascii_letters) + self.state_size,
            shape=(self.state_size,),
            dtype=np.float64)
        self.action_space = spaces.Discrete(self.action_size)

        self.reward_range = (0, self.state_size*10+1)
        self.discount = 1

        # Missing puzzle ids read as None instead of raising KeyError.
        self.solution_dict = defaultdict(rNone)
        # {-1: {"id": -1, "moves": "f1.f0", "puzzle_type":"placeholder"}}

        self.step_counter = 0
        self.solution_state = None
        self.current_state = None
        self.prev_state = None
        self.moves_in_ss = 0

        self.num_wc = 0
        self.current_puzzle = None
        self.current_puzzle_id = None
        self.current_puzzle_type = None

        self.valid_moves = None
        self.actions_taken = []

        self.max_step = MAX_STEPS
        self.done = False

    def state_to_numeric(self, state):
        """Encode a sequence of state labels as a fixed-length float vector.

        Single ASCII letters map to their position in ``string.ascii_letters``;
        any other label is parsed as ``int(label[1:])`` offset by 52. The
        result is padded with -1 up to ``state_size``. Returns None for a
        None state.
        """
        if state is None:
            return None
        n_letters = len(string.ascii_letters)
        encoded = np.array([
            self.alphabet[k] if k in self.alphabet else int(k[1:]) + n_letters
            for k in state
        ])
        obs = np.ones((self.state_size,)) * -1
        obs[:len(encoded)] = encoded
        return obs

    def _get_obs(self):
        """Numeric encoding of the current state."""
        return self.state_to_numeric(self.current_state)

    def _get_info(self):
        """Auxiliary info dict (always empty)."""
        return {}

    def reset(self, index=None, seed=None):
        """Start a new episode.

        Args:
            index: Index label of the puzzle to load; a uniformly random
                puzzle of the group is chosen when omitted.
            seed: Forwarded to ``gym.Env.reset`` to seed ``self.np_random``,
                which drives the random puzzle choice.

        Returns:
            ``(observation, info)``.
        """
        super().reset(seed=seed)

        if index is None:
            # The upper bound is exclusive, so every row (including the last
            # one) can be drawn, and a one-puzzle group works as well.
            row = int(self.np_random.integers(0, self.puzzles.shape[0]))
            self.current_puzzle = self.puzzles.iloc[row]
        else:
            self.current_puzzle = self.puzzles.loc[index]

        self.current_puzzle_id = self.current_puzzle.id
        if not self.testing:
            print(f"<{self.env_name}> Puzzle ID: {self.current_puzzle_id}")

        self.current_puzzle_type = self.current_puzzle.puzzle_type
        self.current_state = np.array(self.current_puzzle.initial_state)
        self.solution_state = np.array(self.current_puzzle.solution_state)

        self.num_wc = self.current_puzzle.num_wildcards
        self.valid_moves = list(legal_moves[self.current_puzzle_type].keys())
        self.moves_in_ss = self.current_puzzle.moves_in_ss
        self.actions_taken = []

        self.step_counter = 0
        self.max_step = max(MAX_STEPS, int(self.moves_in_ss * 0.75))
        self.done = False
        self.discount = 1

        obs = self._get_obs()
        info = self._get_info()

        return obs, info

    def step(self, action, discount = GAMMA):
        """Apply one move and score the resulting state.

        Args:
            action: Index into ``action_to_move``; it must name a move that is
                legal for the current puzzle type.
            discount: Factor by which the running reward multiplier shrinks
                after this step.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
        """
        self.prev_state = self.current_state.copy()
        self.current_state = apply_move(self.current_state,
                                        self.action_to_move[action],
                                        self.current_puzzle_type)
        self.step_counter += 1
        self.actions_taken.append(action)

        terminated = False
        truncated = False
        if is_solution(self.current_state, self.solution_state, self.num_wc):
            if not self.testing:
                print(f"<{self.env_name}> {self.current_puzzle_id} solved!")

            # Record the move sequence if it is the first or the shortest
            # solution seen for this puzzle.
            moves = ".".join([self.action_to_move[a] for a in self.actions_taken])
            best = self.solution_dict.get(self.current_puzzle_id)
            if best is None or len(best['moves'].split('.')) > len(self.actions_taken):
                self.solution_dict[self.current_puzzle_id] = {"id": self.current_puzzle_id,
                                                              "moves": moves,
                                                              "puzzle_type": self.current_puzzle_type}
            terminated = True

        if self.step_counter > self.max_step and not terminated:
            if not self.testing:
                print(f"<{self.env_name}> Max Steps Reached!")
            truncated = True

        reward = self.discount * reward_function(self.current_state, self.prev_state, self.solution_state, self.num_wc, self.step_counter+1, self.moves_in_ss)
        self.discount *= discount
        obs = self._get_obs()
        info = self._get_info()

        return obs, np.round(reward, 6), terminated, truncated, info

    def close(self):
        """Release environment resources (delegates to ``gym.Env.close``)."""
        super().close()

    def action_mask(self):
        """Return a 0/1 array over ``legal_moveset`` marking moves legal for the current puzzle."""
        valid = set(self.valid_moves)  # O(1) membership per move
        return np.array([1 if k in valid else 0 for k in self.legal_moveset])

    def get_random_valid_action(self):
        """Draw a uniformly random action index that is legal for the current puzzle."""
        # flatnonzero always yields a 1-D array, so a puzzle with a single
        # legal move is sampled correctly too.
        valid_actions = np.flatnonzero(self.action_mask())
        return self.np_random.choice(valid_actions)

    def get_computed_moves(self):
        """Return one solution row per puzzle of this group.

        Solutions found by this environment take precedence; puzzles without
        one fall back to the corresponding row of the sample submission
        (module-level ``ss``).
        """
        moves = pd.DataFrame.from_dict(self.solution_dict, orient='index')
        # self.puzzles.index holds index labels, hence label-based .loc.
        baseline = ss.loc[self.puzzles.index]
        return pd.concat([moves, baseline]).drop_duplicates('id', keep='first').sort_index()

# DQN and DQNAgent

In [None]:
import math
import random
import logging

from itertools import count

import torch
import torch.nn as nn
from torch.nn import functional as F

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
class DQN(nn.Module):
    """Fully connected Q-network: observations -> 256 -> 128 -> one value per action."""

    def __init__(self, n_observations, n_actions):
        super().__init__()

        first_hidden, second_hidden = 256, 128
        self.layer1 = nn.Linear(n_observations, first_hidden)
        self.layer2 = nn.Linear(first_hidden, second_hidden)
        self.layer3 = nn.Linear(second_hidden, n_actions)

    def forward(self, x):
        """Map a batch of observations to a batch of per-action Q-values."""
        # ReLU after each hidden layer; the output layer stays linear.
        for hidden in (self.layer1, self.layer2):
            x = F.relu(hidden(x))
        return self.layer3(x)

In [None]:
class DQNAgent(nn.Module):
    """DQN agent (policy net, target net, replay memory) bound to one ``SantaEnv``."""

    def __init__(self,
                 env,
                 criterion = None):
        """Create the networks, optimizer and replay memory.

        Args:
            env: Environment to act in; its ``state_size`` / ``action_size``
                fix the network dimensions.
            criterion: Loss *class* (instantiated here with no arguments);
                defaults to ``nn.SmoothL1Loss``.
        """
        super().__init__()

        self.env = env
        self.state_size = env.state_size
        self.action_size = env.action_size

        self.criterion = nn.SmoothL1Loss() if criterion is None else criterion()

        self.memory = ReplayMemory(1000)

        self.policy_net = DQN(self.state_size, self.action_size).to(device)
        self.target_net = DQN(self.state_size, self.action_size).to(device)

        # The target net starts as an exact copy of the policy net.
        self.target_net.load_state_dict(self.policy_net.state_dict())

        self.target_net.eval()
        self.optimizer = torch.optim.AdamW(self.policy_net.parameters(), lr=LR, amsgrad=True)

        self.gamma = GAMMA_LARGE if 'large' in self.env.env_name else GAMMA

    def select_action(self, state, testing=False):
        """Pick an action for ``state``.

        With ``testing=True`` the choice is always greedy; otherwise it is
        epsilon-greedy, with epsilon taken from ``epsilon_decay`` at the
        environment's current step.

        Returns:
            A ``(1, 1)`` long tensor holding the action index.
        """
        if testing or random.random() > epsilon_decay(self.env.step_counter, self.env.max_step):
            with torch.no_grad():
                q_values = self.policy_net(state).squeeze(0)
                # Mask by legality itself rather than by "value == 0", so a
                # legal action whose Q-value happens to be 0 stays selectable.
                valid = torch.as_tensor(self.env.action_mask(), device=device).bool()
                q_values = q_values.masked_fill(~valid, float('-inf'))
                return q_values.argmax().view(1, 1)
        return torch.tensor([[self.env.get_random_valid_action()]], device=device, dtype=torch.long)

    def isNone(self, s):
        """Return True when ``s`` is NOT None.

        Despite the name, this marks non-terminal next-states; the name is
        kept for backward compatibility.
        """
        return s is not None

    def optimize_agent(self):
        """Run one gradient step on a batch sampled from replay memory.

        Does nothing until the memory holds at least ``BATCH_SIZE`` transitions.
        """
        if len(self.memory) < BATCH_SIZE:
            return

        transitions = self.memory.sample(BATCH_SIZE)
        # Transpose a list of Transitions into one Transition of tuples.
        batch = Transition(*zip(*transitions))

        non_final_mask = torch.tensor(tuple(map(self.isNone, batch.next_state)), device=device, dtype=torch.bool)
        non_final_next_states = [s for s in batch.next_state if s is not None]

        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        # The env returns NumPy float64 rewards; keep the target in float32
        # like the network outputs.
        reward_batch = torch.cat(batch.reward).to(torch.float32)

        # Q(s, a) for the actions that were actually taken.
        state_action_values = self.policy_net(state_batch).gather(1, action_batch)

        # Terminal next-states keep a value of 0.
        next_state_values = torch.zeros(BATCH_SIZE, device=device)
        if non_final_next_states:
            # torch.cat raises on an empty list, so skip when the whole batch
            # is terminal.
            with torch.no_grad():
                next_state_values[non_final_mask] = self.target_net(torch.cat(non_final_next_states)).max(1).values
        # NOTE(review): the bootstrap term is not multiplied by self.gamma; the
        # environment already discounts rewards by step. Confirm this is intended.
        expected_state_action_values = next_state_values + reward_batch

        loss = self.criterion(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        # In-place gradient clipping
        torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 100)
        self.optimizer.step()

    def _soft_update_target(self):
        """Blend the policy weights into the target net: target <- TAU*policy + (1-TAU)*target."""
        target_net_state_dict = self.target_net.state_dict()
        policy_net_state_dict = self.policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        self.target_net.load_state_dict(target_net_state_dict)

    def train_agent(self, num_episodes=NUM_EPISODES):
        """Train for ``num_episodes`` episodes on randomly drawn puzzles.

        Per-episode reward totals are stored in ``self.rewards_per_ep``.
        """
        self.rewards_per_ep = []
        for i_ep in range(num_episodes):
            print(f"<{self.env.env_name}> Episode {i_ep}")

            state, _ = self.env.reset()
            state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            ep_reward = 0

            for t in count():
                action = self.select_action(state)
                observation, reward, terminated, truncated, _ = self.env.step(action.item(), self.gamma)
                ep_reward += reward
                reward = torch.tensor([reward], device=device, dtype=torch.float32)
                done = terminated or truncated

                if terminated:
                    # A None next-state marks the transition as terminal.
                    next_state = None
                    print(f"<{self.env.env_name}> {self.env.solution_dict.keys()}")
                else:
                    next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

                self.memory.push(state, action, next_state, reward)
                state = next_state

                self.optimize_agent()
                self._soft_update_target()

                if done:
                    self.rewards_per_ep.append(ep_reward)
                    # t is zero-based, so t + 1 steps have been played.
                    print(f"<{self.env.env_name}> Episode {i_ep} ID: {self.env.current_puzzle_id} Mean Reward per Step: {ep_reward/(t + 1)}")
                    break

    def test_agent(self):
        """Play every puzzle of the environment greedily.

        Per-puzzle reward totals are stored in ``self.rewards_per_pz``. The
        environment's ``testing`` flag is set for the duration of the run and
        cleared afterwards, even if an error occurs.
        """
        self.env.testing = True

        self.rewards_per_pz = []
        try:
            for pz in list(self.env.puzzles.index):
                state, _ = self.env.reset(pz)
                state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

                ep_reward = 0
                while True:
                    action = self.select_action(state, True)
                    observation, reward, terminated, truncated, _ = self.env.step(action.item(), self.gamma)
                    ep_reward += reward

                    if terminated:
                        print(f"<{self.env.env_name}> Puzzle {pz} solved! Mean Reward per Step: {ep_reward/self.env.step_counter}")
                        break
                    elif truncated:
                        print(f"<{self.env.env_name}> Puzzle {pz} not solved!")
                        break

                    # Feed the new observation back in; otherwise the greedy
                    # policy would keep acting on the initial state.
                    state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

                self.rewards_per_pz.append(ep_reward)
        finally:
            self.env.testing = False

# Main

In [None]:
# Select the compute device for the main run: GPU if available, else CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
# One environment per puzzle group, named in the order divide_puzzles returns
# the groups, and one DQN agent per environment.
envs = [
    SantaEnv(group, label)
    for group, label in zip(
        divide_puzzles(puzzles),
        ('small_cubes', 'large_cubes', 'small_globes', 'large_globes', 'wreaths'),
    )
]
dqns = [DQNAgent(environment).to(device) for environment in envs]

In [None]:
# Train every agent in turn unless training is switched off via TRAIN.
if not TRAIN:
    print("Skipping training...")
else:
    for agent in dqns:
        agent.train_agent()

In [None]:
# Greedy evaluation pass: each agent plays every puzzle of its environment,
# which records any solutions it finds in that environment's solution_dict.
for agent in dqns:
    agent.test_agent()

In [None]:
# Gather the per-environment solution tables into one frame ordered by index.
moves = combine_solutions([e.get_computed_moves() for e in envs])

In [None]:
# Length (number of moves) of the baseline and of the computed solutions.
ss['move_length'] = ss['moves'].apply(lambda x: len(x.split('.')))
moves['move_length'] = moves['moves'].apply(lambda x: len(x.split('.')))

# Keep a computed solution only where it is strictly shorter than the sample
# submission; every other puzzle falls back to the sample submission row.
bool_mask = ss['move_length'] > moves['move_length']
submission = (
    pd.concat([moves.loc[bool_mask], ss])
    .drop_duplicates('id', keep='first')
    .sort_index()
)

In [None]:
# Strip the helper columns so only the submission columns are written out.
submission.drop(columns=['puzzle_type', 'move_length'], inplace=True)
submission.to_csv('./submission.csv', index=False)

# References

1. https://www.kaggle.com/code/maximeszymanski/ppo-deep-reinforcement-learning/notebook
2. https://www.kaggle.com/code/squarehare/q-learning-reinforcement-learning
3. https://www.kaggle.com/code/robikscube/santa-2023-polytope-permutation-first-look
4. https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
5. https://gymnasium.farama.org/tutorials/gymnasium_basics/environment_creation/