In [None]:
#!pip install clingo

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting clingo
  Downloading clingo-5.6.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m25.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: clingo
Successfully installed clingo-5.6.2


In [None]:
### DO NOT CHANGE THIS CELL

%matplotlib inline
import networkx as nx
import matplotlib.pyplot as plt

import clingo
from IPython.display import clear_output

In [None]:
### YOU MAY CHANGE THIS CELL TO TEST YOUR SOLUTION ON DIFFERENT INPUTS

# Road network: node ids 1..6 and the road segments that connect them.
nodes = [*range(1, 7)]
edges = [(1, 2), (2, 3), (3, 4), (4, 5), (2, 6), (3, 6)]

# One dict per factory: the node it sits on and its 'load' value.
factories = [
    {'location': 1, 'load': 10},
]

# One dict per delivery point: the node it sits on and its 'demand' value.
delivery_points = [
    {'location': 4, 'demand': 2},
    {'location': 5, 'demand': 2},
]

# Charging points are described by their node only.
charging_points = [
    {'location': 2},
    {'location': 6},
]

# One dict per truck: start node, current load, load capacity,
# current charge level and charge capacity.
trucks = [
    {'location': 1, 'load': 0, 'capacity': 1, 'charge level': 2, 'charge capacity': 10},
    {'location': 2, 'load': 0, 'capacity': 1, 'charge level': 2, 'charge capacity': 10},
]

# Number of time steps handed to the ASP program as the constant t.
num_time_steps = 40

In [None]:
### DO NOT CHANGE THIS CELL

# Collect the node of every factory, truck, delivery point and charging point
# from the input dictionaries defined in the cell above.
warehouse_locations = [
    warehouse_info['location']
    for warehouse_info in factories
]
truck_locations = [
    truck_info['location']
    for truck_info in trucks
]
delivery_locations = [
    dp_info['location']
    for dp_info in delivery_points
]
charging_locations = [
    cp_info['location']
    for cp_info in charging_points
]

class InvalidInputError(Exception):
    # Raised by the checks below when one node is given two of the roles
    # warehouse / delivery location / charging location.
    pass

# A node may not be both a warehouse and a delivery location.
# Only the first offending node is reported.
intersection = [location for location in warehouse_locations if location in delivery_locations]
if len(intersection) > 0:
    raise InvalidInputError(f'node {intersection[0]} is both a warehouse and a delivery location')

# A node may not be both a warehouse and a charging location.
intersection = [location for location in warehouse_locations if location in charging_locations]
if len(intersection) > 0:
    raise InvalidInputError(f'node {intersection[0]} is both a warehouse and a charging location')

# A node may not be both a delivery and a charging location.
intersection = [location for location in delivery_locations if location in charging_locations]
if len(intersection) > 0:
    raise InvalidInputError(f'node {intersection[0]} is both a delivery and a charging location')

In [597]:
# Translate the Python problem instance into ASP facts and #const bounds.
#
# asp_program is assigned (not appended to) here, so re-running this cell
# always starts from an empty program. The rule cells further down append to
# it and therefore have to be re-run after this one.
asp_program = ""

# --- global bounds -----------------------------------------------------------
asp_program += f"     #const t={num_time_steps}.\n"
asp_program += f"     #const tr={len(trucks)}.\n"
# charge_cap bounds the charge_levels(0..charge_cap) domain, so it has to cover
# the largest battery in the fleet, not just the first truck's. Seeding the
# list with 0 keeps an empty truck list from raising.
max_charge_capacity = max([0, *(truck['charge capacity'] for truck in trucks)])
asp_program += f"     #const charge_cap={max_charge_capacity}.\n"

# --- road network ------------------------------------------------------------
for node in nodes:
    asp_program += f"     node({node}).\n"

for edge in edges:
    asp_program += f"     edge({edge[0]},{edge[1]}).\n"

# --- factories ---------------------------------------------------------------
for factory in factories:
    location = factory['location']
    load = factory['load']
    asp_program += f"     factory_loc({location}).\n"
    asp_program += f"     factory_load({location},{load}).\n"

# loads_f bounds the load_for_f(0..loads_f) domain.
max_factory_load = max([0, *(factory['load'] for factory in factories)])
asp_program += f"     #const loads_f={max_factory_load}.\n"

# --- delivery points ---------------------------------------------------------
for d_point in delivery_points:
    location = d_point['location']
    demand = d_point['demand']
    asp_program += f"     delivery_point({location}).\n"
    asp_program += f"     delivery_demand({location},{demand}).\n"

