In [59]:
import random


class Object:
    """A named thing (furniture, human, item or agent) located in one room."""

    def __init__(
        self, name: str, type: str, loc_probs: dict, dependent_probs: dict = None
    ) -> None:
        """Create the object and sample its starting room.

        Args
        ----
        name: e.g., Tae, laptop, bed
        type: one of "static", "independent", "dependent", "agent"
        loc_probs: room name -> weight, e.g., {"bedroom": 0.5, "living_room": 0.5}.
            The starting room is drawn from the keys using the values as weights.
        dependent_probs: This only applies to dependent objects. Stored as given.

        """
        self.name = name
        self.type = type
        assert self.type in ("static", "independent", "dependent", "agent")
        self.loc_probs = loc_probs

        # One weighted draw from the global `random` generator.
        candidate_rooms = list(self.loc_probs.keys())
        room_weights = list(self.loc_probs.values())
        (self.loc,) = random.choices(candidate_rooms, weights=room_weights, k=1)

        self.dependent_probs = dependent_probs

    def __repr__(self) -> str:
        """Return a one-line summary; the label is "Entity", not the class name."""
        details = f"name: {self.name}, type: {self.type}, location: {self.loc}"
        return f"Entity({details})"

    def change_location(self, loc: str) -> None:
        """Overwrite the current room with `loc` (no validation is performed)."""
        self.loc = loc


In [77]:
import random
from pprint import pprint


