In [2]:
from datasets import load_from_disk

ds = load_from_disk('/Projects/SG_VLN_HumanData/spatial_training/data/habitat_web_pose_v1/train')

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
%pip install "datasets==3.4.1"

In [21]:
import numpy as np
def analyze_motion(example):
    # Extract Translation [x, y, z] from the 7D vector
    # Shape: (Seq_Len, 7)
    poses = np.array(example['pos_rots'])
    trans = poses[:, :3]
    
    # Calculate Displacement from Start
    # d = || t_i - t_0 ||
    start = trans[0]
    displacements = np.linalg.norm(trans - start, axis=1)
    
    # Calculate Step-wise Delta (Velocity)
    # v = || t_i - t_{i-1} ||
    deltas = np.linalg.norm(np.diff(trans, axis=0), axis=0)
    
    return {
        "max_displacement": np.max(displacements),
        "mean_velocity": np.mean(deltas),
        "is_stationary": np.max(displacements) < 1e-4
    }

print("Computing motion statistics...")
# Check first 1000 examples
stats = ds.select(range(1000)).map(analyze_motion, num_proc=16)

# Filter for the "Ghost" episodes
stationary_eps = [x for x in stats if x['is_stationary']]

print(f"Total Analyzed: {len(stats)}")
print(f"Stationary Episodes: {len(stationary_eps)}")

if len(stationary_eps) > 0:
    print(f"Example Stationary ID: {stationary_eps[0]['episode_id']}")
    print("WARNING: You have episodes where the agent pose NEVER changes.")
else:
    print("Good news: No purely stationary episodes found.")
    print(f"Avg Max Displacement: {np.mean(stats['max_displacement']):.2f}m")

Computing motion statistics...
Total Analyzed: 1000
Stationary Episodes: 0
Good news: No purely stationary episodes found.
Avg Max Displacement: 6.29m


In [3]:
from datasets import Image,Sequence
from PIL import Image as PILImage
ds_i = ds.cast_column('images',(Sequence(Image(decode=False))))
def validate_episode_images(example):
    """
    Checks if ALL images in the episode's sequence can be opened.
    Returns False if even one image is broken or missing.
    """
    # We access 'images' because we rename 'rgb_paths' -> 'images' earlier in the pipeline
    image_paths = example.get("images", [])
    # print("hi")
    if not image_paths:
        return False 
    
    for path in image_paths:
        path = path['path']
        try:
            with PILImage.open(path) as img:
                img.verify() 
                # print("verified")
        except:
            # print("failed")
            return False
    return True
ds_i = ds_i.filter(validate_episode_images,num_proc=32,batch_size=10)
ds_i = ds_i.cast_column('images',Sequence(Image(decode=False)))

Filter (num_proc=32): 100%|██████████| 22117/22117 [02:18<00:00, 160.16 examples/s]
Casting the dataset: 100%|██████████| 22111/22111 [00:04<00:00, 4428.70 examples/s]


In [4]:
ds_i.cache_files

[{'filename': '/Projects/SG_VLN_HumanData/spatial_training/data/habitat_web_pose_v1/train/cache-4093a82b0089f078.arrow'}]

In [1]:
!echo $HF_HOME

/Projects/huggingface


In [8]:
def load_image_bytes(batch):
    """
    Explicitly reads file paths into binary bytes.
    Operates on a batch of raw dictionaries: {'bytes': None, 'path': '/...'}
    """
    updated_images_col = []
    
    for row_list in batch['images']:
        new_row = []
        for img_struct in row_list:
            # Check if we have a path and no bytes
            path = img_struct.get('path')
            bytes_data = img_struct.get('bytes')
            
            if path and bytes_data is None:
                try:
                    with open(path, "rb") as f:
                        bytes_data = f.read()
                    # We populate bytes, keeping path for reference (optional)
                    new_row.append({"bytes": bytes_data, "path": path})
                except Exception as e:
                    print(f"Error reading {path}: {e}")
                    # Keep original struct on failure
                    new_row.append(img_struct)
            else:
                # Already loaded or empty
                new_row.append(img_struct)
        
        updated_images_col.append(new_row)
    
    return {"images": updated_images_col}



In [None]:
print("2. Manually reading images (Batch Size: 50)...")
# KEY FIX: batch_size=50 ensures we never process >2GB at once.
# writer_batch_size=50 ensures the cache files on disk are also fragmented safely.
ds_embedded = ds_i.map(
    load_image_bytes,
    batched=True,
    batch_size=10, 
    writer_batch_size=32,
    desc="Embedding Images",
    num_proc=32 # Feel free to increase if you have fast disk I/O
)

print("3. Restoring Image feature...")
# Now that 'bytes' are populated, we tell datasets to treat them as images again
final_ds = ds_embedded

