# Various Trips and Traps with Dask

In [1]:
import os
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, progress

In [2]:
# Directory (relative to the working directory) that holds the generated CSV files.
data_dir = 'data'
# exist_ok=True makes this a no-op when the directory is already there, so no
# separate existence check is needed; makedirs also creates missing parents.
os.makedirs(data_dir, exist_ok=True)

### Setup

In [3]:
%%time

# Create n large csv files (could be too big to fit all in memory)
n = 5
shape = (10000, 1000)
index_start = 0
seed = 0  # base seed; file i is generated from seed + i so it is reproducible on its own

index_end = index_start
for i in range(n):
    filepath = os.path.join(data_dir, f'datafile_{i:02d}.csv')
    # Advance the row range even when the file is skipped below, so that every
    # file keeps its own contiguous slice of the global 'Index' values.
    index_start, index_end = index_end, index_end + shape[0]
    if not os.path.exists(filepath):
        # Existing files are reused; only missing ones are (re)generated.
        # A per-file generator keeps the content of file i independent of
        # which other files happened to be generated in the same run.
        rng = np.random.RandomState(seed + i)
        # Standard-normal samples scaled by (i + 1), so each file has a different spread.
        data = (i + 1) * rng.randn(shape[0], shape[1])
        print(f"Array {i} size in memory: {data.nbytes*1e-6:.2f} MB")
        index = pd.RangeIndex(index_start, index_end, name='Index')
        pd.DataFrame(data, index=index).to_csv(filepath)

CPU times: user 615 µs, sys: 713 µs, total: 1.33 ms
Wall time: 711 µs


In [4]:
# Reuse the Client from a previous run of this cell if there is one; otherwise
# start a local cluster. The isinstance result has to be inspected: a `client`
# name bound to something that is not a Client must not be reused.
try:
    have_client = isinstance(client, Client)
except NameError:
    # First run in this kernel: `client` has not been defined yet.
    have_client = False
if not have_client:
    client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client

0,1
Client  Scheduler: tcp://127.0.0.1:51536  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 4  Memory: 2.00 GB


In [5]:
%%time

# Read every CSV file in data_dir into one Dask dataframe and use the
# 'Index' column written by the setup cell as its index.
csv_pattern = os.path.join(data_dir, '*.csv')
combined_df = dd.read_csv(csv_pattern).set_index('Index')


CPU times: user 2.2 s, sys: 112 ms, total: 2.31 s
Wall time: 9.35 s


## 1. Resetting the index on a partitioned dataframe

In [6]:
# Show the last rows: with 'Index' as the index, the labels continue across
# the input files (the saved output ends at 49999).
combined_df.tail()

Unnamed: 0_level_0,0,1,2,3,4,5,6,7,8,9,...,990,991,992,993,994,995,996,997,998,999
Index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
49995,2.656084,-5.723478,2.992767,-1.373304,-0.809676,2.440365,-0.934688,-0.209673,1.959916,-0.731121,...,2.577678,6.887296,5.771353,-0.558412,2.139612,6.682269,-2.874145,-4.54216,-6.578731,9.212493
49996,3.575132,7.584768,-0.880743,1.277054,-1.335541,-0.842873,3.631325,-2.188766,3.539856,2.853291,...,0.68774,6.352208,9.339392,1.230229,-5.708584,1.303731,-6.171363,-1.026645,-4.487722,-6.613544
49997,-4.852932,1.664352,-1.909289,6.354981,-4.297942,-9.12393,2.663738,-0.762787,-0.378273,-7.511945,...,-3.581669,3.949568,1.075555,-12.807619,5.005144,5.194674,-4.804662,5.563043,-5.169852,3.808141
49998,0.524689,-4.54098,-8.191672,-9.465075,10.039459,1.687003,11.20274,8.066513,-2.347594,4.397023,...,-0.04254,5.069655,7.772472,-11.117755,0.759763,6.487029,11.073698,-7.354793,-1.626473,-0.957538
49999,5.122776,-2.273731,-5.90033,-6.542091,-4.175456,10.635781,0.621987,-10.042059,-1.020847,-5.882787,...,-3.411462,4.516036,9.136437,0.147661,-3.737408,4.090771,-6.298624,1.795501,-1.939175,0.140364


Note that the `reset_index` method in Dask does not behave the way it does in pandas.