class Pomdp:
    """Small room world whose object locations are only partially observed.

    The world contains static furniture, independently moving humans,
    dependent items whose movement is tied to co-located humans, and one
    agent. An observation lists only the objects in the agent's room.
    """

    def __init__(
        self, rooms: list = ["bedroom", "officeroom", "livingroom"], seed: int = 42
    ) -> None:
        """Seed the global ``random`` generator and remember the room names.

        Args
        ----
        rooms: room names. ``reset`` places objects in "bedroom",
            "officeroom" and "livingroom", so all three must be present.
        seed: value passed to ``random.seed``.

        """
        random.seed(seed)
        # Copy so neither the shared default list nor the caller's list is aliased.
        self.rooms = list(rooms)

    def reset(self):
        """Recreate every object, sample its room, and return the first observation.

        Returns
        -------
        The ``(observation, question)`` pair from ``_get_observation`` at time 0.

        """
        specs = [
            # static objects
            ("bed", "static", {"officeroom": 0, "livingroom": 0, "bedroom": 1}),
            ("desk", "static", {"officeroom": 1, "livingroom": 0, "bedroom": 0}),
            ("table", "static", {"officeroom": 0, "livingroom": 1, "bedroom": 0}),
            # independent objects
            (
                "Tae",
                "independent",
                {"officeroom": 0.5, "livingroom": 0.5, "bedroom": 0},
            ),
            (
                "Michael",
                "independent",
                {"officeroom": 0, "livingroom": 0.5, "bedroom": 0.5},
            ),
            (
                "Vincent",
                "independent",
                {"officeroom": 0.5, "livingroom": 0, "bedroom": 0.5},
            ),
            # dependent objects
            (
                "laptop",
                "dependent",
                {"officeroom": 0.5, "livingroom": 0.5, "bedroom": 0},
                {"Tae": 0.7, "Michael": 0.1, "Vincent": 0.3},
            ),
            (
                "phone",
                "dependent",
                {"officeroom": 0, "livingroom": 0.5, "bedroom": 0.5},
                {"Tae": 0.3, "Michael": 0.1, "Vincent": 0.7},
            ),
            # agent
            (
                "agent",
                "agent",
                {"officeroom": 0.333, "livingroom": 0.333, "bedroom": 0.333},
            ),
        ]
        # Constructed in list order: each constructor makes one random draw,
        # so the order fixes the sequence of draws for a given seed.
        self.objects = [Object(*spec) for spec in specs]

        # get basic info
        self.num_rooms = len(self.rooms)
        self.num_static_objects = self._count_objects("static")
        self.num_independent_objects = self._count_objects("independent")
        self.num_dependent_objects = self._count_objects("dependent")

        movable = self.num_independent_objects + self.num_dependent_objects
        # The extra +1 in the exponent presumably accounts for the agent -- confirm.
        self.num_hidden_states = self.num_rooms ** (movable + 1)
        self.num_observations = 2**movable * self.num_rooms
        self.time = 0

        return self._get_observation()

    def _count_objects(self, kind: str) -> int:
        """Return how many entries of ``self.objects`` have ``type == kind``."""
        return sum(1 for obj in self.objects if obj.type == kind)

    @staticmethod
    def _sample_location(obj) -> str:
        """Draw one room for ``obj`` using its ``loc_probs`` values as weights."""
        return random.choices(
            list(obj.loc_probs.keys()),
            weights=list(obj.loc_probs.values()),
            k=1,
        )[0]

    def _move_objects(self) -> None:
        """Relocate independent and dependent objects for one time step.

        Co-location is judged on the room lists built at the previous step,
        i.e. before anything moves in this step. Static objects and the agent
        are left where they are.

        An independent object always resamples its room from ``loc_probs``.
        A dependent object is relocated when a random draw falls below
        ``dependent_probs[name]`` for any independent object that shared its
        room and has an entry in ``dependent_probs``.

        NOTE(review): a relocated dependent object lands in a room drawn from
        its own ``loc_probs`` rather than following the independent object --
        kept from the original code; confirm this is intended.
        """
        for occupants in self.hidden_state_room_perspective.values():
            independents = [obj for obj in occupants if obj.type == "independent"]

            for obj in occupants:
                if obj.type == "independent":
                    obj.change_location(self._sample_location(obj))
                elif obj.type == "dependent":
                    probs = obj.dependent_probs or {}
                    # Only independent objects with an entry can trigger a move;
                    # the lookup can no longer hit static objects (old KeyError).
                    triggered = any(
                        random.random() < probs[io.name]
                        for io in independents
                        if io.name in probs
                    )
                    if triggered:
                        obj.change_location(self._sample_location(obj))

    def _refresh_hidden_state(self) -> None:
        """Rebuild both hidden-state views from the objects' current rooms."""
        by_room = {room: [] for room in self.rooms}
        for obj in self.objects:
            by_room[obj.loc].append(obj)

        self.hidden_state_room_perspective = by_room
        self.hidden_state_kg_perspective = [
            (obj.name, "AtLocation", obj.loc)
            for occupants in by_room.values()
            for obj in occupants
        ]

    def _compute_hidden_state(self):
        """Update the hidden state for the current ``self.time``.

        At time 0 the state is built from the freshly sampled rooms. At later
        times objects are moved first, then both views are rebuilt so they
        never describe a previous step.
        """
        if self.time > 0:
            self._move_objects()
        self._refresh_hidden_state()

    def _get_observation(self):
        """Update the hidden state and return what the agent sees plus a question.

        Returns
        -------
        (sub_graph_kg_perspective, question): the first is a list of
        ``(name, "AtLocation", room, time)`` tuples for objects in the agent's
        room; the second is one ``(name, "AtLocation", room)`` triple chosen
        at random from the full hidden state.

        """
        self._compute_hidden_state()
        for obj in self.objects:
            if obj.name == "agent":
                self.agent_location = obj.loc

        self.sub_graph_room_perspective = [
            obj for obj in self.objects if obj.loc == self.agent_location
        ]
        self.sub_graph_kg_perspective = [
            (obj.name, "AtLocation", obj.loc, self.time)
            for obj in self.sub_graph_room_perspective
        ]
        self.question = random.choice(self.hidden_state_kg_perspective)

        return self.sub_graph_kg_perspective, self.question

    def step(self):
        """Advance time by one and return the new ``(observation, question)``."""
        self.time += 1
        return self._get_observation()


