In [2]:
import os

from os.path import  join, isdir
from plan import Plan
from action import Action
from utils import load_from_folder
from multiprocess import Pool
import random
from logging import exception
import re

# Seed the module-level `random` generator so the random draws made later in
# this notebook start from a fixed state.
random.seed(42)

In [63]:
# --- Run configuration -------------------------------------------------------
domain = 'logistics'             # planning domain whose plans are processed
data_base_dir = '../datasets/'   # root folder of the per-domain datasets
save_dir = './new_plans/'        # root folder for the generated problems

# Folder the plans are loaded from for the selected domain.
source_dir = join(data_base_dir, domain) + "/optimal_plans/dictionaries_and_plans/"
# Folder the generated problems are written to for the selected domain.
# (save_dir already ends with a separator, so this string has a doubled one.)
results_dir = save_dir + "/" + domain + "/"

print(f"Domain dir: {source_dir}")
os.makedirs(save_dir, exist_ok=True)
os.makedirs(results_dir, exist_ok=True)

plans_to_process = 10  # number of plans to process
versions_per_plan = 6  # number of versions generated for each plan
number_of_goals = 4    # number of goals in each new plan
test = True            # when True, the processing loop stops after 3 plans

Domain dir: ../datasets/logistics/optimal_plans/dictionaries_and_plans/


In [4]:
# Load the stored plans of the selected domain from source_dir.
# NOTE(review): load_from_folder presumably returns one entry per requested
# name, in request order; only the first entry is kept here -- confirm in utils.
plans = load_from_folder(source_dir, ["plans"])[0]
print("Plans:", len(plans))

plans loaded from ../datasets/logistics/optimal_plans/dictionaries_and_plans/
Plans: 47769


In [9]:
def compute_recognizability(current_goal_state, goal_state_list, verbose=True):
    """
    Compute the normalized recognizability of a goal state among candidates.

    Every fluent of ``current_goal_state`` contributes ``1 / count``, where
    ``count`` is the number of occurrences of that fluent across all goal
    states in ``goal_state_list``. The resulting sum is min-max scaled using
    ``len(current_goal_state)`` as the maximum and
    ``len(current_goal_state) / len(goal_state_list)`` as the minimum.

    :param current_goal_state: The fluents of the goal state being scored.
    :param goal_state_list: The candidate goal states (each an iterable of
        fluents). Every fluent of ``current_goal_state`` must occur in it at
        least once, which holds when ``current_goal_state`` itself is one of
        the candidates.
    :param verbose: When True (default) print the bounds and the unscaled sum.
    :return: The scaled recognizability as a float. It is 1.0 when no fluent
        is shared with another candidate and 0.0 when every fluent occurs once
        in every candidate. With a single candidate the scaling range is
        empty and 1.0 is returned.
    :raises ValueError: If either argument is empty, or if a fluent of
        ``current_goal_state`` does not occur in ``goal_state_list``.
    """
    n_fluents = len(current_goal_state)
    n_goal_states = len(goal_state_list)
    if n_fluents == 0:
        raise ValueError("current_goal_state must contain at least one fluent")
    if n_goal_states == 0:
        raise ValueError("goal_state_list must contain at least one goal state")

    max_recognizability = 1 * n_fluents
    min_recognizability = 1 / n_goal_states * n_fluents

    if verbose:
        print(f"Max recognizability : {max_recognizability}")
        print(f"Min recognizability : {min_recognizability}")

    unscaled = 0
    for current_goal_fluent in current_goal_state:
        # Count every occurrence of this fluent across all candidate goal states.
        occurrences = sum(
            1
            for goal_state in goal_state_list
            for goal_fluent in goal_state
            if current_goal_fluent == goal_fluent
        )
        if occurrences == 0:
            # Without this guard the division below fails with an opaque
            # ZeroDivisionError.
            raise ValueError(
                f"Fluent {current_goal_fluent!r} of current_goal_state does not "
                "appear in goal_state_list; add current_goal_state to the list"
            )
        unscaled += 1 / occurrences

    if verbose:
        print(f"Unscaled recognizability: {unscaled}")

    # Min-max normalization; the range collapses when there is one candidate.
    spread = max_recognizability - min_recognizability
    if spread == 0:
        return 1.0
    return (unscaled - min_recognizability) / spread
                    

