In [None]:
from pddl.logic import Predicate, constants, variables
from pddl.core import Domain, Problem, Action, Requirements
from pddl.formatter import domain_to_string, problem_to_string
from pddl import parse_domain, parse_problem

from pddl.logic.effects import When, AndEffect
from pddl.logic.base import Not, And
from pddl.logic.predicates import EqualTo
from pddl.custom_types import name




import copy

import sys, os, csv


import pandas as pd

import re



## Functions

In [None]:
def get_all_folder_names(path):
    folder_names = []
    for root, _, _ in os.walk(path):
        folder_name = os.path.basename(root)
        if folder_name.startswith('p'):
            folder_names.append(folder_name)
    folder_names = sorted(list(set(folder_names)))
    return folder_names


In [None]:
import random
import os

def extract_random_subset(input_filename):
    # Read the input file
    with open(input_filename, 'r') as f:
        lines = f.readlines()

    # Determine the maximum number of lines to be extracted
    max_lines = min(5, len(lines))
    
    # Generate a random number between 0 and this maximum
    num_lines = random.randint(0, max_lines)

    # Extract the specified number of lines
    subset = lines[:num_lines]
    if subset and subset[-1].endswith("\n"):
            subset[-1] = subset[-1].rstrip("\n")
    # Derive the output filename
    base, ext = os.path.splitext(input_filename)
    output_filename = f"{base}_upto{ext}"

    # Save these lines to the output file
    with open(output_filename, 'w') as f:
        f.writelines(subset)
        
    return output_filename

In [None]:
def merge_files(file1_path, file2_path, output_path):
    merged_content = []

    # Read content from the first file and append to the merged content
    with open(file1_path, 'r') as f1:
        for line in f1:
            line = line.strip()
            if line and not line.startswith(";"):
                merged_content.append(line + '\n')

    # Read content from the second file, filter out lines starting with ';' or empty lines, and append to the merged content
    with open(file2_path, 'r') as f2:
        for line in f2:
            line = line.strip()
            if line and not line.startswith(";"):
                merged_content.append(line + '\n')
    if merged_content and merged_content[-1].endswith("\n"):
            merged_content[-1] = merged_content[-1].rstrip("\n")
    # Write the merged content to the output file
    with open(output_path, 'w') as out_file:
        out_file.writelines(merged_content)

    print(f"Merged content saved to {output_path}")

In [None]:
def get_prob_template_for_hyps (domain_folder_path, problem_file):    
    problem_file = parse_problem ( problem_file)
    problem_file_string = problem_to_string(problem_file)
    # Find the position of '(:goal'
    start_pos = problem_file_string.find('(:goal')

    # Check if '(:goal' was found
    if start_pos != -1:
        # Replace everything after '(:goal' with '<HYPOTHESIS>)))'
        problem_file_string = problem_file_string[:start_pos] + '(:goal \n(and \n \t<HYPOTHESIS> \n)))'

    with open(f"{domain_folder_path}/template.pddl", 'w') as file:
        file.write(problem_file_string)

In [None]:
def create_obs_dat_file(sas_plan_file_path, obs_file_path):
    with open(sas_plan_file_path, 'r') as file:
        lines = file.readlines()

    # Get rid of last line which is cost
    lines = lines[:-1]

    print(len(lines))
    

    num_lines_to_keep = len(lines)


    # Get the first one-third of the lines
    first_one_third_lines = lines[:num_lines_to_keep]
    
    # Check if the last line of first_one_third_lines ends with '\n'
    # If it does, remove the '\n'
    if first_one_third_lines and first_one_third_lines[-1].endswith('\n'):
        first_one_third_lines[-1] = first_one_third_lines[-1].rstrip('\n')

    # Save the first one-third of the lines to the new file
    with open(obs_file_path, 'w') as new_file:
        new_file.writelines(first_one_third_lines)


In [None]:
directory_path_1 = 'xyz'

In [None]:
domain_path_list = [directory_path_1]

In [None]:
folders = get_all_folder_names(directory_path_1)


In [None]:
folders[5:6]

# Merge files

In [None]:
file_name_list_after_t_merger = ['merged_domain_with_constants.pddl','merged_problem_for_obs.pddl', 'merged_domain.pddl', 'merged_problem.pddl','hyps.dat']

