<img src="https://drive.google.com/uc?export=view&id=1YjAWn06OMcVhlyixBZBDnY17rnn7Otg5" width="100%">

# Dataframes de Dask

En este notebook veremos una introducción práctica al procesamiento distribuido con la librería `dask`, primero lo instalaremos:

In [1]:
!pip install dask[complete] h5py

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## **1. ¿Qué son los DataFrames de Dask?**
---

Los DataFrames de `dask` son una estructura de datos tabular que está compuesta de múltiples DataFrames de `pandas`:

<img src="https://drive.google.com/uc?export=view&id=1UCpyAI0-9KfNyNfWTq84ltSTEU59862N" width="70%">

Este tipo de estructura de datos permite coordinar, paralelizar y distribuir series y dataframes de `pandas`, dándonos una forma de uso muy similar a los `pd.DataFrame`.

Generalmente usamos los `DataFrame` de `dask` cuando:

* Tenemos conjuntos de datos grandes que no caben en la memoria RAM.
* Queremos acelerar operaciones sobre datasets usando varios núcleos de un computador o varios nodos.

Veamos los detalles de este tipo de estructuras. Primero importamos el módulo para usar DataFrames:

In [2]:
import numpy as np
import pandas as pd
import dask.dataframe as dd

## **2. Creación**
---

Existen diversas funciones para crear `DataFrames` en `dask`, veamos algunos casos:

* `from_pandas`: podemos crear un `DataFrame` de `dask` desde un `pd.DataFrame`:

In [3]:
df = pd.DataFrame(
        {
            "A": np.random.uniform(-1, 1, 100),
            "B": np.random.randint(1, 5, 100)
            },
        )
print(df)

           A  B
0   0.788902  3
1  -0.881308  1
2  -0.081425  1
3   0.188149  1
4   0.324244  2
..       ... ..
95 -0.537219  1
96 -0.553918  4
97  0.797362  4
98 -0.043168  2
99 -0.129599  1

[100 rows x 2 columns]


Creamos la tabla en `dask`:

In [4]:
df_dask = dd.from_pandas(df, npartitions=2)
print(df_dask)

Dask DataFrame Structure:
                     A      B
npartitions=2                
0              float64  int64
50                 ...    ...
99                 ...    ...
Dask Name: from_pandas, 2 tasks


El parámetro `n_partitions` nos permite especificar en cuántos chunks se divide el `DataFrame`, también puede usar el parámetro `chunks` de forma equivalente a los arreglos de `dask`:

In [5]:
df_dask = dd.from_pandas(df, chunksize=10)
print(df_dask)

Dask DataFrame Structure:
                      A      B
npartitions=10                
0               float64  int64
10                  ...    ...
...                 ...    ...
90                  ...    ...
99                  ...    ...
Dask Name: from_pandas, 10 tasks


* `from_array`: permite crear un `DataFrame` a partir de un arreglo de `numpy`:

In [6]:
df = dd.from_array(
        np.random.uniform(0, 1, size=(10, 2)),
        columns=["grade", "value"],
        )
print(df)

Dask DataFrame Structure:
                 grade    value
npartitions=1                  
0              float64  float64
9                  ...      ...
Dask Name: from_array, 1 tasks


* `from_dict`: permite crear un `DataFrame` a partir de un diccionario de _Python_:

In [7]:
data = {
        "grades": np.random.uniform(0, 5, 100),
        "id": np.random.randint(100, size=(100,))
        }
df = dd.from_dict(data, npartitions=2)
print(df)

AttributeError: ignored

Como vimos, es posible crear `DataFrames` de `dask` desde multiples estructuras de datos clásicas de _Python_, no obstante, estos enfoques requieren que los datos estén en la memoria RAM (lo cual es imposible con grandes cantidades de datos). Por ello, normalmente estaremos creando `DataFrames` de `dask` por medio de funciones de lectura para distintos tipos de formatos de datos tabulares, algunos ejemplos comunes son:

