In [1]:
%load_ext autoreload
%autoreload 2
import matplotlib.pyplot as plt
import numpy as np
import gymnasium as gym
import sys
sys.path.append('..')
import shutil
import random
from plot_utils import *
import math
#!wget https://dl.fbaipublicfiles.com/habitat/ReplicaCAD/hab2_bench_assets.zip -P data
#!cd data && unzip -q hab2_bench_assets.zip -d hab2_bench_assets
from plot_utils import *
from utils import *
from agent_env_utils import *
from config import (
    data_dir, initial_dir, final_dir,
    size_types, space_types, bound_types,
    moving_directions, color_maps,
    potential_sizes, direction_length, scales,
    background_quadrants, directions
)
import warnings
import itertools
import os

In [8]:
def create_object_config(obj_type, obj_id, size, color, initiate_area, static):
    """Assemble the configuration dict for a single scene object.

    Args:
        obj_type: Object kind; the value "custom" triggers size rescaling.
        obj_id: Stored under the 'name' key (may be None).
        size: Requested size. For "custom" objects it is rescaled to
            ``size * 1.1 / 2``; every other type keeps it unchanged.
        color: Stored as-is under 'color'.
        initiate_area: Stored as-is under 'ranges'.
        static: Stored as-is under 'static'.

    Returns:
        dict with keys 'obj_type', 'name', 'size', 'color', 'ranges', 'static'.
    """
    if obj_type == "custom":
        actual_size = size * 1.1 / 2
    else:
        actual_size = size

    return dict(
        obj_type=obj_type,
        name=obj_id,
        size=actual_size,
        color=color,
        ranges=initiate_area,
        static=static,
    )

def generate_configs(obj_infos):
    """Build one config per entry of ``obj_infos['obj_name']``.

    Each name is split on "-": the first token is the object type and the
    second token (when present) is the object id; names without a "-" get an
    id of None. The size passed on is ``size_settings[i] * 0.02`` plus a
    uniform random jitter drawn from ``np.random.uniform(0, 0.01)``.
    The static flag is ``obj_infos['static'] or "top" in obj_infos['bound_direction']``.

    Returns:
        (configs, bounds): the list of config dicts and the list of indices
        whose object id equals "bound".
    """
    configs, bounds = [], []

    for index, raw_name in enumerate(obj_infos['obj_name']):
        parts = raw_name.split("-")
        obj_type = parts[0]
        obj_id = parts[1] if len(parts) > 1 else None

        # One RNG draw per object; keep it here so the random stream is
        # consumed in object order.
        jittered_size = obj_infos['size_settings'][index] * 0.02 + np.random.uniform(0, 0.01)
        color = obj_infos['color'][index]
        area = obj_infos['initiate_area']
        is_static = obj_infos['static'] or "top" in obj_infos['bound_direction']

        configs.append(
            create_object_config(obj_type, obj_id, jittered_size, color, area, is_static)
        )
        if obj_id == "bound":
            bounds.append(index)

    return configs, bounds

def create_extra_configs(configs, bounds):
    """Return seven shallow copies of each bound object's config.

    The copies are grouped per bound, in the order the indices appear in
    ``bounds``. Copies are shallow (``dict.copy``), so nested values are
    shared with the original config.
    """
    copies_per_bound = 7
    extras = []
    for bound_index in bounds:
        template = configs[bound_index]
        extras.extend(template.copy() for _ in range(copies_per_bound))
    return extras

def place_main_objects(env, configs):
    """Chain the main objects left-to-right via ``env.spawn_next``.

    Object ``i + 1`` is spawned relative to object ``i`` with direction
    "right". A single spacing value is drawn once (uniform in
    ``[3 * base_size, 3.5 * base_size)`` scaled by ``sqrt(2)``) and reused for
    every placement.
    """
    base_size = 0.08
    spacing = np.random.uniform(3*base_size, 3.5*base_size) * np.sqrt(2)

    for anchor in range(len(configs) - 1):
        target, direction = anchor + 1, "right"
        # NOTE(review): direction is always "right" here, so the extra
        # clearance for "custom" objects stacked on "top" is never applied.
        needs_clearance = direction == "top" and configs[target]['obj_type'] == "custom"
        gap = spacing + 2*base_size if needs_clearance else spacing
        env.spawn_next((target, anchor, direction), distance=gap)

