Accessing IPython cluster clients and printing their ids to check.

In [38]:
from ipyparallel import Client

# Connect to the ipyparallel cluster registered under cluster_id 'mpi'.
clients = Client(cluster_id="mpi")

# Blocking mode: requests issued through this client wait for their results.
clients.block = True

# Listing the engine ids is a quick check that the engines are reachable.
print(clients.ids)

[0, 1, 2]


Importing mpi4py and numpy.

In [39]:
%%px
from mpi4py import MPI
import numpy as np

Parallel execution on engine(s): all


Implementing Convolution, you don't need to modify this code.

In [40]:
%%px
def convolve_func(main,kernel,KERNEL_DIM,DIMx,DIMy,upper_pad,lower_pad):
	"""Sum the KERNEL_DIM x KERNEL_DIM neighbourhood of every cell of a flattened grid.

	Args:
		main: 1-D array addressed as ``row * DIMy + col`` for ``row < DIMx``
			and ``col < DIMy``.
		kernel: accepted for interface compatibility; not read by this
			implementation (see the note below).
		KERNEL_DIM: side length of the neighbourhood window.
		DIMx: number of rows stored in ``main``.
		DIMy: number of columns per row.
		upper_pad: 1-D values concatenated in front of ``main``.
		lower_pad: 1-D values concatenated behind ``main``.

	Returns:
		Integer array with the shape of ``main``; each entry is the sum of the
		padded values that fall inside the window around that cell.

	NOTE(review): the multiplication by the kernel weight is commented out in
	the provided code, so the result is an unweighted window sum.
	NOTE(review): the row bound ``DIMx + 1`` appears to assume exactly one
	padding row above and below - confirm before using KERNEL_DIM != 3.
	"""
	num_pads = int((KERNEL_DIM - 1) / 2)
	conv = np.zeros(main.shape, dtype=int)
	# Keep the caller's array untouched; work on the padded copy.
	padded = np.concatenate((upper_pad, main, lower_pad))
	for row in range(DIMx):
		for col in range(DIMy):
			out_idx = row * DIMy + col
			for d_col in range(KERNEL_DIM):
				win_col = col + d_col
				# Window columns that fall left or right of the grid contribute nothing.
				if win_col < num_pads or win_col > DIMy:
					continue
				for d_row in range(KERNEL_DIM):
					# Rows past the last padded row contribute nothing.
					if row + d_row > DIMx + 1:
						continue
					conv[out_idx] += padded[(row + d_row) * DIMy + col - num_pads + d_col]
	return conv

Parallel execution on engine(s): all


5 points: 
Load MPI communicator, get the total number of processes and rank of the process                              

In [41]:
%%px
# Every process reports its own rank together with the total process count.
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()   # total number of processes in the communicator
rank = comm.Get_rank()   # index of this process within the communicator

print(f"Hello from process {rank} out of {size}")


Parallel execution on engine(s): all


[stdout:0] Hello from process 0 out of 3


[stdout:2] Hello from process 2 out of 3


[stdout:1] Hello from process 1 out of 3


5 points: 
Load or initialize data array and kernel array only in process 0(rank 0)                                      

In [42]:
%%px --targets 0
DIMx = 0
DIMy = 0
KERNEL_DIM = 0
img = None
kernel = None
#Add a condition such that these intializations below should happen in only process 0
img = np.array([[3, 9, 5, 9],[1, 7, 4, 3],[2, 1, 6, 5],[3, 9, 5, 9],[1, 7, 4, 3],[2, 1, 6, 5],[3, 9, 5, 9],[1, 7, 4, 3],[2, 1, 6, 5]])
kernel = np.array([[0, 1, 0],[0, 0, 0],[0, -1, 0]])
DIMx = img.shape[0]
DIMy = img.shape[1]
KERNEL_DIM = int(kernel.shape[0])

print(f"[rank {rank}] img={None if img is None else img.shape}, "
      f"kernel={None if kernel is None else kernel.shape}, "
      f"DIMy={DIMy}, DIMx={DIMx}, KERNEL_DIM={KERNEL_DIM}")

Parallel execution on engine(s): 0