# loads_d bounds the load_for_d(0..loads_d) domain.
max_delivery_load = max([0, *(d_point['demand'] for d_point in delivery_points)])
asp_program += f"     #const loads_d={max_delivery_load}.\n"

# --- charging points ---------------------------------------------------------
for c_point in charging_points:
    asp_program += f"     charging_point({c_point['location']}).\n"

# --- trucks ------------------------------------------------------------------
# Trucks are numbered 1..len(trucks) in input order. Every per-truck fact
# carries that id; no fact is emitted without it, because a value that is not
# tied to a truck cannot be attributed once there is more than one truck.
for truck_id, truck in enumerate(trucks, start=1):
    asp_program += f"     truck({truck_id}).\n"
    asp_program += f"     truck_at({truck_id},{truck['location']}).\n"
    asp_program += f"     truck_load({truck_id},{truck['load']}).\n"
    asp_program += f"     truck_capacity({truck_id},{truck['capacity']}).\n"
    asp_program += f"     truck_charge_level({truck_id}, {truck['charge level']}).\n"
    asp_program += f"     charge_capacity({truck_id},{truck['charge capacity']}).\n"

# loads_t bounds the load_for_trucks(0..loads_t) domain.
max_load_for_truck = max([0, *(truck['capacity'] for truck in trucks)])
asp_program += f"     #const loads_t={max_load_for_truck}.\n"

In [598]:
# Base declarations appended to the program:
#   * time/1 over 1..t and the value domains charge_levels/1,
#     load_for_trucks/1, load_for_f/1, load_for_d/1 (bounded by the #const
#     values emitted by the fact cell);
#   * the symmetric closure of edge/2;
#   * the action/1 vocabulary;
#   * a choice rule selecting exactly one action per truck for every T < t.
# The span between "%*" and "*%" is an ASP block comment, so the two
# constraints inside it (which mention possible_time/1) are inactive.
# NOTE(review): this cell appends with +=, so running it twice without
# re-running the fact cell duplicates these rules in asp_program.
asp_program += """

    % Generate times till maximum number of time steps
    time(1..t).
    %possible_time(1..t).
    %{ time(T) : possible_time(T) }.
    %:- not time(1). %first time step is time 1
    %:- not time(T), time(T+1), possible_time(T), possible_time(T+1).

    charge_levels(0..charge_cap).
    load_for_trucks(0..loads_t).
    load_for_f(0..loads_f).
    load_for_d(0..loads_d).

    %*{ time(T) : possible_time(T) }.
    :- not time(1). %first time step is time 1
    :- possible_time(T), possible_time(T+1), not time(T), time(T+1). %times are sequential
    *%

    % Max time is last time step
    %max_time(MaxTime) :- MaxTime = #max { T : time(T) }.

    % Edges are undirected
    edge(X,Y) :- edge(Y,X).

    % These are the possible actions that we can take
    action(wait).
    action(move(X,Y)) :- edge(X,Y).
    action(load).
    action(unload).
    action(charge).

    % We perform exactly one action at each time step
    1 { do(T, Tr, A) : action(A) } 1 :- time(T), T <t,  truck(Tr).
    
    """

