In [1]:
import os

os.environ["JAX_PLATFORMS"] = "cpu" 


from datasets import Dataset

from gensbi_examples.tasks import get_task

from jax import numpy as jnp

import json
from huggingface_hub import upload_file

import numpy as np

# Your dictionary with metadata




In [7]:
tasks = ["two_moons", "slcp", "gaussian_linear", "gaussian_mixture"]
repo_name = "aurelio-amerio/SBI-benchmarks"

In [3]:
metadata = {}

for task_name in tasks:
    task = get_task(task_name)
    dim_data = task.data["dim_data"].item()
    dim_theta = task.data["dim_theta"].item()

    metadata[task_name] = {"dim_data": dim_data, "dim_theta": dim_theta}

file_path = "metadata.json"
with open(file_path, 'w') as f:
    json.dump(metadata, f, indent=4)

./task_data/data_two_moons.npz already exists, skipping download.
./task_data/data_slcp.npz already exists, skipping download.
./task_data/data_gaussian_linear_uniform.npz already exists, skipping download.
./task_data/data_gaussian_linear.npz already exists, skipping download.
./task_data/data_gaussian_mixture.npz already exists, skipping download.


In [4]:
upload_file(
    path_or_fileobj=file_path,
    path_in_repo="metadata.json",  # The name of the file in the repo
    repo_id=repo_name,
    repo_type="dataset",
)

CommitInfo(commit_url='https://huggingface.co/datasets/aurelio-amerio/SBI-benchmarks/commit/907cb05a9fbc842761e77c2a037cc9bc8efd0b52', commit_message='Upload metadata.json with huggingface_hub', commit_description='', oid='907cb05a9fbc842761e77c2a037cc9bc8efd0b52', pr_url=None, repo_url=RepoUrl('https://huggingface.co/datasets/aurelio-amerio/SBI-benchmarks', endpoint='https://huggingface.co', repo_type='dataset', repo_id='aurelio-amerio/SBI-benchmarks'), pr_revision=None, pr_num=None)

In [None]:
# upload dataset function

def upload_dataset(task_name: str, repo_name: str):
    task = get_task(task_name)
    data_dict = dict(task.data)

    max_samples = int(1e6)
    dtype = jnp.float32

    xs = data_dict["xs"][: max_samples]
    xs = xs.astype(dtype)
    thetas = data_dict["thetas"][: max_samples]
    thetas = thetas.astype(dtype)

    xs_val = data_dict["xs"][max_samples :]
    xs_val = xs_val.astype(dtype)
    thetas_val = data_dict["thetas"][max_samples :]
    thetas_val = thetas_val.astype(dtype)

    observations = data_dict["observations"]
    observations = observations.astype(dtype)

    reference_samples = data_dict["reference_samples"]
    reference_samples = reference_samples.astype(dtype)

    true_parameters = data_dict["true_parameters"]
    true_parameters = true_parameters.astype(dtype)

    # dim_data = data_dict["dim_data"]
    # dim_theta = data_dict["dim_theta"]
    # dim_joint = dim_data + dim_theta
    # num_observations = data_dict["num_observations"]

    dataset_train = Dataset.from_dict({"xs": xs, "thetas": thetas})
    dataset_val = Dataset.from_dict({"xs": xs_val, "thetas": thetas_val})
    dataset_reference_posterior = Dataset.from_dict(
        {"reference_samples": reference_samples, "observations": observations, "true_parameters": true_parameters}
    )

    dataset_train.push_to_hub(repo_name, config_name=task_name, split="train", private=False)
    dataset_val.push_to_hub(repo_name, config_name=task_name, split="validation", private=False)
    dataset_reference_posterior.push_to_hub(repo_name, config_name=f"{task_name}_posterior", split="reference_posterior", private=False)

    return

# upload datasets

In [6]:
upload_dataset("two_moons", "aurelio-amerio/SBI-benchmarks")

