# Gotchas from Pandas to Dask

This notebook highlights some key differences when moving code from `Pandas` to run in a `Dask` environment.  
Most issues have a link to the [Dask documentation](https://docs.dask.org/en/latest/) for additional information.

In [1]:
# Dask is actively being developed - this example was run with the versions below
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')

Dask version: 2022.04.0
Pandas version: 1.4.2


## Start Dask Client for Dashboard

Starting the Dask Client is optional. In this example we are running on a `LocalCluster`, which also provides a dashboard that is useful for gaining insight into the computation.  
For additional information see the [Dask Client documentation](https://docs.dask.org/en/latest/setup.html?highlight=client#setup)  

The link to the dashboard becomes visible when you create a client (as shown below).  
When running within `Jupyter Lab`, an [extension](https://github.com/dask/dask-labextension) can be installed to view the various dashboard widgets. 

In [2]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client

Client: Connection method: Cluster object, Cluster type: distributed.LocalCluster, Dashboard: http://127.0.0.1:8787/status

| | Comm | Threads | Memory | Dashboard |
|---|---|---|---|---|
| Scheduler (2 workers, using processes, status: running) | tcp://127.0.0.1:39211 | 2 | 6.78 GiB | http://127.0.0.1:8787/status |
| Worker | tcp://127.0.0.1:37931 | 1 | 3.39 GiB | http://127.0.0.1:42685/status |
| Worker | tcp://127.0.0.1:40067 | 1 | 3.39 GiB | http://127.0.0.1:40229/status |


See the [documentation for additional cluster configuration](http://distributed.dask.org/en/latest/local-cluster.html)

# Create 2 DataFrames for comparison:
1. for Dask
2. for Pandas

Dask comes with built-in sample datasets; we will use one of them for our example.

In [3]:
ddf = dask.datasets.timeseries()
ddf

| npartitions=30 | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 | int64 | object | float64 | float64 |
| 2000-01-02 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| 2000-01-30 | ... | ... | ... | ... |
| 2000-01-31 | ... | ... | ... | ... |


* Remember that `Dask` is **lazy**: to see a result we need to run [compute()](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.compute.html)
 (or `head()`, which runs compute() under the hood on the first partition).

In [4]:
ddf.head(2)

| timestamp | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 |


#### Pandas Dataframe
To create a `Pandas` dataframe we can call the `compute()` method on a `Dask dataframe`; this loads the whole dataset into memory.

In [5]:
pdf = ddf.compute()  
print(type(pdf))
pdf.head(2)

<class 'pandas.core.frame.DataFrame'>


| timestamp | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 |


### dataframe.shape  
We can also see *Dask's laziness* when using the `shape` attribute:

In [6]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')

Pandas shape: (2592000, 4)
---------------------------
Dask lazy shape: (Delayed('int-457670a7-0118-48ff-b5cd-08db2a126430'), 4)


Dask cannot know the number of rows without reading all the partitions; running `len` does exactly that (and is therefore expensive):

In [7]:
print(f'Dask computed shape: {len(ddf.index):,}')  # expensive

Dask computed shape: 2,592,000


## Creating a `Dask dataframe` from `Pandas`
To use `Dask` capabilities on an existing `Pandas dataframe` (pdf), we need to convert it into a `Dask dataframe` (ddf) with the [from_pandas](https://docs.dask.org/en/latest/generated/dask.dataframe.from_pandas.html) method.  
You must supply either the number of partitions (`npartitions`) or the `chunksize` used to split the dataframe.

In [8]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2

| npartitions=10 | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | int64 | object | float64 | float64 |
| 2000-01-04 00:00:00 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| 2000-01-28 00:00:00 | ... | ... | ... | ... |
| 2000-01-30 23:59:59 | ... | ... | ... | ... |


## Partitions in Dask Dataframes

Notice that when we created the `Dask dataframe` we had to supply the `npartitions` argument.  
The number of partitions determines how `Dask` splits the `Pandas dataframe` and parallelizes the computation.  
Each partition is a *separate* Pandas dataframe. For additional information see the [partition documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#partitions)  

An example of this can be seen when examining the `reset_index()` method:

In [9]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]

timestamp    2000-01-01 00:00:00
id                           995
name                      Yvonne
x                      -0.257197
y                       0.288303
Name: 0, dtype: object

In [10]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute() 

| | timestamp | id | name | x | y |
|---|---|---|---|---|---|
| 0 | 2000-01-01 | 995 | Yvonne | -0.257197 | 0.288303 |
| 0 | 2000-01-04 | 1040 | Kevin | 0.071874 | 0.998809 |
| 0 | 2000-01-07 | 950 | Dan | -0.860903 | -0.366538 |
| 0 | 2000-01-10 | 1012 | Norbert | -0.688807 | -0.845508 |
| 0 | 2000-01-13 | 970 | Kevin | 0.964429 | -0.470225 |
| 0 | 2000-01-16 | 1017 | Ray | -0.858035 | -0.123796 |
| 0 | 2000-01-19 | 973 | Norbert | -0.595604 | 0.726321 |
| 0 | 2000-01-22 | 1024 | Bob | 0.27216 | 0.54934 |
| 0 | 2000-01-25 | 1026 | Bob | -0.942005 | -0.867345 |
| 0 | 2000-01-28 | 996 | Norbert | 0.885387 | -0.840972 |


# Dask Dataframe vs Pandas Dataframe
Now that we have a `dask` (ddf) and a `pandas` (pdf) dataframe, we can start to compare how we interact with them.

## Conceptual shift - from Update to Insert/Delete
Dask does not update data in place: operations return a new dataframe, so there are no arguments such as `inplace=True`, which exist in Pandas.  
For more details see [issue#653 on github](https://github.com/dask/dask/issues/653)

### Rename Columns

* Using `inplace=True` is not considered *best practice*.

In [11]:
# Pandas 
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns

Index(['id', 'name', 'x', 'y'], dtype='object')


Index(['ID', 'name', 'x', 'y'], dtype='object')

In [12]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns

Index(['id', 'name', 'x', 'y'], dtype='object')


Index(['ID', 'name', 'x', 'y'], dtype='object')

## Data manipulations  
There are several differences when manipulating data.  

### loc - Pandas

In [13]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)

| timestamp | ID | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:17 | 1000 | Norbert | 0.727163 | 89.682962 |
| 2000-01-01 00:00:20 | 1060 | Ursula | 0.700307 | -75.613821 |


### Dask - loc assignment raises an error

```python
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [14], in <cell line: 2>()
      1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
      3 ddf[cond_ddf].head(2)

TypeError: '_LocIndexer' object does not support item assignment
```

### Dask - use mask/where

In [14]:
# Pandas (equivalent to the loc assignment above; running both cells scales the matching rows twice)
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)

| timestamp | ID | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 |


In [15]:
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)

| timestamp | ID | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 |


For more information see [dask mask documentation](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.mask.html)

## Meta argument
One key feature in `Dask` is the `meta` argument.  
> `meta` is the prescription of the names/types of the output from the computation  
from [stack overflow answer](https://stackoverflow.com/questions/44432868/dask-dataframe-apply-meta)

Since `Dask` builds a DAG for the computation lazily, it needs to know the names and types of each stage's output without running it.  
For additional information see the [meta documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#metadata)

In [16]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)

| timestamp | ID | name | x | y | initials |
|---|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 | La |


In [17]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('name', 'object'))



| timestamp | ID | name | x | y | initials |
|---|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 | La |


#### Introducing meta argument

In [18]:
# Describe the outcome type of the calculation: an empty object Series named 'initials'
meta_arg = pd.Series(dtype='object', name='initials')

In [19]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)

| timestamp | ID | name | x | y | initials |
|---|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 | La |


In [20]:
# similar when using a function
def func(row):
    if (row['x']> 0):
        return row['x'] * 1000  
    else:
        return row['y'] * -1

In [21]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)

| timestamp | ID | name | x | y | initials | z |
|---|---|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv | -0.288303 |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 | La | -0.63027 |


### Map partitions
* We can supply an ad-hoc function to run on each partition using the [map_partitions](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.map_partitions.html) method.  
This is mainly useful for functions that are not implemented in `Dask` or `Pandas`.
* The function can return a new `dataframe`, which needs to be described in the `meta` argument.  
The function can also take additional arguments.

In [22]:
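import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
    # distance to the previous row; the first row of every partition is NaN
    # because shift() cannot see rows that live in the previous partition
    dist = np.sqrt((df[coor_x] - df[coor_x].shift())**2
                   + (df[coor_y] - df[coor_y].shift())**2)
    # assign() returns a new frame instead of mutating the partition passed in
    return df.assign(dist=dist).drop(drop_cols, axis=1)

ddf2 = ddf.map_partitions(func2
                          , coor_x='x'
                          , coor_y='y'
                          , drop_cols=['initials', 'z']
                          # meta as {column: dtype}, in the order of the output columns
                          , meta={'ID': 'i8'
                                  , 'name': 'object'
                                  , 'x': 'f8'
                                  , 'y': 'f8'
                                  , 'dist': 'f8'})
ddf2.head()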

| timestamp | ID | name | x | y | dist |
|---|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 | 0.636862 |
| 2000-01-01 00:00:02 | 1012 | Ursula | -0.013075 | 0.732633 | 0.788061 |
| 2000-01-01 00:00:03 | 960 | Yvonne | -0.361644 | -0.202956 | 0.998412 |
| 2000-01-01 00:00:04 | 988 | Kevin | 0.918343 | -0.933568 | 1.473825 |


### Convert index into Time column

In [23]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)

| timestamp | ID | name | x | y | initials | times |
|---|---|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv | 00:00:00 |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 | La | 00:00:01 |


In [24]:
# Works for both Dask and Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or: ddf = ddf.assign(times=dd.to_datetime(ddf.index))
ddf['times'] = ddf['times'].dt.time
ddf = client.persist(ddf)
ddf.head(2)

| timestamp | ID | name | x | y | initials | z | times |
|---|---|---|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | Yv | -0.288303 | 00:00:00 |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 | La | -0.63027 | 00:00:01 |


## Drop columns that are all NA

In [25]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1) 

In [26]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)

In [27]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)

| timestamp | ID | name | x | y | times |
|---|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | 00:00:00 |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 | 00:00:01 |


For `Dask` to drop a column whose values are all `na`, it must check every partition, which requires a `compute()`:

In [28]:
if ddf.colna.isnull().all().compute():   # check whether all values in the column are null - expensive
    ddf = ddf.drop(labels=['colna'], axis=1)
ddf.head(2)

| timestamp | ID | name | x | y | times |
|---|---|---|---|---|---|
| 2000-01-01 00:00:00 | 995 | Yvonne | -0.257197 | 0.288303 | 00:00:00 |
| 2000-01-01 00:00:01 | 1007 | Laura | -0.79446 | 0.63027 | 00:00:01 |


## Reset Index

In [29]:
# Pandas
pdf = pdf.reset_index(drop=True)
pdf.head(2)

| | ID | name | x | y | times |
|---|---|---|---|---|---|
| 0 | 995 | Yvonne | -0.257197 | 0.288303 | 00:00:00 |
| 1 | 1007 | Laura | -0.79446 | 0.63027 | 00:00:01 |


In [30]:
# Dask: reset_index() restarts the index at 0 in every partition (see "Partitions in Dask Dataframes")
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1)
ddf.head(2)

| | ID | name | x | y | times |
|---|---|---|---|---|---|
| 0 | 995 | Yvonne | -0.257197 | 0.288303 | 00:00:00 |
| 1 | 1007 | Laura | -0.79446 | 0.63027 | 00:00:01 |


# Read / Save files

* When working with `pandas` and `dask`, prefer the [parquet format](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#store-data-in-apache-parquet-format).  
* With `Dask`, files can be read in parallel by multiple workers.  
* Most `kwargs` work for both reading and writing files, e.g.  
`ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=None, blocksize=None)`  
(gzip files cannot be split into blocks, so `blocksize=None` reads each file as one partition).
* However, some are not available, such as `nrows` (use `head()` instead).

[See the documentation](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.to_csv.html) (including the option for naming output files).

## Save files

In [31]:
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)

In [32]:
%%time
# Pandas
pdf.to_csv(output_dir_file)

CPU times: user 15.1 s, sys: 343 ms, total: 15.5 s
Wall time: 15.3 s


In [33]:
list(output_dir_file.parent.glob('*.csv'))

[PosixPath('data/2000-01-26.csv'),
 PosixPath('data/2000-01-09.csv'),
 PosixPath('data/2000-01-01.csv'),
 PosixPath('data/2000-01-11.csv'),
 PosixPath('data/2000-01-02.csv'),
 PosixPath('data/2000-01-22.csv'),
 PosixPath('data/2000-01-08.csv'),
 PosixPath('data/2000-01-07.csv'),
 PosixPath('data/2000-01-03.csv'),
 PosixPath('data/2000-01-30.csv'),
 PosixPath('data/2000-01-29.csv'),
 PosixPath('data/2000-01-12.csv'),
 PosixPath('data/2000-01-19.csv'),
 PosixPath('data/2000-01-20.csv'),
 PosixPath('data/2000-01-23.csv'),
 PosixPath('data/2000-01-04.csv'),
 PosixPath('data/2000-01-13.csv'),
 PosixPath('data/2000-01-06.csv'),
 PosixPath('data/2000-01-21.csv'),
 PosixPath('data/2000-01-10.csv'),
 PosixPath('data/2000-01-17.csv'),
 PosixPath('data/pdf_single_file.csv'),
 PosixPath('data/2000-01-14.csv'),
 PosixPath('data/2000-01-05.csv'),
 PosixPath('data/2000-01-16.csv'),
 PosixPath('data/2000-01-28.csv'),
 PosixPath('data/2000-01-25.csv'),
 PosixPath('data/2000-01-27.csv'),
 PosixPath('dat

Notice the `'*'` in the Dask output path below: it is replaced by the partition number, so each partition is written to its own file.

In [34]:
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)

In [35]:
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)

CPU times: user 380 ms, sys: 31.3 ms, total: 412 ms
Wall time: 9.27 s


['/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf00.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf01.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf02.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf03.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf04.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf05.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf06.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf07.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf08.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_multi_files/ddf09.csv',
 '/home/runner/work/dask-examples/dask-examples/dataframes/data/dask_m

The number of output files equals the number of partitions, which can be found with [dask.dataframe.npartitions](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.npartitions.html)  

To change the number of output files, use [repartition](https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.repartition.html), which is an expensive operation.

In [36]:
ddf.npartitions

30

## Read Multiple files

For `pandas` it is possible to iterate and concat the files [see answer from stack overflow](https://stackoverflow.com/questions/20906474/import-multiple-csv-files-into-pandas-and-concatenate-into-one-dataframe).

In [37]:
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f) 
                       for f in list(output_dask_dir.iterdir())])
len(concat_df)

CPU times: user 2.6 s, sys: 350 ms, total: 2.95 s
Wall time: 2.88 s


2592000

In [38]:
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf

CPU times: user 9.84 ms, sys: 0 ns, total: 9.84 ms
Wall time: 9.3 ms


| npartitions=30 | ID | name | x | y | times |
|---|---|---|---|---|---|
| | int64 | object | float64 | float64 | object |
| | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... |


Remember that `Dask` is lazy: it does not *really* read the files until the data is needed...

In [39]:
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)

CPU times: user 70.5 ms, sys: 6.65 ms, total: 77.2 ms
Wall time: 769 ms


2592000

## Consider using client.persist()
Since Dask is lazy, it may run the **entire** graph/DAG again even if part of the calculation already ran in a previous cell. Use [persist](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#persist-intelligently) to keep intermediate results in memory.  
Additional information can be found in this [stackoverflow issue](https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array/45941529#45941529) or in the example in [this post](http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes)   
The same concept applies when running code in a script (rather than a Jupyter notebook) that reuses a dataframe inside loops.


In [40]:
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)

| | ID | name | x | y | times |
|---|---|---|---|---|---|
| 0 | 995 | Yvonne | -0.257197 | 0.288303 | 00:00:00 |
| 1 | 1007 | Laura | -0.79446 | 0.63027 | 00:00:01 |


# Group By - custom aggregations
In addition to the [groupby notebook example](https://github.com/dask/dask-examples/blob/main/dataframes/02-groupby.ipynb) in this repository, here is another example of how to avoid `groupby.apply`.  
In this example we group column values into lists of unique values.

#### Pandas

In [41]:
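# prepare pandas dataframe
# caution: pdf.reset_index(drop=True) above replaced the timestamps with a RangeIndex,
# so pd.to_datetime(pdf.index) does not recover the original time of day;
# the existing pdf['times'] column (used in the Dask version below) still holds it
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]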
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()

| | name | ID | seconds |
|---|---|---|---|
| 0 | Yvonne | 995 | 0 |
| 1 | Laura | 1007 | 0 |
| 2 | Ursula | 1012 | 0 |
| 3 | Yvonne | 960 | 0 |
| 4 | Kevin | 988 | 0 |


In [42]:
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply
               (lambda x: list(set(x.to_list()))) 
               for att_col_gr in gp_col]

In [43]:
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
print(df_edge_att.head(2))

       Weight                                                 ID  \
name                                                               
Alice   99633  [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...   
Bob     99782  [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103...   

                                                 seconds  
name                                                      
Alice  [60, 29, 11, 09, 44, 05, 77, 54, 50, 01, 23, 7...  
Bob    [60, 29, 11, 09, 44, 05, 77, 54, 50, 01, 23, 7...  
CPU times: user 22.4 ms, sys: 0 ns, total: 22.4 ms
Wall time: 20.1 ms


* Remember that in some cases `Pandas` is more efficient (assuming that all the data fits into RAM).  

#### Dask

In [44]:
def set_list_att(x: pd.Series):
    # receives the pandas Series of one group and returns its unique values as a list
    return list(set(x.values))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)

| | name | ID | seconds |
|---|---|---|---|
| 0 | Yvonne | 995 | 0 |
| 1 | Laura | 1007 | 1 |


In [45]:
ddf.columns

Index(['ID', 'name', 'x', 'y', 'times', 'seconds'], dtype='object')

In [46]:
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
                ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att')) 
               for att_col_gr in gp_col]

In [47]:
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['name']
  Missing: [0]

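We can do better...   
A [dask custom aggregation](https://docs.dask.org/en/latest/generated/dask.dataframe.groupby.Aggregation.html) is considerably better: the first function reduces each partition separately (chunk) and the second combines those partial results per group (agg), so the full data does not have to be shuffled as with `groupby.apply`.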

In [48]:
import itertools
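custom_agg = dd.Aggregation(
    'custom_agg',
    # chunk: within each partition, collect the unique values of every group into a set
    lambda s: s.apply(set),
    # agg: per group, merge the sets coming from all partitions into one list
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),
)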
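To see what the two functions of `custom_agg` do, here is a Pandas-only imitation of the chunk/agg split on two hand-made "partitions" (toy data; this mimics the idea and is not how Dask executes it internally):

```python
import itertools
import pandas as pd

# two hand-made "partitions" of the same logical dataframe
part1 = pd.DataFrame({"name": ["Alice", "Bob", "Alice"], "ID": [1, 2, 1]})
part2 = pd.DataFrame({"name": ["Alice", "Bob"], "ID": [3, 2]})

# chunk step: each partition independently reduces every group to a set
chunks = [p.groupby("name")["ID"].apply(set) for p in (part1, part2)]

# agg step: stack the partial results and merge the sets of each group
combined = pd.concat(chunks).groupby(level=0).apply(
    lambda sets: sorted(set(itertools.chain.from_iterable(sets))))
```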

In [49]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)  

CPU times: user 171 ms, sys: 4.06 ms, total: 175 ms
Wall time: 1.21 s


| name | Weight | ID | seconds |
|---|---|---|---|
| Alice | 99633 | [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... | [23, 55, 51, 21, 28, 58, 35, 06, 53, 11, 39, 3... |
| Bob | 99782 | [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... | [23, 55, 51, 21, 28, 58, 06, 35, 53, 39, 11, 3... |


## [Debugging](https://docs.dask.org/en/latest/debugging.html)
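Debugging may be challenging...
1. Run the code without a client (e.g. with the single-threaded scheduler), so errors surface directly in your session
2. Use the Dashboard profiler
3. Verify the integrity of the DAG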

### Corrupted DAG  
In this example we show that once the DAG is corrupted, you may need to reset the calculation.

In [50]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)

| timestamp | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 | 1011 | Xavier | -0.409016 | 0.331355 |


In [51]:
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())^2    # `^` <-- wrong syntax
                     +  (df[coor_y] - df[coor_y].shift())^2 )  # `^` <-- wrong syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('col', 'f8'))  # (name, dtype) tuple; ('float') without a comma is just the string 'float'



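Is everything OK? No error appears yet only because nothing has been computed.

* Even if the function is corrected, the DAG is still corrupted: `ddf` already contains the broken `col` column, and `ddf.map_partitions(...)` passes each whole partition (including that column) to the function, so computing the new result still runs the broken task.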

In [52]:
# Still results in an error once computed
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  # `**` <-- correct syntax
                     +  (df[coor_y] - df[coor_y].shift())**2 )  # `**` <-- correct syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('col', 'f8'))



We need to reset the dataframe

In [53]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2    # corrected math function
                     +  (df[coor_y] - df[coor_y].shift())**2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('col', 'f8'))
ddf.head(2)



| timestamp | id | name | x | y | col |
|---|---|---|---|---|---|
| 2000-01-01 00:00:00 | 1000 | Alice | -0.805901 | -0.690125 | |
| 2000-01-01 00:00:01 | 974 | Quinn | 0.341908 | 0.012709 | 1.345898 |
