In [1]:
import os
import random
from typing import NamedTuple, Dict, Optional, List

import numpy as np
import pandas as pd
from animalai.envs.actions import AAIActions, AAIAction
from animalai.envs.environment import AnimalAIEnvironment
from animalai.envs.raycastparser import RayCastParser
from animalai.envs.raycastparser import RayCastObjects
from mlagents_envs.envs.unity_gym_env import UnityToGymWrapper

# Braitenberg vehicle: 1) moves a few steps forward [to avoid clear walls], 2) moves towards walls, 3) moves a few steps randomly, 4) moves towards the target [which will hopefully be in sight, so that the agent is not stuck by a wall].


## Parameters

In [2]:
# Seed for the random number generator; vary this to make a population of agents.
aai_seed = random.randint(0, 10000)

# Number of rays to use in the raycast, see:
# https://github.com/Kinds-of-Intelligence-CFI/animal-ai/blob/main/docs/observations.md
NUM_RAYS = 7

# Passed to the agent as the upper bound of its per-step random draw that decides
# whether to pick an exploratory action; vary this to make a population of agents.
STEP_LENGTH = 5

# Number of episodes to run for testing.
NUM_EPISODES = 18

# Forwarded to the environment's `inference` argument.
agent_inference = True

# Used as the prefix of the results CSV file name.
agent_name = "BraitenbergVehicle"

## Paths

In [3]:
# Locations of the arena configuration, the AnimalAI executable and the folder that
# receives logs and the results CSV. Each one can be overridden with an environment
# variable so the notebook runs on other machines without editing this cell; when the
# variable is unset the original hard-coded path is used.
configuration_file = os.environ.get(
    "AAI_CONFIG_FILE",
    r"/Users/nataszasiwinska/Documents/research_projects/animal-ai-main/configs/combined_short.yml",
)
env_path = os.environ.get(
    "AAI_ENV_PATH",
    r"/Users/nataszasiwinska/Documents/research_projects/animal-ai-main/env/AAI3Mac.app",
)
log_folder_path = os.environ.get(
    "AAI_LOG_FOLDER",
    r"/Users/nataszasiwinska/Documents/research_projects/results",
)

In [4]:
# Braitenberg vehicle design by M. D. Crosby.
# Adapted by K. Voudouris where noted.
# Further adapted by N. Siwinska

