### Handle imports

In [1]:
import csv
import json
import os
import sys
import time

import numpy as np
from tqdm.auto import tqdm

### Define Puzzle objects

In [2]:
class Move:
    
    def __init__(self, name: str, permutation: np.ndarray):
        self.name = name
        self.permutation = permutation
        self.is_reverse = self.name.startswith("-")
    
    def get_reverse(self):
        assert not self.is_reverse, "move is already a reverse move"
        return Move("-" + self.name, np.argsort(self.permutation))
    
    def get_zero_move(self):
        return Move("0", np.arange(len(self.permutation)))
    
    def __getitem__(self, i):
        return self.permutation[i]
        
    def __call__(self, a: np.ndarray, inplace: bool = True):
        assert isinstance(a, np.ndarray), "argument must be a numpy array"
        if inplace:
            a[:] = a[self.permutation]
        else:
            return a[self.permutation]
        
    def __len__(self):
        return len(self.permutation)
        
        
class MoveSet:
    
    class MoveSetIter:
        
        def __init__(self, *moves: Move):
            self.moves = list(moves)
            self.index, self.stop_index = -1, len(moves)
            
        def __next__(self):
            self.index += 1
            if self.index == self.stop_index:
                raise StopIteration
            return self.moves[self.index]
        
        def __iter__(self):
            return self
            
    
    def __init__(self, type_: str, *moves: Move):
        self.type_ = type_
        self.moves = {
            move.name: move
            for move in moves
        }
        
    def __iter__(self):
        return iter(MoveSet.MoveSetIter(*self.moves.values()))
        
    def __getitem__(self, name: str):
        return self.moves[name]
    
    def __len__(self):
        return len(self.moves)


class Puzzle:
    
    def __init__(self, id: int, type_: str,
                 solution_state: list[str], initial_state: list[str],
                 num_wildcards: int):
        
        self.id = id
        self.type_ = type_
        
        # set initial and solution states
        num = 1  # must start from 1! Else the solver breaks
        sym2num: dict[str, int] = dict()  # unique symbol (letter) to number mapping
        for sym in solution_state:
            if sym not in sym2num:
                sym2num[sym] = num
                num += 1
        
        self.solution_state = np.array([sym2num[sym] for sym in solution_state], dtype=np.uint)
        self.initial_state = np.array([sym2num[sym] for sym in initial_state], dtype=np.uint)
        self.current_state = self.initial_state.copy()
        self.num_wildcards = num_wildcards
        
        self.solution_path: list[np.ndarray] = [self.current_state]
        self.applied_moves: list[np.ndarray] = []
        
    def apply_moves(self, *moves: Move):
        for move in moves:
            self.current_state = move(self.current_state, inplace=False)
            self.solution_path.append(self.current_state)
            self.applied_moves.append(move.permutation.copy())
        
    def print_solution_path(self, include_moves: bool = False, symbol_dict: dict[int, str] | None = None):
        n_moves = len(self.applied_moves)
        for i, state in enumerate(self.solution_path):
            print(f"State {i}:", end="")
            if symbol_dict is not None:
                print([symbol_dict[s] for s in state], end="")
            else:
                print(state, end="")
            print()
            
            if include_moves and i < n_moves:
                print(f"\tMove ({i}->{i+1}):", self.applied_moves[i])
                
    def is_solved(self) -> bool:
        return np.array_equal(self.current_state, self.solution_state)
                
    def reset(self):
        self.current_state = self.initial_state
        self.solution_path = [self.current_state]
        self.applied_moves = []
        
    def __len__(self):    
        return len(self.current_state)
                

In [29]:
import concurrent.futures


class Node:
    
    ID = 0
    
    def __init__(self, state: np.ndarray, root_state: np.ndarray = None):
        self.state = state
        self.root_state = root_state
        self.parent = None
        self.children = []
        self.id = Node.ID
        Node.ID += 1
        
        self.max_build_iters = 0
        self.build_iter = 0
        
    def set_parent(self, parent):
        self.parent = parent
        
    def add_child(self, *children):
        self.children.extend(children)
        
    def is_terminal(self):
        return self.root_state is not None and np.array_equal(self.state, self.root_state)
    
    def reset(self):
        self.parent = None
        self.children = []
    
    def __hash__(self):
        return hash(self.id)


