In [8]:
from ompl import util as ou
from ompl import base as ob
from ompl import geometric as og
from shapely import Point, Polygon
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import json
from scipy.special import softmax
import numpy as np
import math
from matplotlib.backends.backend_pdf import PdfPages

In [9]:

import json

def load_json_data(file_path):
    """
    Reads data from a JSON file and converts it into a list of Python objects.
    
    :param file_path: Path to the JSON file.
    :return: List of Python objects.
    """
    data_list = []
    with open(file_path, "r") as json_file:
        for line in json_file:
            data_list.append(json.loads(line))
    return data_list


In [10]:

def count_occurrences_of_first_element(coordinates):
    #function that counts the duplicates of our first number
    #we use this function to help us remove the starting coordinates that repeat when our human player
    #has yet to move
    first_element = coordinates[0]
    count = coordinates.count(first_element)
    return count


In [11]:
human_paths_raw = load_json_data("data/human_paths.json")
#this takes in the raw coordinates of our human paths
first_to_cross_data = load_json_data("data/first_to_cross.json")
#stores who crossed our doorway first in our human paths, we use this to determine the effectivness of our model later on 
human_goals_data = load_json_data("data/human_goals.json")
#registers the goals for each human agent, we set this as our goal for the robots too
total_rounds = len(first_to_cross_data)
#number of overall rounds



In [12]:
#TO READ THE DATA,

#human_paths_raw[round][player][time/step][0 for x or 1 for y]

#To access the goals

#human_goals_data[round][player][0][0 for x or 1 for y]

#To access the order of crossing

# first_to_cross_data[round]
# This gives 1 if player one (left side) crossed first, or 2 if they crossed second.

# This information is crucial for evaluating our model, as we aim for the model to
# prefer the strategy that matches the observed order of crossing.

In [15]:
#we create an array to store the smaller number of duplicates between two players in the same round
min_duplicates_human_paths = []
#we plan to remove the smaller number of duplicates, as this will disregard time where the players have yet to start but we still are saving their cooridnates 

for i in range(total_rounds):
    min_duplicates_human_paths.append(min(count_occurrences_of_first_element(human_paths_raw[i][0]),  count_occurrences_of_first_element(human_paths_raw[i][1])))

#now in min_min_duplicates_human_paths, the i'th element has the smaller number of duplicates between two players in round i
#we use this to remove the duplicates at the beginning, but only the min number as we want our paths to still have the same number of steps

In [16]:
human_paths_improved = []
#this array will have our human paths but remove the first duplicates
for round in range(total_rounds):
    temp_path = []
    for p in range(2):
        temp_path.append(human_paths_raw[round][p][min_duplicates_human_paths[round]: ])
    human_paths_improved.append(temp_path)

In [17]:
#we define our obstacles
obstacles = []

obstacle_1 = Polygon([(-3, 10),(3, 10), (3, 2),(-3, 2)])
obstacles.append(obstacle_1)

obstacle_2 = Polygon([(-3, -2), (3, -2),(3, -10), (-3, -10)])
obstacles.append(obstacle_2)


In [18]:
# We store the strategies generated by RRT that match in order of crossing in the following
matching_strategies = []
# We store the strategies generated by RRT that don't match in order of crossing in the following
non_matching_strategies = []

# Create two state validity checkers. 
# One forces the strategy generated to have player one cross first, (isStateValid_player1_first)
# and the other forces player two to cross first. (isStateValid_player2_first)

def isStateValid_player1_first(state):
    # Extract agent coordinates from the state.
    x1 = state[0]
    y1 = state[1]
    x2 = state[2]
    y2 = state[3]

    agent1 = Point(x1, y1).buffer(1)
    agent2 = Point(x2, y2).buffer(1)

    # forces the strategy generated to have the first agent to cross first.
    if x2 < 3 and x1 < 3:
        return False

    # Check if either agent is outside the bounds of the environment.
    if x1 > 10 or x1 < -10 or y1 > 10 or y1 < -10 or x2 > 10 or x2 < -10 or y2 > 10 or y2 < -10:
        return False

    # Check if either agent intersects with any obstacles or with each other.
    for obstacle in obstacles:
        if obstacle.intersects(agent1) or obstacle.intersects(agent2) or agent1.intersects(agent2):
            return False
            
    # Return True if all checks pass, False otherwise.
    return True