[stdout:0] [rank 0] img=(9, 4), kernel=(3, 3), DIMy=4, DIMx=9, KERNEL_DIM=3


10 points: 
Broadcast data and kernel array sizes from process 0 to  all other processes                                 

In [43]:
%%px
# Only rank 0 knows the real sizes; every other rank needs them to allocate
# its kernel and padding arrays, so the sizes are broadcast first.
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
is_root = (rank == 0)

# Non-root ranks pass None (their local name is never read) and receive
# rank 0's value as the return value of bcast.
DIMx = comm.bcast(DIMx if is_root else None, root=0)
DIMy = comm.bcast(DIMy if is_root else None, root=0)
KERNEL_DIM = comm.bcast(KERNEL_DIM if is_root else None, root=0)

print(f"[rank {rank}] received sizes -> DIMx={DIMx}, DIMy={DIMy}, KERNEL_DIM={KERNEL_DIM}")


Parallel execution on engine(s): all


[stdout:0] [rank 0] received sizes -> DIMx=9, DIMy=4, KERNEL_DIM=3


[stdout:1] [rank 1] received sizes -> DIMx=9, DIMy=4, KERNEL_DIM=3


[stdout:2] [rank 2] received sizes -> DIMx=9, DIMy=4, KERNEL_DIM=3


Initialize an empty kernel array on all processes except rank 0. Why are we not initializing the kernel array on rank 0?

Ans: The kernel array is not initialized on rank 0 because process 0 already holds the real kernel data. The other processes only need an empty array to receive the kernel when it is broadcast.

In [None]:
# NOTE(review): scratch shell command in a cell that was never executed (no
# execution count); nothing visible in this notebook depends on it - candidate for removal.
!cd


In [44]:
%%px
# Ranks other than 0 allocate a KERNEL_DIM x KERNEL_DIM integer placeholder;
# rank 0 keeps the kernel it already holds.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    kernel_status = "has data"
else:
    kernel = np.zeros((KERNEL_DIM, KERNEL_DIM), dtype=int)
    kernel_status = f"zeros {kernel.shape}"

print(f"[rank {rank}] kernel init ->", kernel_status)


Parallel execution on engine(s): all


[stdout:2] [rank 2] kernel init -> zeros (3, 3)


[stdout:0] [rank 0] kernel init -> has data


[stdout:1] [rank 1] kernel init -> zeros (3, 3)


10 points: 
Broadcast Kernel array from rank 0 to all other processes.                                                   

In [45]:
%%px
# Broadcast the kernel from rank 0 into the array each other rank allocated
# in the previous cell.
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Buffer-based Bcast fills `kernel` in place on the non-root ranks, so the
# placeholder allocated earlier is actually used (the lowercase, pickle-based
# bcast would return a new object and discard that buffer).
# Requires `kernel` to have the same shape and dtype on every rank.
comm.Bcast(kernel, root=0)

print(f"[rank {rank}] kernel shape = {getattr(kernel, 'shape', None)}  first_row = {kernel[0].tolist()}")


Parallel execution on engine(s): all


[stdout:0] [rank 0] kernel shape = (3, 3)  first_row = [0, 1, 0]


[stdout:2] [rank 2] kernel shape = (3, 3)  first_row = [0, 1, 0]


[stdout:1] [rank 1] kernel shape = (3, 3)  first_row = [0, 1, 0]


25 points: 
Split the rows of the data array equally and scatter them from process 0 to all other processes. To split them
equally, the number of rows in the data array must be an integral multiple of the number of processes. MPI has ways
to send unequal chunks of data between processes, but here equal-sized chunks are sufficient.

In [46]:
%%px
# Split the image into equal row blocks on rank 0 and hand one block to each process.
#
# The lowercase, object-based scatter returns each rank's block as its return
# value, so no receive buffer is allocated here - neither on rank 0 nor on the
# other ranks. Rank 0 takes part in the scatter too and receives the first block.

# Equal blocks are only possible when the row count divides evenly.
assert DIMx % size == 0, f"DIMx({DIMx}) must be a multiple of size({size})"
rows_per_proc = DIMx // size

# Only the root supplies data; every other rank passes None.
chunks = None
if rank == 0:
    chunks = [
        img[p * rows_per_proc:p * rows_per_proc + rows_per_proc, :]
        for p in range(size)
    ]

