## Parte I

#### Responda las siguientes preguntas en 200 palabras o menos (2 puntos cada una)


1.	¿Qué es un proceso embarrassingly parallel y uno inherentemente serial? Dé un ejemplo de cada uno (distintos a los vistos en clase)

El proceso de convertir un programa serial o algoritmo en un programa paralelo es denominado paralelilazación, y los programas que pueden ser paralelizado por una simple división del trabajo entre los procesos son conocidos como “embarrassingly parallel”. Un ejemplo de ello es la renderización de infografías, utilizada en la animación por computadora, para la cual cada fotograma o píxel se puede renderizar de forma independiente (renderizado paralelo ). Sin embargo, no es posible que todos los procesos o algoritmos sean paralelizables, a estos se les denomina procesos inherentemente serial, por ejemplo: el muestreo de Gibbs (algoritmo de la cadena de Markov Monte Carlo - MCMC), usado para obtener una secuencia de observaciones que se aproximan a partir de una distribución de probabilidad multivariante, cuando el muestreo directo es difícil y usado comúnmente en la inferencia bayesiana. Este algoritmo es secuencial por naturaleza, es decir, se muestrea una variable aleatoria condicionada a todas las demás variables aleatorias. Si bien, este problema se puede absolver realizando un muestreo por bloques, es ineficiente pues implica un cálculo mucho más extenso

2.	¿Qué cuellos de botella puede enfrentar al paralelizar un proceso? Relaciónelo con la ley de Amdahl/Gustafson 

Uno de los principales cuellos de botella de un proceso de paralelización, es que podemos encontrar procesos que solo pueden ser paralelizables parcialmente, es decir que una parte del proceso sea “inherentemente serial”, lo cual impide que la paralelización sea completa. En este sentido, la ley de Amdahl señala que a menos que se paralelice prácticamente todo el programa en serie, el aumento de la velocidad posiblemente sea muy limitado, incluso si el número de núcleos es elevado. Entonces, esta velocidad no podrá ser mayor a 1/r, incluso si r es bastante pequeño, no se podrá obtener una velocidad superior a r. Sin embargo, este problema no debe preocuparnos demasiado, pues la ley de Amdahl no toma en cuenta el tamaño de problema, el cual al ser más grande puede llegar a reducir la fracción “inherentemente serial” del programa, esta ley es conocida como la ley de Gustafson.

3.	¿En qué se diferencia un CPU de un GPU? Dé un ejemplo de un proceso que convendría paralelizar en cada uno de estos tipos de unidad de procesamiento. 

La principal diferencia se encuentra en la cantidad de núcleos que poseen, tanto un CPU como un GPU, los cuales condicionan las tareas que pueden o no cumplir. El CPU tiene unos cuantos núcleos optimizados para el procesamiento en serie secuencial, convirtiéndolo en un potente motor de ejecución, que centra su reducido número de núcleos en realizar tareas individuales rápidamente. Por su parte, un GPU cuenta con una arquitectura en paralelo más grande que consiste de miles de núcleos más pequeños y eficaces, y que se diseñaron para resolver varias tareas al mismo tiempo. Ejemplo GPU: Una red de aprendizaje profundo puede tener millones de parámetros, y por ello los GPU son usados con bastante frecuencia, varias GPU conectadas a múltiples nodos de procesamiento distribuido. 

4.	Piense en una tarea serial que le han encargado paralelizar. Describa el diseño de la implementación en paralelo de dicha tarea siguiente el método de Foster y los cuatro elementos que lo componen.