In [10]:
# Sanity check for compute_recognizability: the current goal state shares no
# fluent with the five other (identical) candidates.
current_goal_state = [6, 5, 7, 9]
goal_state_list = [[1, 2, 3, 4] for _ in range(5)]
# The scored goal state is itself one of the candidates.
goal_state_list.append(current_goal_state)
rec = compute_recognizability(current_goal_state, goal_state_list)
print(f"recognizability : {rec}")

Max recognizability : 4
Min recognizability : 0.6666666666666666
Unscaled recognizability: 4.0
recognizability : 1.0


In [None]:
#chatgpt code, to test and check

import math, random

def make_goal_states(current_goal_state, N, R, filler_pool):
    """
    Spread the fluents of ``current_goal_state`` over N goal states so that
    the per-fluent occurrence counts match a target scaled recognizability R.

    With M fluents, the target unscaled sum is ``M/N + R * (M - M/N)``, i.e.
    the inverse of the min-max scaling used by ``compute_recognizability`` in
    this notebook. Each fluent is placed in either ``floor(c)`` or
    ``floor(c) + 1`` distinct goal states, where ``c`` is the ideal (real)
    count, and the split between the two counts is chosen so the sum of
    ``1 / count`` is as close as possible to the target.

    NOTE(review): fluents are assigned to randomly chosen goal states, so no
    returned goal state is guaranteed to equal ``current_goal_state``, and the
    padding step inserts fillers (possibly the same filler twice in one goal
    state). Check whether that is acceptable for the intended use.

    :param current_goal_state: Sequence of fluents to distribute.
    :param N: Number of goal states to build (at least 1).
    :param R: Target scaled recognizability, within [0, 1].
    :param filler_pool: Sequence of dummy fluents used to pad the goal states
        to a common length.
    :return: A list of N lists of fluents, all of the same length.
    :raises ValueError: If N < 1, R is outside [0, 1], or padding is needed
        while ``filler_pool`` is empty.
    """
    if N < 1:
        raise ValueError(f"N must be at least 1, got {N}")
    if not 0 <= R <= 1:
        # R > 1 would give a zero occurrence count and a division by zero below.
        raise ValueError(f"R must be within [0, 1], got {R}")

    M = len(current_goal_state)
    # 1. target (unscaled) sum
    S_t = (M / N) * (R * (N - 1) + 1)
    # 2. ideal per-fluent occurrence count; within [1, N] for a valid R
    c_real = N / (R * (N - 1) + 1)
    c_low = math.floor(c_real)
    c_high = c_low + 1
    # 3. how many fluents get c_low occurrences (the rest get c_high)
    if c_high > N:
        # A fluent cannot be placed in more than N goal states.
        ks = M
    else:
        k = (S_t - M / c_high) / (1 / c_low - 1 / c_high)
        # Clamp so rounding noise can never yield a negative or oversized split.
        ks = min(M, max(0, int(round(k))))
    counts = [c_low] * ks + [c_high] * (M - ks)
    random.shuffle(counts)

    # 4. place every fluent into `c` distinct, randomly chosen goal states
    bins = [[] for _ in range(N)]
    for fluent, c in zip(current_goal_state, counts):
        for b in random.sample(range(N), c):
            bins[b].append(fluent)

    # 5. pad with dummy fluents so that all goal states have equal length
    target_len = max(len(b) for b in bins)
    if len(filler_pool) == 0 and any(len(b) < target_len for b in bins):
        raise ValueError("filler_pool is empty but some goal states need padding")
    for b in bins:
        while len(b) < target_len:
            b.append(random.choice(filler_pool))

    return bins

