In [2]:
%matplotlib inline
%load_ext autoreload
%autoreload 3
from sparc_fuse_core import (list_primary_files,
                      download_and_convert_sparc_data)

DATASET_ID = 224

files, meta = list_primary_files(DATASET_ID)

print("primary files: ", [f["path"] for f in files])

# Case 1 – convert a single file
download_and_convert_sparc_data(
    DATASET_ID,
    primary_paths=files[0]["path"].replace("files/", ""),
    output_dir="./output_single", 
    file_format="zarr"  # or "zarr.zip"
)

  __import__("pkg_resources").declare_namespace(__name__)


primary files:  ['files/primary/sub-ChAT-Male-Subject-1/20_1021.acq', 'files/primary/sub-ChAT-Male-Subject-2/20_1022.acq', 'files/primary/sub-ChAT-Female-Subject-1/20_1023.acq', 'files/primary/sub-ChAT-Male-Subject-3/20_1027a.acq', 'files/primary/sub-ChAT-Female-Subject-2/20_1027b.acq', 'files/primary/sub-ChAT-Female-Subject-3/20_1028.acq', 'files/primary/sub-nNOS-Male-Subject-1/20_1202a.acq', 'files/primary/sub-nNOS-Male-Subject-2/20_1202b.acq', 'files/primary/sub-nNOS-Male-Subject-3/20_1203a.acq', 'files/primary/sub-nNOS-Female-Subject-1/20_1203b.acq', 'files/primary/sub-nNOS-Female-Subject-2/20_1207.acq', 'files/primary/sub-nNOS-Female-Subject-3/20_1208.acq', 'files/primary/manifest.xlsx']
[WARN] Failed to load descriptor from ./mapping_schemes/mapping_scheme_400_adi.py: No module named 'adi._adi_cffi'
[WARN] Failed to load descriptor from ./mapping_schemes/mapping_scheme_378.py: No module named 'adi._adi_cffi'
[INFO] Loaded 32 descriptor(s) from ./mapping_schemes


Dataset 224: 100%|██████████| 1/1 [00:14<00:00, 14.26s/it]


[{'rel_path': 'primary/sub-ChAT-Male-Subject-1/20_1021.acq',
  'local_path': '/var/folders/34/5y88z7rd5ml0ycwmy2g3ntrw0000gn/T/tmpe7z784vt/20_1021.acq',
  'std_path': 'output_single/20_1021_std.zarr',
  'descriptor_id': 'acq_mapping_001',
  'mapping_score': 3,
  'status': 'ok',
  'error': None}]

In [3]:
import zarr, s3fs

# Adjust region if needed (your bucket is in eu-north-1)
fs = s3fs.S3FileSystem(anon=False, client_kwargs={"region_name": "eu-north-1"})
s3_store = zarr.storage.FSStore("s3://sparc-fuse-demo-ab-2025/20_1021_std.zarr", fs=fs)

# Consolidate metadata for faster remote traversal
zarr.consolidate_metadata(s3_store)
print("Consolidated metadata.")


Consolidated metadata.


In [8]:
import os
os.environ["AWS_DEFAULT_REGION"] = "eu-north-1"

import s3fs
import zarr
import xarray as xr
import numpy as np

# Connect to S3 Zarr
fs = s3fs.S3FileSystem(anon=False, client_kwargs={"region_name": "eu-north-1"})
store = zarr.storage.FSStore("s3://sparc-fuse-demo-ab-2025/20_1021_std.zarr", fs=fs)

# Open (prefer consolidated metadata if available)
try:
    root = zarr.open_consolidated(store)
except Exception:
    root = zarr.open(store)

# Show structure
print("Zarr tree:")
root.tree()

# Inspect arrays
signals_arr = root["signals"]
time_arr = root["time"]
print("signals shape:", signals_arr.shape, "attrs:", signals_arr.attrs)
print("time shape:", time_arr.shape, "attrs:", time_arr.attrs)

# Load arrays (lazy until sliced)
signals = signals_arr[:]  # full read; can slice instead if large
time = time_arr[:]

# Heuristic: align dimensions
if signals.shape[1] == len(time):
    # signals: (channel, time)
    ds = xr.Dataset(
        {"signals": (("channel", "time"), signals)},
        coords={"time": ("time", time), "channel": ("channel", np.arange(signals.shape[0]))},
    )
