In [1]:
# Notebook-level configuration; edit these values and re-run all cells below.
# num_iters is passed as the second positional argument of HyperParams.
num_iters = 2
# env_num indexes GRID_PARAMS_LIST, START_ROOM and FINAL_ROOM. Only 2, 3 and 4
# are covered by the room-coordinate setup in the next cell.
env_num = 2
# spec_num is the index into the `specs` list built in the next cell (0-7).
spec_num = 3

In [2]:
from conformal.all_paths_conformal_pred import all_paths_conformal_pred
from conformal.bucketed_conformal_pred import bucketed_conformal_pred
from conformal.nonconformity_score_graph import DIRLCumRewardScoreGraph, DIRLTimeTakenScoreGraph
from spectrl.hierarchy.construction import adj_list_from_task_graph, automaton_graph_from_spec
from spectrl.hierarchy.reachability import HierarchicalPolicy, ConstrainedEnv
from spectrl.main.spec_compiler import ev, seq, choose, alw
from spectrl.util.io import parse_command_line_options, save_log_info, save_object
from spectrl.util.rl import print_performance, get_rollout
from spectrl.rl.ars import HyperParams

from spectrl.examples.rooms_envs import (
    GRID_PARAMS_LIST,
    MAX_TIMESTEPS,
    START_ROOM,
    FINAL_ROOM,
)
from spectrl.envs.rooms import RoomsEnv

import os


# Run-level settings. `render` is forwarded to learn_all_paths() further down;
# `folder`, `itno` and `log_info` are not read by any cell visible here.
render = False
folder = ''
itno = -1

log_info = []

# Grid description of the selected environment.
grid_params = GRID_PARAMS_LIST[env_num]

# Hyperparameters for the learner. The arguments are positional; apart from
# num_iters their meaning is defined by spectrl.rl.ars.HyperParams.
hyperparams = HyperParams(30, num_iters, 30, 15, 0.05, 0.3, 0.15)

# Announce which spec/environment pair this run is about.
banner_template = "\n**** Learning Policy for Spec #{} in Env #{} ****"
print(banner_template.format(spec_num, env_num))

# Step 1: initialize system environment
system = RoomsEnv(grid_params, START_ROOM[env_num], FINAL_ROOM[env_num])

# Step 2: List of specs.
# Room coordinates referenced by the specs below. They are only defined for
# environments 2, 3 and 4 (environments 3 and 4 share the same coordinates).
if env_num == 2:
    bottomright = (0, 2)
    topleft = (2, 0)
elif env_num in (3, 4):
    bottomright = (0, 3)
    topleft = (3, 0)
else:
    # Without this branch the names stay unbound and the first spec that uses
    # them fails with a NameError that does not mention env_num at all.
    raise ValueError(
        "env_num={} is not supported: room coordinates are only defined "
        "for environments 2, 3 and 4".format(env_num)
    )

# Candidate specifications; `spec_num` selects one of them from `specs` below.
# NOTE(review): several comments below say "top-right and bottom-left" while
# the code uses `bottomright` and `topleft` -- confirm which naming is intended.

# test specs
# spec0: reach the final room.
spec0 = ev(grid_params.in_room(FINAL_ROOM[env_num]))
# spec1: reach the final room, then the start room.
spec1 = seq(
    ev(grid_params.in_room(FINAL_ROOM[env_num])),
    ev(grid_params.in_room(START_ROOM[env_num])),
)
# spec2: reach the `topleft` room.
spec2 = ev(grid_params.in_room(topleft))

