In [13]:
import numpy as np
from tqdm import tqdm
import wandb as wb
import imageio
from io import BytesIO
import pickle
import zarr
from pathlib import Path
import os
import random

from src.codecs.imagecodecs import Jpeg2k, register_codecs

In [2]:
register_codecs()

In [3]:
def create_gif_or_mp4(np_images, filename, fps=10):
    # duration = 1000 / fps
    with imageio.get_writer(filename, fps=fps) as writer:
        for img in tqdm(np_images):
            writer.append_data(img)
    print(f"File saved as {filename}")

In [2]:
data_base_dir = Path(os.environ.get("FURNITURE_DATA_DIR", "data"))

In [5]:
old_zarr = zarr.open(
    "/data/scratch/ankile/furniture-data-old/data/processed/sim/image_small/one_leg/data.zarr",
    mode="r",
)

In [6]:
old_zarr["color_image1"][0].shape

(224, 224, 3)

In [7]:
old_zarr["color_image1"].shape

(370234, 224, 224, 3)

In [8]:
image_compressor = Jpeg2k(level=75)

In [9]:
output_store = zarr.open(
    "/data/scratch/ankile/furniture-data-old/data/processed/sim/image_small/one_leg/data_compressed.zarr",
    mode="w",
)

In [10]:
_ = output_store.require_dataset(
    name="color_image1",
    shape=(10_000,) + old_zarr["color_image1"].shape[1:],
    chunks=(224, 224, 3),
    compressor=image_compressor,
    dtype=np.uint8,
)

# for i in tqdm(range(len(old_zarr["color_image1"]))):
output_store["color_image1"][:] = old_zarr["color_image1"][:10_000]

## Have a look at the compressed data to verify that it looks alright

In [6]:
store = zarr.open(
    "/data/scratch/ankile/furniture-data-old/data/processed/sim/image_small/one_leg/data_compressed.zarr",
)

## Inspect the current image data file we're using

In [8]:
datapath = data_base_dir / "processed/sim/image_small/one_leg/data.zarr"

store = zarr.open(datapath, mode="r")

In [9]:
store["color_image1"].shape, store["color_image1"].chunks

((370234, 224, 224, 3), (1, 224, 224, 3))

In [10]:
# Check the storage size of the dataset in GB
store["color_image1"].nbytes / 1e9

55.730583552

In [None]:
# Time how long it takes to iterate through the dataset
for i in tqdm(range(0, len(store["color_image1"]), 16)):
    store["color_image1"][i : i + 16]

 78%|███████▊  | 17987/23140 [35:23<10:08,  8.47it/s]  


KeyboardInterrupt: 

## Test different chunking strategies

### JPEG compression with one image per chunk

In [23]:
output1 = zarr.open(
    "/data/scratch/ankile/tmp-compression-test/data_chunk_1_jpeg.zarr",
    mode="w",
)

In [25]:
_ = output1.require_dataset(
    name="color_image1",
    shape=(10_000,) + store["color_image1"].shape[1:],
    chunks=(1, 224, 224, 3),
    compressor=image_compressor,
    dtype=np.uint8,
)

output1["color_image1"][:] = store["color_image1"][:10_000]

In [4]:
# Time how long it takes to do 10_000 random reads
random_reads = random.sample(range(0, 10_000), 10_000)

In [None]:
for i in tqdm(random_reads):
    output1["color_image1"][i : i + 16]

### Default compression with 10 images per chunk    

In [11]:
output2 = zarr.open(
    "/data/scratch/ankile/tmp-compression-test/data_chunk_10_default.zarr",
    mode="w",
)
_ = output2.require_dataset(
    name="color_image1",
    shape=(50_000,) + store["color_image1"].shape[1:],
    chunks=(10, 224, 224, 3),
    # compressor=image_compressor,
    dtype=np.uint8,
)

output2["color_image1"][:] = store["color_image1"][:50_000]

In [14]:
# Time how long it takes to iterate through the dataset
for i in tqdm(random_reads):
    output2["color_image1"][i : i + 16]

100%|██████████| 10000/10000 [00:20<00:00, 482.74it/s]


### Default compression with 100 images per chunk

In [15]:
output3 = zarr.open(
    "/data/scratch/ankile/tmp-compression-test/data_chunk_100_default.zarr",
    mode="w",
)
_ = output3.require_dataset(
    name="color_image1",
    shape=(50_000,) + store["color_image1"].shape[1:],
    chunks=(100, 224, 224, 3),
    # compressor=image_compressor,
    dtype=np.uint8,
)

