In [10]:
import numpy as np
import time
from scipy import ndimage
from joblib import Parallel, delayed

# marfurt_semblance is an example function that we like to apply in a moving window to a large array. This process can be very slow. 

### please note that "marfurt_semblance" is an example. You can replace it with any filter.

In [11]:

def marfurt_semblance(region):

    region = region.reshape(-1, region.shape[-1])
    ntraces, nsamples = region.shape
   

    square_of_sums = np.sum(region, axis=0)**2
    sum_of_squares = np.sum(region**2, axis=0)
    
    sembl = square_of_sums.sum() / sum_of_squares.sum()
    
    r=sembl/ ntraces
    #print(r)
    return r



# Let's try the normal way, which only uses 1 core and can be very slow:

In [25]:
np.random.seed(10)
a=np.random.random((100,100,100))
window=(3,3,2)

s=time.time()
wrapped = lambda region: marfurt_semblance(region.reshape(window))
r0=ndimage.generic_filter(a, wrapped, window)
print('Runtime for single core (seconds): ',round(time.time()-s,2))

Runtime for single core (seconds):  20.23


# "sliceit" will create n smaller arrays that later we calcualte the filter for each one in parallel 

In [26]:
def sliceit(a,n_cpu,w):
    N=a.shape[0]
    step = N / n_cpu
    t_a_s=[]
    i_s=[]
    j_s=[]
    for i in range(n_cpu):    
        i1=round(step*i)
        i2=round(step*(i+1))
        i_s.append([i1,i2])
        d=i2-i1
        j1=max(0,i1-w)
        j2=min(i2+w,N)
        m=j2-j1
        ta=a[j1:j2,:]
        t_a_s.append(ta)
        j1=int((i!=0))*w
        j2=min(j1+d,m)        
        j_s.append([j1,j2])
    return(t_a_s,i_s,j_s)


def runit(ta):
    return ndimage.generic_filter(ta, wrapped, window)

In [27]:
s=time.time() 
r=np.zeros(a.shape)
n_cpu=16
t_a_s,i_s,j_s=sliceit(a,n_cpu,window[0])


s=time.time()     
tr_lst=Parallel(n_jobs=n_cpu,verbose=0)(
            delayed(runit)(t_a_s[i]) for i in range(n_cpu))


for i in range(n_cpu):
    tr=tr_lst[i]
    i1,i2=i_s[i]
    j1,j2=j_s[i]
    r[i1:i2,:]=tr[j1:j2,:]
print('Runtime for 16 cores (seconds): ',round(time.time()-s,2))

Runtime for 16 cores (seconds):  4.45


# Calculating the errpr: the difference between single core and the parallel process is calculated. The error should be zero.

In [28]:
rr=(r-r0)**2
print('Error:',np.sum(rr))

Error: 0.0
