In [8]:
import random
from dataclasses import dataclass, field
from typing import List,Dict

import copy
import itertools 
import time
from tampura.policies.policy import save_config, RolloutHistory, save_run_data

import numpy as np
import os
from tampura.environment import TampuraEnv
from tampura.spec import ProblemSpec
from tampura.structs import (
    AbstractBelief,
    ActionSchema,
    StreamSchema,
    AliasStore,
    Belief,
    NoOp,
    Predicate,
    State,
    effect_from_execute_fn,
    Observation,
    AbstractBeliefSet,
)
import logging 
from tampura.symbolic import OBJ, Atom, ForAll, Not, Exists, Or, And, OneOf, eval_expr
from tampura.policies.tampura_policy import TampuraPolicy
from tampura.config.config import load_config, setup_logger

# Name prefixes and object identifiers shared by every cell of the notebook.
ROB = "robot_"
REG = "region_"
MUG = "mug"
DOOR = "door"

# Region names are the region prefix followed by a location suffix.
REGIONS = [REG + suffix for suffix in (MUG, DOOR, "stable_mug")]
ACTION_NAMES = [
    verb + "_action"
    for verb in ("transit", "transfer", "pick", "place", "open", "close", "nothing")
]

# Problem specification: try with just one robot to demonstrate how the overall cost increases.
ROBOTS = [ROB + str(index) for index in (1, 2)]
# Every robot is mapped to the last region (long horizon: combinatorial explosion).
ROB_REGIONS = dict.fromkeys(ROBOTS, REGIONS[-1])
# ROB_REGIONS = {ROBOTS[0]:REGIONS[1],ROBOTS[1]:REGIONS[0]} # short horizon: kind of works?
OBJ_REGIONS = {MUG: REGIONS[0]}

# Test goal: REGIONS[0] must be clean and the mug must be located in REGIONS[0].
GOAL = And(
    [
        Atom("clean", [REGIONS[0]]),
        Atom("in_obj", [MUG, REGIONS[0]]),
    ]
)

In [None]:
# Centralized planner
# State of the environment

# Belief space
class CentralBelief(Belief):
    """Centralized belief holding the joint symbolic state of all robots.

    Attributes:
        holding: maps a robot name to the list of object names it is holding.
        obj_regions: maps an object name to the region it is in; the empty
            string marks an object that is currently in no region.
        clean: list of region names that are clean.
        turn: name of the robot whose turn it is to act.
    """

    def __init__(self, holding=None, obj_regions=None, clean=None, next_actions=None, turn=ROBOTS[0]):
        """Build a belief from independent copies of the given containers.

        Args:
            holding: robot name -> list of held object names (default: empty).
            obj_regions: object name -> region name, or "" when in no region
                (default: empty).
            clean: iterable of clean region names (default: empty).
            next_actions: accepted for backward compatibility; not used.
            turn: robot whose turn it is (default: the first entry of ROBOTS).
        """
        # Copy the per-robot lists too, so that two beliefs never share a
        # mutable list (a shallow dict copy would alias the inner lists).
        self.holding = {rob: list(objs) for rob, objs in (holding or {}).items()}
        self.obj_regions = dict(obj_regions or {})
        self.clean = list(clean or [])
        self.turn = turn

    def update(self, a, o, s):
        """Return the successor belief after applying action ``a``.

        The current belief is left untouched; a new ``CentralBelief`` is returned.

        Args:
            a: action with ``name`` and ``args`` attributes. Only the names
                "pick", "place" and "clean" change the symbolic state; keep
                these in sync with the action schema names.
            o: observation; not used by this deterministic update.
            s: not used (callers in this notebook pass the alias store here).
        """
        # Work on copies: dictionary and list mutations are in-place.
        holding = {rob: list(objs) for rob, objs in self.holding.items()}
        obj_regions = dict(self.obj_regions)
        clean = list(self.clean)
        turn = self.turn

        if a.name == "pick":
            # args: (acting robot, object, region, next robot)
            robot, obj = a.args[0], a.args[1]
            holding[robot] = [obj]
            obj_regions[obj] = ""  # a held object is in no region
        elif a.name == "place":
            # args: (acting robot, object, region, next robot)
            robot, obj, region = a.args[0], a.args[1], a.args[2]
            holding[robot] = []
            obj_regions[obj] = region
        elif a.name == "clean":
            # args: (acting robot, region, next robot)
            region = a.args[1]
            if region not in clean:
                clean.append(region)

        # Every schema passes the turn to its last argument. An action without
        # arguments leaves the turn unchanged instead of raising IndexError.
        if a.args:
            turn = a.args[-1]

        return CentralBelief(holding=holding, clean=clean, obj_regions=obj_regions, turn=turn)

    def abstract(self, store: AliasStore):
        """Return the ``AbstractBelief`` made of the atoms that currently hold.

        Atoms are emitted in the order: holding, in_obj, clean, turn.
        ``store`` is not used.
        """
        atoms = []

        for rob, objs in self.holding.items():
            atoms.extend(Atom("holding", [rob, obj]) for obj in objs)

        for obj, region in self.obj_regions.items():
            if region != "":  # "" means the object is in no region
                atoms.append(Atom("in_obj", [obj, region]))

        atoms.extend(Atom("clean", [reg]) for reg in self.clean)
        atoms.append(Atom("turn", [self.turn]))

        return AbstractBelief(atoms)

    # Disabled draft of a vectorized encoding (OBJECTS is not defined in this notebook):
    # def vectorize(self):
    #     return np.array([int(obj in self.holding) for obj in OBJECTS])
      