* `dd.read_csv`: funciona igual que la función de `pandas` (tiene los mismos parámetros), no obstante, agrega el parámetro `blocksize` (tamaño de las particiones en bytes) para controlar las particiones.
* `dd.read_json`: funciona igual que la función de `pandas` y también agrega el parámetro `blocksize`.
* `dd.read_sql_query`: funciona igual que la función `pd.read_sql` pero agrega el parámetro `npartitions` para controlar el número de particiones.
* `dd.read_parquet`: permite cargar archivos en formato `parquet`, el cual es un tipo de formato que ya viene particionado y resulta ser muy compatible con `dask` como lo veremos más adelante.

Veamos un ejemplo donde cargamos un conjunto de datos desde `dask`. Usaremos el conjunto de datos [Netflix Data: Cleaning, Analysis and Visualization](https://www.kaggle.com/datasets/ariyoomotade/netflix-data-cleaning-analysis-and-visualization), el cual incluye información acerca del contenido añadido a la plataforma de streaming *Netflix* entre el $2008$ y el $2021$. Está conformado por 10 columnas, las cuales son:

* `show_id`: corresponde a la llave primaria de la tabla. Tiene un formato establecido el cual corresponde a una <i>s</i> seguida de un número en secuencia, por ejemplo: s34.
* `type`: indica el tipo de show ofrecido (Película o Serie).
* `title`: señala el nombre de la serie o la película.
* `director`: indica el nombre de quién dirigió la película o serie.
* `country`: indica el lugar de producción del show.
* `date_added`: muestra la fecha de publicación de la serie o película en la plataforma con el formato <i>MMMMMM DD, AAAA</i>.
* `release_year`: muestra el año de publicación original de la película o serie.
* `rating`: muestra las calificaciones o el nivel de conveniencia de la película según su contenido, por ejemplo: PG-13, TV-PG, etc.
* `duration`: corresponde a la duración en minutos en el caso de las películas y la cantidad de temporadas en el caso de las series.
* `listed_in`: indica el género o categoría donde se clasifica la serie o película dentro de la plataforma.

Primero, descargamos el conjunto de datos:

In [8]:
!wget 'https://drive.google.com/uc?export=view&id=1sgr_sNm1UPjVTV0PbjcVAgLduO9yt15g' -O 'netflix2.zip'

--2022-11-21 21:37:16--  https://drive.google.com/uc?export=view&id=1sgr_sNm1UPjVTV0PbjcVAgLduO9yt15g
Resolving drive.google.com (drive.google.com)... 142.251.2.139, 142.251.2.113, 142.251.2.100, ...
Connecting to drive.google.com (drive.google.com)|142.251.2.139|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://doc-14-4g-docs.googleusercontent.com/docs/securesc/ha0ro937gcuc7l7deffksulhg5h7mbp1/nkj8341jruea1ocbaq8s1mb76t182mld/1669066575000/16848862265445619282/*/1sgr_sNm1UPjVTV0PbjcVAgLduO9yt15g?e=view&uuid=8370f003-2bea-4112-a7fd-76148f2c8abc [following]
--2022-11-21 21:37:17--  https://doc-14-4g-docs.googleusercontent.com/docs/securesc/ha0ro937gcuc7l7deffksulhg5h7mbp1/nkj8341jruea1ocbaq8s1mb76t182mld/1669066575000/16848862265445619282/*/1sgr_sNm1UPjVTV0PbjcVAgLduO9yt15g?e=view&uuid=8370f003-2bea-4112-a7fd-76148f2c8abc
Resolving doc-14-4g-docs.googleusercontent.com (doc-14-4g-docs.googleusercontent.com)... 142.251.2.132, 2607:f8b0:4023:c0d::84

Ahora, lo descomprimimos:

In [9]:
![[ -d 'netflix2.parquet' ]] && rm -rf 'netflix2.parquet'
!unzip 'netflix2.zip'

Archive:  netflix2.zip
   creating: netflix2.parquet/
  inflating: netflix2.parquet/part.14.parquet  
  inflating: netflix2.parquet/part.9.parquet  
  inflating: netflix2.parquet/part.16.parquet  
  inflating: netflix2.parquet/part.1.parquet  
  inflating: netflix2.parquet/part.0.parquet  
  inflating: netflix2.parquet/part.3.parquet  
  inflating: netflix2.parquet/part.10.parquet  
  inflating: netflix2.parquet/part.19.parquet  
  inflating: netflix2.parquet/part.2.parquet  
  inflating: netflix2.parquet/part.15.parquet  
  inflating: netflix2.parquet/part.8.parquet  
  inflating: netflix2.parquet/part.18.parquet  
  inflating: netflix2.parquet/part.4.parquet  
  inflating: netflix2.parquet/part.13.parquet  
  inflating: netflix2.parquet/part.7.parquet  
  inflating: netflix2.parquet/part.6.parquet  
  inflating: netflix2.parquet/part.12.parquet  
  inflating: netflix2.parquet/part.17.parquet  
  inflating: netflix2.parquet/part.11.parquet  
  inflating: netflix2.parquet/part.5.parque

## **3. Apache Parquet**
---

En este caso, el conjunto de datos se encuentra en formato `parquet`, se trata de un formato de código abierto, orientado a columnas (como _Cassandra_) que está pensado para un almacenamiento eficiente y de rápida lectura:

<img src="https://drive.google.com/uc?export=view&id=1eVa_GVCe0HYiWmzONc-zx3YFSin_aopx" width="40%">

Este formato tiene tres componentes:

* **Header**: guarda información general del archivo (por ejemplo, el id de la partición que estamos manejando).
* **Data block**: almacena la información como chunks columnares de los datos.
* **Footer**: almacena metadatos del archivo (por ejemplo, fecha de creación, versión del formato, esquema de columnas, tipos, entre otros).

Este formato es muy popular hoy en día para almacenar conjuntos de datos por los siguientes motivos:

* Almacena los tipos de cada columna.
* Es particionado, lo cual facilita la tranferencia de datos.
* Se lee bastante rápido, lo cual hace que sea preferido sobre formatos clásicos como `csv` o `excel`.

El formato `parquet` es normalmente usado para crear _Data Lakes_. Veamos cómo cargar este conjunto de datos con `dask`:

In [10]:
df = dd.read_parquet("netflix2.parquet")
print(df)

Dask DataFrame Structure:
               show_id    type   title director country date_added release_year  rating duration listed_in
npartitions=20                                                                                            
                object  object  object   object  object     object        int64  object   object    object
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
...                ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
Dask Name: read-parquet, 20 tasks


Como se puede ver, el archivo ya trae `20` particiones (nativas del formato `parquet`). Veamos algunos detalles de los `DataFrames` de `dask` con este conjunto de datos.

## **4. Atributos y Propiedades**
---

Los `DataFrame` de `dask` tienen una forma de uso muy parecida a los de `pandas`, sin embargo, en `dask` no tenemos los resultados cargados directamente en la memoria RAM. Funcionan de una forma muy equivalente a los arreglos de `dask` y terminan siendo promesas de `DataFrame` de `pandas`. Veamos los atributos más comunes que se usan en `dask`:

* `columns`: permite obtener el nombre de las columnas del `DataFrame`:

In [11]:
print(df.columns)

Index(['show_id', 'type', 'title', 'director', 'country', 'date_added',
       'release_year', 'rating', 'duration', 'listed_in'],
      dtype='object')


* `dtypes`: permite extraer los tipos que tiene cada columna del `DataFrame`:

In [12]:
print(df.dtypes)

show_id         object
type            object
title           object
director        object
country         object
date_added      object
release_year     int64
rating          object
duration        object
listed_in       object
dtype: object


* `shape`: permite extraer el tamaño del `DataFrame`:

In [13]:
print(df.shape)

(Delayed('int-f05d3e22-a67c-49f8-9beb-12bfaeecb9db'), 10)


Note que el resultado tiene un tipo `Delayed` que no es directamente un número, esto se debe a que `dask` no conoce qué tamaño va a tener el arreglo en memoria (no ha sido cargado). Podemos calcular el tamaño con el método `compute`:

In [14]:
print(df.shape[0].compute())

8790


* `npartitions`: permite obtener el número de particiones del `DataFrame`:

In [15]:
print(df.npartitions)

20


* También podemos acceder a una columna específica por medio de la notación punto, por ejemplo, podemos obtener una serie de `Dask` al acceder a la propiedad `title` (nombre de columna) del `DataFrame`:

In [16]:
col = df.title
print(col)

Dask Series Structure:
npartitions=20
    object
       ...
     ...  
       ...
       ...
Name: title, dtype: object
Dask Name: getitem, 40 tasks


## **5. Métodos**
---

Los métodos de los `DataFrames` en `dask` tratan de ser lo más cercanos posibles a los métodos en `pandas`.

Al igual que con los arreglos de `dask`, hay un método clave que nos permite evaluar los resultados directamente en memoria. Por ejemplo, el siguiente código extrae los primeros `5` registros usando el método `head`:

In [17]:
res = df.head(5)
print(res)

  show_id     type                             title         director  \
0      s1    movie              dick johnson is dead  kirsten johnson   
1      s3  tv show                         ganglands  julien leclercq   
2      s6  tv show                     midnight mass    mike flanagan   
3     s14    movie  confessions of an invisible girl    bruno garotti   
4      s8    movie                           sankofa     haile gerima   

         country date_added  release_year rating duration  \
0  united states  9/25/2021          2020     pg      min   
1         france  9/24/2021          2021   tvma   season   
2  united states  9/24/2021          2021   tvma   season   
3         brazil  9/22/2021          2021   tvpg      min   
4  united states  9/24/2021          1993   tvma      min   

                                           listed_in  
0                                      documentaries  
1  crime tv shows international tv shows tv actio...  
2                   tv dramas

El resultado obtenido es un `DataFrame` de `pandas`:

In [18]:
print(type(res))

<class 'pandas.core.frame.DataFrame'>


Otro método específico en `dask` es `repartition`, el cual permite cambiar el número de particiones de un `DataFrame`:

In [19]:
df2 = df.repartition(npartitions=40)
print(df2)

Dask DataFrame Structure:
               show_id    type   title director country date_added release_year  rating duration listed_in
npartitions=40                                                                                            
                object  object  object   object  object     object        int64  object   object    object
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
...                ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
Dask Name: repartition, 80 tasks


Veamos algunos de los métodos más comunes que se usan con los `DataFrames` de `dask`:

* `info`: permite obtener una descripción muy general del `DataFrame` (mucho más compacta que la de `pandas` ya que no hemos cargado el conjunto de datos completo):

In [20]:
print(df.info())

<class 'dask.dataframe.core.DataFrame'>
Columns: 10 entries, show_id to listed_in
dtypes: object(9), int64(1)None


* `describe`: permite obtener estadísticas generales del conjunto de datos. Recuerde que el parámetro `include` funciona como en `pandas` y permite seleccionar los tipos de columnas a describir.

In [21]:
desc = df.describe(include="all")
print(desc)

Dask DataFrame Structure:
              show_id    type   title director country date_added release_year  rating duration listed_in
npartitions=1                                                                                            
               object  object  object   object  object     object      float64  object   object    object
                  ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
Dask Name: describe, 830 tasks


Debemos evaluarlo para ver el resultado:

In [22]:
print(desc.compute())

       show_id   type title   director        country date_added  \
unique    8790      2  8716       4526             86       1713   
count     8790   8790  8790       8790           8790       8790   
top         s1  movie        not given  united states   1/1/2020   
freq         1   6126    26       2588           3240        110   
mean       NaN    NaN   NaN        NaN            NaN        NaN   
std        NaN    NaN   NaN        NaN            NaN        NaN   
min        NaN    NaN   NaN        NaN            NaN        NaN   
25%        NaN    NaN   NaN        NaN            NaN        NaN   
50%        NaN    NaN   NaN        NaN            NaN        NaN   
75%        NaN    NaN   NaN        NaN            NaN        NaN   
max        NaN    NaN   NaN        NaN            NaN        NaN   

        release_year rating duration                    listed_in  
unique           NaN     12        3                          513  
count    8790.000000   8790     8790           

* `mean`: permite obtener el promedio por columnas del `DataFrame`:

In [23]:
mean = df.mean()
print(mean)

Dask Series Structure:
npartitions=1
country    float64
type           ...
dtype: float64
Dask Name: dataframe-mean, 83 tasks


  meta = self._meta_nonempty.mean(axis=axis, skipna=skipna)


Debemos evaluarlo para ver el resultado (únicamente aplica sobre las columnas numéricas):

In [25]:
print(mean.compute())

release_year    2014.183163
dtype: float64


* `std`: permite obtener la desviación estándar por columnas del `DataFrame`:

In [26]:
std = df.std()
print(std)

Dask Series Structure:
npartitions=1
country    float64
type           ...
dtype: float64
Dask Name: dataframe-std, 90 tasks


  meta = self._meta_nonempty.std(axis=axis, skipna=skipna)
  meta = self._meta_nonempty.var(axis=axis, skipna=skipna)


Debemos evaluarlo para ver el resultado (únicamente aplica sobre las columnas numéricas):

In [27]:
print(std.compute())

release_year    8.825466
dtype: float64


* `value_counts`: permite obtener un recuento de valores en una columna dada:

In [28]:
type_counts = df.type.value_counts()
print(type_counts)

Dask Series Structure:
npartitions=1
    int64
      ...
Name: type, dtype: int64
Dask Name: value-counts-agg, 64 tasks


Debemos evaluarlo para ver el resultado:

In [29]:
print(type_counts.compute())

movie      6126
tv show    2664
Name: type, dtype: int64


* `rename`: permite cambiar el esquema del `DataFrame`:

In [30]:
df2 = df.rename(columns={"title": "titulo"})
print(df2.columns)

Index(['show_id', 'type', 'titulo', 'director', 'country', 'date_added',
       'release_year', 'rating', 'duration', 'listed_in'],
      dtype='object')


* `astype`: permite cambiar los tipos:

In [31]:
new_col = df.release_year.astype("string")
print(new_col)

Dask Series Structure:
npartitions=20
    string
       ...
     ...  
       ...
       ...
Name: release_year, dtype: string
Dask Name: astype, 60 tasks


También funciona sobre varias columnas de un `DataFrame`:

In [32]:
df2 = df.astype({"release_year": "string", "title": "string"})
print(df2.dtypes)

show_id         object
type            object
title           string
director        object
country         object
date_added      object
release_year    string
rating          object
duration        object
listed_in       object
dtype: object


* `isna`: permite detectar valores faltantes, por ejemplo, la siguiente celda calcula el número de valores faltantes por columna:

In [33]:
nas = df.isna().sum()
print(nas)

Dask Series Structure:
npartitions=1
country    int64
type         ...
dtype: int64
Dask Name: dataframe-sum-agg, 61 tasks


Evaluamos el resultado:

In [34]:
print(nas.compute())

show_id         0
type            0
title           0
director        0
country         0
date_added      0
release_year    0
rating          0
duration        0
listed_in       0
dtype: int64


* `dropna`: permite eliminar valores faltantes, funciona exactamente como lo hace la función de `pandas`:

In [35]:
res = df.dropna()
print(res)

Dask DataFrame Structure:
               show_id    type   title director country date_added release_year  rating duration listed_in
npartitions=20                                                                                            
                object  object  object   object  object     object        int64  object   object    object
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
...                ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
Dask Name: dropna, 40 tasks


* `fillna`: permite reemplazar valores faltante, funciona exactamente como lo hace la función de `pandas`:

In [36]:
res = df.fillna(0)
print(res)

Dask DataFrame Structure:
               show_id    type   title director country date_added release_year  rating duration listed_in
npartitions=20                                                                                            
                object  object  object   object  object     object        int64  object   object    object
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
...                ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
Dask Name: fillna, 40 tasks


* `apply`: esta función tiene la misma utilidad que en `pandas`, no obstante, resulta ser bastante importante en `dask` ya que nos permite ejecutar una función de _Python_ de forma distribuida y paralelizada sobre un `DataFrame`, por ejemplo, la siguiente función calcula los dos últimos dígitos del año de publicación de un show:

In [37]:
def get_digits(year):
    return year % 100

Veamos algunos ejemplos:

In [38]:
print(get_digits(2009))

9


In [39]:
print(get_digits(1996))

96


In [40]:
print(get_digits(2022))

22


Podemos aplicarla sobre la columna `release_year` con `dask`.

In [41]:
digits = (
        df
        .release_year
        .apply(get_digits, meta=("release_year", "int64"))
        .compute()
        )
print(digits)

0       20
1       21
2       21
3       21
4       93
        ..
8785    16
8786    16
8787    12
8788    16
8789    16
Name: release_year, Length: 8790, dtype: int64


En este caso, agregamos el parámetro `meta` para dar más información a `dask` sobre el tipo de columna que es `release_year`. Ya que `dask` por defecto trabaja sobre unos tipos inferidos de la carga de datos que pueden ser erróneos.

* `map`: permite mapear una tabla de referencia a una columna:

In [42]:
maps = {"movie": "pelicula", "tv show": "television"}
new_types = df.type.map(maps)
print(new_types)

Dask Series Structure:
npartitions=20
    object
       ...
     ...  
       ...
       ...
Name: type, dtype: object
Dask Name: map, 60 tasks


Veamos el resultado evaluado:

In [43]:
print(new_types.compute())

0         pelicula
1       television
2       television
3         pelicula
4         pelicula
           ...    
8785    television
8786    television
8787    television
8788    television
8789    television
Name: type, Length: 8790, dtype: object


* `assign`: permite crear nuevas columnas:

In [44]:
df2 = df.assign(
        new_col = 1,
        types_spa = df.type.map(maps)
        )
print(df2)

Dask DataFrame Structure:
               show_id    type   title director country date_added release_year  rating duration listed_in new_col types_spa
npartitions=20                                                                                                              
                object  object  object   object  object     object        int64  object   object    object   int64    object
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...     ...       ...
...                ...     ...     ...      ...     ...        ...          ...     ...      ...       ...     ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...     ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...     ...       ...
Dask Name: assign, 100 tasks


Veamos los primeros 5 registros:

In [45]:
print(df2.head(5))

  show_id     type                             title         director  \
0      s1    movie              dick johnson is dead  kirsten johnson   
1      s3  tv show                         ganglands  julien leclercq   
2      s6  tv show                     midnight mass    mike flanagan   
3     s14    movie  confessions of an invisible girl    bruno garotti   
4      s8    movie                           sankofa     haile gerima   

         country date_added  release_year rating duration  \
0  united states  9/25/2021          2020     pg      min   
1         france  9/24/2021          2021   tvma   season   
2  united states  9/24/2021          2021   tvma   season   
3         brazil  9/22/2021          2021   tvpg      min   
4  united states  9/24/2021          1993   tvma      min   

                                           listed_in  new_col   types_spa  
0                                      documentaries        1    pelicula  
1  crime tv shows international tv shows t

* `groupby`: funciona igual que su análogo en `pandas` pero este se ejecuta de forma distribuida:

In [46]:
res = df.groupby("type").agg({"title": "count"})
print(res)

Dask DataFrame Structure:
               title
npartitions=1       
               int64
                 ...
Dask Name: aggregate-agg, 44 tasks


Veamos el resultado:

In [47]:
print(res.compute())

         title
type          
movie     6126
tv show   2664


* `merge`: al igual que en `pandas`, podemos unir dos o más `DataFrames` por medio de las operaciones `merge` y `join` (recuerde que `dask` lo hace de forma distribuida, lo cual es muy útil para cruzar tablas muy grandes). Veamos un ejemplo donde definimos dos `DataFrames`:

In [48]:
data = {
        "col1": [1, 2, 3, 4, 5],
        "col2": ["a", "b", "c", "d", "e"]
        }
df1 = dd.from_dict(data, npartitions=1)
print(df1)

AttributeError: ignored

In [49]:
data = {
        "col1": [1, 2, 3, 4],
        "col3": ["perro", "gato", "pajaro", "pez"]
        }
df2 = dd.from_dict(data, npartitions=1)
print(df2)

AttributeError: ignored

Vamos a unir los dos `DataFrame` con la operación `merge`:

In [50]:
res = df1.merge(df2, on="col1")
print(res)

NameError: ignored

Veamos el resultado:

In [51]:
print(res.compute())

         title
type          
movie     6126
tv show   2664


## **6. Filtrado**
---

La sintaxis para la selección de valores en `DataFrame` de `dask` es muy parecida a `pandas`, no obstante, hay algunas consideraciones en cuanto a que no se recomienda el indexado posicional, en especial porque tenemos datos distribuidos de los que no conocemos directamente su tamaño. Veamos algunos ejemplos:

* Para seleccionar columnas, podemos indexar el `DataFrame` como si fuera un diccionario, por ejemplo:

In [52]:
df2 = df[["type", "show_id"]]
print(df2.columns)

Index(['type', 'show_id'], dtype='object')


* Podemos usar el método `loc` para extraer datos con respecto a su índice:

In [53]:
df2 = df.loc[20:30]
print(df2)

Dask DataFrame Structure:
               show_id    type   title director country date_added release_year  rating duration listed_in
npartitions=20                                                                                            
                object  object  object   object  object     object        int64  object   object    object
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
...                ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
Dask Name: try_loc, 40 tasks


No obstante, el método `iloc` (indexado posicional) no funciona correctamente:

In [54]:
try:
    df2 = df.iloc[:10]
except Exception as e:
    print(e)

'DataFrame.iloc' only supports selecting columns. It must be used like 'df.iloc[:, column_indexer]'.


* Podemos hacer selecciones condicionales de la misma forma que en `pandas`, por ejemplo, seleccionamos todos los registros correspondientes al tipo `movie`:

In [55]:
df2 = df[df.type == "movie"]
print(df2)

Dask DataFrame Structure:
               show_id    type   title director country date_added release_year  rating duration listed_in
npartitions=20                                                                                            
                object  object  object   object  object     object        int64  object   object    object
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
...                ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
Dask Name: getitem, 80 tasks


Calculamos el resultado:

In [56]:
print(df2.compute())

     show_id   type                             title         director  \
0         s1  movie              dick johnson is dead  kirsten johnson   
3        s14  movie  confessions of an invisible girl    bruno garotti   
4         s8  movie                           sankofa     haile gerima   
6        s10  movie                      the starling   theodore melfi   
7       s939  movie   motu patlu in the game of zones      suhas kadav   
...      ...    ...                               ...              ...   
8702   s8232  movie                          the bund        not given   
8707   s8269  movie                  the darkest dawn        not given   
8716   s8331  movie                  the great battle        not given   
8763   s8648  movie        twisted trunk big fat body        not given   
8783   s8785  movie              yoko and his friends        not given   

             country date_added  release_year rating duration  \
0      united states  9/25/2021          2020 

* También podemos usar el método `query` para seleccionar valores de acuerdo a un criterio, tal y como funciona en `pandas`:

In [57]:
df2 = df.query("type == 'movie'")
print(df2)

Dask DataFrame Structure:
               show_id    type   title director country date_added release_year  rating duration listed_in
npartitions=20                                                                                            
                object  object  object   object  object     object        int64  object   object    object
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
...                ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
                   ...     ...     ...      ...     ...        ...          ...     ...      ...       ...
Dask Name: query, 40 tasks


Calculamos el resultado:

In [58]:
print(df2.compute())

     show_id   type                             title         director  \
0         s1  movie              dick johnson is dead  kirsten johnson   
3        s14  movie  confessions of an invisible girl    bruno garotti   
4         s8  movie                           sankofa     haile gerima   
6        s10  movie                      the starling   theodore melfi   
7       s939  movie   motu patlu in the game of zones      suhas kadav   
...      ...    ...                               ...              ...   
8702   s8232  movie                          the bund        not given   
8707   s8269  movie                  the darkest dawn        not given   
8716   s8331  movie                  the great battle        not given   
8763   s8648  movie        twisted trunk big fat body        not given   
8783   s8785  movie              yoko and his friends        not given   

             country date_added  release_year rating duration  \
0      united states  9/25/2021          2020 

## **7. Comparativa con Pandas**
---

Veamos una comparativa en tiempo y memoria con respecto a los `DataFrame` de `pandas`; para ello, usaremos las librerías `psutil` y `time`:

In [59]:
import psutil, time

Veamos la diferencia en memoria entre la carga del `DataFrame` desde `pandas` y desde `dask`:

* `pandas`:

In [60]:
memory = psutil.virtual_memory()[2]
print(f"RAM inicial: {memory:.2f}%")

RAM inicial: 9.10%


Cargamos el conjunto de datos con `pandas`:

In [61]:
df_pandas = pd.read_parquet("netflix2.parquet")

Veamos qué tanto subió la memoria RAM:

In [62]:
memory = psutil.virtual_memory()[2]
print(f"RAM final: {memory:.2f}%")

RAM final: 9.10%


* `dask`:

In [63]:
memory = psutil.virtual_memory()[2]
print(f"RAM inicial: {memory:.2f}%")

RAM inicial: 9.10%


Cargamos el conjunto de datos con `dask`:

In [64]:
df_dask = dd.read_parquet("netflix2.parquet")

Veamos qué tanto subió la memoria RAM:

In [65]:
memory = psutil.virtual_memory()[2]
print(f"RAM final: {memory:.2f}%")

RAM final: 9.10%


Como puede ver `dask` no carga directamente el conjunto de datos en memoria. Ahora veamos una comparativa en tiempo en operaciones de agregación (se ven beneficiadas de paralelismo).

* `pandas`:

In [66]:
t0 = time.time()
res = df_pandas.groupby(["type", "country"]).agg({"release_year": "mean"})
delta_t = time.time() - t0
print(f"Segundos: {delta_t:.2f}")

Segundos: 0.01


* `dask`:

In [67]:
t0 = time.time()
res = (
        df_dask
        .groupby(["type", "country"])
        .agg({"release_year": "mean"})
        .compute()
        )
delta_t = time.time() - t0
print(f"Segundos: {delta_t:.2f}")

Segundos: 0.27


Como podemos ver, el resultado demora más en `dask`. Recuerde que esta herramienta debe coordinar y gestionar multiples `DataFrames` que están contenidos en varias particiones, esto permite trabajar con grandes cantidades de datos. No obstante, si el conjunto de datos es pequeño `pandas` sigue siendo la mejor opción.

## **8. Recursos Adicionales**
---

* [DataFrames de Dask](https://docs.dask.org/en/stable/dataframe.html).
* [Dask - Talks & tutorials](https://docs.dask.org/en/stable/presentations.html).

## **9. Créditos**
---

**Profesor**

- [Jorge E. Camargo, PhD](https://dis.unal.edu.co/~jecamargom/)

**Diseño, desarrollo del notebook y material audiovisual**

- [Juan S. Lara MSc](https://www.linkedin.com/in/juan-sebastian-lara-ramirez-43570a214/)

**Universidad Nacional de Colombia** - *Facultad de Ingeniería*