# Group Theory for Autonomous Agents
## By David Payne

A Google robot walks into a bar full of Amazon robots working on a puzzle. How does it act to help the most?

This program is intended for exploring idealized ways for agents to cooperate on a task under a variety of conditions. It is specifically interested in situations where the robots cannot directly share data (e.g. an Amazon robot having to work with a Google robot).

It takes inspiration from economics research using simplified models like the Prisoner's Dilemma, psychological research on groups, self-organizing system research, and acting, where each person has a part to play.

In [18]:
## Imports
import random
import copy
import csv
from time import process_time 
from math import ceil

List of possible features to add:

* additional attributes for actors:
    * various skills (Strength, Dexterity, Intelligence)
    * forgetfulness (mistakes and omissions in memory)
    * Accuracy (or some better term): basically, how well they judge their own skill level and a task's difficulty

In [19]:
## Classes
# Class for actors. Can vary by actor or have all the same
class actorClass:
    """One agent taking part in the simulation.

    Every attribute starts at a default value. The list attributes are
    created per instance, so actors never share them.
    """

    def __init__(self):
        # Identity
        self.name = "undefined"
        self.number = 0  # unique id number, assigned as actors are created

        # Behavioural attributes
        self.skill = 1
        self.planner = 'no'
        self.stubbornness = 0

        # Per-run state
        self.location = []
        self.memory = []
        self.plan = []
        
# Class for Stage. A list of a list of dictionaries
class stageClass:
    """Rectangular grid of tasks that the actors work through.

    ``stage`` is a list of rows; each row is a list of task dictionaries with
    the keys ``'complete'`` (0 while the task is still open) and
    ``'difficulty'``.
    """

    def __init__(self, stageLength, stageWidth):
        self.length = stageLength  # number of rows
        self.width = stageWidth  # number of tasks per row
        self.stage = []  # Placeholder until build() is called
        # Are actors only allowed to take on tasks adjacent to the one
        # they've just done?
        self.adjacency_dependency = 'no'

    def build(self):
        """Create the stage from the length and width given at initialization.

        Every task gets its own dictionary, starting as not complete (0) with
        a difficulty of 1.
        """
        self.stage = [
            [{'complete': 0, 'difficulty': 1} for _ in range(self.width)]
            for _ in range(self.length)
        ]

    def randomize_difficulty(self, low, high):
        """Give every task a random integer difficulty in [low, high].

        Both bounds are inclusive; if ``low == high`` all tasks end up with
        the same difficulty. Only the ``'difficulty'`` entry is changed, so
        the ``'complete'`` flag of each task is preserved.
        """
        for row in self.stage:
            for task in row:
                task['difficulty'] = random.randint(low, high)

#################################################################
## Program Functions

In [20]:
# get_random_open_task picks, uniformly at random, a task/spot not currently taken or done
def get_random_open_task(inputStage):
    """Return ``[row, column]`` of a randomly chosen open task.

    A task is open while its ``'complete'`` flag is 0 (no actor is there and
    it has not been done). All open tasks are equally likely.

    Raises:
        ValueError: if the stage has no open task left. Earlier versions
            looped forever in that situation.
    """
    open_tasks = [
        [row, column]
        for row in range(inputStage.length)
        for column in range(inputStage.width)
        if inputStage.stage[row][column]['complete'] == 0
    ]
    if not open_tasks:
        raise ValueError("No open tasks left on the stage")
    return random.choice(open_tasks)

In [21]:
# Checks whether there are any open tasks
def check_completeness(inputStage):
    """Return 1 when no task on the stage is still open, otherwise 0.

    A task counts as open while its 'complete' flag equals 0.
    """
    has_open_task = any(
        inputStage.stage[r][c]['complete'] == 0
        for r in range(inputStage.length)
        for c in range(inputStage.width)
    )
    return 0 if has_open_task else 1

