Import Packages

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam
import random
from queue import PriorityQueue
from time import perf_counter_ns, perf_counter

Check for GPUs

In [2]:
# Check for GPU
if tf.config.list_physical_devices('GPU'):
    print("GPU is available.")
else:
    print("GPU is not available, using CPU.")

GPU is not available, using CPU.


Defining the 15 Puzzle Environment

In [44]:

NANO_TO_SEC = 1_000_000_000
INF = 100_000

class Puzzle15:
    def __init__(self):
        self.goal_state = tuple(range(1, 16)) + (0,)
        self.goal = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 0, 15]])
        self.DIRECTIONS = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # Up, Down, Left, Right


    def is_solved(self, state):
        return state == self.goal_state

    def get_possible_moves(self, state):
        zero_pos = np.where(state == 0)[0]
        if len(zero_pos) == 1:  # Check if only one element exists
            row, col = zero_pos[0], 0  # Assume column is 0 (should only be one zero)
        else:
            row, col = zero_pos[0], zero_pos[1]
        moves = []

        if row > 0: moves.append(-4)  # Up
        if row < 3: moves.append(4)   # Down
        if col > 0: moves.append(-1)  # Left
        if col < 3: moves.append(1)   # Right

        return moves

    def apply_move(self, state, move):
        zero_idx = state.index(0)
        new_idx = zero_idx + move
        new_state = list(state)
        new_state[zero_idx], new_state[new_idx] = new_state[new_idx], new_state[zero_idx]
        return tuple(new_state)

    def get_cost_to_goal(self, state):
        # Manhattan distance heuristic
        distance = 0
        for i, tile in enumerate(state):
            if tile == 0:
                continue
            goal_pos = tile - 1
            current_row, current_col = divmod(i, 4)
            goal_row, goal_col = divmod(goal_pos, 4)
            distance += abs(current_row - goal_row) + abs(current_col - goal_col)
        return distance

    def encode_state(self, state):
        return np.array(state)

    def checkWin(self, state):
        return self.is_solved(state)

    def simulateMove(self, state, direction):
        zero_idx = state.index(0)
        new_idx = zero_idx + direction[0] * 4 + direction[1]
        if new_idx < 0 or new_idx >= 16 or (direction == (-1, 0) and zero_idx // 4 == 0) or (direction == (1, 0) and zero_idx // 4 == 3) or (direction == (0, -1) and zero_idx % 4 == 0) or (direction == (0, 1) and zero_idx % 4 == 3):
            return False, None
        new_state = list(state)
        new_state[zero_idx], new_state[new_idx] = new_state[new_idx], new_state[zero_idx]
        return True, tuple(new_state)

    def hash(self, state, group):
        return tuple(sorted([state[i] for i in group]))

puzzle = Puzzle15()



GenerateTaskPrac Algorithm

In [45]:
def generate_task_prac(puzzle, num_tasks_per_iter, length_inc):
    tasks = []
    for inc in length_inc:
        for _ in range(num_tasks_per_iter):
            state = puzzle.goal.copy()
            for _ in range(inc):
                moves = puzzle.get_possible_moves(state)
            tasks.append(state)
    return tasks

'''initial_state = np.array([[1, 2, 3, 4], 
                          [5, 6, 7, 8], 
                          [9, 10, 11, 12], 
                          [13, 14, 0, 15]])'''
#puzzle = Puzzle15(initial_state)
puzzle = Puzzle15()
tasks_prac = generate_task_prac(puzzle, num_tasks_per_iter=10, length_inc=[1, 2, 4, 6, 8, 10])

print("Generated tasks using GenerateTaskPrac:", len(tasks_prac))

Generated tasks using GenerateTaskPrac: 60


IDA* Adapted

In [26]:
def ida_star_adapted(puzzle, initial_state):
    if puzzle.checkWin(initial_state):
        return []
    
    t1 = perf_counter_ns()
    bound = hScore_adapted(puzzle, initial_state)
    path = [initial_state]
    dirs = []
    while True:
        rem = search_adapted(puzzle, path, 0, bound, dirs)
        if rem == True:
            tDelta = (perf_counter_ns() - t1) / NANO_TO_SEC
            print(f"Took {tDelta} seconds to find a solution of {len(dirs)} moves")
            return dirs
        elif rem == INF:
            return None
        bound = rem

def search_adapted(puzzle, path, g, bound, dirs):
    cur = path[-1]
    f = g + hScore_adapted(puzzle, cur)

    if f > bound:
        return f

    if puzzle.checkWin(cur):
        return True
    min = INF

    for dir in puzzle.DIRECTIONS:
        if dirs and (-dir[0], -dir[1]) == dirs[-1]:
            continue
        validMove, simPuzzle = puzzle.simulateMove(cur, dir)

        if not validMove or simPuzzle in path:
            continue

        path.append(simPuzzle)
        dirs.append(dir)

        t = search_adapted(puzzle, path, g + 1, bound, dirs)
        if t == True:
            return True
        if t < min:
            min = t

        path.pop()
        dirs.pop()

    return min

def hScore_adapted(puzzle, state):
    h = 0
    for i in range(4):
        for j in range(4):
            if state[i * 4 + j] != 0:
                destPos = ((state[i * 4 + j] - 1) // 4, (state[i * 4 + j] - 1) % 4)
                h += abs(destPos[0] - i) + abs(destPos[1] - j)
    return h

# Generate a random task
task = puzzle.generate_task()
print(f"Generated task: {task}")

# Test the adapted IDA* implementation with Puzzle15 class
start_time = perf_counter()
solution_adapted = ida_star_adapted(puzzle, task)
end_time = perf_counter()
print(f"Adapted IDA* solution: {solution_adapted}")
print(f"Adapted IDA* time: {end_time - start_time} seconds")


Generated task: (9, 6, 3, 4, 12, 13, 7, 8, 11, 10, 14, 15, 0, 1, 2, 5)
Took 177.6869758 seconds to find a solution of 47 moves
Adapted IDA* solution: [(-1, 0), (-1, 0), (0, 1), (1, 0), (1, 0), (0, 1), (0, 1), (-1, 0), (-1, 0), (0, -1), (0, -1), (1, 0), (0, -1), (-1, 0), (-1, 0), (0, 1), (1, 0), (1, 0), (1, 0), (0, -1), (-1, 0), (0, 1), (0, 1), (1, 0), (0, -1), (-1, 0), (0, 1), (-1, 0), (0, -1), (0, -1), (1, 0), (0, 1), (-1, 0), (0, -1), (-1, 0), (0, 1), (1, 0), (0, -1), (1, 0), (0, 1), (0, 1), (-1, 0), (0, 1), (1, 0), (0, -1), (1, 0), (0, 1)]
Adapted IDA* time: 177.68737340000007 seconds


FFNN and WUNN Neural Network Training Functions

In [29]:
def create_ffnn(input_dim=16, hidden_dim=20, dropout_rate=0.25):
    model = Sequential([
        Dense(hidden_dim, activation='relu', input_shape=(input_dim,)),
        Dropout(dropout_rate),
        Dense(1)  # Single output neuron
    ])
    model.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
    return model

def create_wunn(input_dim=16, hidden_dim=20):
    model = Sequential([
        Dense(hidden_dim, activation='relu', input_shape=(input_dim,)),
        Dense(2)  # Two outputs: mean and log variance
    ])
    model.compile(optimizer=Adam(learning_rate=0.000001), loss='mse')
    return model

def train_nn(model, memory_buffer, iterations, batch_size=None, kappa=None, update_beta=False, gamma=None, beta=None):
    x_train, y_train = zip(*memory_buffer)
    x_train = np.array(x_train)
    y_train = np.array(y_train)

    model.fit(x_train, y_train, epochs=iterations, batch_size=batch_size, verbose=0)


In [30]:
def ida_star_adapted(puzzle, initial_state, model):
    heuristic_func = lambda state: model.predict(np.array([puzzle.encode_state(state)]))[0][0]
    
    if puzzle.checkWin(initial_state):
        return []
    
    t1 = perf_counter_ns()
    bound = heuristic_func(initial_state)
    path = [initial_state]
    dirs = []
    while True:
        rem = search_adapted(puzzle, path, 0, bound, dirs, heuristic_func)
        if rem == True:
            tDelta = (perf_counter_ns() - t1) / NANO_TO_SEC
            print(f"Took {tDelta} seconds to find a solution of {len(dirs)} moves")
            return dirs
        elif rem == INF:
            return None
        bound = rem

def search_adapted(puzzle, path, g, bound, dirs, heuristic_func):
    cur = path[-1]
    f = g + heuristic_func(cur)

    if f > bound:
        return f

    if puzzle.checkWin(cur):
        return True
    min = INF

    for dir in puzzle.DIRECTIONS:
        if dirs and (-dir[0], -dir[1]) == dirs[-1]:
            continue
        validMove, simPuzzle = puzzle.simulateMove(cur, dir)

        if not validMove or simPuzzle in path:
            continue

        path.append(simPuzzle)
        dirs.append(dir)

        t = search_adapted(puzzle, path, g + 1, bound, dirs, heuristic_func)
        if t == True:
            return True
        if t < min:
            min = t

        path.pop()
        dirs.pop()

    return min


In [31]:
def solve_task_with_ida_star(task, model, alpha, tmax, max_steps, K):
    puzzle = Puzzle15()
    heuristic_func = lambda state: model.predict(np.array([puzzle.encode_state(state)]))[0][0]
    return ida_star_adapted(puzzle, task, model)

LearnHeauristicPrac Algorithm

In [32]:
def learn_heuristic_prac(num_iter, num_tasks_per_iter, num_tasks_per_iter_thresh, alpha0, delta, epsilon, beta0, gamma, kappa, max_steps, memory_buffer_max_records, train_iter, max_train_iter, minibatch_size, tmax, mu0, sigma0_2, q, K):
    puzzle = Puzzle15()
    ffnn = create_ffnn()
    wunn = create_wunn()

    memory_buffer = []
    yq = -float('inf')
    alpha = alpha0
    beta = beta0
    update_beta = True

    for n in range(num_iter):
        num_solved = 0
        for i in range(num_tasks_per_iter):
            task = puzzle.generate_task()
            solved, plan = solve_task_with_ida_star(task, ffnn, alpha, tmax, max_steps, K)
            if solved:
                num_solved += 1
                for state in plan:
                    if puzzle.checkWin(state):
                        cost_to_goal = puzzle.get_cost_to_goal(state)
                        memory_buffer.append((puzzle.encode_state(state), cost_to_goal))

        if len(memory_buffer) > memory_buffer_max_records:
            memory_buffer = memory_buffer[-memory_buffer_max_records:]

        if num_solved < num_tasks_per_iter_thresh:
            alpha = max(alpha - delta, 0.5)
            update_beta = False
        else:
            update_beta = True

        train_nn(ffnn, memory_buffer, train_iter)
        train_nn(wunn, memory_buffer, max_train_iter, batch_size=minibatch_size, kappa=kappa, update_beta=update_beta, gamma=gamma, beta=beta)
        yq = np.quantile([y for _, y in memory_buffer], q)

    return ffnn, wunn



Neural network training algorithm adapted.


Training and Testing

In [None]:
# Define the parameters
params = {
    'num_iter': 10,
    'num_tasks_per_iter': 10,
    'num_tasks_per_iter_thresh': 6,
    'alpha0': 0.99,
    'delta': 0.05,
    'epsilon': 0.005,
    'beta0': 0.005,
    'gamma': 0.64,
    'kappa': 0,
    'max_steps': 1000,
    'memory_buffer_max_records': 1000,
    'train_iter': 100,
    'max_train_iter': 100,
    'minibatch_size': 10,
    'tmax': 60,
    'mu0': 0,
    'sigma0_2': 10,
    'q': 0.95,
    'K': 100
}

# Train the heuristic
ffnn, wunn = learn_heuristic_prac(**params)

# Evaluate on test set
test_tasks = [Puzzle15().generate_task() for _ in range(10)]
results = []
for task in test_tasks:
    solved, plan = solve_task_with_ida_star(task, ffnn, params['alpha0'], params['tmax'], params['max_steps'], params['K'])
    results.append((solved, plan))

# Analyze results
def analyze_results(results):
    generated_nodes = []
    planning_times = []
    suboptimalities = []
    optimal_solved = 0

    for solved, plan in results:
        if solved:
            num_steps = len(plan)
            generated_nodes.append(num_steps)
            planning_times.append(num_steps)  # Simplification: use plan length as proxy for time
            suboptimalities.append((num_steps - 1) / 53.05)  # Assuming 53.05 is the average optimal cost-to-goal
            if num_steps == 53.05:
                optimal_solved += 1

    print(f"Generated Nodes: {np.mean(generated_nodes)}")
    print(f"Planning Time: {np.mean(planning_times)}")
    print(f"Suboptimality: {np.mean(suboptimalities) * 100:.2f}%")
    print(f"Optimal Solved: {optimal_solved / len(results) * 100:.2f}%")

analyze_results(results)