Se viene desarrollando un proyecto de lenguaje natural que extrae y procesa los tweets de los principales representantes de la lucha contra el cambio climático. Para lo cual se propone una solución paralela basada en MPI: Se (I) **particionará** en cuatro tareas: (1) extracción de los tweets, (2) limpieza de la información extraída y (3) generación de estadísticas locales y (4) geración de estadísticas globales. Habrá (II) **comunicación** para cada una de las etapas y hay que ser particularmente cuidadosos en la comunicación entre la tercera y cuarta tarea. En cuanto a la (III) **aglomeración**, cada uno de los procesadores, ejecuta las operaciones seriales que se tienen que hacer sobre los tweets correspondientes a las tres primeras tareas. La última tarea se realizará en otro procesador. Respecto al (IV) **mapeo**, cada procesador extraerá un número n de tweets (tarea 1) y realizará las tareas 2 y 3 sobre los tweets que extrajo. Las estadísticas locales de la tarea 3 serán aglomeradas en un solo procesador que realizará la tarea 4. 

## Parte II

### Pregunta 1

#### Escribir un código (“parte2_1.py”) que realice lo siguiente:

In [1]:
#Establecer directorio
import os
os.chdir('r../../../Big_Data_Analytics_Tarea1')

#### a.	Que un procesador genere un diccionario y lo envíe a otros tres procesadores.

In [2]:
%%writefile parte2_1.py 
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    diccionario1 = {'alpha': 3, 'beta': 1.5}
    comm.send(diccionario1, dest=1 , tag=1)
    diccionario2 = {'Meses': 12, 'Días': 30}
    comm.send(diccionario2, dest=2 , tag=2)
    diccionario3 = {'Edad': 23, 'Grupo':5 }
    comm.send(diccionario3, dest=3 , tag=3)
elif rank == 1:
    diccionario1 = comm.recv(source=0 , tag=1)
    print("I am processor", rank)
    print(diccionario1)   
elif rank == 2:
    diccionario2 = comm.recv(source=0 , tag=2)
    print("I am processor", rank)
    print(diccionario2)   
elif rank == 3:
    diccionario3 = comm.recv(source=0 , tag=3)
    print("I am processor", rank)
    print(diccionario3)

Overwriting parte2_1.py


#### b.	Que cada uno de los tres procesadores reciba el diccionario enviado, imprima su número de procesador y el diccionario. 

In [3]:
import time
start = time.time()
! mpiexec -n 4 python parte2_1.py
end = time.time()
print(end - start)

I am processor 3
{'Edad': 23, 'Grupo': 5}
I am processor 1
{'alpha': 3, 'beta': 1.5}
I am processor 2
{'Meses': 12, 'Días': 30}
0.5255606174468994


#### c.	En otro chunk responda: Si usted ejecutara el código 10 veces, ¿el orden de los resultados sería siempre igual? ¿Por qué?

No, necesariamente. Esto se debe a que las acciones se están desarrollando de forma paralela y uno de los procesadores pudo ser más rápido que el otro, independientemente de en que orden fueron descritos en el código y esto se debe a que en realidad los 3 procesadores se están ejecutando de forma paralela, lo cual da la posibilidad que uno de los tres ejecute más rápido que los otros dos. Es decir, pede suceder que el procesador 3 opere más rápido que el procesador 1 y 2 y en una siguiente ejecución el procesador 2 opera más rápido que el 1 y 3, y también puede darse que en un momento el procesador 1 opere más rápido que el 2 y 3. Ello deterinará el orden del resultado tanto de como aparecen los procedores y el tiempo que demora.

### Pregunta 2

#### a.Que un procesador genere dos numpy array diferentes, cada uno de 1,000,000 observaciones. Llamar “num1” y “num2” a estos numpy.

Con nuestro procesador de rango 0 generamos dos arrays de números aleatorios con 1 000 000 de observaciones cada uno que llamaremos "num1" y "num2".

In [4]:
%%writefile numpys.py 
from mpi4py import MPI

import numpy as np
import random

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

np.random.seed(1102)

if rank == 0:
    num1 = np.random.random(1000000)
    num2 = np.random.random(1000000)
    print ("El array num1 generado por el procesador",rank,"es",num1)
    print ("El array num2 generado por el procesador",rank,"es",num2)


