In [1]:
import random
from dataclasses import dataclass, field
from typing import List

import numpy as np
import os
from tampura.environment import TampuraEnv
from tampura.spec import ProblemSpec
from tampura.structs import (
    AbstractBelief,
    ActionSchema,
    AliasStore,
    Belief,
    NoOp,
    Predicate,
    State,
    effect_from_execute_fn,
    Observation
)
import logging 
from tampura.symbolic import OBJ, Atom, ForAll
from tampura.policies.tampura_policy import TampuraPolicy
from tampura.config.config import get_default_config, setup_logger

# Threshold compared against random.random() in pick_execute_fn: the chance
# that a single-object pick attempt succeeds.
PICK_ONE_SUCCESS = 0.8
# Same threshold for pick_both_execute_fn: the chance that an attempt to pick
# both objects at once succeeds.
PICK_BOTH_SUCCESS = 0.5
# Names of the objects in the domain, each built from tampura's OBJ prefix.
# These names are registered in the AliasStore and used to index the belief vector.
OBJECTS = [f"{OBJ}o1", f"{OBJ}o2"]


In [2]:

# Observation space
@dataclass
class HoldingObservation(Observation):
    """Observation listing the names of the objects currently held.

    Attributes:
        holding: Names of the held objects; a fresh empty list by default.
    """

    holding: List[str] = field(default_factory=list)

# Belief space
class HoldingBelief(Belief):
    """Belief that records exactly which objects are currently held.

    Attributes:
        holding: List of names of the held objects.
    """

    def __init__(self, holding=None):
        """Create a belief over the held objects.

        Args:
            holding: Iterable of held object names. ``None`` (the default)
                means nothing is held.
        """
        # A literal `[]` default is evaluated once at definition time and would
        # be shared by every instance created without an argument. Copying the
        # input also keeps this belief from aliasing the caller's list (for
        # example the list stored inside an observation).
        self.holding = list(holding) if holding is not None else []

    def update(self, a, o, s):
        """Return the belief that follows from observation ``o``.

        Args:
            a: Action that was executed (unused).
            o: Observation exposing a ``holding`` list of object names.
            s: State after the action (unused).

        Returns:
            A new ``HoldingBelief`` holding exactly what ``o`` reports.
        """
        return HoldingBelief(holding=o.holding)

    def abstract(self, store: AliasStore):
        """Return an ``AbstractBelief`` with one ``holding`` atom per held object.

        Args:
            store: Alias store of the problem (unused here).
        """
        return AbstractBelief([Atom("holding", [o]) for o in self.holding])

    def vectorize(self):
        """Return a 0/1 numpy array with one entry per name in ``OBJECTS``.

        An entry is 1 when the corresponding object is currently held.
        """
        return np.array([int(obj in self.holding) for obj in OBJECTS])

# Action simulators
def _attempt_pick(a, b, success_prob):
    """Simulate one pick attempt and return the resulting list of held objects.

    Args:
        a: Action whose ``args`` are the names of the objects to pick.
        b: Belief exposing the currently held names as ``b.holding``.
        success_prob: Probability in [0, 1] that the attempt succeeds.

    Returns:
        A new list. On success it is the previously held names followed by the
        newly picked ones, without duplicates. On failure it is a copy of
        ``b.holding``.
    """
    if random.random() < success_prob:
        # dict.fromkeys removes duplicates while keeping first-seen order, so
        # the observation does not depend on string hashing the way
        # list(set(...)) does.
        return list(dict.fromkeys([*b.holding, *a.args]))
    # Return a copy so the observation never shares its list with the belief.
    return list(b.holding)


def pick_execute_fn(a, b, s, store):
    """Simulate picking a single object; succeeds with ``PICK_ONE_SUCCESS``.

    Args:
        a: Action whose ``args`` name the object to pick.
        b: Current belief; only ``b.holding`` is read.
        s: Current state (unused).
        store: Alias store (unused).

    Returns:
        A ``(State(), HoldingObservation)`` pair describing what is held
        after the attempt.
    """
    return State(), HoldingObservation(_attempt_pick(a, b, PICK_ONE_SUCCESS))


def pick_both_execute_fn(a, b, s, store):
    """Simulate picking two objects at once; succeeds with ``PICK_BOTH_SUCCESS``.

    Args:
        a: Action whose ``args`` name the objects to pick.
        b: Current belief; only ``b.holding`` is read.
        s: Current state (unused).
        store: Alias store (unused).

    Returns:
        A ``(State(), HoldingObservation)`` pair describing what is held
        after the attempt.
    """
    return State(), HoldingObservation(_attempt_pick(a, b, PICK_BOTH_SUCCESS))