def isStateValid_player2_first(state):
    # Extract agent coordinates from the state.
    x1 = state[0]
    y1 = state[1]
    x2 = state[2]
    y2 = state[3]

    agent1 = Point(x1, y1).buffer(1)
    agent2 = Point(x2, y2).buffer(1)

    # forces the strategy generated to have the second agent to cross first.
    if x1 > -3 and x2 > -3:
        return False

    # Check if either agent is outside the bounds of the environment.
    if x1 > 10 or x1 < -10 or y1 > 10 or y1 < -10 or x2 > 10 or x2 < -10 or y2 > 10 or y2 < -10:
        return False

    # Check if either agent intersects with any obstacles or with each other.
    for obstacle in obstacles:
        if obstacle.intersects(agent1) or obstacle.intersects(agent2) or agent1.intersects(agent2):
            return False
            
    # Return True if all checks pass, False otherwise.
    return True


#generates the strategies (paths for both players) that force player one to cross first
def plan_player1_first(i):
    # Create a state space, set its bounds, and create a simple setup.
    space = ob.RealVectorStateSpace(4)
    bounds = ob.RealVectorBounds(4)
    bounds.setLow(-10)
    bounds.setHigh(10)
    space.setBounds(bounds)

    # Create a simple setup with the 4D state space
    ss = og.SimpleSetup(space)

    # Set the state validity checker to isStateValid_player1_first.
    ss.setStateValidityChecker(ob.StateValidityCheckerFn(isStateValid_player1_first))

    # Create start and goal states based on the human paths and goals.
    start = ob.State(space)
    start[0] = human_paths_improved[i][0][0][0]
    start[1] = human_paths_improved[i][0][0][1]
    start[2] = human_paths_improved[i][1][0][0]
    start[3] = human_paths_improved[i][1][0][1]

    goal = ob.State(space)
    goal[0] = human_goals_data[i][0][0][0]
    goal[1] = human_goals_data[i][0][0][1]
    goal[2] = human_goals_data[i][1][0][0]
    goal[3] = human_goals_data[i][1][0][1]

    # Set the start and goal states for the simple setup.
    ss.setStartAndGoalStates(start, goal)

    # Create an RRT planner and set it for the simple setup.
    planner = og.RRTstar(ss.getSpaceInformation())
    ss.setPlanner(planner)

    # Try to solve the problem within a time limit.
    solved = ss.solve(5.0)
    if solved:
        # Interpolate the path to match the length of the human path.
        path = ss.getSolutionPath()
        path.interpolate(len(human_paths_improved[i][0])) 

        # Extract agent coordinates from the path.
        agent1data = []
        agent2data = []
        for state in path.getStates():
            x1 = state[0]
            y1 = state[1]
            x2 = state[2]
            y2 = state[3]
            agent1data.append((x1, y1))
            agent2data.append((x2, y2))
        temp_path = [agent1data, agent2data]

        # If the actual first player to cross matches with the strategy generated, 
        # add the strategy to the matching strategies, else to the non-matching strategies.
        if first_to_cross_data[i][0] == 1:
            matching_strategies.append(temp_path)
        else:
            non_matching_strategies.append(temp_path)
    else:
        print("no solution")

#generates the strategies (paths for both players) that force player two to cross first
def plan_player2_first(i):
    # Same steps as plan_player1_first, but with a different state validity checker.
    space = ob.RealVectorStateSpace(4)
    bounds = ob.RealVectorBounds(4)
    bounds.setLow(-10)
    bounds.setHigh(10)
    space.setBounds(bounds)

    ss = og.SimpleSetup(space)

    ss.setStateValidityChecker(ob.StateValidityCheckerFn(isStateValid_player2_first))

    start = ob.State(space)
    start[0] = human_paths_improved[i][0][0][0]
    start[1] = human_paths_improved[i][0][0][1]
    start[2] = human_paths_improved[i][1][0][0]
    start[3] = human_paths_improved[i][1][0][1]

    goal = ob.State(space)
    goal[0] = human_goals_data[i][0][0][0]
    goal[1] = human_goals_data[i][0][0][1]
    goal[2] = human_goals_data[i][1][0][0]
    goal[3] = human_goals_data[i][1][0][1]

    ss.setStartAndGoalStates(start, goal)

    planner = og.RRTstar(ss.getSpaceInformation())
    ss.setPlanner(planner)

    solved = ss.solve(5.0)
    if solved:
        path = ss.getSolutionPath()
        path.interpolate(len(human_paths_improved[i][0]))

        agent1data = []
        agent2data = []
        for state in path.getStates():
            x1 = state[0]
            y1 = state[1]
            x2 = state[2]
            y2 = state[3]
            agent1data.append((x1, y1))
            agent2data.append((x2, y2))
        temp_path = [agent1data, agent2data]
        if first_to_cross_data[i][0] == 2:
            matching_strategies.append(temp_path)
        else:
            non_matching_strategies.append(temp_path)
    else:
        print("no solution")



