# Artificial Intelligence - Fall 2020 - Homework4

# DEADLINE 25th of January 2021

## _Automated Planning - Graphplan_

c: Alexandra Dobrescu <alexandramaria.digital@gmail.com>

## Introduction

The goal of this homework is to become familiar with and implement the construction of a planning algorithm, Graphplan.

### The planning graph

The planning graph is a structure that comes to the aid of a planning algorithm to determine what actions can be performed at a given time and what are the possible simultaneous actions and the possible conflicts between these actions and facts.

The planning graph is an acyclic oriented graph that is organized into alternate state/action levels.

The first level has facts (or denials of them) as nodes, and these have no entry edges. The next level has actions as nodes and contains all the actions that could be performed given the facts on the first level. The third level contains, as nodes, the facts that could result from the actions on the second level, etc.

At each level, there may be mutual exclusion relationships (mutex) between nodes, in the sense that the two nodes could not exist / could not be achieved at the same time.

**_Useful resources:_**

Artificial Intelligence: A Modern Approach (Russel & Norvig), 11.4 Chapter (395-402)
http://aima.cs.berkeley.edu/2nd-ed/newchap11.pdf

### Representation

We will use for the representation of a fact (positive or negative literal) a string that represents the whole literal. Negative literals will begin with `NOT_`.

Actions will also be represented as strings. We will represent the special action of keeping a literal (*persistence actions*) as `____ P`, where` P` is the representation of the literal. E.g. `____ EatenCake`.

To represent a list of mutual exclusions, we will use a list of pairs of actions or literals, where two actions (or two literals) will appear only once in the list. To avoid the dependence on the order, the `isMutex` function will be used to easily see whether or not a pair already exists in the list of mutual exclusions.

We will represent the graph as a list of levels (alternative status and action levels), where a level is a dictionary containing three keys.

* A status level contains:
  * `type` key with `state` value;
  * the `state` key having as value a list of literals;
  * `mutex` key with a value of a list of mutually exclusive literal pairs.
* An action level contains:
  * `type` key with the value `actions`;
  * the `actions` key having as value a list of actions;
  * `mutex` key, with a value of a list of exclusive share pairs.

In [1]:
NOT_PARTICLE = "NOT_"
PERSISTENCE_PARTICLE = "____"
PARTICLE_LENGTH = len(NOT_PARTICLE)

TYPE = "type"
STATE = "state"
ACTIONS = "actions"
MUTEX = "mutex"

In [2]:
# Compute the opposite of P
def opposite(P):
    if P[:PARTICLE_LENGTH] == NOT_PARTICLE:
        return P[PARTICLE_LENGTH:]
    else:
        return NOT_PARTICLE + P
def NOT(P):
    return opposite(P)

print(NOT("Fact"))
print(NOT(NOT("Fact")))
print(NOT(NOT(NOT("Fact"))))

NOT_Fact
Fact
NOT_Fact


In [3]:
# Tasks taken from AIMA (page 396 for the first task; pages 391 and 399 for the second task):

# How to perform two actions in order?
Basic = {}
Basic['init'] = ["NothingDone", NOT("PhaseOneDone"), NOT("PhaseTwoDone")]
Basic['goal'] = ["PhaseTwoDone"]
Basic['actions'] = {}
Basic['actions']["PhaseOne"] = ([], ["PhaseOneDone"])
Basic['actions']["PhaseTwo"] = (["PhaseOneDone"], ["PhaseTwoDone"])

# How to have cake and eat it too?
Cake = {}
Cake['init'] = ["HaveCake", NOT("EatenCake")]
Cake['goal'] = ["EatenCake", "HaveCake"]
Cake['actions'] = {}
Cake['actions']["EatCake"] = (["HaveCake"], [NOT("HaveCake"), "EatenCake"])
Cake['actions']["BakeCake"] = ([NOT("HaveCake")], ["HaveCake"])

# How to solve the problem of a flat tire?
FlatTire = {}
FlatTire['init'] = ["At(Flat,Axle)", "At(Spare,Trunk)",NOT("At(Flat,Ground)"),NOT("At(Spare,Axle)"),NOT("At(Spare,Ground)")]
FlatTire['goal'] = ["At(Spare,Axle)"]
FA = {}
FA["Remove(Spare,Trunk)"] = (["At(Spare,Trunk)"],[NOT("At(Spare,Trunk)"),"At(Spare,Ground)"])
FA["Remove(Flat,Axle)"] = (["At(Flat,Axle)"],[NOT("At(Flat,Axle)"),"At(Flat,Ground)"])
FA["PutOn(Spare,Axle)"] = (["At(Spare,Ground)",NOT("At(Flat,Axle)")],["At(Spare,Axle)",NOT("At(Spare,Ground)")])
FlatTire['actions'] = FA