def deterministic_execute_fn(a, b, s, store):
    """Execution callback used by every action schema in this notebook.

    All four arguments are ignored. Returns a pair of a freshly constructed
    empty ``State`` and a freshly constructed empty ``Observation``.
    """
    next_state = State()
    observation = Observation()
    return next_state, observation
    
def deterministic_effects_fn(a, b, store):
    """Effects callback: the single successor belief of taking ``a`` in ``b``.

    Calls ``b.update`` with an empty ``Observation`` and wraps the resulting
    belief in an ``AbstractBeliefSet`` built with ``store``.
    """
    successor = b.update(a, Observation(), store)
    return AbstractBeliefSet.from_beliefs([successor], store)

# Set up environment dynamics
class ToyDiscreteCentral(TampuraEnv):
    """Toy turn-based environment in which the robots act one at a time.

    Every action schema takes the acting robot as ``?rob1`` and the robot that
    receives the turn afterwards as its last input ``?rob2``.
    """

    def initialize(self, holding, obj_regions, clean, turn):
        """Create the alias store and the initial belief.

        Args:
            holding: robot name -> list of held object names.
            obj_regions: object name -> region name.
            clean: list of clean region names.
            turn: name of the robot that acts first.

        Returns:
            ``(belief, store)``: the initial ``CentralBelief`` and an
            ``AliasStore`` in which every robot, region and the mug are registered.
        """
        store = AliasStore()

        for rob in ROBOTS:
            store.set(rob, rob, "robot")

        for region in REGIONS:
            store.set(region, region, "region")

        store.set(MUG, MUG, "physical")

        # Static facts: the mug is stable in the first and the third region.
        store.certified.append(Atom("stable", [MUG, REGIONS[0]]))
        store.certified.append(Atom("stable", [MUG, REGIONS[2]]))

        b = CentralBelief(holding=holding, obj_regions=obj_regions, clean=clean, turn=turn)

        return b, store

    def get_problem_spec(self) -> ProblemSpec:
        """Return the predicates, action schemas and reward of the problem."""

        # The builders below return fresh objects on every call so that no
        # expression instance is shared between two schemas.
        def turn_preconditions():
            # It is ?rob1's turn and not ?rob2's.
            return [Atom("turn", ["?rob1"]), Not(Atom("turn", ["?rob2"]))]

        def turn_effects():
            # The turn passes from ?rob1 to ?rob2.
            return [Atom("turn", ["?rob2"]), Not(Atom("turn", ["?rob1"]))]

        def hand_is_free():
            # ?rob1 holds no physical object.
            return Not(Exists(Atom("holding", ["?rob1", "?obj"]), ["?obj"], ["physical"]))

        predicates = [
            Predicate("holding", ["robot", "physical"]),
            Predicate("stable", ["physical", "region"]),
            Predicate("in_obj", ["physical", "region"]),
            Predicate("clean", ["region"]),
            Predicate("turn", ["robot"]),
        ]

        # BE CAREFUL: CentralBelief.update dispatches on these schema names.
        action_schemas = [
            ActionSchema(
                name="pick",
                inputs=["?rob1", "?obj1", "?reg1", "?rob2"],
                input_types=["robot", "physical", "region", "robot"],
                preconditions=turn_preconditions()
                + [
                    Atom("in_obj", ["?obj1", "?reg1"]),  # object is in the region it is picked from
                    hand_is_free(),
                ],
                effects=[
                    Atom("holding", ["?rob1", "?obj1"]),
                    Not(Atom("in_obj", ["?obj1", "?reg1"])),
                ]
                + turn_effects(),
                execute_fn=deterministic_execute_fn,
                effects_fn=deterministic_effects_fn,
            ),
            ActionSchema(
                name="place",
                inputs=["?rob1", "?obj1", "?reg1", "?rob2"],
                input_types=["robot", "physical", "region", "robot"],
                preconditions=turn_preconditions()
                + [
                    Not(Atom("in_obj", ["?obj1", "?reg1"])),  # object is not already in the target region
                    Atom("holding", ["?rob1", "?obj1"]),  # robot holds the object
                ],
                effects=[
                    Not(Atom("holding", ["?rob1", "?obj1"])),
                    Atom("in_obj", ["?obj1", "?reg1"]),
                ]
                + turn_effects(),
                execute_fn=deterministic_execute_fn,
                effects_fn=deterministic_effects_fn,
            ),
            ActionSchema(
                name="clean",
                inputs=["?rob1", "?reg1", "?rob2"],
                input_types=["robot", "region", "robot"],
                preconditions=turn_preconditions()
                + [
                    # region is free: no physical object is in it
                    Not(Exists(Atom("in_obj", ["?obj", "?reg1"]), ["?obj"], ["physical"])),
                    Not(Atom("clean", ["?reg1"])),  # region is not clean yet
                    hand_is_free(),
                ],
                effects=[Atom("clean", ["?reg1"])] + turn_effects(),
                execute_fn=deterministic_execute_fn,
                effects_fn=deterministic_effects_fn,
            ),
            # Pass the turn without changing anything else.
            ActionSchema(
                name="nothing",
                inputs=["?rob1", "?rob2"],
                input_types=["robot", "robot"],
                preconditions=turn_preconditions(),
                effects=turn_effects(),
                execute_fn=deterministic_execute_fn,
                effects_fn=deterministic_effects_fn,
            ),
        ]

        reward = GOAL

        spec = ProblemSpec(
            predicates=predicates,
            action_schemas=action_schemas,
            reward=reward,
        )

        return spec



