In [1]:
from pocket_cube.cube import Cube
from pocket_cube.cube import Move
from collections import deque
from heapq import heappop, heappush
from math import sqrt, log
from random import choice

import timeit

%matplotlib notebook

# Creating a scrambled cube 

In [2]:
cube = Cube("F' R U R U F' U'")

## 2D Cube visualization 

In [3]:
cube.render()

<IPython.core.display.Javascript object>

## Text representation 

In [4]:
cube.render_text()

  YB
  BB
OWRRYYOG
GOWWROBY
  GG
  RW


## 3D Cube visualization

For an interactive view of the cube, don't forget to use `%matplotlib notebook`

In [5]:
cube.render3D()

<IPython.core.display.Javascript object>

# Creating an unscrambled cube 

In [6]:
cube = Cube(scrambled=False)

In [7]:
cube.render3D()

<IPython.core.display.Javascript object>

## Applying moves on the cube 

In [8]:
cube = cube.move(Move.R)
# cube = cube.move(Move.F)

In [9]:
cube.render3D()

<IPython.core.display.Javascript object>

In [57]:
# cube.render3D_moves(cube.state, [Move.Fp, Move.Rp])

<IPython.core.display.Javascript object>

<matplotlib.animation.FuncAnimation at 0x14ea091cc40>

In [40]:
# Numarul total de patratele diferit de cel din goal state
# Impartirea la 8 este necesara pentru ca euristica sa fie admisibila
# - se muta cate 8 cubulete la o miscare 
def distance_to_all_correct(state, goal_state):
    return sum(s != g for s, g in zip(state, goal_state)) / 8

In [12]:
# Numarul perechilor adiacente care nu corespund solutiei
# Neadmisibila pentru ca la o miscare se modifica mai multe perechi
def adjacent_pair_count(state, goal_state):
    return sum((s1 != g1) or (s2 != g2) for (s1, s2, g1, g2) 
               in zip(state, state[1:], goal_state, goal_state[1:]))

In [13]:
def is_final(rubik_cube):
    return all(rubik_cube.state == rubik_cube.goal_state)

In [41]:
def get_neighbours(rubik_cube):
    return list((Cube.move_state(rubik_cube.clone_state(), Move.R), 
                 Cube.move_state(rubik_cube.clone_state(), Move.Rp),
                 Cube.move_state(rubik_cube.clone_state(), Move.F), 
                 Cube.move_state(rubik_cube.clone_state(), Move.Fp),
                 Cube.move_state(rubik_cube.clone_state(), Move.U), 
                 Cube.move_state(rubik_cube.clone_state(), Move.Up)))

In [42]:
def astar(rubik_cube, h):
    # Frontiera, ca listă (heap) de tupluri (cost-total-estimat, nod)
    frontier = []
    heappush(frontier, (0 + h(rubik_cube.state, rubik_cube.goal_state), rubik_cube.state))
    
    # Nodurile descoperite ca dictionar nod -> (parinte, cost-pana-la-nod)
    discovered = {Cube.hash_state(rubik_cube.state): (None, 0)}
    
    # Numarul de stari descoperite si lungimea caii solutiei
    num_states_discovered = 0
    solution_path_length = 0
    
    def astar_search():
        nonlocal num_states_discovered, solution_path_length

        while frontier:
            # Extrag nodul cu cel mai mic cost total din frontiera
            current_cost, current_node = heappop(frontier)
            rubik_cube.state = current_node
            if is_final(rubik_cube):
                # Calculez lungimea caii solutiei
                solution_path_length = discovered[Cube.hash_state(current_node)][1]
                break

            for neighbour_state in get_neighbours(rubik_cube):
                cost_to_neighbour = discovered[Cube.hash_state(current_node)][1] + 1
                # Verific daca vecinul nu a fost descoperit sau daca are un cost mai mic
                if (Cube.hash_state(neighbour_state) not in discovered 
                        or cost_to_neighbour < discovered[Cube.hash_state(neighbour_state)][1]):
                    discovered[Cube.hash_state(neighbour_state)] = (current_node, cost_to_neighbour)
                    heappush(frontier, (cost_to_neighbour + h(neighbour_state, rubik_cube.goal_state), tuple(neighbour_state)))
                    num_states_discovered += 1

    time = timeit.timeit(astar_search, number=1)
                
    return num_states_discovered, solution_path_length, time