# Set up environment dynamics
class ToyDiscrete(TampuraEnv):
    """Toy discrete domain: pick up the objects one at a time or both at once."""

    def initialize(self):
        """Build the initial belief and alias store.

        Returns:
            A ``(HoldingBelief, AliasStore)`` pair: an empty-handed belief and
            a store in which every name in ``OBJECTS`` is registered under
            itself with type ``"physical"``.
        """
        alias_store = AliasStore()
        for obj_name in OBJECTS:
            alias_store.set(obj_name, obj_name, "physical")
        return HoldingBelief(), alias_store

    def get_problem_spec(self) -> ProblemSpec:
        """Assemble the predicates, action schemas and reward of the problem.

        Returns:
            A ``ProblemSpec`` with a single ``holding`` predicate, the ``pick``
            and ``pick-both`` schemas plus a no-op, and a reward expressed as
            ``holding`` quantified over all ``physical`` objects.
        """
        holding_predicate = Predicate("holding", ["physical"])

        # Single-object pick, simulated by pick_execute_fn.
        pick_schema = ActionSchema(
            name="pick",
            inputs=["?o1"],
            input_types=["physical"],
            verify_effects=[Atom("holding", ["?o1"])],
            execute_fn=pick_execute_fn,
            effects_fn=effect_from_execute_fn(pick_execute_fn),
        )

        # Two-object pick, simulated by pick_both_execute_fn.
        pick_both_schema = ActionSchema(
            name="pick-both",
            inputs=["?o1", "?o2"],
            input_types=["physical", "physical"],
            verify_effects=[Atom("holding", ["?o1"]), Atom("holding", ["?o2"])],
            execute_fn=pick_both_execute_fn,
            effects_fn=effect_from_execute_fn(pick_both_execute_fn),
        )

        schemas = [pick_schema, pick_both_schema, NoOp()]
        goal_reward = ForAll(Atom("holding", ["?o"]), ["?o"], ["physical"])

        return ProblemSpec(
            predicates=[holding_predicate],
            action_schemas=schemas,
            reward=goal_reward,
        )



## Create environment and planner

In [3]:
# Seed the random number generators so the stochastic pick outcomes (drawn with
# random.random() in the action simulators) are reproducible across reruns.
SEED = 0
random.seed(SEED)
# NOTE(review): seeding numpy only matters if tampura samples through
# numpy.random internally; confirm against the library.
np.random.seed(SEED)

# Planner configuration; results are saved under the current working directory.
cfg = get_default_config(save_dir=os.getcwd())

# Set some print options to print out abstract belief, action, observation, and reward
cfg["print_options"] = "ab,a,o,r"
cfg["vis_graph"] = True
cfg["batch_size"] = 100
cfg["num_samples"] = 1000

# Initialize environment
env = ToyDiscrete(config=cfg)
b0, store = env.initialize()

# Set up logger to print info
setup_logger(cfg["save_dir"], logging.INFO)

# Initialize the policy
planner = TampuraPolicy(config=cfg, problem_spec=env.problem_spec)

## Run Planner
Make sure SymK is installed (see the README) before running the Tampura planner.
With the default settings, the planner should choose the `pick-both` action every time.

In [5]:
# Run the planner's rollout in the environment, starting from the initial
# belief b0. The return value is bound to `_` so the cell does not echo it.
_ = planner.rollout(env, b0, store)


Abstract Belief: AbstractBelief(items=[])
Reward: 0.0
Action: pick-both(o_o1, o_o2)
Observation: HoldingObservation(holding=['o_o2', 'o_o1'])

Abstract Belief: AbstractBelief(items=[Atom(pred_name='holding', args=['o_o1']), Atom(pred_name='holding', args=['o_o2'])])
Reward: 1.0
Action: no-op()
Observation: None

Abstract Belief: AbstractBelief(items=[Atom(pred_name='holding', args=['o_o1']), Atom(pred_name='holding', args=['o_o2'])])
Reward: 1.0
Action: no-op()
Observation: None

Abstract Belief: AbstractBelief(items=[Atom(pred_name='holding', args=['o_o1']), Atom(pred_name='holding', args=['o_o2'])])
Reward: 1.0
Action: no-op()
Observation: None

Abstract Belief: AbstractBelief(items=[Atom(pred_name='holding', args=['o_o1']), Atom(pred_name='holding', args=['o_o2'])])
Reward: 1.0
Action: no-op()
Observation: None

Abstract Belief: AbstractBelief(items=[Atom(pred_name='holding', args=['o_o1']), Atom(pred_name='holding', args=['o_o2'])])
Reward: 1.0
Action: no-op()
Observation: None

A