# Cria base de dados para exemplo

* Uma observação para cada segundo entre 1-Jan-2021 até 28-Fev-2021.
* Um arquivo CSV para cada dia do mês (31 para Janeiro + 28 para Fevereiro = 59 no total)
* Colunas: _Timestamp_ + 5 colunas numéricas aleatórias + 1 coluna categórica

In [93]:
import pandas as pd
import numpy as np

dates = pd.date_range(start="1-1-2021", end="2-28-2021")

for dt in dates:
    ind_day = pd.date_range(start="%s 0:0:0"%dt.date(), end="%s 23:59:59"%dt.date(), freq="S")
    data = np.random.rand(len(ind_day), 5)

    cls = np.random.choice(["Class1", "Class2", "Class3", "Class4","Class5"], size=len(ind_day))

    df = pd.DataFrame(data=data, columns=["A", "B", "C", "D", "E"], index=ind_day)
    df["Type"] = cls

    df.to_csv("dask_dataframes/%s.csv"%dt.date())

In [94]:
import os
os.listdir("dask_dataframes/")

['2021-01-01.csv',
 '2021-01-02.csv',
 '2021-01-03.csv',
 '2021-01-04.csv',
 '2021-01-05.csv',
 '2021-01-06.csv',
 '2021-01-07.csv',
 '2021-01-08.csv',
 '2021-01-09.csv',
 '2021-01-10.csv',
 '2021-01-11.csv',
 '2021-01-12.csv',
 '2021-01-13.csv',
 '2021-01-14.csv',
 '2021-01-15.csv',
 '2021-01-16.csv',
 '2021-01-17.csv',
 '2021-01-18.csv',
 '2021-01-19.csv',
 '2021-01-20.csv',
 '2021-01-21.csv',
 '2021-01-22.csv',
 '2021-01-23.csv',
 '2021-01-24.csv',
 '2021-01-25.csv',
 '2021-01-26.csv',
 '2021-01-27.csv',
 '2021-01-28.csv',
 '2021-01-29.csv',
 '2021-01-30.csv',
 '2021-01-31.csv',
 '2021-02-01.csv',
 '2021-02-02.csv',
 '2021-02-03.csv',
 '2021-02-04.csv',
 '2021-02-05.csv',
 '2021-02-06.csv',
 '2021-02-07.csv',
 '2021-02-08.csv',
 '2021-02-09.csv',
 '2021-02-10.csv',
 '2021-02-11.csv',
 '2021-02-12.csv',
 '2021-02-13.csv',
 '2021-02-14.csv',
 '2021-02-15.csv',
 '2021-02-16.csv',
 '2021-02-17.csv',
 '2021-02-18.csv',
 '2021-02-19.csv',
 '2021-02-20.csv',
 '2021-02-21.csv',
 '2021-02-22

# Cria cluster do dask

In [95]:
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=2, memory_limit="1GB")
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 49168 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:49168/status,

0,1
Dashboard: http://127.0.0.1:49168/status,Workers: 2
Total threads: 4,Total memory: 1.86 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:49169,Workers: 2
Dashboard: http://127.0.0.1:49168/status,Total threads: 4
Started: Just now,Total memory: 1.86 GiB

0,1
Comm: tcp://127.0.0.1:49184,Total threads: 2
Dashboard: http://127.0.0.1:49185/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:49173,
Local directory: C:\Users\felbu\Documents\Python_EPGE_2022\10-Aulas\dask-worker-space\worker-ci4hwnwc,Local directory: C:\Users\felbu\Documents\Python_EPGE_2022\10-Aulas\dask-worker-space\worker-ci4hwnwc

0,1
Comm: tcp://127.0.0.1:49187,Total threads: 2
Dashboard: http://127.0.0.1:49188/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:49172,
Local directory: C:\Users\felbu\Documents\Python_EPGE_2022\10-Aulas\dask-worker-space\worker-_en8mh9s,Local directory: C:\Users\felbu\Documents\Python_EPGE_2022\10-Aulas\dask-worker-space\worker-_en8mh9s


# Lê base de dados

In [96]:
import dask.dataframe as dd

In [97]:
df = dd.read_csv('dask_dataframes/2021-*-*.csv')

In [98]:
df.head()

Unnamed: 0.1,Unnamed: 0,A,B,C,D,E,Type
0,2021-01-01 00:00:00,0.77139,0.418623,0.623215,0.401876,0.004995,Class3
1,2021-01-01 00:00:01,0.864748,0.440075,0.819741,0.478412,0.86675,Class5
2,2021-01-01 00:00:02,0.864099,0.997717,0.568261,0.435532,0.14211,Class2
3,2021-01-01 00:00:03,0.266226,0.24629,0.889184,0.231991,0.841046,Class2
4,2021-01-01 00:00:04,0.190853,0.347216,0.48063,0.422721,0.290855,Class3