In [45]:
cube = Cube("R U' R' F' U")
# cube = Cube("F' R U R U F' U'")
# cube = Cube("F U U F' U' R R F' R")
# cube = Cube("U' R U' F' R F F U' F U U")

num_states_discovered, solution_path_length, time = astar(cube, distance_to_all_correct)

print(f"Num states discovered: {num_states_discovered}")
print(f"Solution path length: {solution_path_length}")
print(f"Time taken: {time} seconds")

Num states discovered: 330
Solution path length: 5
Time taken: 0.04036739998264238 seconds


In [17]:
def bidirectional_bfs(start_cube, goal_cube):
    # Coada pentru start state, stocheaza tupluri (stare, adancime) -> bfs de la inceput
    start_queue = deque([(start_cube.state, 0)]) 
    # Coada pentru goal state, stocheaza tupluri (stare, adancime) -> bfs de la final
    goal_queue = deque([(goal_cube.goal_state, 0)])
    
    start_discovered = {Cube.hash_state(start_cube.state): 0} 
    goal_discovered = {Cube.hash_state(goal_cube.goal_state): 0}
    
    num_states_discovered = 0
    solution_path_length = 0
    
    # Numarul de stari descoperite si lungimea caii solutiei
    def bidirectional_bfs_search(): 
        nonlocal num_states_discovered, solution_path_length
        
        while start_queue and goal_queue:
            # Extrage starea si adancimea pentru start state
            start_state, start_depth = start_queue.popleft()
            # Extrage starea si adancimea pentru goal state
            goal_state, goal_depth = goal_queue.popleft()
            
            start_cube.state = start_state
            goal_cube.state = goal_state
             
            # Daca una dintre noile stari se afla deja in strarile descoperite de
            # celalalt bfs -> return solutie
            if (Cube.hash_state(start_state) in goal_discovered
                    or Cube.hash_state(goal_state) in start_discovered):
                solution_path_length = start_depth + goal_depth + 1
                break
            
            # continuare bfs de la inceput
            for neighbour_state in get_neighbours(start_cube):
                if Cube.hash_state(neighbour_state) not in start_discovered:
                    start_discovered[Cube.hash_state(neighbour_state)] = start_depth + 1
                    start_queue.append((neighbour_state, start_depth + 1))
                    num_states_discovered += 1
            
            # continuare bfs de la final
            for neighbour_state in get_neighbours(goal_cube):
                if Cube.hash_state(neighbour_state) not in goal_discovered:
                    goal_discovered[Cube.hash_state(neighbour_state)] = goal_depth + 1
                    goal_queue.append((neighbour_state, goal_depth + 1))
                    num_states_discovered += 1
    
    time = timeit.timeit(bidirectional_bfs_search, number=1)
                
    return num_states_discovered, solution_path_length, time

In [47]:
cube = Cube("R U' R' F' U")
# cube = Cube("F' R U R U F' U'")
# cube = Cube("F U U F' U' R R F' R")
# cube = Cube("U' R U' F' R F F U' F U U")

num_states_discovered, solution_path_length, time = bidirectional_bfs(cube, Cube(scrambled=False))

print(f"Num states discovered: {num_states_discovered}")
print(f"Solution path length: {solution_path_length}")
print(f"Time taken: {time} seconds")

Num states discovered: 288
Solution path length: 5
Time taken: 0.01467820000834763 seconds


In [49]:
# Constante pentru MCTS
N = 'N'
Q = 'Q'
PARENT = 'parent'
ACTIONS = 'actions'

