# <span style="color:green"><center>Diplomado en Big Data</center></span>

# <span style="color:red"><center>Bag: Listas paralelas para datos semiestructurados<center></span>

<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo\">


##   <span style="color:blue">Profesores</span>

1. Alvaro Mauricio Montenegro Díaz, ammontenegrod@unal.edu.co
2. Daniel Mauricio Montenegro Reyes, dextronomo@gmail.com 

##   <span style="color:blue">Asesora Medios y Marketing digital</span>
 

4. Maria del Pilar Montenegro, pmontenegro88@gmail.com 

## <span style="color:blue">Contenido</span>

* [Introducción](#Introducción)
* [Creación de datos](#Creación-de-datos)
* [Creación de una colección bag](#Creación-de-una-colección-bag)
* [Manipulación de colecciones bag](#Manipulación-de-colecciones-bag)
* [Ejemplo: datos Accounts.JSON](#Ejemplo:-datos-Accounts.JSONe)
* [Groupby y Foldby](#Groupby-y-Foldby)
* [DataFrames](#DataFrames)
* [Limitaciones](#Limitaciones)
* [Aprender más](#Aprender-más)
* [Shutdown](#Shutdown)


## <span style="color:blue">Fuente</span>

Esta es una traducción libre del tutorial disponible en [dask-tutorial](https://github.com/dask/dask-tutorial).

## <span style="color:blue">Introducción</span>

`Dask-bag` sobresale en el procesamiento de datos que se pueden representar como una secuencia de entradas arbitrarias. Nos referiremos a esto como datos "desordenados" (`messy data`), porque pueden contener estructuras anidadas complejas, campos faltantes, mezclas de tipos de datos, etc. El estilo de programación * funcional * encaja muy bien con la iteración estándar de Python, como se puede encontrar en el módulo `itertools`.

Los datos desordenados a menudo se encuentran al comienzo de las canalizaciones de procesamiento de datos cuando se consumen por primera vez grandes volúmenes de datos sin procesar. El conjunto inicial de datos puede ser JSON, CSV, XML o cualquier otro formato que no aplique una estructura y tipos de datos estrictos.
Por esta razón, el procesamiento y el procesamiento de datos iniciales a menudo se realiza con `list`s,` dict`s y `set`s de Python.

Estas estructuras de datos centrales están optimizadas para el almacenamiento y procesamiento de uso general. Agregar cálculo de transmisión con iteradores / expresiones generadoras o bibliotecas como `itertools` o [` toolz`] (https://toolz.readthedocs.io/en/latest/) nos permite procesar grandes volúmenes en un espacio pequeño. Si combinamos esto con el procesamiento en paralelo, podemos generar una cantidad considerable de datos.

Dask.bag es una colección de Dask de alto nivel para automatizar cargas de trabajo comunes de esta forma. En una palabra

    dask.bag = mapa, filtro, toolz + ejecución paralela

    
### Documentación relacionada

* [Bag documentation](https://docs.dask.org/en/latest/bag.html)
* [Bag screencast](https://youtu.be/-qIiJ1XtSv0)
* [Bag API](https://docs.dask.org/en/latest/bag-api.html)
* [Bag examples](https://examples.dask.org/bag.html)

## <span style="color:blue">Creación de datos</span>

In [None]:
%run prep.py -d accounts

## <span style="color:blue">Configuración</span>

Nuevamente, usaremos el programador distribuido. Los programadores se explicarán en profundidad [más adelante] (05_distributed.ipynb).

In [1]:
from dask.distributed import Client

client = Client(n_workers=4)

## <span style="color:blue">Creación de una colección bag</span>

Puede crear un Bag a partir de una secuencia de Python, de archivos, de datos en S3, etc. Demostramos el uso de `.take()` para mostrar elementos de los datos. (Hacer .take(1) da como resultado una tupla con un elemento)

Tenga en cuenta que los datos se dividen en bloques y hay muchos elementos por bloque. En el primer ejemplo, las dos particiones contienen cinco elementos cada una, y en los dos siguientes, cada archivo está dividido en uno o más bloques de bytes.

+ lista [1,2,3,4,3]
+ tupla (3,5,6,5)
+ dict {key1:4, key2:4, }
+ set {3,4,5}


+ bag ()

map reduce

In [78]:
# each element is an integer
import dask.bag as db
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], npartitions=2)
b.take(6)

(1, 2, 3, 4, 5)

In [3]:
for item in b:
    print(item, end=' ')

1 2 3 4 5 6 7 8 9 10 

In [None]:
# each element is a text file, where each line is a JSON object
# note that the compression is handled automatically
import os
b = db.read_text(os.path.join('../data', 'accounts.*.json.gz'))
b.take(1)

In [10]:
import os
files = os.path.join('../data', 'accounts.*.json.gz')
b = db.read_text(files)
print(files)

../data/accounts.*.json.gz


In [None]:
b.take(1)

In [81]:
# Edit sources.py to configure source locations
import sys
sys.path.append('../') # para acceder al archivo source en la carpeta base de esta carpeta

import sources
sources.bag_url

's3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv'

s3 = Amazon Simple Storage Service (Amazon S3) es un servicio de almacenamiento de objetos que ofrece escalabilidad, disponibilidad de datos, seguridad y rendimiento. s3 es exabytes y permite la creación de grandes lagos de datos.

+  define un protocolo de intercambio de información, en este caso s3. Otros protolos son

+ s3://
+ hdf5:// para archivos hdf5
+ gcs:// para archivo parquet

In [10]:
# Requires `s3fs` library
# each partition is a remote CSV text file
b = db.read_text(sources.bag_url,
                 storage_options={'anon': True})
b.take(2)

('VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount\n',
 '2,2015-01-15 19:05:39,2015-01-15 19:23:42,1,1.59,-73.993896484375,40.750110626220703,1,N,-73.974784851074219,40.750617980957031,1,12,1,0.5,3.25,0,0.3,17.05\n')

In [13]:
help(db.read_text)

Help on function read_text in module dask.bag.text:

read_text(urlpath, blocksize=None, compression='infer', encoding='utf-8', errors='strict', linedelimiter='\n', collection=True, storage_options=None, files_per_partition=None, include_path=False)
    Read lines from text files
    
    Parameters
    ----------
    urlpath : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    blocksize: None, int, or str
        Size (in bytes) to cut up larger files.  Streams by default.
        Can be ``None`` for streaming, an integer number of bytes, or a string
        like "128MiB"
    compression: string
        Compression format like 'gzip' or 'xz'.  Defaults to 'infer'
    encoding: string
    errors: string
    linedelimiter: string
    collection: boo

## <span style="color:blue">Manipulación de colecciones bag</span>

Los objetos  `Bag` contienen la API funcional estándar que se encuentra en proyectos como la biblioteca estándar de Python, `toolz`, o `pyspark`, incluyendo `map`, `filter`, `groupby`, etc..

Operaciones sobre objetos `Bag` crea nuevos objetos bags. Llame el método `.compute()` para lanzar la ejecución, como hicimos para objetos  `Delayed`.  

In [83]:
def is_even(n):
    return n % 2 == 0

b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
c.compute()

[4, 16, 36, 64, 100]

In [16]:
def is_even(n):
    return  n%2 == 0

b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x**2)

In [17]:
# blocking form: wait for completion (which is very fast in this case)
c.compute()

[4, 16, 36, 64, 100]

## <span style="color:blue">Ejemplo: datos Accounts.JSON</span>

Hemos creado un conjunto simulado de datos JSON comprimidos con gzip en el directorio de datos. Esto es como el ejemplo usado en el ejemplo de `DataFrame` que veremos más adelante, excepto que ha agrupado todos los entradas para cada `id` individual en un solo registro. Esto es similar a los datos que puede recopilar de una base de datos de almacén de documentos o una API web.

Cada línea es un diccionario codificado JSON con las siguientes claves

- id: Identificador único del cliente
- name: Nombre del cliente
- transactions: Lista de pares  (transaction-id, amount) , uno para cada transacción para el cliente en ese archivo

In [None]:
filename = os.path.join('../data', 'accounts.*.json.gz')
lines = db.read_text(filename)
lines.take(1)

In [19]:
filename

'../data/accounts.*.json.gz'

Nuestros datos salen del archivo como líneas de texto. Tenga en cuenta que la descompresión de archivos se produjo automáticamente. Podemos hacer que estos datos parezcan más razonables mapeando la función `json.loads` en nuestro `bag`.

In [None]:
import json
js = lines.map(json.loads)
# take: inspect first few elements
js.take(2)

### Consultas básicas

Una vez que traducimos (parse) nuestros datos JSON en objetos Python adecuados (`dict`s,` list`s, etc.) podemos realizar consultas más interesantes creando pequeñas funciones de Python para ejecutar en nuestros datos.

In [None]:
# filter: keep only some elements of the sequence
js.filter(lambda record: record['name'] == 'Alice').take(5)

In [88]:
def count_transactions(d):
    return {'name': d['name'], 'count': len(d['transactions'])}

# map: apply a function to each element
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .take(5))

({'name': 'Alice', 'count': 18},
 {'name': 'Alice', 'count': 30},
 {'name': 'Alice', 'count': 209},
 {'name': 'Alice', 'count': 189},
 {'name': 'Alice', 'count': 265})

In [10]:
# pluck: select a field, as from a dictionary, element[field]
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .pluck('count')
   .take(5))

(18, 30, 209, 189, 265)

In [11]:
# Average number of transactions for all of the Alice entries
(js.filter(lambda record: record['name'] == 'Alice')
   .map(count_transactions)
   .pluck('count')
   .mean()
   .compute())

167.1822222222222

### Uso de  `flatten` des-anidar los datos 

En el siguiente ejmeplo usamos `.flatten()` para aplanar los resultados.  Vaos a calcular el monto promedio de todas las transacciones para todas las Alices.

In [None]:
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .take(3))

In [13]:
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .take(3))

({'transaction-id': 330, 'amount': 402},
 {'transaction-id': 1804, 'amount': 372},
 {'transaction-id': 3347, 'amount': 377})

In [35]:
(js.filter(lambda record: record['name']=='Alice')
 .pluck('transactions')
 .flatten()
 .take(3))

({'transaction-id': 330, 'amount': 402},
 {'transaction-id': 1804, 'amount': 372},
 {'transaction-id': 3347, 'amount': 377})

In [14]:
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .pluck('amount')
   .take(3))

(402, 372, 377)

In [15]:
(js.filter(lambda record: record['name'] == 'Alice')
   .pluck('transactions')
   .flatten()
   .pluck('amount')
   .mean()
   .compute())

907.8130848575074

## <span style="color:blue">Groupby y Foldby</span>

Con frecuencia queremos agrupar datos por alguna función o clave. Podemos hacer esto con el método `.groupby`, que es sencillo pero obliga a una mezcla completa de los datos (costoso) o con el método` .foldby`, más difícil de usar pero más rápido, que hace una transmisión combinada groupby y reducción.

* `groupby`: Mezcla los datos para que todos los elementos con la misma clave estén en el mismo par clave-valor
* `foldby`: recorre los datos acumulando un resultado por clave

* Nota: el `groupby` completo es particularmente malo. En cargas de trabajo reales, haría bien en usar `foldby` o cambiar a` DataFrame`s si es posible. 

### `groupby`

Groupby recopila elementos de su colección para que todos los elementos con el mismo valor en alguna función se recopilen juntos en un par clave-valor.

In [42]:
b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan','Edith', 'Frank'])
b.groupby(len).compute()

[(7, ['Charlie']), (3, ['Dan', 'Bob']), (5, ['Edith', 'Frank', 'Alice'])]

In [48]:
b = db.from_sequence(list(range(10)))
b.groupby(lambda x: x % 2).compute()

[(0, [4, 6, 8, 0, 2]), (1, [3, 5, 7, 9, 1])]

In [47]:
b.groupby(lambda x: x % 2).starmap(lambda k, v: (k, max(v))).compute()

[(0, 8), (1, 9)]

### `foldby`

Foldby puede resultar bastante extraño al principio. Es similar a las siguientes funciones de otras bibliotecas:

* [`toolz.reduceby`] (http://toolz.readthedocs.io/en/latest/streaming-analytics.html#streaming-split-apply-combine)
* [`pyspark.RDD.combineByKey`] (http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/)

Cuando usa `foldby` usted proporciona

1. Una función clave en la que agrupar elementos
2. Un operador binario como el que pasaría a `reduce` que usa para realizar la reducción por cada grupo
3. Un operador binario combinado que puede combinar los resultados de dos llamadas de `reduce` en diferentes partes de su conjunto de datos.

La reducción debe ser asociativa. Ocurrirá en paralelo en cada una de las particiones de su conjunto de datos. Entonces, todos estos resultados intermedios serán combinados por el operador binario `combine`.

In [60]:
b.foldby(lambda x: x % 2, binop=max, combine=max).compute()

[(0, 8), (1, 9)]

### Ejemplo con los datos de account.json

Encontrar el número de personas con el mismo nombre

In [None]:
%%time
# Warning, this one takes a while...
result = js.groupby(lambda item: item['name']).starmap(lambda k, v: (k, len(v))).compute()
print(sorted(result))

In [28]:
%%time
# This one is comparatively fast and produces the same result.
from operator import add
def incr(tot, _):
    return tot + 1

result = js.foldby(key='name', 
                   binop=incr, 
                   initial=0, 
                   combine=add, 
                   combine_initial=0).compute()
print(sorted(result))

[('Alice', 1350), ('Bob', 1044), ('Charlie', 700), ('Dan', 1246), ('Edith', 1200), ('Frank', 800), ('George', 1250), ('Hannah', 950), ('Ingrid', 950), ('Jerry', 850), ('Kevin', 1110), ('Laura', 800), ('Michael', 1089), ('Norbert', 1300), ('Oliver', 800), ('Patricia', 1150), ('Quinn', 850), ('Ray', 1150), ('Sarah', 862), ('Tim', 1126), ('Ursula', 800), ('Victor', 500), ('Wendy', 849), ('Xavier', 578), ('Yvonne', 700), ('Zelda', 844)]
CPU times: user 474 ms, sys: 85.9 ms, total: 560 ms
Wall time: 3.34 s


### Exercicio: Calcular el monto total por nombre

Se desea agrupar con groupby (or foldby) la clave `name`, ty sumar todos los nombes por cada `name`.

Pasos

1. Cree una pequeña función que, dado un diccionario como 

        {'name': 'Alice', 'transactions': [{'amount': 1, 'id': 123}, {'amount': 2, 'id': 456}]}
        
    produce las uma de los montos, por ejemplo `3`
    
2.  Cambie ligeramente el operador binario del ejemplo `foldby` anterior para que el operador binario no cuente el número de entradas, sino que acumule la suma de las cantidades.

In [None]:
# Your code here...

## <span style="color:blue">DataFrames</span>

Por las mismas razones por las que Pandas es a menudo más rápido que Python puro, `dask.dataframe` puede ser más rápido que` dask.bag`. Trabajaremos más con DataFrames más adelante, pero desde el punto de vista de un Bag, con frecuencia es el punto final de la parte "desordenada" (`messy`) de la ingestión de datos: una vez que los datos se pueden convertir en un marco de datos, luego una división compleja -aplicar-combinar la lógica será mucho más sencilla y eficiente.

Puede transformar una bolsa con una tupla simple o una estructura de diccionario plano en un `dask.dataframe` con el método` to_dataframe`.


In [30]:
df1 = js.to_dataframe()
df1.head()

Unnamed: 0,id,name,transactions
0,0,Wendy,"[{'transaction-id': 167, 'amount': 1471}, {'tr..."
1,1,Oliver,"[{'transaction-id': 111, 'amount': 338}, {'tra..."
2,2,Charlie,"[{'transaction-id': 11579, 'amount': 1615}, {'..."
3,3,Ursula,"[{'transaction-id': 258, 'amount': 4056}, {'tr..."
4,4,Dan,"[{'transaction-id': 3422, 'amount': 197}, {'tr..."


Esto ahora parece un DataFrame bien definido, y podemos aplicarle cálculos similares a Pandas de manera eficiente.

Usando un Dask DataFrame, ¿cuánto tiempo lleva hacer nuestro cálculo previo de la cantidad de personas con el mismo nombre? Resulta que `dask.dataframe.groupby ()` supera a `dask.bag.groupby ()` en más de un orden de magnitud; pero todavía no puede coincidir con `dask.bag.foldby ()` para este caso.

In [31]:
%time df1.groupby('name').id.count().compute().head()

CPU times: user 593 ms, sys: 75.6 ms, total: 669 ms
Wall time: 4.54 s


name
Alice      1350
Bob        1044
Charlie     700
Dan        1246
Edith      1200
Name: id, dtype: int64

### Desnormalización

Este formato de DataFrame es menos que óptimo porque la columna de transacciones está llena de datos anidados, por lo que Pandas tiene que volver al tipo de objeto de objeto, que es bastante lento en Pandas. Idealmente, queremos transformarnos en un marco de datos solo después de haber aplanado nuestros datos para que cada registro sea un solo `int`,` string`, `float`, etc.

In [32]:
def denormalize(record):
    # returns a list for each person, one item per transaction
    return [{'id': record['id'], 
             'name': record['name'], 
             'amount': transaction['amount'], 
             'transaction-id': transaction['transaction-id']}
            for transaction in record['transactions']]

transactions = js.map(denormalize).flatten()
transactions.take(3)

({'id': 0, 'name': 'Wendy', 'amount': 1471, 'transaction-id': 167},
 {'id': 0, 'name': 'Wendy', 'amount': 1348, 'transaction-id': 211},
 {'id': 0, 'name': 'Wendy', 'amount': 1528, 'transaction-id': 389})

In [None]:
df = transactions.to_dataframe()
df.head()

In [None]:
%%time
# number of transactions per name
# note that the time here includes the data load and ingestion
df.groupby('name')['transaction-id'].count().compute()

## <span style="color:blue">Limitaciones</span>

Las colecciones bag proporcionan cálculos muy generales (cualquier función de Python). Esta generalidad
tiene un costo. Las bolsas tienen las siguientes limitaciones conocidas

1. Las operaciones de bag tienden a ser más lentas que los cálculos de *matrices/marcos de datos* (dataframes)    de la misma manera que Python tiende a ser más lento que *NumPy/Pandas*
2. `Bag.groupby` es lento. Debería intentar usar `Bag.foldby` si es posible. El uso de `Bag.foldby` requiere más reflexión. Aún mejor, considere la posibilidad de crear un marco de datos normalizado.

## <span style="color:blue">Aprender más</span>

* [Bag documentation](https://docs.dask.org/en/latest/bag.html)
* [Bag screencast](https://youtu.be/-qIiJ1XtSv0)
* [Bag API](https://docs.dask.org/en/latest/bag-api.html)
* [Bag examples](https://examples.dask.org/bag.html)

## <span style="color:blue">Shutdown</span>

In [16]:
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