In [4]:
# Returns true if the smaller set is included in the bigger set.
def included(smaller, bigger):
    for x in smaller:
        if not x in bigger:
            return False
    return True

# Returns true if the elements el1 and el2 are mutually exclusive, according to the mutex list
def isMutex(el1, el2, mutex):
    return (el1, el2) in mutex or (el2, el1) in mutex

# Returns true if there are no mutually exclusive pairs in the to_check list
def notAnyMutex(to_check, mutex):
    for x in to_check:
        for y in to_check:
            if isMutex(x, y, mutex):
                return False
    return True

# Returns true if an action corresponds to 'No OPeration' type
def isNop(act):
    return len(act) > PARTICLE_LENGTH and act[:PARTICLE_LENGTH] == PERSISTENCE_PARTICLE

# Returns the fact which is the result of a 'No OPeration' action
def removeNop(act):
    if isNop(act):
        return act[PARTICLE_LENGTH:]
    return False

# Builds a 'No OPeration' action based on a fact
def makeNop(fact):
    return PERSISTENCE_PARTICLE + fact

# Displays the graph
def print_graph(graph, startLevel = 0, indent = ""):
    l = startLevel
    for level in graph:
        print(indent + "[ " + str(l) + " ] " + level[TYPE] + ":")
        for element in level[level[TYPE]]:
            out = indent + "\t" + element + "; mutex with "
            found = False
            for e in level[level[TYPE]]:
                if isMutex(e, element, level[MUTEX]):
                    out += e + ", "
                    found = True
            out = out + "None" if not found else out[:-2]
            print(out)
        l = l + 1
print(included("abc", "adecbf"))
print(included("abc", "adecf"))

True
False


In [5]:
from functools import reduce
import operator
DEBUG = False
def printd(i, *args):
    if DEBUG:
        print(i, *args)

# Returns true if all goals are in the last level of the graph and are not mutually exclusive.
def Maybe_completed(goals, graph):
    lLevel = len(graph) - 1
    if graph[lLevel][TYPE] != STATE: return False
    state = graph[lLevel][STATE]
    return included(goals, state) and notAnyMutex(goals, graph[lLevel][MUTEX])

# Extracts the solution of a problem using an already built planning graph
def Extract_solution(goals, graph, problem, indent = "\t"):
    global allgoals, allpotentialactions
    printd(indent, "=== checking; goals:", goals)
    if DEBUG:
        print_graph(graph, indent = indent)
    if len(graph) == 1:
        if included(goals, graph[0][STATE]): printd(indent, "## Done")
        return [] if included(goals, graph[0][STATE]) else False
    actions = graph[len(graph) - 2][ACTIONS]
    mutex_actions = graph[len(graph) - 2][MUTEX]
    all_actions = problem[ACTIONS]
    potential_actions = []
    first = True
    # All possible actions combinations
    allgoals += len(goals)
    for g in goals:
        goal_actions = [a for a in actions if removeNop(a) == g or (not isNop(a) and g in all_actions[a][1])]
        if first:
            potential_actions = [[a] for a in goal_actions]
            first = False
        else:
            pa = potential_actions
            potential_actions = []
            for a in goal_actions:
                for aa in pa:
                    potential_actions.append((aa + [a]) if a not in aa else aa)
        printd(indent, "## potential actions after checking goal",g,":",potential_actions)
    # Not-mutex actions
    printd(indent, "## all potential actions:",potential_actions)
    potential_actions = [comb for comb in potential_actions if notAnyMutex(comb, mutex_actions)]
    printd(indent, "## potential non-mutex actions:",potential_actions)
    allpotentialactions += reduce(operator.add, [len(a) for a in potential_actions], 0)
    for comb in potential_actions:
        new_goals = []
        for act in comb:
            if isNop(act):
                if removeNop(act) not in new_goals:
                    new_goals.append(removeNop(act))
            else:
                new_goals.extend([precond for precond in all_actions[act][0] if precond not in new_goals])
        printd(indent, "## attempt: actions:",comb)
        result = Extract_solution(new_goals, graph[:-2], problem, indent + ">\t")
        printd(indent, "## Result:",result)
        if result != False: return result + comb  
    return False

