## 1. create high-res base image 

In [1]:
import numpy as np 
import zarr
from dask.distributed import Client 
from dask import array as da, delayed, compute

In [2]:
c = Client(n_workers=4, threads_per_worker=1)

2024-09-24 15:53:20,326 - distributed.scheduler - ERROR - Task write_chunk_values-2381f501-c216-4376-8f89-a5912bd0cea9 marked as failed because 4 workers died while trying to run it


In [3]:
c

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 4,Total memory: 31.18 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:46199,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 31.18 GiB

0,1
Comm: tcp://127.0.0.1:33179,Total threads: 1
Dashboard: http://127.0.0.1:37917/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:38203,
Local directory: /tmp/dask-scratch-space/worker-xixgyaka,Local directory: /tmp/dask-scratch-space/worker-xixgyaka

0,1
Comm: tcp://127.0.0.1:39863,Total threads: 1
Dashboard: http://127.0.0.1:41909/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:38271,
Local directory: /tmp/dask-scratch-space/worker-xs9aj8mt,Local directory: /tmp/dask-scratch-space/worker-xs9aj8mt

0,1
Comm: tcp://127.0.0.1:35431,Total threads: 1
Dashboard: http://127.0.0.1:46347/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:39051,
Local directory: /tmp/dask-scratch-space/worker-km_hdfm2,Local directory: /tmp/dask-scratch-space/worker-km_hdfm2

0,1
Comm: tcp://127.0.0.1:32899,Total threads: 1
Dashboard: http://127.0.0.1:36385/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:38403,
Local directory: /tmp/dask-scratch-space/worker-y3ea0_yw,Local directory: /tmp/dask-scratch-space/worker-y3ea0_yw


In [4]:
# define the highest resolution grid 
highest_res = np.array((512,512,512), dtype=int)
chunks = (64, 64, 64)
grid_dims = np.array(chunks, dtype=int)

# virtual pyramid settings
refine_factor = np.array((2,2,2), dtype=int)

In [5]:
zarr_file = './zarr-test-image-pyramid.zarr'
zarr_field = 'field1'
zg = zarr.group(zarr_file, overwrite=True)
field1 = zg.create_group(zarr_field, overwrite=True)

In [6]:
# initialize base high-res level (level 0)
lev0 = da.random.random(tuple(highest_res), chunks=chunks) 
field1.empty(0, shape=highest_res, chunks=chunks)

<zarr.core.Array '/field1/0' (512, 512, 512) float64>

In [7]:
field1.info

0,1
Name,/field1
Type,zarr.hierarchy.Group
Read-only,False
Store type,zarr.storage.DirectoryStore
No. members,1
No. arrays,1
No. groups,0
Arrays,0


In [8]:
lev0

Unnamed: 0,Array,Chunk
Bytes,1.00 GiB,2.00 MiB
Shape,"(512, 512, 512)","(64, 64, 64)"
Dask graph,512 chunks in 1 graph layer,512 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 1.00 GiB 2.00 MiB Shape (512, 512, 512) (64, 64, 64) Dask graph 512 chunks in 1 graph layer Data type float64 numpy.ndarray",512  512  512,

Unnamed: 0,Array,Chunk
Bytes,1.00 GiB,2.00 MiB
Shape,"(512, 512, 512)","(64, 64, 64)"
Dask graph,512 chunks in 1 graph layer,512 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [9]:
# write to disk
da.to_zarr(lev0, field1["0"])

## 2. logic for downsampling 


Given a single pixel at a given pyramid level, find and average a finer level

In [10]:
def get_fine_ijk(ijk_coarse, level_coarse, level_fine, refine_factor):
    ndim = len(ijk_coarse)
    d_level = level_coarse - level_fine
    ijk_0 = np.array([ijk_coarse[idim] * refine_factor[idim]**d_level for idim in range(ndim)], dtype=int)
    return ijk_0

In [11]:
def find_lev_0_average(ijk_coarse, level_coarse, level_fine, refine_factor, fine_array, offset=None):
    
    ijk = ijk_coarse 
    d_level = level_coarse - level_fine
    
    # get the level 0 index start    
    ijk_0 = get_fine_ijk(ijk, level_coarse, level_fine, refine_factor)
    if offset is None:
        offset = np.zeros(ijk_0.shape,dtype=int)
    ijk_0 = ijk_0 - offset 
    
    # number of fine level pixels covered by one pixel at current level
    lev0_Npixels = np.array(refine_factor**d_level, dtype=int)

    # get end index
    ijk_1 = ijk_0 + lev0_Npixels
    
    val = np.mean(fine_array[ijk_0[0]:ijk_1[0], ijk_0[1]:ijk_1[1], ijk_0[2]:ijk_1[2]])
    return val 

In [12]:
get_fine_ijk((3,1,4), 1, 0, refine_factor), get_fine_ijk((3,1,4), 2, 0, refine_factor)

(array([6, 2, 8]), array([12,  4, 16]))

In [13]:
find_lev_0_average((10,11,12), 2, 0, refine_factor, field1['0'])

0.5043423510981484

In [14]:
def get_level_shape(level_coarse, level_0_res, refine_factor):
    d_level = level_coarse - 0
    return level_0_res // refine_factor**d_level

In [15]:
get_level_shape(3, highest_res, refine_factor)

array([64, 64, 64])