class MoveTree:
    
    def __init__(self, moveset: MoveSet, initial_state: np.ndarray | None = None):
        self.moveset = moveset
        move_len = len(list(self.moveset.moves.values())[0])
        self.initial_state = np.array(range(move_len), dtype=np.uint) \
            if initial_state is None else initial_state.copy()
        assert self.initial_state.size == move_len
        
        self.root: Node = Node(self.initial_state.copy())
        self.branches: dict[tuple[Node, Node], Move] = dict()
        
    def build(self, max_depth: int = 10):
        self.branches = dict()
        self.root.reset()
        
        num_moves = len(self.moveset.moves.values())
        self.max_build_iters = sum(num_moves ** i for i in range(1, max_depth + 1))
        self.build_iter = 0
        self.build_progress = iter(tqdm(range(self.max_build_iters)))
        
        self.terminal_nodes: list[Node] = []
        if self.root.is_terminal():
            self.terminal_nodes.append(self.root)
        
        self.add_children(self.root, 1, max_depth=max_depth)
        
        while True:
            try:
                next(self.build_progress)
            except StopIteration:
                break
        
    def add_children(self, node: Node, depth: int, max_depth: int = 10):
        if depth > max_depth:
            return

        for move in self.moveset:
            next(self.build_progress)
            self.build_iter += 1
            
            new_state = move(node.state, inplace=False)
            new_node = Node(new_state, self.initial_state)
            node.add_child(new_node)
            new_node.set_parent(node)
            
            self.branches[node, new_node] = move
            if new_node.is_terminal():
                self.terminal_nodes.append(new_node)
            else:
                self.add_children(new_node, depth + 1, max_depth=max_depth)
                
    def get_terminal_nodes(self) -> list[Node]:
        return self.terminal_nodes
        
    # def get_terminal_nodes_rc(self, terminal_nodes: list[Node], node: Node):
    #     if node.is_terminal():
    #         terminal_nodes.append(node)
    #         return
    #     
    #     for child in node.children:
    #         self.get_terminal_nodes_rc(terminal_nodes, child)
    
    # def add_children(self, node: Node, depth: int, max_depth: int = 100):
    #     if depth > max_depth:
    #         return
    # 
    #     with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
    #         futures = []
    #         for move in self.moveset:
    #             if not move.is_reverse:
    #                 futures.append(executor.submit(self.create_child, node, move, depth, max_depth))
    # 
    #         concurrent.futures.wait(futures)
    # 
    #         for future in futures:
    #             new_node = future.result()
    #             if new_node is not None:
    #                 node.add_child(new_node)
    # 
    # def create_child(self, node: Node, move, depth: int, max_depth: int):
    #     new_state = move(node.state, inplace=False)
    #     new_node = Node(new_state, self.initial_state)
    #     new_node.set_parent(node)
    #     if not new_node.is_terminal():
    #         self.add_children(new_node, depth + 1, max_depth=max_depth)
    #     return new_node
        

### Load data

In [4]:
def get_reader(path: str):
    return csv.reader(open(path, mode="r", encoding="utf-8"), delimiter=",")


Map symbols (strings) to ids (integers)

Load puzzles

In [5]:
reader = get_reader("data/puzzles.csv")
next(reader)  # skip header

id_puzzle: dict[int, Puzzle] = dict()
for line in reader:
    id = int(line[0])
    type_ = line[1]
    solution_state = line[2].split(";")
    initial_state = line[3].split(";")
    num_wildcards = int(line[4])
    
    id_puzzle[id] = Puzzle(id, type_, solution_state, initial_state, num_wildcards)


Load moves

In [6]:
# set csv max field size to handle reading in the puzzle info
csv_limit = sys.maxsize
while True:
    try:
        csv.field_size_limit(csv_limit)
        break
    except OverflowError:
        csv_limit = int(csv_limit / 10)
        
