In [1]:
import rasterio
import threading
import SharedArray
import numpy as np
import multiprocessing.sharedctypes
import ctypes

In [2]:
# One GeoTIFF per tile cell: the cell ID is spliced into the file name.
geotiffs = list(map(
    lambda cell: '/g/data/v10/testing_ground/wofs_summary/wofs_' + cell + '_frequency.tif',
    ('15_-40', '16_-40'),
))

In [3]:
# Two NetCDF files from the LS8_OLI_WATER/15_-40 output directory (absolute
# paths on /g/data); readnc reads their 'water' variable.
netcdfs = ['/g/data/v10/testing_ground/wofs_brl/output/LS8_OLI_WATER/15_-40/LS8_OLI_WATER_3577_15_-40_20130831235831000000_v1502857924.nc',
           '/g/data/v10/testing_ground/wofs_brl/output/LS8_OLI_WATER/15_-40/LS8_OLI_WATER_3577_15_-40_20130831235855000000_v1502857924.nc']

In [4]:
def readtif(filename):
    """Return band 1 of the raster at ``filename``.

    ``filename`` is anything ``rasterio.open`` accepts, including GDAL
    subdataset strings such as the ``NetCDF:...`` paths built by ``readnc``.
    The dataset is closed before the array is returned.
    """
    with rasterio.open(filename, mode='r') as dataset:
        first_band = dataset.read(1)
    return first_band
def readnc(filename, variable='water'):
    """Read band 1 of one variable from a NetCDF file.

    Builds GDAL's NetCDF subdataset path ``NetCDF:<filename>:<variable>`` and
    reads it with ``readtif``.

    Parameters
    ----------
    filename : str
        Path to the NetCDF file.
    variable : str, optional
        Name of the variable to read; defaults to ``'water'``, the variable
        stored in the files used in this notebook.

    Returns
    -------
    Whatever ``readtif`` returns for that subdataset (band 1 of the variable).
    """
    return readtif('NetCDF:' + filename + ':' + variable)

In [22]:
def sequential(func, arg1, arg2, shape=(4000, 4000)):
    """Baseline: run ``func`` on both arguments one after the other in this process.

    Parameters
    ----------
    func : callable
        Reader called as ``func(arg1)`` then ``func(arg2)``; each result must be
        assignable into an array of ``shape``.
    arg1, arg2
        Arguments for output slots 0 and 1.
    shape : tuple of int, optional
        Shape of each result. Defaults to the 4000x4000 tiles used in this notebook.

    Returns
    -------
    numpy.ndarray
        float32 array of shape ``(2,) + shape``. Results are cast to float32 on
        assignment (e.g. uint8 NetCDF data).
    """
    out = np.zeros((2,) + tuple(shape), dtype=np.float32)
    for slot, arg in enumerate((arg1, arg2)):
        out[slot] = func(arg)
    return out

def parthread(func, arg1, arg2, shape=(4000, 4000)):
    """Run ``func`` on both arguments in two threads, writing into one shared output.

    Both threads write directly into slices of the same float32 array. Whether
    the two reads actually overlap depends on ``func`` releasing the GIL.

    Parameters
    ----------
    func : callable
        Reader called as ``func(arg1)`` and ``func(arg2)``; each result must be
        assignable into an array of ``shape``.
    arg1, arg2
        Arguments for output slots 0 and 1.
    shape : tuple of int, optional
        Shape of each result. Defaults to the 4000x4000 tiles used in this notebook.

    Returns
    -------
    numpy.ndarray
        float32 array of shape ``(2,) + shape``.

    Raises
    ------
    Exception
        The first exception raised by ``func`` in either thread, re-raised here
        after both threads have finished. Without this, a failed read would only
        be printed by the thread machinery and its slot would silently stay zero.
    """
    out = np.zeros((2,) + tuple(shape), dtype=np.float32)
    errors = [None, None]

    def fill(slot, arg):
        try:
            out[slot] = func(arg)
        except Exception as exc:  # handed back to the calling thread below
            errors[slot] = exc

    workers = [threading.Thread(target=fill, args=(slot, arg))
               for slot, arg in enumerate((arg1, arg2))]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    for exc in errors:
        if exc is not None:
            raise exc
    return out

def task(i, f, arg):
    """Child-process worker for ``parprocess``: store ``f(arg)`` in slot ``i`` of ``shm://array``."""
    shared = SharedArray.attach('shm://array')
    value = f(arg)
    shared[i] = value
def parprocess(func, arg1, arg2): # processes not threads
    """Fill a (2, 4000, 4000) float32 SharedArray segment from two child processes.

    Creates the named segment ``shm://array``. Each child runs ``task``, which
    attaches to the segment by name and writes ``func(arg)`` into its slot.

    Parameters
    ----------
    func : callable
        Reader called in the child with ``arg1`` (slot 0) or ``arg2`` (slot 1).
    arg1, arg2
        Arguments for the two reads.

    Returns
    -------
    numpy.ndarray
        The (2, 4000, 4000) float32 array backed by the shared segment.

    Raises
    ------
    RuntimeError
        If either child exits with a non-zero exit code (e.g. ``func`` raised).
        Previously such a failure silently returned zeros.
    """
    array = SharedArray.create('shm://array', (2,4000,4000), dtype=np.float32)
    # Fork explicitly: the children must find `task` (defined in this notebook)
    # without re-importing it, which only fork inheritance provides.
    ctx = multiprocessing.get_context('fork')
    workers = [ctx.Process(target=task, args=(slot, func, arg))
               for slot, arg in enumerate((arg1, arg2))]
    try:
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
    finally:
        # Always drop the fixed name, even on error/interrupt; a leftover
        # segment would presumably make the next SharedArray.create fail.
        # NOTE(review): like the original, this assumes `array` stays usable
        # after its name is deleted (unlink semantics) -- confirm in SharedArray docs.
        SharedArray.delete('shm://array')
    failed = [worker.exitcode for worker in workers if worker.exitcode != 0]
    if failed:
        raise RuntimeError('parprocess worker(s) failed with exit codes %s' % failed)
    return array

def task2(i, f, arg, array):
    """Worker for parprocess2/parprocess3: store ``f(arg)`` in slot ``i`` of the shared buffer.

    Writes go to ``task2.arr`` when that attribute is set. This is deliberate:
    under ``Pool.starmap`` the ``array`` argument arrives as a pickled copy, so
    writes to it would never reach the parent. ``task2.arr``, by contrast, is
    inherited at fork time and still views the shared buffer. When ``task2.arr``
    has not been set, fall back to ``array`` instead of failing with
    AttributeError in the child.

    Parameters
    ----------
    i : int
        Slot index in the leading axis of the target array.
    f : callable
        Reader called as ``f(arg)``.
    arg
        Argument for ``f``.
    array : array-like or None
        Fallback target used only when ``task2.arr`` is unset.
    """
    target = getattr(task2, 'arr', array)
    target[i] = f(arg)
def parprocess2(func, arg1, arg2): # processes not threads
    """Fill a RawArray-backed (2, 4000, 4000) float32 array from two forked processes.

    Each child runs ``task2``, which writes ``func(arg)`` into its slot.

    Parameters
    ----------
    func : callable
        Reader called in the child with ``arg1`` (slot 0) or ``arg2`` (slot 1).
    arg1, arg2
        Arguments for the two reads.

    Returns
    -------
    numpy.ndarray
        (2, 4000, 4000) float32 view of the shared RawArray.

    Raises
    ------
    RuntimeError
        If either child exits with a non-zero exit code (e.g. ``func`` raised).
    """
    shape = (2, 4000, 4000)
    nbytes = int(np.prod(shape)) * np.dtype(np.float32).itemsize
    share = multiprocessing.sharedctypes.RawArray('b', nbytes)
    array = np.frombuffer(share, dtype=np.float32).reshape(shape)
    # task2 writes into task2.arr, not its `array` argument. Point it at this
    # call's buffer *before* forking so the children inherit the right
    # reference. Otherwise it is unset (AttributeError in the child) or stale
    # from an earlier parprocess3 call, and this function returns zeros.
    task2.arr = array
    # Fork explicitly: the children rely on inheriting task2.arr and the
    # notebook-defined functions.
    ctx = multiprocessing.get_context('fork')
    workers = [ctx.Process(target=task2, args=(slot, func, arg, array))
               for slot, arg in enumerate((arg1, arg2))]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    failed = [worker.exitcode for worker in workers if worker.exitcode != 0]
    if failed:
        raise RuntimeError('parprocess2 worker(s) failed with exit codes %s' % failed)
    return array

class tasker:
    """Callable that stores ``f(arg)`` into ``array[i]``.

    Caveat: pickling an instance (e.g. using it as a ``Pool.starmap`` target)
    pickles ``array`` along with it. numpy arrays pickle by value, so a worker
    process would then write into its own copy.
    """
    def __init__(self, func, array):
        self.array = array  # destination; indexed by the slot passed to __call__
        self.f = func       # reader applied to each argument

    def __call__(self, i, arg):
        result = self.f(arg)
        self.array[i] = result

def parprocess3(func, arg1, arg2): # processes not threads
    """Fill a RawArray-backed (2, 4000, 4000) float32 array using a process pool.

    The pool workers run ``task2``, which writes through ``task2.arr``. Workers
    inherit ``task2.arr`` when the pool forks them.

    Parameters
    ----------
    func : callable
        Reader; must be picklable by reference (it is sent through ``starmap``).
    arg1, arg2
        Arguments for output slots 0 and 1.

    Returns
    -------
    numpy.ndarray
        (2, 4000, 4000) float32 view of the shared RawArray.
    """
    shape = (2, 4000, 4000)
    nbytes = int(np.prod(shape)) * np.dtype(np.float32).itemsize
    share = multiprocessing.sharedctypes.RawArray('b', nbytes)
    array = np.frombuffer(share, dtype=np.float32).reshape(shape)
    # Must be set before the pool forks its workers: they inherit task2.arr and
    # write through it into the shared buffer.
    task2.arr = array
    # Pass None rather than `array`: starmap pickles its arguments, so passing
    # the array would ship a 128 MB copy to each worker that task2 ignores.
    # Fork explicitly (inheritance is required); two tasks need only two workers.
    with multiprocessing.get_context('fork').Pool(2) as pool:
        pool.starmap(task2, [(0, func, arg1, None), (1, func, arg2, None)])
    return array

_worker_tasker = None  # set in each pool worker by _init_tasker_worker; unused in the parent


def _init_tasker_worker(func, share, shape):
    """Pool initializer: wrap the inherited shared RawArray in a tasker for this worker."""
    global _worker_tasker
    view = np.frombuffer(share, dtype=np.float32).reshape(shape)
    _worker_tasker = tasker(func, view)


def _run_tasker(i, arg):
    """Pool task: fill slot ``i`` of the shared array via this worker's tasker."""
    _worker_tasker(i, arg)


def parprocess4(func, arg1, arg2): # processes not threads
    """Fill a RawArray-backed (2, 4000, 4000) float32 array using a pool of taskers.

    Parameters
    ----------
    func : callable
        Reader called in a worker with ``arg1`` (slot 0) or ``arg2`` (slot 1).
    arg1, arg2
        Arguments for the two reads.

    Returns
    -------
    numpy.ndarray
        (2, 4000, 4000) float32 view of the shared RawArray.
    """
    shape = (2, 4000, 4000)
    nbytes = int(np.prod(shape)) * np.dtype(np.float32).itemsize
    share = multiprocessing.sharedctypes.RawArray('b', nbytes)
    array = np.frombuffer(share, dtype=np.float32).reshape(shape)
    # Hand the RawArray to workers through `initargs` rather than as (part of)
    # a task. initargs reach each worker at process creation (inherited), while
    # starmap pickles the task. Pickling a tasker copies its array, which
    # previously made every worker write into a private copy, and this
    # function returned zeros.
    # Fork explicitly: the workers need the functions defined in this notebook.
    ctx = multiprocessing.get_context('fork')
    with ctx.Pool(2, initializer=_init_tasker_worker, initargs=(func, share, shape)) as pool:
        pool.starmap(_run_tasker, enumerate([arg1, arg2]))
    return array

In [None]:
# Baseline: read both GeoTIFF tiles one after the other in this process.
%timeit sequential(readtif, *geotiffs)

In [None]:
# Two threads, each reading one tile into the same float32 output array.
%timeit parthread(readtif, *geotiffs)

In [None]:
# Two child processes writing into a RawArray-backed buffer via task2.
%timeit parprocess2(readtif, *geotiffs)

In [None]:
# Two child processes writing into the SharedArray segment 'shm://array' via task.
%timeit parprocess(readtif, *geotiffs)

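Note: these NetCDF files store the `water` variable as uint8, not float32. The benchmark wrappers cast each read into their float32 output array on assignment.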

In [None]:
# Baseline: read both NetCDF 'water' variables one after the other.
%timeit sequential(readnc, *netcdfs)

In [None]:
# Two threads, each reading one file into the same float32 output array.
%timeit parthread(readnc, *netcdfs)

In [None]:
# Two child processes writing into the SharedArray segment 'shm://array' via task.
%timeit parprocess(readnc, *netcdfs)

In [None]:
# Two child processes writing into a RawArray-backed buffer via task2.
%timeit parprocess2(readnc, *netcdfs)

In [23]:
# Run parprocess4 once and display the resulting (2, 4000, 4000) float32 array.
# NOTE(review): the recorded output shows only zeros in the visible corners;
# check whether the data really is zero there or the worker writes were lost.
parprocess4(readnc, *netcdfs)

array([[[ 0.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0.,  0.],
        ..., 
        [ 0.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0.,  0.]],

       [[ 0.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0.,  0.],
        ..., 
        [ 0.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0.,  0.],
        [ 0.,  0.,  0., ...,  0.,  0.,  0.]]], dtype=float32)

In [32]:
import multiprocessing.sharedctypes
import numpy as np

# Demonstration: pool workers see shared memory through fork inheritance, not
# through task arguments (which are pickled copies). The functions are named
# demo_* so this cell no longer redefines `task`/`task2`, which parprocess,
# parprocess2 and parprocess3 call.

def demo_fill_inherited(i):
    """Write ``i`` into ``x[i]``, where ``x`` is the parent's global inherited by the forked worker."""
    x[i] = i  # item assignment needs no `global`; x views the shared RawArray

sharedresource = multiprocessing.sharedctypes.RawArray('b', 10)
x = np.frombuffer(sharedresource, dtype=np.uint8)

# Fork explicitly: the implicit case only works because workers inherit `x`.
with multiprocessing.get_context('fork').Pool(4) as pool:
    pool.map(demo_fill_inherited, range(10)) # implicitly modify x

print(x) # [0 1 2 3 4 5 6 7 8 9]

def demo_fill_argument(i, array):
    """Write 7 into ``array[i]``. In a pool worker ``array`` is a pickled copy, so the parent never sees it."""
    array[i] = 7

with multiprocessing.get_context('fork').Pool(4) as pool:
    pool.starmap(demo_fill_argument, ((i, x) for i in range(10))) # explicit fail: args are pickled copies

print(x) # still [0 1 2 3 4 5 6 7 8 9]

[0 1 2 3 4 5 6 7 8 9]
[0 1 2 3 4 5 6 7 8 9]