./task_data/data_two_moons.npz already exists, skipping download.


Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

In [7]:
upload_dataset("gaussian_linear", "aurelio-amerio/SBI-benchmarks")

./task_data/data_gaussian_linear.npz already exists, skipping download.


Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

In [8]:
upload_dataset("gaussian_linear_uniform", "aurelio-amerio/SBI-benchmarks")

./task_data/data_gaussian_linear_uniform.npz already exists, skipping download.


Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

In [9]:
upload_dataset("gaussian_mixture", "aurelio-amerio/SBI-benchmarks")

./task_data/data_gaussian_mixture.npz already exists, skipping download.


Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

In [10]:
upload_dataset("slcp", "aurelio-amerio/SBI-benchmarks")

./task_data/data_slcp.npz already exists, skipping download.


Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ? shards/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

Processing Files (0 / 0): |          |  0.00B /  0.00B            

New Data Upload: |          |  0.00B /  0.00B            

# read the dataset

In [3]:
from datasets import load_dataset
import grain

In [8]:
#test load the datastets 
for task_name in tasks:
    task = get_task(task_name)

gaussian_linear/train-00000-of-00001.par(…):   0%|          | 0.00/81.3M [00:00<?, ?B/s]

gaussian_linear/validation-00000-of-0000(…):   0%|          | 0.00/117k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/1000000 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/1000 [00:00<?, ? examples/s]

gaussian_linear_posterior/reference_post(…):   0%|          | 0.00/4.70M [00:00<?, ?B/s]

Generating reference_posterior split:   0%|          | 0/10 [00:00<?, ? examples/s]

'(ReadTimeoutError("HTTPSConnectionPool(host='huggingface.co', port=443): Read timed out. (read timeout=10)"), '(Request ID: 7b755ccc-f25c-4501-8705-91e228c1f1e1)')' thrown while requesting HEAD https://huggingface.co/datasets/aurelio-amerio/SBI-benchmarks/resolve/5c43656a7592d0e784d61790f99dcdf764b2d388/gaussian_mixture/train-00000-of-00001.parquet
Retrying in 1s [Retry 1/5].


gaussian_mixture/train-00000-of-00001.pa(…):   0%|          | 0.00/17.2M [00:00<?, ?B/s]

validation-00000-of-00001.parquet:   0%|          | 0.00/23.2k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/1000000 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/1000 [00:00<?, ? examples/s]

gaussian_mixture_posterior/reference_pos(…):   0%|          | 0.00/1.25M [00:00<?, ?B/s]

Generating reference_posterior split:   0%|          | 0/10 [00:00<?, ? examples/s]

In [284]:
batch_size=4000

In [5]:
dataset = load_dataset("aurelio-amerio/SBI-benchmarks", "two_moons").with_format("numpy")
datset_posteriors = load_dataset("aurelio-amerio/SBI-benchmarks", "two_moons_posterior").with_format("numpy")

ScannerError: while scanning a simple key
  in "<unicode string>", line 215, column 1:
    ------
    ^
could not find expected ':'
  in "<unicode string>", line 216, column 1:
    license: mit
    ^

In [399]:
df = dataset["train"].with_format("numpy").select(range(int(1e5)))[:]
xs = df["xs"]
thetas = df["thetas"]

In [400]:
xs

array([[ 0.281683  ,  0.2242634 ],
       [ 0.2971876 , -0.03324421],
       [-0.75120413,  0.08253357],
       ...,
       [-0.60237926, -0.2914176 ],
       [ 0.22753447, -1.1962396 ],
       [-0.18617475, -0.7867467 ]], shape=(100000, 2), dtype=float32)

In [386]:
xs.shape

(100000, 2, 1)

In [286]:
df_train, df_val = dataset["train"], dataset["validation"]

In [309]:
df_train.to_parquet("df_train.parquet")

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

24000000