local_img = comm.scatter(chunks, root=0)

# Global row range covered by this rank's block (for the log line only).
gstart = rank * rows_per_proc
gend = gstart + rows_per_proc
print(f"[rank {rank}] got block shape={local_img.shape} "
      f"(global rows {gstart}:{gend}) first_row={local_img[0].tolist()}")

Parallel execution on engine(s): all


[stdout:0] [rank 0] got block shape=(3, 4) (global rows 0:3) first_row=[3, 9, 5, 9]


[stdout:1] [rank 1] got block shape=(3, 4) (global rows 3:6) first_row=[3, 9, 5, 9]


[stdout:2] [rank 2] got block shape=(3, 4) (global rows 6:9) first_row=[3, 9, 5, 9]


25 points: 
For the convolution of the kernel array and the data array, you have to pass the kernel padding rows from one
process to another. Please see the objective for more details. Send and receive rows from one process
to the other. Be careful: the data sizes and tags of the sends and receives must match, otherwise
the communicator will wait for them indefinitely.

In [47]:
%%px
# Exchange boundary rows with the neighbouring ranks: each rank sends its first
# row to the rank above and its last row to the rank below, and receives their
# boundary rows as its own upper/lower padding.

# The first and last ranks have no neighbour on one side; MPI.PROC_NULL stands in for it.
if rank > 0:
    top_n = rank - 1
else:
    top_n = MPI.PROC_NULL
if rank < size-1:
    bot_n = rank + 1
else:
    bot_n = MPI.PROC_NULL

# Contiguous copies of the two boundary rows that are sent away.
send_to_top = np.ascontiguousarray(local_img[0, :])
send_to_bot = np.ascontiguousarray(local_img[-1, :])

# One padding row per side; a side without a neighbour is zero-filled up front.
upper_pad = np.empty((1, DIMy), dtype=int)
lower_pad = np.empty((1, DIMy), dtype=int)
if top_n == MPI.PROC_NULL:
    upper_pad[:] = 0
if bot_n == MPI.PROC_NULL:
    lower_pad[:] = 0

# Tag 11 marks rows travelling upwards, tag 22 rows travelling downwards; the
# receive tag of one call pairs with the send tag of the neighbour's other call.
comm.Sendrecv(sendbuf=send_to_top, dest=top_n, sendtag=11,
              recvbuf=upper_pad[0, :], source=top_n, recvtag=22)

comm.Sendrecv(sendbuf=send_to_bot, dest=bot_n, sendtag=22,
              recvbuf=lower_pad[0, :], source=bot_n, recvtag=11)

print(f"[rank {rank}] upper_pad.sum()={int(upper_pad.sum())}  "
      f"lower_pad.sum()={int(lower_pad.sum())}  "
      f"local_img shape={local_img.shape}")

Parallel execution on engine(s): all


[stdout:1] [rank 1] upper_pad.sum()=14  lower_pad.sum()=26  local_img shape=(3, 4)


[stdout:2] [rank 2] upper_pad.sum()=14  lower_pad.sum()=0  local_img shape=(3, 4)


[stdout:0] [rank 0] upper_pad.sum()=0  lower_pad.sum()=26  local_img shape=(3, 4)


Why are we loading the data into process 0 and broadcasting the input data to all other processes? Are there any other methods to load data into all processes? (Not for evaluation.)

Ans: The data is first loaded in process 0 so that it can send the data to all other processes. This makes it easier to manage and keeps the data the same for every process. Another way is to let each process read its own part of the data at the same time, but that needs special support such as MPI-IO.

5 points: 
Perform Convolution operation by calling convolve_func() provided for each of the process with 
corresponding rows as arguments.                                                                             

In [48]:
%%px
# Run convolve_func on this rank's block of rows.
# Arguments, in order:
#   main       - this rank's part of the data array, flattened
#   kernel     - kernel array
#   KERNEL_DIM - kernel side length
#   DIMx       - number of rows (here: rows held by this rank)
#   DIMy       - number of columns
#   upper_pad  - upper padding row, flattened
#   lower_pad  - lower padding row, flattened
from mpi4py import MPI
rank = MPI.COMM_WORLD.Get_rank()

