In [1]:
import random
from dataclasses import dataclass, field
from typing import List, Dict, Any

import numpy as np
import os
from tampura.environment import TampuraEnv
from tampura.spec import ProblemSpec
from tampura.structs import (
    AbstractBelief,
    ActionSchema,
    StreamSchema,
    AliasStore,
    Belief,
    NoOp,
    Predicate,
    State,
    effect_from_execute_fn,
    Observation
)
import logging 
from tampura.symbolic import OBJ, Or, Atom, ForAll, Exists, OneOf, And, Not, Eq, negate
from tampura.policies.tampura_policy import TampuraPolicy
from tampura.config.config import get_default_config, setup_logger


# Success probabilities sampled against by the pick / place action simulators.
PICK_SUCCESS = 0.7
DROP_SUCCESS = 0.8

# Breakage probabilities (not referenced by the code visible in this notebook).
PICK_BREAK_PROB = 0.5
DROP_BREAK_PROB = 0.7

# Ground-truth properties of every object, keyed by its symbolic name.
OBJECTS = {
    f"{OBJ}o1": {"region": "r1", "shape": "cube", "material": "glass"},
    # f"{OBJ}o2": {"region": "r2", "shape": "cube", "material": "glass"},
    # f"{OBJ}o3": {"region": "r3", "shape": "cube", "material": "paper"},
}

MATERIALS = ["glass", "paper"]

# Only objects believed to have DESIRED_SHAPE may be picked.
DESIRED_SHAPE = "cube"
SHAPES = ["cube", "sphere", "pyramid"]

# Region name -> value registered for it in the alias store.
REGIONS = {
    "r1": 1,
    "r2": 2,
    "r3": 3,
    "robot": 4,
    "goal": 5,
}

# Minimum normalised detection score for an inspected shape to be accepted.
THRESH = 0.5

In [20]:
# Observation space

@dataclass
class ObjectObservation(Observation):
    """Observation returned by the action simulators.

    Every field defaults to a fresh, empty container so that instances never
    share state.
    """

    # Names of the objects that have been located so far.
    located: List[str] = field(default_factory=list)
    # Object name -> name of the region the object is observed in.
    at: Dict[str, str] = field(default_factory=dict)
    # Object name -> detected shape.  The default must be a dict (it used to be
    # a list), because consumers call ``.copy()`` / ``.keys()`` and assign by key.
    shapes: Dict[str, str] = field(default_factory=dict)

# Belief space
class ObjectsBelief(Belief):
    """Belief about which objects are located, where they are and their shape.

    Attributes:
        located: list of object names that have been located.
        at: mapping from object name to region name.
        shapes: mapping from object name to detected shape name.
    """

    def __init__(self, located=None, at=None, shapes=None):
        """Create a belief; omitted arguments become new empty containers.

        The defaults are ``None`` rather than ``[]`` / ``{}`` because mutable
        default arguments are created once and would be shared by every
        belief constructed without arguments.  The given containers are
        copied so the belief does not alias the caller's objects.
        """
        self.located = list(located) if located is not None else []
        self.at = dict(at) if at is not None else {}
        self.shapes = dict(shapes) if shapes is not None else {}

    def update(self, a, o, s):
        """Return the belief that follows observation ``o``.

        The observation fully replaces the previous belief; the action ``a``
        and state ``s`` are not used.
        """
        return ObjectsBelief(located=o.located, at=o.at, shapes=o.shapes)

    def abstract(self, store: AliasStore):
        """Convert the belief into an ``AbstractBelief`` of symbolic atoms.

        Emits ``located(o)`` for each located object, ``at(o, region)`` for
        each known location and ``<shape>(o)`` for each known shape, in that
        order.  ``store`` is not used.
        """
        atoms = [Atom("located", [obj]) for obj in self.located]
        atoms.extend(Atom("at", [obj, region]) for obj, region in self.at.items())
        atoms.extend(Atom(shape, [obj]) for obj, shape in self.shapes.items())
        return AbstractBelief(atoms)

    def vectorize(self):
        """Not implemented; returns ``None``."""
        pass
    
# Sample function for stream schema
    
def sample_grasp(input_sym, store):
    """Sample a grasp for the stream schema.

    The grasp is currently just a uniform random float in [0, 1); neither
    ``input_sym`` nor ``store`` is consulted.
    """
    # TODO: make more sophisticated
    return random.random()

    
# Action simulators