In [16]:
def get_global_start_index(chunk_linear_index, chunks):    
    n_chunks_by_dim = [len(ch) for ch in chunks]
    chunk_index = np.unravel_index(chunk_linear_index, n_chunks_by_dim)    
    ndims = len(chunks)
    si = []
    ei = []
    for idim in range(ndims):
        dim_chunks = np.array(chunks[idim], dtype=int)
        
        covered_chunks = dim_chunks[0:chunk_index[idim]]                
        si.append(np.sum(covered_chunks).astype(int))
        ei.append(si[-1]+chunks[idim][chunk_index[idim]])

    si = np.array(si, dtype=int)
    ei = np.array(ei, dtype=int)
    return si, ei

In [17]:
import numba 

@numba.jit
def coarsen(output_shape, level_coarse, level_fine, refine_factor, covered_vals, offset):

    d_level = level_coarse - level_fine    
    
    lev0_Npixels_0 = refine_factor[0]**d_level
    lev0_Npixels_1 = refine_factor[1]**d_level
    lev0_Npixels_2 = refine_factor[2]**d_level

    output_array = np.zeros(output_shape, dtype=np.float64)    
    
    for i0_coarse in range(0, output_shape[0]):
        i0_fine_0 = i0_coarse * refine_factor[0]  ** d_level       
        i0_fine_1 = i0_fine_0 + lev0_Npixels_0
        
        for i1_coarse in range(0, output_shape[1]):
            i1_fine_0 = i1_coarse * refine_factor[1] ** d_level 
            i1_fine_1 = i1_fine_0 + lev0_Npixels_1
            for i2_coarse in range(0, output_shape[2]):
                i2_fine_0 = i2_coarse * refine_factor[2] ** d_level                
                i2_fine_1 = i2_fine_0 + lev0_Npixels_2                
                val = 0.0
                nvals = 0.0                
                for i0 in range(i0_fine_0, i0_fine_1):
                    for i1 in range(i1_fine_0, i1_fine_1):
                        for i2 in range(i2_fine_0, i2_fine_1):
                                        
                            val += covered_vals[i0, i1, i2]
                            nvals += 1.0
                val = float(val / nvals)
                output_array[i0_coarse,i1_coarse, i2_coarse] = val
                
    return output_array

In [18]:
level = 1
level_fine = 0
ichunk = 0 

lev_shape = get_level_shape(level, highest_res, refine_factor)
da_lev = da.empty(lev_shape,chunks=chunks)

si, ei = get_global_start_index(ichunk, da_lev.chunks)

# read in the level 0 range covered by this chunk
si0 = get_fine_ijk(si, level, 0, refine_factor)
ei0 = get_fine_ijk(ei, level, 0, refine_factor)

fine_zarr = zarr.open(zarr_file)[zarr_field]['0']
covered_vals = fine_zarr[si0[0]:ei0[0], si0[1]:ei0[1], si0[2]:ei0[2]]

In [19]:
%%timeit
output_array = coarsen(tuple(ei-si), level, level_fine, tuple(refine_factor), covered_vals, tuple(si))

5.21 ms ± 74.9 μs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [20]:
def get_chunks_by_dim(level_shape, chunks):
    nchunks = np.array(level_shape) // np.array(chunks)
    chunksizes = []
    for dim in range(len(chunks)):
        dim_chunks = []
        for ichunk in range(nchunks[dim]):
            dim_chunks.append(chunks[dim])
        chunksizes.append(tuple(dim_chunks))
    return tuple(chunksizes), nchunks

In [21]:
def write_chunk_values(ichunk, chunks, zarr_file, zarr_field, level, fine_level):
    si, ei = get_global_start_index(ichunk, chunks)
    
    # read in the level 0 range covered by this chunk
    si0 = get_fine_ijk(si, level, 0, refine_factor)
    ei0 = get_fine_ijk(ei, level, 0, refine_factor)

    fine_zarr = zarr.open(zarr_file)[zarr_field][str(fine_level)]
    covered_vals = fine_zarr[si0[0]:ei0[0], si0[1]:ei0[1], si0[2]:ei0[2]]

    outvals = coarsen(tuple(ei-si), level, fine_level, tuple(refine_factor), covered_vals, tuple(si))

    coarse_zarr = zarr.open(zarr_file)[zarr_field][str(level)]
    coarse_zarr[si[0]:ei[0], si[1]:ei[1]:, si[2]:ei[2]] = outvals
    
    return 1


In [22]:
%%time
for coarse_level in range(1, 5):    
    level = coarse_level
    fine_level = level - 1
    lev_shape = get_level_shape(level, highest_res, refine_factor)
    chunks_by_dim, nchunks_by_dim = get_chunks_by_dim(lev_shape, chunks)

    if np.any(nchunks_by_dim==0): 
        break
    
    field1 = zarr.open(zarr_file)[zarr_field]
    field1.empty(level, shape=lev_shape, chunks=chunks)
    
    numchunks = field1[str(level)].nchunks

    print(f"processing level {coarse_level}")
    chunk_writes = []
    for ichunk in range(0,numchunks):    
        chunk_writes.append(delayed(write_chunk_values)(ichunk, chunks_by_dim, zarr_file, zarr_field, level, fine_level))
    
    _ = compute(*chunk_writes)

processing level 1
processing level 2


KilledWorker: Attempted to run task 'write_chunk_values-2381f501-c216-4376-8f89-a5912bd0cea9' on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:39863. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.