Overwriting numpys.py


In [5]:
! mpiexec -n 1 python numpys.py

El array num1 generado por el procesador 0 es [0.3051895  0.8090249  0.70315185 ... 0.99735614 0.81987858 0.19472251]
El array num2 generado por el procesador 0 es [0.69826857 0.68300644 0.62900289 ... 0.54458017 0.73143874 0.12210111]


#### b.Enviar cada numpy a un procesador diferente. Que cada uno de los otros procesadores reciba su numpy

Procedemos a enviar "num1" al procesador de rango 1 y "num2" al procesador de rango 2.

In [6]:
%%writefile numpys2.py 
from mpi4py import MPI

import numpy as np
import random

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

np.random.seed(1102)

if rank == 0:
    num1 = np.random.random(1000000)
    num2 = np.random.random(1000000)
    comm.Send(num1, dest=1)
    comm.Send(num2, dest=2)
elif rank == 1:
    num1 = np.empty(1000000)
    comm.Recv(num1, source=0)
    print ("El procesador",rank,"recibio",num1)
elif rank == 2:
    num2 = np.empty(1000000)
    comm.Recv(num2, source=0)
    print ("El procesador",rank,"recibio",num2)

Overwriting numpys2.py


In [7]:
! mpiexec -n 3 python numpys2.py

El procesador 1 recibio [0.3051895  0.8090249  0.70315185 ... 0.99735614 0.81987858 0.19472251]
El procesador 2 recibio [0.69826857 0.68300644 0.62900289 ... 0.54458017 0.73143874 0.12210111]


#### c. Que otro procesador (que no haya recibido nada) reciba “num1” y “num2” y los imprima.

Procedemos a enviar "num1" desde nuestro procesador de rango 1 y "num2" desde nuestro procesador de rango 2 a nuestro procesador de rango 3. 

In [8]:
%%writefile parte2_2.py 
from mpi4py import MPI

import numpy as np
import random

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

np.random.seed(1102)

if rank == 0:
    num1 = np.random.random(1000000)
    num2 = np.random.random(1000000)
    comm.Send(num1, dest=1)
    comm.Send(num2, dest=2)
elif rank == 1:
    num1 = np.empty(1000000)
    comm.Recv(num1, source=0)
    comm.Send(num1, dest=3)
elif rank == 2:
    num2 = np.empty(1000000)
    comm.Recv(num2, source=0)
    comm.Send(num2, dest=3)
elif rank == 3:
    num1 = np.empty(1000000)
    comm.Recv(num1, source=1)
    num2 = np.empty(1000000)
    comm.Recv(num2, source=2)
    print ("El procesador", rank ,"recibio",num1)
    print ("El procesador", rank, "recibio",num2)

Overwriting parte2_2.py


#### d.	Ejecute el código creado y registre el tiempo que toma realizar este ejercicio.

Procedemos a ejecutar nuestro último código. Registramos el tiempo de ejecución en nuestro código.

In [9]:
import time
start = time.time()
! mpiexec -n 4 python parte2_2.py
end = time.time()
print("El proceso tomo",end - start,"segundos")

El procesador 3 recibio [0.3051895  0.8090249  0.70315185 ... 0.99735614 0.81987858 0.19472251]
El procesador 3 recibio [0.69826857 0.68300644 0.62900289 ... 0.54458017 0.73143874 0.12210111]
El proceso tomo 0.8293895721435547 segundos


e.	En otro chunk responda: ¿existe una manera de agilizar este proceso con las herramientas de MPI? Sea detallado en su respuesta y argumentos.

Una manera de agilizar el proceso es con los comandos scatter, gather y reduce. Si la base de datos está dividida en arrays (o chunks en general) el comando comm.scatter permite enviar estos arrays a otros procesadores. Por ejemplo, los arrays creados por el procesador 0 pueden ser transmitidos a los procesadores 1 y 2 de esta manera. 

