In [41]:
# Imports
import random
import time
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp

from collections import deque
from scipy.special import softmax
from tensorflow.keras import layers, models, initializers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense


# **Setting Up**
Setting up includes the set up of the environment, the search algorithm and creating the neural networks.

## The 15-Puzzle Environment.

In [42]:
class Puzzle15:
    def __init__(self):
        self.goal_state = np.arange(1, 17).reshape(4, 4)
        self.goal_state[3, 3] = 0

    def valid_moves(self, state):
        blank_pos = np.argwhere(state == 0)[0]
        moves = []
        if blank_pos[0] > 0: moves.append('up')
        if blank_pos[0] < 3: moves.append('down')
        if blank_pos[1] > 0: moves.append('left')
        if blank_pos[1] < 3: moves.append('right')
        return moves

    def move(self, state, direction):
        blank_pos = np.argwhere(state == 0)[0]
        new_state = state.copy()
        if direction == 'up':
            target = (blank_pos[0] - 1, blank_pos[1])
        elif direction == 'down':
            target = (blank_pos[0] + 1, blank_pos[1])
        elif direction == 'left':
            target = (blank_pos[0], blank_pos[1] - 1)
        elif direction == 'right':
            target = (blank_pos[0], blank_pos[1] + 1)
        new_state[blank_pos[0], blank_pos[1]], new_state[target[0], target[1]] = new_state[target[0], target[1]], new_state[blank_pos[0], blank_pos[1]]
        return new_state

    def state_to_features(self, state):
        return state.flatten()

    def reverse_neighbors(self, state):
        neighbors = []
        for move in self.valid_moves(state):
            neighbors.append(self.move(state, move))
        return neighbors

    def cost_to_goal(self, state):
        return np.sum(state != self.goal_state) - 1


# The IDA* algorithm.

In [43]:
# IDA* Algorithm

class IDAStar:
    def __init__(self, heuristic):
        self.heuristic = heuristic
        self.env = Puzzle15()

    def search(self, task, t_max):
        start_state = task['start']
        threshold = self.heuristic(task['start'])
        path = [task['start']]
        while True:
            print(f"Starting search with threshold: {threshold}")
            temp, plan = self.search_recursive(path, 0, threshold, t_max)
            if temp == "FOUND":
                return plan
            if temp == float('inf'):
                return None
            threshold = temp

    def search_recursive(self, path, g, threshold, t_max):
        node = path[-1]
        f = g + self.heuristic(node)
        if f > threshold:
            return f, None
        if np.array_equal(node, self.env.goal_state):
            return "FOUND", path
        if g > t_max:
            return float('inf'), path
        min_threshold = float('inf')
        for move in self.env.valid_moves(node):
            neighbor = self.env.move(node, move)
            if not any((np.array_equal(neighbor, existing) for existing in path)):
                path.append(neighbor)
                temp, plan = self.search_recursive(path, g + 1, threshold, t_max)
                if temp == "FOUND":
                    return "FOUND", plan
                if temp < min_threshold:
                    min_threshold = temp
                path.pop()
        return min_threshold, None

# Creating the Neural Networks.

We create two neural networks: A Feedforward Neural Network; And a Weight Uncertainty Neural Network, otherwise known as a Bayesian Neural Network.

In [44]:

def create_nnFFNN():
    input_layer = Input(shape=(16,))
    x = Dense(128, activation='relu', kernel_initializer='he_normal')(input_layer)
    x = Dense(128, activation='relu', kernel_initializer='he_normal')(x)
    y_hat = Dense(1, name='y_hat')(x)
    log_sigma2_a = Dense(1, name='log_sigma2_a')(x)
    model = Model(inputs=input_layer, outputs=[y_hat, log_sigma2_a])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mse')
    return model

def create_nnWUNN():
    input_layer = Input(shape=(16,))
    x = Dense(128, activation='relu', kernel_initializer='he_normal')(input_layer)
    x = Dense(128, activation='relu', kernel_initializer='he_normal')(x)
    mean = Dense(1, name='mean')(x)
    log_var = Dense(1, name='log_var')(x)
    model = Model(inputs=input_layer, outputs=[mean, log_var])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss='mse')
    return model

# **Defining the algorithms.**

We integrate information from the set up of the environment, neural networks and search algorithm (above) to create the GenerateTaskPrac and LearnHeuristicPrac algorithms.

# GenerateTaskPrac

In [45]:

def GenerateTaskPrac(nnWUNN, epsilon, max_steps, K):
    env = Puzzle15()
    start_state = env.goal_state.copy()

    def select_high_uncertainty_state(states):
        features = np.array([env.state_to_features(state) for state in states])
        mean, log_var = nnWUNN.predict(features)
        var = np.exp(log_var)
        uncertainties = var.mean(axis=1)
        return states[np.argmax(uncertainties)]

    task_state = start_state

    for k in range(K):
        task_state = start_state.copy()
        for step in range(max_steps):
            if np.random.rand() < epsilon:
                valid_moves = env.valid_moves(task_state)
                move = valid_moves[np.random.choice(len(valid_moves))]
                task_state = env.move(task_state, move)
            else:
                neighbors = env.reverse_neighbors(task_state)
                task_state = select_high_uncertainty_state(neighbors)
        print(f"Task generated at iteration {k}: {task_state}")

    if env.cost_to_goal(task_state) > 0:
        print(f"Valid task state generated: {task_state}")
        return task_state
    else:
        print("No valid task state generated.")
        return None

nnFFNN = create_nnFFNN()
nnWUNN = create_nnWUNN()
# Assuming nnWUNN is already created and defined somewhere before
epsilon = 0.2
max_steps = 50
K = 5

