In [1]:
# --- Dataset sampling ------------------------------------------------------
RNG_SEED = 42    # seed for the participant draw, so the subset is reproducible
SAMPLE_NUM = 50  # how many participants to keep in the downsampled dataset

# --- Dask runtime ----------------------------------------------------------
PARTITION_SIZE = "100M"      # max in-memory size per partition when repartitioning
DASK_N_WORKERS = 1           # number of local worker processes to start
DASK_THREADS_PER_WORKER = 1  # threads inside each worker process
DASK_MEMORY_LIMIT = "6G"     # memory cap handed to the local cluster (per worker)

In [2]:
# Start a local Dask cluster explicitly rather than relying on the default
# scheduler (kept as a workaround for WSL).
# NOTE(review): the specific WSL problem this avoids is not recorded here — confirm
# and document it.
# Worker count, threads per worker and the memory cap all come from the DASK_*
# constants; processes=True runs workers as separate processes, not threads.
from dask.distributed import Client

cluster_options = dict(
    n_workers=DASK_N_WORKERS,
    threads_per_worker=DASK_THREADS_PER_WORKER,
    memory_limit=DASK_MEMORY_LIMIT,
    processes=True,
)
client = Client(**cluster_options)



In [3]:
import dask.dataframe as dd

# Load only task 2 (PUR) rows. The filter is passed to the parquet reader, so
# non-matching data is dropped at read time instead of after a full load.
ddf = dd.read_parquet(
    "dataset/gazebasevr.parquet", filters=[("task", "=", 2)]
)

# Rebalance into partitions of at most ~PARTITION_SIZE in memory. To do this Dask
# must measure each partition's memory usage, which triggers a pass over the data.
ddf = ddf.repartition(partition_size=PARTITION_SIZE)

# No shuffle by participant is needed. The later steps are:
#   - `unique()` on the participant column (a global reduction),
#   - an `isin` row filter,
#   - collapsing to a single partition for writing.
# Co-locating each participant's rows would only add an all-to-all data exchange.

print(f"Number of partitions: {ddf.npartitions}")

Number of partitions: 36


In [4]:
import json
import numpy as np

# Draw SAMPLE_NUM distinct participants from the PUR subset.
# Dask does not guarantee the order of `unique()`, and a seeded draw picks
# *positions*. Sort the IDs first so the same seed always selects the same
# participants. The explicit `.compute()` replaces an implicit conversion of the
# lazy Dask Series inside NumPy.
all_participant_ids = np.sort(ddf["participant"].unique().compute().to_numpy())
if SAMPLE_NUM > all_participant_ids.size:
    raise ValueError(
        f"SAMPLE_NUM={SAMPLE_NUM} exceeds the {all_participant_ids.size} "
        "participants available"
    )

# Use a local Generator instead of reseeding NumPy's global RNG, so unrelated
# random draws elsewhere in the kernel cannot shift this selection.
rng = np.random.default_rng(RNG_SEED)
# `.tolist()` turns NumPy integer scalars (e.g. np.int16) into plain Python ints,
# which also makes them JSON-serialisable.
participants = set(rng.choice(all_participant_ids, SAMPLE_NUM, replace=False).tolist())

# Record the selection and the parameters that produced it. The list is sorted
# so the file is stable across runs rather than following set iteration order.
with open("dataset/gazebasevr_filtered.json", "w") as f:
    json.dump(
        {
            "participants": sorted(participants),
            "task": 2,
            "task_name": "PUR",
            "sample_num": SAMPLE_NUM,
            "rng_seed": RNG_SEED,
        },
        f,
    )

In [6]:
# Keep only rows belonging to the sampled participants.
# The result gets a new name instead of overwriting `ddf`. That keeps this cell
# idempotent, and re-running the sampling cell still sees every PUR participant.
# Plain boolean indexing with `isin` is the idiomatic Dask row filter. It needs
# no lambda closing over a global and no meta inference.
selected_ids = sorted(participants)
ddf_filtered = ddf[ddf["participant"].isin(selected_ids)]

# Collapse to a single partition so one data file is produced. Dask's
# `to_parquet` treats the path as a directory and writes the part file inside it.
ddf_filtered.repartition(npartitions=1).to_parquet(
    "dataset/gazebasevr_filtered.parquet",
    write_index=False,
)