From the [documentation](https://docs.dask.org/en/latest/dataframe-api.html?highlight=reset_index#dask.dataframe.DataFrame.reset_index):
> Note that unlike in pandas, the reset `dask.dataframe` index will not be monotonically increasing from 0. Instead, it will restart at 0 for each partition (e.g. `index1 = [0, ..., 10], index2 = [0, ...]`). This is due to the inability to statically know the full length of the index.

In [7]:
%%time

# reset_index moves 'Index' back into a regular column. The new index is not
# global: in the saved output the last rows are labelled 3252..3256 while their
# 'Index' column still reads 49995..49999.
# NOTE: this rebinds combined_df, so re-running the cell resets the index again.
combined_df = combined_df.reset_index()
combined_df.tail()

CPU times: user 237 ms, sys: 18.4 ms, total: 255 ms
Wall time: 1.81 s


Unnamed: 0,Index,0,1,2,3,4,5,6,7,8,...,990,991,992,993,994,995,996,997,998,999
3252,49995,2.656084,-5.723478,2.992767,-1.373304,-0.809676,2.440365,-0.934688,-0.209673,1.959916,...,2.577678,6.887296,5.771353,-0.558412,2.139612,6.682269,-2.874145,-4.54216,-6.578731,9.212493
3253,49996,3.575132,7.584768,-0.880743,1.277054,-1.335541,-0.842873,3.631325,-2.188766,3.539856,...,0.68774,6.352208,9.339392,1.230229,-5.708584,1.303731,-6.171363,-1.026645,-4.487722,-6.613544
3254,49997,-4.852932,1.664352,-1.909289,6.354981,-4.297942,-9.12393,2.663738,-0.762787,-0.378273,...,-3.581669,3.949568,1.075555,-12.807619,5.005144,5.194674,-4.804662,5.563043,-5.169852,3.808141
3255,49998,0.524689,-4.54098,-8.191672,-9.465075,10.039459,1.687003,11.20274,8.066513,-2.347594,...,-0.04254,5.069655,7.772472,-11.117755,0.759763,6.487029,11.073698,-7.354793,-1.626473,-0.957538
3256,49999,5.122776,-2.273731,-5.90033,-6.542091,-4.175456,10.635781,0.621987,-10.042059,-1.020847,...,-3.411462,4.516036,9.136437,0.147661,-3.737408,4.090771,-6.298624,1.795501,-1.939175,0.140364


In [10]:
%%time

# Problem! After a plain reset_index the labels restart in every partition,
# so a label such as 3000 no longer identifies a single row of the whole frame.
# NOTE(review): the saved output below contains the `idx` column, which is only
# created by the "Workaround" cell further down, so it was not produced by a
# top-to-bottom run. Re-run after Restart & Run All to see the duplicated labels.
combined_df.loc[3000].compute()

Unnamed: 0_level_0,Index,0,1,2,3,4,5,6,7,8,...,991,992,993,994,995,996,997,998,999,idx
Index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
3000,3000,1.788494,0.631375,-0.657303,0.250359,-0.251256,0.694597,0.978765,0.877007,0.591869,...,-0.074555,1.037596,-1.545785,0.181904,0.047263,-0.117303,-1.635576,-0.249782,-0.443489,1


In [11]:
# Note: this doesn't work in Dask:
# combined_df = combined_df.set_index(np.arange(len(combined_df)))

In [12]:
%%time

# Workaround: build a global row counter inside Dask itself.
# A constant column of ones is added; its cumulative sum minus one gives
# 0, 1, 2, ... over the whole frame, and that series becomes the new index.
with_ones = combined_df.assign(idx=1)
row_number = with_ones.idx.cumsum() - 1
combined_df = with_ones.set_index(row_number.rename('Index'))

CPU times: user 1.89 s, sys: 114 ms, total: 2 s
Wall time: 12.4 s


In [13]:
# drop=True discards the current index instead of keeping it as a column.
# NOTE(review): this throws away the index built by the cumsum workaround; the
# saved tail() output of the next cell shows per-partition labels (3252..3256)
# again. Skip this cell if the global index is meant to be kept.
combined_df = combined_df.reset_index(drop=True)

In [14]:
# Inspect the last rows after the reset: the helper column `idx` (all ones) is
# still present and the row labels are partition-local again.
combined_df.tail()

Unnamed: 0,Index,0,1,2,3,4,5,6,7,8,...,991,992,993,994,995,996,997,998,999,idx
3252,49995,2.656084,-5.723478,2.992767,-1.373304,-0.809676,2.440365,-0.934688,-0.209673,1.959916,...,6.887296,5.771353,-0.558412,2.139612,6.682269,-2.874145,-4.54216,-6.578731,9.212493,1
3253,49996,3.575132,7.584768,-0.880743,1.277054,-1.335541,-0.842873,3.631325,-2.188766,3.539856,...,6.352208,9.339392,1.230229,-5.708584,1.303731,-6.171363,-1.026645,-4.487722,-6.613544,1
3254,49997,-4.852932,1.664352,-1.909289,6.354981,-4.297942,-9.12393,2.663738,-0.762787,-0.378273,...,3.949568,1.075555,-12.807619,5.005144,5.194674,-4.804662,5.563043,-5.169852,3.808141,1
3255,49998,0.524689,-4.54098,-8.191672,-9.465075,10.039459,1.687003,11.20274,8.066513,-2.347594,...,5.069655,7.772472,-11.117755,0.759763,6.487029,11.073698,-7.354793,-1.626473,-0.957538,1
3256,49999,5.122776,-2.273731,-5.90033,-6.542091,-4.175456,10.635781,0.621987,-10.042059,-1.020847,...,4.516036,9.136437,0.147661,-3.737408,4.090771,-6.298624,1.795501,-1.939175,0.140364,1


In [15]:
%%time

from itertools import count

# This doesn't work "Worker exceeded 95% memory budget"
# Attempt: draw a fresh integer for every row from a Python counter.
# NOTE(review): in pandas, a callable passed to `assign` is called once with the
# whole frame, not once per row. If Dask follows the same convention, `f` yields
# a single value and every row gets the same `idx`, which would explain why the
# following set_index overwhelms a worker. TODO confirm against the Dask docs.
index_generator = iter(count())
f = lambda x: next(index_generator)
combined_df = combined_df.assign(idx=f).set_index('idx')

CPU times: user 2.24 s, sys: 133 ms, total: 2.37 s
Wall time: 13.6 s


In [16]:
# Evaluating the frame built in the previous cell is what fails: the saved
# output of this cell is a KilledWorker error.
combined_df.tail().index



KilledWorker: ("('shuffle-shuffle-join-sort_index-tail-5-sort_index-cfb425d38ebe300d5ccf002c9cd908c8', 0)", <Worker 'tcp://127.0.0.1:51569', name: 1, memory: 0, processing: 1>)

In [None]:
%%time

# Same idea as the itertools.count attempt, but with a finite iterator of
# 50000 values (5 files x 10000 rows in the setup cell).
# NOTE(review): this cell has no execution count, i.e. it was never run in the
# saved session; presumably it hits the same problem as the count() version.
index_generator = iter(range(50000))
f = lambda x: next(index_generator)
combined_df = combined_df.assign(idx=f).set_index('idx')

In [None]:
# Would display the last rows after the range-based attempt (cell not executed
# in the saved session).
combined_df.tail()

In [None]:
%%time

# Two further attempts to supply the index values from outside Dask; both are
# left commented out together with the error each one raised:
# new_index_values = np.arange(50000)
#   -> TypeError: Column assignment doesn't support type numpy.ndarray

# new_index_values = pd.Series(np.arange(50000))
#   -> ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.

# NOTE: with both definitions above commented out, `new_index_values` is not
# defined in this cell; uncomment one of them before running.
combined_df = combined_df.assign(idx=new_index_values)
combined_df = combined_df.set_index(combined_df.idx)

In [None]:
# pandas-style attempt to set a positional index from a NumPy array; an earlier
# note in this notebook states that this does not work in Dask.
# NOTE(review): `df` is not defined in any cell visible here; presumably
# `combined_df` was meant. Confirm before running.
df.set_index(np.arange(len(df)))

tornado.application - ERROR - Exception in callback <bound method SystemMonitor.update of <SystemMonitor: cpu: 0 memory: 182 MB fds: 114>>
Traceback (most recent call last):
  File "/anaconda3/envs/torch/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/anaconda3/envs/torch/lib/python3.7/site-packages/distributed/system_monitor.py", line 65, in update
    read_bytes = (ioc.bytes_recv - last.bytes_recv) / (duration or 0.5)
AttributeError: 'NoneType' object has no attribute 'bytes_recv'
