# Intro to Dask Dataframes

Note that you might need to install this pipenv environment into jupyter to make it visible.

Steps are as follows:

1. `pipenv install` and `pipenv install --dev`
2. `python -m ipykernel install --user --name=dask-example`
3. Go to Jupyter -> Kernel -> Change Kernel -> dask-example

In [1]:
# A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index

import pandas as pd
import numpy as np
import datetime
import dask.dataframe as dd

In [2]:
# Create df of length 100 with random values and datetime index

df = pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list('ABCD'),
                  index=[datetime.datetime.today() - datetime.timedelta(days=x) for x in range(100)])

print(df.shape, "shape of df")

(100, 4) shape of df


In [3]:
df.head()

Unnamed: 0,A,B,C,D
2020-03-31 19:36:32.714912,11,87,5,40
2020-03-30 19:36:32.714921,69,13,83,49
2020-03-29 19:36:32.714923,70,46,9,82
2020-03-28 19:36:32.714924,1,45,99,89
2020-03-27 19:36:32.714925,21,73,20,65


In [4]:
ddf = dd.from_pandas(df, npartitions=5)  # create a dataframe with 5 partitions
print(ddf.shape, "shape of dask dataframe")

(Delayed('int-1d4558ea-2482-4d48-89f9-2c60dd68ddca'), 4) shape of dask dataframe


In [5]:
ddf.to_parquet("example")  # we can see that the output is 5 parts of a parquet file (check the output folder)
temp_ddf = dd.read_parquet("example")



In [6]:
temp_ddf

Unnamed: 0_level_0,A,B,C,D
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2019-12-23 19:36:32.715026,int64,int64,int64,int64
2020-01-12 19:36:32.715008,...,...,...,...
...,...,...,...,...
2020-03-12 19:36:32.714939,...,...,...,...
2020-03-31 19:36:32.714912,...,...,...,...


In [7]:
ddf_1 = dd.read_parquet("example/part.4.parquet")

In [8]:
df_1 = ddf_1.compute()  # calling compute returns a pandas dataframe
type(df_1)

pandas.core.frame.DataFrame

In [9]:
df_1.shape  # this shape is exactly 1/5 of the original dataset size, which is what we expect

(20, 4)

### Reading in Dataframes from S3

In [10]:
# you can also read in data directly from s3
cols = ['Year', 'Month', 'DayOfWeek', 'Distance',
        'DepDelay', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']

# Create the dataframe
df = dd.read_csv('s3://dask-data/airline-data/2000.csv', usecols=cols,
                  storage_options={'anon': True})

# note you might need to pass in credentials if its a private bucket

In [11]:
df

Unnamed: 0_level_0,Year,Month,DayOfWeek,CRSDepTime,UniqueCarrier,DepDelay,Origin,Dest,Distance
npartitions=9,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,int64,int64,int64,int64,object,float64,object,object,int64
,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...


### Map partitions

Map partitions is a function specific to dask. This is applied to each partition (i.e. each different pandas dataframe). Map partitions is faster than dask apply

In [12]:
df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [1., 2., 3., 4., 5.]})
ddf = dd.from_pandas(df, npartitions=2)
def myadd(df, a, b=1):
    return df.x + df.y + a + b
res = ddf.map_partitions(myadd, 1, b=2)
res.dtype

dtype('float64')

In [13]:
res.compute()

0     5.0
1     7.0
2     9.0
3    11.0
4    13.0
dtype: float64

In [14]:
ddf

Unnamed: 0_level_0,x,y
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1
0,int64,float64
3,...,...
4,...,...