In [364]:
dataset_grain = (
    grain.MapDataset.source(df_train)
    .shuffle(seed=42)
    .repeat()
    # .to_iter_dataset()
    # .batch(30)
    )

In [365]:
performance_config = grain.experimental.pick_performance_config(
        ds=dataset_grain,
        ram_budget_mb=1024,
        max_workers=None,
        max_buffer_size=None
    )
ds_threads = dataset_grain.to_iter_dataset(read_options=performance_config.read_options).batch(batch_size)



In [289]:
performance_config = grain.experimental.pick_performance_config(
        ds=dataset_grain.to_iter_dataset(),
        ram_budget_mb=1024,
        max_workers=None,
        max_buffer_size=None
    )

prefetch_lazy_iter_ds = dataset_grain.to_iter_dataset().batch(batch_size).mp_prefetch(
        performance_config.multiprocessing_options,
    )

In [290]:
df_it_mp = iter(prefetch_lazy_iter_ds)
df_it_threads = iter(ds_threads)

In [291]:
next(df_it_mp);



In [292]:
next(df_it_threads);

In [293]:
import jax.numpy as jnp

In [294]:
%timeit jnp.mean(next(df_it_mp)["xs"])

40.4 ms ± 9.97 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [295]:
%timeit jnp.mean(next(df_it_threads)["xs"])

1.57 s ± 415 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [296]:
df_train_itr = iter(df_train.to_iterable_dataset().shuffle(42).repeat(None).batch(batch_size))

In [297]:
%timeit jnp.mean(next(df_train_itr)["xs"])

345 ms ± 7.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [298]:
task = get_task("two_moons")
data_dict = dict(task.data)

max_samples = int(1e6)
dtype = jnp.float32

xs = np.array(data_dict["xs"][: max_samples])

thetas = np.array(data_dict["thetas"][: max_samples])



./task_data/data_two_moons.npz already exists, skipping download.


In [299]:
itr_task = iter(task.get_train_dataset(batch_size))

In [300]:
%timeit jnp.mean(next(itr_task))

127 ms ± 31.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [301]:
dataset_grain_np = (
    grain.MapDataset.source(xs)
    .shuffle(seed=42)
    .repeat()
    .to_iter_dataset()
    .batch(batch_size))

In [302]:
itr_grain_np = iter(dataset_grain_np)

In [303]:
next(itr_grain_np);

In [304]:
%timeit jnp.mean(next(itr_grain_np))

104 ms ± 6.19 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [380]:
dataset_grain_np = (
    grain.MapDataset.source(xs)
    .shuffle(seed=42)
    .repeat())

performance_config = grain.experimental.pick_performance_config(
        ds=dataset_grain_np.to_iter_dataset(),
        ram_budget_mb=1024,
        max_workers=None,
        max_buffer_size=None
    )

prefetch_lazy_iter_ds_2 = dataset_grain_np.to_iter_dataset().batch(batch_size).mp_prefetch(
        # performance_config.multiprocessing_options,
        grain.MultiprocessingOptions(num_workers=64, per_worker_buffer_size=10)
    )

In [381]:
itr_lazy2 = iter(prefetch_lazy_iter_ds_2)
next(itr_lazy2);



In [383]:
%timeit jnp.mean(next(itr_lazy2))

5.27 ms ± 150 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [342]:
ds = grain.experimental.ParquetIterDataset('./df_train.parquet')

In [353]:
ds_1 = grain.experimental.WindowShuffleIterDataset(ds, window_size=10*batch_size, seed=42).batch(batch_size)

In [354]:
ds_1_itr = iter(ds_1)

In [356]:
from tqdm import tqdm

In [358]:
for i in tqdm(range(1000)):
    next(ds_1_itr)

 14%|█▍        | 142/1000 [00:05<00:32, 26.50it/s]


StopIteration: 

In [334]:
performance_config = grain.experimental.pick_performance_config(
        ds=ds_1,
        ram_budget_mb=1024,
        max_workers=4,
        max_buffer_size=None
    )