In [599]:
# Effect rules: how wait / move / charge / unload / load change truck position,
# charge level, truck load, delivery demand and factory load.
#
# Fix in this cell: the load effect derives truck_load(Tr, L+1), so it must
# check that L+1 (not L-1) is a legal truck load, and it must require the truck
# to be at the factory F itself (the old body tested an unrelated Loc).
#
# NOTE(review): none of the fluents here (truck_at/2, truck_load/2,
# truck_charge_level/2, delivery_demand/2, factory_load/2) has a time argument,
# and the charge / unload / load value rules have no do/3 atom in their body.
# They therefore describe one timeless state instead of one state per step.
# Indexing the fluents by T (and adding frame rules) looks necessary for the
# plan semantics the comments describe - to be confirmed against the
# assignment text.
# NOTE(review): this cell appends with +=, so running it twice without
# re-running the fact cell duplicates these rules in asp_program.
asp_program += """
    truck_at(Tr, Loc) :-  do(T, Tr, wait), truck_at(Tr, Loc), truck(Tr), node(Loc).

    % Effect of move
    truck_at(Tr, Y) :- do(T, Tr, move(X,Y)), truck_at(Tr, X), truck(Tr), node(Y).
    %truck_charge_level(Tr, L-1) :-  do(T-1, Tr, move(X,Y)), truck_at(Tr, Loc), truck_charge_level(Tr, L),
     %                         charge_levels(L-1), charge_levels(L), node(Loc), time(T-1).

    %truck_prev_at(Tr, Loc) :- do(T, Tr, move(Loc,Y)), truck(Tr), node(Loc).
    %truck_charge_level(Tr, Loc, L-1) :-  truck_at(Tr, Loc), truck_charge_level(Tr, L), charge_levels(L-1), charge_levels(L), node(Loc),
               %                     truck_prev_at(P), Loc!=P.

    % Effect of charge
    truck_charge_level(Tr, L+1) :-  truck_at(Tr, Loc), charging_point(Loc), truck_charge_level(Tr, L), charge_levels(L+1), charge_levels(L).
    truck_at(Tr, Loc) :- do(T, Tr, charge), truck(Tr), truck_at(Tr, Loc), charging_point(Loc). %truck_charge_level(Tr, _).

    % Effect of unload
    truck_load(Tr, L-1) :-  truck_at(Tr, Loc), delivery_point(Loc), load_for_trucks(L-1), load_for_trucks(L), truck_load(Tr, L).
    delivery_demand(Loc, L-1) :-  truck_at(Tr, Loc), delivery_point(Loc), delivery_demand(Loc, L),
                            load_for_d(L-1), load_for_d(L).
    truck_at(Tr, Loc) :- do(T,Tr, unload), truck(Tr), truck_at(Tr, Loc), delivery_point(Loc). % truck_load(Tr, L).

    % Effect of load
    factory_load(F, L-1) :- truck_at(Tr, F), factory_loc(F), factory_load(F, L),
                            load_for_f(L-1), load_for_f(L).
    % the truck must be at the factory F, and the new load L+1 must be legal
    truck_load(Tr, L+1) :-  truck_at(Tr, F), factory_loc(F), truck_load(Tr, L),
                            load_for_trucks(L+1), load_for_trucks(L).
    truck_at(Tr, Loc) :- do(T, Tr, load), truck(Tr), truck_at(Tr,Loc), factory_loc(Loc).


"""

In [600]:
# Integrity constraints: preconditions of the actions plus the global safety
# rules (one truck per node, no swapping, always able to reach a charger).
#
# Fixes in this cell:
#   * "Cannot load when load capacity is full" used the lowercase names
#     load / cap. Lowercase identifiers are constants in ASP, so the constraint
#     matched nothing; they are now the variables L and C.
#   * "Cannot charge when battery is full" compared against the global
#     charge_cap constant; it now uses the truck's own charge_capacity/2 fact,
#     which stays correct when trucks have different batteries.
#
# NOTE(review): the "Cannot move to node without an edge" constraint cannot
# fire, because action(move(X,Y)) only exists for edge(X,Y). Nothing here
# requires the truck to actually be at X when it performs move(X,Y) - confirm
# whether that precondition is intended.
# NOTE(review): the last constraint forbids two consecutive wait actions and
# has no explanatory comment; confirm that this is a requirement and not a
# search heuristic, since it also rules out plans where a truck simply idles.
# NOTE(review): truck_at/2, truck_load/2, truck_charge_level/2 and
# delivery_demand/2 carry no time argument, so these constraints cannot tell
# one time step from another.
# NOTE(review): this cell appends with +=, so running it twice without
# re-running the fact cell duplicates these rules in asp_program.
asp_program += """
    % Cannot move when battery is empty
    :- do(T, Tr, move(X,_)), truck_charge_level(Tr, 0), time(T), truck_at(Tr,X).

    % At each location, there can be at most one truck at the same time.
    :- truck_at(Tr1, Loc), truck_at(Tr2, Loc), Tr1!=Tr2.

    % Trucks cannot pass each other on a piece of road—e.g., trucks at adjacent locations cannot swap places in a single time step.
    :- do(T, Tr1, move(X,Y)), do(T, Tr2, move(Y, X)), truck_at(Tr1, X), truck_at(Tr2,Y), Tr1 != Tr2, time(T).

    % Cannot move to node without an edge
    :- do(T, Tr, move(X,Y)), truck_at(Tr, X), not edge(X,Y), time(T).

    % Cannot charge if truck is not in charging location
    :- do(T, Tr, charge),  truck_at(Tr, Loc), not charging_point(Loc), time(T).

    % Cannot charge when battery is full (full means this truck's own capacity)
    :- do(T,Tr, charge), truck_charge_level(Tr, C), charge_capacity(Tr, C), time(T), truck_at(Tr, _).

    % Cannot unload if not at delivery point
    :- do(T, Tr, unload), truck_at(Tr, Loc), not delivery_point(Loc), time(T).

    % Cannot unload if the demand is 0
    :- do(T, Tr, unload), truck_at(Tr, Loc), delivery_demand(Loc, 0), time(T).

    % Cannot unload when truck load is 0
    :- do(T, Tr, unload),  truck_load(Tr, 0), time(T).

    % Cannot load if not at factory
    :- do(T, Tr, load), truck_at(Tr, Loc), not factory_loc(Loc), time(T).

    % Cannot load if the factory is empty
    :- do(T, Tr, load), truck_at(Tr, Loc),  factory_load(Loc, 0), time(T).

    % Cannot load when load capacity is full
    :- do(T, Tr, load), truck_load(Tr, L), truck_capacity(Tr, C), L >= C, time(T).

    % A truck cannot be left without enough battery to reach a charging point
    :- truck_at(Tr, X), truck_charge_level(Tr, L), charging_point_dist(X, DP), L<DP.

    :- do(T, Tr, wait), do(T+1, Tr, wait), time(T), time(T+1), T+1 < t.

"""