output3["color_image1"][:] = store["color_image1"][:50_000]

In [None]:
# Time how long it takes to iterate through the dataset
for i in tqdm(random_reads):
    output3["color_image1"][i : i + 16]

100%|██████████| 10000/10000 [00:30<00:00, 333.10it/s]


### Default compression with 32 images per chunk


In [None]:
output4 = zarr.open(
    "/data/scratch/ankile/tmp-compression-test/data_chunk_32_default.zarr",
    mode="w",
)
_ = output4.require_dataset(
    name="color_image1",
    shape=(50_000,) + store["color_image1"].shape[1:],
    chunks=(32, 224, 224, 3),
    # compressor=image_compressor,
    dtype=np.uint8,
)

output4["color_image1"][:] = store["color_image1"][:50_000]

In [None]:
# Time how long it takes to iterate through the dataset
for i in tqdm(random_reads):
    output4["color_image1"][i : i + 16]

100%|██████████| 10000/10000 [00:20<00:00, 482.22it/s]


## Make a copy of the original data file with chunksize 32

In [44]:
outdatapath = data_base_dir / "processed/sim/image_small/one_leg/data_chunks_32.zarr"

In [48]:
# Make a new dataset with the same data but different chunking
output_store = zarr.open(outdatapath, mode="w")

for key in store.keys():
    print(key)
    _ = output_store.require_dataset(
        name=key,
        shape=store[key].shape,
        chunks=(32,) + store[key].chunks[1:],
        # compressor=image_compressor,
        dtype=store[key].dtype,
    )

    for i in tqdm(range(0, len(store[key]), 5_000)):
        output_store[key][i : i + 5_000] = store[key][i : i + 5_000]

color_image1


  8%|▊         | 6/75 [02:31<29:05, 25.29s/it]


KeyboardInterrupt: 

## Look at the new, full dataset with chunksize 32

In [3]:
datapath32 = data_base_dir / "processed/sim/image_small/one_leg/data_batch_32.zarr"

store32 = zarr.open(datapath32, mode="r")

In [5]:
for i in tqdm(random_reads):
    store32["color_image1"][i : i + 16]

 26%|██▋       | 2640/10000 [00:37<01:45, 69.77it/s] 


KeyboardInterrupt: 

## Check compression strategies

In [None]:
import numpy as np
from tqdm import tqdm
import wandb as wb
import imageio
from io import BytesIO
import pickle
import zarr
from pathlib import Path
import os
import random

from src.codecs.imagecodecs import Jpeg2k, register_codecs

In [None]:
data_base_dir = Path(os.environ.get("FURNITURE_DATA_DIR", "data"))

## Inspect the current image data file we're using

In [None]:
datapath = data_base_dir / "processed/sim/image_small/one_leg/data.zarr"

store = zarr.open(datapath, mode="r")

In [None]:
store["color_image1"].shape, store["color_image1"].chunks

((370234, 224, 224, 3), (1, 224, 224, 3))

In [None]:
# Check the storage size of the dataset in GB
store["color_image1"].nbytes / 1e9

55.730583552

## Do a one-off conversion of a dataset with chunksize 1 to 32

In [None]:
datapath = data_base_dir / "processed/sim/image_small/one_leg/data_batch_32.zarr"

store = zarr.open(datapath, mode="a")

In [None]:
# z.create_dataset(
#     "reward",
#     shape=(0,),
#     dtype=np.float32,
#     chunks=(chunksize,),
# )
# z.create_dataset(
#     "skill",
#     shape=(0,),
#     dtype=np.float32,
#     chunks=(chunksize,),
# )

In [None]:
store.create_dataset(
    "reward",
    shape=store["reward_old"].shape,
    dtype=np.int8,
    chunks=(100,),
)

# Iterate over the dataset and copy over
for i in tqdm(range(0, store["reward_old"].shape[0], 100)):
    store["reward"][i : i + 100] = store["reward_old"][i : i + 100]

  0%|          | 0/3696 [00:00<?, ?it/s]

100%|██████████| 3696/3696 [58:34<00:00,  1.05it/s]


In [None]:
store.create_dataset(
    "skill",
    shape=store["skill_old"].shape,
    dtype=np.int8,
    chunks=(100,),
)

# Iterate over the dataset and copy over
for i in tqdm(range(0, store["skill_old"].shape[0], 100)):
    store["skill"][i : i + 100] = store["skill_old"][i : i + 100]

