Set up the environment


To start using Dask:
pip install dask or pip install "dask[complete]"


pip install graphviz  (used by .visualize(); the Graphviz system binaries must also be installed)

In [2]:
import numpy as np
import pandas as pd
import urllib.request
import os

import dask
import dask.dataframe as dd
import dask.array as da
import dask.bag as db

# Not all of these imports are required; which ones you need depends on the type of data you are working with.

## Dask Bag

In [3]:
bg = db.from_sequence([1,2,3,4,5,6,7,8,9,10], npartitions = 2)
bg
# bg.compute()
# res = list(bg)

dask.bag<from_sequence, npartitions=2>
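A bag becomes useful once operations are chained on it. The following is a minimal sketch, not part of the original notebook: the variable names are illustrative, and `scheduler="threads"` is a choice made here so the example does not spawn worker processes.

```python
import dask.bag as db

numbers = db.from_sequence(range(1, 11), npartitions=2)

# filter() and map() are lazy: they only extend the task graph.
even_squares = numbers.filter(lambda n: n % 2 == 0).map(lambda n: n ** 2)

# compute() runs the graph and returns a plain Python list.
even_squares_result = even_squares.compute(scheduler="threads")
total = numbers.sum().compute(scheduler="threads")

print(even_squares_result)  # [4, 16, 36, 64, 100]
print(total)                # 55
```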

In [4]:
# Reading text files into bags and writing bags back to text files
# bg2 = db.read_text('example.txt')
# bg2.to_textfiles('dir/')

## Dask Array

A Dask array is made up of many NumPy arrays, which we will call "chunks".

![image.png](attachment:image.png)

To get the most out of Dask arrays we have to choose the chunk size carefully.
- If the chunks are too large, we will run out of memory.
- If the chunks are too small, scheduling overhead dominates and Dask loses its advantage.
* Suggested size: between 100 MB and 1 GB per chunk. Rule of thumb for the number of chunks: number of cores × 2.


We should therefore align the chunks with the layout of the data on disk. If the Dask chunks are not a multiple of the on-disk chunks, Dask loses efficiency because it has to read the same on-disk chunk several times.

Expensive!


For example, if a file is stored with chunks of shape (128, 64), we should choose array chunks that are multiples of that shape, such as (1280, 64000).
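Both rules can be checked with a few lines of arithmetic. This is a rough sketch; `chunk_nbytes` and `is_aligned` are hypothetical helpers written for this example and are not part of Dask.

```python
import numpy as np

def chunk_nbytes(chunk_shape, dtype="float64"):
    """Memory used by one chunk: number of elements times bytes per element."""
    return int(np.prod(chunk_shape)) * np.dtype(dtype).itemsize

def is_aligned(dask_chunks, disk_chunks):
    """True when every Dask chunk length is a multiple of the on-disk chunk length."""
    return all(d % s == 0 for d, s in zip(dask_chunks, disk_chunks))

print(chunk_nbytes((1000, 1000)))            # 8000000 bytes, about 7.63 MiB
print(is_aligned((1280, 64000), (128, 64)))  # True
print(is_aligned((1000, 1000), (128, 64)))   # False: 1000 is not a multiple of 128
```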

In [5]:
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x
# x.compute()

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,7.63 MiB
Shape,"(10000, 10000)","(1000, 1000)"
Dask graph,100 chunks in 1 graph layer,100 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


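There are several ways to specify chunks:
- uniform blocks with the same length in every dimension --> chunks = n
- uniform blocks with a different length per dimension --> chunks = (n,m)
- explicit block sizes along each dimension --> chunks = ((2,4),(3,3)) or chunks = ((5,5,5,5),(8,8,8))
- no chunking along a dimension --> use -1 (or None) in that position, e.g. chunks = (-1,m)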
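The `.chunks` attribute shows how each specification is expanded. A small sketch on a 6 x 6 array; the array sizes and variable names are chosen only for illustration.

```python
import dask.array as da

# One integer: the same block length along every dimension.
a = da.ones((6, 6), chunks=3)
print(a.chunks)  # ((3, 3), (3, 3))

# One tuple: a block length per dimension.
b = da.ones((6, 6), chunks=(2, 3))
print(b.chunks)  # ((2, 2, 2), (3, 3))

# Nested tuples: the exact block sizes along each dimension.
c = da.ones((6, 6), chunks=((2, 4), (3, 3)))
print(c.chunks)  # ((2, 4), (3, 3))

# -1: a single chunk spanning the whole dimension.
d = da.ones((6, 6), chunks=(-1, 3))
print(d.chunks)  # ((6,), (3, 3))
```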

![image.png](attachment:image.png)


In [6]:
y = da.random.random((10000, 10000), chunks='auto')  # let Dask choose the chunk sizes
y
# y.compute()
# To change the chunk layout we can use rechunk()
# To change the shape of the Dask array we can use reshape()

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,128.00 MiB
Shape,"(10000, 10000)","(4096, 4096)"
Dask graph,9 chunks in 1 graph layer,9 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
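The difference between `rechunk()` and `reshape()` is easier to see on a small array. A minimal sketch with illustrative sizes: `rechunk()` keeps the shape and changes only the block layout, while `reshape()` changes the shape itself.

```python
import dask.array as da

grid = da.ones((8, 8), chunks=(4, 4))

# rechunk: same shape, new chunk layout (here, full-width row bands).
bands = grid.rechunk((2, 8))
print(bands.shape)   # (8, 8)
print(bands.chunks)  # ((2, 2, 2, 2), (8,))

# reshape: new shape; Dask works out the chunks of the result.
flat = da.arange(12, chunks=4)
table = flat.reshape((3, 4))
print(table.shape)   # (3, 4)
```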


**`Xarray`**

Xarray is a package that wraps Dask arrays and offers some extras for working with more complex datasets. It lets you:
1. Manage multiple datasets and combine them cohesively into a single dataset.
2. Read from several stacks at the same time.
3. Switch between Dask arrays and NumPy arrays.


**`Operations`**

 - dask.array.blockwise(func, out_ind, *args[, name, ...])

 - map_blocks(func, *args[, name, token, ...])

 - map_overlap(func, *args[, depth, boundary, ...])

 - reduction(x, chunk, aggregate[, axis, ...])
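Two of these operations can be sketched on a tiny array. This is an illustrative example rather than a full tour of the API: `map_blocks` applies a function to each NumPy chunk independently, and `blockwise` combines matching blocks of several arrays using index labels (here the single label `"i"`).

```python
import numpy as np
import dask.array as da

x = da.arange(8, chunks=4)
ones = da.ones(8, chunks=4)

# map_blocks: the lambda receives one NumPy chunk at a time.
doubled = x.map_blocks(lambda block: block * 2)

# blockwise: add block i of x to block i of ones.
summed = da.blockwise(np.add, "i", x, "i", ones, "i", dtype="float64")

print(doubled.compute())  # [ 0  2  4  6  8 10 12 14]
print(summed.compute())   # [1. 2. 3. 4. 5. 6. 7. 8.]
```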


**`Slicing`**

In [7]:
s1 = x[0, :15]
s1
# s1.compute()

Unnamed: 0,Array,Chunk
Bytes,120 B,120 B
Shape,"(15,)","(15,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [8]:
s2 = x[[1,28,62]]
s2
# s2.compute()

Unnamed: 0,Array,Chunk
Bytes,234.38 kiB,23.44 kiB
Shape,"(3, 10000)","(3, 1000)"
Dask graph,10 chunks in 2 graph layers,10 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [9]:
s3 = y[y < 0.468]
s3
# s3.compute()

Unnamed: 0,Array,Chunk
Bytes,unknown,unknown
Shape,"(nan,)","(nan,)"
Dask graph,9 chunks in 8 graph layers,9 chunks in 8 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
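The `nan` shape appears because Dask cannot know how many elements pass a boolean mask until the data is computed. A small sketch, with an array chosen for illustration, of resolving the unknown sizes with `compute_chunk_sizes()`:

```python
import math
import dask.array as da

values = da.arange(10, chunks=5)
small = values[values < 3]

shape_before = small.shape
print(shape_before)  # (nan,)

# Evaluates the mask once to learn the size of every chunk.
small = small.compute_chunk_sizes()
print(small.shape)      # (3,)
print(small.compute())  # [0 1 2]
```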


Indexing documentation: _https://docs.dask.org/en/stable/array-assignment.html_

**`Stack and Concat`**

In [10]:
arr1 = da.from_array(np.zeros((3, 4)), chunks=(1, 2))
arr2 = da.from_array(np.ones((3, 4)), chunks=(1, 2))

arr1.compute()


array([[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]])

In [11]:
arr2.compute()

array([[1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.]])

In [12]:
data = [arr1,arr2]
x = da.stack(data, axis=0)
y = da.stack(data, axis=1)

x

Unnamed: 0,Array,Chunk
Bytes,192 B,16 B
Shape,"(2, 3, 4)","(1, 1, 2)"
Dask graph,12 chunks in 3 graph layers,12 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [13]:
y

Unnamed: 0,Array,Chunk
Bytes,192 B,16 B
Shape,"(3, 2, 4)","(1, 1, 2)"
Dask graph,12 chunks in 3 graph layers,12 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [14]:
a = da.concatenate(data, axis=0)
b = da.concatenate(data, axis=1)

a

Unnamed: 0,Array,Chunk
Bytes,192 B,16 B
Shape,"(6, 4)","(1, 2)"
Dask graph,12 chunks in 3 graph layers,12 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [15]:
b

Unnamed: 0,Array,Chunk
Bytes,192 B,16 B
Shape,"(3, 8)","(1, 2)"
Dask graph,12 chunks in 3 graph layers,12 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


## Dask Dataframes


A Dask dataframe is made up of many pandas dataframes.

![image.png](attachment:image.png)

**Uses**
1. Manipulating very large datasets, including ones that exceed the available memory.
2. Speeding up computations.
3. Distributing work across tasks while relying on pandas underneath.

**Limitations**
1. Setting the index on an unsorted column.
2. groupby, join --> expensive.
3. Dask does not implement some pandas functions.

**`Creating the DataFrame`**

In [16]:
# Example

# First we create a pandas dataframe
index = pd.date_range("2020-07-03", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400), "b": list("abcaddbe" * 300)}, index=index)
# df

# Then we split it into as many partitions as we want using Dask
ddf = dd.from_pandas(df, npartitions=10)  # (dataframe to use, number of partitions we want)
ddf
# ddf.compute()

Unnamed: 0_level_0,a,b
npartitions=10,Unnamed: 1_level_1,Unnamed: 2_level_1
2020-07-03 00:00:00,int32,object
2020-07-13 00:00:00,...,...
...,...,...
2020-10-01 00:00:00,...,...
2020-10-10 23:00:00,...,...


In [17]:
# Checking the structure of the dataframe: each partition is a pandas DataFrame

ddf.map_partitions(type).compute()

0    <class 'pandas.core.frame.DataFrame'>
1    <class 'pandas.core.frame.DataFrame'>
2    <class 'pandas.core.frame.DataFrame'>
3    <class 'pandas.core.frame.DataFrame'>
4    <class 'pandas.core.frame.DataFrame'>
5    <class 'pandas.core.frame.DataFrame'>
6    <class 'pandas.core.frame.DataFrame'>
7    <class 'pandas.core.frame.DataFrame'>
8    <class 'pandas.core.frame.DataFrame'>
9    <class 'pandas.core.frame.DataFrame'>
dtype: object

In [18]:
div = ddf.divisions  # the index boundaries of each partition
print(div)

# Accessing a specific partition; the result is still a Dask dataframe
p1 = ddf.partitions[1]
p1
# p1.compute()

# We can also use repartition() to redistribute the data across partitions.
# DataFrame.repartition(divisions=[1,10,50, 100, ... , 2400], npartitions=None, partition_size=None, freq='12h', force=False)

(Timestamp('2020-07-03 00:00:00', freq='H'), Timestamp('2020-07-13 00:00:00', freq='H'), Timestamp('2020-07-23 00:00:00', freq='H'), Timestamp('2020-08-02 00:00:00', freq='H'), Timestamp('2020-08-12 00:00:00', freq='H'), Timestamp('2020-08-22 00:00:00', freq='H'), Timestamp('2020-09-01 00:00:00', freq='H'), Timestamp('2020-09-11 00:00:00', freq='H'), Timestamp('2020-09-21 00:00:00', freq='H'), Timestamp('2020-10-01 00:00:00', freq='H'), Timestamp('2020-10-10 23:00:00', freq='H'))


Unnamed: 0_level_0,a,b
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
2020-07-13,int32,object
2020-07-23,...,...


In [19]:
# Checking how much memory each partition uses (in bytes)
mem = ddf.memory_usage_per_partition(index=True, deep=True)
mem.compute()

0    16800
1    16800
2    16800
3    16800
4    16800
5    16800
6    16800
7    16800
8    16800
9    16800
dtype: int64

**Example syntax for reading from a CSV file**


ddf2 = dd.read_csv('file_name.csv', blocksize=80e6)  # about 80 MB per partition
ddf2


**Using a glob pattern to read files with similar names**


ddf3 = dd.read_csv('2000-*-*.csv', parse_dates = ['timestamp'])         
ddf3

In [20]:
# Another dataframe example

print("- Downloading NYC Flights dataset... ", end='', flush=True)
url = "https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz"
filename, headers = urllib.request.urlretrieve(url, 'nycflights.tar.gz')
print("Done!", flush=True)

- Downloading NYC Flights dataset... Done!
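The download above leaves a compressed archive, while the next cell reads CSV files from `data/nycflights/`, so the archive has to be unpacked in between. A hedged sketch of that step: `extract_archive` is a hypothetical helper written for this example, and it assumes the archive contains a top-level `nycflights/` folder.

```python
import os
import tarfile

def extract_archive(archive_path, dest_dir):
    """Unpack a .tar.gz archive into dest_dir and return the member names."""
    os.makedirs(dest_dir, exist_ok=True)
    with tarfile.open(archive_path, mode="r:gz") as archive:
        names = archive.getnames()
        archive.extractall(dest_dir)
    return names

# Assumption: the archive unpacks to nycflights/*.csv, giving data/nycflights/*.csv.
if os.path.exists("nycflights.tar.gz"):
    extract_archive("nycflights.tar.gz", "data")
```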


In [21]:

df2 = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 assume_missing=True)

df2

Unnamed: 0_level_0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
npartitions=6,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
,datetime64[ns],float64,float64,float64,float64,float64,object,float64,float64,float64,float64,float64,float64,float64,object,object,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [22]:
df2.head()

Unnamed: 0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990-01-01,1.0,1621.0,1540.0,1747.0,1701.0,US,33.0,,86.0,...,,46.0,41.0,EWR,PIT,319.0,,,0.0,0.0
1,1990-01-02,2.0,1547.0,1540.0,1700.0,1701.0,US,33.0,,73.0,...,,-1.0,7.0,EWR,PIT,319.0,,,0.0,0.0
2,1990-01-03,3.0,1546.0,1540.0,1710.0,1701.0,US,33.0,,84.0,...,,9.0,6.0,EWR,PIT,319.0,,,0.0,0.0
3,1990-01-04,4.0,1542.0,1540.0,1710.0,1701.0,US,33.0,,88.0,...,,9.0,2.0,EWR,PIT,319.0,,,0.0,0.0
4,1990-01-05,5.0,1549.0,1540.0,1706.0,1701.0,US,33.0,,77.0,...,,5.0,9.0,EWR,PIT,319.0,,,0.0,0.0


In [23]:
df2.columns

Index(['Date', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime',
       'UniqueCarrier', 'FlightNum', 'TailNum', 'ActualElapsedTime',
       'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest',
       'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'Diverted'],
      dtype='object')

In [20]:
df2.tail()

ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+---------+--------+----------+
| Column  | Found  | Expected |
+---------+--------+----------+
| TailNum | object | float64  |
+---------+--------+----------+

The following columns also raised exceptions on conversion:

- TailNum
  ValueError("could not convert string to float: 'N14342'")

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'TailNum': 'object'}

to the call to `read_csv`/`read_table`.

In [21]:
df2 = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str},
                 assume_missing=True)

In [22]:
df2.tail()

Unnamed: 0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
70320,1995-04-06,4.0,2013.0,2015.0,2250.0,2257.0,CO,751.0,N14980,157.0,...,128.0,-7.0,-2.0,EWR,MCO,938.0,6.0,23.0,0.0,0.0
70321,1995-04-07,5.0,2016.0,2015.0,2250.0,2257.0,CO,751.0,N16883,154.0,...,126.0,-7.0,1.0,EWR,MCO,938.0,5.0,23.0,0.0,0.0
70322,1995-04-08,6.0,2034.0,2015.0,2312.0,2257.0,CO,751.0,N11984,158.0,...,129.0,15.0,19.0,EWR,MCO,938.0,9.0,20.0,0.0,0.0
70323,1995-04-09,7.0,2051.0,2015.0,2318.0,2257.0,CO,751.0,N13983,147.0,...,121.0,21.0,36.0,EWR,MCO,938.0,6.0,20.0,0.0,0.0
70324,1995-04-10,1.0,201.0,,,,,,,,...,,,,,,,,,,


In [23]:
df2.compute()

Unnamed: 0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990-01-01,1.0,1621.0,1540.0,1747.0,1701.0,US,33.0,,86.0,...,,46.0,41.0,EWR,PIT,319.0,,,0.0,0.0
1,1990-01-02,2.0,1547.0,1540.0,1700.0,1701.0,US,33.0,,73.0,...,,-1.0,7.0,EWR,PIT,319.0,,,0.0,0.0
2,1990-01-03,3.0,1546.0,1540.0,1710.0,1701.0,US,33.0,,84.0,...,,9.0,6.0,EWR,PIT,319.0,,,0.0,0.0
3,1990-01-04,4.0,1542.0,1540.0,1710.0,1701.0,US,33.0,,88.0,...,,9.0,2.0,EWR,PIT,319.0,,,0.0,0.0
4,1990-01-05,5.0,1549.0,1540.0,1706.0,1701.0,US,33.0,,77.0,...,,5.0,9.0,EWR,PIT,319.0,,,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
70320,1995-04-06,4.0,2013.0,2015.0,2250.0,2257.0,CO,751.0,N14980,157.0,...,128.0,-7.0,-2.0,EWR,MCO,938.0,6.0,23.0,0.0,0.0
70321,1995-04-07,5.0,2016.0,2015.0,2250.0,2257.0,CO,751.0,N16883,154.0,...,126.0,-7.0,1.0,EWR,MCO,938.0,5.0,23.0,0.0,0.0
70322,1995-04-08,6.0,2034.0,2015.0,2312.0,2257.0,CO,751.0,N11984,158.0,...,129.0,15.0,19.0,EWR,MCO,938.0,9.0,20.0,0.0,0.0
70323,1995-04-09,7.0,2051.0,2015.0,2318.0,2257.0,CO,751.0,N13983,147.0,...,121.0,21.0,36.0,EWR,MCO,938.0,6.0,20.0,0.0,0.0


In [31]:
df2.describe().compute()

Unnamed: 0,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,FlightNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
count,1383331.0,1353762.0,1383330.0,1349856.0,1383330.0,1383330.0,1349856.0,1383297.0,69944.0,1349856.0,1353761.0,1381835.0,70324.0,70324.0,1383330.0,1383330.0
mean,3.909684,1354.817,1346.016,1525.96,1531.578,738.5972,151.5442,152.8416,135.220862,6.5646,8.472505,843.7756,6.112636,19.618921,0.02137523,0.002822898
std,1.983435,458.1968,448.1485,481.4817,463.6213,593.4904,80.84441,79.77442,83.477934,31.69391,26.02491,620.0608,4.78699,9.711615,0.1446318,0.05305593
min,1.0,1.0,0.0,1.0,0.0,1.0,-13.0,0.0,12.0,-1437.0,-530.0,11.0,0.0,0.0,0.0,0.0
25%,2.0,926.0,920.0,1136.0,1135.0,317.0,96.0,100.0,71.0,-8.0,0.0,431.0,4.0,13.0,0.0,0.0
50%,4.0,1430.0,1430.0,1538.0,1546.0,681.0,149.0,153.0,121.0,1.0,1.0,760.0,5.0,18.0,0.0,0.0
75%,6.0,1801.0,1755.0,1952.0,1948.0,1472.0,197.0,194.0,165.0,14.0,7.0,1097.0,7.0,23.0,0.0,0.0
max,7.0,2400.0,2359.0,2400.0,2400.0,9851.0,703.0,726.0,419.0,991.0,1435.0,2586.0,150.0,200.0,1.0,1.0


**`Indexing`**

Dask objects can be indexed in much the same way as in pandas.

We can also use set_index(), but sparingly, because the operation is very expensive. Once the index is set, lookups and joins on it run faster, much like an index in SQL.

In [None]:
# ddf.b shows the structure of a single column
slice1 = ddf["2020-09-01": "2020-11-01 5:00"]
slice1
# slice1.compute()

# ddf = df.set_index('timestamp')  # set the index 

# ddf.loc['2001-01-05':'2001-01-12']  
# df.merge(df2, left_index=True, right_index=True)  

Unnamed: 0_level_0,a,b
npartitions=4,Unnamed: 1_level_1,Unnamed: 2_level_1
2020-09-01 00:00:00,int32,object
2020-09-11 00:00:00,...,...
2020-09-21 00:00:00,...,...
2020-10-01 00:00:00,...,...
2020-10-10 23:00:00,...,...


**`Joins`**

Dask can:
1. Join two Dask dataframes (single partition)
2. Join two Dask dataframes on their indexes
3. Join a Dask dataframe with a pandas dataframe

**`Persist`**

A typical Dask dataframe workflow looks like this:

1. Load the data from a file
2. Filter the data down to a particular subset
3. Shuffle the data to set an index
(LFS: Load, Filter, Shuffle)
4. Run queries on top of the indexed data

It is a good idea to persist the result of the LFS steps in memory, so that later queries can reuse it without repeating the same load, filter, and shuffle over and over.

**Don't do this**

client.persist(df)  # persist doesn't change the input in-place

**Do this instead**

df = client.persist(df)  # replace your old lazy DataFrame

**`Computation`**: "Dask is lazily evaluated"

In [None]:
# Whenever you want the result of an operation, you must call .compute()
slice1.compute()

Unnamed: 0,a,b
2020-09-01 00:00:00,1440,a
2020-09-01 01:00:00,1441,b
2020-09-01 02:00:00,1442,c
2020-09-01 03:00:00,1443,a
2020-09-01 04:00:00,1444,d
...,...,...
2020-10-10 19:00:00,2395,a
2020-10-10 20:00:00,2396,d
2020-10-10 21:00:00,2397,d
2020-10-10 22:00:00,2398,b


df = dd.read_csv('s3://bucket/path/to/*.csv')
df = df[df.balance < 0]
df = client.persist(df)

df = df.set_index('timestamp')
df = client.persist(df)

df.customer_id.nunique().compute()

18452844

In [None]:
%%time
# %%time must be the first line of the cell to time the whole cell;
# a bare %time on its own line times an empty statement and reports 0 ns.

# Back to the flights example
df2.Distance.max().compute()



2586.0

In [None]:
# df2.DepDelay.max().visualize(rankdir="LR", size="12, 12!")

**`Funciones y Metodos`**

Many Dask functions mirror their pandas counterparts. Call the function as you would in pandas, then call .compute() to get the result.

Some functions: read_csv(), head(), tail(), dropna(), fillna(), persist(), nunique(), pivot_table(), repartition(), pipe(), etc.

In [None]:
# Some example functions
ej1 = ddf.a.mean() 
ej1.compute()

# For a very large dataframe, partitions let us save time and memory

1199.5

In [None]:
# unique() is similar to DISTINCT in SQL
ej2 = ddf.b.unique()
ej2.compute()

0    a
1    b
2    c
3    d
4    e
Name: b, dtype: object

We can also chain operations as in pandas

In [None]:
res = slice1.a.cumsum() - 42
res.compute()

2020-09-01 00:00:00       1398
2020-09-01 01:00:00       2839
2020-09-01 02:00:00       4281
2020-09-01 03:00:00       5724
2020-09-01 04:00:00       7168
                        ...   
2020-10-10 19:00:00    1833088
2020-10-10 20:00:00    1835484
2020-10-10 21:00:00    1837881
2020-10-10 22:00:00    1840279
2020-10-10 23:00:00    1842678
Freq: H, Name: a, Length: 960, dtype: int32

In [None]:
# We can select the columns we want, as in pandas
df_train = df2[['CRSDepTime',  'CRSArrTime', 'Cancelled']]
df_train
# df_train.compute()

Unnamed: 0_level_0,CRSDepTime,CRSArrTime,Cancelled
npartitions=6,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,float64,float64,float64
,...,...,...
...,...,...,...
,...,...,...
,...,...,...


In [None]:
df_train.shape

(Delayed('int-bbf3c3b2-f0c1-4076-bb2d-c930c229cc3d'), 3)

**`Dask does Tasks`**

We always have to call compute() to get the results. 

But we can also use the .dask attribute and the .visualize() method to inspect the "tasks" that Dask will run, either as a list or as a diagram.
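A short sketch of inspecting the graph before running it, using a small array chosen for illustration. The `.visualize()` call is left commented out because it needs Graphviz installed.

```python
import dask.array as da

ones = da.ones(10, chunks=5)
total = ones.sum()

# Nothing has run yet: .dask holds the tasks that compute() would execute.
graph = total.dask
print(len(graph))  # number of tasks in the graph

# total.visualize()  # draws the same graph as a diagram

total_value = total.compute()
print(total_value)  # 10.0
```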


In [None]:
ej1.dask  # List of steps
# ej1.visualize()  # Diagram

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,10
npartitions,10
columns,"['a', 'b']"
type,dask.dataframe.core.DataFrame
dataframe_type,pandas.core.frame.DataFrame
series_dtypes,"{'a': dtype('int32'), 'b': dtype('O')}"

0,1
layer_type,Blockwise
is_materialized,True
number of outputs,10
depends on,from_pandas-96367c21b264851aed24fc1821efd34c

0,1
layer_type,Blockwise
is_materialized,True
number of outputs,10
depends on,getitem-c736fabaaf1fa17adc4514eadaff6465

0,1
layer_type,DataFrameTreeReduction
is_materialized,True
number of outputs,1
depends on,series-count-chunk-06817e1923bc0c5544af133abfe26928-d50c6b84d0df8f9ec44fb0350847b40c

0,1
layer_type,Blockwise
is_materialized,True
number of outputs,10
depends on,getitem-c736fabaaf1fa17adc4514eadaff6465

0,1
layer_type,DataFrameTreeReduction
is_materialized,True
number of outputs,1
depends on,series-sum-chunk-3a41bb6afbe128b21f7c64b5a9541471-7fce84d23c11744f1daf8973ad8d5b28

0,1
layer_type,MaterializedLayer
is_materialized,True
number of outputs,1
depends on,series-sum-agg-3a41bb6afbe128b21f7c64b5a9541471
,series-count-agg-06817e1923bc0c5544af133abfe26928


Documentation for all dataframe functions: _https://docs.dask.org/en/stable/dataframe-api.html_