elif signals.shape[0] == len(time):
    # signals: (time, channel)
    ds = xr.Dataset(
        {"signals": (("time", "channel"), signals)},
        coords={"time": ("time", time), "channel": ("channel", np.arange(signals.shape[1]))},
    )
else:
    # Unknown layout; wrap generically
    ds = xr.Dataset({"signals": (("dim0", "dim1"), signals)})

print("\nConstructed xarray Dataset:")
print(ds)

# Example lazy access: slice without pulling everything
if "time" in ds.dims:
    print("\nExample slice:", ds["signals"].isel(time=0))


Zarr tree:
signals shape: (4, 1408754) attrs: <zarr.attrs.Attributes object at 0x31cf0d660>
time shape: (1408754,) attrs: <zarr.attrs.Attributes object at 0x31cf0ca90>

Constructed xarray Dataset:
<xarray.Dataset> Size: 56MB
Dimensions:  (channel: 4, time: 1408754)
Coordinates:
  * time     (time) float64 11MB 0.0 0.01 0.02 ... 1.409e+04 1.409e+04 1.409e+04
  * channel  (channel) int64 32B 0 1 2 3
Data variables:
    signals  (channel, time) float64 45MB -0.2869 -0.3036 ... 0.0008336

Example slice: <xarray.DataArray 'signals' (channel: 4)> Size: 32B
array([-0.28689462, -0.33375651,  0.02725497,  0.00112855])
Coordinates:
    time     float64 8B 0.0
  * channel  (channel) int64 32B 0 1 2 3


In [10]:
import os
import numpy as np
import xarray as xr
import s3fs
import zarr

os.environ.setdefault("AWS_DEFAULT_REGION", "eu-north-1")
fs = s3fs.S3FileSystem(anon=False, client_kwargs={"region_name": "eu-north-1"})

# Load raw SPARC-FUSE Zarr (as you did)
raw_store = zarr.storage.FSStore("s3://sparc-fuse-demo-ab-2025/20_1021_std.zarr", fs=fs)
try:
    root = zarr.open_consolidated(raw_store)
except Exception:
    root = zarr.open(raw_store)

signals = root["signals"][:]
time = root["time"][:]

# Build the Xarray Dataset
ds = xr.Dataset(
    {"signals": (("channel", "time"), signals)},
    coords={"time": ("time", time), "channel": ("channel", np.arange(signals.shape[0]))},
)

# Write out a self-describing Zarr store with consolidated metadata
out_path = "s3://sparc-fuse-demo-ab-2025/20_1021_std_xarray.zarr"
out_store = zarr.storage.FSStore(out_path, fs=fs)
ds.to_zarr(out_store, mode="w", consolidated=True)


<xarray.backends.zarr.ZarrStore at 0x31b3ad6c0>

In [11]:
import xarray as xr
ds = xr.open_zarr("s3://sparc-fuse-demo-ab-2025/20_1021_std_xarray.zarr", consolidated=True)
print(ds)


<xarray.Dataset> Size: 56MB
Dimensions:  (channel: 4, time: 1408754)
Coordinates:
  * channel  (channel) int64 32B 0 1 2 3
  * time     (time) float64 11MB 0.0 0.01 0.02 ... 1.409e+04 1.409e+04 1.409e+04
Data variables:
    signals  (channel, time) float64 45MB dask.array<chunksize=(1, 88048), meta=np.ndarray>


In [12]:
# === Cloud-first preparation: convert a SPARC file, push to S3, consolidate, and emit an Xarray-compatible Zarr ===

import os
import subprocess
import json
import numpy as np
import xarray as xr
import s3fs
import zarr
from sparc_fuse_core import list_primary_files, download_and_convert_sparc_data

# ---------- parameters (demo) ----------
DATASET_ID = 224
BUCKET = "sparc-fuse-demo-ab-2025"        # your bucket name
REGION = "eu-north-1"                     # bucket region
RAW_ZARR = "20_1021_std.zarr"
XARRAY_ZARR = "20_1021_std_xarray.zarr"

# Ensure AWS region is visible to libraries
os.environ.setdefault("AWS_DEFAULT_REGION", REGION)

# 1. Convert a single primary file to Zarr locally
files, _ = list_primary_files(DATASET_ID)
primary_path = files[0]["path"].replace("files/", "")
print(f"Converting primary file {primary_path} to Zarr...")
download_and_convert_sparc_data(
    DATASET_ID,
    primary_paths=primary_path,
    output_dir="./output_single",
    file_format="zarr"
)