def place_bound_objects(env, configs, bounds, bound_directions):
    """Lay out the extra objects around every bound object.

    The k-th bound uses the block of seven extra object ids starting at
    ``k * 7 + len(configs)``. ``bound_directions[k]`` selects the layout:
    "front" or "top"; any other value places nothing for that bound.
    """
    extras_per_bound = 7
    for position, bound_index in enumerate(bounds):
        first_extra_id = position * extras_per_bound + len(configs)
        direction = bound_directions[position]
        if direction == "front":
            place_front_bound(env, configs, bound_index, first_extra_id)
        elif direction == "top":
            place_top_bound(env, configs, bound_index, first_extra_id)

def place_front_bound(env, configs, bound_id, id_offset):
    """Place the extra objects for a "front" bound via ``env.place_cubes_in_direction``.

    Each entry is ``(object id, reference id, direction)``; extra objects use
    ids ``id_offset .. id_offset + 6``. Every placement uses the same
    distance, twice the bound object's configured size.
    """
    extra = [n + id_offset for n in range(7)]
    layout = [
        (extra[0], bound_id, "left"),
        # NOTE(review): this entry positions the bound relative to itself;
        # kept exactly as in the original - confirm it is intentional.
        (bound_id, bound_id, "right"),
        (extra[1], extra[0], "behind"),
        (extra[2], extra[1], "right"),
        (extra[3], extra[2], "right"),
        (extra[4], extra[0], "front"),
        (extra[5], extra[4], "right"),
        (extra[6], extra[5], "right"),
    ]
    spacing = 2 * configs[bound_id]['size']
    env.place_cubes_in_direction(layout, distances=[spacing] * len(layout))

def place_top_bound(env, configs, bound_id, id_offset):
    """Place the extra objects for a "top" bound via ``env.place_cubes_in_direction``.

    Each entry is ``(object id, reference id, direction)``; extra objects use
    ids ``id_offset .. id_offset + 6``. Placements in the "top" direction use
    2.5x the bound object's size as distance, all others use 2x.
    """
    extra = [n + id_offset for n in range(7)]
    layout = [
        (extra[0], bound_id, "left"),
        (extra[1], bound_id, "right"),
        (extra[2], extra[0], "top"),
        (extra[3], extra[1], "top"),
        (extra[4], extra[2], "top"),
        (extra[5], extra[3], "top"),
        (extra[6], extra[5], "left"),
    ]

    distances = []
    for _, _, direction in layout:
        if direction == "top":
            distances.append(2.5 * configs[bound_id]['size'])
        else:
            distances.append(2 * configs[bound_id]['size'])

    env.place_cubes_in_direction(layout, distances=distances)

def save_scene(env, save_dir, steps=5, mode="all"):
    """Forward to ``collect_and_save`` with the given arguments.

    Args:
        env: Environment passed through unchanged.
        save_dir: Output location passed through unchanged. The caller in
            this notebook passes a ``.png`` file path rather than a directory.
        steps: Forwarded as the ``steps`` keyword (default 5).
        mode: Forwarded as the ``mode`` keyword (default "all"); the caller
            in this notebook passes a list of view names.
    """
    collect_and_save(env, save_dir, steps=steps, mode=mode)

def generate_output(save_dir, obj_infos, configs):
    """Build the index record describing one generated scene.

    Returns a dict with the image location ("source"), the object names,
    the final per-object sizes taken from ``configs``, and the colors,
    initiate area and bound directions copied from ``obj_infos``.
    """
    record = {"source": save_dir}
    record["obj_names"] = obj_infos['obj_name']

    sizes = []
    for entry in configs:
        sizes.append(entry['size'])
    record['sizes'] = sizes

    record["colors"] = obj_infos['color']
    record["initiate_area"] = obj_infos['initiate_area']
    record["bound_directions"] = obj_infos['bound_direction']
    return record