### INDIVIDUAL Contribution

Implement the missing parts of the `Extend_graph` function, which receives a planning graph and the dictionary of available actions in the problem. In the given graph, the last level is the state level.

The function must determine the following two levels in the graph, one of **the actions** and one of **state**. Each of the levels must also contain the indication of mutually exclusive actions/states.

Two actions are mutually exclusive (cannot be performed simultaneously) if:
* have mutually exclusive preconditions (*competing needs*) - the actions *a1* and *a2* cannot be performed simultaneously if there is a precondition *p1* for *a1* and a precondition *p2* for *a2* such as *p1* and *p2* are mutually exclusive, so they cannot exist simultaneously.
* have inconsistent effects - the actions *a1* and *a2* cannot be performed simultaneously if there is an effect *e1* of *a1* and an effect *e2* of *a2* that are opposite (one is the denial of the other), so both actions could not be performed successfully.
* interfere with each other (*interference*); the resulting plan must be linear so that two compatible actions on the same level will nevertheless be carried out one after the other; the actions *a1* and *a2* are therefore not compatible if there is an effect *e* of *a1* and a precondition *p* for *a2* which are opposite.

Two effects (facts) *e1* and *e2* are mutually exclusive (cannot be obtained at the same stage of the plan) if:
* we cannot find a pair of actions *(a1, a2)*, where *a1* produces *e1* and *a2* produces *e2*, such that *a1* and *a2* are **not** mutually exclusive.

In [10]:
# Construct the following two levels (an action level and a status level) from a planning graph,
# based on the last existing level, which is status.
# Considerring a list of all the actions described in the problem, as an action dictionary -> (preconditions, effects)
# The function returns a tuple of the two newly created levels.
def Extend_graph(graph, all_actions):
    lastLevel = len(graph) - 1
    # The last level in the graph
    state = graph[lastLevel][STATE]
    mutex = graph[lastLevel][MUTEX]
    
    # The next level is created after the given ones: the actions available on the previous status level
    
    # the applicable actions are calculated: all actions whose preconditions
    # exist in the previous state and are not mutually exclusive (see notAnyMutex function).
    # Store available actions as a name -> dictionary (precondition list, effects list)
    
    # 'No OPeration' actions are added
    actions = { makeNop(fact): ([fact], [fact]) for fact in state}
    # TO DO1: add the other available actions --> 2 POINTS
    for action in all_actions.keys():
        (preconditions, effects) = all_actions[action]
        compatible = True
        for prec in preconditions:
            if prec not in state:
                compatible = False
        if compatible:
            actions[action] = (preconditions, effects)
    
    # Compute mutually exclusive actions
    mutex_actions = []
    # TO DO2 --> 4 POINTS
    for action1 in actions.keys():
        (preconditions1, effects1) = actions[action1]
        for action2 in actions.keys():
            if action1 != action2:
                (preconditions2, effects2) = actions[action2]
                mutex = False
                for prec1 in preconditions1:
                    for prec2 in preconditions2:
                        if prec1 == NOT(prec2):
                            mutex = True
                    for eff2 in effects2:
                        if prec1 == NOT(eff2):
                            mutex = True
                for eff1 in effects1:
                    for prec2 in preconditions2:
                        if eff1 == NOT(prec2):
                            mutex = True
                    for eff2 in effects2:
                        if eff1 == NOT(eff2):
                            mutex = True
                if mutex and (action1, action2) not in mutex_actions and (action2, action1) not in mutex_actions:
                    mutex_actions.append((action1, action2))
            
    # Useful prints
    #print("mutex " + str((a1,a2)) + ": inconsistent effects <" + effect + "> of " + a1 + " and <" + NOT(effect) + "> of " + a2)
    #print("mutex " + str((a1,a2)) + ": interferring precondition <" + precond + "> of " + a1 + " and effect <" + NOT(precond) + "> of " + a2)
    #print("mutex " + str((a1,a2)) + ": competing preconditions (needs) <" + precond + "> of " + a1 + " and <" + NOT(precond) + "> of " + a2)

    # The next second level is created after the given ones: the state generated by the previous level of actions
    
    # calculates the effects of actions from the previous level
    # Store the effects as a fact dictionary -> list the actions that produce the effect
    effects = {}
    # TO DO3 --> 1.5 POINTS
    for action in actions.keys():
        (preconditions, facts) = actions[action]
        for fact in facts:
            if fact not in effects:
                effects[fact] = [action]
            else:
                effects[fact].append(action)

    # mutually exclusive effects are calculated:
    # 2 effects are mutex if they cannot result from actions that are not mutually exclusive
    mutex_effects = []
    # TO DO4 --> 2.5 POINTS
    for effect1 in effects.keys():
        actions1 = effects[effect1]
        for effect2 in effects.keys():
            if effect1 != effect2:
                actions2 = effects[effect2]
                mutex = True
                for action1 in actions1:
                    for action2 in actions2:
                        if (action1, action2) not in mutex_actions:
                            mutex = False
                if mutex and (effect1, effect2) not in mutex_effects and (effect2, effect1) not in mutex_effects:
                    mutex_effects.append((effect1, effect2))
   
    # Result: the last two levels in the graph as a tuple
    return ({TYPE: ACTIONS, ACTIONS: actions.keys(), MUTEX: mutex_actions},
            {TYPE: STATE, STATE: effects.keys(), MUTEX: mutex_effects})