# Build the environment with the default rooms, then show the first
# observation and question, separated by one blank line.
pomdp = Pomdp(seed=42)
observation, question = pomdp.reset()
print(f"observation: {observation}", f"question: {question}", sep="\n\n")

# pprint(f"objects: {pomdp.objects}")
# print()
# pprint(f"hidden_state_room_perspective: {pomdp.hidden_state_room_perspective}")
# print()
# pprint(f"hidden_state_kg_perspective: {pomdp.hidden_state_kg_perspective}")
# print()
# pprint(f"sub_graph_room_perspective: {pomdp.sub_graph_room_perspective}")
# print()
# pprint(f"sub_graph_kg_perspective: {pomdp.sub_graph_kg_perspective}")
# print()
# pprint(f"question: {pomdp.question}\n")
# print()


observation: [('table', 'AtLocation', 'livingroom', 0), ('laptop', 'AtLocation', 'livingroom', 0), ('phone', 'AtLocation', 'livingroom', 0), ('agent', 'AtLocation', 'livingroom', 0)]

question: ('bed', 'AtLocation', 'bedroom')


In [78]:
observation, question = pomdp.reset()
pomdp.hidden_state_room_perspective

{'bedroom': [Entity(name: bed, type: static, location: bedroom),
  Entity(name: Michael, type: independent, location: bedroom),
  Entity(name: Vincent, type: independent, location: bedroom)],
 'officeroom': [Entity(name: desk, type: static, location: officeroom),
  Entity(name: laptop, type: dependent, location: officeroom),
  Entity(name: agent, type: agent, location: officeroom)],
 'livingroom': [Entity(name: table, type: static, location: livingroom),
  Entity(name: Tae, type: independent, location: livingroom),
  Entity(name: phone, type: dependent, location: livingroom)]}

In [62]:
pomdp.objects[0].change_location("livingroom")

In [63]:
pomdp.objects[0]

Entity(name: bed, type: static, location: livingroom)

In [64]:
pomdp.objects[0].change_location("officeroom")

In [65]:
pomdp.objects[0]

Entity(name: bed, type: static, location: officeroom)

In [2]:
pomdp.time


0

In [50]:
pomdp.entities[0]


<__main__.Entity at 0x7fbd1ee355b0>

In [4]:
baz = {"foo": 0.2, "bar": 0.8}
random.sample(baz)


TypeError: sample() missing 1 required positional argument: 'k'

In [11]:
random.choice?