In [50]:
# Functie ce intoarce un nod nou, eventual copilul unui nod dat ca argument
def init_node(parent = None):
    return {N: 0, Q: 0, PARENT: parent, ACTIONS: {}}

In [51]:
CP = 1.0 / sqrt(2.0)

# Functie ce alege o actiune dintr-un nod
def select_action(node, c = CP):
    """
    Se caută acțiunea a care maximizează expresia:
    Q_a / N_a  +  c * sqrt(2 * log(N_node) / N_a)
    """
    N_node = node[N]
    max_score = -1
    best_action = None

    for a, n in node[ACTIONS].items():
        crt_score = n[Q] / n[N] + c * sqrt(2 * log(N_node) / n[N])

        if max_score < crt_score:
            max_score = crt_score
            best_action = a

    return best_action

In [22]:
def mcts(initial_cube, budget, h, c, max_reward):
    tree = init_node(Cube.hash_state(initial_cube.state))
    
    num_states_discovered = 0
    solution_discovered = False

    def mcts_search():
        nonlocal num_states_discovered, solution_discovered

        for _ in range(budget):
            rubik_cube = initial_cube.clone()
            node = tree
    
            # Coboram in arbore pana cand ajungem la o stare finala
            # sau la un nod cu actiuni neexplorate.
            # Variabilele state si node se 'muta' impreuna.
            while (not is_final(rubik_cube) 
                   and all(Cube.hash_state(action) in node[ACTIONS] for action in get_neighbours(rubik_cube))):
                new_action_hash = select_action(node, c)
                new_action = Cube.inverse_hash_state(new_action_hash)
                rubik_cube.state = new_action
                node = node[ACTIONS][new_action_hash]
    
            # Dacă am ajuns intr-un nod care nu este final si din care nu s-au
            # `incercat` toate actiunile, construim un nod nou.
            if not is_final(rubik_cube):
                new_action = choice(list(filter(lambda a: Cube.hash_state(a) not in node[ACTIONS], get_neighbours(rubik_cube))))
                num_states_discovered += 1
                rubik_cube.state = new_action
                node = init_node(node)
                node[PARENT][ACTIONS][Cube.hash_state(new_action)] = node
                
            # Se simuleaza o desfasurare a jocului până la ajungerea intr-o starea finala. 
            # Se evaluează recompensa în acea stare. Limitare la 14 mutari pentru ca un 
            # astfel de cub rubic poate fi mereu rezolvat in 14 mutari
            limit = 14
            while not is_final(rubik_cube) and limit >= 0:
                rubik_cube.state = choice(get_neighbours(rubik_cube))
                num_states_discovered += 1
                limit -= 1
            
            # S-a gasit solutie
            if is_final(rubik_cube):
                solution_discovered = True
                
            reward = max_reward - h(rubik_cube.state, rubik_cube.goal_state)
    
            # Se actualizeaza toate nodurile de la node catre radacina:
            #  - se incrementeaza valoarea N din fiecare nod
            #  - se adauga recompensa la valoarea Q
            crt_node = node
            while isinstance(crt_node, dict):
                crt_node[N] += 1
                crt_node[Q] += reward
                crt_node = crt_node[PARENT]
                
    time = timeit.timeit(mcts_search, number=1)

    final_action = select_action(tree, 0.0)
    
    return solution_discovered, num_states_discovered, time, tree[ACTIONS][final_action]

In [23]:
# Calculeaza lungimea path ului solutiei unui tree returnat de MCTS
def get_solution_path_length(tree, solution_path_length = 1):
    if not tree:
        return solution_path_length
    
    actions = tree.get(ACTIONS, {})
    
    if actions:
        best_action = max(actions.keys(), key=lambda a: actions[a].get(Q, 0) / actions[a].get(N, 1))
        
        if best_action:
            solution_path_length += 1
            solution_path_length = get_solution_path_length(actions[best_action], solution_path_length)
            
    return solution_path_length

