# Social Laws

This notebook shows a demonstration of social laws - a mechanism for coordinating multiple agents.

We will demonstrate social laws on a simple example of an intersection which cars need to cross from the four cardinal directions.

## Modeling Multi Agent Planning Problems for Social Laws

One of the basic concepts of social laws is _waiting_, and so we extend the ``MultiAgentProblem`` class to include waitfor annotations, and create a class called ``MultiAgentProblemWithWaitfor``

In [18]:
from unified_planning.shortcuts import *
from unified_planning.social_law.ma_problem_waitfor import MultiAgentProblemWithWaitfor
from unified_planning.model.multi_agent import *
unified_planning.shortcuts.get_env().credits_stream = None

problem = MultiAgentProblemWithWaitfor("intersection")

print(problem)

problem name = intersection

environment fluents = [
]

agents = [
]

initial values = [
]

goals = [
]


waitfor: {}


### Environment Types and Fluents

To define the intersection environment, we will create 2 types: loc (location) and direction.
We will then create the fluents to describe the state of the intersection: 
    * connected - which locations are connected and in which direction
    * free - whether given location is free

In [19]:
loc = UserType("loc")
direction = UserType("direction")

connected = Fluent('connected', BoolType(), l1=loc, l2=loc, d=direction)
free = Fluent('free', BoolType(), l=loc)

problem.ma_environment.add_fluent(connected, default_initial_value=False)
problem.ma_environment.add_fluent(free, default_initial_value=True)

print(problem)

problem name = intersection

types = [loc, direction]

environment fluents = [
  bool connected[l1=loc, l2=loc, d=direction]
  bool free[l=loc]
]

agents = [
]

objects = [
  loc: []
  direction: []
]

initial values = [
]

goals = [
]


waitfor: {}


We will now create the map of the intersection and populate the initial values accordingly.

``intersection_map`` given the path (list of locations) taken by a car crossing the intersection in each cardinal direction. From this we will create the objects and initial values for the environment



In [20]:
intersection_map = {
    "north": ["south-ent", "cross-se", "cross-ne", "north-ex"],
    "south": ["north-ent", "cross-nw", "cross-sw", "south-ex"],
    "west": ["east-ent", "cross-ne", "cross-nw", "west-ex"],
    "east": ["west-ent", "cross-sw", "cross-se", "east-ex"]
}

location_names = set()
for l in intersection_map.values():
    location_names = location_names.union(l)
locations = list(map(lambda l: unified_planning.model.Object(l, loc), location_names))
problem.add_objects(locations)

direction_names = intersection_map.keys()
directions = list(map(lambda d: unified_planning.model.Object(d, direction), direction_names))
problem.add_objects(directions)

for d, l in intersection_map.items():
    for i in range(len(l)-1):            
        problem.set_initial_value(connected(unified_planning.model.Object(l[i], loc), unified_planning.model.Object(l[i+1], loc), unified_planning.model.Object(d, direction)), True)

print(problem)


problem name = intersection

types = [loc, direction]

environment fluents = [
  bool connected[l1=loc, l2=loc, d=direction]
  bool free[l=loc]
]

agents = [
]

objects = [
  loc: [south-ex, west-ent, cross-sw, cross-se, east-ent, west-ex, cross-ne, north-ex, south-ent, cross-nw, east-ex, north-ent]
  direction: [north, south, west, east]
]

initial values = [
  connected(south-ent, cross-se, north) := true
  connected(cross-se, cross-ne, north) := true
  connected(cross-ne, north-ex, north) := true
  connected(north-ent, cross-nw, south) := true
  connected(cross-nw, cross-sw, south) := true
  connected(cross-sw, south-ex, south) := true
  connected(east-ent, cross-ne, west) := true
  connected(cross-ne, cross-nw, west) := true
  connected(cross-nw, west-ex, west) := true
  connected(west-ent, cross-sw, east) := true
  connected(cross-sw, cross-se, east) := true
  connected(cross-se, east-ex, east) := true
]

goals = [
]


waitfor: {}


### Agents

We will now define the agents (cars), with their agent-specific fluents:
    * at - current location of the car
    * not_arrived - whether the car has already arrived at the intersection or not
    * start - the car's start location
    * traveldirection - the car's travel direction

In [21]:
cars = ["car-north", "car-south", "car-east", "car-west"]

at = Fluent('at', BoolType(), l1=loc)    
not_arrived = Fluent('not-arrived', BoolType())
start = Fluent('start', BoolType(), l=loc)        
traveldirection = Fluent('traveldirection', BoolType(), d=direction)