def locate_execute_fn(a, b, s, store):
    """Simulate the ``locate`` action for object ``a.args[0]``.

    Works on copies of the containers in belief ``b``: marks the object as
    located and records its ground-truth region from ``OBJECTS``.  ``s`` and
    ``store`` are not used.

    Returns:
        A fresh ``State`` and an ``ObjectObservation`` carrying the updated
        ``located`` / ``at`` containers and the unchanged ``shapes``.
    """
    obj = a.args[0]

    at = b.at.copy()
    shapes = b.shapes.copy()

    # TODO: improve
    # De-duplicate while keeping insertion order.  ``list(set(...))`` ordered
    # the names by string hash, which changes from one interpreter run to the
    # next and made the resulting observation order non-reproducible.
    located = list(dict.fromkeys([*b.located, obj]))
    at[obj] = OBJECTS[obj]["region"]

    return State(), ObjectObservation(located=located, at=at, shapes=shapes)

    
def inspect_execute_fn(a, b, s, store):
    """Simulate a noisy shape inspection of object ``a.args[0]``.

    A score in [0.5, 1.0] is drawn for the object's true shape and a score
    between 0 and that value for every other shape.  The scores are
    normalised, and the top-scoring shape is written into a copy of the
    belief's ``shapes`` only when its normalised score exceeds ``THRESH``.
    ``s`` and ``store`` are not used.

    Returns:
        A fresh ``State`` and an ``ObjectObservation`` with the (possibly
        updated) shapes and unchanged ``located`` / ``at``.
    """
    target = a.args[0]

    located = b.located.copy()
    at = b.at.copy()
    shapes = b.shapes.copy()

    # Noisy detection: the true shape gets the largest raw score.
    true_shape = OBJECTS[target]["shape"]
    peak = random.uniform(0.5, 1.0)
    raw_scores = [
        peak if candidate == true_shape else random.uniform(0.0, peak)
        for candidate in SHAPES
    ]

    # Normalise so the scores sum to one.
    norm = sum(raw_scores)
    scores = [score / norm for score in raw_scores]

    # First index holding the highest normalised score.
    best_idx = max(range(len(scores)), key=scores.__getitem__)
    if scores[best_idx] > THRESH:
        shapes[target] = SHAPES[best_idx]

    return State(), ObjectObservation(located=located, at=at, shapes=shapes)

def pick_execute_fn(a, b, s, store):
    """Simulate picking object ``a.args[0]``.

    With probability ``PICK_SUCCESS`` the object's location in a copy of the
    belief becomes ``"robot"``; otherwise the copy is left untouched.  The
    grasp and region arguments are read but do not affect the outcome.

    Returns:
        A fresh ``State`` and an ``ObjectObservation`` built from the copied
        belief containers.
    """
    obj = a.args[0]
    grasp = a.args[1]
    region = a.args[2]

    observation = ObjectObservation(
        located=b.located.copy(),
        at=b.at.copy(),
        shapes=b.shapes.copy(),
    )

    picked = random.random() < PICK_SUCCESS
    if picked:
        observation.at[obj] = "robot"

    return State(), observation
    

def drop_execute_fn(a, b, s, store):
    """Simulate dropping object ``a.args[0]`` into region ``a.args[2]``.

    With probability ``DROP_SUCCESS`` the object's location in a copy of the
    belief becomes the target region; otherwise the copy is left untouched.
    The grasp argument is read but does not affect the outcome.

    Returns:
        A fresh ``State`` and an ``ObjectObservation`` built from the copied
        belief containers.
    """
    obj = a.args[0]
    grasp = a.args[1]
    target_region = a.args[2]

    observation = ObjectObservation(
        located=b.located.copy(),
        at=b.at.copy(),
        shapes=b.shapes.copy(),
    )

    dropped = random.random() < DROP_SUCCESS
    if dropped:
        observation.at[obj] = target_region

    return State(), observation

# closed world assumption: not explicitly True predicates are false

