In [2]:
import numpy as np
shape = 4096000

a = np.arange(shape, dtype=np.int64)
b = np.empty_like(a, dtype=np.int64)

print(a)
# prints [0 1 2 ... 4095997 4095998 4095999]

[      0       1       2 ... 4095997 4095998 4095999]


In [3]:
def process_fn():
    # iterating and squaring each element in a and store to b
    with np.nditer([a, b],
                   op_flags=[['readonly'], ['readwrite']]) as it:
        with it:
           for x,y in it:
                y[...] = x**2

%timeit process_fn()  # 3.55 s ± 22.7 ms per loop
print(b)
# prints [0 1 4 ... 16777191424009 16777199616004 16777207808001]

3.48 s ± 27 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[             0              1              4 ... 16777191424009
 16777199616004 16777207808001]


In [4]:
import daisy
import zarr

shape = 4096000
block_shape = 1024*16

# input array is wrapped in daisy.Array for easy of Roi indexing
a = daisy.Array(np.arange(shape, dtype=np.int64),
                roi=daisy.Roi((0,), shape),
                voxel_size=(1,))

# to parallelize across processes, we need persistent read/write arrays
# we'll use zarr here to do do that
b = zarr.open_array(zarr.TempStore(), 'w', (shape,),
                    chunks=(block_shape,),
                    dtype=np.int64)
# output array is wrapped in daisy.Array for easy of Roi indexing
b = daisy.Array(b,
                roi=daisy.Roi((0,), shape),
                voxel_size=(1,))

In [7]:
# same process function as previously, but with additional code
# to read and write data to persistent arrays
def process_fn_daisy(block):

    a_sub = a[block.read_roi].to_ndarray()
    b_sub = np.empty_like(a_sub)
    with np.nditer([a_sub, b_sub],
                   op_flags=[['readonly'], ['readwrite']],
                  ) as it:
        with it:
           for x,y in it:
                y[...] = x**2
    
    b[block.write_roi] = b_sub

In [11]:
total_roi = daisy.Roi((0,), shape)  # total ROI to map process over
block_roi = daisy.Roi((0,), (block_shape,))  # block ROI for parallel processing

# creating a Daisy task, note that we do not specify how each
# worker should read/write to input/output arrays
task = daisy.Task(
    total_roi=total_roi,
    read_roi=block_roi,
    write_roi=block_roi,
    process_function=process_fn_daisy,
    num_workers=8,
    task_id='square',
)

daisy.run_blockwise([task])
'''
prints Execution Summary
-----------------

  Task square:

    num blocks : 250
    completed ✔: 250 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully
'''

print(b.to_ndarray())
# prints [0 1 4 ... 16777191424009 16777199616004 16777207808001]

square ▶:   0%|          | 0/250 [00:00<?, ?blocks/s]


Execution Summary
-----------------

  Task square:

    num blocks : 250
    completed ✔: 250 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully
[             0              1              4 ... 16777191424009
 16777199616004 16777207808001]


In [10]:
# run this to benchmark daisy!
%timeit daisy.run_blockwise([task])

square ▶:   0%|          | 0/250 [00:00<?, ?blocks/s]


Execution Summary
-----------------

  Task square:

    num blocks : 250
    completed ✔: 250 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully


square ▶:   0%|          | 0/250 [00:00<?, ?blocks/s]


Execution Summary
-----------------

  Task square:

    num blocks : 250
    completed ✔: 250 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully


square ▶:   0%|          | 0/250 [00:00<?, ?blocks/s]


Execution Summary
-----------------

  Task square:

    num blocks : 250
    completed ✔: 250 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully


square ▶:   0%|          | 0/250 [00:00<?, ?blocks/s]


Execution Summary
-----------------

  Task square:

    num blocks : 250
    completed ✔: 250 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully


square ▶:   0%|          | 0/250 [00:00<?, ?blocks/s]


Execution Summary
-----------------

  Task square:

    num blocks : 250
    completed ✔: 250 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully


square ▶:   0%|          | 0/250 [00:00<?, ?blocks/s]


Execution Summary
-----------------

  Task square:

    num blocks : 250
    completed ✔: 250 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully


square ▶:   0%|          | 0/250 [00:00<?, ?blocks/s]


Execution Summary
-----------------

  Task square:

    num blocks : 250
    completed ✔: 250 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully


square ▶:   0%|          | 0/250 [00:00<?, ?blocks/s]


Execution Summary
-----------------

  Task square:

    num blocks : 250
    completed ✔: 250 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully
1.25 s ± 9.63 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [12]:
import multiprocessing

reduce_shape = shape/16

# while using zarr with daisy.Array can be easier to understand and less error prone, it is not a requirement.
# Here we make a shared memory array for collecting results from different workers
c = multiprocessing.Array('Q', range(int(shape/reduce_shape)))

def process_fn_sum_reduce(block):
    b_sub = b[block.write_roi].to_ndarray()
    s = np.sum(b_sub)
    # compute c idx based on block offset and shape
    idx = (block.write_roi.offset / block.write_roi.shape)[0]
    c[idx] = s

total_roi = daisy.Roi((0,), shape)  # total ROI to map process over
block_roi = daisy.Roi((0,), reduce_shape)  # block ROI for parallel processing

task1 = daisy.Task(
    total_roi=total_roi,
    read_roi=block_roi,
    write_roi=block_roi,
    process_function=process_fn_sum_reduce,
    num_workers=8,
    task_id='sum_reduce',
)

daisy.run_blockwise([task1])
print(c[:])


sum_reduce ▶:   0%|          | 0/16 [00:00<?, ?blocks/s]


Execution Summary
-----------------

  Task sum_reduce:

    num blocks : 16
    completed ✔: 16 (skipped 0)
    failed    ✗: 0
    orphaned  ∅: 0

    all blocks processed successfully
[5592372565376000, 39146739029376000, 106255537493376000, 206918767957376000, 341136430421376000, 508908524885376000, 710235051349376000, 945116009813376000, 1213551400277376000, 1515541222741376000, 1851085477205376000, 2220184163669376000, 2622837282133376000, 3059044832597376000, 3528806815061376000, 4032123229525376000]