# read puzzle info
reader = get_reader("data/puzzle_info.csv")
next(reader)  # skip header

type_moveset: dict[str, MoveSet] = dict()
for line in reader:
    type_ = line[0]
    moves = json.loads(line[1].replace("'", '"'), )
    move_objects = []
    for name, permutation in moves.items():
        move = Move(name, np.array(permutation))
        reverse_move = move.get_reverse()
        move_objects.append(move)
        move_objects.append(reverse_move)
        
    if type_ in type_moveset:
        raise AssertionError(f"type {type_} already in type_moveset")
    
    type_moveset[type_] = MoveSet(type_, *move_objects)

Load sample submission

In [7]:
puzzleid_moveids: dict[int, list[str]] = dict()
reader = get_reader("submissions/sample_submission.csv")
next(reader)  # skip header

for line in reader:
    id = int(line[0])
    moveids = line[1].split(".")
    puzzleid_moveids[id] = moveids
    

Verify sample submission

In [8]:
def score_submission(puzzleid_moveids: dict[int, list[str]]):
    score = 0
    for id, moveids in tqdm(puzzleid_moveids.items()):
        puzzle = id_puzzle[id]
        moveset = type_moveset[puzzle.type_]
        for moveid in moveids:
            try:
                puzzle.apply_moves(moveset[moveid])
            except KeyError:
                raise AssertionError(f"move with id {moveid} is not an allowed move for puzzle {id}")
        num_wrong_facelets = np.sum(puzzle.solution_state != puzzle.current_state)
        if num_wrong_facelets > puzzle.num_wildcards:
            raise AssertionError(f"submitted moves do not solve puzzle {id}")
        
        puzzle.reset()
        
        score += len(moveids)
        
    return score

In [9]:
# --- SCORE SAMPLE SUBMISSION ---
# [puzzle.reset() for puzzle in id_puzzle.values()]
# print("Sample submission score:", score_submission(puzzleid_moveids))

### Building the MILP puzzle solver

Handle Gurobi imports

In [10]:
import gurobipy as gp
from gurobipy import GRB

