In [1]:
import random
import copy
from scipy.stats import norm
import numpy as np
import argparse
import sys
import os
import yaml
import logging

# Get current and parent directory to handle import paths
current = os.getcwd()
parent = os.path.dirname(current)
sys.path.append(parent)

from env import gridworld_env2
from agent.q_learning_agent import ValueIteration
from reward_learning.ebirl_v2 import EBIRL
from utils.common_helper import (calculate_percentage_optimal_actions,
                                 compute_policy_loss_avar_bound,
                                 calculate_expected_value_difference)
from utils.env_helper import print_policy

In [61]:
import numpy as np

# Draw 500 candidate weight vectors (3 components each) from a standard normal
# distribution. No random seed is set in this cell, so every run yields
# different candidates.
random_vectors = np.random.randn(500, 3)

# Sort the components of each vector in ascending order, so column 0 holds the
# smallest value and column -1 the largest.
sorted_vectors = np.sort(random_vectors, axis=1)

# Normalize each vector to have unit (L2) length
normalized_vectors = sorted_vectors / np.linalg.norm(sorted_vectors, axis=1, keepdims=True)

# Keep vectors whose last (largest) component is positive AND whose middle
# component is negative. Because rows are sorted, the first component of every
# surviving vector is negative as well.
filtered_vectors = normalized_vectors[(normalized_vectors[:, -1] > 0) & (normalized_vectors[:, 1] < 0)]

# Save (not print) at most the first 50 surviving vectors to disk. Fewer than 50
# rows are written if the filter keeps fewer than 50 candidates.
np.save('grid_world_weights.npy', filtered_vectors[:50])



In [2]:
# Load the candidate weight vectors from the same file name that the
# weight-generation cell saves to; the file must already exist in the working directory.
feature_weights_list = np.load("grid_world_weights.npy")

In [3]:
# Display the loaded weight vectors (one row per vector, three components each).
feature_weights_list