In [99]:
df.tail()

Unnamed: 0.1,Unnamed: 0,A,B,C,D,E,Type
86395,2021-02-28 23:59:55,0.412063,0.01843,0.004757,0.023691,0.90825,Class5
86396,2021-02-28 23:59:56,0.398651,0.804245,0.545502,0.643793,0.715667,Class5
86397,2021-02-28 23:59:57,0.835641,0.489529,0.600274,0.920222,0.445715,Class2
86398,2021-02-28 23:59:58,0.839915,0.888782,0.354771,0.341346,0.97144,Class4
86399,2021-02-28 23:59:59,0.747639,0.139655,0.842183,0.567116,0.507573,Class2


In [100]:
df.dtypes

Unnamed: 0     object
A             float64
B             float64
C             float64
D             float64
E             float64
Type           object
dtype: object

In [101]:
df = df.rename(columns={'Unnamed: 0':'date'}).set_index('date')

In [102]:
df.head()

Unnamed: 0_level_0,A,B,C,D,E,Type
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2021-01-01 00:00:00,0.77139,0.418623,0.623215,0.401876,0.004995,Class3
2021-01-01 00:00:01,0.864748,0.440075,0.819741,0.478412,0.86675,Class5
2021-01-01 00:00:02,0.864099,0.997717,0.568261,0.435532,0.14211,Class2
2021-01-01 00:00:03,0.266226,0.24629,0.889184,0.231991,0.841046,Class2
2021-01-01 00:00:04,0.190853,0.347216,0.48063,0.422721,0.290855,Class3


In [103]:
df.tail()

Unnamed: 0_level_0,A,B,C,D,E,Type
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2021-02-28 23:59:55,0.412063,0.01843,0.004757,0.023691,0.90825,Class5
2021-02-28 23:59:56,0.398651,0.804245,0.545502,0.643793,0.715667,Class5
2021-02-28 23:59:57,0.835641,0.489529,0.600274,0.920222,0.445715,Class2
2021-02-28 23:59:58,0.839915,0.888782,0.354771,0.341346,0.97144,Class4
2021-02-28 23:59:59,0.747639,0.139655,0.842183,0.567116,0.507573,Class2


In [104]:
df.dtypes

A       float64
B       float64
C       float64
D       float64
E       float64
Type     object
dtype: object

In [105]:
df.columns

Index(['A', 'B', 'C', 'D', 'E', 'Type'], dtype='object')

In [106]:
#Não está como datetime!
df.index

Dask Index Structure:
npartitions=59
2021-01-01 00:00:00    object
2021-01-02 00:00:00       ...
                        ...  
2021-02-28 00:00:00       ...
2021-02-28 23:59:59       ...
Name: date, dtype: object
Dask Name: sort_index, 295 tasks

In [132]:
df = dd.read_csv("dask_dataframes/2021-*-*.csv",
                 names=["date", "A", "B", "C", "D", "E", "Type"],
                 skiprows=1,
                 parse_dates=["date",]
                )

In [133]:
df.head()

Unnamed: 0,date,A,B,C,D,E,Type
0,2021-01-01 00:00:00,0.77139,0.418623,0.623215,0.401876,0.004995,Class3
1,2021-01-01 00:00:01,0.864748,0.440075,0.819741,0.478412,0.86675,Class5
2,2021-01-01 00:00:02,0.864099,0.997717,0.568261,0.435532,0.14211,Class2
3,2021-01-01 00:00:03,0.266226,0.24629,0.889184,0.231991,0.841046,Class2
4,2021-01-01 00:00:04,0.190853,0.347216,0.48063,0.422721,0.290855,Class3


In [134]:
df.dtypes

date    datetime64[ns]
A              float64
B              float64
C              float64
D              float64
E              float64
Type            object
dtype: object

# Delayed computation

## Exemplo 1

In [135]:
df['A'].max()

dd.Scalar<series-..., dtype=float64>

In [136]:
df['A'].max().compute()

0.9999996676628308

## Exemplo 2

In [137]:
df.groupby('Type').mean()

Unnamed: 0_level_0,A,B,C,D,E
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,float64,float64,float64,float64,float64
,...,...,...,...,...


In [138]:
df.groupby('Type').mean().compute()

Unnamed: 0_level_0,A,B,C,D,E
Type,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Class1,0.500021,0.500159,0.499956,0.49964,0.49977
Class2,0.499678,0.499987,0.499696,0.50018,0.499881
Class3,0.500175,0.499797,0.499966,0.5001,0.499826
Class4,0.500274,0.499719,0.500275,0.500228,0.500256
Class5,0.499999,0.500522,0.500138,0.500192,0.500234


## Exemplo 3