# 2. Sync raw Zarr to S3
print(f"Uploading raw Zarr to s3://{BUCKET}/{RAW_ZARR} ...")
subprocess.run([
    "aws", "s3", "sync",
    f"./output_single/{RAW_ZARR}",
    f"s3://{BUCKET}/{RAW_ZARR}",
    "--region", REGION
], check=True)

# 3. Consolidate metadata on S3 (raw store)
fs = s3fs.S3FileSystem(anon=False, client_kwargs={"region_name": REGION})
raw_store = zarr.storage.FSStore(f"s3://{BUCKET}/{RAW_ZARR}", fs=fs)
print("Consolidating raw Zarr metadata...")
zarr.consolidate_metadata(raw_store)

# 4. Load raw Zarr and assemble an xarray.Dataset
try:
    root = zarr.open_consolidated(raw_store)
except Exception:
    root = zarr.open(raw_store)

signals = root["signals"][:]  # (channel, time)
time = root["time"][:]
ds = xr.Dataset(
    {"signals": (("channel", "time"), signals)},
    coords={"time": ("time", time), "channel": ("channel", np.arange(signals.shape[0]))},
)

print("Constructed xarray.Dataset:", ds)

# 5. Write out self-describing (Xarray-compatible) Zarr to S3
xarray_store = zarr.storage.FSStore(f"s3://{BUCKET}/{XARRAY_ZARR}", fs=fs)
print(f"Writing self-describing Zarr to s3://{BUCKET}/{XARRAY_ZARR} ...")
ds.to_zarr(xarray_store, mode="w", consolidated=True)

# 6. Emit a lightweight discovery manifest
manifest = {
    "dataset_id": DATASET_ID,
    "zarr_path": f"s3://{BUCKET}/{XARRAY_ZARR}",
    "generated_at": f"{__import__('datetime').datetime.utcnow().isoformat()}Z",
    "file_format": "zarr"
}
with open("latest.json", "w") as f:
    json.dump(manifest, f, indent=2)
subprocess.run([
    "aws", "s3", "cp", "latest.json",
    f"s3://{BUCKET}/latest.json",
    "--region", REGION
], check=True)

print("\n✅ Preparation complete.")
print("Self-describing Zarr at:", f"s3://{BUCKET}/{XARRAY_ZARR}")
print("Manifest at:", f"s3://{BUCKET}/latest.json")


Converting primary file primary/sub-ChAT-Male-Subject-1/20_1021.acq to Zarr...
[WARN] Failed to load descriptor from ./mapping_schemes/mapping_scheme_400_adi.py: No module named 'adi._adi_cffi'
[WARN] Failed to load descriptor from ./mapping_schemes/mapping_scheme_378.py: No module named 'adi._adi_cffi'
[INFO] Loaded 32 descriptor(s) from ./mapping_schemes


Dataset 224: 100%|██████████| 1/1 [00:07<00:00,  7.53s/it]


Uploading raw Zarr to s3://sparc-fuse-demo-ab-2025/20_1021_std.zarr ...
Consolidating raw Zarr metadata...
Constructed xarray.Dataset: <xarray.Dataset> Size: 56MB
Dimensions:  (channel: 4, time: 1408754)
Coordinates:
  * time     (time) float64 11MB 0.0 0.01 0.02 ... 1.409e+04 1.409e+04 1.409e+04
  * channel  (channel) int64 32B 0 1 2 3
Data variables:
    signals  (channel, time) float64 45MB -0.2869 -0.3036 ... 0.0008336
Writing self-describing Zarr to s3://sparc-fuse-demo-ab-2025/20_1021_std_xarray.zarr ...
upload: ./latest.json to s3://sparc-fuse-demo-ab-2025/latest.json   

✅ Preparation complete.
Self-describing Zarr at: s3://sparc-fuse-demo-ab-2025/20_1021_std_xarray.zarr
Manifest at: s3://sparc-fuse-demo-ab-2025/latest.json


In [14]:
# === Consume the prepared self-describing Zarr lazily ===

import os
import xarray as xr

# Parameters (must match what was produced)
ZARR_PATH = "s3://sparc-fuse-demo-ab-2025/20_1021_std_xarray.zarr"
os.environ.setdefault("AWS_DEFAULT_REGION", "eu-north-1")

