## Paralelizando nuestro Código

### Determinar la cantidad de procesadores de cómputo en un sistema

Podemos encontrar la cantidad de procesadores en un sistema para maximizar nuestros recursos y rendimiento. Para esta demostración, es más sencillo contar solo todos los núcleos físicos y no los núcleos "lógicos" que resultan de las capacidades de subprocesos múltiples simultáneos (SMT) de la CPU.

In [None]:
import psutil

# logical =  False (no contamos los núcleos lógicos de SMT)
num_procs = psutil.cpu_count(logical=False)

print(num_procs)

### Multiprocessing: `Process` class

La clase Process es un método para generar procesos explícitamente y, a menudo, se denominan funciones predefinidas.

### Example: Using `multiprocess.Process` to Parallelize our Program

In [16]:
import multiprocessing as mp
import random
import string

random.seed(123)

n = 16  # numero de iteraciones

# Definimos un output queue
# Queue permite la comunicación entre procesos utilizando datos Pickle-able (métodos .put () y .get ())
# Nuestros procesos solo necesitan comunicarse cuando agregan su salida
output = mp.Queue()


# definir una función de ejemplo: ingresaremos la cola para que se recopile nuestra salida
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
        string.ascii_lowercase
        + string.ascii_uppercase
        + string.digits)
                       for i in range(length))
    output.put(rand_str)


# Configurar una lista de procesos que queremos ejecutar
# mp.Process se llama para generar un proceso
# target = la función que nos gustaría ejecutar
# args = tupla de argumentos aceptados por la función
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(n)]

# Cada proceso necesita ser inicializado
for p in processes:
    p.start()

# Esperamos hasta que cada proceso haya terminado antes de continuar
for p in processes:
    p.join()

# Obtener los resultados del proceso de la cola de salida
results = [output.get() for p in processes]

print(results)

['ERH5c', 'RsocO', 'EfvNZ', '0JvQN', 'mWg2z', 'stOJZ', 'mtHmI', 'IGgYf', '6w1w3', 'ma9RU', '92jr3', 'wJn7v', 'vuGnm', 'iZsel', 'Z178J', 'WEzmZ']


#### Orden de los resultados

La cola de salida recibe datos de forma desordenada, solo sigue el orden en el que los procesos colocan datos en la cola. Esto puede ser problemático si importa el orden de los elementos en el resultado.

Afortunadamente, se puede realizar un seguimiento del rango de cada proceso para facilitar la clasificación del orden de los resultados.

In [17]:
import multiprocessing as mp
import random
import string

random.seed(123)

output = mp.Queue()


# Creemos un parámetro de rango para que podamos asignar un rango a cada resultado.
def rand_string(length, rank, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
        string.ascii_lowercase
        + string.ascii_uppercase
        + string.digits)
                       for i in range(length))
    output.put((rank, rand_str))


processes = [mp.Process(target=rand_string, args=(5, x, output)) for x in range(n)]

for p in processes:
    p.start()

for p in processes:
    p.join()

results = [output.get() for p in processes]

# ordenar los resultados por el rango de cada proceso
results.sort()
# results = [r[1] for r in results]

print(results)

[(0, 'CTdAx'), (1, 'LEgE2'), (2, '7rxK2'), (3, 'ZobUz'), (4, 'TyZli'), (5, 'YqiCS'), (6, 'KyiWL'), (7, 'fwK3E'), (8, 'HarPS'), (9, 'tMnQD'), (10, 'LJTZJ'), (11, 'qHyvy'), (12, 'BZXxE'), (13, 'KtZQj'), (14, 'MJXbD'), (15, 'Tjz36')]


## `Pool` class: Asignar datos a un worker processes

La clase Pool aplica o asigna funciones a datos preexistentes que pueden dividirse en fragmentos.

 - Agregar más procesos de trabajo nos permite procesar múltiples fragmentos en paralelo.
 - Cuantos más trabajadores, más pequeño se puede hacer cada trozo.
 - Cuanto más pequeño sea el trozo, más rápido se puede completar nuestro trabajo.