De la misma manera, el comando comm.gather permite reunir los arrays u otros objetos tratados en otros porcesadores. Por ejemplo el procesador 3 puede de esta forma recuperar los máximos encontrados por los procesadores 1 y 2. 

Finalmente, con el comando comm.reduce y el argumento MPI_MAX permite realizar operaciones sobre objetos en otros procesadores. Por ejemplo, con este comando el procesador 3 puede hallar el máximo global a partir de los máximos guardados como objetos en los procesadores 1 y 2.  

### Pregunta 3

Generamos un numpy array que almacene el archivo: “tarea2.csv”, vemos cuantos elementos contiene y calculamos el máximo para tenerlo como referencia para las siguientes preguntas.

In [10]:
import numpy as np
from numpy import genfromtxt
tarea2 = genfromtxt('tarea2.csv', delimiter=',')
print("Vemos que la base de datos tiene",tarea2.shape,"elementos.")

Vemos que la base de datos tiene (1048575,) elementos.


In [11]:
print("El máximo que deberíamos encontrar en cada proceso es",tarea2.max())

El máximo que deberíamos encontrar en cada proceso es 0.99999982


#### a.	Escribir un código que halle el valor máximo de “tarea2” usando un procesador. Imprimir el valor máximo. Registrar el tiempo de demora.

Escribimos nuestro código de manera que el procesador 0 almacene la base de datos "tarea2" en un numpy array y calcule su máximo. Hemos visto que asignar la creación del array a un procesador es más eficiente que no asignarlo.

In [12]:
%%writefile p3a.py 
from mpi4py import MPI

import numpy as np
import random
from numpy import genfromtxt

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    tarea2 = genfromtxt('tarea2.csv', delimiter=',')
    maximo = tarea2.max()
    print("Nuestro máximo es efectivamente",maximo)

Overwriting p3a.py


In [13]:
import time
start = time.time()
! mpiexec -n 1 python p3a.py
end = time.time()
print("El proceso tomó",end - start,"segundos")

Nuestro máximo es efectivamente 0.99999982
El proceso tomó 6.261150121688843 segundos


Nuestro código encontró de manera exitosa el máximo global (0.9999982). En la ejecución hemos incluido el tiempo que tomó.

#### b.	Escribir un código que realice las siguientes indicaciones. Dividir el numpy en dos partes iguales. Que dos procesadores distintos encuentren el máximo de cada parte. Que otro procesador junte los máximos hallados y encuentre el máximo global. Este resultado debe ser igual al de 3a. Registrar el tiempo de demora.

Primero dividimos el numpy en 2 chunks fuera del código a manera de test y también para averiguar el tamaño de cada chunk pues necesitaremos esa información en nuestro código MPI.

In [14]:
tarea2_chunk = np.array_split(tarea2, 2)
tarea2_chunk

[array([0.34887171, 0.2668857 , 0.1366463 , ..., 0.57128704, 0.81662822,
        0.24393123]),
 array([0.71707726, 0.31591403, 0.57266182, ..., 0.48673952, 0.09359856,
        0.93271565])]

In [15]:
print("El tamaño de nuestro primer chunk es",tarea2_chunk[0].size)

El tamaño de nuestro primer chunk es 524288


In [16]:
print("El tamaño de nuestro segundo chunk es",tarea2_chunk[1].size)

El tamaño de nuestro segundo chunk es 524287


Escribimos un código donde el procesador 0 divide el MPI y envía los 2 chunks a los procesadores 1 y 2. 
Los procesadores 1 y 2 calculan el máximo del chunk que recibieron.
El procesador 3 recibe los máximos de los procesadores 1 y 2 calcula el máximo global a partir de ahí.

In [17]:
%%writefile p3b.py 
from mpi4py import MPI

