# Gotchas from Pandas to Dask

This notebook highlights some key differences when transferring code from `Pandas` to run in a `Dask` environment.  
Most issues have a link to the [Dask documentation](https://docs.dask.org/en/latest/) for additional information.

In [17]:
# Dask is under active development; this example was run with the versions below
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')

Dask version: 1.2.2
Pandas version: 0.24.2


## Start Dask Client for Dashboard

Starting the Dask Client is optional. In this example we are running on a `LocalCluster`, which also provides a dashboard that is useful for gaining insight into the computation.  
For additional information on [Dask Client see documentation](https://docs.dask.org/en/latest/setup.html?highlight=client#setup)  

The link to the dashboard will become visible when you create a client (as shown below).  
When running within `Jupyter Lab`, an [extension](https://github.com/dask/dask-labextension) can be installed to view the various dashboard widgets. 

In [18]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


Client: Scheduler: tcp://127.0.0.1:56527, Dashboard: http://127.0.0.1:56530/status
Cluster: Workers: 4, Cores: 4, Memory: 8.50 GB


See the [documentation for additional cluster configuration](http://distributed.dask.org/en/latest/local-cluster.html)

# Create 2 DataFrames for comparison: 
1. for Dask 
2. for Pandas

Dask comes with built-in sample datasets; we will use the `timeseries` sample for our example. 

In [19]:
ddf = dask.datasets.timeseries()
ddf

npartitions=30,id,name,x,y
2000-01-01,int32,object,float64,float64
2000-01-02,...,...,...,...
...,...,...,...,...
2000-01-30,...,...,...,...
2000-01-31,...,...,...,...


* Remember that `Dask` is **lazy**, so in order to see the result we need to run [compute()](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.compute) (or `head()`, which runs `compute()` under the hood).

In [20]:
ddf.head(2)

timestamp,id,name,x,y
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113


#### Pandas Dataframe
In order to create a `Pandas` dataframe we can use the `compute()` method from a `Dask dataframe`

In [21]:
pdf = ddf.compute()  
print(type(pdf))
pdf.head(2)

<class 'pandas.core.frame.DataFrame'>


timestamp,id,name,x,y
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113


### dataframe.shape  
We can also see *dask laziness* when using the shape attribute

In [22]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')

Pandas shape: (2592000, 4)
---------------------------
Dask lazy shape: (Delayed('int-c9ffcd0a-15b0-4578-b513-d8c786a9f8c7'), 4)


We cannot get the full shape before accessing all the partitions. Running `len` does so, which makes it an expensive call.

In [23]:
print(f'Dask computed shape: {len(ddf.index):,}')  # expensive

Dask computed shape: 2,592,000


## Creating a `Dask dataframe` from `Pandas`
In order to utilize `Dask` capabilities on an existing `Pandas dataframe` (pdf) we need to convert the `Pandas dataframe` into a `Dask dataframe` (ddf) with the [from_pandas](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.from_pandas) method.  
You must supply either the number of partitions (`npartitions`) or the `chunksize` used to split the data into the Dask dataframe.

In [24]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2

npartitions=10,id,name,x,y
2000-01-01 00:00:00,int32,object,float64,float64
2000-01-04 00:00:00,...,...,...,...
...,...,...,...,...
2000-01-28 00:00:00,...,...,...,...
2000-01-30 23:59:59,...,...,...,...


## Partitions in Dask Dataframes

Notice that when we created a `Dask dataframe` we needed to supply the `npartitions` argument.  
The number of partitions tells `Dask` how to break up the `Pandas dataframe` and parallelize the computation.  
Each partition is a *separate* dataframe. For additional information see [partition documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#partitions)  

An example of this can be seen when examining the `reset_index()` method:

In [25]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]

timestamp    2000-01-01 00:00:00
id                           986
name                       Wendy
x                    -0.00863727
y                       0.620126
Name: 0, dtype: object

In [26]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute() 

Unnamed: 0,timestamp,id,name,x,y
0,2000-01-01,986,Wendy,-0.008637,0.620126
0,2000-01-04,1021,Bob,0.694772,-0.759317
0,2000-01-07,1005,Frank,-0.090218,0.69101
0,2000-01-10,987,Sarah,0.482925,-0.707838
0,2000-01-13,995,Quinn,0.108016,0.510353
0,2000-01-16,983,Charlie,-0.688607,0.045371
0,2000-01-19,1015,Ray,-0.254656,-0.665727
0,2000-01-22,1035,Charlie,-0.026247,-0.214072
0,2000-01-25,1047,Kevin,-0.517362,0.410634
0,2000-01-28,940,Ursula,-0.137021,0.877518


# Dask Dataframe vs Pandas Dataframe
Now that we have a `dask` (ddf) and a `pandas` (pdf) dataframe we can start to compare the interactions with them.

## Conceptual shift - from Update to Insert/Delete
Dask does not update data in place, so arguments such as `inplace=True`, which exist in Pandas, are not available.  
For more details see [issue#653 on github](https://github.com/dask/dask/issues/653)

### Rename Columns

* Using `inplace=True` is not considered to be *best practice*. 

In [27]:
# Pandas 
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns

Index(['id', 'name', 'x', 'y'], dtype='object')


Index(['ID', 'name', 'x', 'y'], dtype='object')

In [28]:
# Dask - Error
ddf.rename(columns={'id':'ID'}, inplace=True)
ddf.columns

TypeError: rename() got an unexpected keyword argument 'inplace'

In [29]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns

Index(['id', 'name', 'x', 'y'], dtype='object')


Index(['ID', 'name', 'x', 'y'], dtype='object')

## Data manipulations  
There are several differences when manipulating data.  

### loc - Pandas

In [30]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)

timestamp,ID,name,x,y
2000-01-01 00:00:05,1001,Oliver,0.721349,-54.949113
2000-01-01 00:00:25,1043,Yvonne,0.66136,34.283023


In [31]:
# Dask - Error 
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)

TypeError: '_LocIndexer' object does not support item assignment

### Dask - use mask/where

In [32]:
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)

timestamp,ID,name,x,y
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113


In [33]:
#Dask
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)

timestamp,ID,name,x,y
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113


For more information see [dask mask documentation](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.mask)

## Meta argument
One key feature in `Dask` is the introduction of the `meta` argument.  
> `meta` is the prescription of the names/types of the output from the computation  
from [stack overflow answer](https://stackoverflow.com/questions/44432868/dask-dataframe-apply-meta)

Since `Dask` creates a DAG for the computation, it needs to know the output of each calculation stage.  
For additional information see [meta documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#metadata)

In [34]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)

timestamp,ID,name,x,y,initials
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126,We
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113,Al


In [35]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('name', 'object'))



timestamp,ID,name,x,y,initials
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126,We
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113,Al


#### Introducing meta argument

In [36]:
# Describe the outcome type of the calculation
meta_arg = pd.Series(dtype='object', name='initials')

In [37]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta = meta_arg)
ddf.head(2)

timestamp,ID,name,x,y,initials
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126,We
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113,Al


In [38]:
# similar when using a function
def func(row):
    if (row['x']> 0):
        return row['x'] * 1000  
    else:
        return row['y'] * -1

In [39]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)

timestamp,ID,name,x,y,initials,z
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126,We,-0.620126
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113,Al,0.780113


### Map partitions
* We can supply an ad-hoc function to run on each partition using the [map_partitions](https://dask.readthedocs.io/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions) method.  
This is mainly useful for functions that are not implemented in `Dask` or `Pandas`.
* The function can return a new `dataframe`, which needs to be described in the `meta` argument.  
The function can also take additional arguments.

In [40]:
import numpy as np

def func2(df, coor_x, coor_y, drop_cols):
    # distance between consecutive points within a partition;
    # assign() returns a new frame instead of mutating the input partition
    dist = np.sqrt((df[coor_x] - df[coor_x].shift())**2
                   + (df[coor_y] - df[coor_y].shift())**2)
    return df.assign(dist=dist).drop(drop_cols, axis=1)

# meta maps each output column to its dtype
ddf2 = ddf.map_partitions(func2,
                          coor_x='x',
                          coor_y='y',
                          drop_cols=['initials', 'z'],
                          meta={'ID': 'i8',
                                'name': 'object',
                                'x': 'f8',
                                'y': 'f8',
                                'dist': 'f8'})
ddf2.head()

timestamp,ID,name,x,y,dist
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126,
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113,1.494376
2000-01-01 00:00:02,1000,Victor,-0.803278,-0.940583,0.316356
2000-01-01 00:00:03,904,Quinn,0.269541,-0.408315,1.197602
2000-01-01 00:00:04,1034,Patricia,0.01683,0.660678,1.098458


### Convert index into Time column

In [41]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)

timestamp,ID,name,x,y,initials,times
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126,We,00:00:00
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113,Al,00:00:01


In [42]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or: ddf = ddf.assign(times=dd.to_datetime(ddf.index, format='%Y-%m-%d'))
ddf['times'] = ddf['times'].dt.time
ddf = client.persist(ddf)
ddf.head(2)

timestamp,ID,name,x,y,initials,z,times
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126,We,-0.620126,00:00:00
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113,Al,0.780113,00:00:01


## Drop NA on column

In [43]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1) 

In [44]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)

In [45]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)

timestamp,ID,name,x,y,times
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126,00:00:00
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113,00:00:01


In order for `Dask` to drop a column in which all values are `na`, it must first check all the partitions with `compute()`

In [46]:
if ddf.colna.isnull().all().compute():   # check if all values in column are Null -  expensive
    ddf = ddf.drop(labels=['colna'],axis=1)
ddf.head(2)

timestamp,ID,name,x,y,times
2000-01-01 00:00:00,986,Wendy,-0.008637,0.620126,00:00:00
2000-01-01 00:00:01,985,Alice,-0.530643,-0.780113,00:00:01


## Reset Index

In [47]:
# Pandas
pdf =pdf.reset_index(drop=True)
pdf.head(2)

Unnamed: 0,ID,name,x,y,times
0,986,Wendy,-0.008637,0.620126,00:00:00
1,985,Alice,-0.530643,-0.780113,00:00:01


In [48]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)

Unnamed: 0,ID,name,x,y,times
0,986,Wendy,-0.008637,0.620126,00:00:00
1,985,Alice,-0.530643,-0.780113,00:00:01


# Read / Save files

* When working with `pandas` and `dask`, prefer the [parquet format](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#store-data-in-apache-parquet-format).  
* When working with `Dask`, files can be read with multiple workers.  
* Most `kwargs` are applicable for reading and writing files, e.g.  
`ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=None)`  
* However, some are not available, such as `nrows`.

[See documentation](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_csv) (including the option for output file naming).

## Save files

In [49]:
%%time
# Pandas
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
pdf.to_csv(output_dir_file)

Wall time: 26.1 s


In [50]:
list(output_dir_file.parent.glob('*.csv'))

[WindowsPath('data/pdf_single_file.csv')]

`Dask`  
Notice the `*` in the file name: it is replaced by the partition number, so each partition is written to its own file. 



In [71]:
%%time
# Dask
output_dask_dir = Path('data/pd2dd/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)

Wall time: 19.1 s


To find the number of partitions which will determine the number of output files use [dask.dataframe.npartitions](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.npartitions)  

To change the number of output files use [repartition](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.repartition) which is an expensive operation.

In [52]:
ddf.npartitions

30

In [53]:
list(Path(output_dask_dir).glob('*.csv'))

[WindowsPath('data/pd2dd/ddf00.csv'),
 WindowsPath('data/pd2dd/ddf01.csv'),
 WindowsPath('data/pd2dd/ddf02.csv'),
 WindowsPath('data/pd2dd/ddf03.csv'),
 WindowsPath('data/pd2dd/ddf04.csv'),
 WindowsPath('data/pd2dd/ddf05.csv'),
 WindowsPath('data/pd2dd/ddf06.csv'),
 WindowsPath('data/pd2dd/ddf07.csv'),
 WindowsPath('data/pd2dd/ddf08.csv'),
 WindowsPath('data/pd2dd/ddf09.csv'),
 WindowsPath('data/pd2dd/ddf10.csv'),
 WindowsPath('data/pd2dd/ddf11.csv'),
 WindowsPath('data/pd2dd/ddf12.csv'),
 WindowsPath('data/pd2dd/ddf13.csv'),
 WindowsPath('data/pd2dd/ddf14.csv'),
 WindowsPath('data/pd2dd/ddf15.csv'),
 WindowsPath('data/pd2dd/ddf16.csv'),
 WindowsPath('data/pd2dd/ddf17.csv'),
 WindowsPath('data/pd2dd/ddf18.csv'),
 WindowsPath('data/pd2dd/ddf19.csv'),
 WindowsPath('data/pd2dd/ddf20.csv'),
 WindowsPath('data/pd2dd/ddf21.csv'),
 WindowsPath('data/pd2dd/ddf22.csv'),
 WindowsPath('data/pd2dd/ddf23.csv'),
 WindowsPath('data/pd2dd/ddf24.csv'),
 WindowsPath('data/pd2dd/ddf25.csv'),
 ...]

## Read files

For `pandas` it is possible to iterate and concat the files [see answer from stack overflow](https://stackoverflow.com/questions/20906474/import-multiple-csv-files-into-pandas-and-concatenate-into-one-dataframe).

In [54]:
%%time
# Pandas
dir_path = Path(r'data/pd2dd')
concat_df = pd.concat([pd.read_csv(f) 
                       for f in list(dir_path.glob('*.csv'))])
len(concat_df)

Wall time: 5.65 s


In [55]:
%%time
# Dask
_ddf = dd.read_csv('data/pd2dd/ddf*.csv')
_ddf

Wall time: 65.7 ms


Remember that `Dask` is lazy - thus it does not *really* read the file until it needs to...

In [56]:
%%time
_ddf = dd.read_csv('data/pd2dd/ddf*.csv')
len(_ddf)

Wall time: 3.18 s


## Consider using client.persist()
Since Dask is lazy, it may run the **entire** graph/DAG (again) even if it already ran part of the calculation in a previous cell. Thus use [persist](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#persist-intelligently) to keep the results in memory.  
Additional information can be read in this [stackoverflow issue](https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array/45941529#45941529) or see an example in [this post](http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes).  
This concept should also be used when running code within a script (rather than a Jupyter notebook) that incorporates loops.


In [57]:
# e.g.
_ddf = dd.read_csv('data/pd2dd/ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)

Unnamed: 0,ID,name,x,y,times
0,986,Wendy,-0.008637,0.620126,00:00:00
1,985,Alice,-0.530643,-0.780113,00:00:01


# Group By - custom aggregations
In addition to the [groupby notebook example](https://github.com/dask/dask-examples/blob/master/dataframes/02-groupby.ipynb) that is in the repository,  
this is another example of how to avoid the use of `groupby.apply`.  
In this example we are grouping columns into unique lists.

#### Pandas

In [58]:
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()

Unnamed: 0,name,ID,seconds
0,Wendy,986,0
1,Alice,985,0
2,Victor,1000,0
3,Quinn,904,0
4,Patricia,1034,0


In [59]:
%%time
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply(lambda x: list(set(x.to_list())))
               for att_col_gr in gp_col]
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')

Wall time: 2.42 s


In [60]:
df_edge_att.head(2)

name,Weight,ID,seconds
Alice,99694,"[1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...","[61, 92, 60, 49, 24, 06, 00, 44, 73, 48, 94, 8..."
Bob,99417,"[1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...","[61, 92, 60, 49, 24, 06, 00, 44, 73, 48, 94, 8..."


* Remember that in some cases `Pandas` is more efficient (assuming that you can load all the data into RAM).  

#### Dask

In [61]:
def set_list_att(x: pd.Series):
    # each group arrives as a pandas Series; return its unique values as a list
    return list(set(x.values))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)

Unnamed: 0,name,ID,seconds
0,Wendy,986,0
1,Alice,985,1


In [62]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
                ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att')) 
               for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)

Wall time: 18.9 s


We can do better...  
Using a [dask custom aggregation](https://docs.dask.org/en/latest/dataframe-api.html?highlight=dropna#dask.dataframe.groupby.Aggregation) is considerably faster

In [63]:
import itertools
custom_agg = dd.Aggregation(
    'custom_agg', 
    lambda s: s.apply(set), 
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)

In [64]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)  

Wall time: 3.16 s


## [Debugging](https://docs.dask.org/en/latest/debugging.html)
Debugging may be challenging...
1. Run code without client 
2. Use Dashboard profiler
3. Verify integrity of DAG

### Corrupted DAG  
In this example we show that once the DAG is corrupted you may need to reset the calculation

In [65]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)

timestamp,id,name,x,y
2000-01-01,972,Patricia,0.46767,-0.410009


In [66]:
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())^2  
                     +  (df[coor_y] - df[coor_y].shift())^2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))

Is everything OK?

In [67]:
# returns an error because of ^2 (needs to be **2)
ddf.head()

TypeError: unsupported operand type(s) for ^: 'float' and 'bool'

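* Even if the function is corrected, the DAG is still corrupted: `ddf` already contains the failing `col` task, and the new `map_partitions` call is built on top of it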

In [68]:
# Still results in an error
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  
                     +  (df[coor_y] - df[coor_y].shift())**2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))

In [69]:
ddf.head(2)

TypeError: unsupported operand type(s) for ^: 'float' and 'bool'

We need to reset the dataframe

In [70]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2    #corrected math function
                     +  (df[coor_y] - df[coor_y].shift())**2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
ddf.head(2)

timestamp,id,name,x,y,col
2000-01-01 00:00:00,1039,Wendy,0.927836,-0.622193,
2000-01-01 00:00:01,1051,Jerry,-0.802541,-0.614413,1.730394
