# **Lab Distributed Data Analytics**

## Tutorial 2

In [None]:
# Display the CPU architecture (logical CPUs, cores per socket, threads per core);
# useful for choosing how many MPI processes to launch in the exercises below.
!lscpu

Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Byte Order:                      Little Endian
Address sizes:                   46 bits physical, 48 bits virtual
CPU(s):                          2
On-line CPU(s) list:             0,1
Thread(s) per core:              2
Core(s) per socket:              1
Socket(s):                       1
NUMA node(s):                    1
Vendor ID:                       GenuineIntel
CPU family:                      6
Model:                           79
Model name:                      Intel(R) Xeon(R) CPU @ 2.20GHz
Stepping:                        0
CPU MHz:                         2199.998
BogoMIPS:                        4399.99
Hypervisor vendor:               KVM
Virtualization type:             full
L1d cache:                       32 KiB
L1i cache:                       32 KiB
L2 cache:                        256 KiB
L3 cache:                        55 MiB
NUMA node0 CPU(s):               0,1
Vulnerability 

In [None]:
import os
os.cpu_count() # number of logical CPUs (hardware threads count individually), not physical cores

2

In [None]:
# Verify that Open MPI is installed; its `mpirun` launcher runs the scripts below.
!ompi_info --version

Open MPI v4.0.3

http://www.open-mpi.org/community/help/


In [None]:
# Install mpi4py, the Python bindings for MPI used by ex1.py-ex3.py.
# NOTE: `!pip` installs with the pip found on PATH, which presumably matches the
# `python` that `mpirun ... python exN.py` launches below (a `%pip` install would
# target the kernel instead) - confirm if the kernel runs in a separate environment.
!pip install --quiet mpi4py

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.5 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/2.5 MB[0m [31m7.5 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━[0m [32m1.9/2.5 MB[0m [31m28.0 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.5/2.5 MB[0m [31m32.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m22.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for mpi4py (pyproject.toml) ... [?25l[?25hdone


### 1 Vector Multiplication (Dot Product)

In [None]:
%%writefile ex1.py
from mpi4py import MPI
import numpy as np
import time

comm = MPI.COMM_WORLD  # communicator spanning every process started by mpirun
rank, size = comm.Get_rank(), comm.Get_size()  # this process's index / number of processes
myHostName = MPI.Get_processor_name()  # name of the machine this process runs on

t_0 = time.time()  # wall-clock time when the script started

def _partial_dot(a_i, b_i):
  """Multiply two equal-length chunks element-wise and add up the products.

  An explicit Python loop is kept on purpose: it is the work that this
  exercise parallelises and times.
  """
  total = 0
  for x, y in zip(a_i, b_i):
    total += x * y
  return total


def dot_product(N):
  """Dot product of arange(N) and arange(N, 2N) using point-to-point MPI.

  With a single process the product is computed serially. Otherwise rank 0
  builds both vectors, splits them into `size` chunks, sends chunk i of each
  vector to rank i, computes chunk 0 itself and adds the partial sums that
  the workers send back.

  Reads the module-level `comm`, `rank` and `size`.

  Args:
    N: length of both vectors.

  Returns:
    The dot product on rank 0 (or in the serial case); None on worker ranks.
  """
  # Time this call only: measuring from the module-level t_0 would make each
  # reported time also include every earlier call and the script start-up.
  t_start = time.time()

  if size == 1:
    a = np.arange(N)
    b = np.arange(N, N*2)
    total = _partial_dot(a, b)

    print(f'Time: {time.time()-t_start:.5f}') #timing
    return total

  if rank == 0: #master/coordinator node
    a = np.arange(N)
    b = np.arange(N, N*2)
    list1 = np.array_split(a, size, axis=0)
    list2 = np.array_split(b, size, axis=0)

    # Lowercase send/recv pickle generic Python objects (used here for the
    # numpy chunks); the uppercase Send/Recv variants transfer raw buffers.
    for i in range(1, size):
      comm.send(list1[i], dest=i)
      comm.send(list2[i], dest=i)

    total = _partial_dot(list1[0], list2[0])

    for i in range(1, size):
      total += comm.recv(source=i)

    print(f'Time: {time.time()-t_start:.5f}') #timing
    return total

  #worker node: receive one chunk of each vector, reply with the partial sum
  a_i = comm.recv(source=0)
  b_i = comm.recv(source=0)
  comm.send(_partial_dot(a_i, b_i), dest=0)

if rank == 0:
  print(f'Cores: {size}')

# Every rank must call dot_product for each N, because the parallel branch
# needs all processes to take part in the send/recv exchange.
Ns = [6,12,24]
for N in Ns:
  result = dot_product(N)  # not named `sum`, to avoid shadowing the builtin
  if rank == 0:
    print(f'N={N}, Sum={result}')

MPI.Finalize()

Overwriting ex1.py


In [None]:
# Run ex1.py with 1 process (serial baseline) and with 2 processes.
# --allow-run-as-root: Open MPI refuses to start as root without it (hosted notebooks run as root)
# --use-hwthread-cpus: count hardware threads, not physical cores, as available slots
# --oversubscribe:     allow more processes than available slots
! mpirun --allow-run-as-root --use-hwthread-cpus --oversubscribe -n 1 python ex1.py
! mpirun --allow-run-as-root --use-hwthread-cpus --oversubscribe -n 2 python ex1.py

Cores: 1
Time: 0.00010
N=6, Sum=145
Time: 0.00015
N=12, Sum=1298
Time: 0.00021
N=24, Sum=10948
Time: 0.00028
N=100, Sum=823350
Cores: 2
Time: 0.00511
N=6, Sum=145
Time: 0.00537
N=12, Sum=1298
Time: 0.01009
N=24, Sum=10948
Time: 0.01038
N=100, Sum=823350


### 2 Collective Communication

In [None]:
%%writefile ex2.py
from mpi4py import MPI
import numpy as np
import time

comm = MPI.COMM_WORLD  # communicator spanning every process started by mpirun
rank, size = comm.Get_rank(), comm.Get_size()  # this process's index / number of processes
myHostName = MPI.Get_processor_name()  # name of the machine this process runs on

t_0 = time.time()  # wall-clock time when the script started

def _partial_dot(a_i, b_i):
  """Multiply two equal-length chunks element-wise and add up the products.

  An explicit Python loop is kept on purpose: it is the work that this
  exercise parallelises and times.
  """
  total = 0
  for x, y in zip(a_i, b_i):
    total += x * y
  return total


def dot_product(N):
  """Dot product of arange(N) and arange(N, 2N) using collective MPI calls.

  With a single process the product is computed serially. Otherwise rank 0
  builds both vectors and splits them into `size` chunks. `scatter` hands
  chunk i of each vector to rank i, every rank computes its partial sum, and
  `gather` collects the partial sums on rank 0, which adds them up.

  Reads the module-level `comm`, `rank` and `size`; all ranks must call it.

  Args:
    N: length of both vectors.

  Returns:
    The dot product on rank 0 (or in the serial case); None on worker ranks.
  """
  # Time this call only: measuring from the module-level t_0 would make each
  # reported time also include every earlier call and the script start-up.
  t_start = time.time()

  if size == 1:
    a = np.arange(N)
    b = np.arange(N, N*2)
    total = _partial_dot(a, b)

    print(f'Time: {time.time()-t_start:.5f}') #timing
    return total

  # Only the root's send buffers matter for scatter; other ranks pass None.
  list1 = None
  list2 = None

  if rank == 0: #master node
    a = np.arange(N)
    b = np.arange(N, N*2)
    list1 = np.array_split(a, size, axis=0)
    list2 = np.array_split(b, size, axis=0)

  #Distributing the data with collective communication
  a_i = comm.scatter(list1, root=0)
  b_i = comm.scatter(list2, root=0)

  # Every rank contributes its partial sum; only rank 0 uses the gathered list.
  partials = comm.gather(_partial_dot(a_i, b_i), root=0)

  if rank == 0: #master node
    total = 0
    for p in partials:
      total += p

    print(f'Time: {time.time()-t_start:.5f}') #timing
    return total

if rank == 0:
  print(f'Cores: {size}')

# Every rank must call dot_product for each N, because scatter/gather are
# collective operations that need all processes to participate.
Ns = [6,12,24]
for N in Ns:
  result = dot_product(N)  # not named `sum`, to avoid shadowing the builtin
  if rank == 0:
    print(f'N={N}, Sum={result}')

MPI.Finalize()

Writing ex2.py


In [None]:
# Run ex2.py with 1 process (serial baseline) and with 2 processes.
# --allow-run-as-root: Open MPI refuses to start as root without it (hosted notebooks run as root)
# --use-hwthread-cpus: count hardware threads, not physical cores, as available slots
# --oversubscribe:     allow more processes than available slots
! mpirun --allow-run-as-root --use-hwthread-cpus --oversubscribe -n 1 python ex2.py
! mpirun --allow-run-as-root --use-hwthread-cpus --oversubscribe -n 2 python ex2.py

Cores: 1
Time: 0.00014
N=6, Sum=145
Time: 0.00023
N=12, Sum=1298
Time: 0.00032
N=24, Sum=10948
Cores: 2
Time: 0.06977
N=6, Sum=145
Time: 0.07019
N=12, Sum=1298
Time: 0.07052
N=24, Sum=10948


### 3 Distributed Sorting

In [None]:
%%writefile ex3.py
import heapq
import random
import time

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD  # communicator spanning every process started by mpirun
rank, size = comm.Get_rank(), comm.Get_size()  # this process's index / number of processes
myHostName = MPI.Get_processor_name()  # name of the machine this process runs on

t_0 = time.time()  # wall-clock time when the script started

def sorting(N):
  """Sort a shuffled list of the integers 0..N-1, in parallel with MPI.

  With a single process the list is shuffled and sorted serially. Otherwise
  rank 0 shuffles the list and scatters `size` chunks, every rank sorts its
  chunk, and rank 0 gathers the sorted chunks and k-way merges them.

  Reads the module-level `comm`, `rank` and `size`; all ranks must call it.

  Args:
    N: number of elements to sort.

  Returns:
    The sorted list of Python ints on rank 0 (or in the serial case);
    None on worker ranks.
  """
  # Time this call only: measuring from the module-level t_0 would make each
  # reported time also include every earlier call and the script start-up.
  t_start = time.time()

  if size == 1:
    my_list = list(range(N))
    random.shuffle(my_list)

    my_list.sort()

    print(f'Time: {time.time()-t_start:.5f}') #timing
    return my_list

  # Only the root's chunks matter for scatter; other ranks pass None.
  sub_lists = None

  if rank == 0: #master node
    my_list = list(range(N))
    random.shuffle(my_list)
    sub_lists = np.array_split(my_list, size, axis=0)

  #Distributing the data with collective communication
  sub_list = comm.scatter(sub_lists, root=0)

  sub_list.sort()  # in-place sort of this rank's numpy chunk

  sub_lists = comm.gather(sub_list, root=0)

  if rank == 0: #master node
    # k-way merge of the sorted chunks. Unlike a repeated scan for the smallest
    # head (which needed an N+1 sentinel), heapq.merge works for any values in
    # O(N log size). int() turns numpy scalars into Python ints, so both
    # branches return the same element type.
    my_list = [int(v) for v in heapq.merge(*sub_lists)]

    print(f'Time: {time.time()-t_start:.5f}') #timing
    return my_list

if rank == 0:
  print(f'Cores: {size}')

# Every rank must call sorting for each N, because scatter/gather are
# collective operations that need all processes to participate.
Ns = [10000,100000]
for N in Ns:
  my_list = sorting(N)
  if rank == 0:
    print(f'N={N}, Sorted list: {my_list[:5]}')

# Shut MPI down explicitly, as ex1.py and ex2.py do.
MPI.Finalize()

Writing ex3.py


In [None]:
# Run ex3.py with 1 process (serial baseline) and with 2 processes.
# --allow-run-as-root: Open MPI refuses to start as root without it (hosted notebooks run as root)
# --use-hwthread-cpus: count hardware threads as slots (2 on this machine per lscpu above)
# No --oversubscribe here, so -n must not exceed the available slots.
! mpirun --allow-run-as-root --use-hwthread-cpus -n 1 python ex3.py
! mpirun --allow-run-as-root --use-hwthread-cpus -n 2 python ex3.py

Cores: 1
Time: 0.00519
N=10000, Sorted list: [0, 1, 2, 3, 4]
Time: 0.06436
N=100000, Sorted list: [0, 1, 2, 3, 4]
Cores: 2
Time: 0.03435
N=10000, Sorted list: [0, 1, 2, 3, 4]
Time: 0.29120
N=100000, Sorted list: [0, 1, 2, 3, 4]


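Conclusion: None of the exercises ran faster when parallelized. The likely reason is that, for these small inputs, the communication overhead (starting the processes, pickling the data and sending it between them) is larger than the computation itself. In addition, `lscpu` reports a single physical core with two hardware threads, so the two MPI processes share one core and little speed-up can be expected even for compute-heavy work.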