In [1]:
import fsspec
import s3fs
import zarr
import time

fsspec.get_mapper

<function fsspec.mapping.get_mapper(url='', check=False, create=False, missing_exceptions=None, alternate_root=None, **kwargs)>

In [2]:
from dataclasses import dataclass
import numpy as np
from numcodecs import Blosc

@dataclass
class OutputParameters:
    path: str
    chunksize: tuple[int, int, int, int, int]
    resolution_zyx: tuple[float, float, float]
    dtype: np.dtype = np.uint16
    dimension_separator: str = "/"
    compressor = Blosc(cname='zstd', clevel=1, shuffle=Blosc.SHUFFLE)


In [7]:
# Use active aind-open-data exapsim directory
output_params = OutputParameters(
    path='s3://aind-open-data/exaSPIM_674191_2023-09-12_12-37-37_full_res_2024-01-23_02-31-19/channel_561.zarr',
    chunksize=(1, 1, 128, 128, 128),
    resolution_zyx=(1.0, 0.748, 0.748),
)
output_volume_size = (28700, 19617, 14224)

s3 = s3fs.S3FileSystem(
    use_listings_cache= True, 
    config_kwargs={
        'max_pool_connections': 50,
        's3': {
          'multipart_threshold': 64 * 1024 * 1024,  # 64 MB, avoid multipart upload for small chunks
          'max_concurrent_requests': 20  # Increased from 10 -> 20.
        },
        'retries': {
          'total_max_attempts': 100,
          'mode': 'adaptive',
        }
    }
)
store = s3fs.S3Map(root=output_params.path, s3=s3)
out_group = zarr.group(store=store, overwrite=True)   # This is the problem line
path = "0"
chunksize = output_params.chunksize
datatype = output_params.dtype
dimension_separator = "/"
compressor = output_params.compressor
output_volume_1 = out_group.create_dataset(
    path,
    shape=(
        1,
        1,
        output_volume_size[0],
        output_volume_size[1],
        output_volume_size[2],
    ),
    chunks=chunksize,
    dtype=datatype,
    compressor=compressor,
    dimension_separator=dimension_separator,
    overwrite=True,
    fill_value=0,
)

output_slice = (
    slice(0, 1),
    slice(0, 1),
    slice(0, 300),
    slice(0, 300),
    slice(0, 300),
)

start_time = time.time()
output_volume_1[output_slice] = np.zeros((1, 1, 300, 300, 300))
print(f'Time: {time.time() - start_time}')

# Interesting
# latency appears to be related to re-initalizing the output dataset. 
# Maybe there is another way to init this connection object. 


KeyboardInterrupt: 

In [8]:
# Let's see if zarr.open works

out_vol = zarr.open_group('s3://aind-open-data/exaSPIM_674191_2023-09-12_12-37-37_full_res_2024-01-23_02-31-19/channel_561.zarr', mode='a')

output_slice = (
    slice(0, 1),
    slice(0, 1),
    slice(0, 300),
    slice(0, 300),
    slice(0, 300),
)

start_time = time.time()
out_vol[output_slice] = np.zeros((1, 1, 300, 300, 300))
print(f'Time: {time.time() - start_time}')

# Got it, 
# There is an API call to create a zarr array and another api call to open/access one
# Not like a typical database connection. 

# The change: 
# - Scheduler makes the group at the s3 path (create_output_store)
# - Workers connect to group given s3 path in OutputParameters (connect_to_output_store)


Time: 8.466339111328125


In [10]:
# Alternatively, can default to zarr.open and check for this error: 

zarr.hierarchy.contains_group(store)


True

In [12]:
output_params = OutputParameters(
    path='s3://aind-scratch-data/jonathan.wong/test_2.zarr',
    chunksize=(1, 1, 128, 128, 128),
    resolution_zyx=(1.0, 0.748, 0.748),
)
s3 = s3fs.S3FileSystem(
    use_listings_cache= False, 
    config_kwargs={
        'max_pool_connections': 50,
        's3': {
          'multipart_threshold': 64 * 1024 * 1024,  # 64 MB, avoid multipart upload for small chunks
          'max_concurrent_requests': 20  # Increased from 10 -> 20.
        },
        'retries': {
          'total_max_attempts': 100,
          'mode': 'adaptive',
        }
    }
)
store = s3fs.S3Map(root=output_params.path, s3=s3)



False

In [10]:
# Init new aind-test-data exapsim directory
output_params = OutputParameters(
    path='s3://aind-scratch-data/jonathan.wong/exaSPIM_674191_2023-09-12_12-37-37_full_res_2024-01-23_02-31-19/channel_561.zarr',
    chunksize=(1, 1, 128, 128, 128),
    resolution_zyx=(1.0, 0.748, 0.748),
)
output_volume_size = (28700, 19617, 14224)

s3 = s3fs.S3FileSystem(
    use_listings_cache= False, 
    config_kwargs={
        'max_pool_connections': 50,
        's3': {
          'multipart_threshold': 64 * 1024 * 1024,  # 64 MB, avoid multipart upload for small chunks
          'max_concurrent_requests': 20  # Increased from 10 -> 20.
        },
        'retries': {
          'total_max_attempts': 100,
          'mode': 'adaptive',
        }
    }
)
store = s3fs.S3Map(root=output_params.path, s3=s3)
out_group = zarr.open_group(store=store, mode='a')
path = "0"
chunksize = output_params.chunksize
datatype = output_params.dtype
dimension_separator = "/"
compressor = output_params.compressor
output_volume_2 = out_group.create_dataset(
    path,
    shape=(
        1,
        1,
        output_volume_size[0],
        output_volume_size[1],
        output_volume_size[2],
    ),
    chunks=chunksize,
    dtype=datatype,
    compressor=compressor,
    dimension_separator=dimension_separator,
    overwrite=True,
    fill_value=0,
)

start_time = time.time()
output_volume_2[output_slice] = np.zeros((1, 1, 300, 300, 300))
print(f'Time: {time.time() - start_time}')

Time: 5.0344603061676025