In [11]:
def Plan(Problem):
    LIMIT = 10
    graph = [{TYPE: STATE, STATE: Problem['init'], MUTEX: []}]
    print("first level: ")
    print_graph(graph)
    cLevel = 0
    
    while cLevel < LIMIT:
        if Maybe_completed(Problem['goal'], graph):
            solution = Extract_solution(Problem['goal'], graph, Problem)
            if solution:
                return [a for a in solution if not isNop(a)]
        print("======")
        new_levels = Extend_graph(graph, Problem['actions'])
        graph.extend(new_levels)
        print("new levels:")
        print_graph(graph[-2:],cLevel + 1)
        cLevel += 2
    if cLevel == LIMIT: print("## Limit reached.")
    return False

tests = [Basic, Cake, FlatTire]
solutions = [['PhaseOne', 'PhaseTwo'], ['EatCake', 'BakeCake'],
             ['Remove(Spare,Trunk)', 'Remove(Flat,Axle)', 'PutOn(Spare,Axle)']]
goal_efficiency = [[2, 2], [4, 3], [3, 3]]
for i in range(len(tests)):
    allgoals, allpotentialactions = 0, 0
    sol = Plan(tests[i])
    print("\n## Solution: " + str(sol))
    if sol == solutions[i]:
        print("## OK, solution found.")
        ideal_goals, ideal_actions = goal_efficiency[i]
        if allgoals > ideal_goals: print("BUT, there were", allgoals, "goals generated instead of", ideal_goals)
        if allpotentialactions > ideal_actions:
            print("BUT, there were", allpotentialactions, "action branches generated instead of", ideal_actions)
#         print("Goals", allgoals, "Potential actions", allpotentialactions)
        if i == len(tests) - 1:
            print("all done.")
    else:
        print("## NOT OK. Should be: " + str(solutions[i]))
        if i < len(tests) - 1:
             print("Solve this before moving on.")
        break
    print("\n===========================================")

first level: 
[ 0 ] state:
	NothingDone; mutex with None
	NOT_PhaseOneDone; mutex with None
	NOT_PhaseTwoDone; mutex with None
new levels:
[ 1 ] actions:
	____NothingDone; mutex with None
	____NOT_PhaseOneDone; mutex with PhaseOne
	____NOT_PhaseTwoDone; mutex with None
	PhaseOne; mutex with ____NOT_PhaseOneDone
[ 2 ] state:
	NothingDone; mutex with None
	NOT_PhaseOneDone; mutex with PhaseOneDone
	NOT_PhaseTwoDone; mutex with None
	PhaseOneDone; mutex with NOT_PhaseOneDone
new levels:
[ 3 ] actions:
	____NothingDone; mutex with None
	____NOT_PhaseOneDone; mutex with ____PhaseOneDone, PhaseOne, PhaseTwo
	____NOT_PhaseTwoDone; mutex with PhaseTwo
	____PhaseOneDone; mutex with ____NOT_PhaseOneDone
	PhaseOne; mutex with ____NOT_PhaseOneDone
	PhaseTwo; mutex with ____NOT_PhaseOneDone, ____NOT_PhaseTwoDone
[ 4 ] state:
	NothingDone; mutex with None
	NOT_PhaseOneDone; mutex with PhaseOneDone, PhaseTwoDone
	NOT_PhaseTwoDone; mutex with PhaseTwoDone
	PhaseOneDone; mutex with NOT_PhaseOneDone
	Ph