[0;31mSignature:[0m [0mrandom[0m[0;34m.[0m[0mchoice[0m[0;34m([0m[0mseq[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m Choose a random element from a non-empty sequence.
[0;31mFile:[0m      /usr/lib/python3.9/random.py
[0;31mType:[0m      method

In [21]:
random.choices(list(baz.keys()), weights=list(baz.values()), k=1)[0]


'bar'

In [None]:
import random


class Pomdp:
    """Config-driven room world: rooms hold furniture, items and humans.

    ``self.env`` maps each room name to
    ``{"static_objects": [...], "non_static_objects": [...], "humans": [...]}``
    and additionally holds the key ``"agent"`` with the tuple
    ``("AtLocation", room_name)``.
    """

    # Furniture pinned to a room when the environment is populated.
    _STATIC_LAYOUT = {"bedroom": "bed", "officeroom": "desk", "livingroom": "table"}

    def __init__(
        self,
        humans: dict,
        entities: dict,
        world_knowledge: list,
        init_loc_probs: list,
        seed: int = 42,
    ) -> None:
        """Derive object/room lists and state-space sizes, then populate rooms.

        Args
        ----
        humans: mapping keyed by human name (only its keys and length are used).
        entities: entity name -> {"type": ...}; the types "static_object",
            "non_static_object" and "room" are read here.
        world_knowledge: stored as given; not consulted by this class.
        init_loc_probs: stored as given; not consulted by this class.
        seed: value passed to ``random.seed``.

        Raises
        ------
        ValueError: if ``entities`` declares no room.

        """
        self.seed = seed
        random.seed(self.seed)
        self.humans = humans
        self.world_knowledge = world_knowledge
        self.init_loc_probs = init_loc_probs

        self.static_objects = self._names_of_type(entities, "static_object")
        self.non_static_objects = self._names_of_type(entities, "non_static_object")
        self.rooms = self._names_of_type(entities, "room")
        self.objects = self.static_objects + self.non_static_objects

        self.num_humans = len(self.humans)  # number of humans
        self.num_static_objects = len(self.static_objects)  # number of static objects
        self.num_non_static_objects = len(
            self.non_static_objects
        )  # number of non-static objects
        self.num_rooms = len(self.rooms)  # number of rooms

        movable = self.num_humans + self.num_non_static_objects
        # The extra +1 in the exponent presumably accounts for the agent -- confirm.
        self.num_hidden_states = self.num_rooms ** (movable + 1)
        self.num_observations = 2**movable * self.num_rooms
        self._init_environment()

    @staticmethod
    def _names_of_type(entities: dict, kind: str) -> list:
        """Return entity names whose ``"type"`` equals ``kind``, in dict order."""
        return [
            entity
            for entity, characteristic in entities.items()
            if characteristic["type"] == kind
        ]

    def _init_environment(self) -> None:
        """Populate the rooms with objects and humans at uniformly random.

        Known furniture is pinned to its room via ``_STATIC_LAYOUT``; humans,
        non-static objects and the agent are each placed in a room drawn
        uniformly from ``self.rooms`` (in that order).
        """
        if not self.rooms:
            raise ValueError("entities must declare at least one room")

        self.env = {
            room: {"static_objects": [], "non_static_objects": [], "humans": []}
            for room in self.rooms
        }

        for room, furniture in self._STATIC_LAYOUT.items():
            # Skip layout entries that the supplied entities do not declare.
            if room in self.env and furniture in self.static_objects:
                self.env[room]["static_objects"].append(furniture)

        for human in self.humans:
            room = random.choice(self.rooms)
            self.env[room]["humans"].append(human)

        for obj in self.non_static_objects:
            room = random.choice(self.rooms)
            self.env[room]["non_static_objects"].append(obj)

        # Same representation that `step` writes.
        self.env["agent"] = ("AtLocation", random.choice(self.rooms))

    def step(self, action: int) -> tuple:
        """Take a step in the environment.

        Moves the agent to ``self.rooms[action]`` and returns the stored
        ``("AtLocation", room)`` tuple.

        NOTE(review): the original returned nothing despite the ``-> tuple``
        annotation; confirm whether a richer return value is intended.
        """
        self.env["agent"] = ("AtLocation", self.rooms[action])
        return self.env["agent"]

# Keyword arguments for Pomdp(**config).
config = {
    "seed": 42,
    # Per-human weights over rooms and over objects.
    "humans": {
        "Tae": {
            "room": {"bedroom": 0.7, "officeroom": 0.1, "livingroom": 0.2},
            "object": {
                "laptop": 0.3,
                "phone": 0.3,
            },
        },
        "Michael": {
            "room": {"bedroom": 0.2, "officeroom": 0.7, "livingroom": 0.1},
            "object": {
                "laptop": 0.9,
            },
        },
        "Vincent": {
            "room": {"bedroom": 0.1, "officeroom": 0.2, "livingroom": 0.7},
            "object": {
                "phone": 0.8,
            },
        },
    },
    # Every named thing in the world, tagged with its type.
    "entities": {
        "bedroom": {"type": "room"},
        "officeroom": {"type": "room"},
        "livingroom": {"type": "room"},
        "bed": {"type": "static_object"},
        "desk": {"type": "static_object"},
        "table": {"type": "static_object"},
        "laptop": {"type": "non_static_object"},
        "phone": {"type": "non_static_object"},
        "Tae": {"type": "human"},
        "Michael": {"type": "human"},
        "Vincent": {"type": "human"},
        "agent": {"type": "agent"},
    },
    # (head type, relation, tail type) triples.
    "world_knowledge": [
        ("static_object", "PartOf", "room"),
        ("non_static_object", "AtLocation", "static_object"),
        ("human", "AtLocation", "room"),
        ("agent", "AtLocation", "room"),
        ("human", "Owns", "non_static_object"),
    ],
    # One single-key dict per entity: name -> [(relation, target, weight), ...].
    "init_loc_probs": [
        {"bed": [("PartOf", "bedroom", 1.0)]},
        {"desk": [("PartOf", "officeroom", 1.0)]},
        # Was a second "bed" entry; the living-room fixture is the table
        # (the phone below is located on "table").
        {"table": [("PartOf", "livingroom", 1.0)]},
        {"laptop": [("AtLocation", "desk", 0.7), ("AtLocation", "bed", 0.3)]},
        {"phone": [("AtLocation", "table", 0.7), ("AtLocation", "bed", 0.3)]},
        {
            "Tae": [
                ("AtLocation", "bedroom", 0.7),
                ("AtLocation", "officeroom", 0.1),
                ("AtLocation", "livingroom", 0.2),
            ]
        },
        {
            "Michael": [
                ("AtLocation", "bedroom", 0.2),
                ("AtLocation", "officeroom", 0.7),
                ("AtLocation", "livingroom", 0.1),
            ]
        },
        {
            "Vincent": [
                ("AtLocation", "bedroom", 0.1),
                ("AtLocation", "officeroom", 0.2),
                ("AtLocation", "livingroom", 0.7),
            ]
        },
        {
            "agent": [
                ("AtLocation", "bedroom", 0.333),
                ("AtLocation", "officeroom", 0.333),
                ("AtLocation", "livingroom", 0.333),
            ]
        },
    ],
}

pomdp = Pomdp(**config)
pomdp.humans, pomdp.objects, pomdp.rooms


In [None]:
class Entity:
    def __init__(self, name: str, kind: str) -> None:
        """Store the entity's name and kind.

        Args
        ----
        name: identifier of the entity.
        kind: category label of the entity.

        """
        self.name = name
        # Was a bare `self.kind` read, which raised AttributeError on construction.
        self.kind = kind


In [None]:
pomdp.env


In [None]:
pomdp.step(1)


In [None]:
pomdp.env


In [None]:
random.choice(["bedroom", "officeroom", "livingroom"])


In [None]:
pomdp.humans.keys(), pomdp.rooms


In [None]:
pomdp.hidden_states


In [None]:
pomdp.num_humans, pomdp.num_static_objects, pomdp.num_non_static_objects, pomdp.num_rooms, pomdp.num_hidden_states, pomdp.num_observations


In [None]:
pomdp.num_observations, pomdp.num_hidden_states


In [None]:
list(config["objects"].keys())


In [None]:
len(config["objects"])


In [None]:
# Imported here because this cell used `np` without any earlier import.
import numpy as np

# 4x4 matrix; every column sums to 1.
w = np.array(
    [
        [0, 0.4, 0.8, 0],
        [1, 0, 0, 0.7],
        [0, 0, 0, 0.3],
        [0, 0.6, 0.2, 0],
    ]
)
w_ = np.eye(4)

foo = []
# After k iterations w_ holds the k-th power of w; each power is printed.
for i in range(1000):
    w_ = w @ w_
    print(w_)


In [None]:
import numpy as np

alpha, beta = 0.3, 0.7

# Diagonal is 1 - beta; the remaining non-zero entries are filled in pairs.
w = (1 - beta) * np.eye(4)
w[0, 1] = w[3, 2] = beta
w[0, 3] = w[3, 0] = alpha * beta
w[1, 0] = w[2, 3] = (1 - alpha) * beta
w_ = np.identity(4)

foo = list()
# w_ accumulates successive powers of w, printed one per iteration.
for i in range(1000):
    w_ = np.matmul(w, w_)
    print(w_)


In [None]:
import numpy as np

# alpha and beta are defined but the matrix below uses literal values.
alpha, beta = 0.3, 0.7

w = np.array(
    (
        (0.3, 0.7, 0, 0),
        (0.2, 0.3, 0, 0),
        (0, 0, 0.3, 0.5),
        (0.5, 0, 0.7, 0.5),
    )
)
w_ = np.identity(4)

foo = list()
# w_ accumulates successive powers of w, printed one per iteration.
for i in range(1000):
    w_ = np.matmul(w, w_)
    print(w_)


In [None]:
# Imported here because this cell used `np` without importing it.
import numpy as np

# 3x3 permutation matrix (a single 3-cycle), so its powers repeat every 3 steps.
w = np.array(
    [
        [0, 0, 1],
        [1, 0, 0],
        [0, 1, 0],
    ]
)
w_ = np.eye(3)

foo = []
# After k iterations w_ holds the k-th power of w; each power is printed.
for i in range(1000):
    w_ = w @ w_
    print(w_)


In [None]:
import numpy as np

alpha, beta = 0.3, 0.7

# Symmetric matrix: 1 - alpha on the diagonal, two off-diagonal values in pairs.
w = (1 - alpha) * np.eye(4)
w[0, 1] = w[1, 0] = w[2, 3] = w[3, 2] = alpha * (1 - beta)
w[0, 3] = w[3, 0] = alpha * beta
w_ = np.identity(4)

foo = list()
# Print each power of w and record its top-left entry in foo.
for i in range(1000):
    w_ = np.matmul(w, w_)
    print(w_)
    foo.append(w_[0, 0])


In [None]:
np.random.randint(0, 10, 5)


In [None]:
import numpy as np


# 3x3 matrix whose columns each sum to 1, applied to a uniform start vector.
w = np.array(
    (
        (0, 0.2, 0),
        (1, 0, 1),
        (0, 0.8, 0),
    )
)
x = np.full(3, 1 / 3)

# Apply w one hundred times, printing each iterate followed by a blank line.
for i in range(100):
    x = np.matmul(w, x)
    print(x, end="\n\n")


In [None]:
np.random.randint(0, 10, 5)


In [None]:
import numpy as np

# 5x5 random integers in [0, 10); each row is divided by its own sum,
# so every row of w sums to 1 (an all-zero row would divide by zero).
w = np.random.randint(0, 10, (5, 5))
w = w / w.sum(axis=1)[:, np.newaxis]

# 5 random integers in [0, 10), scaled so the entries sum to 1.
x = np.random.randint(0, 10, 5)
x = x / x.sum()
# foo, foo.sum(axis=1)

# NOTE(review): rows of w sum to 1, so `w @ x` does not keep sum(x) at 1 in
# general; if x is meant to stay a distribution, `x @ w` (or normalizing the
# columns of w) may be what was intended -- confirm.
for i in range(100):
    x = w @ x
    print(x)


In [None]:
x


In [None]:
w


In [None]:
foo.sum(axis=1)[:, np.newaxis]


In [None]:
import numpy as np


# 5x5 matrix with rows summing to 1: rows 0-3 put 3/4 in column 0 and 1/4 in
# the next column to the right; row 4 puts everything in column 0.
w = np.zeros((5, 5))
w[:, 0] = 3 / 4
w[4, 0] = 1
w[:4, 1:] = np.eye(4) / 4
w_ = np.identity(5)

# After k iterations w_ holds the k-th power of w; each power is printed.
for i in range(100):
    w_ = w.dot(w_)
    print(w_)


In [None]:
w.shape, x.shape