print("4. Pushing to Hub (skipping internal embedding)...")
# embed_external_files=False is CRITICAL here. 
# It tells the library: "I already did the work, just upload what I gave you."


2. Manually reading images (Batch Size: 50)...


Embedding Images (num_proc=32): 100%|██████████| 22111/22111 [08:40<00:00, 42.47 examples/s] 


3. Restoring Image feature...
4. Pushing to Hub (skipping internal embedding)...


Creating parquet from Arrow format: 100%|██████████| 2/2 [00:01<00:00,  1.34ba/s]
Processing Files (0 / 0)                : |          |  0.00B /  0.00B            
[A
Processing Files (0 / 1)                :   1%|▏         | 4.19MB /  334MB, 6.99MB/s  
Processing Files (0 / 1)                :  10%|█         | 33.6MB /  334MB, 41.9MB/s  
Processing Files (0 / 1)                :  25%|██▌       | 83.9MB /  334MB, 83.8MB/s  
Processing Files (0 / 1)                :  46%|████▋     |  155MB /  334MB,  129MB/s  
Processing Files (0 / 1)                :  69%|██████▉   |  231MB /  334MB,  165MB/s  
Processing Files (0 / 1)                :  93%|█████████▎|  310MB /  334MB,  193MB/s  
Processing Files (0 / 1)                :  99%|█████████▉|  331MB /  334MB,  184MB/s  
Processing Files (0 / 1)                :  99%|█████████▉|  331MB /  334MB,  166MB/s  
[A
Processing Files (0 / 1)                :  99%|█████████▉|  333MB /  334MB,  139MB/s  
Processing Files (0 / 1)                : 10

KeyboardInterrupt: 

In [None]:
final_ds.push_to_hub(
    'Aasdfip/habitat_web_pose_train', 
    # max_shard_size='500MB',
    embed_external_files=False 
)

In [7]:
ds_i.to_parquet('my_test_parquet')

Creating parquet from Arrow format: 100%|██████████| 222/222 [00:04<00:00, 50.54ba/s]


1050821080

In [None]:

ds_e  = ds_i.select(range(10)).map(embed_table_storage, batched=True)

ImportError: cannot import name 'embed_table_storage' from 'datasets' (/root/conda/envs/unsloth_env/lib/python3.11/site-packages/datasets/__init__.py)

In [13]:
embed_table_storage(ds_i.select(2).data)

TypeError: 'int' object is not iterable

MemoryMappedTable
episode_id: string
scene_id: string
goal_category: string
goal_text: string
images: list<item: struct<bytes: binary, path: string>>
  child 0, item: struct<bytes: binary, path: string>
      child 0, bytes: binary
      child 1, path: string
pos_rots: list<item: list<item: double>>
  child 0, item: list<item: double>
      child 0, item: double
action_sequence: list<item: string>
  child 0, item: string
action_ids: list<item: int64>
  child 0, item: int64
messages: list<item: struct<content: list<item: struct<text: string, type: string>>, role: string>>
  child 0, item: struct<content: list<item: struct<text: string, type: string>>, role: string>
      child 0, content: list<item: struct<text: string, type: string>>
          child 0, item: struct<text: string, type: string>
              child 0, text: string
              child 1, type: string
      child 1, role: string
----
episode_id: [["A9K0CV70JWG1W:3A9AA95ATYPB4MWO73HNAMIZS99P5I","A26LOVXF4QZZCO:3E47SOBEYSZ4ML

In [24]:
import pyarrow as pa
from datasets.features.image import Image
from datasets.utils.file_utils import xopen
from datasets.utils.py_utils import no_op_if_value_is_null
import os

def robust_embed_storage(self, storage):
    """
    Monkeypatch for Image.embed_storage.
    Fixes 'ArrowInvalid: offset overflow' by forcing LargeBinary (64-bit) storage.
    """
    
    @no_op_if_value_is_null
    def path_to_bytes(path):
        with xopen(path, "rb") as f:
            bytes_ = f.read()
        return bytes_

    def force_flat_array(data, type_):
        arr = pa.array(data, type=type_)
        if isinstance(arr, pa.ChunkedArray):
            return arr.combine_chunks()
        return arr

    # 1. Convert to python list
    storage_list = storage.to_pylist()

    # 2. Create bytes array using LARGE_BINARY (supports >2GB chunks)
    bytes_data = [
        (path_to_bytes(x["path"]) if x["bytes"] is None else x["bytes"]) if x is not None else None
        for x in storage_list
    ]
    # !!! CRITICAL CHANGE: pa.large_binary() instead of pa.binary()
    bytes_array = force_flat_array(bytes_data, pa.large_binary())
    
    # 3. Create path array
    path_data = [
        os.path.basename(path) if path is not None else None 
        for path in storage.field("path").to_pylist()
    ]
    path_array = force_flat_array(path_data, pa.string())

    # 4. Create mask
    mask_data = bytes_array.is_null().to_pylist()
    mask_array = force_flat_array(mask_data, pa.bool_())

    # 5. Build StructArray
    storage = pa.StructArray.from_arrays(
        [bytes_array, path_array], 
        ["bytes", "path"], 
        mask=mask_array
    )
    
    # !!! CRITICAL CHANGE: Do NOT call array_cast(storage, self.pa_type).
    # self.pa_type expects standard 32-bit binary. Calling it would attempt 
    # to downcast our new 64-bit array and cause the overflow crash again.
    return storage

# Apply the patch
print("Applying 64-bit LargeBinary patch to datasets...")
Image.embed_storage = robust_embed_storage
print("Patch applied.")

Applying 64-bit LargeBinary patch to datasets...
Patch applied.


In [25]:
ds_i.push_to_hub('Aasdfip/habitat_web_pose_train')

Map:   0%|          | 0/19 [01:56<?, ? examples/s]166 [00:00<?, ?it/s]
Uploading the dataset shards:   0%|          | 0/1166 [01:56<?, ?it/s]


ArrowInvalid: Failed casting from large_binary to binary: input array too large

In [19]:
print(ds_i[0]['images'][0])

{'bytes': None, 'path': '/Projects/SG_VLN_HumanData/SG-VLN/data/datasets/objectnav/objectnav_mp3d_thda_70k/objectnav_images/17DRP5sb8fy/A9K0CV70JWG1W_3A9AA95ATYPB4MWO73HNAMIZS99P5I/0000.png'}


In [11]:
# test_ds = ds_i
from PIL import Image as PILImage
def validate_episode_images(example):
    """
    Checks if ALL images in the episode's sequence can be opened.
    Returns False if even one image is broken or missing.
    """
    # We access 'images' because we rename 'rgb_paths' -> 'images' earlier in the pipeline
    image_paths = example.get("images", [])
    # print("hi")
    if not image_paths:
        return False 
    
    for path in image_paths:
        path = path['path']
        try:
            with PILImage.open(path) as img:
                img.verify() 
                # print("verified")
        except:
            # print("failed")
            return False
    return True

def trim_images(example):
    example['images'] = example['images'][:]
    return example

test_ds = ds_i.map(trim_images,writer_batch_size=10,num_proc=96)
test_ds_2 = ds_i.select(range(99)).map(trim_images,writer_batch_size=10)


Map: 100%|██████████| 99/99 [01:12<00:00,  1.37 examples/s]


In [13]:
test_ds.features

{'episode_id': Value(dtype='string', id=None),
 'scene_id': Value(dtype='string', id=None),
 'goal_category': Value(dtype='string', id=None),
 'goal_text': Value(dtype='string', id=None),
 'images': Sequence(feature=Image(mode=None, decode=True, id=None), length=-1, id=None),
 'pos_rots': Sequence(feature=Sequence(feature=Value(dtype='float64', id=None), length=-1, id=None), length=-1, id=None),
 'action_sequence': Sequence(feature=Value(dtype='string', id=None), length=-1, id=None),
 'action_ids': Sequence(feature=Value(dtype='int64', id=None), length=-1, id=None),
 'messages': [{'content': [{'text': Value(dtype='string', id=None),
     'type': Value(dtype='string', id=None)}],
   'role': Value(dtype='string', id=None)}]}

In [15]:
test_ds_2.features

{'episode_id': Value(dtype='string', id=None),
 'scene_id': Value(dtype='string', id=None),
 'goal_category': Value(dtype='string', id=None),
 'goal_text': Value(dtype='string', id=None),
 'images': Sequence(feature=Image(mode=None, decode=True, id=None), length=-1, id=None),
 'pos_rots': Sequence(feature=Sequence(feature=Value(dtype='float64', id=None), length=-1, id=None), length=-1, id=None),
 'action_sequence': Sequence(feature=Value(dtype='string', id=None), length=-1, id=None),
 'action_ids': Sequence(feature=Value(dtype='int64', id=None), length=-1, id=None),
 'messages': [{'content': [{'text': Value(dtype='string', id=None),
     'type': Value(dtype='string', id=None)}],
   'role': Value(dtype='string', id=None)}]}

In [9]:
test_ds = test_ds.cast_column('images',Sequence(Image()))

Casting the dataset:   0%|          | 0/22111 [00:00<?, ? examples/s]

Casting the dataset: 100%|██████████| 22111/22111 [00:15<00:00, 1414.17 examples/s]


In [16]:
test_ds_2.select(range(2)).push_to_hub('tiny_test_ds')

NameError: name 'test_ds_2' is not defined