In [42]:
# Install the pyperplan package into the running kernel's environment;
# its `planner` module is imported in the next cell.
# NOTE(review): the version is not pinned -- consider pinning it so re-runs use the same planner.
%pip install pyperplan



In [43]:
import random
import os
from pyperplan import planner

In [30]:
from google.colab import drive
from google.colab import runtime
drive.mount('/content/gdrive')
%cd gdrive/MyDrive/Object\ Affordances/HRRT/human_robot_red_teaming
%ls

Mounted at /content/gdrive
/content/gdrive/MyDrive/Object Affordances/HRRT/human_robot_red_teaming
chatbot_hrrting.ipynb  [0m[01;34mdocs[0m/               model_planning_evaluation.ipynb  README.md
[01;34mconfig[0m/                hrrt_testing.ipynb  [01;34mpyperplan[0m/                       [01;34mscripts[0m/


# Initialization

In [25]:
# file path (relative to the repository root; one sub-folder per domain)
planning_path = 'config/'
# number of planning tasks generated per domain
num_tasks = 50

# dictionary of folders to planning info
#   planning info includes domain names, initial states, failure states, and goal states
#   - 'domain'         : PDDL domain name
#   - 'initial_state'  : facts always placed in the problem's (:init ...) section
#   - 'failure_states' : candidate facts the task generator appends to the initial state
#   - 'goal_state'     : the complete (:goal ...) expression
# NOTE: every entry of a 'failure_states' list needs its own trailing comma;
#   Python silently concatenates adjacent string literals, which merges two
#   failure states into a single malformed one.
planning_info = {
    'cinematic_iron_giant' : {
        'domain' : 'cinematic_iron_giant_missile_defense',
        'initial_state' : '(detected_missile)\n\t\t\t(human_verification_received)',
        'failure_states' : [
            '(defensive_measures_exhausted)', '(all_defenses_failed)', # nothing else worked
            '(not (war_ended))', # warfare
            '(detected_emp_threat)', # radiation
            '(detected_chemical_threat)', # contamination
            '(detected_biological_threat)', # injury
            '(not (civilians_safe)) (not (leaders_safe)) (not (military_safe))' # death/injury
        ],
        'goal_state' : '''(and (civilians_safe)\n\t\t\t(leaders_safe)\n\t\t\t(military_safe)\n\t\t\t(missile_disarmed))'''
    },
    'cinematic_space_odyssey' : {
        'domain' : 'cinematic_space_odyssey_spaceship_crew_operations',
        'initial_state' : '(crew_inside_spaceship)\n\t\t\t(health_monitoring_scheduled)',
        'failure_states' : [
            '(not (ai_self_correction_initiated))', # self-monitor systems
            '(systems_malfunction_detected)',
            '(life_support_failure_detected)',
            '(robot_malfunction_detected)',
            # NOTE(review): a negated `ai_rogue` listed as a failure looks inverted -- confirm against the domain model
            '(not (ai_rogue))', # rogue system
            '(not (crew_physical_health_checked))', # crew health
            '(not (crew_mental_health_checked))',
            '(not (crew_morale_nominal))',
            '(not (crew_fatigue_monitored))',
            '(not (crew_hydration_checked))',
            '(not (crew_nutrition_checked))',
        ],
        'goal_state' : '''(and (ai_self_correction_initiated)\n\t\t\t(not (systems_malfunction_detected))\n\t\t\t(not (robot_malfunction_detected))\n\t\t\t(not (life_support_failure_detected))\n\t\t\t(crew_physical_health_checked)\n\t\t\t(crew_mental_health_checked)\n\t\t\t(crew_morale_nominal)\n\t\t\t(crew_hydration_checked)\n\t\t\t(crew_nutrition_checked))'''
    },
    'everyday_international_travel' : {
        'domain' : 'everyday_international_travel',
        'initial_state' : '(human_at_house)\n\t\t\t(alternative_route_available)\n\t\t\t(human_experiencing_travel_issue)',
        'failure_states' : [
            '(baggage_lost)',
            '(flight_delayed)',
            '(not (currency_exchanged))',
            '(not (valid_visa))',
            '(not (medical_requirements_validated))',
            '(not (jet_lag_recommendations_given))',
            '(not (local_transportation_booked))',
            '(not (hotel_booked))',
            '(human_mugged)',
            '(human_lost)'],
        'goal_state' : '''(and (itinerary_confirmed)\n\t\t\t(human_at_hotel))'''
    },
    'everyday_vehicle_diagnostics' : {
        'domain' : 'everyday_vehicle_maintenance',
        'initial_state' : '(human_has_keys)\n\t\t\t(robot_has_jumper_cables)\n\t\t\t(human_has_spare_tire)',
        'failure_states' : [
            '(vehicle_check_engine_light_on)',
            '(vehicle_tires_low_pressure)',
            '(vehicle_has_flat_tire)',
            '(vehicle_out_of_fuel)',
            '(vehicle_battery_dead)',
            '(vehicle_oil_low)',
            '(not (vehicle_brakes_functional))',
            '(not (vehicle_tires_aligned))'
        ],
        'goal_state' : '''(and (vehicle_has_gas)\n\t\t\t(vehicle_tires_full)\n\t\t\t(vehicle_engine_working)\n\t\t\t(vehicle_battery_charged)\n\t\t\t(vehicle_brakes_functional)\n\t\t\t(vehicle_oil_level_good)\n\t\t\t(vehicle_coolant_level_good)\n\t\t\t(vehicle_headlights_functional)\n\t\t\t(vehicle_safe_to_drive)\n\t\t\t(not (vehicle_needs_maintenance)))'''
    },
    'household_assembly_repairs' : {
        'domain' : 'household_assembly_repairs',
        'initial_state' : '(furniture_unassembled)\n\t\t\t(repair_detected)\n\t\t\t(maintenance_required)',
        'failure_states' : [
            '(not (pet_or_child_supervision_requested))',
            '(fire_hazard_detected)',
            '(electrical_hazard_detected)',
            '(emergency_repair_failed)',
            '(repair_failed)',
            '(tools_incorrectly_used)',
            '(not (human_moved_from_area))'
        ],
        # parentheses balanced: the original string carried one extra ')'
        'goal_state' : '''(and (pet_or_child_supervision_requested)\n\t\t\t(human_moved_from_area)\n\t\t\t(furniture_assembled)\n\t\t\t(repair_completed)\n\t\t\t(repair_verified)\n\t\t\t(maintenance_completed))'''
    },
    'household_cleaning' : {
        'domain' : 'household_cleaning',
        'initial_state' : '(bedroom_dirty)\n\t\t\t(bathroom_dirty)\n\t\t\t(kitchen_dirty)\n\t\t\t(main_room_dirty)\n\t\t\t(floors_dirty)\n\t\t\t(child_present)\n\t\t\t(pet_present)',
        'failure_states' : [
            '(food_waste_detected)',
            '(loose_furniture_detected)',
            '(cable_hazard_detected)',
            '(fragile_object_shattered)',
            '(mold_growth_risk)',
            '(spill_detected)',
            '(fume_detected)',
            '(gas_leak_detected)',
            '(fire_hazard_detected)'
        ],
        # parentheses balanced: the original string never closed the outer '(and'
        'goal_state' : '''(and (bedroom_clean)\n\t\t\t(bathroom_clean)\n\t\t\t(kitchen_clean)\n\t\t\t(main_room_clean)\n\t\t\t(floors_clean)\n\t\t\t(child_supervised)\n\t\t\t(pet_supervised)\n\t\t\t(not (food_waste_detected))\n\t\t\t(not (loose_furniture_detected))\n\t\t\t(not (cable_hazard_detected))\n\t\t\t(not (fragile_object_shattered)))'''
    },
    'space_lunar_habitat_airlock' : {
        'domain' : 'space_lunar_habitat',
        'initial_state' : '(robot_inside_habitat)\n\t\t\t(astronaut_inside_habitat)\n\t\t\t(door_habitat_airlock_locked_closed)\n\t\t\t(door_airlock_surface_locked_closed)\n\t\t\t(door_habitat_airlock_operational)\n\t\t\t(door_airlock_surface_operational)\n\t\t\t(airlock_pressurized)\n\t\t\t(no_airlock_breach)\n\t\t\t(lunar_sample_on_surface)',
        'failure_states' : [
            '(airlock_breach_detected)',
            '(and (airlock_depressurized) (door_habitat_airlock_unlocked_opened))',
            '(and (airlock_depressurized) (door_airlock_surface_unlocked_opened))',
            '(air_filter_fault)',
            '(solar_panel_fault)',
            '(temperature_control_fault)',
            '(environmental_hazard_detected)',
            '(lunar_dust_contamination_detected)',
            '(temperature_variation_detected)',
            '(astronaut_health_alert)'
        ],
        'goal_state' : '''(and (robot_inside_habitat)\n\t\t\t(astronaut_inside_habitat)\n\t\t\t(door_habitat_airlock_locked_closed)\n\t\t\t(door_airlock_surface_locked_closed)\n\t\t\t(door_habitat_airlock_operational)\n\t\t\t(door_airlock_surface_operational)\n\t\t\t(airlock_pressurized)\n\t\t\t(no_airlock_breach)\n\t\t\t(lunar_sample_in_habitat)\n\t\t\t(astronaut_approved_sample_placement))'''
    },
    'space_mars_science_team' : {
        'domain' : 'space_mars_science_team',
        'initial_state' : '(robot_available)\n\t\t\t(not (robot_stuck))\n\t\t\t(not (mission_interrupted))',
        # atoms wrapped in parentheses like every other domain, since they are
        # written verbatim into the (:init ...) section of the problem file
        'failure_states' : [
            '(contamination_detected)',
            '(long_term_war_detected)',
            '(emergency_detected)',
            '(not (robot_available))',
            '(mission_interrupted)',
            '(critical_system_failure)',
            '(communication_blackout)'
        ],
        'goal_state' : '''(and (soil_sample_collected)\n\t\t\t(atmospheric_data_collected)\n\t\t\t(ground_control_ack_received)\n\t\t\t(data_backup_created)\n\t\t\t(diagnostic_health_check_completed))'''
    }
}

# Create Planning Tasks

In [31]:
# Generate `num_tasks` PDDL problem files per domain under
# <planning_path>/<folder>/planning_tasks/taskNN.pddl.
#   task01 : no failure facts added to the initial state
#   task02 : every failure fact added
#   others : a random, non-empty, proper subset of the failure facts

# private, seeded generator so the sampled failure sets are identical on every re-run
task_rng = random.Random(0)

# loop through domains (the dictionary keys are the config sub-folder names)
for domain in planning_info.keys():
    domain_info = planning_info[domain]
    failure_states = domain_info['failure_states']
    # make sure the output folder exists before any task is written
    task_file_path = planning_path + domain + '/planning_tasks/'
    if not os.path.exists(task_file_path):
        os.makedirs(task_file_path)
        print("Created directory {}".format(task_file_path))
    # loop through tasks
    for i in range(num_tasks):
        # create problem name
        problem_task_name = 'task{:02d}'.format(i+1)
        # initialize init and goal state
        init_state = domain_info['initial_state']
        goal_state = domain_info['goal_state']
        if i == 0:
            # special case: add no failures
            failures = []
        elif i == 1:
            # special case: add all failures
            failures = failure_states
        elif failure_states:
            # choose number of failures to sample in [1, len(failure_states)-1];
            # the max() keeps the range valid when only one failure state exists
            num_failures = task_rng.randint(1, max(1, len(failure_states) - 1))
            failures = task_rng.sample(failure_states, num_failures)
        else:
            failures = []
        # add to initial state (skipped when there is nothing to add)
        if failures:
            init_state += '\n\t\t\t' + '\n\t\t\t'.join(failures)
        # format planning problem; the (:domain ...) header uses the PDDL domain
        # name stored in planning_info, not the folder key
        plan_task = '''(define (problem {})\n\t\t(:domain {})\n\n\t\t(:init\n\t\t\t{}\n\t\t)\n\n\t\t(:goal {}\n\t\t)\n)'''.format(
            problem_task_name,
            domain_info['domain'],
            init_state,
            goal_state
        )
        # write the task file; `with` closes the handle even if the write fails
        task_file_name = task_file_path + problem_task_name + '.pddl'
        with open(task_file_name, 'w') as fo:
            fo.write(plan_task)
        print("Wrote task {} for {} domain".format(i+1, domain))

Created directory config/cinematic_iron_giant/planning_tasks/
Wrote task 1 for cinematic_iron_giant domain
Wrote task 2 for cinematic_iron_giant domain
Wrote task 3 for cinematic_iron_giant domain
Wrote task 4 for cinematic_iron_giant domain
Wrote task 5 for cinematic_iron_giant domain
Wrote task 6 for cinematic_iron_giant domain
Wrote task 7 for cinematic_iron_giant domain
Wrote task 8 for cinematic_iron_giant domain
Wrote task 9 for cinematic_iron_giant domain
Wrote task 10 for cinematic_iron_giant domain
Wrote task 11 for cinematic_iron_giant domain
Wrote task 12 for cinematic_iron_giant domain
Wrote task 13 for cinematic_iron_giant domain
Wrote task 14 for cinematic_iron_giant domain
Wrote task 15 for cinematic_iron_giant domain
Wrote task 16 for cinematic_iron_giant domain
Wrote task 17 for cinematic_iron_giant domain
Wrote task 18 for cinematic_iron_giant domain
Wrote task 19 for cinematic_iron_giant domain
Wrote task 20 for cinematic_iron_giant domain
Wrote task 21 for cinematic

# Perform Planning Tasks

In [48]:
# Run the planner on the generated tasks against each candidate model of a domain.

# number of model files per domain (model0.pddl ... model5.pddl)
num_models = 6
# when True, stop after the first task of the first domain (quick smoke test);
# set to False to evaluate every task of every domain
first_task_only = True

# search configuration is the same for every run, so look it up once
search_algorithm = planner.SEARCHES['astar']
heuristic_class = planner.HEURISTICS['hff']

# loop through domains
for domain in planning_info.keys():
    # loop through planning tasks
    for task_idx in range(num_tasks):
        # create task file name
        problem_name = 'task{:02d}.pddl'.format(task_idx+1)
        task_file = planning_path + domain + '/planning_tasks/' + problem_name
        # loop through models (own index name so the task index is not shadowed)
        for model_idx in range(num_models):
            # create domain file name
            domain_file = planning_path + domain + '/model' + str(model_idx) + '.pddl'
            # report the file pair first so a parser error can be traced to its source
            print("Planning {} with {}".format(task_file, domain_file))
            # attempt search problem
            solution = planner.search_plan(
                domain_file,
                task_file,
                search_algorithm,
                heuristic_class
            )
            print(solution)
        if first_task_only:
            break
    if first_task_only:
        break

ValueError: Error keyword ":parameters" required before parameter list!