# Generate tasks and print them
for _ in range(K):
    generated_task = GenerateTaskPrac(nnWUNN, epsilon, max_steps, K)
    print(f"Generated task: {generated_task}")

Task generated at iteration 0: [[ 1  2  3  4]
 [ 5  6  7  8]
 [ 0  9 10 15]
 [13 14 12 11]]
Task generated at iteration 1: [[ 1  2  3  4]
 [ 5  6  7  8]
 [ 0 10 11 12]
 [ 9 13 14 15]]
Task generated at iteration 2: [[ 1  2  3  4]
 [ 5  6  7  8]
 [ 0 10 11 12]
 [ 9 13 14 15]]
Task generated at iteration 3: [[ 1  2  3  4]
 [ 5  6  7  8]
 [ 0  9 11 15]
 [13 10 12 14]]
Task generated at iteration 4: [[ 1  3  4  8]
 [ 5  2  7 12]
 [10  6 11 15]
 [ 9 13 14  0]]
Valid task state generated: [[ 1  3  4  8]
 [ 5  2  7 12]
 [10  6 11 15]
 [ 9 13 14  0]]
Generated task: [[ 1  3  4  8]
 [ 5  2  7 12]
 [10  6 11 15]
 [ 9 13 14  0]]
Task generated at iteration 0: [[ 1  2  3  4]
 [ 5  6  7  8]
 [ 0 10 11 12]
 [ 9 13 14 15]]
Task generated at iteration 1: [[ 1  2  3  4]
 [ 6  9  7  8]
 [ 0  5 10 12]
 [13 14 11 15]]
Task generated at iteration 2: [[ 1  2  3  4]
 [ 5  6  7  8]
 [ 0  9 10 12]
 [13 14 11 15]]
Task generated at iteration 3: [[ 1  2  3  4]
 [ 5  6  7  8]
 [ 0 10 11 12]
 [ 9 13 14 15]]
Task g

# LearnHeuristicPrac
The parameters were defined at the end of the cell. Their definition is not directly part of the algorithm. I did that so the code can produce output directly after running the cell.

In [None]:

def state_to_features(state):
    return state.flatten()

def heuristic_function(nnFFNN, nnWUNN, state, beta, yq):
    features = state_to_features(state)
    y_hat, log_sigma2_a = nnFFNN.predict(np.array([features]))
    y_hat = y_hat[0]
    log_sigma2_a = log_sigma2_a[0]
    sigma2_a = np.exp(log_sigma2_a)
    if y_hat < yq:
        sigma2_t = sigma2_a
    else:
        sigma2_t = epsilon
    h_value = max(y_hat - beta * sigma2_t, 0)
    return h_value

def solve_task_with_IDA_star(start_state, alpha, yq, nnFFNN, nnWUNN, t_max):
    heuristic = lambda state: heuristic_function(nnFFNN, nnWUNN, state, alpha, yq)
    ida_star = IDAStar(heuristic)
    task = {'start': start_state}
    plan = ida_star.search(task, t_max)
    return plan

def LearnHeuristicPrac(params):
    nnFFNN = create_nnFFNN()
    nnWUNN = create_nnWUNN()
    memory_buffer = deque(maxlen=params['MemoryBufferMaxRecords'])
    yq = -np.inf
    alpha = params['alpha0']
    beta = params['beta0']
    update_beta = True
    for n in range(params['NumIter']):
        num_solved = 0
        for i in range(params['NumTasksPerIter']):
            T = GenerateTaskPrac(nnWUNN, params['epsilon'], params['MaxSteps'], params['K'])
            if T is None:
                continue
            plan = solve_task_with_IDA_star(T, alpha, yq, nnFFNN, nnWUNN, params['t_max'])
            if plan is not None:
                num_solved += 1
                for sj in plan:
                    if sj != env.goal_state:
                        yj = env.cost_to_goal(sj)
                        xj = state_to_features(sj)
                        memory_buffer.append((xj, yj))
        memory_buffer = deque(list(memory_buffer)[-params['MemoryBufferMaxRecords']:])
        if num_solved < params['NumTasksPerIterThresh']:
            beta = max(beta - params['alpha'], 0.5)
            update_beta = False
        else:
            update_beta = True
        train_nnFFNN(nnFFNN, *zip(*memory_buffer), epochs=params['TrainIter'])
        train_nnWUNN(nnWUNN, *zip(*memory_buffer), epochs=params['MaxTrainIter'], batch_size=params['MiniBatchSize'])
        yq = np.percentile([yj for _, yj in memory_buffer], params['q'])
    return nnFFNN, nnWUNN


def train_nnFFNN(model, data, labels, epochs=1000):
    model.fit(data, [labels, labels], epochs=epochs, batch_size=32, verbose=1)

def train_nnWUNN(model, data, labels, epochs=5000, batch_size=100):
    for epoch in range(epochs):
        indices = np.random.choice(len(data), batch_size)
        x_batch = data[indices]
        y_batch = labels[indices]
        model.train_on_batch(x_batch, [y_batch, y_batch])

# Defining parameters
params = {
    'NumIter': 50,
    'NumTasksPerIter': 10,
    'NumTasksPerIterThresh': 6,
    'alpha0': 0.99,
    'beta0': 0.05,
    'epsilon': 1.0,
    'MaxSteps': 1000,
    'MemoryBufferMaxRecords': 25000,
    'TrainIter': 1000,
    'MaxTrainIter': 5000,
    'MiniBatchSize': 100,
    't_max': 60,
    'mu0': 0,
    'sigma0': 10,
    'q': 0.95,
    'K': 100
}

nnFFNN, nnWUNN = LearnHeuristicPrac(params)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Starting search with threshold: [30.21773]
Starting search with threshold: [30.230717]
Starting search with threshold: [30.24262]
Starting search with threshold: [30.246859]
