In [None]:
import torch
import numpy as np
import random
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import time

# --- Puzzle Environment Functions ---
def encode_15puzzle_state(state):
    """Encode a 15-puzzle state as a 128-element one-hot position vector.

    Every tile value 0..15 owns a slice of eight entries: the first four
    one-hot encode the row the tile sits in, the last four one-hot encode
    its column.

    Args:
        state: Sequence supporting ``.index`` that contains each of the
            values 0..15 (0 is the blank), laid out row by row.

    Returns:
        A NumPy float array of length 128 with exactly 32 ones.

    Raises:
        ValueError: If any value in 0..15 is missing from ``state``.
    """
    side = 4
    slots_per_tile = 2 * side
    vector = np.zeros(16 * slots_per_tile)
    for tile in range(16):
        position = state.index(tile)
        base = tile * slots_per_tile
        vector[base + position // side] = 1          # row one-hot
        vector[base + side + position % side] = 1    # column one-hot
    return vector

def get_valid_moves(state):
    """Return every state reachable by sliding the blank one cell.

    The blank (value 0) is swapped with its neighbour above, below, to the
    left and to the right, in that order; moves that would leave the 4x4
    board are skipped. The input state is not modified.

    Args:
        state: List of 16 tile values laid out row by row, containing 0.

    Returns:
        A list of new state lists, one per legal move.
    """
    blank = state.index(0)
    blank_row, blank_col = divmod(blank, 4)
    successors = []
    # Offsets are ordered up, down, left, right.
    for d_row, d_col in ((-1, 0), (1, 0), (0, -1), (0, 1)):
        target_row = blank_row + d_row
        target_col = blank_col + d_col
        if not (0 <= target_row < 4 and 0 <= target_col < 4):
            continue
        target = 4 * target_row + target_col
        child = state[:]
        child[blank], child[target] = child[target], child[blank]
        successors.append(child)
    return successors

# --- Bayesian Neural Network ---
class BayesianLinear(nn.Module):
    """Linear layer with a factorized Gaussian posterior over weights and biases.

    Each weight and bias has a learnable mean (``*_mu``) and log-variance
    (``*_logvar``). The forward pass samples the layer *output* directly
    (local reparameterization) instead of sampling individual weights.

    Args:
        in_features: Size of each input sample.
        out_features: Size of each output sample.
        prior_std: Standard deviation of the zero-mean Gaussian prior used
            in :meth:`kl_divergence`. Must be strictly positive.

    Raises:
        ValueError: If ``prior_std`` is not strictly positive.
    """

    def __init__(self, in_features, out_features, prior_std=2.0):
        import math  # local import: only needed for the prior's log-variance

        super().__init__()
        if prior_std <= 0:
            raise ValueError(f"prior_std must be positive, got {prior_std}")

        self.in_features = in_features
        self.out_features = out_features

        # Prior N(0, prior_std^2); cached in the form the KL term consumes.
        self.prior_std = float(prior_std)
        self.prior_var = self.prior_std ** 2
        self.prior_logvar = math.log(self.prior_var)

        # Weight and bias parameters
        self.weight_mu = nn.Parameter(torch.Tensor(out_features, in_features).normal_(0, 0.1))
        self.weight_logvar = nn.Parameter(torch.Tensor(out_features, in_features).fill_(-2))

        self.bias_mu = nn.Parameter(torch.Tensor(out_features).normal_(0, 0.1))
        self.bias_logvar = nn.Parameter(torch.Tensor(out_features).fill_(-2))

    def forward(self, x):
        """Return one stochastic sample of the layer output for input ``x``."""
        # Local reparameterization trick:
        # compute mean and variance of the output activation.
        weight_var = torch.exp(self.weight_logvar)
        bias_var = torch.exp(self.bias_logvar)

        # Mean and variance of linear output
        mu_out = F.linear(x, self.weight_mu, self.bias_mu)
        var_out = F.linear(x.pow(2), weight_var, bias_var)

        # Sample from activation distribution
        eps = torch.randn_like(mu_out)
        # Small constant keeps sqrt differentiable when the variance is ~0.
        return mu_out + torch.sqrt(var_out + 1e-8) * eps

    def _gaussian_kl(self, mu, logvar):
        """KL( N(mu, exp(logvar)) || N(0, prior_var) ), summed over all entries."""
        per_entry = (
            (mu.pow(2) + torch.exp(logvar)) / self.prior_var
            - logvar
            + self.prior_logvar
            - 1
        )
        return 0.5 * per_entry.sum()

    def kl_divergence(self):
        """Return the KL divergence between the variational posterior and the prior.

        With ``prior_std == 1`` this reduces to the standard-normal closed
        form ``0.5 * sum(mu^2 + var - log(var) - 1)``.
        """
        kl = self._gaussian_kl(self.weight_mu, self.weight_logvar)
        kl = kl + self._gaussian_kl(self.bias_mu, self.bias_logvar)
        return kl


class WUNN(nn.Module):
    """Two-layer Bayesian network used to estimate epistemic uncertainty.

    The network maps a feature vector to a single scalar through two
    ``BayesianLinear`` layers with a ReLU in between, and owns its Adam
    optimizer.

    Args:
        input_dim: Length of the input feature vector.
        hidden_dim: Width of the hidden layer.
        S: Number of stochastic forward passes averaged in ``elbo_loss``.
        C: Stored on the instance; not read by any method of this class.
        prior_std: Prior standard deviation forwarded to both layers.
    """

    def __init__(self, input_dim, hidden_dim=20, S=5, C=1, prior_std=1.0):
        super().__init__()
        self.S = S
        self.C = C
        self.fc1 = BayesianLinear(input_dim, hidden_dim, prior_std)
        self.fc2 = BayesianLinear(hidden_dim, 1, prior_std)
        self.optimizer = optim.Adam(self.parameters(), lr=0.01)

    def forward_single(self, x):
        """Run one stochastic forward pass and return the scalar output tensor."""
        x = F.relu(self.fc1(x))
        return self.fc2(x)

    def predict_sigma_e(self, x, K):
        """Estimate epistemic variance of the prediction for one input.

        Args:
            x: Feature vector for a single state.
            K: Number of stochastic forward passes to sample.

        Returns:
            The (population) variance of the ``K`` sampled outputs.
        """
        # Remember the current mode so calling this during training does not
        # silently leave the module in eval mode.
        was_training = self.training
        self.eval()
        try:
            x_tensor = torch.tensor(x, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                outputs = [self.forward_single(x_tensor).item() for _ in range(K)]
        finally:
            self.train(was_training)
        return np.var(outputs)

    def elbo_loss(self, x, y, beta=0.05):
        """Return the training loss for one ``(x, y)`` example.

        The loss is the squared error between the mean of ``S`` sampled
        predictions and ``y``, plus ``beta`` times the KL divergence of both
        layers.
        """
        x_tensor = torch.tensor(x, dtype=torch.float32).unsqueeze(0)
        # 0-dim target so it matches the 0-dim ``preds.mean()``; a shape-(1,)
        # target makes mse_loss emit a size-mismatch broadcasting warning.
        y_tensor = torch.tensor(y, dtype=torch.float32).reshape(())
        preds = torch.stack([self.forward_single(x_tensor) for _ in range(self.S)])
        log_likelihood = -F.mse_loss(preds.mean(), y_tensor)
        kl_div = self.fc1.kl_divergence() + self.fc2.kl_divergence()
        return beta * kl_div + (-log_likelihood)

    def sample_weighted_batch(self, memory_buffer, batch_size, kappa_epsilon, K):
        """Sample a batch without replacement, favouring uncertain examples.

        Args:
            memory_buffer: Sequence of ``(x, y)`` pairs.
            batch_size: Maximum number of examples to draw.
            kappa_epsilon: Unused here; kept for interface compatibility.
            K: Forward passes used for each uncertainty estimate.

        Returns:
            A list of ``(x, y)`` pairs; empty if ``memory_buffer`` is empty.
        """
        if len(memory_buffer) == 0:
            return []
        # The small constant keeps every probability strictly positive, which
        # sampling without replacement requires.
        weights = np.array(
            [self.predict_sigma_e(x, K) + 1e-6 for x, _ in memory_buffer]
        )
        weights /= weights.sum()
        indices = np.random.choice(len(memory_buffer), size=min(batch_size, len(memory_buffer)),
                                   replace=False, p=weights)
        return [memory_buffer[i] for i in indices]

    def train_model(self, memory_buffer, max_iter=500, batch_size=10, kappa_epsilon=0.1, K=5):
        """Train on ``memory_buffer`` until every example is confidently predicted.

        Each iteration draws an uncertainty-weighted batch and takes one
        optimizer step per example. Training stops early once the epistemic
        variance of every example is below ``kappa_epsilon``.

        Args:
            memory_buffer: Sequence of ``(x, y)`` pairs; nothing happens if empty.
            max_iter: Maximum number of iterations.
            batch_size: Examples drawn per iteration.
            kappa_epsilon: Convergence threshold on epistemic variance.
            K: Forward passes per uncertainty estimate (sampling and
                convergence check).
        """
        if len(memory_buffer) == 0:
            return
        self.train()
        for iteration in range(max_iter):
            batch = self.sample_weighted_batch(memory_buffer, batch_size, kappa_epsilon, K)
            loss = None
            for x, y in batch:
                loss = self.elbo_loss(x, y)
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
            # ``loss`` stays None when the batch is empty (e.g. batch_size=0).
            if iteration % 100 == 0 and loss is not None:
                print(f"WUNN - Iteration [{iteration}/{max_iter}], Loss: {loss.item():.4f}")
            if all(self.predict_sigma_e(x, K=K) < kappa_epsilon for x, _ in memory_buffer):
                print("Convergence reached. Stopping training.")
                break

    
# --- Task Generator ---
def GenerateTaskPrac(nnWUNN, epsilon, MaxSteps, K, Erev, feature, sg):
    """Walk backwards from the goal until an uncertain state is found.

    Starting at ``sg``, each step expands the current state with ``Erev``,
    estimates the epistemic uncertainty of every candidate (skipping the
    state visited immediately before, to avoid stepping straight back),
    and samples one candidate with softmax probabilities over those
    uncertainties. If the sampled state's uncertainty is at least
    ``epsilon`` it is returned as a task; otherwise the walk continues
    from it.

    Args:
        nnWUNN: Object exposing ``predict_sigma_e(x, K)``.
        epsilon: Uncertainty threshold a state must reach to become a task.
        MaxSteps: Maximum number of backward steps.
        K: Number of forward passes per uncertainty estimate.
        Erev: Callable mapping a state to an iterable of candidate states.
        feature: Callable mapping a state to the model's input features.
        sg: Goal state the walk starts from.

    Returns:
        A dict with keys ``"S"``, ``"E"``, ``"s"`` and ``"sg"``, or ``None``
        if a step has no candidates or ``MaxSteps`` is exhausted.
    """
    s_prime = sg
    numSteps = 0
    prev_key = None
    while numSteps < MaxSteps:
        numSteps += 1
        states = {}

        for s in Erev(s_prime):
            key = tuple(s)
            # Compare as tuples so list/tuple mixes still match.
            if prev_key is not None and key == prev_key:
                continue
            sigma2_e = nnWUNN.predict_sigma_e(feature(s), K)
            states[key] = sigma2_e
            print(f"Step {numSteps}: Checking state {s} - Epistemic uncertainty (σ²_e) = {sigma2_e:.4f}, Epsilon (ϵ) = {epsilon}")

        if not states:
            return None

        keys = list(states.keys())
        values = np.array([states[k] for k in keys], dtype=float)
        # Shift by the maximum before exponentiating: the softmax is
        # unchanged mathematically, but exp() can no longer overflow to inf
        # (which would yield NaN probabilities and make the sampling fail).
        exp_shifted = np.exp(values - values.max())
        probs = exp_shifted / exp_shifted.sum()
        chosen_key = keys[np.random.choice(len(keys), p=probs)]
        s = list(chosen_key)
        chosen_sigma = states[chosen_key]

        if chosen_sigma >= epsilon:
            T = {
                "S": s,
                "E": chosen_sigma,
                "s": s,
                "sg": sg
            }

            # TODO: The value of O and C?

            print(f"Task generated with uncertainty σ²_e = {chosen_sigma:.4f} ≥ ϵ = {epsilon}")
            return T

        prev_key = tuple(s_prime)
        s_prime = s

    return None

# --- Example Execution ---
goal_state = list(range(16))


def feature(s):
    """Return the one-hot position encoding of state ``s`` as a NumPy array."""
    return np.array(encode_15puzzle_state(s))


# Predecessor generator used for the backward walk from the goal.
Erev = get_valid_moves
input_dim = len(feature(goal_state))
nnWUNN = WUNN(input_dim)

# Scrambling utility
def scramble(state, steps=10):
    """Return a copy of ``state`` after ``steps`` uniformly random legal moves.

    The input is left untouched. Consecutive moves are chosen independently,
    so a move may undo the one before it.
    """
    board = state[:]
    for _step in range(steps):
        candidates = get_valid_moves(board)
        board = random.choice(candidates)
    return board

# Value function (distance to goal)
def manhattan_distance(state, goal_state):
    """Sum the Manhattan distances of tiles 1..15 from their goal cells.

    The blank (0) is not counted. Both arguments are 16-element sequences
    laid out row by row on a 4x4 board.
    """
    def cell_of(board, tile):
        # (row, column) of ``tile`` on a 4-wide board
        return divmod(board.index(tile), 4)

    distance = 0
    for tile in range(1, 16):
        row, col = cell_of(state, tile)
        target_row, target_col = cell_of(goal_state, tile)
        distance += abs(row - target_row) + abs(col - target_col)
    return distance

# Create training data
# Each example pairs the encoding of a scrambled state with the Manhattan
# distance of that SAME state. Scrambling separately for the feature and the
# label would produce targets unrelated to their inputs.
memory_buffer = []
for _ in range(200):
    scrambled_state = scramble(goal_state, steps=np.random.randint(1, 20))
    memory_buffer.append(
        (feature(scrambled_state), manhattan_distance(scrambled_state, goal_state))
    )

# Train WUNN
nnWUNN.train_model(memory_buffer)

# Generate task
task = GenerateTaskPrac(nnWUNN, epsilon=1, MaxSteps=1000, K=100, Erev=Erev, feature=feature, sg=goal_state)
print("Generated Task:", task)


  log_likelihood = -F.mse_loss(preds.mean(), y_tensor)


WUNN - Iteration [0/500], Loss: 80.9411
WUNN - Iteration [100/500], Loss: 2.7428
WUNN - Iteration [200/500], Loss: 1.9361
WUNN - Iteration [300/500], Loss: 1.8385
WUNN - Iteration [400/500], Loss: 1.8033
Step 1: Checking state [4, 1, 2, 3, 0, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] - Epistemic uncertainty (σ²_e) = 2.4211, Epsilon (ϵ) = 1
Step 1: Checking state [1, 0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] - Epistemic uncertainty (σ²_e) = 1.6194, Epsilon (ϵ) = 1
Task generated with uncertainty σ²_e = 2.4211 ≥ ϵ = 1
Generated Task: {'S': [4, 1, 2, 3, 0, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], 'E': np.float64(2.421100339200478), 's': [4, 1, 2, 3, 0, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], 'sg': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]}


In [None]:
# --- FFNN ---

class FFNN(nn.Module):
    """Feed-forward network with a mean head and a log-variance head.

    One ReLU hidden layer followed by dropout feeds two independent linear
    output layers: ``fc2`` produces the predicted mean and ``fc2_logvar``
    produces a log-variance.

    Args:
        input_dim: Length of the input feature vector.
        hidden_dim: Width of the hidden layer.
        dropout_rate: Dropout probability applied to the hidden activations.
    """

    def __init__(self, input_dim, hidden_dim=20, dropout_rate=0.025):
        super().__init__()

        # Shared hidden layer and the two output heads.
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)          # mean head
        self.fc2_logvar = nn.Linear(hidden_dim, 1)   # log-variance head

        self.dropout = nn.Dropout(dropout_rate)

        # He-normal weights (gain chosen per following activation), zero biases.
        # Layers are visited in creation order so random draws stay in sequence.
        init_plan = (
            (self.fc1, 'relu'),
            (self.fc2, 'linear'),
            (self.fc2_logvar, 'linear'),
        )
        for layer, activation in init_plan:
            nn.init.kaiming_normal_(layer.weight, mode='fan_in', nonlinearity=activation)
            nn.init.zeros_(layer.bias)

    def forward(self, x):
        """Return ``(mean, log_variance)`` for input ``x``.

        The input passes through the hidden layer with ReLU and then
        dropout; both heads read the same dropped-out hidden activations.
        """
        hidden = self.dropout(F.relu(self.fc1(x)))
        return self.fc2(hidden), self.fc2_logvar(hidden)

    def predict(self, x):
        """Return ``(mean, variance)``; the variance is ``exp`` of the log-variance head."""
        mean, logvar = self.forward(x)
        variance = torch.exp(logvar)
        return mean, variance