In [2]:
import dask.dataframe as dd
import dask.array as da
state = da.random.RandomState(1)

#### Construct dask dataset

In [3]:
size_rows = 10**9
size_cols = 7
size_chunk = 10**6
list_of_objects_1 = [f'ID{i}' for i in range(100)]
list_of_objects_2 = [f'ID{i}' for i in range(100, 200)]

data_object_1 = state.choice(a=list_of_objects_1, size=(size_rows,1), chunks=(size_chunk,1))
data_object_2 = state.choice(a=list_of_objects_1, size=(size_rows,1), chunks=(size_chunk,1))
data_float = state.random(size=(size_rows,size_cols), chunks=(size_chunk,size_cols))

ddf = dd.from_dask_array(data_object_1, columns=list('A'))
ddf_object_2 = dd.from_dask_array(data_object_2, columns=list('B'))
ddf_float = dd.from_dask_array(data_float, columns=list('CDEFGHI'))

ddf['B'] = ddf_object_2['B']
ddf['C'] = ddf_float['C']
ddf['D'] = ddf_float['D']
ddf['E'] = ddf_float['E']
ddf['F'] = ddf_float['F']
ddf['G'] = ddf_float['G']
ddf['H'] = ddf_float['H']
ddf['I'] = ddf_float['I']
ddf

Unnamed: 0_level_0,A,B,C,D,E,F,G,H,I
npartitions=1000,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
0,object,object,float64,float64,float64,float64,float64,float64,float64
1000000,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...
999000000,...,...,...,...,...,...,...,...,...
999999999,...,...,...,...,...,...,...,...,...


#### Dataset look like

In [11]:
ddf.head(10)

Unnamed: 0,A,B,C,D,E,F,G,H,I
0,ID72,ID69,0.620333,0.447523,0.630952,0.27338,0.762568,0.279339,0.585494
1,ID35,ID35,0.944907,0.495363,0.095849,0.675129,0.179783,0.89593,0.287761
2,ID3,ID49,0.748195,0.637758,0.960402,0.448629,0.793454,0.976845,0.553017
3,ID20,ID94,0.428023,0.931614,0.754333,0.94086,0.460578,0.702318,0.932937
4,ID75,ID50,0.297353,0.710219,0.088402,0.635305,0.399633,0.823551,0.93793
5,ID66,ID35,0.053875,0.680031,0.387828,0.20053,0.720087,0.638489,0.760411
6,ID59,ID17,0.09167,0.081982,0.907554,0.247872,0.336054,0.947644,0.317975
7,ID24,ID34,0.426804,0.364789,0.831076,0.836025,0.65328,0.312214,0.131841
8,ID80,ID24,0.656172,0.577048,0.877196,0.891309,0.110991,0.318264,0.72528
9,ID37,ID8,0.674422,0.556494,0.922952,0.817823,0.767931,0.389167,0.281679


#### Dataset size

In [23]:
f'{ddf.head(10).memory_usage(index=False).sum()*(size_rows/10)*10**-9} GB'

'72.0 GB'

#### Dask groupby

Basic aggregation

In [3]:
import time
start_time = time.time()
print('Computation started')
ddf.groupby(['A'], dropna=False, observed=True).agg({'C':'sum', 'D':'sum', 'E':'sum'}).compute()
print("--- %s seconds ---" % (time.time() - start_time))

Computation started
--- 280.72916078567505 seconds ---


Advanced aggregation

In [4]:
import time
start_time = time.time()
print('Computation started')
ddf.groupby(['A', 'B'], dropna=False, observed=True).agg({'C':'median', 'C':'std'}).compute()
print("--- %s seconds ---" % (time.time() - start_time))Basic aggregation

Computation started
--- 319.9341297149658 seconds ---


#### Dask ML

In [24]:
from dask_ml.preprocessing import Categorizer, DummyEncoder

from sklearn.linear_model import LogisticRegression

from sklearn.pipeline import make_pipeline

import pandas as pd

import dask.dataframe as dd

df = pd.DataFrame({"A": [1, 2, 1, 2], "B": ["a", "b", "c", "c"]})

X = dd.from_pandas(df, npartitions=2)

y = dd.from_pandas(pd.Series([0, 1, 1, 0]), npartitions=2)

pipe = make_pipeline(
   Categorizer(),
   DummyEncoder(),
   LogisticRegression(solver='lbfgs')
)


pipe.fit(X, y)

Pipeline(steps=[('categorizer', Categorizer()),
                ('dummyencoder', DummyEncoder()),
                ('logisticregression', LogisticRegression())])