def generate_volume_comparison(env, obj_infos, save_info, background=0, steps=5):
    """Build, render and describe one volume-comparison scene.

    Steps, in order: derive the object configs, register them (plus seven
    extra copies per bound object, appended with ``replace=False``), reset
    the environment with ``reconfigure``, initialize the objects, place the
    main and bound objects, then save the front/top/side views for the given
    background.

    Args:
        env: Environment exposing ``register_configures``, ``reset`` and
            ``initialize_objects``.
        obj_infos: Scene description consumed by ``generate_configs``;
            'bound_direction' is also read here.
        save_info: Dict providing 'output_dir' and 'episode_id'.
        background: Background index, also used as suffix of the view names.
        steps: Forwarded to ``save_scene``.

    Returns:
        A one-element list holding the index record for this scene.
    """
    object_configs, bound_indices = generate_configs(obj_infos)
    filler_configs = create_extra_configs(object_configs, bound_indices)

    env.register_configures(collate_infos(object_configs))
    if filler_configs:
        # Append the extras instead of replacing the main objects.
        env.register_configures(collate_infos(filler_configs), replace=False)

    env.reset(options={"reconfigure": True})
    env.initialize_objects(regular=True, background=background)

    place_main_objects(env, object_configs)
    place_bound_objects(env, object_configs, bound_indices, obj_infos['bound_direction'])

    image_path = f"{save_info['output_dir']}/object{save_info['episode_id']}.png"
    views = [
        f"front{background}",
        f"top{background}",
        f"side{background}",
    ]
    save_scene(env, image_path, steps=steps, mode=views)

    record = generate_output(image_path, obj_infos, object_configs)
    return [record]

In [9]:
# Output locations for this dataset. These deliberately rebind the
# data_dir / initial_dir / final_dir names imported from config.
data_dir = "benchmark/occupancy/volume_object_float"
initial_dir = f"{data_dir}/initial"  # defined for completeness; this cell only writes to final_dir
final_dir = f"{data_dir}/final"
index_path = os.path.join(data_dir, "index.jsonl")

# When flush is True, previously generated images and the index are removed
# so the run starts from a clean slate.
flush = True
if flush:
    if os.path.exists(final_dir):
        shutil.rmtree(final_dir)
    if os.path.exists(index_path):
        os.remove(index_path)
os.makedirs(final_dir, exist_ok=True)

# Ordered color pairs, shuffled reproducibly with a fixed NumPy seed.
strings = color_maps.keys()
pairs = list(itertools.permutations(strings, 2))
objects = ["cube", "sphere"]
np.random.seed(42)
np.random.shuffle(pairs)

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    env = gym.make("CustomEnv-v0", obs_mode="rgbd")

    # try/finally guarantees the environment is closed even when scene
    # generation fails part-way; `del env` alone only drops the reference.
    try:
        outputs = []
        idx = 0
        for background in range(len(background_quadrants)):
            # Only the first background is generated.
            if background > 0:
                break
            for quadrant in range(4):
                initiate_area = background_quadrants[background][quadrant]
                xlim = initiate_area[0] * 0.5
                ylim = initiate_area[1] * 0.5
                ranges = [(xlim, 0), (ylim, 0)]
                for size_settings in itertools.permutations(size_types, len(size_types)):
                    for obj_names in itertools.permutations(objects, 2):
                        # Depends only on obj_names, so compute it once per
                        # object ordering rather than once per color pair.
                        num_bound_cases = sum(1 for obj in obj_names if "bound" in obj)
                        # Only the first combination of bound directions is used.
                        bound_direction_cases = list(
                            itertools.product(bound_types, repeat=num_bound_cases)
                        )[:1]
                        for color_names in pairs:
                            colors = [color_maps[color] for color in color_names]
                            for bound_directions in bound_direction_cases:
                                obj_infos = {
                                    'obj_name': obj_names,
                                    'size_settings': size_settings,
                                    'color': colors,
                                    'initiate_area': ranges,
                                    'bound_direction': bound_directions,
                                    'static': True
                                }

                                save_infos = {
                                    'episode_id': idx,
                                    'output_dir': final_dir,
                                }
                                outputs += generate_volume_comparison(
                                    env, obj_infos, save_infos, background=background, steps=1
                                )
                                idx += 1

        write_out(f"{data_dir}/index.jsonl", outputs)
    finally:
        env.close()

del env

print("Spatial Occupancy data generation complete.")



Spatial Occupancy data generation complete.
