# Dask

We can do data preparation using Dask to subselect interesting items.

Be aware that a Dask DataFrame (`ddf`) works like a Pandas DataFrame but is not identical, so you'll have to learn some new syntax. Most notably, operations are lazy and you have to call `compute()` to get a result. The diagnostics are great.

In [1]:
import ipython_memory_usage
#https://github.com/ianozsvald/ipython_memory_usage
%ipython_memory_usage_start

'memory profile enabled'

In [1] used 0.5000 MiB RAM in 0.12s, peaked 0.00 MiB above current, total RAM usage 48.53 MiB


In [2]:
!ls -lta data_16_10M/datablock*.parquet | head

-rw-r--r-- 1 ian ian 161830101 Apr 24 14:06 data_16_10M/datablock15.parquet
-rw-r--r-- 1 ian ian 161830098 Apr 24 14:06 data_16_10M/datablock14.parquet
-rw-r--r-- 1 ian ian 161830099 Apr 24 14:06 data_16_10M/datablock13.parquet
-rw-r--r-- 1 ian ian 161830102 Apr 24 14:06 data_16_10M/datablock12.parquet
-rw-r--r-- 1 ian ian 161830100 Apr 24 14:06 data_16_10M/datablock11.parquet
-rw-r--r-- 1 ian ian 161830107 Apr 24 14:06 data_16_10M/datablock10.parquet
-rw-r--r-- 1 ian ian 161830102 Apr 24 14:06 data_16_10M/datablock9.parquet
-rw-r--r-- 1 ian ian 161830099 Apr 24 14:06 data_16_10M/datablock8.parquet
-rw-r--r-- 1 ian ian 161830103 Apr 24 14:06 data_16_10M/datablock7.parquet
-rw-r--r-- 1 ian ian 161830104 Apr 24 14:06 data_16_10M/datablock6.parquet
In [2] used 0.7617 MiB RAM in 0.23s, peaked 0.00 MiB above current, total RAM usage 49.29 MiB


In [3]:
import time
import dask.dataframe as dd
from dask.distributed import Client, progress

#ddf = dd.read_parquet('data_100_50M/datablock*.parquet', chunksize=chunksize)
ddf = dd.read_parquet('data_16_10M/datablock*.parquet')
#print()

In [3] used 71.0195 MiB RAM in 0.95s, peaked 0.00 MiB above current, total RAM usage 120.31 MiB


In [4]:
ddf.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 3 entries, a to c
dtypes: bool(1), float64(2)
In [4] used 0.0000 MiB RAM in 0.11s, peaked 0.00 MiB above current, total RAM usage 120.31 MiB


In [5]:
ddf.head()

,a,b,c
0,6.229213,99.022067,True
1,47.963104,5.193645,True
2,56.938338,13.297275,False
3,45.02082,10.215429,True
4,73.283511,14.906987,False


In [5] used 550.9297 MiB RAM in 0.79s, peaked 181.19 MiB above current, total RAM usage 671.24 MiB


In [6]:
# Dask's sample() can't take a row count n, we have to ask for a fraction of the rows
ddf.sample(frac=0.000001).compute() # takes 10-60 sec

,a,b,c
1795389,8.124298,27.903569,True
7619670,42.157127,34.333250,True
2493774,46.041779,0.862118,True
6127641,52.090930,31.182907,False
6510523,12.934024,39.641707,True
...,...,...,...
7199130,82.487965,90.331430,False
96099,49.420155,79.148501,True
2949156,35.195434,5.463790,True
1037057,95.346796,36.790486,False


In [6] used -232.2109 MiB RAM in 14.77s, peaked 2848.50 MiB above current, total RAM usage 439.03 MiB
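Since the sample has to be requested as a fraction, one way to aim for roughly `n` rows is to divide by the total row count. This is a sketch only: `frac_for_rows` is a hypothetical helper (not part of Dask), and the total of 16 partitions of 10M rows is taken from the file listing and partition sizes in this notebook.

```python
def frac_for_rows(n_wanted: int, total_rows: int) -> float:
    """Return the sampling fraction expected to give about n_wanted rows."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    # A fraction above 1.0 makes no sense without replacement, so cap it
    return min(1.0, n_wanted / total_rows)

total_rows = 16 * 10_000_000  # 16 partitions of 10M rows each
frac = frac_for_rows(160, total_rows)
print(frac)
```

The result could then be passed as `ddf.sample(frac=frac)`. The number of rows returned is approximate rather than exactly `n`.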


## We can calculate the RAM stored in each partition

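* A float64 is 8 bytes, so one float64 column of 10M rows takes 10M * 8 bytes == 80MB
* Two float64 columns take 160MB and the bool column adds 10M * 1 byte == 10MB, giving 170MB per partition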
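The arithmetic above can be checked with a few lines of plain Python. The column names and dtypes follow the `ddf.info()` output (two float64 columns and one bool column); the constant names are just for illustration.

```python
ROWS_PER_PARTITION = 10_000_000

# Bytes per value for each column: float64 is 8 bytes, bool is 1 byte
BYTES_PER_VALUE = {"a": 8, "b": 8, "c": 1}

column_bytes = {
    name: size * ROWS_PER_PARTITION for name, size in BYTES_PER_VALUE.items()
}
total_bytes = sum(column_bytes.values())

print(column_bytes)
print(total_bytes)
```

This counts the column data only. Any small difference from the figure Dask reports would be per-partition overhead such as the index, which is an assumption here rather than something this notebook measures.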

In [7]:
ddf.memory_usage_per_partition().compute()

0     170000128
1     170000128
2     170000128
3     170000128
4     170000128
5     170000128
6     170000128
7     170000128
8     170000128
9     170000128
10    170000128
11    170000128
12    170000128
13    170000128
14    170000128
15    170000128
dtype: int64

In [7] used 640.8008 MiB RAM in 4.84s, peaked 2805.31 MiB above current, total RAM usage 1079.83 MiB


## Make a Client for diagnostics

Be aware that out of memory errors are common with large partitions!

In [8]:
client = Client(n_workers=2, threads_per_worker=1, memory_limit='8GB')
client

Client: Scheduler: tcp://127.0.0.1:45381, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 2, Cores: 2, Memory: 16.00 GB


In [8] used -623.3789 MiB RAM in 1.38s, peaked 314.27 MiB above current, total RAM usage 456.45 MiB


In [9]:
t1 = time.time()
result = ddf.max().compute()
delta = time.time() - t1
print(f"Took {delta}")
result

Took 51.00003623962402


a    100.000000
b     99.999998
c      1.000000
dtype: float64

In [9] used 2.4023 MiB RAM in 51.11s, peaked 0.00 MiB above current, total RAM usage 458.85 MiB


In [10]:
t1 = time.time()
result = ddf.groupby('c').mean().compute()
delta = time.time() - t1
print(f"Took {delta}")
result

Took 26.154868364334106


c,a,b
False,75.001322,50.000123
True,25.000493,50.005265


In [10] used 1.1016 MiB RAM in 26.27s, peaked 0.00 MiB above current, total RAM usage 459.95 MiB


In [11]:
ddf.describe().compute() # VERY SLOW!

,a,b
count,160000000.0,160000000.0
mean,50.00143,50.00269
std,28.86777,28.86803
min,1.390652e-06,4.087862e-07
25%,25.02155,25.01634
50%,50.02219,50.02671
75%,75.01911,75.02436
max,100.0,100.0


In [11] used 5.0938 MiB RAM in 71.84s, peaked 0.00 MiB above current, total RAM usage 465.05 MiB


In [12]:
client.close()

In [12] used 0.2109 MiB RAM in 1.48s, peaked 0.08 MiB above current, total RAM usage 465.26 MiB