# ds_1_pre = ds_1.batch(batch_size).mp_prefetch(
#         performance_config.multiprocessing_options,
#     )
ds_1_pre = ds_1.batch(batch_size).mp_prefetch(
        grain.MultiprocessingOptions(num_workers=3, per_worker_buffer_size=10),
    )



In [335]:
ds_1_pre_itr = iter(ds_1_pre)

In [336]:
next(ds_1_pre_itr);

ERROR:absl:Error occurred in child process with worker_index: 2
Traceback (most recent call last):
  File "/lhome/ific/a/aamerio/miniforge3/envs/gensbi/lib/python3.12/site-packages/grain/_src/python/grain_pool.py", line 262, in _worker_loop
    next_element = next(element_producer)
                   ^^^^^^^^^^^^^^^^^^^^^^
  File "/lhome/ific/a/aamerio/miniforge3/envs/gensbi/lib/python3.12/site-packages/grain/_src/python/dataset/transformations/prefetch.py", line 540, in __call__
    _set_slice_iter_dataset(
  File "/lhome/ific/a/aamerio/miniforge3/envs/gensbi/lib/python3.12/site-packages/grain/_src/python/dataset/transformations/prefetch.py", line 451, in _set_slice_iter_dataset
    _set_slice_iter_dataset(parent, sl, sequential_slice)
  File "/lhome/ific/a/aamerio/miniforge3/envs/gensbi/lib/python3.12/site-packages/grain/_src/python/dataset/transformations/prefetch.py", line 451, in _set_slice_iter_dataset
    _set_slice_iter_dataset(parent, sl, sequential_slice)
  File "/lhome/ific/

ValueError: Grain worker 2 failed with the following error:

Traceback (most recent call last):
  File "/lhome/ific/a/aamerio/miniforge3/envs/gensbi/lib/python3.12/site-packages/grain/_src/python/grain_pool.py", line 262, in _worker_loop
    next_element = next(element_producer)
                   ^^^^^^^^^^^^^^^^^^^^^^
  File "/lhome/ific/a/aamerio/miniforge3/envs/gensbi/lib/python3.12/site-packages/grain/_src/python/dataset/transformations/prefetch.py", line 540, in __call__
    _set_slice_iter_dataset(
  File "/lhome/ific/a/aamerio/miniforge3/envs/gensbi/lib/python3.12/site-packages/grain/_src/python/dataset/transformations/prefetch.py", line 451, in _set_slice_iter_dataset
    _set_slice_iter_dataset(parent, sl, sequential_slice)
  File "/lhome/ific/a/aamerio/miniforge3/envs/gensbi/lib/python3.12/site-packages/grain/_src/python/dataset/transformations/prefetch.py", line 451, in _set_slice_iter_dataset
    _set_slice_iter_dataset(parent, sl, sequential_slice)
  File "/lhome/ific/a/aamerio/miniforge3/envs/gensbi/lib/python3.12/site-packages/grain/_src/python/dataset/transformations/prefetch.py", line 451, in _set_slice_iter_dataset
    _set_slice_iter_dataset(parent, sl, sequential_slice)
  [Previous line repeated 1 more time]
  File "/lhome/ific/a/aamerio/miniforge3/envs/gensbi/lib/python3.12/site-packages/grain/_src/python/dataset/transformations/prefetch.py", line 446, in _set_slice_iter_dataset
    raise ValueError(f"Cannot slice `IterDataset` source. {type(ds)}")
ValueError: Cannot slice `IterDataset` source. <class 'grain._src.python.dataset.sources.parquet_dataset.ParquetIterDataset'>


In [361]:
df_tf = df_train.to_tf_dataset(

   shuffle=True,

   batch_size=batch_size,

   prefetch=True

)

ImportError: Called a Tensorflow-specific function but Tensorflow is not installed.