# Numba CPU MPI

In [14]:
%%bash
module load intel_psxe/2020
source /opt/intel/parallel_studio_xe_2020/intelpython3/etc/profile.d/conda.sh
python --version

Python 3.7.7 :: Intel(R) Corporation


In [3]:
# Mostra os recursos do nó de login
! lscpu | head -n 15 | grep "Model \|CPU(s):\|Thre\|Core\|NUMA\|MHz"

CPU(s):                24
Thread(s) per core:    1
Core(s) per socket:    12
NUMA node(s):          2
Model name:            Intel(R) Xeon(R) CPU E5-2695 v2 @ 2.40GHz
CPU MHz:               2800.195


## Código fonte

In [2]:
%%writefile numbampi.py
import numpy as np
from time import time
from numba import njit, set_num_threads, get_num_threads
from mpi4py import MPI   

# parameters
n            = 2400    # n x n grid
energy       = 1.0     # energy to be injected per iteration
niters       = 250     # number of iterations
# other variables
heat         = np.zeros((1), np.float64)     # system total heat
size         = n + 2
anew         = np.zeros((size, size), np.float64)
aold         = np.zeros((size, size), np.float64)
sources      = np.empty((3,2), np.int32)
sources[:,:] = [ [n//2, n//2], [n//3, n//3], [n*4//5, n*8//9] ]
niters       = (niters+1) // 2
nsources     = 3    # sources of energy
sources      = np.zeros((nsources, 2), np.intc)
sources[:,:] = [ [n//2, n//2], [n//3, n//3], [n*4//5, n*8//9] ]

# computationally intensive core
@njit('(float64[:,:],float64[:,:])', fastmath=True, parallel=True, nogil=True)
def kernel1(anew, aold) :
    anew[1:-1,1:-1]=1/2.0*(aold[1:-1,1:-1]+1/4.0*(aold[2:,1:-1]+aold[:-2,1:-1]+aold[1:-1,2:]+aold[1:-1,:-2]))

# main routine
comm = MPI.COMM_WORLD
mpirank = comm.rank
mpisize = comm.size

# sources in my area, local to my rank
locnsources = 0
locsources = np.empty((nsources,2), np.intc)

rheat = np.zeros(1, np.double)
bheat = np.zeros(1, np.double)

# determine my coordinates (x,y)
pdims = MPI.Compute_dims(mpisize, 2)
px    = pdims[0]
py    = pdims[1]
rx    = mpirank % px
ry    = mpirank // px

# determine my four neighbors
north = (ry - 1) * px + rx
if (ry - 1) < 0 :
    north = MPI.PROC_NULL
south = (ry + 1) * px + rx
if (ry + 1) >= py :
    south = MPI.PROC_NULL
west = ry * px + rx - 1
if (rx - 1) < 0 :
    west = MPI.PROC_NULL
east = ry * px + rx + 1
if (rx + 1) >= px :
    east = MPI.PROC_NULL

# decompose the domain
bx = n // px            # block size in x
by = n // py            # block size in y
offx = rx * bx + 1      # offset in x
offy = ry * by + 1      # offset in y

# determine which sources are in my patch
for i in range(nsources) :
    locx = sources[i, 0] - offx
    locy = sources[i, 1] - offy
    if(locx >= 0 and locx <= bx and locy >= 0 and locy <= by) :
        locsources[locnsources, 0] = locx + 2 - 1
        locsources[locnsources, 1] = locy + 2 - 1
        locnsources += 1

# working arrays with 1-wide halo zones
anew = np.zeros((bx+2, by+2), np.double)
aold = np.zeros((bx+2, by+2), np.double)

if not mpirank : t0 = time()

for _ in range(niters) :
    # exchange data with neighbors
    if north != MPI.PROC_NULL :
        r1=comm.irecv(source=north, tag=1)
        s1=comm.isend(aold[1, 1:bx+1], dest=north, tag=1)
    if south != MPI.PROC_NULL :
        r2=comm.irecv(source=south, tag=1)
        s2=comm.isend(aold[bx, 1:bx+1], dest=south, tag=1)
    if east != MPI.PROC_NULL :
        r3 = comm.irecv(source=east, tag=1)
        s3 = comm.isend(aold[1:bx+1, bx], dest=east, tag=1)
    if west != MPI.PROC_NULL :
        r4 = comm.irecv(source=west, tag=1)
        s4 = comm.isend(aold[1:bx+1, 1], dest=west, tag=1)
    # wait
    if north != MPI.PROC_NULL :
        s1.wait()
        aold[0, 1:bx+1] = r1.wait()
    if south != MPI.PROC_NULL :
        s2.wait()
        aold[bx+1, 1:bx+1] = r2.wait()
    if east != MPI.PROC_NULL :
        s3.wait()
        aold[1:bx+1, bx+1] = r3.wait()
    if west != MPI.PROC_NULL :
        s4.wait
        aold[1:bx+1, 0] = r4.wait()

    # update grid
    kernel1(anew, aold)

    # refresh heat sources
    for i in range(locnsources) :
        anew[locsources[i, 0]-1, locsources[i, 1]-1] += energy

    # exchange data with neighbors
    if north != MPI.PROC_NULL :
        r1=comm.irecv(source=north, tag=1)
        s1=comm.isend(anew[1, 1:bx+1], dest=north, tag=1)
    if south != MPI.PROC_NULL :
        r2=comm.irecv(source=south, tag=1)
        s2=comm.isend(anew[bx, 1:bx+1], dest=south, tag=1)
    if east != MPI.PROC_NULL :
        r3 = comm.irecv(source=east, tag=1)
        s3 = comm.isend(anew[1:bx+1, bx], dest=east, tag=1)
    if west != MPI.PROC_NULL :
        r4 = comm.irecv(source=west, tag=1)
        s4 = comm.isend(anew[1:bx+1, 1], dest=west, tag=1)
    # wait
    if north != MPI.PROC_NULL :
        s1.wait()
        anew[0, 1:bx+1] = r1.wait()
    if south != MPI.PROC_NULL :
        s2.wait()
        anew[bx+1, 1:bx+1] = r2.wait()
    if east != MPI.PROC_NULL :
        s3.wait()
        anew[1:bx+1, bx+1] = r3.wait()
    if west != MPI.PROC_NULL :
        s4.wait
        anew[1:bx+1, 0] = r4.wait()

    # update grid
    kernel1(aold, anew)

    # refresh heat sources
    for i in range(locnsources) :
        aold[locsources[i, 0]-1, locsources[i, 1]-1] += energy 

# get final heat in the system
bheat[0] = np.sum(aold[1:-1, 1:-1])
comm.Reduce(bheat, rheat)

if not mpirank :
    t0 = time() - t0
    print("Heat: %0.4f | Time: %0.4f | MPISize: %g" % (rheat[0], t0, mpisize) )

Overwriting numbampi.py


### Testa a execução

É importante verificar possíveis erros, antes de enviar para as filas de execução

In [9]:
%%bash
module load intel_psxe/2020
source /opt/intel/parallel_studio_xe_2020/intelpython3/etc/profile.d/conda.sh
unset I_MPI_PMI_LIBRARY
mpiexec -n 1 python numbampi.py

Heat: 750.0000 | Time= 1.0243 | MPISize= 1


In [13]:
%%bash
module load intel_psxe/2020
source /opt/intel/parallel_studio_xe_2020/intelpython3/etc/profile.d/conda.sh
unset I_MPI_PMI_LIBRARY
mpiexec -n 16 python numbampi.py

Heat: 750.0000 | Time: 1.1808 | MPISize: 16


### Copia arquivo com código python para /scratch

* /prj e /scratch possuem o mesmo diretório, criados em uma etapa anterior.
* o nó de execução não enxerga o /prj do nó de login, o que for rodar precisa ser copiado para /scratch, incluindo eventuais bibliotecas ou algum ambiente python que foi criado.
* o .srm (arquivo do slurm) não precisa copiar.

In [3]:
! cp  numbampi.py  /scratch${PWD#/prj}

In [4]:
%%writefile numbampi.srm
#!/bin/bash
#SBATCH --ntasks=81            #Total de tarefas
#SBATCH --job-name numbampi    #Nome do job, 8 caracteres
#SBATCH --partition cpu_dev    #Fila (partition) a ser utilizada
#SBATCH --time=00:01:00        #Tempo max. de execução
#SBATCH --exclusive            #Utilização exclusiva dos nós

echo '- Job ID:' $SLURM_JOB_ID
echo '- Tarefas por no:' $SLURM_NTASKS_PER_NODE
echo '- Qtd. de nos:' $SLURM_JOB_NUM_NODES
echo '- Tot. de tarefas:' $SLURM_NTASKS
echo '- Nos alocados:' $SLURM_JOB_NODELIST
nodeset -e $SLURM_JOB_NODELIST

#Modulos
module load intel_psxe/2020
source /opt/intel/parallel_studio_xe_2020/intelpython3/etc/profile.d/conda.sh

#Entra no diretório de trabalho
cd /scratch${PWD#/prj}

#Executavel
EXEC='python numbampi.py'

#Dispara a execucao
srun --mpi=pmi2  -n $SLURM_NTASKS  $EXEC

Overwriting numbampi.srm


## Envia para a fila de execução

In [5]:
! sbatch numbampi.srm

Submitted batch job 1355513


Verifica se já executou:

In [7]:
! squeue -n numbampi

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)


Mostra o arquivo contendo a saída:

In [8]:
! cat /scratch${PWD#/prj}/slurm-1355513.out

- Job ID: 1355513
- Tarefas por no:
- Qtd. de nos: 4
- Tot. de tarefas: 81
- Nos alocados: sdumont[1243-1246]
sdumont1243 sdumont1244 sdumont1245 sdumont1246
Heat: 750.0000 | Time: 2.5570 | MPISize: 81


Neste caso enviamos para fila `cpu_dev` que é uma fila "rápida" para executar testes, e para trabalhos pequenos.