class Braitenberg():
    """Simple Braitenberg-vehicle agent that heads towards food using raycasts.

    The number of rays is configurable. The agent acts on GOODGOAL, IMMOVABLE
    and BADGOAL detections; GOODGOALMULTI is parsed but never queried by
    `get_action`.
    """

    # Height-axis threshold above which the agent is biased to move straight ahead.
    FORWARD_Y_THRESHOLD = 1.3

    def __init__(self, no_rays, step_length):
        """Create the agent.

        Args:
            no_rays: total number of rays in the raycast observation (must be odd).
            step_length: default upper bound of the exploration draw (must be >= 1).

        Raises:
            AssertionError: if `no_rays` is even or `step_length` is below 1.
        """
        self.no_rays = no_rays
        assert(self.no_rays % 2 == 1), "Only supports odd number of rays (but environment should only allow odd number)"

        self.step_length = step_length
        assert(self.step_length >= 1), "Only supports step lengths greater than or equal to 1"

        # Note that BADGOAL includes both red goals AND death zones.
        self.listOfObjects = [RayCastObjects.GOODGOAL, RayCastObjects.GOODGOALMULTI, RayCastObjects.BADGOAL, RayCastObjects.IMMOVABLE]
        # The parser gets its own copy so both lists stay independent, as before.
        self.raycast_parser = RayCastParser(list(self.listOfObjects), self.no_rays)
        self.actions = AAIActions()
        self.prev_action = self.actions.NOOP

    def prettyPrint(self, obs) -> str:
        """Return the raycast parser's pretty-printed form of the observation."""
        return self.raycast_parser.prettyPrint(obs)

    def get_action(self, obs, step_length=None, position=None) -> "AAIAction":
        """Return the action to take given the current raw raycast observation.

        Args:
            obs: raw raycast observation; it is parsed with `self.raycast_parser`.
            step_length: upper bound (inclusive) of the draw that decides whether
                an exploratory action is taken; the draw is non-zero, and
                exploration happens, with probability step_length/(step_length+1).
                Defaults to the value given to the constructor.
            position: the agent's position; only index 1 is read. When omitted,
                a module-level variable named `movement` is used if it exists
                (backward compatibility with callers that set it as a global).

        Returns:
            The chosen action, which is also remembered as `self.prev_action`.
        """
        if step_length is None:
            step_length = self.step_length
        if position is None:
            # Legacy behaviour: the driver loop publishes the position as a global.
            position = globals().get("movement")

        obs = self.raycast_parser.parse(obs)
        newAction = self.actions.NOOP
        if self.ahead(obs, RayCastObjects.GOODGOAL):
            newAction = self.actions.FORWARDS
        elif self.left(obs, RayCastObjects.GOODGOAL):
            newAction = self.actions.FORWARDSLEFT
        elif self.right(obs, RayCastObjects.GOODGOAL):
            newAction = self.actions.FORWARDSRIGHT
        else:
            # NOTE(review): this forward bias only takes effect when the draw below
            # is 0, because the exploratory action overwrites it otherwise. The
            # original comment also described the opposite condition ("not at or
            # further than y=1.3"). Behaviour is preserved; confirm the intent.
            if position is not None and bool(position[1] > self.FORWARD_Y_THRESHOLD):
                newAction = self.actions.FORWARDS
            if random.randint(0, step_length) != 0:
                newAction = self._explore(obs)
        self.prev_action = newAction
        return newAction

    def _explore(self, obs):
        """Pick an exploratory action: 9/16 uniformly random, 7/16 reactive."""
        roll = random.randint(0, 15)
        if roll <= 8:
            return self._random_action(roll)
        return self._react(obs)

    def _random_action(self, roll):
        """Map a roll in 0..8 to one of the nine basic actions."""
        a = self.actions
        table = (
            a.NOOP,
            a.FORWARDS,
            a.FORWARDSRIGHT,
            a.RIGHT,
            a.BACKWARDSRIGHT,
            a.BACKWARDS,
            a.BACKWARDSLEFT,
            a.LEFT,
            a.FORWARDSLEFT,
        )
        return table[roll]

    def _wall_action(self, roll):
        """Map a roll in 0..10 to an action used when a wall is straight ahead.

        Forward-ish moves get 6 of the 11 slots. The original `x == 1 or 2`
        comparisons were always truthy, so every non-zero roll produced
        FORWARDSRIGHT; the table restores the documented distribution.
        """
        a = self.actions
        table = (
            a.BACKWARDS,       # 0
            a.FORWARDSRIGHT,   # 1
            a.FORWARDSRIGHT,   # 2
            a.FORWARDSLEFT,    # 3
            a.FORWARDSLEFT,    # 4
            a.BACKWARDSLEFT,   # 5
            a.BACKWARDSRIGHT,  # 6
            a.LEFT,            # 7
            a.RIGHT,           # 8
            a.FORWARDS,        # 9
            a.FORWARDS,        # 10
        )
        return table[roll]

    def _react(self, obs):
        """React to immovable objects and bad goals; repeat the previous action if none are seen."""
        if self.ahead(obs, RayCastObjects.IMMOVABLE):
            # An object is sensed in front: pick a weighted random action.
            return self._wall_action(random.randint(0, 10))
        if self.left(obs, RayCastObjects.IMMOVABLE):
            return self.actions.FORWARDSLEFT
        if self.right(obs, RayCastObjects.IMMOVABLE):
            return self.actions.FORWARDSRIGHT
        if self.ahead(obs, RayCastObjects.BADGOAL):
            return self.actions.BACKWARDS
        if self.left(obs, RayCastObjects.BADGOAL):
            return self.actions.BACKWARDSLEFT
        if self.right(obs, RayCastObjects.BADGOAL):
            return self.actions.BACKWARDSRIGHT
        return self.prev_action

    def ahead(self, obs, object):
        """Returns true if the input object is detected by the centre ray."""
        half = (self.no_rays - 1) // 2
        return bool(obs[self.listOfObjects.index(object)][half] > 0)

    def left(self, obs, object):
        """Returns true if the input object is detected by any ray before the centre ray."""
        half = (self.no_rays - 1) // 2
        row = obs[self.listOfObjects.index(object)]
        return any(row[i] > 0 for i in range(half))

    def right(self, obs, object):
        """Returns true if the input object is detected by any ray after the centre ray."""
        half = (self.no_rays - 1) // 2
        row = obs[self.listOfObjects.index(object)]
        return any(row[i] > 0 for i in range(half + 1, 2 * half + 1))