In [22]:
# gets the possible valid adjacent moves for the actor - constraints - no wrapping, no overlaps, no diagonal
def get_valid_possible_moves(current_vert_pos, current_horiz_pos, inputStage):
    """List the open tasks directly next to the given position.

    Neighbours are reported in the order right, left, down, up. A neighbour
    qualifies only if it lies on the stage (no wrapping, no diagonals) and its
    'complete' flag is 0. Each entry of the returned list is
    [vertical position, horizontal position].
    """
    moves = []

    # Same row: step right first, then left
    for column in (current_horiz_pos + 1, current_horiz_pos - 1):
        on_stage = 0 <= column < inputStage.width
        if on_stage and inputStage.stage[current_vert_pos][column]['complete'] == 0:
            moves.append([current_vert_pos, column])

    # Same column: step down first, then up
    for row in (current_vert_pos + 1, current_vert_pos - 1):
        on_stage = 0 <= row < inputStage.length
        if on_stage and inputStage.stage[row][current_horiz_pos]['complete'] == 0:
            moves.append([row, current_horiz_pos])

    return moves

In [23]:
# plan role, meaning the set of tasks that actor will do
def plan_role(start_vert_pos, start_horiz_pos, number_of_tasks_to_plan, inputStage):
    """Plan a role: the sequence of tasks an actor intends to do.

    The role begins at the starting position and is extended one randomly
    chosen adjacent open task at a time, up to number_of_tasks_to_plan
    further tasks. Planning stops early when no adjacent open task is left.

    Returns a list of [vertical, horizontal] positions, starting position
    first. inputStage itself is never modified.
    """
    # Work on a private copy so tentative claims never touch the real stage
    scratch_stage = copy.deepcopy(inputStage)
    current = [start_vert_pos, start_horiz_pos]
    role = [current]

    for _ in range(number_of_tasks_to_plan):
        options = get_valid_possible_moves(current[0], current[1], scratch_stage)
        if not options:
            # Dead end: hand back the role as planned thus far
            break
        current = random.choice(options)
        role.append(current)
        # Flag the spot (-1) on the copy so the plan does not revisit it
        scratch_stage.stage[current[0]][current[1]]['complete'] = -1

    return role

In [24]:
# Printing the results of each time step
def print_stage(inputStage):
    """Print the stage as a grid, one text line per row.

    Each cell shows its 'complete' value: * for completed spots (-1),
    0 for jobs still to be done, and a number for an actor's location.
    """
    rendered_rows = []
    for row in range(inputStage.length):
        cells = [
            str(inputStage.stage[row][column]['complete']).replace('-1', '*')
            for column in range(inputStage.width)
        ]
        rendered_rows.append(' '.join(cells))
    # The leading '  ' plus print's separator indents the first row by three
    # spaces; the join string indents every following row the same way.
    print('  ', '\n   '.join(rendered_rows))

In [25]:
# Assign starting locations for each actor
def initialize_memory(actors_list, inputStage, print_flag):
    """Place each actor on a random open task and seed its memory.

    A placed actor gets its location set, the spot on the stage is marked
    with the actor's number, and [location, 'yes'] goes to the front of its
    memory. If the stage has no open spot left, the actor instead remembers
    [[-1, -1], 'no'], i.e. it was idle, and a notice is printed when
    print_flag is 1.
    """
    for actor in actors_list:
        if check_completeness(inputStage) != 1:
            spot = get_random_open_task(inputStage)
            actor.location = spot
            # Mark the spot as occupied by this actor
            inputStage.stage[spot[0]][spot[1]]['complete'] = actor.number
            # Memory entry: [previous action, was it successful]
            actor.memory.insert(0, [spot, 'yes'])
            continue

        # No empty spots remain: record the actor as idle
        actor.memory.insert(0, [[-1, -1], 'no'])
        if print_flag == 1:
            print("No initial spots left on stage for actor %i\n" %(actor.number))

In [26]:
# Creates a stage and then builds it by calling a class function
def build_stage(stageLength, stageWidth):
    """Return a new stageClass of the given size with its task grid built."""
    new_stage = stageClass(stageLength, stageWidth)
    new_stage.build()  # fills in new_stage.stage
    return new_stage