In [11]:
class PuzzleSolver:
    
    LICENSE_ID = 2457954  # PC: 2456730
    
    THREAD_COUNT = 32
    TIME_LIMIT = 300
    
    BIG_M = int(1e5)
    T_INIT = int(1e2)
    
    def __init__(self, 
                 puzzle: Puzzle, 
                 moveset: MoveSet, 
                 tl: int = TIME_LIMIT,
                 T: int = T_INIT, 
                 verbosity: int = 1):
        self.puzzle = puzzle
        self.moveset = moveset
        self.moves: dict[int, Move] = dict()
        self.moveidx_movename: dict[int, str] = dict()
        self.moveidx_reversemoveidx: dict[int, int] = dict()
        
        for k, move in enumerate(moveset):
            self.moves[k + 1] = move
            self.moveidx_movename[k + 1] = move.name
            
        self.moves[0] = self.moves[1].get_zero_move()
        self.moveidx_movename[0] = self.moves[0].name
        
        for k, move in self.moves.items():
            for m, reversemove in self.moves.items():
                if k != m:
                    if np.array_equal(np.argsort(move.permutation), 
                                      reversemove.permutation):
                        self.moveidx_reversemoveidx[k] = m
        
        # initialize environment
        self.env = gp.Env(params={
            "LicenseID": PuzzleSolver.LICENSE_ID,
            "Threads": PuzzleSolver.THREAD_COUNT,
            "TimeLimit": tl,
            "OutputFlag": int(bool(verbosity)),
            "MIPFocus": 1,  # 0: Balanced, 1: Feasible, 2: Optimal, 3: Bound
            # "Heuristics": 0.8,
            "Presolve": 2,  # 0: No, 1: Regular, 2: Aggressive
            # "Cuts": 1,
        })
        self.env.start()
        
        # initialize model
        self.model: gp.Model | None = None
        
        # initialize constants
        self.T = T  # max no. moves
        self.N = len(puzzle)  # i, j = 1, ..., N
        self.K = len(self.moves)  # k = 1, ..., K
        self.W = puzzle.num_wildcards  # max. no. wildcards
        
        # initialize variable dicts
        self.p: dict[tuple[int, int], gp.Var] = dict()  # puzzle piece [INT]: (t, i)
        self.m: dict[tuple[int, int, int], gp.Var] = dict()  # executed move [BIN]: (t, j, i)
        self.mu: dict[tuple[int, int], gp.Var] = dict()  # move selector [BIN]: (t, k)
        self.M: dict[tuple[int, int, int], gp.Var] = dict()  # allowed move [BIN]: (k, j, i)
        self.w: dict[int, gp.Var] = dict()  # wildcard use [BIN]: i
        
        self.verbosity = verbosity
        
    def set_verbosity(self, level: int):
        self.verbosity = level
        
    def initialize(self):
        model_name = f"PuzzleSolver_P{self.puzzle.id}_[T{self.T}, N{self.N}, K{self.K}]"
        self.log(1, f"Initializing solver {model_name} ...")
        
        # (re)initialize model and variables
        self.model = gp.Model(model_name, env=self.env)
        self.p, self.m, self.mu, self.M, self.w = dict(), dict(), dict(), dict(), dict()
        
        for i in range(self.N):
            self.w[i] = self.model.addVar(
                name="w_{%d}" % i, vtype=GRB.BINARY
            )
        
        for t in tqdm(range(self.T + 1)):
            for i in range(self.N):
                if t == 0:
                    lb = ub = self.puzzle.initial_state[i]
                else:
                    lb, ub = 0.0, float("inf")
                    
                self.p[t, i] = self.model.addVar(
                    name="p_{%d, %d}" % (t, i), vtype=GRB.INTEGER, 
                    lb=lb, ub=ub
                )
                
                if t < self.T:
                    for j in range(self.N):
                        self.m[t, j, i] = self.model.addVar(
                            name="m_{%d, %d, %d}" % (t, j, i), vtype=GRB.BINARY)
                        
            if t < self.T:
                for k in range(self.K):
                    self.mu[t, k] = self.model.addVar(
                        name="mu_{%d, %d}" % (t, k), vtype=GRB.BINARY
                    )
            
        self.log(1, 
                 ("Initialized model with %d variables." % 
                  sum((len(self.p), len(self.m), len(self.mu), len(self.M)))))
                    
    def add_constraints(self):
        self.log(1, "Adding constraints ...")
        
        # final solution/wildcard constraints
        for i in range(self.N):
            # (1) final solution must be exact unless wildcard
            self.model.addConstr(
                self.p[self.T, i] + PuzzleSolver.BIG_M * self.w[i] >= 
                self.puzzle.solution_state[i]
            )
            
            self.model.addConstr(
                self.p[self.T, i] - PuzzleSolver.BIG_M * self.w[i] <= 
                self.puzzle.solution_state[i]
            )
            
        # (2) may use at most W wildcards
        self.model.addConstr(
            self.W >= gp.quicksum(self.w[i] for i in range(self.N)),
        )
        
        for t in tqdm(range(self.T)):
            # piece repositioning constraint: 
            # p_{t+1, i} == \sum_{j=1}^N m_{t, j, i} * p_{t, i}
            self.model.addConstrs(
                self.p[t + 1, i] == gp.quicksum(self.m[t, j, i] * self.p[t, j] 
                                                for j in range(self.N))
                for i in range(self.N)
            )
            
            # move selection constraint:
            # m_{t, j, i} == \sum_{k=1}^K \mu_{t, k} M_{k, j, i}
            self.model.addConstrs(
                self.m[t, j, i] == gp.quicksum((self.moves[k][i] == j) * self.mu[t, k]
                                               for k in range(self.K))
                for j in range(self.N) for i in range(self.N)
            )
            
            self.model.addConstrs(
                1 == gp.quicksum(self.m[t, j, i] for i in range(self.N))
                for j in range(self.N)
            )

            self.model.addConstrs(
                1 == gp.quicksum(self.m[t, j, i] for j in range(self.N))
                for i in range(self.N)
            )
            
            # exactly one move per time step:
            # \sum_{k=1}^K \mu_{t, k} == 1
            self.model.addConstr(
                1 == gp.quicksum(self.mu[t, k] for k in range(self.K))
            )

            # only zero-moves after the first zero-move:
            # \mu_{t+1, 0} == \mu_{t, 0}
            if t < self.T - 1:
                self.model.addConstr(
                    self.mu[t + 1, 0] >= self.mu[t, 0],
                )
               
            # don't follow move with reverse of move 
            if t < self.T - 1:
                for k in range(1, self.K):
                    m = self.moveidx_reversemoveidx[k]
                    self.model.addConstr(
                        self.mu[t + 1, m] <= 1 - self.mu[t, k]
                    )
        
        n_constraints = ((self.N * 2 + 1) +  # final solution constraints
                         (self.T * self.N) +  # piece repositioning constraints
                         (self.T * self.N ** 2) +  # move selection constraints
                         (self.T * self.K) +  # move-step constraints
                         (self.T - 1))  # zero-move constraints
        self.log(1, "Added %d constraints." % n_constraints)
        
    def build(self):
        self.initialize()
        self.add_constraints()
        self.model.setObjective(
            gp.quicksum(self.mu[t, k] for t in range(self.T - 1) 
                        for k in range(1, self.K)),  # zero-moves are free
            sense=GRB.MINIMIZE
        )
            
    def optimize(self) -> int:  # 0 if optimal, 1 if timeout, -1 if other
        self.model.optimize()
        if self.model.status == GRB.OPTIMAL:
            return 0
        
        try:
            self.model.getAttr("X", self.model.getVars())
            return 1 if self.model.status == GRB.TIME_LIMIT else -1
        except gp.GurobiError:
            return -1
    
    def get_optimized_states(self) -> list[np.ndarray]:
        states: list[np.ndarray] = list()
        for t in range(self.T + 1):
            states.append(np.array([self.p[t, i].x for i in range(self.N)], dtype=np.uint))
            if self.mu[t, 0].x > 0.5:
                break
        return states
        
    def get_optimized_moves(self) -> list[Move]:
        moves: list[Move] = list()
        for t in range(self.T):
            if self.mu[t, 0].x > 0.5:
                break
        
            for k in range(1, self.K):
                if self.mu[t, k].x > 0.5:
                    moves.append(self.moveset[self.moveidx_movename[k]])
                    
        return moves
    
    def get_final_state(self) -> np.ndarray:
        return np.array([self.p[self.T, i].x for i in range(self.N)], dtype=np.uint)
    
    def get_solution_offset(self) -> np.ndarray:
        return np.array([(self.p[self.T, i].x - self.puzzle.solution_state[i]) for i in range(self.N)], dtype=np.int8)
    
    def get_wildcard_usage(self) -> list[bool]:
        return [self.w[i].x > 0.5 for i in range(self.N)]
        
    def log(self, 
            verbosity_level: int,
            *values: object, 
            sep: str | None = " ", 
            end: str | None = "\n"):
        if verbosity_level <= self.verbosity:
            print(*values, sep=sep, end=end)
        

