# Ejercicios 01_dask_delayed_am

### Ejercicio: paralelizar un bucle for

Los bucles *for* son una de las cosas más comunes que queremos paralelizar. Utilice `dask.delayed` en *inc* y *sum* para paralelizar el cálculo a continuación:

In [6]:
from dask import delayed

In [7]:
from time import sleep

def inc(x):
    sleep(1)
    return x+1

def add(x, y):
    sleep(1)
    return(x+y)

In [8]:
data = [1,2,3,4,5,6,7,8]

In [9]:
%%time
# código secuencial

results = []
for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)
total

Wall time: 8.09 s


44

### Paralelo

In [10]:
from dask import delayed
results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)
print("Before computing:", total)  
result = total.compute()
print("After computing :", result) 

Before computing: Delayed('sum-012ef0e1-f440-4e0a-93d4-c108b6b92882')
After computing : 44


In [11]:
total.visualize()

RuntimeError: Drawing dask graphs requires the `graphviz` python library and the `graphviz` system library to be installed.

In [12]:
# tipo de objeto
delayed(inc)(1) + delayed(inc)(2)

Delayed('add-f8bf90d14cca85e655a4cea1f7c6484a')

### Ejercicio: paralelización de un código de bucle for con flujo de control

A menudo queremos retrasar solo algunas funciones, ejecutando algunas de ellas inmediatamente. Esto es especialmente útil cuando esas funciones son rápidas y nos ayudan a determinar qué otras funciones más lentas debemos llamar. Esta decisión, de retrasar o no retrasar, suele ser donde debemos ser reflexivos al utilizar *dask.delayed*.

En el siguiente ejemplo, iteramos a través de una lista de entradas. Si esa entrada es par, entonces queremos llamar a *inc*. Si la entrada es impar, queremos llamar a *doble*. Esta  función de decisión *is_even* para llamar  *inc* o *double* debe tomarse de inmediato (no de manera perezosa) para que el código Python de creación de nuestro gráfo continúe.

In [9]:
def double(x):
    sleep(1)
    return 2 * x

def is_even(x):
    return not x % 2

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [10]:
%%time
# Sequential code

results = []
for x in data:
    if is_even(x):
        y = double(x)
    else:
        y = inc(x)
    results.append(y)
    
total = sum(results)
print(total)

90
Wall time: 10 s


### Código Paralelo

In [11]:
results = []
for x in data:
    if is_even(x):  # even
        y = delayed(double)(x)
    else:          # odd
        y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)

In [12]:
%time total.compute()

Wall time: 2.02 s


90

In [13]:
total.visualize()

ExecutableNotFound: failed to execute WindowsPath('dot'), make sure the Graphviz executables are on your systems' PATH

# Ejercicio 01_lazy_am

### Ejercicio

Aplicaremos el retraso a una tarea de procesamiento de datos real, aunque sea simple.

Leeremos tres archivos CSV con pd.read_csv y luego mediremos su longitud total. 

Consideraremos cómo haría esto con código Python ordinario, luego construiremos un grafo para este proceso usando `delayed`, y finalmente ejecutaremos este grafo usando Dask, para un factor de aceleración útil de más de dos (solo hay tres entradas para paralelizar).

In [14]:
%run prep.py -d accounts

In [15]:
import pandas as pd
import os
filenames = [os.path.join('data', 'accounts.%d.csv' % i) for i in [0, 1, 2]]
filenames

['data\\accounts.0.csv', 'data\\accounts.1.csv', 'data\\accounts.2.csv']

In [16]:
%time

# normal, sequential code
a = pd.read_csv(filenames[0])
b = pd.read_csv(filenames[1])
c = pd.read_csv(filenames[2])

na = len(a)
nb = len(b)
nc = len(c)

total = sum([na, nb, nc])
print(total)

Wall time: 0 ns
3000000


Su tarea es volver a crear este gráfico utilizando la función retrasada en el código Python original. Las tres funciones que desea retrasar son `pd.read_csv`, `len` and `sum`.. 

In [19]:
delayed_read_csv = delayed(pd.read_csv)
a = delayed_read_csv(filenames[0])
b = delayed_read_csv(filenames[1])
c = delayed_read_csv(filenames[2])
delayed_len = delayed(len)
na = delayed_len(a)
nb = delayed_len(b)
nc = delayed_len(c)

total = delayed(sum)([na,nb,nc])

# execute
%time total.compute()   

Wall time: 428 ms


3000000

A continuación, repita esto usando bucles, en lugar de escribir todas las variables.

In [20]:
csvs = [delayed(pd.read_csv)(fn) for fn in filenames]
lens = [delayed(len)(csv) for csv in csvs]
total = delayed(sum)(lens)
%time print(total.compute())


3000000
Wall time: 465 ms


# Ejercicios 02_bag_am

### Ejercicio: Calcular el monto total por nombre

Se desea agrupar con groupby (or foldby) la clave `name`, y sumar todos los nombes por cada `name`.

Pasos

1. Cree una pequeña función que, dado un diccionario como 

        {'name': 'Alice', 'transactions': [{'amount': 1, 'id': 123}, {'amount': 2, 'id': 456}]}
        
    produce las uma de los montos, por ejemplo `3`
    
2.  Cambie ligeramente el operador binario del ejemplo `foldby` anterior para que el operador binario no cuente el número de entradas, sino que acumule la suma de las cantidades.

In [21]:
%run prep.py -d accounts