# Go to destination, return to initial
spec3 = seq(
    ev(grid_params.in_room(topleft)),
    ev(grid_params.in_room(START_ROOM[env_num])),
)
# Choose between top-right and bottom-left blocks
# (Same difficulty - learns 3/4 edges)
spec4 = choose(
    ev(grid_params.in_room(bottomright)), ev(grid_params.in_room(topleft))
)
# Choose between top-right and bottom-left, then go to Final state (top-right).
# Only one path is possible (learns 5/5 edges. Should have a bad edge)
spec5 = seq(
    choose(
        ev(grid_params.in_room(bottomright)), ev(grid_params.in_room(topleft))
    ),
    ev(grid_params.in_room(FINAL_ROOM[env_num])),
)
# Add obstacle towards topleft: same target as spec2, combined with
# avoid_center((1, 0)) through `alw`.
spec6 = alw(grid_params.avoid_center((1, 0)), ev(grid_params.in_room(topleft)))
# Either go to top-left or bottom-right. Obstacle on the way to top-left.
# Then, go to Final state. Only one route is possible
spec7 = seq(
    choose(
        alw(grid_params.avoid_center((1, 0)), ev(grid_params.in_room(topleft))),
        ev(grid_params.in_room(bottomright)),
    ),
    ev(grid_params.in_room(FINAL_ROOM[env_num])),
)

# Position in this list is the value expected in `spec_num`.
specs = [spec0, spec1, spec2, spec3, spec4, spec5, spec6, spec7]

# Step 3: construct abstract reachability graph
# Only the second element of the returned pair is used in this notebook.
selected_spec = specs[spec_num]
_, abstract_reach = automaton_graph_from_spec(selected_spec)
print("\n**** Abstract Graph ****")
abstract_reach.pretty_print()

# Step 5: Learn policy
# Learns policies on the abstract graph for the `system` environment.
path_policies = abstract_reach.learn_all_paths(
    system,
    hyperparams,
    num_samples=500,
    max_steps=20,
    neg_inf=-100,
    safety_penalty=-1,
    res_model=None,
    render=render,
)

# Adjacency-list view of the task graph, consumed by the score graph in the next cell.
adj_list = adj_list_from_task_graph(abstract_reach.abstract_graph)
# A vertex counts as terminal when it appears in its own adjacency entry
# (a self-loop such as `2 -> 2` in the printed graph).
terminal_vertices = [
    vertex for vertex in range(len(adj_list)) if vertex in adj_list[vertex]
]



**** Learning Policy for Spec #3 in Env #2 ****

**** Abstract Graph ****
0 -> 1
1 -> 2
2 -> 2

Learning policy for edge 0 -> 1



  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")



Steps taken at iteration 0: 1260
Time taken at iteration 0: 0.006488247712453207 mins
Expected reward at iteration 0: -439.8662170376224

Steps taken at iteration 1: 2520
Time taken at iteration 1: 0.012099850177764892 mins
Expected reward at iteration 1: -422.007487901614

Learning policy for edge 1 -> 2


Steps taken at iteration 0: 1260
Time taken at iteration 0: 0.00540237029393514 mins
Expected reward at iteration 0: -55.07375798624198

Steps taken at iteration 1: 2520
Time taken at iteration 1: 0.010900564988454183 mins
Expected reward at iteration 1: -54.456999149135555


In [None]:
# Settings shared by both conformal-prediction calls below.
# NOTE(review): `e` is presumably the miscoverage level -- confirm against the
# signatures of bucketed_conformal_pred / all_paths_conformal_pred.
e = 0.1
n_samples = 500
total_buckets = 100

# Score graph built from the learned path policies.
time_taken_score_graph = DIRLTimeTakenScoreGraph(adj_list, path_policies)

# Run both procedures on the same score graph so their results can be compared.
vbs = bucketed_conformal_pred(time_taken_score_graph, e, total_buckets, n_samples)
min_path, min_path_scores = all_paths_conformal_pred(time_taken_score_graph, e, n_samples)

# Look up the bucketed result for the first terminal vertex at `total_buckets`.
bucket_key = (terminal_vertices[0], total_buckets)
vb = vbs.buckets[bucket_key]

print("Bucketed:")
print(vb.path)
print(vb.path_buckets)
print(vb.path_score_quantiles)
# Largest quantile along the bucketed path.
print(max(vb.path_score_quantiles))

print()
print("All paths:")
print(min_path)
print(min_path_scores)
# Largest score along the all-paths result.
print(max(min_path_scores))