# Set up environment dynamics
class ToyDiscrete(TampuraEnv):
    """Toy pick-and-place environment over the objects and regions defined above."""

    def initialize(self):
        """Register every object and region in a new alias store.

        Returns:
            An empty ``ObjectsBelief`` and the populated ``AliasStore``.
        """
        store = AliasStore()

        # Objects are stored under their own name with type "physical".
        for o in OBJECTS.keys():
            store.set(o, o, "physical")

        # Regions are stored under their name, mapped to their REGIONS value.
        for region in REGIONS.keys():
            store.set(region, REGIONS[region], "region")

        return ObjectsBelief(), store

    def get_problem_spec(self) -> ProblemSpec:
        """Build the symbolic problem: predicates, grasp stream, actions, reward."""
        # One unary predicate per shape, plus the location / grasp predicates.
        predicates = [
            Predicate("located", ["physical"]),
            Predicate("at", ["physical", "region"]),
            Predicate("grasped", ["physical", "grasp"]),
        ] + [Predicate(shape, ["physical"]) for shape in SHAPES]

        stream_schemas = [
            StreamSchema(
                name="sample-grasp",
                inputs=["?o1"],
                input_types=["physical"],
                output="?g1",
                output_type="grasp",
                certified=[Atom("grasped", ["?o1", "?g1"])],  # generates a valid grasp g1 for o1
                sample_fn=sample_grasp,
            ),
        ]

        action_schemas = [
            ActionSchema(
                name="locate",
                inputs=["?o1"],
                input_types=["physical"],
                # preconditions=[ForAll(Not(Atom("at",["?o1","?r"])),["?r"],["region"])], # not located yet
                # verify_effects=[OneOf([Atom("at",["?o1",r]) for r in REGIONS.keys()])],#[Exists(Atom("at",["?o1","?r"]),["?r"],["region"])], DONT USE VERIFY_EFFECTS FOR CERTAIN OUTCOMES!!!
                effects=[Exists(Atom("at", ["?o1", "?r"]), ["?r"], ["region"])],  # =[Atom("located",["?o1"])],
                execute_fn=locate_execute_fn,
                effects_fn=effect_from_execute_fn(locate_execute_fn),
            ),
            ActionSchema(
                name="inspect",
                inputs=["?o1"],
                input_types=["physical"],
                preconditions=[Atom("located", ["?o1"])],  # +[Not(Atom(shape,["?o1"]))for shape in SHAPES], # not certain about shape yet
                # verify_effects=[OneOf([Atom(shape,["?o1"])for shape in SHAPES])],
                execute_fn=inspect_execute_fn,
                effects_fn=effect_from_execute_fn(inspect_execute_fn),
            ),
            ActionSchema(
                name="pick",
                inputs=["?o1", "?g1", "?r1"],
                input_types=["physical", "grasp", "region"],
                preconditions=[
                    Atom("at", ["?o1", "?r1"]),  # ?o1 is in region ?r1
                    Atom("grasped", ["?o1", "?g1"]),  # ?g1 is a valid grasp for ?o1
                    ForAll(Not(Atom("at", ["?o", "robot"])), ["?o"], ["physical"]),  # robot hand is free
                    Atom(DESIRED_SHAPE, ["?o1"]),  # ?o1 is of the desired shape
                    Not(Atom("at", ["?o1", "robot"])),  # current location is not the same as robot
                ],
                verify_effects=[Atom("at", ["?o1", "robot"])],
                execute_fn=pick_execute_fn,
                effects_fn=effect_from_execute_fn(pick_execute_fn),
            ),
            ActionSchema(
                name="place",
                inputs=["?o1", "?g1", "?r1"],
                input_types=["physical", "grasp", "region"],
                preconditions=[
                    Atom("at", ["?o1", "robot"]),  # robot is holding ?o1
                    Atom("grasped", ["?o1", "?g1"]),  # ?g1 is a valid grasp for ?o1
                    # ?o1 is not already in the target region.  This used to negate
                    # at(?o1, robot), contradicting the first precondition and making
                    # the action impossible to apply.
                    Not(Atom("at", ["?o1", "?r1"])),
                ],
                verify_effects=[Atom("at", ["?o1", "?r1"])],
                execute_fn=drop_execute_fn,
                # Wrap the simulator like the other schemas do; the bare factory
                # was previously passed here by mistake.
                effects_fn=effect_from_execute_fn(drop_execute_fn),
            ),
            NoOp(),
        ]

        # Reward is obtained once every physical object is at the goal region.
        reward = ForAll(Atom("at", ["?o", "goal"]), ["?o"], ["physical"])  # WORKS

        spec = ProblemSpec(
            predicates=predicates,
            stream_schemas=stream_schemas,
            action_schemas=action_schemas,
            reward=reward,
        )

        return spec



## Create environment and planner

