<a href="https://colab.research.google.com/github/andrewgrace21/ap-research/blob/main/APResearchPublishCode.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from mpl_toolkits.mplot3d import Axes3D
from scipy.sparse.linalg import cg
import random, math, itertools
from google.colab import auth
import gspread
from google.auth import default

# Authenticate and set up Google Sheets API
auth.authenticate_user()
creds, _ = default()

# -------------------- Global Parameters --------------------
# Hyperparameter grid: [update_rule, step_size_initial, step_size_final, outer_step_size, polynomial_degree]
params = [['', 0, 0, 0, 0]]
sheet_names = ['']
drive_name = ''

TRIALS = 5
ROOM_SIZE = 5  # Each room is 5x5 cells
DEGREE = 0    # Polynomial degree for feature approximation
EPISODES = 10000
EVAL_INTERVAL = 100  # Evaluate policy every SAVE_EPISODE episodes (set to 100 as per methods)
SHOW_INTERVAL = 1000  # Interval for showing plots and printing policy
NUM_TEST = 1
BATCH_SIZE = ROOM_SIZE  # Batch size for state-goal pairs generation
FD_EPSILON = 1e-4
DIVERGENCE_THRESHOLD = 50000
DISCOUNT = 0.9
SIZE = ROOM_SIZE * 2 + 1  # Gridworld size

np.set_printoptions(threshold=np.inf)

