<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [None]:
# One-time setup: uncomment the line below to install Dask with its "complete" extra.
#!python3 -m pip install "dask[complete]"

In [1]:
from dask.distributed import Client, LocalCluster
from dask import dataframe as dd
import pandas as pd
import sys
import time

sys.path.append("..")
from utils import my_print

In [3]:
# --- Benchmark configuration --------------------------------------------------
N_REPS = 1  # how many times the whole pipeline is run and timed
DATA_DIR = "../data/"
COMPLAINTS_NAME = "complaints_python_script.csv"
ZIP_NAME = "zip_zcta_python_script.csv"
OUTPUT_NAME = "311_reduced_python_script.csv"
USE_DASK = False  # True: run on a local Dask cluster; False: plain pandas
TESTING = False   # True: only process the first TEST_ROWS complaints
TEST_ROWS = 1000

# The timestamp format carries an AM/PM marker (%p), so the hour has to be
# parsed with %I: when %H is used the %p marker is ignored and every PM time
# would come out 12 hours early.
DATE_FORMAT = "%m/%d/%Y %I:%M:%S %p"
# Half-open [DATE_START, DATE_END) window of complaint dates to keep.
# Written in ISO form so day/month order cannot be misread.
DATE_START = "2010-12-01"
DATE_END = "2020-11-01"

# Set up general parameters
rename_dict = {  # We are only interested in some columns
    "Created Date": "date",
    "Complaint Type": "type",
    "Descriptor": "descriptor",
    "Incident Zip": "zip",
}
save_params = dict(index=False, na_rep="null")  # , date_format="%m/%d/%Y %I:%M:%S %p")
# "Created Date" is deliberately NOT listed under parse_dates: it is read as
# text and converted once, with an explicit format, right after loading.
# Letting the CSV reader parse it as well would do the date work twice.
read_params = dict(
    usecols=list(rename_dict),
    dtype={"Incident Zip": "object"},
    low_memory=False,
)

# Get around the different pandas/Dask entry points later on
custom_read_func = dd.read_csv if USE_DASK else pd.read_csv
custom_merge_func = dd.merge if USE_DASK else pd.merge
# dd.to_datetime keeps the conversion lazy and partition-wise on Dask columns.
custom_to_datetime = dd.to_datetime if USE_DASK else pd.to_datetime

# Dask/pandas specific parameters
if USE_DASK:
    cluster = LocalCluster()
    client = Client(cluster)
    save_params["single_file"] = True
elif TESTING:
    read_params["nrows"] = TEST_ROWS


# --- Timed runs -----------------------------------------------------------------
all_times = []  # total time per repetition, truncated to whole seconds
try:
    for i in range(N_REPS):
        # Setup
        start_time = time.time()
        past_time = my_print(iteration=i, text="Start time epoch",
                             past_time=start_time, use_duration=False)

        # Read the complaints and convert the timestamp column in one pass.
        # This is faster than having the CSV reader parse it as a date.
        df = custom_read_func(DATA_DIR + COMPLAINTS_NAME, **read_params)
        df["Created Date"] = custom_to_datetime(df["Created Date"], format=DATE_FORMAT)
        past_time = my_print(iteration=i, text="Time taken reading data", past_time=past_time)

        if USE_DASK and TESTING:  # Dask doesn't have an nrows parameter
            # NOTE: head() on a Dask frame returns a pandas DataFrame.
            df = df.head(n=TEST_ROWS)
        zips = custom_read_func(DATA_DIR + ZIP_NAME, dtype={"zip": "object"})

        # Log (distinct label so the two read stages can be told apart)
        past_time = my_print(iteration=i, text="Time taken reading zip data", past_time=past_time)

        # Rename, filter valid dates, join and save.
        df = df.rename(columns=rename_dict)
        # Assign the result instead of using inplace=True, which Dask's
        # query() does not support.
        df = df.query(f"date >= '{DATE_START}' and date < '{DATE_END}'")
        merged = custom_merge_func(df, zips, on="zip", how="inner").drop(columns=["zip"])
        merged.to_csv(DATA_DIR + OUTPUT_NAME, **save_params)

        # Log
        # my_print is assumed to return the current timestamp -- TODO confirm in utils.
        past_time = my_print(iteration=i, text="Total time", past_time=start_time)
        all_times.append(int(past_time - start_time))
finally:
    # Release the Dask workers even when a repetition fails part-way through.
    if USE_DASK:
        client.close()
        cluster.close()

Iteration 0: Start time epoch (s): 1607638107.2230413
Iteration 0: Time taken reading data (s): 3401
Iteration 0: Time taken reading data (s): 0
Iteration 0: Total time (s): 3555


In [6]:
# Total run time of each repetition, in whole seconds, as collected by the benchmark loop.
all_times

[3555]

In [7]:
# Share of the total run time (3555 s) spent in the first "reading data" stage (3401 s);
# both figures are copied by hand from the logged output of the run.
3401/3555

0.9566807313642757

In [14]:
# Uses 30 GB RAM

In [4]:
# 2 min 4 s expressed in seconds.
# NOTE(review): which run this timing belongs to is not recorded here -- confirm and label it.
2*60 + 4

124

In [5]:
# 3 min 35 s expressed in seconds.
# NOTE(review): which run this timing belongs to is not recorded here -- confirm and label it.
3*60 + 35

215

In [7]:
# Uses at most ~GB RAM
# NOTE(review): the memory figure is missing from the comment above ("~GB") -- fill in the measured value.
# 1810 is presumably the Dask run's total time in seconds -- confirm against the run log.
print(f"Dask took {1810/60:.2f} minutes")

Dask took 30.17 minutes