In [22]:
from dask.distributed import Client

client = Client(n_workers=4)

In [27]:
filename = os.path.join('data', 'accounts.*.json.gz')
lines = db.read_text(filename)
lines.take(1)

('{"id": 0, "name": "Alice", "transactions": [{"transaction-id": 1092, "amount": 1778}, {"transaction-id": 1745, "amount": 1766}, {"transaction-id": 1861, "amount": 1804}, {"transaction-id": 2588, "amount": 1992}, {"transaction-id": 2997, "amount": 1794}, {"transaction-id": 3149, "amount": 1984}, {"transaction-id": 3263, "amount": 1918}, {"transaction-id": 3708, "amount": 1873}, {"transaction-id": 3781, "amount": 1792}, {"transaction-id": 3818, "amount": 1762}, {"transaction-id": 3826, "amount": 1802}, {"transaction-id": 3908, "amount": 1743}, {"transaction-id": 3961, "amount": 1887}, {"transaction-id": 4083, "amount": 2029}, {"transaction-id": 4092, "amount": 1908}, {"transaction-id": 4966, "amount": 1749}, {"transaction-id": 5423, "amount": 1846}, {"transaction-id": 5645, "amount": 1811}, {"transaction-id": 5933, "amount": 1856}, {"transaction-id": 6312, "amount": 1859}, {"transaction-id": 6470, "amount": 1941}, {"transaction-id": 6714, "amount": 1893}, {"transaction-id": 6746, "amou

In [28]:
import dask.bag as db

filename = os.path.join('data', 'accounts.*.json.gz')
lines = db.read_text(filename)
lines.take(1)

('{"id": 0, "name": "Alice", "transactions": [{"transaction-id": 1092, "amount": 1778}, {"transaction-id": 1745, "amount": 1766}, {"transaction-id": 1861, "amount": 1804}, {"transaction-id": 2588, "amount": 1992}, {"transaction-id": 2997, "amount": 1794}, {"transaction-id": 3149, "amount": 1984}, {"transaction-id": 3263, "amount": 1918}, {"transaction-id": 3708, "amount": 1873}, {"transaction-id": 3781, "amount": 1792}, {"transaction-id": 3818, "amount": 1762}, {"transaction-id": 3826, "amount": 1802}, {"transaction-id": 3908, "amount": 1743}, {"transaction-id": 3961, "amount": 1887}, {"transaction-id": 4083, "amount": 2029}, {"transaction-id": 4092, "amount": 1908}, {"transaction-id": 4966, "amount": 1749}, {"transaction-id": 5423, "amount": 1846}, {"transaction-id": 5645, "amount": 1811}, {"transaction-id": 5933, "amount": 1856}, {"transaction-id": 6312, "amount": 1859}, {"transaction-id": 6470, "amount": 1941}, {"transaction-id": 6714, "amount": 1893}, {"transaction-id": 6746, "amou

In [29]:
import json
js = lines.map(json.loads)
# take: inspect first few elements
js.take(2)

({'id': 0,
  'name': 'Alice',
  'transactions': [{'transaction-id': 1092, 'amount': 1778},
   {'transaction-id': 1745, 'amount': 1766},
   {'transaction-id': 1861, 'amount': 1804},
   {'transaction-id': 2588, 'amount': 1992},
   {'transaction-id': 2997, 'amount': 1794},
   {'transaction-id': 3149, 'amount': 1984},
   {'transaction-id': 3263, 'amount': 1918},
   {'transaction-id': 3708, 'amount': 1873},
   {'transaction-id': 3781, 'amount': 1792},
   {'transaction-id': 3818, 'amount': 1762},
   {'transaction-id': 3826, 'amount': 1802},
   {'transaction-id': 3908, 'amount': 1743},
   {'transaction-id': 3961, 'amount': 1887},
   {'transaction-id': 4083, 'amount': 2029},
   {'transaction-id': 4092, 'amount': 1908},
   {'transaction-id': 4966, 'amount': 1749},
   {'transaction-id': 5423, 'amount': 1846},
   {'transaction-id': 5645, 'amount': 1811},
   {'transaction-id': 5933, 'amount': 1856},
   {'transaction-id': 6312, 'amount': 1859},
   {'transaction-id': 6470, 'amount': 1941},
   {'tran

In [31]:
import json
from operator import add
from dask import compute

def smallf(dic):
    sum_amt = sum([i['amount'] for i in dic['transactions']])
    return sum_amt

def incr(tot, dic):
    return tot + smallf(dic)
    
result = js.foldby(key='name',
                  binop=incr,
                  initial=0,
                  combine=add,
                  combine_initial=0).compute()
print(sorted(result))

[('Alice', 129021364), ('Bob', 300280575), ('Charlie', 141149909), ('Dan', 203269791), ('Edith', 495042631), ('Frank', 361012197), ('George', 161978852), ('Hannah', 223172217), ('Ingrid', 213943493), ('Jerry', 132312077), ('Kevin', 48877162), ('Laura', 202494361), ('Michael', 144975317), ('Norbert', 76520636), ('Oliver', 98638601), ('Patricia', 148323981), ('Quinn', 226677093), ('Ray', 444291927), ('Sarah', 272724395), ('Tim', 262654926), ('Ursula', 200103080), ('Victor', 315752810), ('Wendy', 103810130), ('Xavier', 150664754), ('Yvonne', 348753781), ('Zelda', 120378498)]