In [52]:
cube = Cube("R U' R' F' U")
# cube = Cube("F' R U R U F' U'")
# cube = Cube("F U U F' U' R R F' R")
# cube = Cube("U' R U' F' R F F U' F U U")

total_success_tests = 0
total_time_for_success = 0
total_num_states_for_success = 0
total_solution_path_length_for_success = 0

for i in range(20):
    solution_discovered, num_states_discovered, time, tree \
        = mcts(cube, budget=5000, h=distance_to_all_correct, c=0.5, max_reward=3)
    
    if solution_discovered:
        total_success_tests += 1
        total_num_states_for_success += num_states_discovered
        total_solution_path_length_for_success += get_solution_path_length(tree)
        total_time_for_success += time

print("Total success tests: ", total_success_tests)
if total_success_tests != 0:
    print("Avg num_states_for_success: ", total_num_states_for_success / total_success_tests)
    print("Avg solution_path_length_for_success: ", total_solution_path_length_for_success / total_success_tests)
    print("Avg time_for_success: ", total_time_for_success / total_success_tests)

Total success tests:  18
Avg num_states_for_success:  58443.11111111111
Avg solution_path_length_for_success:  5.444444444444445
Avg time_for_success:  9.834580133329533


In [24]:
def build_pattern_database_dict(rubik_cube):
    queue = deque([(rubik_cube.goal_state, 0)])
    discovered = {Cube.hash_state(rubik_cube.goal_state): 0}

    def build_pattern_database_bfs():        
        while queue:
            current_state, depth = queue.popleft()
    
            if depth > 7:
                break
    
            rubik_cube.state = current_state
            for neighbour_state in get_neighbours(rubik_cube):
                if Cube.hash_state(neighbour_state) not in discovered:
                    discovered[Cube.hash_state(neighbour_state)] = depth + 1
                    queue.append((neighbour_state, depth + 1))
                    
    time = timeit.timeit(build_pattern_database_bfs, number=1)
    
    return discovered, time

In [56]:
def pattern_database_heuristic(state, goal_state, pattern_database, h):
    if Cube.hash_state(state) in pattern_database:
        return pattern_database[Cube.hash_state(state)]
    else:
       return h(state, goal_state)

In [26]:
# Am modificat parametrii agoritmului A* pentru a putea folosi noua euristica
def astar_pattern_database(rubik_cube, pattern_database, h_pattern_databse, h):
    frontier = []
    heappush(frontier, (0 + h(rubik_cube.state, rubik_cube.goal_state), rubik_cube.state))
    
    discovered = {Cube.hash_state(rubik_cube.state): (None, 0)}
    
    num_states_discovered = 0
    solution_path_length = 0
    
    def astar_search():
        nonlocal num_states_discovered, solution_path_length

        while frontier:
            current_cost, current_node = heappop(frontier)
            rubik_cube.state = current_node
            if is_final(rubik_cube):
                solution_path_length = discovered[Cube.hash_state(current_node)][1]
                break

            for neighbour_state in get_neighbours(rubik_cube):
                cost_to_neighbour = discovered[Cube.hash_state(current_node)][1] + 1

                if (Cube.hash_state(neighbour_state) not in discovered 
                        or cost_to_neighbour < discovered[Cube.hash_state(neighbour_state)][1]):
                    discovered[Cube.hash_state(neighbour_state)] = (current_node, cost_to_neighbour)
                    heappush(frontier, (cost_to_neighbour + h_pattern_databse(neighbour_state, rubik_cube.goal_state, pattern_database, h), tuple(neighbour_state)))
                    num_states_discovered += 1

    time = timeit.timeit(astar_search, number=1)
                
    return num_states_discovered, solution_path_length, time

In [53]:
cube = Cube("R U' R' F' U")
# cube = Cube("F' R U R U F' U'")
# cube = Cube("F U U F' U' R R F' R")
# cube = Cube("U' R U' F' R F F U' F U U")

pattern_database, time_pattern_database = build_pattern_database_dict(cube.clone())