# Example usage: six goal states built for a target scaled recognizability of 0.9.
current = list('abcd')
fillers = [*range(100, 200)]
goals = make_goal_states(current, N=6, R=0.9, filler_pool=fillers)
print(goals)

[['b'], [117], ['a'], ['d'], ['c'], [154]]


In [None]:
# Split the unscaled recognizability range into nine equal-width classes and
# show the upper boundary of each class, both unscaled and scaled to [0, 1].
# The bounds are computed from the exact extremes (4 goal fluents, 6 candidate
# goal states -> range 4/6 .. 4) rather than from hand-rounded constants, so the
# last boundary is the maximum itself instead of overshooting it.
n_classes = 9
rec_max = 4      # maximum unscaled value: one per goal fluent
rec_min = 4 / 6  # minimum unscaled value: goal fluents / candidate goal states
classes = [rec_min + (rec_max - rec_min) * i / n_classes for i in range(1, n_classes + 1)]
print(f"Unscaled Classes: {[round(x,2) for x in classes]}")
print(f"Scaled Classes: {[round((x-rec_min) / (rec_max-rec_min),2) for x in classes]}")




Unscaled Classes: [1.04, 1.41, 1.78, 2.15, 2.52, 2.89, 3.26, 3.63, 4.0]
Scaled Classes: [0.11, 0.22, 0.33, 0.44, 0.56, 0.67, 0.78, 0.89, 1.0]


In [64]:
def write_and_save_plan(plan, goal_set, obj_set_dict=None, i=0):
    """
    Build a PDDL problem from a plan's initial state and a new set of goals,
    and write it to ``<results_dir>/<name>/<name>_<i>.pddl``.

    Uses the notebook-level variables ``domain`` and ``results_dir``.

    NOTE(review): initial-state fluents and goals are written verbatim. The
    goals built elsewhere in this notebook look like ``at obj pos`` (no
    enclosing parentheses); confirm that this is what the consumer of the
    generated files expects.

    :param plan: Object exposing ``plan_name`` (must contain ``p<digits>``
        followed by a dot) and ``initial_state`` (iterable of fluents).
    :param goal_set: Iterable of goal fluents for the new problem.
    :param obj_set_dict: Mapping ``{type: collection of object names}``; empty
        collections are skipped. Defaults to no objects.
    :param i: Version number, used in the problem and file name.
    :raises ValueError: If no plan name can be extracted from ``plan.plan_name``.
    """
    # Extract the plan name (e.g. "p01") that precedes the file extension.
    match = re.search(r"(p\d+)(?=\.)", plan.plan_name)
    if match is None:
        raise ValueError(
            f"Cannot extract a plan name of the form 'p<digits>.' from {plan.plan_name!r}"
        )
    name = match.group(1)

    def _ordered(items):
        # Sets of strings iterate in an order that changes between interpreter
        # runs; sort them so the generated file is reproducible. Any other
        # iterable keeps the order chosen by the caller.
        return sorted(items) if isinstance(items, (set, frozenset)) else items

    # Definition header
    parts = [f"(define (problem {domain}_{name}_{i})\n(:domain {domain})\n(:objects\n\t"]

    # Objects, one line per type: "<obj> <obj> ... - <type>"
    for type_name, obj_set in (obj_set_dict or {}).items():
        if len(obj_set) > 0:
            parts.extend(f"{obj} " for obj in _ordered(obj_set))
            parts.append(f"- {type_name}\n\t")
    parts.append(")\n")

    # Initial state
    parts.append("(:init\n")
    parts.extend(f"\t{fluent}\n" for fluent in plan.initial_state)
    parts.append(")\n")

    # Goal state
    parts.append("(:goal (and\n")
    parts.extend(f"\t{goal}\n" for goal in _ordered(goal_set))
    parts.append("))\n)")
    new_problem = "".join(parts)

    # Save the new problem; naming convention is {plan name}_{version number}.pddl
    new_problem_dir = os.path.join(results_dir, name)
    os.makedirs(new_problem_dir, exist_ok=True)
    new_problem_file = os.path.join(new_problem_dir, f"{name}_{i}.pddl")
    with open(new_problem_file, "w") as f:
        f.write(new_problem)