# Lazy open
ds_lazy = xr.open_zarr(ZARR_PATH, consolidated=True)
print("Dataset structure (metadata only):")
print(ds_lazy)

# Example zero-copy slice (only fetches needed chunk)
print("\nExample lazy slice (first 1k timepoints of channel 0):")
print(ds_lazy["signals"].isel(channel=0, time=slice(0, 1000)))


Dataset structure (metadata only):
<xarray.Dataset> Size: 56MB
Dimensions:  (channel: 4, time: 1408754)
Coordinates:
  * channel  (channel) int64 32B 0 1 2 3
  * time     (time) float64 11MB 0.0 0.01 0.02 ... 1.409e+04 1.409e+04 1.409e+04
Data variables:
    signals  (channel, time) float64 45MB dask.array<chunksize=(1, 88048), meta=np.ndarray>

Example lazy slice (first 1k timepoints of channel 0):
<xarray.DataArray 'signals' (time: 1000)> Size: 8kB
dask.array<getitem, shape=(1000,), dtype=float64, chunksize=(1000,), chunktype=numpy.ndarray>
Coordinates:
    channel  int64 8B 0
  * time     (time) float64 8kB 0.0 0.01 0.02 0.03 0.04 ... 9.96 9.97 9.98 9.99


In [15]:
import os
import subprocess
import json
import numpy as np
import xarray as xr
import s3fs
import zarr
from sparc_fuse_core import list_primary_files, download_and_convert_sparc_data

# Parameters
DATASET_ID = 224
BUCKET = "sparc-fuse-demo-ab-2025"
REGION = "eu-north-1"
RAW_ZARR = "20_1021_std.zarr"
XARRAY_ZARR = "20_1021_std_xarray.zarr"

# AWS region setup
os.environ.setdefault("AWS_DEFAULT_REGION", REGION)

# Convert SPARC file to Zarr locally
files, _ = list_primary_files(DATASET_ID)
primary_path = files[0]["path"].replace("files/", "")
download_and_convert_sparc_data(
    DATASET_ID,
    primary_paths=primary_path,
    output_dir="./output_single",
    file_format="zarr"
)

# Upload raw Zarr to S3
subprocess.run([
    "aws", "s3", "sync",
    f"./output_single/{RAW_ZARR}",
    f"s3://{BUCKET}/{RAW_ZARR}",
    "--region", REGION
], check=True)

# Consolidate metadata
fs = s3fs.S3FileSystem(anon=False, client_kwargs={"region_name": REGION})
raw_store = zarr.storage.FSStore(f"s3://{BUCKET}/{RAW_ZARR}", fs=fs)
zarr.consolidate_metadata(raw_store)

# Create Xarray-compatible Zarr
root = zarr.open_consolidated(raw_store)
signals = root["signals"][:]
time = root["time"][:]
ds = xr.Dataset(
    {"signals": (("channel", "time"), signals)},
    coords={"time": ("time", time), "channel": ("channel", np.arange(signals.shape[0]))},
)

# Write Xarray-compatible Zarr to S3
xarray_store = zarr.storage.FSStore(f"s3://{BUCKET}/{XARRAY_ZARR}", fs=fs)
ds.to_zarr(xarray_store, mode="w", consolidated=True)

# Generate discovery manifest
manifest = {
    "dataset_id": DATASET_ID,
    "zarr_path": f"s3://{BUCKET}/{XARRAY_ZARR}",
    "generated_at": f"{__import__('datetime').datetime.utcnow().isoformat()}Z",
    "file_format": "zarr"
}
with open("latest.json", "w") as f:
    json.dump(manifest, f, indent=2)
subprocess.run([
    "aws", "s3", "cp", "latest.json",
    f"s3://{BUCKET}/latest.json",
    "--region", REGION
], check=True)

print("✅ Preparation complete.")


[WARN] Failed to load descriptor from ./mapping_schemes/mapping_scheme_400_adi.py: No module named 'adi._adi_cffi'
[WARN] Failed to load descriptor from ./mapping_schemes/mapping_scheme_378.py: No module named 'adi._adi_cffi'
[INFO] Loaded 32 descriptor(s) from ./mapping_schemes


Dataset 224: 100%|██████████| 1/1 [00:07<00:00,  7.73s/it]


upload: ./latest.json to s3://sparc-fuse-demo-ab-2025/latest.json   
✅ Preparation complete.


In [16]:
import os
import xarray as xr

os.environ.setdefault("AWS_DEFAULT_REGION", "eu-north-1")