In [601]:
# Distances, goal definition and output directive.
#   * distance/3: shortest hop count between nodes, built level by level and
#     kept only when no shorter distance already exists.
#   * charging_point_dist/2: minimum distance from a node to any charging point.
#   * goal_reached/1 plus the constraint that the goal holds at the horizon t.
#   * #show do/3 restricts the shown atoms to the chosen actions.
#
# Fix in this cell: find_ch_point/2 referred to shortest_path/2, which no rule
# or fact defines, so it could never be derived. It now uses
# charging_point_dist/2, the distance-to-nearest-charger predicate defined
# just above it.
#
# NOTE(review): in goal_reached/1 the variable Tr also occurs outside the
# conditional literal (in truck_at(Tr,Loc)), which makes it a global variable:
# the rule asks for some truck at a charging point, not for every truck. The
# commented-out variant below quantifies over all trucks - confirm which one
# the assignment intends.
# NOTE(review): ":- time(M), goal_reached(t), M>t." cannot fire because
# time/1 only ranges over 1..t.
# NOTE(review): this cell appends with +=, so running it twice without
# re-running the fact cell duplicates these rules in asp_program.
asp_program += """
    all_distances(0..M) :- M = #max{N : node(N)}.
    distance(X,X,0) :- node(X).
    distance(X, Y, D+1) :- all_distances(D+1), distance(X, Z, D), edge(Z,Y), not distance(X,Y, DMin) : all_distances(DMin), DMin<=D.

    charging_point_dist(X, PD) :- node(X), PD = #min{ D, Y: distance(X,Y,D), charging_point(Y)}.

    deliveries_done(T, Loc) :- time(T), delivery_demand(Loc,0).
    truck_parked(T, Tr) :- time(T), truck_at(Tr, Loc), charging_point(Loc).
    truck_empty(T, Tr) :- time(T), truck_load(Tr,0).
    % the truck has enough charge left to reach its nearest charging point
    find_ch_point(T, Tr) :- time(T), truck_at(Tr, Loc), truck_charge_level(Tr, L), charging_point_dist(Loc, D), D<=L.

    goal_reached(T) :- time(T), delivery_demand(N, 0): delivery_point(N);
            time(T), truck_at(Tr,Loc), charging_point(Loc), truck_load(Tr,0):truck(Tr).

    %*goal_reached(T) :- time(T),
        deliveries_done(T, LocD): delivery_point(LocD);
        truck_parked(T, Tr): truck(Tr);
        truck_empty(T, Tr): truck(Tr) ;
        find_ch_point(T, Tr): truck(Tr).*%

    :- not goal_reached(t).
    :- time(M), goal_reached(t), M>t.


    %#minimize { 1,T : time(T) }.
    %#minimize { 1,T : do(T, _, wait) }.
    #show do/3.


"""

In [None]:
# Dump the assembled ASP program text (generated facts followed by the rule
# blocks appended above) so it can be inspected before solving.
print(asp_program)

In [None]:
### DO NOT CHANGE THIS CELL

# Some machinery to (dynamically) print things in the notebook
# Accumulates every message passed to log(), so the messages can be printed
# again by print_log() after the cell output has been cleared.
log_str = ""
def reset_log():
    """Empty the accumulated log text."""
    global log_str
    log_str = ""
def log(msg):
    """Print msg immediately and append it, followed by a newline, to the log text."""
    global log_str
    print(msg, flush=True)
    log_str += f"{msg}\n"