In [139]:
df['F'] = df['A'] + df['B']
df['F'] = df['F'] - df['F'].mean()
media = df['F'].mean()
media

dd.Scalar<series-..., dtype=float64>

In [140]:
media.compute()

-1.3143841877348196e-16

## Exceções (métodos com compute implícito)

In [141]:
df.head()

Unnamed: 0,date,A,B,C,D,E,Type,F
0,2021-01-01 00:00:00,0.77139,0.418623,0.623215,0.401876,0.004995,Class3,0.189947
1,2021-01-01 00:00:01,0.864748,0.440075,0.819741,0.478412,0.86675,Class5,0.304757
2,2021-01-01 00:00:02,0.864099,0.997717,0.568261,0.435532,0.14211,Class2,0.86175
3,2021-01-01 00:00:03,0.266226,0.24629,0.889184,0.231991,0.841046,Class2,-0.48755
4,2021-01-01 00:00:04,0.190853,0.347216,0.48063,0.422721,0.290855,Class3,-0.461997


In [142]:
df.tail()

Unnamed: 0,date,A,B,C,D,E,Type,F
86395,2021-02-28 23:59:55,0.412063,0.01843,0.004757,0.023691,0.90825,Class5,-0.569572
86396,2021-02-28 23:59:56,0.398651,0.804245,0.545502,0.643793,0.715667,Class5,0.202831
86397,2021-02-28 23:59:57,0.835641,0.489529,0.600274,0.920222,0.445715,Class2,0.325104
86398,2021-02-28 23:59:58,0.839915,0.888782,0.354771,0.341346,0.97144,Class4,0.728631
86399,2021-02-28 23:59:59,0.747639,0.139655,0.842183,0.567116,0.507573,Class2,-0.112771


In [143]:
#Cuidado
df.sample(frac=1e-5)

Unnamed: 0_level_0,date,A,B,C,D,E,Type,F
npartitions=59,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
,datetime64[ns],float64,float64,float64,float64,float64,object,float64
,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...


In [144]:
df.sample(frac=1e-5).compute()

Unnamed: 0,date,A,B,C,D,E,Type,F
42468,2021-01-01 11:47:48,0.462923,0.540652,0.61129,0.165053,0.618305,Class5,0.003509
11786,2021-01-02 03:16:26,0.563997,0.999124,0.004403,0.336384,0.840461,Class2,0.563055
74913,2021-01-03 20:48:33,0.375349,0.946426,0.849838,0.845396,0.744662,Class3,0.321709
70103,2021-01-04 19:28:23,0.085109,0.34764,0.677882,0.384983,0.086745,Class3,-0.567316
23517,2021-01-05 06:31:57,0.710931,0.538663,0.026305,0.306726,0.855716,Class2,0.249529
78883,2021-01-06 21:54:43,0.634284,0.228821,0.274427,0.68796,0.784981,Class5,-0.136961
9835,2021-01-07 02:43:55,0.864179,0.010765,0.947466,0.846413,0.899201,Class4,-0.125122
36174,2021-01-08 10:02:54,0.809037,0.001634,0.750501,0.740311,0.268889,Class5,-0.189395
68373,2021-01-09 18:59:33,0.839179,0.750887,0.459599,0.188967,0.965038,Class2,0.590001
78160,2021-01-10 21:42:40,0.647971,0.400651,0.464043,0.073004,0.877701,Class5,0.048557


# Semelhanças com pandas

A grande vantagem de Dask sobre, por exemplo, Spark

## Filtrar linhas

In [145]:
df_1 = df[df.Type == 'Class1']

In [146]:
df_1.head()

Unnamed: 0,date,A,B,C,D,E,Type,F
7,2021-01-01 00:00:07,0.827363,0.478525,0.82615,0.728583,0.134825,Class1,0.305822
10,2021-01-01 00:00:10,0.412788,0.630448,0.956777,0.235059,0.896744,Class1,0.04317
15,2021-01-01 00:00:15,0.827425,0.857359,0.75448,0.996289,0.672775,Class1,0.684719
53,2021-01-01 00:00:53,0.825701,0.899504,0.264561,0.261113,0.267173,Class1,0.725139
59,2021-01-01 00:00:59,0.737701,0.734716,0.432248,0.316197,0.834699,Class1,0.47235


## Selecionar colunas

In [147]:
df_num = df[['A','B','C','D','E','F']]

In [148]:
df_num.head()

Unnamed: 0,A,B,C,D,E,F
0,0.77139,0.418623,0.623215,0.401876,0.004995,0.189947
1,0.864748,0.440075,0.819741,0.478412,0.86675,0.304757
2,0.864099,0.997717,0.568261,0.435532,0.14211,0.86175
3,0.266226,0.24629,0.889184,0.231991,0.841046,-0.48755
4,0.190853,0.347216,0.48063,0.422721,0.290855,-0.461997