In [12]:
def get_write_path(puzzle_id: int) -> str:
    return f"solutions/puzzle_{puzzle_id}.csv"


def log_optimized_move_path(puzzle: Puzzle, moves: list[Move], tl: int, T: int, status: int):
    with open(get_write_path(puzzle.id), mode="w", newline="") as f:
        writer = csv.writer(f, delimiter=",")
        writer.writerow(["id", "tl", "T", "status", "moves"])
        writer.writerow([
            puzzle.id, 
            str(tl),
            str(T), 
            str(status), 
            ".".join(move.name for move in moves)
        ])


def solve_and_log(puzzle_id: int, max_tries: int = 3):
    if os.path.exists(get_write_path(puzzle_id)):
        return
    
    puzzle = id_puzzle[puzzle_id]
    moveset = type_moveset[puzzle.type_]
    
    tl = PuzzleSolver.TIME_LIMIT * 12  # (from 5 minutes (default) to 60 minutes)
    T = PuzzleSolver.T_INIT // 2  # (from 100 steps (default) to 50 steps)
    tries = 0
    status = -1
    while status == -1 and tries < max_tries:
        solver = PuzzleSolver(
            puzzle, moveset, tl=tl, T=T, verbosity=1
        )
        solver.build()
        status = solver.optimize()
        if status == 0 or (status == 1 and tries == max_tries - 1):
            # verify solution:
            puzzle.reset()
            puzzle.apply_moves(*solver.get_optimized_moves())
            assert np.sum(puzzle.current_state != puzzle.solution_state) <= puzzle.num_wildcards, \
                    "optimized moves do not solve the puzzle"
            puzzle.reset()
            
            log_optimized_move_path(
                puzzle, solver.get_optimized_moves(), tl, T, status
            )
            
        # tl *= 6
        # if status != 1:  # found no feasible solutions with a time horizon of T
        T *= 10
            
        tries += 1
            