def print_log():
    """Print the accumulated log text without adding a trailing newline."""
    global log_str
    print(log_str, end='', flush=True)
    
def display_solution(model):
    """
    Displays a solution specified by the answer set
    in a human-readable format.

    Assumes that the answer set program uses do/3
    to specify actions, and uses truck_at/2 to
    specify the initial locations of the trucks.

    Args:
        model: object whose symbols(atoms=True) yields atoms exposing
            name and arguments (presumably a clingo Model; confirm
            against the caller).

    Returns:
        A multi-line string: a header row with one "truck @ location"
        entry per truck, a dashed separator, then one row per time step
        listing the action of every truck.
    """
    # time step -> {truck id (as string) -> action (as string)}
    actions = {}
    # truck id (as string) -> location (as string)
    truck_locations = {}
    for atom in model.symbols(atoms=True):
        if atom.name == "do":
            # do(Time, Truck, Action)
            time = atom.arguments[0].number
            truck = str(atom.arguments[1])
            action = str(atom.arguments[2])
            # NOTE(review): echoes every parsed action in addition to the
            # table built below - looks like debugging output; confirm.
            print(time, truck, action)
            if time not in actions:
                actions[time] = {}
            actions[time][truck] = action
        elif atom.name == "truck_at":
            # Plain assignment: if the model holds several truck_at atoms
            # for one truck, the last one iterated is the one displayed.
            truck_locations[str(atom.arguments[0])] = str(atom.arguments[1])
    solution_str = "\n"
    # Header row: five leading spaces, then one column per truck in sorted id order.
    solution_str += " "*5 + "".join([
        f"{truck:<5} @ {truck_locations[truck]:<10}"
        for truck in sorted(truck_locations)
    ]) + "\n"
    solution_str += "-"*(5+18*len(truck_locations)) + "\n"
    # One row per time step. The lookup actions[time][truck] raises KeyError
    # if a truck that has a truck_at atom has no do atom at that time step.
    for time in sorted(actions):
        time_str = f"{time}:"
        solution_str += f"{time_str:<4} " + "".join([
            f"{actions[time][truck]:<18}"
            for truck in sorted(truck_locations)
        ]) + "\n"
    return solution_str

def find_and_output_solution(
    asp_program,
    timeout=10,
    num_threads=1,
):
    """
    Calls the ASP solver to find answer sets,
    and translates these to solutions,
    which are printed in human-readable format.

    Args:
        asp_program: program text added to the solver's "base" part.
        timeout: when greater than 0, solving runs asynchronously and is
            waited on for at most this long (the log message reports it
            in seconds); otherwise solving blocks until it finishes.
        num_threads: value passed to the solver as --parallel-mode.

    Returns:
        None. Progress messages and solutions are printed.
    """

    reset_log()
    log("..Grounding..")
    
    control = clingo.Control([f"--parallel-mode={num_threads}"])
    control.add("base", [], asp_program)
    control.ground([("base", [])])

    # Solver configuration: model limit and optimisation mode.
    control.configuration.solve.models = 1
    control.configuration.solve.opt_mode = "optN"
    
    def interpret_model(model):
        # Called for each model found: clear the cell output, print the
        # log collected so far again, then print the formatted solution.
        clear_output(wait=True)
        print_log()
        sol = display_solution(model)
        print(sol)

    if timeout > 0:
        log(f"..Solving with timeout {timeout}s..")
        # Asynchronous solve; the handle is used as a context manager.
        with control.solve(
            async_=True,
            on_model=lambda model: interpret_model(model),
        ) as handle:
            # wait() reports whether solving finished within the timeout.
            finished = handle.wait(timeout)
            if not finished:
                log("..Stopped after timeout..")
            else:
                log("..Stopped..")
                time = control.statistics['summary']['times']['total']
                log(f"..Total solving time: {time:.2f} seconds..")
    else:
        # Blocking solve without a time limit.
        log("..Solving..")
        control.solve(
            on_model=lambda model: interpret_model(model),
        )
        log("..Stopped..")
        time = control.statistics['summary']['times']['total']
        log(f"..Total solving time: {time:.2f} seconds..")

In [602]:
# Solve the assembled program. timeout=0 selects the blocking branch (no time
# limit); num_threads is forwarded to the solver's parallel mode.
find_and_output_solution(asp_program, timeout=0, num_threads=4)

..Grounding..
..Solving..
..Stopped..
..Total solving time: 0.02 seconds..
