# Final Computación Paralela: 
   ## Códigos

In [None]:
import h5py
import numpy as np
from numba import njit, jit
from concurrent.futures import ProcessPoolExecutor as PE
from joblib import Parallel, delayed
from multiprocessing import Pool
%load_ext line_profiler

Se presentan los códigos y sus mejoras, los casos ''Numba'' salen de aplicar los decoradores comentados al comienzo de cada línea.

La función **tray_0** es la utilizada para Py$_0$ y Py$_\textrm{N}$ (cuando @jit no está comentado)

In [None]:
#@jit
def tray_0(i):
    if   i < 10 :       # Poner el path que corresponda
        f = h5py.File('b2y280x2fC/AAA_cme_hdf5_plt_cnt_000'+str(i),'r')
    elif i < 100 :
        f = h5py.File('b2y280x2fC/AAA_cme_hdf5_plt_cnt_00'+str(i),'r')
    else :
        f = h5py.File('b2y280x2fC/AAA_cme_hdf5_plt_cnt_0'+str(i),'r')
        
    time=i*20
    # ---->
    nxb  = list(f['integer scalars'][0])[1]     # Número de celdas en x
    nyb  = list(f['integer scalars'][1])[1]     # Número de celdas en y
    boundb = f['bounding box'][:,:,:]           # Coordenadas borde de cada bloque
    Bmax = np.max(np.abs(f['magz'][:,0,:,:]))   # campo |Bz| 
    
    # ---->
    ind, yid, xid =np.where(np.abs(f['magz'][:,0,:,:])==Bmax)
    # ---
    xlb = boundb[ind[0],0,0]                    #límites del bloque del FR
    xrb = boundb[ind[0],0,1]
    ylb = boundb[ind[0],1,0]
    yrb = boundb[ind[0],1,1]
    # ---->
    celx = xlb+xid[0]*(xrb-xlb)/nxb             # Posición (x,y)
    cely = ylb+yid[0]*(yrb-ylb)/nyb
    # ---->
    return [time, celx*10**(-8), cely*10**(-8), Bmax]

In [None]:
%%timeit
out = open('b2y280x2fC_0.txt', 'w')
for i in range(201):
    t, x, y, Bmax = tray_0(i)
    out.write("%d \t %f \t %f \t %f \n" %(t, x, y, Bmax)) 
out.close()

Para realizar las mejoras se utiliza un line profiler que permite calcular la demora y los hits de cada linea de código.

In [None]:
%lprun -f tray_0 tray_0(200)

Se realizan mejoras del algoritmo. Las funciones **tray_1** con **MaxIdx** son las utilizadas para Py$_A$ y Py$_\textrm{N+A}$ (cuando @jit no está comentado)

In [None]:
#@jit
def MaxIdx(a):
    return np.unravel_index(np.argmax(a), a.shape)
#@jit
def tray_1(i,path='b2y280x2fC'):
    f      = h5py.File(path+'/AAA_cme_hdf5_plt_cnt_{:04d}'.format(i), 'r')          

    # ---->
    nxb  = list(f['integer scalars'][0])[1]     # Número de celdas en x
    nyb  = list(f['integer scalars'][1])[1]     # Número de celdas en y
    Bz   = np.abs(f['magz'][:,0,:,:])           # Campo |Bz| 
    # ---->
    # Indice, posición y-x del bloque con Bz máximo.
    ind, yid, xid = MaxIdx(Bz) 
    # ---->
    Bmax = Bz[ind,yid,xid] # Campo máximo (módulo)
    xlb = f['bounding box'][ind,0,0]  #límites del bloque del FR
    xrb = f['bounding box'][ind,0,1]
    ylb = f['bounding box'][ind,1,0]
    yrb = f['bounding box'][ind,1,1]
    # ---->
    # Posición (x,y)
    celx = xlb+xid*(xrb-xlb)/nxb
    cely = ylb+yid*(yrb-ylb)/nyb

    # ---->
    return [i*20, celx*10**(-8), cely*10**(-8), Bmax]