DIMx_local = local_img.shape[0]

# Sanity checks before handing the arrays to the convolution.
assert kernel.shape == (KERNEL_DIM, KERNEL_DIM), f"rank {rank} kernel shape wrong: {kernel.shape}"
assert local_img.shape[1] == DIMy,               f"rank {rank} DIMy mismatch: {local_img.shape}"

# convolve_func indexes flat arrays, so flatten the block and both padding rows.
main_1d = local_img.ravel()
up_1d = upper_pad.ravel()
low_1d = lower_pad.ravel()

local_conv = convolve_func(main_1d, kernel, KERNEL_DIM, DIMx_local, DIMy, up_1d, low_1d)

# The label reads "first_row", but local_conv is flat, so this is its first element.
first_value = local_conv[0]
if hasattr(first_value, 'tolist'):
    first_value = first_value.tolist()
print(f"[rank {rank}] local_conv shape = {local_conv.shape}  first_row = {first_value}")


Parallel execution on engine(s): all


[stdout:1] [rank 1] local_conv shape = (12,)  first_row = 23


[stdout:0] [rank 0] local_conv shape = (12,)  first_row = 20


[stdout:2] [rank 2] local_conv shape = (12,)  first_row = 23


10 points: 
Gather the computed convolutional matrix rows to process 0.                                                 

In [49]:
%%px
# Collect every rank's flattened result on rank 0.
# Buffer-based Gather needs a receive buffer on the root only; other ranks pass None.
sendbuf = local_conv.astype(int)
recvbuf = np.empty(DIMx_local * DIMy * size, dtype=int) if rank == 0 else None

comm.Gather(sendbuf, recvbuf, root=0)

if rank == 0:
    print(f"[rank 0] gathered length = {recvbuf.size}, expected = {DIMx_local*DIMy*size}")
    print("[rank 0] head 8 values:", recvbuf[:8].tolist())

Parallel execution on engine(s): all


[stdout:0] [rank 0] gathered length = 36, expected = 36
[rank 0] head 8 values: [20, 29, 37, 21, 23, 38, 49, 32]


Reshape the flattened array to match input dimensions

In [50]:
%%px
# Rank 0 turns the gathered flat result back into the 2-D layout of the input image.
if rank == 0:
    DIMx_full = size * DIMx_local
    full_conv = recvbuf.reshape((DIMx_full, DIMy))

    print(f"[rank 0] full_conv shape = {full_conv.shape}")
    # Spot-check three rows: the first, the first row of the second block, and the last.
    for label, idx in (("first row: ", 0), ("middle row:", DIMx_local), ("last row:  ", -1)):
        print(f"[rank 0] {label} {full_conv[idx].tolist()}")

Parallel execution on engine(s): all


[stdout:0] [rank 0] full_conv shape = (9, 4)
[rank 0] first row:  [20, 29, 37, 21]
[rank 0] middle row: [23, 38, 49, 32]
[rank 0] last row:   [11, 21, 26, 18]


5 points: 
Test to check sequential convolution and MPI based parallel convolution outputs                               

In [51]:
%%px


if rank == 0:
    # Reference run: convolve the whole image in this single process and
    # compare it with the result gathered from the parallel run.
    #
    # convolve_func arguments, in order:
    #   flattened input image, kernel array, kernel side length,
    #   row count (DIMx), column count (DIMy), upper padding row, lower padding row

    # The full image has no neighbouring blocks, so both padding rows are zeros.
    up0 = np.zeros((1, DIMy), dtype=int)
    low0 = np.zeros((1, DIMy), dtype=int)

    # Rename the arguments below if your variables are named differently.
    conv1 = convolve_func(img.ravel(), kernel, KERNEL_DIM, DIMx, DIMy,
                          up0.ravel(), low0.ravel())
    conv1 = conv1.reshape(DIMx, DIMy)

    # recvbuf holds the parallel result gathered on process 0; adjust the name
    # if yours differs. Prints True when both results agree element-wise.
    print(np.array_equal(conv1.reshape(-1), recvbuf))

Parallel execution on engine(s): all


[stdout:0] True