We will now model the 2 actions: arrive (car arrive's to its start location), and drive (from one location to a connected one)

In [22]:
arrive = InstantaneousAction('arrive', l=loc)    
l = arrive.parameter('l')
arrive.add_precondition(start(l))
arrive.add_precondition(not_arrived())
arrive.add_precondition(free(l))
arrive.add_effect(at(l), True)
arrive.add_effect(free(l), False)
arrive.add_effect(not_arrived(), False)   


drive = InstantaneousAction('drive', l1=loc, l2=loc, d=direction)
l1 = drive.parameter('l1')
l2 = drive.parameter('l2')
d = drive.parameter('d')
drive.add_precondition(at(l1))
drive.add_precondition(free(l2))
drive.add_precondition(traveldirection(d))
drive.add_precondition(connected(l1,l2,d))
drive.add_effect(at(l2),True)
drive.add_effect(free(l2), False)
drive.add_effect(at(l1), False)
drive.add_effect(free(l1), True)  

Finally, we can create the agents, and add their fluents and actions, and set their initial values and goals

In [23]:
for d, l in intersection_map.items():
    carname = "car-" + d
    if carname in cars:
        car = Agent(carname, problem)
    
        problem.add_agent(car)
        car.add_fluent(at, default_initial_value=False)        
        car.add_fluent(not_arrived, default_initial_value=True)
        car.add_fluent(start, default_initial_value=False)
        car.add_fluent(traveldirection, default_initial_value=False)
        car.add_action(arrive)
        car.add_action(drive)

        slname = l[0]
        slobj = unified_planning.model.Object(slname, loc)

        glname = l[-1]
        globj = unified_planning.model.Object(glname, loc)
        
        dobj = unified_planning.model.Object(d, direction)

        problem.set_initial_value(Dot(car, car.fluent("start")(slobj)), True)
        problem.set_initial_value(Dot(car, car.fluent("traveldirection")(dobj)), True)        
        problem.add_goal(Dot(car, car.fluent("at")(globj)))

print(problem)

problem name = intersection

types = [loc, direction]

environment fluents = [
  bool connected[l1=loc, l2=loc, d=direction]
  bool free[l=loc]
]

agents = [
  Agent name = car-north

fluents = [
 bool at[l1=loc]
 bool not-arrived
 bool start[l=loc]
 bool traveldirection[d=direction]
]

actions = [
 action arrive(loc l) {
    preconditions = [
      start(l)
      not-arrived
      free(l)
    ]
    effects = [
      at(l) := true
      free(l) := false
      not-arrived := false
    ]
    simulated effect = None
  }
 action drive(loc l1, loc l2, direction d) {
    preconditions = [
      at(l1)
      free(l2)
      traveldirection(d)
      connected(l1, l2, d)
    ]
    effects = [
      at(l2) := true
      free(l2) := false
      at(l1) := false
      free(l1) := true
    ]
    simulated effect = None
  }
]


  Agent name = car-south

fluents = [
 bool at[l1=loc]
 bool not-arrived
 bool start[l=loc]
 bool traveldirection[d=direction]
]

actions = [
 action arrive(loc l) {
    precon

## Checking for Robustness

Social laws assume each agent plans individually, assuming the other agents do not perform any actions. A *robust* social law is one where this is guaranteed to produce correct plans.

To check whether the intersection example is robust, we can use the ``SocialLawRobustnessChecker`` class.

``SocialLawRobustnessChecker`` first create the single agent projection -- a classical planning problem for each agent, assuming it is alone in the world, and solves them. If these are all solvable, it creates another classical planning problem which tries to get the agents to interfere with each other. If this problem is solvable, it provides a counter example to robustness. We can see the counterexample as actions from the original problems and as actions from the compilation - both have their uses.

In [36]:
from unified_planning.social_law.robustness_checker import SocialLawRobustnessChecker

slrc = SocialLawRobustnessChecker()

result = slrc.is_robust(problem)

print("status=", result.status)
print("counter example (original actions): ", result.counter_example_orig_actions)
print("counter example (compiled actions): ", result.counter_example)


status= SocialLawRobustnessStatus.NON_ROBUST_MULTI_AGENT_FAIL
counter example (original actions):  [arrive(west-ent), drive(west-ent, cross-sw, east), arrive(north-ent), arrive(east-ent), arrive(south-ent), drive(north-ent, cross-nw, south), drive(cross-sw, cross-se, east), drive(cross-se, east-ex, east), drive(cross-nw, cross-sw, south), drive(cross-sw, south-ex, south), drive(east-ent, cross-ne, west), drive(south-ent, cross-se, north), drive(cross-se, cross-ne, north), drive(cross-ne, north-ex, north), drive(cross-ne, cross-nw, west), drive(cross-nw, west-ex, west)]
counter example (compiled actions):  [s_car-east_arrive(west-ent), s_car-east_drive(west-ent, cross-sw, east), s_car-south_arrive(north-ent), s_car-west_arrive(east-ent), s_car-north_arrive(south-ent), s_car-south_drive(north-ent, cross-nw, south), s_car-east_drive(cross-sw, cross-se, east), s_car-east_drive(cross-se, east-ex, east), s_car-south_drive(cross-nw, cross-sw, south), s_car-south_drive(cross-sw, south-ex, sout

## Applying Social Laws


In this case, we see the result is ``NON_ROBUST_MULTI_AGENT_FAIL``, which indicates the agents interfered with each other -- specifically, the action ``f_1_car-north_drive(cross-se, cross-ne, north)`` indicates the northbound car violated precondition number 1 (the second one -- ``free(l2)``), and crashed into another car.

To remedy that, we can use the waitfor mechanism, to ensure each car waits until its destination location is clear before driving into it.

Instead of manually modifying the multi agent problem, we will utilize the ``SocialLaw`` class, which represents a modification to a multi-agent problem. 
``SocialLaw`` is implemented as a compiler, which takes a multi-agent problem, and output a modified multi-agent problem.

In [37]:
from unified_planning.social_law.social_law import SocialLaw

l = SocialLaw()

for agent in problem.agents:
    l.add_waitfor_annotation(agent.name, "drive", "free", ("l2",)) # Use only strings because this should work on any given problem, and note the extra comment to make ("l2",) a tuple (as ("l2") is interpreted as "l2" )
    
new_problem = l.compile(problem).problem

print(new_problem)

problem name = sl_intersection

types = [loc, direction]

environment fluents = [
  bool connected[l1=loc, l2=loc, d=direction]
  bool free[l=loc]
]

agents = [
  Agent name = car-north

fluents = [
 bool at[l1=loc]
 bool not-arrived
 bool start[l=loc]
 bool traveldirection[d=direction]
]

actions = [
 action arrive(loc l) {
    preconditions = [
      start(l)
      not-arrived
      free(l)
    ]
    effects = [
      at(l) := true
      free(l) := false
      not-arrived := false
    ]
    simulated effect = None
  }
 action drive(loc l1, loc l2, direction d) {
    preconditions = [
      at(l1)
      free(l2)
      traveldirection(d)
      connected(l1, l2, d)
    ]
    effects = [
      at(l2) := true
      free(l2) := false
      at(l1) := false
      free(l1) := true
    ]
    simulated effect = None
  }
]


  Agent name = car-south

fluents = [
 bool at[l1=loc]
 bool not-arrived
 bool start[l=loc]
 bool traveldirection[d=direction]
]

actions = [
 action arrive(loc l) {
    pre

We can now call the robustness verification procedure again, to see whether the system (with the new waitfor conditions) is robust.

In [38]:
result = slrc.is_robust(new_problem)

print(result.status)
print(result.counter_example)

SocialLawRobustnessStatus.NON_ROBUST_MULTI_AGENT_DEADLOCK
[s_car-east_arrive(west-ent), s_car-east_drive(west-ent, cross-sw, east), s_car-south_arrive(north-ent), s_car-west_arrive(east-ent), s_car-north_arrive(south-ent), s_car-west_drive(east-ent, cross-ne, west), s_car-south_drive(north-ent, cross-nw, south), w0_car-west_drive(cross-ne, cross-nw, west), pw_car-west_drive(cross-nw, west-ex, west), w0_car-south_drive(cross-nw, cross-sw, south), pw_car-south_drive(cross-sw, south-ex, south), s_car-north_drive(south-ent, cross-se, north), w0_car-north_drive(cross-se, cross-ne, north), pw_car-north_drive(cross-ne, north-ex, north), w0_car-east_drive(cross-sw, cross-se, east), pw_car-east_drive(cross-se, east-ex, east), end_f_car-east_0, end_f_car-west_0, end_f_car-south_0, end_f_car-north_0]


The result now is a deadlock (``NON_ROBUST_MULTI_AGENT_DEADLOCK``), caused by each car driving one step into the intersection, resulting in each car blocking the other's path.

To avoid this, we will yield (not drive into the intersection) if the far end of the intersection is not free. 
We will implement this by adding a parameter to the drive action, which indicates which location this drive action yields to, and a fluent which specifies this. Drive will wait until the location it yields to is free.

Since modifying the existing problem is tricky, and to avoid having to rewrite the problem generation code, we import a function that takes care of this for us.


In [39]:
l2 = SocialLaw()        
# Add the yieldsto fluent
l2.add_new_fluent(None, "yieldsto", ( ("l1","loc"), ("l2","loc")), False)  
# Add a parameter to the drive action for the yieldsto location, add a static precondition checking it, and add a waitfor condition that the location is free
for agent in problem.agents:
    l2.add_parameter_to_action(agent.name, "drive", "ly", "loc")            
    l2.add_precondition_to_action(agent.name, "drive", "yieldsto", ("l1", "ly") )
    l2.add_precondition_to_action(agent.name, "drive", "free", ("ly",) )
    l2.add_waitfor_annotation(agent.name, "drive", "free", ("ly",) )
# For each entrance to the intersection, add the location 2 steps from it as its yieldto location
for loc1,loc2 in [("south-ent", "cross-ne"),("north-ent", "cross-sw"),("east-ent", "cross-nw"),("west-ent", "cross-se")]:
    l2.set_initial_value_for_new_fluent(None, "yieldsto", (loc1, loc2), True)
# For all other locations, add a dummy location they yield to
l2.add_new_object("dummy_loc", "loc")
for loc in problem.objects(problem.user_type("loc")):
    if loc.name not in ["south-ent", "north-ent", "east-ent", "west-ent"]:
        l2.set_initial_value_for_new_fluent(None, "yieldsto", (loc.name, "dummy_loc"), True)


robust_problem = l2.compile(new_problem).problem

print(robust_problem)

problem name = sl_sl_intersection

types = [loc, direction]

environment fluents = [
  bool connected[l1=loc, l2=loc, d=direction]
  bool free[l=loc]
  bool yieldsto[l1=loc, l2=loc]
]

agents = [
  Agent name = car-north

fluents = [
 bool at[l1=loc]
 bool not-arrived
 bool start[l=loc]
 bool traveldirection[d=direction]
]

actions = [
 action arrive(loc l) {
    preconditions = [
      start(l)
      not-arrived
      free(l)
    ]
    effects = [
      at(l) := true
      free(l) := false
      not-arrived := false
    ]
    simulated effect = None
  }
 action drive(loc l1, loc l2, direction d, loc ly) {
    preconditions = [
      at(l1)
      free(l2)
      traveldirection(d)
      connected(l1, l2, d)
      free(ly)
      yieldsto(l1, ly)
    ]
    effects = [
      at(l2) := true
      free(l2) := false
      at(l1) := false
      free(l1) := true
    ]
    simulated effect = None
  }
]


  Agent name = car-south

fluents = [
 bool at[l1=loc]
 bool not-arrived
 bool start[l=loc]


And now we can verify the robustness of the new problem, and ensure that it is robust.

In [40]:
result = slrc.is_robust(robust_problem)

print(result.status)


SocialLawRobustnessStatus.ROBUST_RATIONAL


## Planning using robust multi-agent systems

Now that we have a robust multi-agent system, we can be assured that our simple planning strategy of planning for each agent alone, and then combining the plans, is guaranteed to work. This can be done using the same ``SocialLawRobustnessChecker``, which is one a OneShotPlanner

In [41]:
presult = slrc.solve(robust_problem)

print(presult.status)
print(presult.plan)

PlanGenerationResultStatus.SOLVED_SATISFICING
[car-north.arrive(south-ent), car-south.arrive(north-ent), car-west.arrive(east-ent), car-east.arrive(west-ent), car-north.drive(south-ent, cross-se, north, cross-ne), car-south.drive(north-ent, cross-nw, south, cross-sw), car-north.drive(cross-se, cross-ne, north, dummy_loc), car-south.drive(cross-nw, cross-sw, south, dummy_loc), car-north.drive(cross-ne, north-ex, north, dummy_loc), car-south.drive(cross-sw, south-ex, south, dummy_loc), car-west.drive(east-ent, cross-ne, west, cross-nw), car-east.drive(west-ent, cross-sw, east, cross-se), car-west.drive(cross-ne, cross-nw, west, dummy_loc), car-east.drive(cross-sw, cross-se, east, dummy_loc), car-west.drive(cross-nw, west-ex, west, dummy_loc), car-east.drive(cross-se, east-ex, east, dummy_loc)]