import numpy as np
import pandas as pd
import random
from numpy import genfromtxt


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    tarea2 = genfromtxt('tarea2.csv', delimiter=',')
    tarea2_chunk = np.array_split(tarea2, 2)
    chunk1=tarea2_chunk[0]
    chunk2=tarea2_chunk[1]
    comm.Send(chunk1, dest=1)
    comm.Send(chunk2, dest=2)
elif rank == 1:
    chunk1 = np.empty(524288)
    comm.Recv(chunk1, source=0)
    max1=chunk1.max()
    comm.Send(max1, dest=3)
    print("El procesador", rank,"encontró",max1)
elif rank == 2:
    chunk2 = np.empty(524287)
    comm.Recv(chunk2, source=0)
    max2=chunk2.max()
    comm.Send(max2, dest=3)
    print("El procesador", rank,"encontró",max2)
elif rank == 3:
    max1 = np.empty(1)
    max2 = np.empty(1)
    comm.Recv(max1, source=1)
    comm.Recv(max2, source=2)
    max3=max(max1,max2)
    print("El procesador",rank,"encontró",max3)

Overwriting p3b.py


In [18]:
import time
start = time.time()
! mpiexec -n 4 python p3b.py
end = time.time()
print("El proceso tomo",end - start,"segundos")

El procesador 1 encontró 0.99999791
El procesador 2 encontró 0.99999982
El procesador 3 encontró [0.99999982]
El proceso tomo 7.583211898803711 segundos


Vemos que encontramos el mismo máximo que en 3a, es decir 0.99999982. Sin embargo encontramos que toma más tiempo que con el código de 3a.

Probamos una alternativa donde la generación de los chunks no se asigna a ningún procesador en particular. Aquí los procesadores 0 y 1 toman cada uno de los 2 chunks y hallan los máximos locales mientras que el procesador 2 recibe estos máximos y halla el máximo global. 

In [19]:
%%writefile p3b2.py 
from mpi4py import MPI

import numpy as np
import pandas as pd
import random
from numpy import genfromtxt

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

tarea2 = genfromtxt('tarea2.csv', delimiter=',')
tarea2_chunk = np.array_split(tarea2, 2)
chunk1=tarea2_chunk[0]
chunk2=tarea2_chunk[1]

if rank == 0:    
    max1=chunk1.max()
    comm.Send(max1, dest=2)
    print("El procesador", rank, "encontró",max1)
elif rank == 1:
    max2=chunk2.max()
    comm.Send(max2, dest=2)
    print("El procesador",rank,"encontró",max2)
elif rank == 2:
    max1 = np.empty(1)
    max2 = np.empty(1)
    comm.Recv(max1, source=0)
    comm.Recv(max2, source=1)
    maxtot=max(max1,max2)
    print("El procesador",rank,"encontró",maxtot)

Overwriting p3b2.py


In [20]:
import time
start = time.time()
! mpiexec -n 3 python p3b2.py
end = time.time()
print("El proceso tomo",end - start,"segundos")

El procesador 1 encontró 0.99999982
El procesador 0 encontró 0.99999791
El procesador 2 encontró [0.99999982]
El proceso tomo 8.206487655639648 segundos


Este método parece ser menos eficiente que si asignamos la creación del array a un procesador. Por ende lo descartamos.

#### c.	Repetir 3b dividiendo el numpy original en tres partes. Registrar el tiempo de demora.

De nuevo vemos cual es el tamaño de los arrays.

In [21]:
tarea2_chunk2 = np.array_split(tarea2, 3)
tarea2_chunk2

[array([0.34887171, 0.2668857 , 0.1366463 , ..., 0.05235301, 0.60290039,
        0.51786804]),
 array([0.94508648, 0.17400229, 0.74788725, ..., 0.19144469, 0.04561006,
        0.76499122]),
 array([0.90580344, 0.27597395, 0.377038  , ..., 0.48673952, 0.09359856,
        0.93271565])]

In [22]:
print("El tamaño de nuestro primer chunk es",tarea2_chunk2[0].size)