In [27]:
# Takes a list of actor attributes in and then creates each actor
# Important so that each actor gets a unique ID number
def initialize_actors(list_of_actors):
    """Create one actor per attribute dictionary in list_of_actors.

    Each dictionary supplies 'stubbornness', 'plan_follower' and 'skill'.
    Actors are numbered 1, 2, 3, ... in list order so each has a unique id.
    Returns the list of created actors.
    """
    actors = []
    for index, attributes in enumerate(list_of_actors):
        new_actor = actorClass()
        new_actor.stubbornness = attributes['stubbornness']
        new_actor.planner = attributes['plan_follower']
        new_actor.skill = attributes['skill']
        new_actor.number = index + 1  # ids start at 1, enumerate starts at 0
        actors.append(new_actor)
    return actors

In [28]:
# Main function. Calls above functions. Takes in a dictionary of inputs to set parameters
def run_collaboration_sim(inputDict):
    """Run one collaboration simulation and return its outcome statistics.

    Keys read from inputDict:
        'max_time_steps': step limit; the initial placement counts as step 1.
        'stageLength', 'stageWidth': size of the stage.
        'number_of_actors': number of actors taking part.
        'list_of_actors': one attribute dict per actor (see initialize_actors).
        'adjacency_dependency': 'yes' restricts actors to adjacent tasks;
            'no' lets an actor with no adjacent task pick any open task.
        'print_flag': 1 prints the game as it runs, anything else is silent.

    Returns a dict with:
        "complete?": 1 if every task was covered, else 0.
        "Steps Taken": number of time steps used.
        "Efficacy": minimum possible steps / steps taken * 100, or 0 when the
            game did not complete.
        "idleness_actor<N>": percent of steps actor N spent without a
            successful action.
    """
    # Initial Values
    max_time_steps = inputDict['max_time_steps']
    stageLength = inputDict['stageLength']
    stageWidth = inputDict['stageWidth']
    numberOfActors = inputDict['number_of_actors']
    print_flag = inputDict['print_flag']

    # Calculates the minimum number of steps given the stage size and number of actors
    minimum_possible_steps = int(ceil(stageLength*stageWidth/numberOfActors))
    if (print_flag == 1):
        print("Total number of tasks: {}\nNumber of Actors: {}\nMax Time Steps: {}\n".format((stageLength*stageWidth), numberOfActors, max_time_steps))
        print("Minimum Number of Steps to Complete: %i\n" %(minimum_possible_steps))

    # Initialize stage, actors and memory
    mainStage = build_stage(stageLength, stageWidth)
    # TODO: task difficulty is not wired in yet; 'task_difficulty_enabled' is
    # not read from inputDict.
    actors = initialize_actors(inputDict['list_of_actors'])
    initialize_memory(actors, mainStage, print_flag)
    mainStage.adjacency_dependency = inputDict['adjacency_dependency']

    ## Play game
    # check completeness. Possibly already complete if tasks <= number of actors
    complete = check_completeness(mainStage)
    if (complete == 0):
        # Plan roles
        for actor in actors:
            role = plan_role(actor.location[0], actor.location[1],
                             max_time_steps-1, mainStage) #-1 due to initial step already taken
            actor.plan = role
            if (print_flag==1):
                print("Planned role for actor %i\n" %(actor.number), role)

    # initialize the time via number of steps
    step = 1
    if (print_flag==1):
        print("Time Step %i:\n" %(step))
        print_stage(mainStage)

    # Run the game until completion, meaning all tasks/spots covered or until hitting the max time
    while (complete == 0) and (step < max_time_steps):
        step += 1
        if (print_flag==1):
            print("Time Step %i:\n" %(step))

        # initialize a matrix of intended moves, indexed by actor.number-1
        intended_moves = [0]*numberOfActors

        # First pass: each actor decides on an intended move. The stage is not
        # changed here, so every actor decides from the same picture.
        for actor in actors:
            # If the actor is a plan follower, their last step was successful, and they still have
            # a plan whose next task is open - continue on the plan
            if ((actor.planner == 'yes') and (actor.memory[0][1] == 'yes') and (len(actor.plan)>=step)
                and (mainStage.stage[actor.plan[step-1][0]][actor.plan[step-1][1]]['complete']==0)):
                    next_move = actor.plan[step-1]
            else:
                # get all valid adjacent moves
                valid_next_moves = get_valid_possible_moves(actor.location[0],actor.location[1], mainStage)
                # If there is a valid adjacent move, do that
                if (len(valid_next_moves) > 0):
                    next_move = random.choice(valid_next_moves)
                # Else if the stage is not set to be strictly adjacent dependent, select an open task elsewhere
                elif (mainStage.adjacency_dependency == 'no'):
                    next_move = get_random_open_task(mainStage)
                # Or stay where they are
                else:
                    if (print_flag==1):
                        print('Actor %i has no valid moves available' %(actor.number))
                    next_move = actor.location

            # If the intended move repeats the most recent action, that action failed, and the
            # actor is not simply intending to stay in place, stubbornness decides what happens
            if ((next_move == actor.memory[0][0]) and (actor.memory[0][1] == 'no')
                and (next_move != actor.location)):
                # decide whether to try again or hold still.
                # randint is inclusive on both ends, so 1..100 matches the 0-100
                # stubbornness scale: 0 never retries, 100 always retries.
                dice_roll = random.randint(1,100)
                # try the same thing again
                if (dice_roll <= actor.stubbornness):
                    intended_moves[actor.number-1] = next_move
                    if (print_flag==1):
                        print('Actor %i doing the same thing again and expecting a different result' %(actor.number))
                # stay where they are
                else:
                    if (print_flag==1):
                        print('Actor %i standing idle, letting others go first after failed last action' %(actor.number))
                    intended_moves[actor.number-1] = actor.location
            else:
                intended_moves[actor.number-1] = next_move

        # Second pass: check for any conflicts and then update stage
        for actor in actors:
            movers_intending = 0

            # An actor whose intended move is its own location is idle this step
            if intended_moves[actor.number-1] == actor.location:
                actor.memory.insert(0,[intended_moves[actor.number-1], 'no'])
            else:
                # counts how many actors (including this one) intend the same move
                for others in actors:
                    if intended_moves[actor.number-1] == intended_moves[others.number-1]:
                        movers_intending += 1
                # if the mover is the only one intending to go to that spot, do that
                if movers_intending == 1:
                    # gets the mover's location. This assignment done only for readability of rest of code block
                    moverVerticalPos = actor.location[0]
                    moverHorizontalPos = actor.location[1]
                    # updates stage with move: the spot left behind is done (-1),
                    # the new spot carries the actor's number
                    mainStage.stage[moverVerticalPos][moverHorizontalPos]['complete'] = -1
                    mainStage.stage[intended_moves[actor.number-1][0]][intended_moves[actor.number-1][1]]['complete'] = actor.number
                    # update actor location with move
                    actor.location = intended_moves[actor.number-1]
                    # update memory to indicate success
                    actor.memory.insert(0, [intended_moves[actor.number-1], 'yes'])
                elif movers_intending > 1:
                    # update memory to indicate failure
                    actor.memory.insert(0,[intended_moves[actor.number-1], 'no'])
                    if (print_flag==1):
                        print('Conflict! Actor %i was intending the same move as another actor. Actor holding still' % (actor.number))

        # check completeness
        complete = check_completeness(mainStage)
        if (print_flag==1):
            print_stage(mainStage)

    ######################################
    ## Final statistics after game over
    outcome_statistics = {}
    outcome_statistics["complete?"] = complete
    # Determine efficacy
    if (print_flag==1):
        print('Statistics for task:')

    if complete == 1:
        if (print_flag==1):
            print('Completed in %i steps!' %step)
        efficacy = minimum_possible_steps/step*100
        if (print_flag==1):
            print('The minimum possible number of steps was {}, that is an efficacy of {} percent'.format(minimum_possible_steps,efficacy))

    elif complete == 0:
        efficacy = 0
        if (print_flag==1):
            print('Did not complete. Hit time limit of %i maximum time steps' %max_time_steps)
    outcome_statistics["Steps Taken"] = step
    outcome_statistics["Efficacy"] = efficacy

    # Calculate what percent of the time each actor is waiting around
    for actor in actors:
        # every memory entry flagged 'no' is a step without a successful action
        idle_time = sum(1 for entry in actor.memory if entry[1] == 'no')
        percent_time_idle = idle_time/(step)*100
        outcome_statistics["idleness_actor"+str(actor.number)] = percent_time_idle
        if (print_flag==1):
            print("Actor {} was idle for {} time steps out of {} total steps or in other words, {} percent of the time".format((actor.number), idle_time, step, percent_time_idle))

    return outcome_statistics