In [10]:
# testing
# Planner
# Start from the library defaults, then override what this experiment needs.
cfg = load_config(config_file="../tampura/config/default.yml")

# Print the abstract belief, action, observation and reward at every step.
cfg["print_options"] = "ab,a,o,r"
cfg["vis_graph"] = True

# Planner budget (batch size 100, num samples 500, num skeletons 100 works best!!)
cfg["batch_size"] = 100  # 100
cfg["num_samples"] = 100  # 500
cfg["max_steps"] = 15
cfg["num_skeletons"] = 10
cfg["flat_sample"] = False  # TODO: check; may cause progressive widening

# Each run is saved in its own timestamped directory under ./runs.
cfg["save_dir"] = f"{os.getcwd()}/runs/run{time.time()}"

# cfg['from_scratch'] = False # imp: re-use!!! but graph gets too big

# TODO: check - can we reuse the same environment for both agents?
# Environment and initial belief: both hands empty, the mug in REGIONS[0],
# only the last region clean, and the first robot acting first.
env = ToyDiscreteCentral(config=cfg)
b0, store = env.initialize(
    holding={ROBOTS[0]: [], ROBOTS[1]: []},
    obj_regions={MUG: REGIONS[0]},
    clean=[REGIONS[-1]],
    turn=ROBOTS[0],
)