# -------------------- Main RL Loop --------------------
def main(update_rule, lr_init, lr_final, outer_lr):
    epsilon = 1.0
    step_size = lr_init
    # Initialize weight tensor for polynomial basis functions (6 dimensions: pos(x,y), goal(x,y), distance(x,y))
    weights = np.full((DEGREE + 1,) * 6, 0.0)
    train_avg, test_avg, train_mse, test_mse = [], [], [], []
    diverged = False

    # Loop over episodes
    for episode in range(EPISODES):
        # Generate a batch of state-goal pairs from the gridworld environment
        state_goal_pairs = generate_state_goal_pairs(update_rule)
        td_adapted = weights.copy()
        gradients = [[], [], [], []]
        # Decay the outer learning rate over episodes
        current_outer_lr = outer_lr / (1 + EPISODES / 100)
        saved_features = ([], [])

        for idx, (state, goal) in enumerate(state_goal_pairs):
            dist = np.subtract(goal, state)
            # Select an action using epsilon-greedy policy
            angle = choose_action(state, goal, epsilon, weights)
            action = (round(math.cos(angle)), round(math.sin(angle)))
            new_state = move(state, action)
            new_dist = np.subtract(goal, new_state)

            # Calculate current and next features (polynomial basis)
            feat = compute_features(state, goal, dist)
            next_feat = compute_features(new_state, goal, new_dist)
            saved_features[0].append(feat)
            saved_features[1].append(next_feat)

            reward = get_reward(state, new_state, goal)
            old_val = np.sum(weights * feat)
            new_val = np.sum(weights * next_feat)

            # Inner update: update weights (or store gradient info) based on the update rule
            weights, td_adapted, gradients = inner_update(
                step_size, reward, new_val, old_val, feat, update_rule,
                td_adapted, weights, episode, idx, gradients
            )

        # Outer update (for Fish and IDGM)
        weights = outer_update(weights, current_outer_lr, td_adapted, update_rule, episode, state_goal_pairs, gradients, saved_features)

        # Policy evaluation and divergence check every EVAL_INTERVAL episodes
        if (episode + 1) % EVAL_INTERVAL == 0:
            train_avg, train_mse = evaluate_policy(epsilon, weights, train_avg, train_mse, episode, test=False)
            test_avg, test_mse = evaluate_policy(epsilon, weights, test_avg, test_mse, episode, test=True)
            if max(train_avg) > DIVERGENCE_THRESHOLD or max(test_avg) > DIVERGENCE_THRESHOLD:
                diverged = True
                break

        # Decay exploration and learning rate gradually
        epsilon -= epsilon / ((EPISODES // 2) - 1)
        step_size = lr_init / (1 + ((lr_init - lr_final) / (lr_final * EPISODES)) * episode)

    # Final evaluation after training (or if diverged)
    if not diverged:
        train_avg, train_mse = evaluate_policy(epsilon, weights, train_avg, train_mse, EPISODES, test=False)
        test_avg, test_mse = evaluate_policy(epsilon, weights, test_avg, test_mse, EPISODES, test=True)
    else:
        # If diverged, pad results to expected length
        num_evals = EPISODES // EVAL_INTERVAL + 1
        train_avg.extend([train_avg[-1]] * (num_evals - len(train_avg)))
        test_avg.extend([test_avg[-1]] * (num_evals - len(test_avg)))
        train_mse.extend([train_mse[-1]] * (num_evals - len(train_mse)))
        test_mse.extend([test_mse[-1]] * (num_evals - len(test_mse)))

    # Determine starting evaluation index based on last portion to display
    start_idx = int(min(max(np.floor(len(train_avg) * (1 - 0.2)) - 1, 0), len(train_avg)))
    return train_avg[start_idx:], test_avg[start_idx:], train_mse[start_idx:], test_mse[start_idx:], start_idx, update_rule, lr_init, outer_lr

# -------------------- Evaluation Function --------------------
def evaluate_policy(epsilon, weights, avg_store, mse_store, episode, test=True):
    total_steps = 0
    for _ in range(NUM_TEST):
        if test:
            state = (np.random.randint(ROOM_SIZE + 1, SIZE), np.random.randint(0, ROOM_SIZE))
            goal = (np.random.randint(0, ROOM_SIZE), np.random.randint(ROOM_SIZE + 1, SIZE))
        else:
            state = (np.random.randint(0, ROOM_SIZE), np.random.randint(0, ROOM_SIZE))
            goal = (np.random.randint(ROOM_SIZE + 1, SIZE), np.random.randint(ROOM_SIZE + 1, SIZE))
        steps = 0
        dist = np.subtract(goal, state)
        while dist[0] != 0 and dist[1] != 0:
            angle = choose_action(state, goal, epsilon, weights)
            action = (round(math.cos(angle)), round(math.sin(angle)))
            state = move(state, action)
            dist = np.subtract(goal, state)
            steps += 1
        total_steps += steps
    avg = total_steps / NUM_TEST
    avg_store.append(avg)
    mse = compute_mse(test, weights)
    mse_store.append(mse)
    if (episode + 1) % SHOW_INTERVAL == 0:
        print(("Test" if test else "Train") + f" Episode: {episode + 1}\nAverage Steps: {np.round(avg, 4)}\nGoal: {goal}\nMSE: {mse}\n")
        plot_value_function(weights, goal, avg_store, mse_store, episode, state, test)
        print_policy(weights, goal, state)
    return avg_store, mse_store

# -------------------- Helper Functions --------------------
def generate_state_goal_pairs(update_rule):
    # Generate BATCH_SIZE random integers for positions and goals for each of 4 categories
    x_coords = [[np.random.randint(low, high) for _ in range(BATCH_SIZE)]
                for low, high in [(0, ROOM_SIZE), (ROOM_SIZE + 1, SIZE), (0, ROOM_SIZE), (ROOM_SIZE + 1, SIZE)]]
    y_coords = [[np.random.randint(low, high) for _ in range(BATCH_SIZE)]
                for low, high in [(0, ROOM_SIZE), (0, ROOM_SIZE), (ROOM_SIZE + 1, SIZE), (ROOM_SIZE + 1, SIZE)]]
    goal_x = [[np.random.randint(ROOM_SIZE + 1, SIZE) for _ in range(BATCH_SIZE)] for _ in range(4)]
    goal_y = [[np.random.randint(ROOM_SIZE + 1, SIZE) for _ in range(BATCH_SIZE)] for _ in range(4)]
    # Flatten coordinate lists to produce state and goal tuples
    states = [(x, y) for xs, ys in zip(x_coords, y_coords) for x, y in zip(xs, ys)]
    goals = [(gx, gy) for gxs, gys in zip(goal_x, goal_y) for gx, gy in zip(gxs, gys)]
    return list(zip(states, goals))

def inner_update(step_size, reward, new_val, old_val, features, update_rule, td_adapted, weights, episode, idx, grad_list):
    td_error = step_size * (reward + DISCOUNT * new_val - old_val) * features
    if update_rule == 'td(0)' or episode <= 0:
        weights += td_error
    elif update_rule == 'Fish':
        td_adapted -= td_error
    elif update_rule == 'IDGM':
        grad_list[idx // BATCH_SIZE].append(td_error * features)
    return weights, td_adapted, grad_list

def outer_update(weights, outer_lr, td_adapted, update_rule, episode, state_goal_pairs, gradients, saved_features):
    if update_rule == 'Fish' and episode > 0:
        weights += outer_lr * (td_adapted - weights)
    elif update_rule == 'IDGM' and episode > 0:
        all_grads = np.array([g for sublist in gradients for g in sublist])
        if all_grads.size > 0:
            avg_grad = np.mean(all_grads, axis=0)
            weights -= outer_lr * avg_grad
    return weights

def compute_features(state, goal, dist):
    # Create a polynomial basis of order (DEGREE) for 6 features normalized by grid size
    grids = np.indices((DEGREE + 1,) * 6)
    exponents = [grid.flatten() for grid in grids]
    norm_factors = [state[0]/SIZE, state[1]/SIZE, goal[0]/SIZE, goal[1]/SIZE, dist[0]/SIZE, dist[1]/SIZE]
    feature_vals = np.ones_like(exponents[0], dtype=float)
    for exp, factor in zip(exponents, norm_factors):
        feature_vals *= factor ** exp
    return feature_vals.reshape((DEGREE + 1,) * 6)

def choose_action(state, goal, epsilon, weights):
    # With probability epsilon choose a random action; otherwise, select action with highest estimated value
    if np.random.random() < epsilon:
        return np.random.choice([0, np.pi/2, np.pi, 3*np.pi/2])
    else:
        return (np.pi/2) * argmax_action(state, goal, weights)

def argmax_action(state, goal, weights):
    values = []
    for i in range(4):
        new_state = move(state, (round(math.cos(np.pi/2 * i)), round(math.sin(np.pi/2 * i))))
        if new_state != state:
            new_dist = np.subtract(goal, new_state)
            value = np.sum(compute_features(new_state, goal, new_dist) * weights)
        else:
            value = -np.inf
        values.append(value)
    return np.argmax(values)

def move(state, action):
    new_state = [min(SIZE - 1, max(0, state[i] + action[i])) for i in range(2)]
    # Prevent moving through walls (if on corridor boundaries)
    for i in range(2):
        if new_state[0] == ROOM_SIZE and new_state[1] not in {ROOM_SIZE // 2, 2 * ROOM_SIZE - ROOM_SIZE // 2}:
            new_state[i] = state[i]
    return tuple(new_state)

def get_reward(state, new_state, goal):
    # Reward +1 for reaching the goal; small negative reward if no move is made
    if new_state == goal:
        return 1
    elif state == new_state:
        return -1 / ROOM_SIZE
    else:
        return 0

def compute_mse(test, weights):
    # Compute mean squared error between the estimated value and an optimal value over a sampled set of states
    pos_x, pos_y = np.indices((SIZE - 1, SIZE - 1))
    pos_x = [[x if x < ROOM_SIZE else x + 1 for x in row] for row in pos_x]
    pos_y = [[y if y < ROOM_SIZE else y + 1 for y in row] for row in pos_y]
    pos_list = list(np.array(pos_x).flatten()) + [ROOM_SIZE, SIZE - (ROOM_SIZE - 1) / 2 - 1, ROOM_SIZE, (ROOM_SIZE - 1) / 2]
    pos_list_y = list(np.array(pos_y).flatten()) + [(ROOM_SIZE - 1) / 2, ROOM_SIZE, SIZE - (ROOM_SIZE - 1) / 2 - 1, ROOM_SIZE]
    # Sample a set of states and compute MSE
    all_states = []
    for gx in range(ROOM_SIZE):
        for gy in range(ROOM_SIZE):
            all_states.append((pos_list[gx], pos_list_y[gy], gx, gy, gx - pos_list[gx], gy - pos_list_y[gy]))
    sample_states = random.sample(all_states, int(len(all_states) / SIZE))
    mse = np.mean([(np.sum(compute_features((s[0], s[1]), (s[2], s[3]), (s[4], s[5])) * weights) - optimal_value((s[0], s[1]), (s[2], s[3]), test)) ** 2 for s in sample_states])
    return mse

def optimal_value(state, goal, first=True, test=False):
    # Recursive estimation of optimal value (using reward and discount) for a given state; stops when state equals goal.
    if state == goal:
        return get_reward(state, state, goal)
    if not first:
        return get_reward((-1, -1), state, goal) + DISCOUNT * optimal_value((state[0] - 1, state[1]), goal, first=False, test=test)
    d = 0
    if test:
        state = (SIZE - state[0] - 1, state[1])
        goal = (SIZE - goal[0] - 1, goal[1])
    if state[0] < ROOM_SIZE and state[1] < ROOM_SIZE:
        i = np.argmin([
            np.sum(np.abs(np.subtract(state, (ROOM_SIZE - 1) / 2, ROOM_SIZE - 1))) +
            np.sum(np.abs(np.subtract(goal, (SIZE - ROOM_SIZE, SIZE - 1 - (ROOM_SIZE - 1) / 2)))),
            np.sum(np.abs(np.subtract(state, (ROOM_SIZE - 1, (ROOM_SIZE - 1) / 2)))) +
            np.sum(np.abs(np.subtract(goal, (SIZE - 1 - (ROOM_SIZE - 1) / 2, SIZE - ROOM_SIZE))))
        ])
        d = min_distance(state, goal, index=i) + ROOM_SIZE + 3
    elif state[0] > ROOM_SIZE and state[1] > ROOM_SIZE:
        d = np.sum(np.abs(np.subtract(state, goal)))
    else:
        d = min_distance(state, (SIZE - 1 - (ROOM_SIZE - 1) / 2, SIZE - ROOM_SIZE)) + \
            min_distance(goal, (SIZE - 1 - (ROOM_SIZE - 1) / 2, SIZE - ROOM_SIZE))
    return get_reward((d - 1, 0), (d, 0), (0, 0)) + DISCOUNT * optimal_value((d - 1, 0), (0, 0), first=False, test=test)

def min_distance(state, goal, index=-1):
    if index == -1:
        return min(np.sum(np.abs(np.subtract(state, goal))),
                   np.sum(np.abs(np.subtract(state, (goal[1], goal[0])))))
    options = [
        np.sum(np.abs(np.subtract(state, ((ROOM_SIZE - 1) / 2, ROOM_SIZE - 1)))) +
        np.sum(np.abs(np.subtract(goal, (SIZE - ROOM_SIZE, SIZE - 1 - (ROOM_SIZE - 1) / 2)))),
        np.sum(np.abs(np.subtract(state, (ROOM_SIZE - 1, (ROOM_SIZE - 1) / 2)))) +
        np.sum(np.abs(np.subtract(goal, (SIZE - 1 - (ROOM_SIZE - 1) / 2, SIZE - ROOM_SIZE))))
    ]
    return options[index]

def plot_value_function(weights, goal, avg_list, mse_list, episode, start_state, test):
    # Generate a meshgrid for plotting the value function surface and error surface
    x, y = np.meshgrid(np.arange(SIZE), np.arange(SIZE))
    z_val = np.array([[np.sum(compute_features((i, j), goal, np.subtract(goal, (i, j))) * weights) for j in range(SIZE)] for i in range(SIZE)])
    z_err = np.array([[(np.sum(compute_features((i, j), goal, np.subtract(goal, (i, j))) * weights) - optimal_value((i, j), goal, test=test)) ** 2 for j in range(SIZE)] for i in range(SIZE)])

    fig = plt.figure(figsize=(12, 8))
    gs = GridSpec(2, 2, width_ratios=[2, 1], height_ratios=[1, 1])
    ax1 = fig.add_subplot(gs[0, 0], projection="3d")
    ax1.plot_surface(x, y, z_val)
    ax1.set(title='Value Function', xlabel='X', ylabel='Y')
    ax1.view_init(elev=30, azim=-60)

    ax2 = fig.add_subplot(gs[0, 1])
    ax2.plot(np.arange(len(avg_list)) * EVAL_INTERVAL, avg_list, color='red')
    ax2.set(title='Policy Evaluation', ylim=[0, 1.2 * max(avg_list)])

    ax3 = fig.add_subplot(gs[1, 0], projection='3d')
    ax3.plot_surface(x, y, z_err)
    ax3.set(title='Error Surface', xlabel='X', ylabel='Y')
    ax3.view_init(elev=30, azim=-60)

    ax4 = fig.add_subplot(gs[1, 1])
    ax4.plot(np.arange(len(mse_list)) * EVAL_INTERVAL, mse_list, color='red')
    ax4.set(title='Mean Squared Error', ylim=[0, 1.2 * max(mse_list)])

    plt.subplots_adjust(wspace=0.25)
    plt.show()

def print_policy(weights, goal, start_state):
    # Print a grid representation of the policy using symbols for actions
    action_symbols = {0: ">", 1: "V", 2: "<", 3: "^"}
    for y in range(SIZE):
        row = ""
        for x in range(SIZE):
            if (x, y) == goal:
                row += "O "
            elif (x, y) == start_state:
                row += "X "
            elif (x == ROOM_SIZE and y not in {ROOM_SIZE // 2, 2 * ROOM_SIZE - ROOM_SIZE // 2}) or \
                 (y == ROOM_SIZE and x not in {ROOM_SIZE // 2, 2 * ROOM_SIZE - ROOM_SIZE // 2}):
                row += "  "
            else:
                row += action_symbols[argmax_action((x, y), goal, weights)] + " "
        print(row)
    print("\n" + "_" * 100)

def col_letter(n):
    # Convert a 1-indexed column number to an Excel-style column letter
    result = ''
    while n:
        n, rem = divmod(n - 1, 26)
        result = chr(65 + rem) + result
    return result

# -------------------- Data Export and Plotting --------------------
for p_idx, param in enumerate(params):
    train_avgs, test_avgs, train_mses, test_mses = [], [], [], []
    start_idx, update_rule = 0, ''
    DEGREE = param[4]
    for trial in range(TRIALS):
        t_avg, t_test, mse_train, mse_test, start_idx, update_rule, lr0, outer_lr = main(param[0], param[1], param[2], param[3])
        train_avgs.append(t_avg)
        test_avgs.append(t_test)
        train_mses.append(mse_train)
        test_mses.append(mse_test)

    train_avgs = np.round(np.array(train_avgs), 4)
    test_avgs = np.round(np.array(test_avgs), 4)
    train_mses = np.round(np.array(train_mses), 4)
    test_mses = np.round(np.array(test_mses), 4)

    # Export results to Google Sheets
    gc = gspread.authorize(creds)
    worksheet = gc.open(drive_name).worksheet(sheet_names[p_idx])
    worksheet.update('A1:A2', [[param[0]], ['Episode']])
    episode_cells = worksheet.range(f"A3:A{len(train_avgs[0]) + 2}")
    for i, cell in enumerate(episode_cells):
        cell.value = EVAL_INTERVAL * i
    worksheet.update_cells(episode_cells)

    for trial in range(TRIALS):
        start_col = col_letter(trial * 5 + 3)
        end_col = col_letter(trial * 5 + 6)
        header_range = f"{start_col}1:{end_col}2"
        meta = [f"Trial {trial + 1}", f"LR = {lr0}", f"Outer = {outer_lr}", f"Degree = {DEGREE}", "Train Avg", "Test Avg", "Train MSE", "Test MSE"]
        header_cells = worksheet.range(header_range)
        for i, cell in enumerate(header_cells):
            cell.value = meta[i]
        worksheet.update_cells(header_cells)
        data_range = f"{start_col}3:{end_col}{len(train_avgs[trial]) + 2}"
        data_cells = worksheet.range(data_range)
        data_pool = [train_avgs[trial], test_avgs[trial], train_mses[trial], test_mses[trial]]
        for i, cell in enumerate(data_cells):
            cell.value = data_pool[i % 4][i // 4]
        worksheet.update_cells(data_cells)

    # Plot overall performance
    fig, axes = plt.subplots(1, 4, figsize=(20, 4))
    for avgs, ax, title in zip([train_avgs, test_avgs, train_mses, test_mses], axes, ['Train Policy Eval', 'Test Policy Eval', 'Train MSE', 'Test MSE']):
        for trial_data in avgs:
            ax.plot(EVAL_INTERVAL * (np.arange(len(trial_data)) + start_idx), trial_data, color='black')
        ax.set_title(title)
        ax.set_ylim([0, 1.2 * np.max(avgs)])
    plt.subplots_adjust(wspace=0.25)
    plt.show()