In [5]:
random.seed(aai_seed)  # set seed for random action selection

# Use a random port to avoid problems if a previous version exits slowly.
port = 5005 + random.randint(0, 1000)

aai_env = AnimalAIEnvironment(
    inference=agent_inference,
    seed=aai_seed,
    log_folder=log_folder_path,
    worker_id=aai_seed,
    file_name=env_path,
    arenas_configurations=configuration_file,
    base_port=port,
    useCamera=False,  # uses raycasts instead
    # resolution=36,
    useRayCasts=True,
    raysPerSide=(NUM_RAYS - 1) // 2,
    rayMaxDegrees=30,
)

reward_list = []
try:
    obs = aai_env.reset()

    braitenbergAgent = Braitenberg(NUM_RAYS, STEP_LENGTH)
    behavior = list(aai_env.behavior_specs.keys())[0]  # by default should be AnimalAI?team=0

    for _episode in range(NUM_EPISODES):  # Run episodes with the Braitenberg-style agent
        # A step is needed at the start of every episode in order to get an observation.
        # (The original guarded this with a `firststep` flag that was always True
        # because of a `firstep` typo and a reset at episode end.)
        aai_env.step()
        dec, term = aai_env.get_steps(behavior)
        done = False
        episodeReward = 0
        while not done:
            obs_dict = aai_env.get_obs_dict(dec.obs)
            raycasts = obs_dict["rays"]  # Get the raycast data
            # print(braitenbergAgent.prettyPrint(raycasts)) #print raycasts in more readable format
            # `movement` stays a module-level name: the agent may read it as a global.
            movement = obs_dict["position"]
            # print(movement)
            action = braitenbergAgent.get_action(raycasts, STEP_LENGTH)
            # print(action)
            aai_env.set_actions(behavior, action.action_tuple)
            aai_env.step()
            dec, term = aai_env.get_steps(behavior)
            if len(dec.reward) > 0:
                episodeReward += dec.reward
            if len(term) > 0:  # Episode is over
                episodeReward += term.reward
                print(F"Episode Reward: {episodeReward}")
                reward_list.append(episodeReward)
                done = True
finally:
    # Always shut the environment down, even when an episode raises, so that no
    # environment process or port is left behind.
    aai_env.close()

reward_df = pd.DataFrame(reward_list, columns=['finalRewards'])

# Make sure the results folder exists so a finished run is not lost at write time.
os.makedirs(log_folder_path, exist_ok=True)
csv_path = os.path.join(
    log_folder_path,
    f"{agent_name}_NumRays_{NUM_RAYS}_Seed_{aai_seed}_StepLength_{STEP_LENGTH}.csv",
)
reward_df.to_csv(csv_path, index=False)

UnityEnvironmentException: Environment shut down with return code -9 (SIGKILL).

In [None]:
# Manual cleanup cell: run this only if the run cell above stopped before it
# closed the environment.
# NOTE(review): calling close() on an environment that is already closed may
# raise - confirm against the ML-Agents API before running it after a clean run.
aai_env.close()


array(True)