In [66]:
# Process the objects of each plan, then create the goal variations of its problem.

def _objects_in(fluents):
    """Return the set of object names mentioned in the given fluent strings."""
    # Everything after the first space-separated token of a fluent is treated
    # as an object name.
    return {obj for fluent in fluents for obj in fluent.split(" ")[1:]}


# Object-name prefix -> PDDL type, in the order the types are written to the file.
PREFIX_TO_TYPE = {
    "pos": "location",
    "apn": "airplane",
    "cit": "city",
    "apt": "airport",
    "tru": "truck",
    "obj": "package",
}
TEST_PLAN_LIMIT = 3  # number of plans processed when `test` is enabled

max_plans = TEST_PLAN_LIMIT if test else plans_to_process
count = 0
for plan in plans:
    # Stop after exactly max_plans plans.
    if count >= max_plans:
        break

    #* find all objects in the initial state and in the actions
    all_obj_set = _objects_in(plan.initial_state)
    for action in plan.actions:
        all_obj_set |= _objects_in(action.positiveEffects)
        all_obj_set |= _objects_in(action.negativeEffects)
        all_obj_set |= _objects_in(action.precondition)

    # Split the objects by type using their name prefix; objects with an
    # unknown prefix are ignored.
    objects_by_type = {type_name: set() for type_name in PREFIX_TO_TYPE.values()}
    for obj in all_obj_set:
        for prefix, type_name in PREFIX_TO_TYPE.items():
            if obj.startswith(prefix):
                objects_by_type[type_name].add(obj)
                break
    # Only the types that actually occur are declared in the problem file.
    obj_set_dict = {
        type_name: objs for type_name, objs in objects_by_type.items() if objs
    }

    # Packages and positions are the building blocks of the new goals.
    package_for_goal_set = objects_by_type["package"]
    pos_for_goal_set = objects_by_type["location"]

    # Every goal needs its own package. `logging.exception` is not an exception
    # class (raising its result fails with TypeError), so raise ValueError.
    if number_of_goals > len(package_for_goal_set):
        raise ValueError(f"Number of goals {number_of_goals} is greater than the number of objects {len(package_for_goal_set)}")

    # NOTE(review): positions may currently be shared between goals, so this
    # check is stricter than the sampling below requires; kept as written.
    if number_of_goals > len(pos_for_goal_set):
        raise ValueError(f"Number of goals {number_of_goals} is greater than the number of positions {len(pos_for_goal_set)}")

    # Sets of strings iterate in an order that changes between interpreter
    # runs; sampling from sorted lists makes the seeded draws reproducible.
    package_pool = sorted(package_for_goal_set)
    position_pool = sorted(pos_for_goal_set)

    goals_state_list = []
    #* now we start creating the new versions of the problem from the current one
    #? do we need to keep the original plan?
    #todo how to not have same goal_sets in two different versions? do we care about it enough?
    for i in range(versions_per_plan):
        # Distinct packages, so a package is never required in two positions.
        chosen_packages = random.sample(package_pool, number_of_goals)

        # Goals have the form "at <package> <position>".
        #? can two packages be in the same position?
        #? for example |at obj1 pos1| and |at obj2 pos1| -- currently allowed
        goal_set = {
            f"at {package} {random.choice(position_pool)}"
            for package in chosen_packages
        }
        goals_state_list.append(goal_set)

        #* now we build the new pddl file with these goals
        write_and_save_plan(plan=plan, goal_set=goal_set, obj_set_dict=obj_set_dict, i=i)

    count += 1