100%|██████████| 3696/3696 [52:59<00:00,  1.16it/s]


## Run comparisons fo different chunking strategies and compression schemes

Read and write speeds of the data is incredibly slow now, how can we recitfy this?

In [3]:
%load_ext autoreload
%autoreload 2

In [4]:
import zarr
import numpy as np
from tqdm import tqdm, trange
from pathlib import Path
import os
from datetime import datetime

from numcodecs import Blosc, blosc
from glob import glob

from concurrent.futures import ProcessPoolExecutor

In [5]:
# Get some data
data_base_dir = Path(os.environ["DATA_DIR_PROCESSED"])
zarr_path = data_base_dir / "processed" / "sim/lamp/scripted/low/success.zarr"

source = zarr.open(
    zarr_path,
    mode="r",
)

list(source.keys())

['action',
 'color_image1',
 'color_image2',
 'episode_ends',
 'feature',
 'furniture',
 'parts_poses',
 'pickle_file',
 'reward',
 'robot_state',
 'skill',
 'success']

In [4]:
# Load the images into memory
images = source["color_image2"][:]

In [5]:
f"{images.shape[0]:,}"

'179,739'

In [8]:
# Create a new dataset
target = zarr.open(
    "tmp/uncompressed_chunks_1.zarr",
    mode="w",
)

# Create the dataset
target.create_dataset(
    "color_image2",
    shape=images.shape,
    chunks=(1,) + images.shape[1:],
    # compressor=image_compressor,
    dtype=np.uint8,
)

# Iterate over the dataset and copy over one by one
for i in tqdm(range(0, images.shape[0], 1)):
    target["color_image2"][i : i + 1] = images[i : i + 1]

# Iterate over the dataset and copy over 1000 at a time
for i in tqdm(range(0, images.shape[0], 1000)):
    target["color_image2"][i : i + 1000] = images[i : i + 1000]

# Iterate over the dataset and copy over 10_000 at a time
for i in tqdm(range(0, images.shape[0], 10_000)):
    target["color_image2"][i : i + 10_000] = images[i : i + 10_000]

100%|██████████| 179739/179739 [12:48<00:00, 233.95it/s]
100%|██████████| 180/180 [11:04<00:00,  3.69s/it]
100%|██████████| 18/18 [11:05<00:00, 36.98s/it]


In [7]:
# Create a new dataset
target = zarr.open(
    "tmp/uncompressed_chunks_100.zarr",
    mode="w",
)

# Create the dataset
target.create_dataset(
    "color_image2",
    shape=images.shape,
    chunks=(100,) + images.shape[1:],
    # compressor=image_compressor,
    dtype=np.uint8,
)

# Iterate over the dataset and copy over one by one
for i in tqdm(range(0, images.shape[0], 100)):
    target["color_image2"][i : i + 100] = images[i : i + 100]

# Iterate over the dataset and copy over 1000 at a time
for i in tqdm(range(0, images.shape[0], 1000)):
    target["color_image2"][i : i + 1000] = images[i : i + 1000]

# # Iterate over the dataset and copy over 10_000 at a time
# for i in tqdm(range(0, images.shape[0], 10_000)):
#     target["color_image2"][i : i + 10_000] = images[i : i + 10_000]

100%|██████████| 1798/1798 [02:16<00:00, 13.14it/s]
100%|██████████| 180/180 [02:18<00:00,  1.30it/s]


In [8]:
# Create a new dataset
target = zarr.open(
    "tmp/uncompressed_chunks_1000.zarr",
    mode="w",
)

# Create the dataset
target.create_dataset(
    "color_image2",
    shape=images.shape,
    chunks=(1000,) + images.shape[1:],
    # compressor=image_compressor,
    dtype=np.uint8,
)

# Iterate over the dataset and copy over one by one
for i in tqdm(range(0, images.shape[0], 1000)):
    target["color_image2"][i : i + 1000] = images[i : i + 1000]

# Iterate over the dataset and copy over 1000 at a time
for i in tqdm(range(0, images.shape[0], 5000)):
    target["color_image2"][i : i + 5000] = images[i : i + 5000]

# # Iterate over the dataset and copy over 10_000 at a time
# for i in tqdm(range(0, images.shape[0], 10_000)):
#     target["color_image2"][i : i + 10_000] = images[i : i + 10_000]

100%|██████████| 180/180 [02:09<00:00,  1.39it/s]
100%|██████████| 36/36 [02:10<00:00,  3.61s/it]