# Lazily open the data directly from S3
ds = xr.open_zarr(
    "s3://sparc-fuse-demo-ab-2025/20_1021_std_xarray.zarr",
    consolidated=True
)
print(ds)

# Stream just a slice you need
subset = ds["signals"].sel(channel=0).isel(time=slice(0, 1000))
print(subset)


<xarray.Dataset> Size: 56MB
Dimensions:  (channel: 4, time: 1408754)
Coordinates:
  * channel  (channel) int64 32B 0 1 2 3
  * time     (time) float64 11MB 0.0 0.01 0.02 ... 1.409e+04 1.409e+04 1.409e+04
Data variables:
    signals  (channel, time) float64 45MB dask.array<chunksize=(1, 88048), meta=np.ndarray>
<xarray.DataArray 'signals' (time: 1000)> Size: 8kB
dask.array<getitem, shape=(1000,), dtype=float64, chunksize=(1000,), chunktype=numpy.ndarray>
Coordinates:
    channel  int64 8B 0
  * time     (time) float64 8kB 0.0 0.01 0.02 0.03 0.04 ... 9.96 9.97 9.98 9.99


In [17]:
from sparc_fuse_core import (
    list_primary_files, download_and_convert_sparc_data,
    upload_to_s3, consolidate_s3_metadata,
    create_xarray_zarr_from_raw, generate_and_upload_manifest
)

# Parameters
DATASET_ID = 224
BUCKET = "sparc-fuse-demo-ab-2025"
REGION = "eu-north-1"
RAW_ZARR = "20_1021_std.zarr"
XARRAY_ZARR = "20_1021_std_xarray.zarr"

# Convert SPARC file to Zarr locally
files, _ = list_primary_files(DATASET_ID)
primary_path = files[0]["path"].replace("files/", "")
download_and_convert_sparc_data(
    DATASET_ID,
    primary_paths=primary_path,
    output_dir="./output_single",
    file_format="zarr"
)

# Upload raw Zarr to S3
upload_to_s3(f"./output_single/{RAW_ZARR}", BUCKET, RAW_ZARR, REGION)

# Consolidate metadata
consolidate_s3_metadata(BUCKET, RAW_ZARR, REGION)

# Create Xarray-compatible Zarr and upload to S3
create_xarray_zarr_from_raw(BUCKET, RAW_ZARR, XARRAY_ZARR, REGION)

# Generate discovery manifest and upload
generate_and_upload_manifest(DATASET_ID, BUCKET, XARRAY_ZARR, REGION)

print("✅ Preparation complete.")


[WARN] Failed to load descriptor from ./mapping_schemes/mapping_scheme_400_adi.py: No module named 'adi._adi_cffi'
[WARN] Failed to load descriptor from ./mapping_schemes/mapping_scheme_378.py: No module named 'adi._adi_cffi'
[INFO] Loaded 32 descriptor(s) from ./mapping_schemes


Dataset 224: 100%|██████████| 1/1 [00:08<00:00,  8.99s/it]


upload: ./latest.json to s3://sparc-fuse-demo-ab-2025/latest.json   
✅ Preparation complete.


In [18]:
from sparc_fuse_core import open_zarr_from_s3

# Open dataset lazily from S3
ds = open_zarr_from_s3(
    bucket="sparc-fuse-demo-ab-2025",
    zarr_path="20_1021_std_xarray.zarr"
)

print(ds)  # Immediately available metadata, lazy data loading

# Stream a specific data slice
subset = ds["signals"].sel(channel=0).isel(time=slice(0, 1000))
print(subset)


<xarray.Dataset> Size: 56MB
Dimensions:  (channel: 4, time: 1408754)
Coordinates:
  * channel  (channel) int64 32B 0 1 2 3
  * time     (time) float64 11MB 0.0 0.01 0.02 ... 1.409e+04 1.409e+04 1.409e+04
Data variables:
    signals  (channel, time) float64 45MB dask.array<chunksize=(1, 88048), meta=np.ndarray>
<xarray.DataArray 'signals' (time: 1000)> Size: 8kB
dask.array<getitem, shape=(1000,), dtype=float64, chunksize=(1000,), chunktype=numpy.ndarray>
Coordinates:
    channel  int64 8B 0
  * time     (time) float64 8kB 0.0 0.01 0.02 0.03 0.04 ... 9.96 9.97 9.98 9.99
