In [None]:
!pip install pyspark numpy mpi4py

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting mpi4py
  Downloading mpi4py-3.1.5.tar.gz (2.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m55.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: pyspark, mpi4py
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=cb3a4c5519cab867ce5cf6de8655c300da4ab833562f8269b24031175e0ee83e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
  Building wh

In [None]:
from mpi4py import MPI
import numpy as np

# Initialize MPI
comm = MPI.COMM_WORLD
# Get rank of the communicator
rank = comm.Get_rank()
# Size of the process
size = comm.Get_size()
# Variable Initialization
N = int(16)
sum_xy = []
executiontime = 0

# Master Process
if rank != 0:
    start = MPI.Wtime()
    a = comm.recv(source=0)
    b = comm.recv(source=0)
    comm.send(a + b, dest=0, tag=1)
    end = MPI.Wtime()
    comm.send(end - start, dest=0, tag=2)
    print("Time taken by worker ", rank, "is: ", end - start)
# Worker Process
else:
    start = MPI.Wtime()
    x = np.random.randint(100, size=N)
    y = np.random.randint(100, size=N)
    # Splitting the dataset depending on the size of the workers
    split_x = np.array_split(x, size)
    split_y = np.array_split(y, size)

    if rank == 0:
        # Master's work
        sum_xy.extend(list(split_x[rank] + split_y[rank]))
        for worker in range(1, size):
            # Point-to-point communication to worker
            comm.send(split_x[worker], dest=worker)
            comm.send(split_y[worker], dest=worker)
            sum_xy.extend(list(comm.recv(source=worker, tag=1)))
            executiontime += comm.recv(source=worker, tag=2)

        end = MPI.Wtime()
        executiontime += (end - start)
print("X:", x)
print("Y:", y)
print("Final result:", sum_xy)
print("Total execution time:", executiontime)

X: [28 37 82 54 38 27  9  3 12 64 10 70 52 27 29 90]
Y: [67 61 40 12 76 93 25 65  1 79 80 46 57 28 66 29]
Final result: [95, 98, 122, 66, 114, 120, 34, 68, 13, 143, 90, 116, 109, 55, 95, 119]
Total execution time: 0.00037996899999370726


In [None]:
#Importing libraries
from mpi4py import MPI
import numpy as np

#Initialize MPI
comm = MPI.COMM_WORLD
#Get rank of the worker
rank = comm.Get_rank()
#Get size of the process
size = comm.Get_size()

#Initialize variable
N = int(1e4)
average = None
average_x = []
executiontime = 0

#Master Process
if rank != 0:
    start = MPI.Wtime()
    a = comm.recv(source = 0)
    comm.send(np.sum(a)/len(a), dest = 0, tag = 1)
    end = MPI.Wtime()
    comm.send(end-start,dest=0,tag=2)
    print("Time taken by worker ",rank,"is: ", end - start)

#Worker Process
else:
    start = MPI.Wtime()
    x = np.random.randint(100,size=N)

    #Split data based on number of workers
    split_x = np.array_split(x,size)

    #Master's work
    if rank == 0:
        average_x.append(np.sum(split_x[rank])/len(split_x[rank]))

    #Worker's part
    for worker in range(1,size):
        comm.send(split_x[worker], dest = worker)
        average_x.append(comm.recv(source = worker,tag=1))
        executiontime+=comm.recv(source=worker,tag=2)

    #Average of the vector
    average = np.average(average_x)
    end = MPI.Wtime()
    executiontime += (end - start)
    print("Time taken by worker ",rank,"is: ", end - start)
    print("Total executed time: ", np.sum(executiontime))
    print("X:", x)
    print("Y:", y)
    print("Average:",average)

Time taken by worker  0 is:  0.0034947259999853486
Total executed time:  0.0034947259999853486
X: [77  9 25 ... 84 92 19]
Y: [67 61 40 12 76 93 25 65  1 79 80 46 57 28 66 29]
Average: 49.0919


In [6]:
#Import Library
from mpi4py import MPI
import numpy as np

#Initialize MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

N = int(1e4)
VecMult = []
executiontime = 0

if rank != 0:
    start = MPI.Wtime()
    A = comm.recv(source = 0)
    b = comm.recv(source = 0)
    comm.send(np.matmul(A,b), dest = 0, tag = 1)
    end = MPI.Wtime()
    comm.send(end-start,dest=0,tag=2)
    print("Time taken by worker ",rank,"is: ", end - start)

else:
    start = MPI.Wtime()
    A = np.random.randint(100,size=(N,N))
    b = np.random.randint(100,size=(N,1))
    split_A= np.array_split(A,size)

    if rank == 0:
        out = np.matmul(split_A[rank],b)
        VecMult.extend(out.flatten().tolist())

    for worker in range(1,size):
        comm.send(split_A[worker], dest = worker)
        comm.send(b,dest=worker)
        out = comm.recv(source = worker,tag=1)
        VecMult.extend(out.flatten().tolist())
        executiontime+=comm.recv(source=worker,tag=2)
    end = MPI.Wtime()
    executiontime += (end - start)
    print("Time taken by worker ",rank,"is: ", end - start)
    print("Total executed time: ", np.sum(executiontime))
    print("\nA:",A)
    print("b:",b)
    print("Vector Multiplication:",VecMult)
    print(len(VecMult))

Time taken by worker  0 is:  1.7812024039999415
Total executed time:  1.7812024039999415

A: [[91 22  8 ... 40 99 31]
 [59 89  3 ... 75 63 54]
 [15 19 49 ... 85 79 13]
 ...
 [53  8 63 ...  4 21 87]
 [73 26 63 ... 50 95 24]
 [92 39 88 ... 24 13 37]]
b: [[15]
 [60]
 [14]
 ...
 [22]
 [12]
 [13]]
Vector Multiplication: [24536375, 24353021, 24476267, 24805299, 24686796, 24631277, 24632735, 24565150, 24747715, 24771083, 24490490, 24509695, 24672412, 24553417, 24731650, 24856083, 24653112, 24484803, 24492131, 24777357, 24382606, 24850205, 24533151, 24558471, 24695129, 24753761, 24740653, 24579694, 24778699, 24925048, 24623756, 24639091, 24833003, 24661475, 24711271, 24843815, 24539970, 24266490, 24464473, 24775571, 24758439, 24468604, 24726577, 24357817, 24566062, 24753727, 24678267, 24348704, 24904654, 24519882, 24682362, 24536790, 24414446, 24633812, 24464258, 24653034, 24598483, 24299459, 24762257, 24530669, 24747041, 24541976, 24484025, 24516505, 24645537, 24528841, 25054612, 24815119, 24

In [7]:
#Import Library
from mpi4py import MPI
import numpy as np

#Initialize MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

#Initialize Matrices
A = None
B = None
N = int(1e2)
C = np.zeros((N,N))


if rank != 0:
    B = np.empty((N,N))
else:
    A = np.random.rand(N,N)
    B = np.random.rand(N,N)

#Receiving variable
Arecv = np.empty((int(N/size),N))
start = MPI.Wtime()

#Separate data to all workers
comm.Scatter(A,Arecv,root=0)

#Send a copy of the vector to all workers
comm.Bcast(B,root=0)

#Gather the result from all workers
comm.Gather(np.matmul(Arecv,B),C,root=0)

end = MPI.Wtime()
print("Time taken by worker:",rank,"is: ",end-start)

Time taken by worker: 0 is:  0.01595124699997541