In [21]:
# Planner configuration, with run artefacts saved under the current directory
cfg = get_default_config(save_dir=os.getcwd())

# Overrides applied on top of the defaults; "print_options" prints the
# abstract belief, action, observation, and reward
planner_overrides = {
    "print_options": "ab,a,o,r",
    "vis_graph": True,
    "flat_sample": True,
    "batch_size": 100,
    "num_samples": 1000,
    "max_steps": 20,
}
for option, value in planner_overrides.items():
    cfg[option] = value

# Environment, initial belief and alias store
env = ToyDiscrete(config=cfg)
b0, store = env.initialize()

# Logger prints INFO-level messages
setup_logger(cfg["save_dir"], logging.INFO)

# Policy built from the environment's problem specification
planner = TampuraPolicy(config=cfg, problem_spec=env.problem_spec)

## Run Planner
Make sure symk is installed (see README) before running the Tampura planner.
With the default settings, the planner should choose to pick every object every time (only one object, `o1`, is currently enabled in `OBJECTS`).

In [22]:
# Show the alias store contents before planning.
print(store)
# Roll the planner out in the environment from the initial belief b0; the call
# hands back the rollout history and a store that replaces the one printed above.
# NOTE(review): the recorded output of this cell ends in a FileNotFoundError for
# output.sas -- presumably the symk planner is missing or not built; confirm.
history,store = planner.rollout(env, b0, store)

AliasStore(als={'o_o1': 'o_o1', 'r1': 1, 'r2': 2, 'r3': 3, 'robot': 4, 'goal': 5}, als_type={'o_o1': 'physical', 'r1': 'region', 'r2': 'region', 'r3': 'region', 'robot': 'region', 'goal': 'region'}, alph_counter={}, certified=[], sample_counts={}, branching_factor={})

Abstract Belief: AbstractBelief(items=[])
Reward: 0.0
Flat Stream Sampling
Sampling StreamSchema(name='sample-grasp', inputs=['?o1'], input_types=['physical'], output='?g1', output_type='grasp', preconditions=[], certified=[Atom(pred_name='grasped', args=['?o1', '?g1'])], sample_fn=<function sample_grasp at 0x7102c04e5240>)(['o_o1'])


FileNotFoundError: [Errno 2] No such file or directory: '/home/am/IsaacLab/tampura_in_isaac_lab/tampura/notebooks/runs/run1728893274.0468898/pddl_t=0_s=0/output.sas'

In [33]:
# Display the actions recorded in the rollout history (requires a successful rollout).
history.actions

[Action(name='locate', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='pick', args=['o_o1', 'o_gr_0', 'o_re_0']),
 Action(name='pick', args=['o_o1', 'o_gr_0', 'r1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 Action(name='inspect', args=['o_o1']),
 None]

In [29]:
# Display the abstract beliefs recorded in the rollout history (requires a successful rollout).
history.abstract_beliefs

[AbstractBelief(items=[]),
 AbstractBelief(items=[Atom(pred_name='at', args=['o_o1', 'r1'])]),
 AbstractBelief(items=[Atom(pred_name='at', args=['o_o1', 'r1']), Atom(pred_name='cube', args=['o_o1'])]),
 AbstractBelief(items=[Atom(pred_name='at', args=['o_o1', 'robot']), Atom(pred_name='cube', args=['o_o1'])]),
 AbstractBelief(items=[Atom(pred_name='at', args=['o_o1', 'r1']), Atom(pred_name='cube', args=['o_o1'])]),
 AbstractBelief(items=[Atom(pred_name='at', args=['o_o1', 'robot']), Atom(pred_name='cube', args=['o_o1'])]),
 AbstractBelief(items=[Atom(pred_name='at', args=['o_o1', 'r1']), Atom(pred_name='cube', args=['o_o1'])]),
 AbstractBelief(items=[Atom(pred_name='at', args=['o_o1', 'r1']), Atom(pred_name='cube', args=['o_o1'])]),
 AbstractBelief(items=[Atom(pred_name='at', args=['o_o1', 'r1']), Atom(pred_name='cube', args=['o_o1'])]),
 AbstractBelief(items=[Atom(pred_name='at', args=['o_o1', 'r1']), Atom(pred_name='cube', args=['o_o1'])]),
 AbstractBelief(items=[Atom(pred_name='at',

In [62]:
# Region names, in definition order.
list(REGIONS)

['r1', 'r2', 'r3', 'robot', 'goal']