# Set up the logger to print info-level messages.
setup_logger(cfg["save_dir"], logging.INFO)

# Initialize the policy from the environment's problem specification.
planner = TampuraPolicy(config=cfg, problem_spec=env.problem_spec)

In [11]:
# Roll the planner out from the initial belief until the goal is reached or
# the step budget is exhausted, recording every step in `history`.
b = b0
assert env.problem_spec.verify(store)

save_config(planner.config, planner.config["save_dir"])


def _log_if_enabled(option, label, value):
    """Log ``label + str(value)`` at INFO level if ``option`` is in the planner's print options."""
    if option in planner.print_options:
        logging.info(label + str(value))


history = RolloutHistory(planner.config)
st = time.time()

# Pre-seed the values recorded after the loop. Without this, reaching the goal
# on the very first iteration breaks out before they are assigned and the final
# history.add(...) raises NameError.
bp = b
a_bp = b.abstract(store)
info = {}  # assumes an empty info dict is acceptable to RolloutHistory.add — TODO confirm

# NOTE(review): cfg["max_steps"] is set to 15 but this loop allows up to 100 steps — confirm.
for step in range(100):
    s = copy.deepcopy(env.state)
    a_b = b.abstract(store)
    reward = env.problem_spec.get_reward(a_b, store)
    if reward:
        print("goal achieved")
        break

    logging.info("\n" + ("=" * 10) + "t=" + str(step) + ("=" * 10))
    _log_if_enabled("s", "State: ", s)
    _log_if_enabled("b", "Belief: ", b)
    _log_if_enabled("ab", "Abstract Belief: ", a_b)
    _log_if_enabled("r", "Reward: ", reward)

    # Plan from the current belief.
    action, info, store = planner.get_action(b, store)

    _log_if_enabled("a", "Action: ", action)

    if action.name == "no-op":
        # Nothing is executed: the belief carries over unchanged.
        bp = copy.deepcopy(b)
        observation = None
    else:
        # Execute the action in the environment, then update the belief with the result.
        observation = env.step(action, b, store)
        bp = b.update(action, observation, store)

        if planner.config["vis"]:
            env.vis_updated_belief(bp, store)

    a_bp = bp.abstract(store)
    # Record the step with the reward of the belief the action was chosen in.
    history.add(s, b, a_b, action, observation, reward, info, store, time.time() - st)

    reward = env.problem_spec.get_reward(a_bp, store)

    _log_if_enabled("o", "Observation: ", observation)
    _log_if_enabled("sp", "Next State: ", env.state)
    _log_if_enabled("bp", "Next Belief: ", bp)
    _log_if_enabled("abp", "Next Abstract Belief: ", a_bp)
    _log_if_enabled("rp", "Next Reward: ", reward)

    # The successor belief becomes the current belief of the next step.
    b = bp

# Terminal entry: final state and belief, with no action or observation.
history.add(env.state, bp, a_bp, None, None, reward, info, store, time.time() - st)

logging.info("=" * 20)

env.wrapup()

if not planner.config["real_execute"]:
    save_run_data(history, planner.config["save_dir"])



Abstract Belief: AbstractBelief(items=[Atom(pred_name='turn', args=['robot_1']), Atom(pred_name='in_obj', args=['mug', 'region_mug']), Atom(pred_name='clean', args=['region_stable_mug'])])
Reward: 0.0
[TampuraPolicy] Flat Action Parameter Sampling
[TampuraPolicy] Outcome Sampling


100%|██████████| 100/100 [00:00<00:00, 210.13it/s]

[TampuraPolicy] MDP Solving
Action: pick(robot_1, mug, region_mug, robot_2)
Observation: Observation()

Abstract Belief: AbstractBelief(items=[Atom(pred_name='holding', args=['robot_1', 'mug']), Atom(pred_name='turn', args=['robot_2']), Atom(pred_name='clean', args=['region_stable_mug'])])
Reward: 0.0
[TampuraPolicy] MDP Solving
Action: clean(robot_2, region_mug, robot_1)





TypeError: 'NoneType' object is not callable