Definamos una función muy simple que devuelve un número elevado a la potencia de 3.

In [18]:
def cube(x):
    return x ** 3

A continuación, ¡reuniremos nuestro grupo de trabajadores! Debe haber un trabajador por núcleo de procesador.

In [19]:
# ¡Definamos nuestro grupo de trabajadores!

pool = mp.Pool(processes=num_procs)

Usaremos el método `pool.apply ()` para aplicar una función en bucle a través de un rango de entradas.

- el método `apply ()` requiere una función y una tupla como argumento
- la función es `cube`
- la función se llama repetidamente para cada entrada
- el argumento tupla es `(x,)`

In [20]:
# Genera los cubos de 0-255

results = [pool.apply(cube, args=(x,)) for x in range(0, 255)]
print(results)

[0, 1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376, 2048383, 2097152, 2146689, 2197000, 2248091

Ahora apliquemos el método `pool.map ()` para aplicar una función sobre un rango de datos. Esto puede sonar muy similar a `pool.apply ()` pero el orden de las operaciones es ligeramente diferente.

- el `map()` requiere una función y datos para aplicar la función a
- la función es `cube`
- los datos son la tupla `range(0,256)`
- la función se aplica a cada elemento de la tupla

In [22]:
# Genera los cubos de 0-255

# Observe cómo este código no requiere comprensión de la lista y es más limpio

x = range(0, 255)
#print(x)

results = pool.map(cube, x)
print(results)

[0, 1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376, 2048383, 2097152, 2146689, 2197000, 2248091

## Synchronous and Asynchronous Computing

Los métodos `.apply()` y `.map()` son ejemplos de computación síncrona.

Hay variantes asincrónicas de estos métodos que no esperan y devuelven resultados en el orden en que terminaron los subprocesos.

In [24]:
# Método asincrónico

results = [pool.apply_async(cube, args=(x,)) for x in range(0, 255)]
print(results)

[<multiprocessing.pool.ApplyResult object at 0x7fc89fa8e950>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa8ea50>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa8eb50>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa8ec10>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa8ecd0>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa8ee10>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa8eed0>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa8ef90>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa90090>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa8edd0>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa90190>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa90250>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa90310>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa903d0>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa90490>, <multiprocessing.pool.ApplyResult object at 0x7fc89fa90550>, <multiprocessing.pool.A

No hay necesidad de preocuparse. El resultado de este método asincrónico es una lista de objetos en sus direcciones de memoria. Esto ocurre porque el método `apply_async` devuelve un objeto` AsyncResult`. Los objetos en realidad están 'encurtidos' y deben extraerse con el método de encurtido `.get()`.

¡También hay algunos métodos interesantes asociados con `AsyncResult`!

In [25]:
# Podemos comprobar si estos procesos asincrónicos han finalizado

ready = [p.ready() for p in results]
print(ready)

[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, Tru

In [26]:
# También podemos ver si cada proceso fue exitoso, es decir, ¡sin errores!

success = [p.successful() for p in results]
print(success)

[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, Tru

In [11]:
# ¡Los métodos .get() devuelven los resultados!
# Solo tenga en cuenta que es posible que su resultado asincrónico aún no esté terminado

output = [p.get() for p in results]
print(output)

[0, 1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376, 2048383, 2097152, 2146689, 2197000, 2248091

Puede notar que el orden parece correcto. Esto se debe a que cada proceso no se genera al mismo tiempo; hay gastos generales involucrados, y para trabajadores idénticos que toman el mismo tiempo para completar sus tareas, el orden en el que generan a menudo determina el orden en que terminan. Solo recuerda **el pedido no está garantizado**.

In [27]:
# asegúrese de que su grupo de trabajadores esté cerrado una vez que haya terminado

pool.close()

Uno puede evitar usar .close () en un objeto `Pool` usando:

```
with mp.Pool(processes = num_procs) as pool:
```

¡Probemos eso!

In [28]:
with mp.Pool(processes=num_procs) as pool:
    results = pool.map_async(cube, range(0, 255))  # usamos asynchronous map

    output = results.get()  # obtenemos nuestros resultados
    print(output)

[0, 1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376, 2048383, 2097152, 2146689, 2197000, 2248091

## Project Gutenberg Reader

Let's use everything we've learned to tackle one last problem.

I have consolidated 18,792 books from [Project Gutenberg](https://www.gutenberg.org/) into a large 10GB binary file.

Using Python, let's count how many times some very important strings appear in all of these books:
 - `dog`
 - `cat`
 - `girl`
 - `boy`
 

## Strategy #1 Serialized Code

<img src = 'img/single_process_reader.png' >

In [29]:
# ubicación de este archivo
f = "your_path_to/large_text.txt"


# Crea un 'generador' de Python
# Lee en un archivo binario grande en trozos de 256k
# Procesa cada fragmento y cuenta el recuento de palabras
def read_in_chunks(file_object, chunk_size=256 * 1024):
    """
    Función Lazy (generador) para leer un archivo pieza por pieza.
    Tamaño de fragmento predeterminado: 256 k.
    """
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


dogs = 0
cats = 0
boys = 0
girls = 0

# Abra el archivo, lea en trozos secuencialmente y cuente el número de instancias de palabras
with open(f, "rb") as fin:
    for chunk in read_in_chunks(fin):
        line_list = chunk.decode('latin-1').strip().split()

        dogs += line_list.count("dog")
        cats += line_list.count("cat")
        boys += line_list.count("boy")
        girls += line_list.count("girl")
        
print(dogs, cats, boys, girls)

1 2 5 10
0.008787580999978672


## Strategy #2 Parallelized Workers

<img src = 'img/multi_process_reader.png' >

In [38]:
from timeit import default_timer as timer
import multiprocessing as mp
import psutil

# un archivo binario grande se divide en trozos que se envían a un grupo de procesos de trabajo
# método más rápido

# logical =  False (no contamos los núcleos lógicos de SMT)
num_procs = psutil.cpu_count(logical=False)

# ubicación de nuestros datos
f = "your_path_to/large_text.txt"

# Crea un 'generador' de Python
# Lee en un archivo binario grande en trozos de 256k
# Procesa cada fragmento y cuenta el recuento de palabras
def read_in_chunks(file_object, chunk_size=256 * 1024):
    """
    Función Lazy (generador) para leer un archivo pieza por pieza.
    Tamaño de fragmento predeterminado: 256 k.
    """
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


# Nuestra función de contador
def count_objects(chunk):
    dogs = 0
    cats = 0
    boys = 0
    girls = 0

    line_list = chunk.strip().split()

    dogs += line_list.count("dog")
    cats += line_list.count("cat")
    boys += line_list.count("boy")
    girls += line_list.count("girl")

    return dogs, cats, boys, girls


# Abra nuestro archivo grande
with open(f, "rb") as fin:
    # Leer en trozos
    chunks = [chunk.decode('latin-1') for chunk in read_in_chunks(fin)]

    
# Aquí asignamos nuestra función a los trozos
# Nosotros, nuestro grupo de trabajadores, hacemos todo el trabajo duro por nosotros
# Sin generación manual de objetos de proceso
def multi_read(n):
    print('Creating pool with %d processes\n' % n)
    with mp.Pool(n) as p:
        results = p.map(count_objects, chunks)
    return results

results = multi_read(num_procs)

all_dogs = 0
all_cats = 0
all_boys = 0
all_girls = 0

for dogs, cats, boys, girls in results:
    all_dogs += dogs
    all_cats += cats
    all_boys += boys
    all_girls += girls

print(all_dogs, all_cats, all_boys, all_girls)

FileNotFoundError: [Errno 2] No such file or directory: 'your_path_to/large_text.txt'

While there isn't a 1:1 scaleup, this is a nice speed up!