In [19]:
for i in range(total_rounds):
    plan_player1_first(i)
    plan_player2_first(i)

Debug:   RRTstar: Planner range detected to be 8.000000
Info:    RRTstar: No optimization objective specified. Defaulting to optimizing path length for the allowed planning time.
Info:    RRTstar: Started planning with 1 states. Seeking a solution better than 0.00000.
Info:    RRTstar: Initial k-nearest value of 83
Info:    RRTstar: Found an initial solution with a cost of 31.88 in 655 iterations (113 vertices in the graph)
Info:    RRTstar: Created 638 new states. Checked 203841 rewire options. 1 goal states in tree. Final solution cost 26.178
Info:    Solution found in 5.049802 seconds
Debug:   RRTstar: Planner range detected to be 8.000000
Info:    RRTstar: No optimization objective specified. Defaulting to optimizing path length for the allowed planning time.
Info:    RRTstar: Started planning with 1 states. Seeking a solution better than 0.00000.
Info:    RRTstar: Initial k-nearest value of 83
Info:    RRTstar: Found an initial solution with a cost of 41.76 in 678 iterations (68 v

In [20]:
def error(human_path, generated_path, n):
    #error function to compare two paths, this will be used in updating our agent's opinions
    #it calculates the difference between paths the next n steps
    e = 0
    for i in range(n):
        e += math.dist(human_path[i], generated_path[i])
    return e/n

In [29]:
def opinion_model(matching_strategy, non_matching_strategy, human_strategy, u, n):

    # n is how frequent our agent updates its opinon, every n steps
    #u is the weight of the social incentives of other agents, currently in this code it won't make a difference as we have no ratioanl incentives 
    
    
    #this will be our agents opinion on the two paths 
    z = [0,0] 
    
    length_of_paths = len(matching_strategy) #this will be the same across all 3 paths
    
    for j in range(length_of_paths // n -1): 
        #this for loop alllows us to iteratively update our opinion every n steps

        error_strategy_0 = -error(matching_strategy[j*n:], human_strategy[j*n:], n)
        error_strategy_1 = -error(non_matching_strategy[j*n:], human_strategy[j*n:], n)
        #these two will be the error between each teh strategies each n steps

        relative_error_strategy_0 = math.tanh(error_strategy_0 - error_strategy_1) + 1 
        relative_error_strategy_1 = math.tanh(error_strategy_1 - error_strategy_0) + 1
        #these are the relative error estimates 

        z[0] = z[0] - (1/ (j + 1)) * (z[0] - u * relative_error_strategy_0)
        z[1] = z[1] - (1/ (j + 1)) * (z[1] - u * relative_error_strategy_1)

        print(z)
    return z
        

        

In [31]:
opinion_model(matching_strategies[0][0], non_matching_strategies[0][0], human_paths_improved[0][0], 1, 30)

#this case we see agent 0 opinion on the matching strategy as time goes on 
#Round 0, Agent 0


[1.5229937777539484, 0.4770062222460515]
[1.7411983454791782, 0.2588016545208218]
[1.8263464778933793, 0.1736535221066206]
[1.869705515189707, 0.13029448481029293]
[1.8957636134693452, 0.10423638653065492]
[1.9131362408253567, 0.08686375917464345]
[1.9255453114591028, 0.07445468854089735]
[1.9348519254316554, 0.0651480745683447]
[1.9420885854027747, 0.057911414597225375]
[1.9478402045795615, 0.05215979542043856]
[1.9520303412688083, 0.0479696587311919]
[1.9493208781601112, 0.05067912183988898]


[1.9493208781601112, 0.05067912183988898]

In [35]:
opinion_model(matching_strategies[0][1], non_matching_strategies[0][1], human_paths_improved[0][1], 1, 30)
#agent 1's opinion on the same case 

[1.4868138670768278, 0.5131861329231721]
[0.8012078157201789, 1.1987921842798213]
[0.579925688333119, 1.420074311666881]
[0.9044181892385461, 1.0955818107614541]
[1.1235187675559017, 0.8764812324440986]
[1.2695989589413645, 0.7304010410586357]
[1.373941964460755, 0.6260580355392453]
[1.4521992126951844, 0.5478007873048159]
[1.5130655493628917, 0.4869344506371084]
[1.5616957142791932, 0.4383042857208068]


[1.5616957142791932, 0.4383042857208068]