In [9]:
compressor = Blosc(cname="lz4", clevel=5)

# Create a new dataset
target = zarr.open(
    "tmp/compressed_chunks_100_level_5.zarr",
    mode="w",
)

# Create the dataset
target.create_dataset(
    "color_image2",
    shape=images.shape,
    chunks=(100,) + images.shape[1:],
    compressor=compressor,
    dtype=np.uint8,
)

# Iterate over the dataset and copy over one by one
for i in tqdm(range(0, images.shape[0], 100)):
    target["color_image2"][i : i + 100] = images[i : i + 100]

# Iterate over the dataset and copy over 1000 at a time
for i in tqdm(range(0, images.shape[0], 1_000)):
    target["color_image2"][i : i + 1_000] = images[i : i + 1_000]

# # Iterate over the dataset and copy over 10_000 at a time
# for i in tqdm(range(0, images.shape[0], 10_000)):
#     target["color_image2"][i : i + 10_000] = images[i : i + 10_000]

100%|██████████| 1798/1798 [02:16<00:00, 13.20it/s]
100%|██████████| 180/180 [02:15<00:00,  1.33it/s]


In [10]:
compressor = Blosc(cname="lz4", clevel=5)

# Create a new dataset
target = zarr.open(
    "tmp/compressed_chunks_1000_level_5.zarr",
    mode="w",
)

# Create the dataset
target.create_dataset(
    "color_image2",
    shape=images.shape,
    chunks=(1000,) + images.shape[1:],
    compressor=compressor,
    dtype=np.uint8,
)

# Iterate over the dataset and copy over one by one
for i in tqdm(range(0, images.shape[0], 1_000)):
    target["color_image2"][i : i + 1_000] = images[i : i + 1_000]

# Iterate over the dataset and copy over 1000 at a time
for i in tqdm(range(0, images.shape[0], 5_000)):
    target["color_image2"][i : i + 5_000] = images[i : i + 5_000]

# # Iterate over the dataset and copy over 10_000 at a time
# for i in tqdm(range(0, images.shape[0], 10_000)):
#     target["color_image2"][i : i + 10_000] = images[i : i + 10_000]

100%|██████████| 180/180 [02:09<00:00,  1.39it/s]
100%|██████████| 36/36 [02:10<00:00,  3.64s/it]


### Do similar experiments in the context of the pickle processing code

In [11]:
from src.data_processing.process_pickles import (
    parallel_process_pickle_files,
    initialize_zarr_store,
)

Importing module 'gym_38' (/data/scratch/ankile/isaacgym/python/isaacgym/_bindings/linux-x86_64/gym_38.so)
Setting GYM_USD_PLUG_INFO_PATH to /data/scratch/ankile/isaacgym/python/isaacgym/_bindings/linux-x86_64/usd/plugInfo.json


In [12]:
pkl_glob = "/data/scratch-oc40/pulkitag/ankile/furniture-data/raw/sim/square_table/teleop/low/success/*.pkl*"
outpath = "/data/scratch/ankile/furniture-data/processed/sim/square_table/teleop/low/success_test.zarr"

pickle_paths = [Path(p) for p in glob(pkl_glob, recursive=True)]
len(pickle_paths)

50

In [19]:
# Process all pickle files
chunksize = 1_000
noop_threshold = 0.0
# n_cpus = min(os.cpu_count(), 64)
n_cpus = 16

print(
    f"Processing pickle files with {n_cpus} CPUs, chunksize={chunksize}, noop_threshold={noop_threshold}"
)

all_data = parallel_process_pickle_files(pickle_paths, noop_threshold, n_cpus)

# Define the full shapes for each dataset
full_data_shapes = [
    # These are of length: number of timesteps
    ("robot_state", all_data["robot_state"].shape, np.float32),
    ("color_image1", all_data["color_image1"].shape, np.uint8),
    ("color_image2", all_data["color_image2"].shape, np.uint8),
    ("action/delta", all_data["action/delta"].shape, np.float32),
    ("action/pos", all_data["action/pos"].shape, np.float32),
    ("skill", all_data["skill"].shape, np.float32),
    ("reward", all_data["reward"].shape, np.float32),
    ("parts_poses", all_data["parts_poses"].shape, np.float32),
    # These are of length: number of episodes
    ("episode_ends", (len(all_data["episode_ends"]),), np.uint32),
    ("furniture", (len(all_data["furniture"]),), str),
    ("success", (len(all_data["success"]),), np.uint8),
    ("pickle_file", (len(all_data["pickle_file"]),), str),
]