array([[-0.97266673, -0.03083248,  0.23014951],
       [-0.35835056, -0.08647525,  0.92957351],
       [-0.71046942, -0.64188771,  0.28846728],
       [-0.89578956, -0.24432685,  0.37130237],
       [-0.7282924 , -0.22722733,  0.64649665],
       [-0.75022399, -0.57596895,  0.32469021],
       [-0.70842322, -0.26108332,  0.65572254],
       [-0.74635145, -0.57484795,  0.33542412],
       [-0.0505111 , -0.01531811,  0.99860602],
       [-0.76518493, -0.64328377,  0.02603879],
       [-0.87829137, -0.24439244,  0.41094598],
       [-0.8497926 , -0.50850291,  0.13884283],
       [-0.34477193, -0.32328333,  0.88126058],
       [-0.43088031, -0.17122291,  0.88601629],
       [-0.75433776, -0.02805593,  0.65588674],
       [-0.92032338, -0.11683579,  0.37330186],
       [-0.76208548, -0.28664819,  0.58056743],
       [-0.39174849, -0.30341429,  0.86860399],
       [-0.92648321, -0.16748955,  0.33701054],
       [-0.61012931, -0.3129562 ,  0.72787406],
       [-0.3738588 , -0.3262014 ,  0.868

In [39]:
def generate_random_trajectory(env, max_horizon=25):
    """
    Roll out a uniformly random policy and record the visited (state, action) pairs.

    The rollout takes at most ``max_horizon + 1`` steps and ends early as soon as
    the current state is one of the terminal states reported by ``env.reset()``;
    the terminal state itself is not recorded. States are flat integer indices
    (``row * env.columns + col``), not (row, col) pairs.

    Args:
        env: The GridWorld environment. Must provide ``reset()``, ``columns``,
            ``num_actions``, ``num_states`` and ``transitions[state][action]``
            (a probability vector over next states).
        max_horizon (int): The rollout records at most ``max_horizon + 1`` steps.

    Returns:
        list of (state_index, action) tuples, possibly empty if the start state
        is terminal.
    """
    observation = env.reset()
    start_cell = observation["agent"]  # [row, col]
    terminals = observation["terminal states"]

    # Flatten the starting (row, col) position into a single state index.
    current = start_cell[0] * env.columns + start_cell[1]

    rollout = []
    for _ in range(max_horizon + 1):
        if current in terminals:
            # Reached a terminal state: nothing more to record.
            return rollout

        # Draw the action first, then the successor state, so the order of
        # random draws is fixed.
        chosen_action = np.random.choice(env.num_actions)
        successor = np.random.choice(
            env.num_states, p=env.transitions[current][chosen_action]
        )

        rollout.append((current, chosen_action))
        current = successor

    return rollout

In [40]:
# Initialize the environment
# Each color name maps to a one-hot feature vector of length 3.
color_to_feature_map = {
    "red": [1, 0, 0],
    "blue": [0, 1, 0],
    "black": [0, 0, 1]  # Terminal state
}

# Cell colors laid out row by row (2 rows x 3 columns); the single "black"
# cell is in the bottom-right corner.
custom_grid_features = [
    ["blue", "red", "blue"],
    ["blue", "blue", "black"]
]

# Build the environment from the project's gridworld_env2 module.
# NOTE(review): the three weights presumably line up with the
# [red, blue, black] feature positions, and gamma=1 presumably means
# "no discounting" -- confirm both against gridworld_env2.
env = gridworld_env2.NoisyLinearRewardFeaturizedGridWorldEnv(
    gamma=1,
    color_to_feature_map=color_to_feature_map,
    grid_features=custom_grid_features,
    custom_feature_weights=[-0.69171446, -0.20751434,  0.69171446]
)

# Generate a random trajectory. With max_horizon = 5 the generator records at
# most 6 (state, action) steps and may stop earlier at a terminal state.
max_horizon = 5
random_trajectory = generate_random_trajectory(env, max_horizon)


In [None]:
# Display the generated list of (state_index, action) steps.
random_trajectory

In [42]:
def simulate_human_estop(env, full_trajectory, beta=2.0, gamma=1.0, fixed_length=None):
    """
    Simulates human E-stop (early stopping) behavior in a GridWorld environment.

    The stopping time is chosen deterministically as the step with the highest
    discounted cumulative reward (first such step on ties). Optionally the
    returned trajectory is padded/truncated to a fixed length. The caller's
    ``full_trajectory`` is never modified: padding is applied to a copy.

    Args:
        env (NoisyLinearRewardFeaturizedGridWorldEnv): The environment instance;
            only ``env.compute_reward(state)`` is used.
        full_trajectory (list): A full-length trajectory as [(state, action), ...].
            A step whose state is ``None`` is treated as padding and ends the
            reward accumulation.
        beta (float): Accepted for signature compatibility. The stopping time is
            an argmax, so this Boltzmann sensitivity does not affect the result.
        gamma (float): Discount factor for cumulative rewards (step k is
            weighted by ``gamma ** k``).
        fixed_length (int, optional): Desired fixed length for the output
            trajectory. If the trajectory is shorter, the last step is repeated;
            if longer, it is truncated. ``None`` (or 0) leaves the trajectory
            unchanged.

    Returns:
        tuple: (trajectory, stopping_time). When no fixed length is requested the
        trajectory is the very same object that was passed in; otherwise it is a
        new list.

    Raises:
        ValueError: If the trajectory has no non-padding step, so no stopping
            time can be chosen.
    """
    cumulative_rewards = []
    current_reward = 0

    for k, (state, _) in enumerate(full_trajectory):
        if state is None:  # Padding marker: no real steps after this point.
            break

        # Discounted cumulative reward up to and including step k.
        current_reward += (gamma ** k) * env.compute_reward(state)
        cumulative_rewards.append(current_reward)

    if not cumulative_rewards:
        # Fail with a clear message instead of an opaque argmax error.
        raise ValueError("full_trajectory must contain at least one non-padding step")

    # Stop at the step with the highest cumulative reward.
    t_stop = np.argmax(cumulative_rewards)

    if not fixed_length:
        return (full_trajectory, t_stop)

    # Pad a copy so the caller's list is left untouched.
    padded = list(full_trajectory)
    missing = fixed_length - len(padded)
    if missing > 0:
        padded.extend([padded[-1]] * missing)

    return (padded[:fixed_length], t_stop)


In [43]:
def simulate_human_estop_v2(env, full_trajectory, beta=2.0, gamma=1.0):
    """
    Simulates E-stop data by sampling a stopping time from a Boltzmann-style model.

    For each step t, the "stopped" return is the reward collected up to and
    including t plus the reward of step t repeated for every remaining step.
    The unnormalized weight of stopping at t is

        exp(beta * stopped_t) / (exp(beta * full_return) + exp(beta * stopped_t))

    which equals sigmoid(beta * (stopped_t - full_return)). It is evaluated in
    that numerically stable form so that large ``beta`` or large returns do not
    overflow/underflow into NaN. The weights are then normalized to sum to 1 and
    a stopping time is sampled from them with ``np.random.choice``.

    Args:
        env (NoisyLinearRewardFeaturizedGridWorldEnv): The environment instance;
            only ``env.compute_reward(state)`` is used. It is called once per
            step (assumes the reward of a state does not change between calls
            -- TODO confirm for the noisy environment).
        full_trajectory (list): A full-length trajectory as [(state, action), ...].
        beta (float): Sensitivity parameter for Boltzmann distribution.
        gamma (float): Accepted for signature compatibility; rewards are
            currently summed without discounting, so this value is not used.

    Returns:
        tuple: (trajectory, stopping_time) where trajectory is the input object
        unchanged and stopping_time is an index into it.

    Raises:
        ValueError: If ``full_trajectory`` is empty.
    """
    traj_len = len(full_trajectory)
    if traj_len == 0:
        raise ValueError("full_trajectory must contain at least one step")

    # Per-step rewards, computed once instead of re-summing a prefix for every t.
    step_rewards = np.array(
        [env.compute_reward(s) for s, _ in full_trajectory], dtype=float
    )

    # reward_so_far[t] = reward collected up to and including step t.
    reward_so_far = np.cumsum(step_rewards)
    traj_reward = reward_so_far[-1]

    # Stopping at t repeats step t's reward for the (traj_len - t - 1) remaining steps.
    steps_remaining = np.arange(traj_len - 1, -1, -1)
    stopped_reward = reward_so_far + steps_remaining * step_rewards

    # sigmoid(x) = exp(-log(1 + exp(-x))); logaddexp keeps this finite for any x.
    logits = beta * (stopped_reward - traj_reward)
    probabilities = np.exp(-np.logaddexp(0.0, -logits))

    # Turn the per-step weights into a distribution. The last step always has
    # logit 0 (weight 0.5), so the sum is strictly positive.
    probabilities /= probabilities.sum()

    # Sample stopping point t_stop from the computed probabilities
    t_stop = np.random.choice(traj_len, p=probabilities)

    # Return the trajectory and the stopping point
    return (full_trajectory, t_stop)


In [None]:
# Sample an E-stop time for the random trajectory using a very sharp (beta=100) stopping model.
simulate_human_estop_v2(env, full_trajectory=random_trajectory, beta=100, gamma=1.0)

In [None]:
# Display the trajectory again after the E-stop simulation call.
random_trajectory

In [None]:
(5, None)
