<a href="https://ai.meng.duke.edu"><img align="left" style="padding-top:10px;" src="https://storage.googleapis.com/aipi_datasets/Duke-AIPI-Logo.png"></a>

# Introduction to Dask

In [1]:
import os

import pandas as pd
import numpy as np
import dask.dataframe as dd

import warnings
warnings.filterwarnings("ignore")

In [2]:
# Download the necessary data files into the current working directory
# (normally the folder containing this notebook) and unpack them.
# Run this before any other code cell.

import urllib.request
from pathlib import Path
import os
import tarfile
path = Path()

# Dictionary of file names and download links
files = {'nycflights.tar': 'https://storage.googleapis.com/aipi_datasets/nycflights.tar'}

# Download file(s)
for key, value in files.items():
    filename = path / key
    url = value
    # Download and unpack only if the archive is not already on disk
    if not os.path.exists(filename):
        # Fetch into a temporary ".part" file and rename only once the download
        # succeeded. Otherwise an interrupted download would leave a truncated
        # archive that every later run mistakes for a complete one and skips.
        partial = filename.with_name(filename.name + ".part")
        urllib.request.urlretrieve(url, partial)
        os.replace(partial, filename)

        # `with` guarantees the archive file handle is closed, even if
        # extraction fails part-way. The "data" extraction filter rejects
        # absolute paths, ".." components and other unsafe members. It only
        # exists on Python 3.12+ and recent security releases of 3.8-3.11,
        # so fall back to unfiltered extraction where it is unavailable.
        with tarfile.open(filename) as tar:
            if hasattr(tarfile, "data_filter"):
                tar.extractall(path=path, filter="data")
            else:
                tar.extractall(path=path)

In [7]:
# Start a local Dask cluster on this machine and connect a client to it.
# Displaying `client` renders the cluster summary, including the dashboard URL.
from dask.distributed import Client

n_logical_cores = os.cpu_count()
print(f"I have {n_logical_cores} logical cores")

client = Client()
client

I have 8 logical cores


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:54172/status,

0,1
Dashboard: http://127.0.0.1:54172/status,Workers: 4
Total threads: 8,Total memory: 8.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:54173,Workers: 4
Dashboard: http://127.0.0.1:54172/status,Total threads: 8
Started: Just now,Total memory: 8.00 GiB

0,1
Comm: tcp://127.0.0.1:54185,Total threads: 2
Dashboard: http://127.0.0.1:54189/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:54177,
Local directory: /Users/jjr10/Documents/Duke/Courses/AIPI510/class_materials/aipi510_repo/week11/dask-worker-space/worker-lc6z_6w7,Local directory: /Users/jjr10/Documents/Duke/Courses/AIPI510/class_materials/aipi510_repo/week11/dask-worker-space/worker-lc6z_6w7

0,1
Comm: tcp://127.0.0.1:54184,Total threads: 2
Dashboard: http://127.0.0.1:54188/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:54178,
Local directory: /Users/jjr10/Documents/Duke/Courses/AIPI510/class_materials/aipi510_repo/week11/dask-worker-space/worker-kcgdxlle,Local directory: /Users/jjr10/Documents/Duke/Courses/AIPI510/class_materials/aipi510_repo/week11/dask-worker-space/worker-kcgdxlle

0,1
Comm: tcp://127.0.0.1:54187,Total threads: 2
Dashboard: http://127.0.0.1:54191/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:54179,
Local directory: /Users/jjr10/Documents/Duke/Courses/AIPI510/class_materials/aipi510_repo/week11/dask-worker-space/worker-dv9ze2x3,Local directory: /Users/jjr10/Documents/Duke/Courses/AIPI510/class_materials/aipi510_repo/week11/dask-worker-space/worker-dv9ze2x3

0,1
Comm: tcp://127.0.0.1:54186,Total threads: 2
Dashboard: http://127.0.0.1:54190/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:54176,
Local directory: /Users/jjr10/Documents/Duke/Courses/AIPI510/class_materials/aipi510_repo/week11/dask-worker-space/worker-5mh6x58f,Local directory: /Users/jjr10/Documents/Duke/Courses/AIPI510/class_materials/aipi510_repo/week11/dask-worker-space/worker-5mh6x58f


## Basic Dask DataFrames

In [8]:
# Create a Dask DataFrame from an in-memory pandas DataFrame.
# The index is hourly timestamps. Column "a" is 0..n-1, and column "b"
# cycles through the letters of `b_pattern`. The frame is split into
# 10 partitions.
# "h" is the hourly offset alias; the upper-case "H" is deprecated in
# pandas 2.2+ (lower-case "h" is also accepted by older pandas).
n_hours = 2400
b_pattern = "abcaddbe"

index = pd.date_range("2021-09-01", periods=n_hours, freq="1h")
# Cycle the pattern to exactly n_hours entries, so the column length is
# correct even if n_hours is not a multiple of len(b_pattern).
b_values = [b_pattern[i % len(b_pattern)] for i in range(n_hours)]
df = pd.DataFrame({"a": np.arange(n_hours), "b": b_values}, index=index)

ddf = dd.from_pandas(df, npartitions=10)
ddf

Unnamed: 0_level_0,a,b
npartitions=10,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-09-01 00:00:00,int64,object
2021-09-11 00:00:00,...,...
...,...,...
2021-11-30 00:00:00,...,...
2021-12-09 23:00:00,...,...


In [9]:
# Divisions are the index boundaries between partitions: there are
# npartitions + 1 values, and the final one is the largest index value.
partition_bounds = ddf.divisions
partition_bounds

(Timestamp('2021-09-01 00:00:00', freq='H'),
 Timestamp('2021-09-11 00:00:00', freq='H'),
 Timestamp('2021-09-21 00:00:00', freq='H'),
 Timestamp('2021-10-01 00:00:00', freq='H'),
 Timestamp('2021-10-11 00:00:00', freq='H'),
 Timestamp('2021-10-21 00:00:00', freq='H'),
 Timestamp('2021-10-31 00:00:00', freq='H'),
 Timestamp('2021-11-10 00:00:00', freq='H'),
 Timestamp('2021-11-20 00:00:00', freq='H'),
 Timestamp('2021-11-30 00:00:00', freq='H'),
 Timestamp('2021-12-09 23:00:00', freq='H'))

In [10]:
# Select a single partition by position. The result is still a lazy
# Dask DataFrame, just with npartitions=1.
first_partition = ddf.partitions[0]
first_partition

Unnamed: 0_level_0,a,b
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-09-01,int64,object
2021-09-11,...,...


In [11]:
# Building the reduction is lazy; .compute() actually runs it and returns the value.
column_a_mean = ddf["a"].mean()
column_a_mean.compute()


1199.5

In [12]:
# Another example computation: keep only rows where b == "e",
# then average column "a" over those rows.
rows_b_is_e = ddf[ddf["b"] == "e"]
rows_b_is_e["a"].mean().compute()

1203.0

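## NYC Flights Example

This section lazily reads the NYC flights CSV files downloaded above into a Dask DataFrame. It then compares the cost of repeated computations with and without caching the data via `persist()`.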

In [13]:
# Define a Dask DataFrame over every CSV in the nycflights/ folder.
# This is lazy: the full data is not loaded here.
# The first three columns are combined into a single "Date" column. Explicit
# dtypes are given for a few columns, presumably because their types would
# otherwise be mis-inferred.
# NOTE(review): passing a dict to `parse_dates` (combining columns) is
# deprecated in pandas 2.2+ — confirm it still works with the installed pandas.
flight_csvs = os.path.join("nycflights", "*.csv")
column_dtypes = {"TailNum": str, "CRSElapsedTime": float, "Cancelled": bool}

ddf = dd.read_csv(
    flight_csvs,
    dtype=column_dtypes,
    parse_dates={"Date": [0, 1, 2]},
)

In [14]:
# len() triggers a computation to count every row across the CSV files.
# head() shows the first few rows.
row_count = len(ddf)
print(row_count)
ddf.head()

2611892


Unnamed: 0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990-01-01,1,1621.0,1540,1747.0,1701,US,33,,86.0,...,,46.0,41.0,EWR,PIT,319.0,,,False,0
1,1990-01-02,2,1547.0,1540,1700.0,1701,US,33,,73.0,...,,-1.0,7.0,EWR,PIT,319.0,,,False,0
2,1990-01-03,3,1546.0,1540,1710.0,1701,US,33,,84.0,...,,9.0,6.0,EWR,PIT,319.0,,,False,0
3,1990-01-04,4,1542.0,1540,1710.0,1701,US,33,,88.0,...,,9.0,2.0,EWR,PIT,319.0,,,False,0
4,1990-01-05,5,1549.0,1540,1706.0,1701,US,33,,77.0,...,,5.0,9.0,EWR,PIT,319.0,,,False,0


In [15]:
%%time

# Compute mean and standard deviation of departure delay of non-canceled flights
non_cancelled = ddf[~ddf.Cancelled]
mean_delay = non_cancelled.DepDelay.mean()
std_delay = non_cancelled.DepDelay.std()

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

CPU times: user 791 ms, sys: 147 ms, total: 938 ms
Wall time: 4.5 s


In [16]:
%%time

# Compute mean and standard deviation of departure delay of non-canceled flights
# This time caching the subset

non_cancelled = ddf[~ddf.Cancelled]
non_cancelled = non_cancelled.persist() #Cache the non-cancelled flights subset

mean_delay = non_cancelled.DepDelay.mean()
std_delay = non_cancelled.DepDelay.std()

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

CPU times: user 511 ms, sys: 87.9 ms, total: 599 ms
Wall time: 2.39 s


In [23]:
%%time

# Which day of the week had the maximum and minimum average delay?

grouped = ddf.groupby("DayOfWeek").DepDelay.mean()
#grouped = grouped.persist()

# Compute the max
maxdelayday = grouped.idxmax()
maxdelayday_res = maxdelayday.compute()
print(maxdelayday_res)

# Compute the min
mindelayday = grouped.idxmin()
mindelayday_res = mindelayday.compute()
print(mindelayday_res)

5
6
CPU times: user 431 ms, sys: 70.9 ms, total: 502 ms
Wall time: 4.16 s


In [17]:
# Close the client and local cluster. Run this only after every computation above
# has finished.
# NOTE(review): Client() in the setup cell started its own LocalCluster; closing
# the client is expected to shut that cluster down too — confirm via the dashboard.
client.close()