In [29]:
## Writer to file
# Assumes that stats are given as a list of [[header, data],[header, data],...] 
def output_statistics_file(stats_list):
    """Write statistics to ./simulation_out.csv as a header row and a data row.

    stats_list is a list of [header, data] pairs. Columns are written in the
    reverse of the order given (each pair is inserted at the front), which
    matches the original behaviour. An existing file is overwritten.
    """
    headers = []
    data = []
    for stat in stats_list:
        headers.insert(0, stat[0])
        data.insert(0, stat[1])
    # Rows are assembled before opening so malformed input cannot leave a
    # truncated file behind; `with` closes the file even if writing fails.
    with open("./simulation_out.csv", 'w', newline='') as csv_file:
        writer = csv.writer(csv_file, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        writer.writerows([headers, data])

## End Program Functions
#################################################################

In [30]:
# Runs the simulation numerous times across a range of parameters
def sim_test_bench(parameters):
    """Run the simulation across a range of parameters and log every run.

    Every combination of actor attributes ('stubbornness', 'plan_follower',
    'skill'), stage length 1..max_stage_length, stage width 1..max_stage_width,
    actor count 1..max_actors and time limit (from the minimum possible time
    up to max_time) is simulated once with printing switched off. One CSV row
    per run is written to ./simulation_out.csv, overwriting any existing file.
    Rows for runs with fewer than max_actors actors have correspondingly
    fewer idleness columns.

    Raises:
        NotImplementedError: if parameters['identical_actors?'] is not 'yes';
            only identical actors are supported so far.
    """
    # Validate before touching the output file so a bad call cannot wipe
    # results from an earlier run.
    if parameters['identical_actors?'] != 'yes':
        raise NotImplementedError(
            "sim_test_bench only supports parameters['identical_actors?'] == 'yes'")
    attributes = parameters['actor_attributes']
    max_actors = parameters['max_actors']

    input_headers = ['stageLength', 'stageWidth', 'min_possible_time','max_time_steps'
                     , 'number_of_actors','adjacency_dependency']
    output_headers = ['complete?', 'Steps Taken', 'Efficacy']
    actor_output_headers = ["idleness_actor"+str(number+1) for number in range(max_actors)]

    Inputs = {}
    Inputs['print_flag'] = 0 # Set print off
    Inputs['adjacency_dependency'] = parameters['adjacency_dependency']

    # `with` guarantees the file is closed even if a simulation run raises
    with open("./simulation_out.csv", 'w', newline='') as csv_file:
        writer = csv.writer(csv_file, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(input_headers+["stubbornness"]+['plan_follower']+['skill']+output_headers+actor_output_headers)

        # Iterates through the possible combinations of values. Could parallelize
        for stubbornness in attributes['stubbornness']:
            for plan_follower in attributes['plan_follower']:
                for skill in attributes['skill']:
                    actor_base = {
                        'stubbornness': stubbornness,
                        'plan_follower': plan_follower,
                        'skill': skill,
                    }
                    # Duplicate actors
                    actor_list = [actor_base]*max_actors
                    for stage_length in range(1, parameters['max_stage_length']+1):
                        Inputs['stageLength'] = stage_length
                        for stage_width in range(1, parameters['max_stage_width']+1):
                            Inputs['stageWidth'] = stage_width
                            for actor_count in range(1, max_actors+1):
                                Inputs['number_of_actors'] = actor_count
                                Inputs['list_of_actors'] = actor_list[:actor_count]
                                # Figure out the minimum time possible. Don't iterate through unwinnable games
                                min_possible_time = int(ceil(stage_length*stage_width/actor_count))
                                Inputs['min_possible_time'] = min_possible_time

                                for max_steps in range(min_possible_time, parameters['max_time']+1):
                                    Inputs['max_time_steps'] = max_steps
                                    statistics = run_collaboration_sim(Inputs)

                                    # Compile write data in header order
                                    writedata = [Inputs[header] for header in input_headers]
                                    writedata += [stubbornness, plan_follower, skill]
                                    writedata += [statistics[header] for header in output_headers]
                                    writedata += [statistics[header]
                                                  for header in actor_output_headers[:actor_count]]
                                    writer.writerow(writedata)

In [31]:
## Used to run a single simulation (as opposed to a full test bench)
def single_tester_simulation():
    """Run a single printed simulation (as opposed to a full test bench).

    Three identical actors work a 5x5 stage for at most 10 time steps.
    """
    Test_Inputs = {
        'print_flag': 1, # Print Game? 0 = no, 1 = yes
        'number_of_actors': 3,
        'stageLength': 5,
        'stageWidth': 5,
        'adjacency_dependency': 'yes',
        'max_time_steps': 10, # How long to allow this to run. If complete, will end before this
        'task_difficulty_enabled': 'yes'
    }

    # Every actor is the same base actor, duplicated
    shared_actor = {
        'skill': 1, # General "skill" at completing tasks. Could be changed to be more specific
        'plan_follower': 'yes', # Does the actor create a plan and try to follow it
        'stubbornness' : 25 # How likely is an actor to try something again after failing. 0-100
    }
    Test_Inputs['list_of_actors'] = [shared_actor]*Test_Inputs['number_of_actors']

    task_count = Test_Inputs['stageLength']*Test_Inputs['stageWidth']
    Test_Inputs['min_possible_time'] = int(ceil(task_count/Test_Inputs['number_of_actors']))

    # The returned statistics are not used yet.
    # TODO: pass them to output_statistics_file once it handles the stats dict structure
    run_collaboration_sim(Test_Inputs)

In [32]:
# Note: see definitions above for attributes. Used for test bench
Parameters = {
    # All actors share one attribute set. An explicit 'actor_list' entry
    # (one attribute dict per actor) is deliberately not supplied here.
    'identical_actors?': 'yes',
    'max_actors': 5,

    # Every combination of the values below is simulated
    'actor_attributes': {
        'stubbornness': list(range(0, 101, 25)),  # 0, 25, 50, 75, 100
        'plan_follower': ['yes', 'no'],
        'skill': [1],
    },

    # Upper bounds of the sweep
    'max_stage_length': 10,
    'max_stage_width': 10,
    'max_time': 5,

    'adjacency_dependency': 'yes',
}

In [35]:
## Call to either single simulation or to a test bench
# Start the timer just before the run so only the simulation is measured
t_start = process_time()
# Swap the two calls below to run one printed simulation instead of the sweep
# single_tester_simulation()
sim_test_bench(Parameters)
t_stop = process_time() 
# Report the difference between the two process_time() readings
print("Elapsed time during the whole program in seconds:", t_stop-t_start)

Elapsed time during the whole program in seconds: 1.5467320310000003


In [36]:
# import multiprocessing as mp
# print("Number of processors: ", mp.cpu_count())

Number of processors:  4