In [None]:
%%timeit 
out = open('b2y280x2fC_Alg.txt', 'w')
for i in range(201):
    t, x, y, Bmax = tray_1(i)
    out.write("%d \t %f \t %f \t %f \n" %(t, x, y, Bmax)) 
out.close()

La función **unravel_index** no es soportada por Numba ya que devuelve tuplas. Se reescribe esta función manualmente y específicamente para las 3 dimensiones del problema (n, x, y) y se aplican otras mejoras más en el algoritmo.
Estas mejoras por ejemplo evitan re-calcular nxb y nyb ya que son cantidades fijas durante toda la simulación. Además se define la variable **'bounding box'** para localizar la memoria. 

In [None]:
path='b2y280x2fC'
g = h5py.File(path+'/AAA_cme_hdf5_plt_cnt_0000','r')
nx  = list(g['integer scalars'][0])[1]     # Número de celdas en x
ny  = list(g['integer scalars'][1])[1]     # Número de celdas en y

Las funciones **tray_2** con **MaxIdx_n** son las utilizadas para Py$_{A2}$ y Py$_\textrm{N+A2}$ (cuando los decoradores de Numba no están comentados)

In [None]:
#@njit ó @njit(parallel=True) ó @njit(fastmath=True) ó @njit(parallel=True,fastmath=True) 
def MaxIdx_n(a):
    ind = np.argmax(a)
    out = np.ones(3,dtype=np.intc)
    for i in range(3):
        dim = a.shape[2-i]
        out[2-i] = ind % dim
        ind = ind // dim
    return out  
#@jit
def tray_2(i,path=path,nxb=nx,nyb=ny):
    f      = h5py.File(path+'/AAA_cme_hdf5_plt_cnt_{:04d}'.format(i), 'r')        
    # ---->
    Bz   = np.abs(f['magz'][:,0,:,:]) # Campo |Bz| 
    # ---->
    # Indice, posición y x del bloque con Bz máximo.
    ind, yid, xid = MaxIdx_n(Bz) 
    # ---->
    bb  = f['bounding box'][ind,:,:]
    xlb = bb[0,0]  #límites del bloque 
    xrb = bb[0,1]
    ylb = bb[1,0]
    yrb = bb[1,1]
    # ---->
    # Posición (x,y)
    celx = xlb+xid*(xrb-xlb)/nxb
    cely = ylb+yid*(yrb-ylb)/nyb    
    # ---->
    
    return [i*20, celx*10**(-8), cely*10**(-8), Bz[ind,yid,xid]]

In [None]:
%%timeit 
out = open('b2y280x2fC_Alg2.txt', 'w')
for i in range(201):
    t, x, y, Bmax = tray_2(i)
    out.write("%d \t %f \t %f \t %f \n" %(t, x, y, Bmax)) 
out.close()

Para las corridas en paralelo se define un vector donde se guardan las salidas, una lista *ies* para el PoolExecutor de la librería **concurrent.futures** y el número *a* para el Pool.map de **multiprocessing** y Parallel de **Joblib**.

Se usa la función **tray_2** sin los decoradores de Numba ya que empeoran la *performance* en la corrida sobre varios procesadores.

In [None]:
vec = np.zeros((201,4),dtype=np.float32)
ies = list(range(201))
a   = 201

In [None]:
%%timeit
#multiprocessing
with Pool(processes=1) as pool:
    results = pool.map(tray_2, range(a))
vec= [result[:] for result in results]
np.savetxt('b2y280x2fC_parM.txt', vec, fmt='%e')

In [None]:
%%timeit
#concurrent.futures
with PE(max_workers=1) as exce:
    results = exce.map(tray_2, ies)
vec= [result[:] for result in results]
np.savetxt('b2y280x2fC_parCF.txt', vec, fmt='%e')

In [None]:
%%timeit
#Joblib
vec = Parallel(n_jobs=1)(
...     delayed(tray_2)(i) for i in range(a))
np.savetxt('b2y280x2fC_parJ.txt', vec, fmt='%e')