In [None]:
import os
import random
import torch

In [None]:
# Load the master list of node names, one name per line. The context manager
# closes the file handle as soon as the contents have been read.
with open("../../object_list.txt", "r") as object_list_file:
    objects_masterlist = object_list_file.read().splitlines()

# Partition the names by prefix.
tables = [o for o in objects_masterlist if o.startswith("table")]
supplyshelf = [o for o in objects_masterlist if o.startswith("supply")]
surfaces = tables + supplyshelf
room = [o for o in objects_masterlist if o.startswith("room")]

# Every remaining name is treated as an object. A set makes each membership
# check constant-time instead of scanning three lists per name.
non_object_names = set(surfaces) | set(room)
objects = [o for o in objects_masterlist if o not in non_object_names]

print("Tables:", ', '.join(tables))
print("Supply Shelves:", ', '.join(supplyshelf))
print("Rooms:", ', '.join(room))
print("Objects:", ', '.join(objects))

In [None]:
# Square (node x node) tensors indexed by position in `objects_masterlist`.
# `static_graph_edges` holds surface -> "room" edges; `active_edges_mask`
# marks every (object, surface) pair.
static_graph_edges = torch.zeros(len(objects_masterlist), len(objects_masterlist), dtype=torch.float32)
active_edges_mask = torch.zeros_like(static_graph_edges, dtype=torch.float32)

# Resolve indices once instead of re-scanning the master list inside the nested loop.
# NOTE(review): this requires an entry named exactly "room" (ValueError otherwise),
# while the `room` list is built by prefix match - confirm there is a single room node.
room_index = objects_masterlist.index("room")
object_indices = [objects_masterlist.index(obj) for obj in objects]

for surface in tables + supplyshelf:
    surface_index = objects_masterlist.index(surface)
    static_graph_edges[surface_index, room_index] = 1.0
    # Loop variable deliberately not called `object`: at module level that would
    # rebind the builtin for the rest of the kernel session.
    for object_index in object_indices:
        active_edges_mask[object_index, surface_index] = 1.0


In [None]:
# One float32 id per entry of the master list: 0.0, 1.0, ..., N-1.
nodes = torch.arange(len(objects_masterlist), dtype=torch.float32)
# Time stamps 0, 5, 10, ... up to (but excluding) 3 * 60.
times = torch.arange(0, 3 * 60, 5, dtype=torch.float32)
# All-zero tensor with one row per time stamp and 5 columns.
activity = torch.zeros(len(times), 5, dtype=torch.float32)

In [None]:
def initialize_scene_graph_edges():
    """Build a random initial edge tensor on top of the static edges.

    Starts from a copy of the module-level ``static_graph_edges``. For each
    name in ``objects`` it draws a count between 1 and 5 (inclusive) and, that
    many times, sets the edge from the object to a randomly chosen entry of
    ``surfaces`` to 1.0. Repeated picks of the same surface collapse into a
    single edge, so an object ends up with between 1 and 5 edges.

    Returns:
        A new tensor; ``static_graph_edges`` itself is left untouched.
    """
    edges = static_graph_edges.clone()
    for name in objects:
        placements = random.randint(1, 5)
        row = objects_masterlist.index(name)
        remaining = placements
        while remaining > 0:
            column = objects_masterlist.index(random.choice(surfaces))
            edges[row, column] = 1.0
            remaining -= 1
    return edges

def initialize_activity_labels():
    """Draw the initial activity labels.

    Returns:
        A float32 tensor of shape (1, 5). Each entry is an integer drawn
        independently with ``random.randint(0, 10)`` (both ends inclusive),
        i.e. a random number of people for each of the 5 activities.
    """
    people_per_activity = [random.randint(0, 10) for _ in range(5)]
    return torch.tensor([people_per_activity], dtype=torch.float32)

def sample_next_scene_graph_edges(scene_graph, add_prob=0.1, remove_prob=0.1):
    """Sample the edge tensor for the next time step by perturbing object edges.

    For every name in the module-level ``objects`` list, in order:

    1. With probability ``add_prob``, the edge from the object to a randomly
       chosen entry of ``surfaces`` is set to 1.0.
    2. If the object's row then has any non-zero entry, with probability
       ``remove_prob`` one of those entries (chosen uniformly) is reset to 0.0.
       This can undo the edge added in step 1 and can leave the object with
       no edges at all.

    Args:
        scene_graph: 2-D edge tensor indexed by position in
            ``objects_masterlist``. It is not modified.
        add_prob: Probability of adding an edge per object (default 0.1).
        remove_prob: Probability of removing an edge per object (default 0.1).

    Returns:
        A new tensor with the sampled edges.
    """
    next_scene_graph = scene_graph.clone()
    for name in objects:
        # Resolve the row once per object rather than on every tensor access.
        row = objects_masterlist.index(name)
        if random.random() < add_prob:
            next_scene_graph[row, objects_masterlist.index(random.choice(surfaces))] = 1.0
        current_edges = next_scene_graph[row, :].nonzero(as_tuple=False).squeeze(dim=-1)
        # The length check comes first so no random number is consumed for an
        # object that has nothing to remove.
        if len(current_edges) > 0 and random.random() < remove_prob:
            dropped_column = current_edges[random.randint(0, len(current_edges) - 1)]
            next_scene_graph[row, dropped_column] = 0.0
    return next_scene_graph

def sample_next_activity_labels(activity):
    """Sample the activity labels for the next time step.

    Each of the first 5 columns is independently resampled with probability
    0.1; a resampled column is overwritten (in every row) with a fresh
    ``random.randint(0, 10)`` value. Columns that are not resampled keep
    their previous values.

    Args:
        activity: 2-D tensor with at least 5 columns. It is not modified.

    Returns:
        A new tensor of the same shape as ``activity``.
    """
    updated = activity.clone()
    for column in range(5):
        if random.random() >= 0.1:
            # 90% of the time the count for this activity carries over.
            continue
        updated[:, column] = random.randint(0, 10)
    return updated


In [None]:
# Generate 50 random sequences and write each one to its own file in `dirname`.
dirname = "train"
os.makedirs(dirname, exist_ok=True)
for sample_num in range(50):
    # Step 0 comes from the initializers; each later step is sampled from the
    # step before it, so both lists end up with one entry per value in `times`.
    scene_graph_list = [initialize_scene_graph_edges()]
    activity_list = [initialize_activity_labels()]
    for _ in range(len(times) - 1):
        scene_graph_list.append(sample_next_scene_graph_edges(scene_graph_list[-1]))
        activity_list.append(sample_next_activity_labels(activity_list[-1]))

    # Bundle the per-sample sequences with the shared tensors and save them.
    sample = {
        'nodes': nodes,
        'edges': scene_graph_list,
        'times': times,
        'active_edges': active_edges_mask,
        'activity': activity_list,
    }
    torch.save(sample, f"{dirname}/{sample_num:03d}.pt")