In [13]:
from IPython.display import clear_output


for id in sorted(id_puzzle.keys()):
    if id < 338:  # globes
        continue
        
    solve_and_log(id, max_tries=1)
    
    clear_output(wait=True) 
    

Set parameter Username
Set parameter TimeLimit to value 3600
Set parameter MIPFocus to value 1
Set parameter LicenseID to value 2457954
Set parameter Presolve to value 2
Set parameter Threads to value 32
Academic license - for non-commercial use only - expires 2024-12-20
Initializing solver PuzzleSolver_P338_[T50, N32, K37] ...


  0%|          | 0/51 [00:00<?, ?it/s]

Initialized model with 54682 variables.
Adding constraints ...


  0%|          | 0/50 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [30]:
fst_type = list(type_moveset.keys())[2]
fst_moveset = type_moveset[fst_type]
print(fst_type)

movetree = MoveTree(fst_moveset)

cube_4/4/4


In [31]:
print(len(movetree.moveset.moves.values()))

24


In [33]:
movetree.build(max_depth=4)

  0%|          | 0/346200 [00:00<?, ?it/s]

In [34]:
print(movetree.root.children[0].state)

[ 0  1  2  3  4  5  6  7  8  9 10 11 79 75 71 67 28 24 20 16 29 25 21 17
 30 26 22 18 31 27 23 19 12 33 34 35 13 37 38 39 14 41 42 43 15 45 46 47
 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 80 68 69 70 81
 72 73 74 82 76 77 78 83 44 40 36 32 84 85 86 87 88 89 90 91 92 93 94 95]


In [35]:
terminal_nodes = movetree.get_terminal_nodes()
print(len(terminal_nodes))

744


In [39]:
nodename_paths: set[str] = set()
for terminal_node in terminal_nodes:
    node_path = [terminal_node]
    
    node = terminal_node
    while node.parent is not None:
        node = node.parent
        node_path.append(node)
        
    node_path = list(reversed(node_path))
    nodename_path = []
    for i, node in enumerate(node_path[:-1]):
        nodename_path.append(movetree.branches[node, node_path[i+1]].name)
        
    nodename_paths.add(".".join(nodename_path))
    
print(len(nodename_paths))
nodename_paths_iter = iter(nodename_paths)
for _ in range(10):
    print(len(next(nodename_paths_iter).split(".")))
    print(next(nodename_paths_iter))
    

744
4
r1.f1.-f1.-r1
4
d0.f1.-f1.-d0
4
f3.-r3.r3.-f3
4
r1.r1.r1.r1
4
r0.r0.r0.r0
4
-d0.-d0.d0.d0
4
d0.f3.-f3.-d0
4
-f2.f2
4
-d0.d1.d0.-d1
4
-r2.-r1.r2.r1