El tamaño de nuestro primer chunk es 349525


In [23]:
print("El tamaño de nuestro segundo chunk es",tarea2_chunk2[1].size)

El tamaño de nuestro segundo chunk es 349525


In [24]:
print("El tamaño de nuestro tercer chunk es",tarea2_chunk2[2].size)

El tamaño de nuestro tercer chunk es 349525


Escribimos un código donde el procesador 0 divide el MPI y envía los 3 chunks a los procesadores 1, 2 y 3. 
Los procesadores 1, 2 y 3 calculan el máximo del chunk que recibieron.
El procesador 0 recibe los máximos de los procesadores 1, 2 y 3 y calcula el máximo global a partir de ahí.

In [25]:
%%writefile p3c.py 
from mpi4py import MPI

import numpy as np
import pandas as pd
import random
from numpy import genfromtxt


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    tarea2 = genfromtxt('tarea2.csv', delimiter=',')
    tarea2_chunk = np.array_split(tarea2, 3)
    chunk1=tarea2_chunk[0]
    chunk2=tarea2_chunk[1]
    chunk3=tarea2_chunk[2]
    comm.Send(chunk1, dest=1)
    comm.Send(chunk2, dest=2)
    comm.Send(chunk3, dest=3)
    max1 = np.empty(1)
    max2 = np.empty(1)
    max3 = np.empty(1)
    comm.Recv(max1, source=1)
    comm.Recv(max2, source=2)
    comm.Recv(max3, source=3)                    
    maxtot=max(max1,max2,max3)
    print("El procesador",rank,"encontró",maxtot)    
elif rank == 1:
    chunk1 = np.empty(349525)
    comm.Recv(chunk1, source=0)
    max1=chunk1.max()
    comm.Send(max1, dest=0)
    print("El procesador", rank,"encontró",max1)
elif rank == 2:
    chunk2 = np.empty(349525)
    comm.Recv(chunk2, source=0)
    max2=chunk2.max()
    comm.Send(max2, dest=0)
    print("El procesador", rank,"encontró",max2)
elif rank == 3:
    chunk3 = np.empty(349525)
    comm.Recv(chunk3, source=0)
    max3=chunk3.max()
    comm.Send(max3, dest=0)
    print("El procesador", rank,"encontró",max3)

Overwriting p3c.py


In [26]:
import time
start = time.time()
! mpiexec -n 4 python p3c.py
end = time.time()
print("El proceso tomo",end - start,"segundos")

El procesador 1 encontró 0.99999708
El procesador 2 encontró 0.99999791
El procesador 3 encontró 0.99999982
El procesador 0 encontró [0.99999982]
El proceso tomo 7.663603067398071 segundos


Vemos que encontramos el mismo máximo que en 3a, es decir 0.99999982. Vemos que toma más tiempo que con el código en 3a pero menos que con el código en 3b.

#### d.	Comparar los tiempos registrados en 3a, 3b y 3c. ¿Hay una reducción del tiempo? ¿La reducción del tiempo es lineal? ¿Por qué?

El proceso que duró menos tiempo fue el 3a, seguido del 3c y luego el 3b. 

Curiosamente, el proceso no paralelizado (3a) fue el más rápido, por lo que al paralelizar hubo un aumento del tiempo. Esto puede deberse a que los procesos seriales de python están optimizados de tal formal que al paralelizar no hay una ganancia para un proceso relativamente simple.

Sin embargo vemos que en 3c obtenemos un menor tiempo que en 3b. Esto se puede deber a que, una vez paralelizado, es más eficiente si aprovechamos y usamos una mayor cantidad de unidades de procesamiento.

La reducción del tiempo entre 3c y 3b no es lineal pues hay procesos que no son paralelizables (por ejemplo la división de la base de datos) por lo que no se va a poder aprovechar los nuevos procesadores para dividir absolutamente todos los procesos entre ellos. 