## Estatísticas simples

In [149]:
df.A.mean().compute()

0.5000292140315263

## Group by

In [154]:
df[['Type','A','B','C']].groupby('Type').agg(['mean','std']).compute()

Unnamed: 0_level_0,A,A,B,B,C,C
Unnamed: 0_level_1,mean,std,mean,std,mean,std
Type,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2
Class1,0.500021,0.28866,0.500159,0.288619,0.499956,0.288754
Class2,0.499678,0.288692,0.499987,0.288781,0.499696,0.288975
Class3,0.500175,0.288799,0.499797,0.288944,0.499966,0.2888
Class4,0.500274,0.288835,0.499719,0.288691,0.500275,0.288721
Class5,0.499999,0.288547,0.500522,0.289014,0.500138,0.288558


# Resampling

In [205]:
df_hora = df.resample("1H")

In [206]:
df_media_hora = df_hora.mean()

In [207]:
df_media_hora

Unnamed: 0_level_0,A,B,C,D,E,F
npartitions=59,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2021-01-01 00:00:00,float64,float64,float64,float64,float64,float64
2021-01-02 00:00:00,...,...,...,...,...,...
...,...,...,...,...,...,...
2021-02-28 00:00:00,...,...,...,...,...,...
2021-02-28 23:00:00,...,...,...,...,...,...


In [208]:
df_media_hora.head()

Unnamed: 0_level_0,A,B,C,D,E,F
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2021-01-01 00:00:00,0.502897,0.501565,0.503678,0.497192,0.500488,0.004396
2021-01-01 01:00:00,0.497565,0.49548,0.498473,0.50344,0.500186,-0.007021
2021-01-01 02:00:00,0.498016,0.505579,0.502972,0.504882,0.495799,0.003529
2021-01-01 03:00:00,0.498847,0.502555,0.50154,0.497835,0.503687,0.001336
2021-01-01 04:00:00,0.504765,0.48704,0.502415,0.504662,0.501845,-0.008261


# Partições

In [179]:
df.npartitions

59

In [155]:
df.partitions[0]

Unnamed: 0_level_0,date,A,B,C,D,E,Type,F
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
,datetime64[ns],float64,float64,float64,float64,float64,object,float64
,...,...,...,...,...,...,...,...


In [156]:
df.partitions[0].head()

Unnamed: 0,date,A,B,C,D,E,Type,F
0,2021-01-01 00:00:00,0.77139,0.418623,0.623215,0.401876,0.004995,Class3,0.189947
1,2021-01-01 00:00:01,0.864748,0.440075,0.819741,0.478412,0.86675,Class5,0.304757
2,2021-01-01 00:00:02,0.864099,0.997717,0.568261,0.435532,0.14211,Class2,0.86175
3,2021-01-01 00:00:03,0.266226,0.24629,0.889184,0.231991,0.841046,Class2,-0.48755
4,2021-01-01 00:00:04,0.190853,0.347216,0.48063,0.422721,0.290855,Class3,-0.461997


In [160]:
df = df.set_index('date')

In [173]:
df_hora = df.repartition(freq="1H")

In [174]:
df_hora.npartitions

1416

In [175]:
#Visualmente, não muda nada
df_hora.head()

Unnamed: 0_level_0,A,B,C,D,E,Type,F
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2021-01-01 00:00:00,0.77139,0.418623,0.623215,0.401876,0.004995,Class3,0.189947
2021-01-01 00:00:01,0.864748,0.440075,0.819741,0.478412,0.86675,Class5,0.304757
2021-01-01 00:00:02,0.864099,0.997717,0.568261,0.435532,0.14211,Class2,0.86175
2021-01-01 00:00:03,0.266226,0.24629,0.889184,0.231991,0.841046,Class2,-0.48755
2021-01-01 00:00:04,0.190853,0.347216,0.48063,0.422721,0.290855,Class3,-0.461997


In [213]:
df_semana = df.repartition(freq='7D')
df_semana.npartitions

9

In [210]:
#Aplicar uma função a cada partição
df_semana.manpartitionsartitions(lambda x: x.A.mean())

Dask Series Structure:
npartitions=59
2021-01-01 00:00:00    float64
2021-01-02 00:00:00        ...
                        ...   
2021-02-28 00:00:00        ...
2021-02-28 23:59:59        ...
dtype: float64
Dask Name: lambda, 770 tasks

In [215]:
df_semana.map_partitions(lambda x: x.A.mean()).compute()

0    0.499922
1    0.499982
2    0.500394
3    0.499533
4    0.500046
5    0.499716
6    0.500151
7    0.500322
8    0.500296
dtype: float64

# O dashboard do Dask

(mostrar o dashboard)