for a_domain_path in domain_path_list:
    problem_instance_folder_list = get_all_folder_names(a_domain_path)
    
    for a_problem_instance in problem_instance_folder_list[5:6]:
        
        print(a_problem_instance)
        merger_path = f'{a_domain_path}/t_merger.py'
        human_domain = f'{a_domain_path}/{a_problem_instance}/human_domain.pddl'
        human_problem = f'{a_domain_path}/{a_problem_instance}/robot_problem.pddl'
        robot_domain = f'{a_domain_path}/{a_problem_instance}/robot_domain.pddl'
        
        os.system(f"python {merger_path} -h_d {human_domain} -h_i {human_problem} -r_d {robot_domain} ")

        current_directory = os.getcwd()
        
        for a_merged_file in file_name_list_after_t_merger:
            source_file_path = f'{current_directory}/{a_merged_file}'
            destination_file_path = f'{a_domain_path}/{a_problem_instance}/{a_merged_file}'
            
            os.rename(source_file_path, destination_file_path)
        
        get_prob_template_for_hyps(f'{a_domain_path}/{a_problem_instance}', f"{a_domain_path}/{a_problem_instance}/merged_problem.pddl")


## Get observation
From human_domain

In [None]:
import os


for a_domain_path in domain_path_list:
    problem_instance_folder_list = get_all_folder_names(a_domain_path)
    
    for a_problem_instance in problem_instance_folder_list[5:6]:
        
        print(a_problem_instance)
        problem_file_updater = f'{a_domain_path}/t_upd_prob_file_with_obs_list_not_merged.py'
        
        print('a_problem_instance:', a_problem_instance)
        
        
        human_domain = f'{a_domain_path}/{a_problem_instance}/human_domain.pddl'
        init_human_problem = f'{a_domain_path}/{a_problem_instance}/robot_problem.pddl'
        
        
        non_fail_merged_robot_problem = f'{a_domain_path}/{a_problem_instance}/merged_problem_for_obs.pddl'
        merged_domain = f'{a_domain_path}/{a_problem_instance}/merged_domain.pddl'

        FD_PATH = "/Users/xyz/Documents/downward-main_2/fast-downward.py"
        rest = "--search 'lazy_greedy([ff()])'"

        os.system(f"{FD_PATH} {merged_domain} {non_fail_merged_robot_problem} {rest} ")
        
        current_directory = os.getcwd()

        
        #1) Run merged domain where robot cant fail to get initial plan
        sas_plan_path = f'{current_directory}/sas_plan'
        obs_file_path = f'{a_domain_path}/{a_problem_instance}/obs_robot_not_failed.dat'
        
        #get input plan and outputs to the file path
        create_obs_dat_file(sas_plan_path, obs_file_path)

        #2) get up to 5 random non failure observations (it cna be 0 also)
        output_obs =  extract_random_subset(obs_file_path)
        
        #3) Iterate the problem file upto that moment
        os.system(f"python {problem_file_updater} -d {human_domain} -i {init_human_problem} -o {output_obs}")        
        #this is j+1 bc we'll use this for the next step, current problem has been used already
        os.rename('temp_iterated_problem.pddl',f'temp_iterated_problem_human.pddl')

        
        #4) Run human domain with updated file to get failure observation to merge with non failures
        
        
        
        os.system(f"{FD_PATH} {human_domain} temp_iterated_problem_human.pddl {rest} ")
        
        #first input, second input, output path
        merge_files(output_obs,'sas_plan',f'{a_domain_path}/{a_problem_instance}/obs.dat' )



    

#         os.remove('sas_plan')
        

        



In [None]:
Valid_PATH = "/Users/xyz/Documents/VAL-master_2/build/macos64/Release/Val--Darwin/bin/./Validate"


for a_domain_path in domain_path_list:
    problem_instance_folder_list = get_all_folder_names(a_domain_path)
    
    for a_problem_instance in problem_instance_folder_list:
        print('a_problem_instance:', a_problem_instance)



        human_problem = f'{a_domain_path}/{a_problem_instance}/robot_problem.pddl'
        human_domain = f'{a_domain_path}/{a_problem_instance}/human_domain.pddl'


        plan_file = f'{a_domain_path}/{a_problem_instance}/obs.dat'


        os.system(f"{Valid_PATH} {human_domain} {human_problem} {plan_file}" )
        
        