Processing pickle files with 16 CPUs, chunksize=1000, noop_threshold=0.0


Processing files: 100%|██████████| 50/50 [03:02<00:00,  3.66s/it] 


In [29]:
f"{all_data['color_image1'].shape[0]:,}"

'63,930'

In [21]:
# Initialize Zarr store with full dimensions
z = initialize_zarr_store(outpath, full_data_shapes, chunksize=chunksize)

# Write the data to the Zarr store
it = tqdm(all_data)
for name in it:
    it.set_description(f"Writing data to zarr: {name}")
    dataset = z[name]
    data = all_data[name]
    for i in trange(
        0, len(all_data[name]), chunksize, desc="Writing chunks", leave=False
    ):
        dataset[i : i + chunksize] = all_data[name][i : i + chunksize]

# Update final metadata
z.attrs["time_finished"] = datetime.now().astimezone().isoformat()
z.attrs["noop_threshold"] = noop_threshold
z.attrs["chunksize"] = chunksize
z.attrs["rotation_mode"] = "rot_6d"

Writing data to zarr: color_image2:   9%|▉         | 1/11 [00:34<05:48, 34.85s/it]


KeyboardInterrupt: 

In [None]:
compressor = Blosc(cname="lz4", clevel=5)

# Create a new dataset
target = zarr.open(
    "tmp/compressed_chunks_1000_level_5.zarr",
    mode="w",
)

# Create the dataset
target.create_dataset(
    "color_image2",
    shape=images.shape,
    chunks=(1000,) + images.shape[1:],
    compressor=compressor,
    dtype=np.uint8,
)

images2 = all_data["color_image2"]

# Iterate over the dataset and copy over one by one
for i in tqdm(range(0, images.shape[0], chunksize)):
    target["color_image2"][i : i + 1] = images2[i : i + 1]

## Code for combining a list of zarr files into a single numpy array in memory for training

In [8]:
def combine_zarr_datasets(zarr_paths, keys):
    # Initialize dictionary to hold total shapes
    total_shapes = {key: 0 for key in keys}
    last_episode_end = 0

    # First pass to calculate total shapes
    for path in zarr_paths:
        dataset = zarr.open(path, mode="r")
        for key in keys:
            data_shape = dataset[key].shape[0]
            if key == "episode_ends":
                total_shapes[key] += data_shape
            else:
                total_shapes[key] += data_shape

    # Preallocate numpy arrays
    combined_data = {}
    for path in zarr_paths:
        dataset = zarr.open(path, mode="r")
        for key in keys:
            dtype = dataset[key].dtype
            if key not in combined_data:
                if key == "episode_ends":
                    combined_data[key] = np.zeros(total_shapes[key], dtype=dtype)
                else:
                    # Assuming other arrays are 2D, adjust if not
                    combined_data[key] = np.zeros(
                        (total_shapes[key], *dataset[key].shape[1:]), dtype=dtype
                    )

    # Current indices for insertion into preallocated arrays
    current_indices = {key: 0 for key in keys}

    # Second pass to populate arrays
    for path in tqdm(zarr_paths, desc="Loading zarr files"):
        dataset = zarr.open(path, mode="r")

        for key in tqdm(keys, desc="Loading data", position=1):
            data = dataset[key][...]
            data_length = data.shape[0]

            if key == "episode_ends":
                if last_episode_end != 0:
                    data += last_episode_end
                last_episode_end = data[-1]

            combined_data[key][
                current_indices[key] : current_indices[key] + data_length
            ] = data
            current_indices[key] += data_length

    return combined_data

In [9]:
# Example usage
zarr_paths = [
    "/data/scratch/ankile/furniture-data/processed/sim/round_table/scripted/low/success.zarr",
    "/data/scratch/ankile/furniture-data/processed/sim/round_table/scripted/med/success.zarr",
]

keys = [
    "robot_state",
    "color_image1",
    "episode_ends",
]

combined_data = combine_zarr_datasets(zarr_paths, keys)

Loading zarr files: 100%|██████████| 2/2 [02:26<00:00, 73.20s/it]


In [10]:
combined_data["robot_state"].shape, combined_data["color_image1"].shape, combined_data[
    "episode_ends"
].shape

((328332, 16), (328332, 240, 320, 3), (252,))

In [11]:
combined_data["episode_ends"][-1]

328332