num_states_discovered, solution_path_length, time = astar_pattern_database(
    cube, pattern_database, pattern_database_heuristic, distance_to_all_correct)

print(f"Num states discovered: {num_states_discovered}")
print(f"Solution path length: {solution_path_length}")
print(f"Time taken: {time} seconds")
print(f"Time taken for build pattern_database: {time_pattern_database}")

Num states discovered: 26
Solution path length: 5
Time taken: 0.0034201000235043466 seconds
Time taken for build pattern_database: 7.843828300014138


In [54]:
# Am modificat parametrii agoritmului MCTS pentru a putea folosi noua euristica
def mcts_pattern_database(initial_cube, budget, pattern_database, h_pattern_databse, h, c, max_reward):
    tree = init_node(Cube.hash_state(initial_cube.state))
    
    num_states_discovered = 0
    solution_discovered = False

    def mcts_search():
        nonlocal num_states_discovered, solution_discovered

        for _ in range(budget):
            rubik_cube = initial_cube.clone()
            node = tree
    
            while (not is_final(rubik_cube) 
                   and all(Cube.hash_state(action) in node[ACTIONS] for action in get_neighbours(rubik_cube))):
                new_action_hash = select_action(node, c)
                new_action = Cube.inverse_hash_state(new_action_hash)
                rubik_cube.state = new_action
                node = node[ACTIONS][new_action_hash]
    
            if not is_final(rubik_cube):
                new_action = choice(list(filter(lambda a: Cube.hash_state(a) not in node[ACTIONS], get_neighbours(rubik_cube))))
                num_states_discovered += 1
                rubik_cube.state = new_action
                node = init_node(node)
                node[PARENT][ACTIONS][Cube.hash_state(new_action)] = node
                
            limit = 14
            while not is_final(rubik_cube) and limit >= 0:
                rubik_cube.state = choice(get_neighbours(rubik_cube))
                num_states_discovered += 1
                limit -= 1
            
            if is_final(rubik_cube):
                solution_discovered = True
                
            reward = max_reward - h_pattern_databse(rubik_cube.state, rubik_cube.goal_state, pattern_database, h)
    
            crt_node = node
            while isinstance(crt_node, dict):
                crt_node[N] += 1
                crt_node[Q] += reward
                crt_node = crt_node[PARENT]
                
    time = timeit.timeit(mcts_search, number=1)

    final_action = select_action(tree, 0.0)
    return solution_discovered, num_states_discovered, time, tree[ACTIONS][final_action]

In [57]:
cube = Cube("R U' R' F' U")
# cube = Cube("F' R U R U F' U'")
# cube = Cube("F U U F' U' R R F' R")
# cube = Cube("U' R U' F' R F F U' F U U")

total_success_tests = 0
total_time_for_success = 0
total_num_states_for_success = 0
total_solution_path_length_for_success = 0
pattern_database, time_pattern_database = build_pattern_database_dict(cube.clone())

for i in range(20):
    solution_discovered, num_states_discovered, time, tree = mcts_pattern_database(
        cube, budget=5000, pattern_database=pattern_database, h_pattern_databse=pattern_database_heuristic,
        h=distance_to_all_correct, c=0.5, max_reward=7)
    
    if solution_discovered:
        total_success_tests += 1
        total_num_states_for_success += num_states_discovered
        total_solution_path_length_for_success += get_solution_path_length(tree)
        total_time_for_success += time

print("Total success tests: ", total_success_tests)
if total_success_tests != 0:
    print("Avg num_states_for_success: ", total_num_states_for_success / total_success_tests)
    print("Avg solution_path_length_for_success: ", total_solution_path_length_for_success / total_success_tests)
    print("Avg time_for_success: ", total_time_for_success / total_success_tests)
    
print(f"Time taken for build pattern_database: {time_pattern_database}")

Total success tests:  3
Avg num_states_for_success:  79993.0
Avg solution_path_length_for_success:  10.0
Avg time_for_success:  10.858679833298083
Time taken for build pattern_database: 8.378113099955954
