# Dask 101 - Dataframes Webinar

## 5 Reasons you should use Dask over Pandas

### Import NYC flights dataset

In [None]:
# Download and extract the NYC flights CSVs into data/nycflights/ -- but only when they
# are not already present, so "Restart Kernel -> Run All" works on a fresh checkout
# without re-downloading every time.
import os
import tarfile
import urllib.request  # `import urllib` alone does not guarantee urllib.request is loaded

DATA_DIR = 'data'
FLIGHTS_DIR = os.path.join(DATA_DIR, 'nycflights')
FLIGHTS_URL = "https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz"

if not os.path.isdir(FLIGHTS_DIR):
    print("- Downloading NYC Flights dataset... ", end='', flush=True)
    filename, _ = urllib.request.urlretrieve(FLIGHTS_URL, 'nycflights.tar.gz')
    print("Done!", flush=True)

    # extract the .csv files from the tar file
    with tarfile.open(filename, mode='r:gz') as flights:
        # Use the safe 'data' extraction filter where this Python provides it
        # (3.12+ and security backports); otherwise fall back to the default.
        if hasattr(tarfile, 'data_filter'):
            flights.extractall(DATA_DIR, filter='data')
        else:
            flights.extractall(DATA_DIR)

In [None]:
import dask.dataframe as dd
import pandas as pd
pd.set_option('display.max_columns', None)
import os
import dask
from dask import delayed
import numpy as np

### Read data into Pandas Dataframe

In [None]:
%time 
pdf=pd.concat([pd.read_csv(os.path.join('data', 'nycflights', i)) for i in os.listdir(os.path.join('data', 'nycflights'))])


In [None]:
# First rows of the eagerly loaded pandas DataFrame (all files already read into memory).
pdf.head()

### Read data into Dask Dataframe

In [None]:
%time 
ddf = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'))

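#### Lazy Evaluation

`dd.read_csv` does not load the data up front; it reads only a sample from the start of the files to infer column dtypes. Work such as `tail()` runs later on other partitions, where the values may not fit those inferred dtypes (for example, a column that looked like integers turns out to contain floats or strings). The result is an error at compute time. The next cell avoids this by passing explicit `dtype`s.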

In [None]:
# tail() reads the last partition, whose values may not match the dtypes Dask inferred
# from the sample it read up front. The resulting ValueError is the point of this demo,
# so catch and show it instead of aborting "Run All"; the next cell fixes it with
# explicit dtypes.
try:
    display(ddf.tail())
except ValueError as err:
    print(f"{type(err).__name__}: {err}")

In [None]:
%time 
ddf = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),dtype={'TailNum': str,'CRSElapsedTime': float,'Cancelled': bool})


### Partitioned Data

In [None]:
# Number of partitions Dask split the CSV data into.
ddf.npartitions

### Pandas vs Dask - Basic Dataframe Operations Comparison

In [None]:
%time 
pdf['Day']=pdf['DayofMonth']
pdf['date']=pd.to_datetime(pdf[['Year', 'Month', 'Day']])
pdf['delay']=pdf['ActualElapsedTime']-pdf['CRSElapsedTime']
pdf['ArrDelay']=pdf['ArrDelay']**2
pdf['DepDelay']=pdf['DepDelay']**2

In [None]:
%time 
ddf['Day']=ddf['DayofMonth']
ddf['date']=dd.to_datetime(ddf[['Year', 'Month', 'Day']])
ddf['delay']=ddf['ActualElapsedTime']-ddf['CRSElapsedTime']
ddf['ArrDelay']=ddf['ArrDelay']**2
ddf['DepDelay']=ddf['DepDelay']**2

In [None]:
# Render the task graph built so far (rendering needs the optional graphviz dependency).
ddf.visualize()

### Dask Dataframe - Aggregate operations

In [None]:
# Eager pandas reduction over the in-memory frame.
%time pdf.DepDelay.max()

In [None]:
# Dask equivalent: .compute() executes the whole graph (CSV read + transforms + max).
%time ddf.DepDelay.max().compute()

In [None]:
# Visualize the task graph behind the max() reduction.
ddf.DepDelay.max().visualize()

### Dask Delayed

In [None]:
def inc(x):
    """Return ``x + 1``."""
    return x + 1


def double(x):
    """Return ``x * 2``."""
    return x * 2


def add(x, y):
    """Return the sum ``x + y``."""
    return x + y


# Plain Python: every call executes immediately.
x, y = inc(1), inc(2)
z = add(x, y)

z

In [None]:
# Delayed versions of the same helpers: calling a @dask.delayed function records a
# task and returns a Delayed object instead of running the function.
@dask.delayed
def inc(x):
    """Lazily add one to ``x``."""
    return x + 1


@dask.delayed
def double(x):
    """Lazily multiply ``x`` by two."""
    return x * 2


@dask.delayed
def add(x, y):
    """Lazily sum ``x`` and ``y``."""
    return x + y


x, y = inc(1), inc(2)   # Delayed placeholders; nothing has run yet
z = add(x, y)           # graph: inc(1), inc(2) -> add

print(z)                # repr of the Delayed object, not a number
print(z.compute())      # execute the graph

z.visualize()


Reference: [Dask Delayed documentation](https://docs.dask.org/en/stable/delayed.html)

### Dask Delayed with Dataframes

In [None]:
%time 
ddf = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),dtype={'TailNum': str,'CRSElapsedTime': float,'Cancelled': bool})
ddf['Day']=ddf['DayofMonth']

In [None]:
@dask.delayed
def get_date(v):
    """Lazily assemble datetimes from date-part columns (e.g. Year/Month/Day).

    It is used on a Delayed wrapping a computed DataFrame, so at execution time ``v``
    is a concrete pandas object. ``pd.to_datetime`` therefore returns real datetime
    values. ``dd.to_datetime`` would instead return a lazy Dask collection.
    """
    return pd.to_datetime(v)

@dask.delayed
def get_square(v):
    """Lazily return ``v`` raised to the power two."""
    squared = v ** 2
    return squared

In [None]:
%time
delayed_object=dask.delayed(ddf.repartition(npartitions=5))
temp_delayed_object={}
temp_delayed_object['Date']=get_date(delayed_object[['Year', 'Month', 'Day']])
temp_delayed_object['delay']=dask.delayed(delayed_object['ActualElapsedTime']-delayed_object['CRSElapsedTime'])
temp_delayed_object['ArrDelaySquare']=dask.delayed(delayed_object['ArrDelay'].apply(get_square))
temp_delayed_object['DepDelaySquare']=dask.delayed(delayed_object['DepDelay'].apply(get_square))
delayed_object=delayed_object.assign(**temp_delayed_object)

In [None]:
%time
a=delayed_object.compute()

In [None]:
# Render the task graph of the delayed pipeline (needs the optional graphviz dependency).
dask.visualize(delayed_object)

### Dask distributed processing

In [None]:
import dask.dataframe as dd
import pandas as pd
pd.set_option('display.max_columns', None)
import os
import dask
from dask import delayed
import numpy as np
import dask.array as da

In [None]:
# Start a local Dask cluster (scheduler + worker processes on this machine) and
# connect to it; the client becomes the default scheduler for later computations.
from dask.distributed import Client
client = Client()
client

# Equivalent explicit form:
# from dask.distributed import Client, LocalCluster
# cluster = LocalCluster()  # Launches a scheduler and workers locally
# client = Client(cluster)  # Connect to distributed cluster and override default

In [None]:
%time 
ddf = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),dtype={'TailNum': str,'CRSElapsedTime': float,'Cancelled': bool})
ddf['Day']=ddf['DayofMonth']

In [None]:
def get_date(v):
    """Build datetimes from date-part columns via ``dd.to_datetime``."""
    dates = dd.to_datetime(v)
    return dates

def get_square(v):
    """Square ``v`` element-wise using ``dask.array.power``."""
    exponent = 2
    return dask.array.power(v, exponent)

def get_diff(v):
    """Return the first column of ``v`` minus its second column (selected by position)."""
    left = v.iloc[:, 0]
    right = v.iloc[:, 1]
    return left - right

def add_columns(v, names=None):
    """Return ``v[0]`` with the remaining items of ``v`` attached as new columns.

    Parameters
    ----------
    v : sequence
        ``v[0]`` is the base frame. Every later item is a one-element sequence
        (such as the list of futures returned by ``client.map``) whose first
        element is the column to add.
    names : sequence of str, optional
        Names for the added columns, in order. Defaults to ``'new_0', 'new_1', ...``.

    Returns
    -------
    A new frame with the extra columns; ``v[0]`` itself is not modified.

    Raises
    ------
    ValueError
        If ``names`` is given and its length differs from the number of extra items.
    """
    base, extras = v[0], v[1:]
    if names is None:
        names = ['new_' + str(idx) for idx in range(len(extras))]
    elif len(names) != len(extras):
        raise ValueError(f"expected {len(extras)} column names, got {len(names)}")
    # assign() returns a new frame instead of mutating the caller's frame in place.
    return base.assign(**{name: extra[0] for name, extra in zip(names, extras)})
    

In [None]:
# Restart all workers so the next experiment starts without leftover state on the cluster.
client.restart()

In [None]:
%time


A=client.map(get_date,[ddf[['Year', 'Month', 'Day']]])
B=client.map(get_diff,[ddf[['ActualElapsedTime','CRSElapsedTime']]])
C=client.map(get_square,[ddf['ArrDelay']])
D=client.map(get_square,[ddf['DepDelay']])
Merge=client.submit(add_columns,[ddf,A,B,C,D])
Merge


In [None]:
# Show the future's current status.
Merge

In [None]:
# Wait for the future, bring its result into this process and show the first rows.
Merge.result().head()

In [None]:
# The local ddf, for comparison with Merge's result above.
ddf.head()

### Submit delayed object to client

In [None]:
def exec_delayed(v):
    """Call ``v.compute()`` and return the concrete result.

    Meant to be shipped to a worker with ``client.submit``.
    """
    result = v.compute()
    return result

In [None]:
# Run exec_delayed on the cluster with delayed_object as its argument; returns a Future immediately.
r=client.submit(exec_delayed,delayed_object)

In [None]:
# Public status of the future (e.g. 'pending', 'finished', 'error'); avoids the private `_state`.
r.status

In [None]:
# Block until the future finishes and return its result (re-raises the task's error, if any).
r.result()

### References

https://coderzcolumn.com/tutorials/python/dask-distributed-parallel-processing-in-python
https://youtu.be/07EiCpdhtDE
https://youtu.be/v7famjsXdUY

In [None]:
# @delayed(nout=1)
# def test(v):
#     return sum(v)

# test1=dask.delayed(test(dask.delayed([1,2,3])))

In [None]:
# test1.compute()

In [None]:
# # @dask.delayed(nout=1)
# def f(v):
#     return v[0]

# x = dask.delayed(f(dask.delayed([2,1])))
# x.compute()

In [None]:
# @dask.delayed(nout=1)
# def f(v):
#     return sum(v)

# x = dask.delayed(f(dask.delayed([1,2])))
# # x.compute(),y.compute()

In [None]:
# x.compute()

In [None]:
# @dask.delayed(nout=2)
# def makes_two():
#     return 1, 2

# out1, out2 = makes_two()
# out1.compute()