<h1 style="padding-top: 25px;padding-bottom: 25px;text-align: left; padding-left: 10px; background-color: #DDDDDD; 
    color: black;"> <img style="float: left; padding-right: 10px;" src="https://raw.githubusercontent.com/Harvard-IACS/2018-CS109A/master/content/styles/iacs.png" height="50px"> <a href='https://harvard-iacs.github.io/2021-AC215/' target='_blank'><strong><font color="#A41034">AC215: Advanced Practical Data Science, MLOps</font></strong></a></h1>

# **<font color="#A41034">Exercise 1 - Dask</font>**

**Harvard University**<br/>
**Fall 2021**<br/>
**Instructor:**
Pavlos Protopapas<br/>

<hr style="height:2pt">

## **<font color="#A41034">Setup Notebook</font>**

**Copy & setup Colab**

1) Select "File" menu and pick "Save a copy in Drive"

**Installs**

In [3]:
!pip install dask "dask[dataframe]" dask-image



**Imports**

In [4]:
import os
import requests
import zipfile
import tarfile
import shutil
import math
import json
import time
import sys
import numpy as np
import pandas as pd

# Dask
import dask
import dask.dataframe as dd
import dask.array as da
from dask.diagnostics import ProgressBar

**Utils**

Here are some utility functions that we will use in this exercise.

In [5]:
def download_file(packet_url, base_path="", extract=False, headers=None):
  # Create the target directory (including any parents) if it does not exist
  if base_path != "":
    os.makedirs(base_path, exist_ok=True)
  packet_file = os.path.basename(packet_url)
  packet_path = os.path.join(base_path, packet_file)
  # Stream the download to disk in 8 KB chunks to keep memory usage low
  with requests.get(packet_url, stream=True, headers=headers) as r:
    r.raise_for_status()
    with open(packet_path, 'wb') as f:
      for chunk in r.iter_content(chunk_size=8192):
        f.write(chunk)

  # Optionally unpack the archive (zip, otherwise assumed to be a tar file)
  if extract:
    if packet_file.endswith(".zip"):
      with zipfile.ZipFile(packet_path) as zfile:
        zfile.extractall(base_path)
    else:
      with tarfile.open(packet_path) as tfile:
        tfile.extractall(base_path)

## **<font color="#A41034">Dataset</font>**

### **Load Data**

In [6]:
start_time = time.time()
download_file("https://github.com/dlops-io/datasets/releases/download/v1.0/Parking_Violations_Issued_-_Fiscal_Year_2017.csv.zip", base_path="datasets", extract=True)
execution_time = (time.time() - start_time)/60.0
print("Download execution time (mins)",execution_time)

Download execution time (mins) 0.7224647760391235


In [7]:
parking_violation_csv = os.path.join("datasets","Parking_Violations_Issued_-_Fiscal_Year_2017.csv")

## Q1: Compute Pi with a Slowly Converging Series

Leibniz published one of the oldest known series for $\pi$ in 1676. While it is easy to understand and derive, it converges very slowly.
https://en.wikipedia.org/wiki/Leibniz_formula_for_%CF%80 <br/>
$$\frac{\pi}{4} = 1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \cdots$$

While this is a genuinely cruel way to compute the value of $\pi$, it’s a fun opportunity to use brute force on a problem instead of thinking.
Compute $\pi$ using at least four billion terms of this series, and compare the time taken with NumPy and with Dask. On my Mac, NumPy took 44 seconds and Dask took 5.7 seconds.

*Hint:* Use a Dask array.
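
Before committing to billions of terms, it can help to see how slowly the series converges on a small scale. The following is a minimal NumPy sketch; the helper name `leibniz_pi` is illustrative and not part of the exercise. For $N$ terms, the truncation error of the estimate of $\pi$ shrinks roughly like $1/N$.

```python
import numpy as np

def leibniz_pi(n_terms):
    """Approximate pi from the first n_terms terms of the Leibniz series."""
    k = np.arange(n_terms, dtype=np.float64)
    # Terms alternate in sign: +1, -1/3, +1/5, -1/7, ...
    signs = np.where(k % 2 == 0, 1.0, -1.0)
    return 4.0 * np.sum(signs / (2.0 * k + 1.0))

for n in (10**2, 10**4, 10**6):
    approx = leibniz_pi(n)
    print(f"{n:>8d} terms: pi ~ {approx:.10f}, error = {abs(approx - np.pi):.3e}")
```

Each extra correct digit costs about ten times as many terms, which is why the exercise needs billions of terms to reach an error near 1e-10.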

**Checking 1e9 * 4 terms with NumPy**

If 1e9 * 4 terms fails (for example, by running out of memory), try 1e9 * 2 terms or increase the available memory.

In [8]:
# Your code here
start = time.time()

# Sum the series in four slices of one billion terms each to limit peak memory.
# Each pair 1/(4k+1) - 1/(4k+3) combines two consecutive terms of the series.
pi_computed = np.sum(1/np.arange(1, 2e9, 4) - 1/np.arange(3, 2e9, 4))
pi_computed += np.sum(1/np.arange(2e9 + 1, 4e9, 4) - 1/np.arange(2e9 + 3, 4e9, 4))
pi_computed += np.sum(1/np.arange(4e9 + 1, 6e9, 4) - 1/np.arange(4e9 + 3, 6e9, 4))
pi_computed += np.sum(1/np.arange(6e9 + 1, 8e9, 4) - 1/np.arange(6e9 + 3, 8e9, 4))

print(f"Time: {time.time() - start:.4f}s")

pi_computed *= 4

Time: 24.7762s


In [9]:
# Error 
error = np.abs(pi_computed-np.pi)

# Report Results
print(f'Pi real value = {np.pi:14.12f}')
print(f'Pi computed value = {pi_computed:14.12f}')
print(f'Error = {error:6.3e}')

Pi real value = 3.141592653590
Pi computed value = 3.141592653340
Error = 2.500e-10
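
The four hand-written slices above can be generalized into a loop over fixed-size chunks, so that peak memory depends on the chunk size and not on the total number of terms. This is only a sketch: the function name `leibniz_pi_chunked` and the default chunk size are assumptions chosen for illustration, and the example runs on a much smaller term count than the exercise.

```python
import numpy as np

def leibniz_pi_chunked(n_pairs, chunk_pairs=10_000_000):
    """Sum n_pairs pairs 1/(4k+1) - 1/(4k+3) in fixed-size chunks and return 4 * sum."""
    total = 0.0
    for start in range(0, n_pairs, chunk_pairs):
        stop = min(start + chunk_pairs, n_pairs)
        # Only one chunk of indices is held in memory at a time
        k = np.arange(start, stop, dtype=np.float64)
        total += np.sum(1.0 / (4.0 * k + 1.0) - 1.0 / (4.0 * k + 3.0))
    return 4.0 * total

# 5 million pairs = 10 million terms, processed 1 million pairs at a time
pi_small = leibniz_pi_chunked(5_000_000, chunk_pairs=1_000_000)
print(f"pi ~ {pi_small:.10f}, error = {abs(pi_small - np.pi):.3e}")
```

Processing one chunk at a time is essentially what Dask automates, with the added benefit that independent chunks can be summed in parallel.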


**Checking 1e9 * 4 terms with Dask**

In [12]:
# Your code here
start = time.time()

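# Build the whole series lazily; Dask splits the arrays into chunks and
# only evaluates them (in parallel) when .compute() is called.
step3_pi = da.sum(1/da.arange(1, 8e9, 4) - 1/da.arange(3, 8e9, 4)).compute()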

print(f"Time: {time.time() - start:.4f}s")

step3_pi *= 4

Time: 16.4289s


In [13]:
error = np.abs(step3_pi - np.pi)

# Report Results
print(f'Pi real value = {np.pi:14.12f}')
print(f'Pi computed value = {step3_pi:14.12f}')
print(f'Error = {error:6.3e}')

Pi real value = 3.141592653590
Pi computed value = 3.141592653340
Error = 2.500e-10
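
To make the chunking visible, the same computation can be run on a small Dask array with an explicit `chunks` argument. This is a hedged sketch at a reduced scale; the chunk size of 500,000 is an arbitrary choice for illustration, not a tuned value.

```python
import dask.array as da
import numpy as np

n_pairs = 2_000_000  # 2 million pairs = 4 million terms

# Index array split into blocks of 500,000 elements each
k = da.arange(0, n_pairs, 1, chunks=500_000, dtype="float64")
print("Number of blocks:", k.numblocks)

# Nothing is evaluated yet: this only builds a task graph
lazy_sum = (1.0 / (4.0 * k + 1.0) - 1.0 / (4.0 * k + 3.0)).sum()

# compute() runs the per-block sums and combines the partial results
pi_est = 4.0 * lazy_sum.compute()
print(f"pi ~ {pi_est:.10f}, error = {abs(pi_est - np.pi):.3e}")
```

Because each block is reduced independently, no single array of all terms ever has to fit in memory at once.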


## Filter Parking Tickets Dataset

According to the parking tickets data set documentation, the column called ‘Plate Type’ consists mainly of two different types, ‘PAS’ and ‘COM’; presumably for passenger and commercial vehicles, respectively. Maybe the rest are the famous parking tickets from the UN diplomats, who take advantage of diplomatic immunity not to pay their fines.

Create a filtered Dask DataFrame containing only the commercial plates, and persist it so that it stays in memory for later computations. Count the summonses issued to commercial plate types in fiscal year 2017 (i.e., with an Issue Date in 2016 or 2017), and express that count as a percentage of the total data set.

*Hint*: This is easy; it is only about 5-7 lines of code.

In [15]:
dict_1 = {'Summons Number': 'int64', 'Plate ID': 'object', 'Registration State': 'object', 'Plate Type': 'object',
 'Issue Date': 'object', 'Violation Code': 'int64', 'Vehicle Body Type': 'object', 'Vehicle Make': 'object',
 'Issuing Agency': 'object', 'Street Code1': 'int64', 'Street Code2': 'int64', 'Street Code3': 'int64',
 'Vehicle Expiration Date': 'int64', 'Violation Location': 'float64', 'Violation Precinct': 'int64', 'Issuer Precinct': 'int64',
 'Issuer Code': 'int64', 'Issuer Command': 'object', 'Issuer Squad': 'object', 'Violation Time': 'object',
 'Time First Observed': 'object', 'Violation County': 'object', 'Violation In Front Of Or Opposite': 'object', 'House Number': 'object',
 'Street Name': 'object', 'Intersecting Street': 'object', 'Date First Observed': 'int64', 'Law Section': 'int64',
 'Sub Division': 'object', 'Violation Legal Code': 'object', 'Days Parking In Effect    ': 'object', 'From Hours In Effect': 'object',
 'To Hours In Effect': 'object', 'Vehicle Color': 'object', 'Unregistered Vehicle?': 'float64', 'Vehicle Year': 'int64',
 'Meter Number': 'object', 'Feet From Curb': 'int64', 'Violation Post Code': 'object', 'Violation Description': 'object',
 'No Standing or Stopping Violation': 'float64', 'Hydrant Violation': 'float64', 'Double Parking Violation': 'float64'}

# Specify dtypes explicitly to avoid the DtypeWarning raised by mixed-type columns
df = dd.read_csv(parking_violation_csv,  dtype=dict_1)
df.head()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,Street Code2,Street Code3,Vehicle Expiration Date,Violation Location,Violation Precinct,Issuer Precinct,Issuer Code,Issuer Command,Issuer Squad,Violation Time,Time First Observed,Violation County,Violation In Front Of Or Opposite,House Number,Street Name,Intersecting Street,Date First Observed,Law Section,Sub Division,Violation Legal Code,Days Parking In Effect,From Hours In Effect,To Hours In Effect,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,07/10/2016,7,SUBN,TOYOT,V,0,0,0,0,,0,0,0,,,0143A,,BX,,,ALLERTON AVE (W/B) @,BARNES AVE,0,1111,D,T,,,,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,07/08/2016,7,SUBN,TOYOT,V,0,0,0,0,,0,0,0,,,0400P,,BX,,,ALLERTON AVE (W/B) @,BARNES AVE,0,1111,D,T,,,,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,08/23/2016,5,SUBN,FORD,V,0,0,0,0,,0,0,0,,,0233P,,BX,,,SB WEBSTER AVE @ E 1,94TH ST,0,1111,C,T,,,,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,06/14/2017,47,REFG,MITSU,T,10610,34330,34350,20180630,14.0,14,14,359594,T102,J,1120A,,NY,O,330.0,7th Ave,,0,408,l2,,Y,0700A,0700P,WH,,2007,,0,04,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,34310,34330,20170228,13.0,13,13,364832,T102,M,0555P,,NY,F,799.0,6th Ave,,0,408,h1,,Y,0700A,0700P,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,
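
The reason for passing an explicit `dtype` mapping can be illustrated with plain pandas. Dask infers column types from a sample at the start of the file, so a column that looks numeric early on but contains text later can be typed inconsistently. The sketch below mimics this with `nrows`; the tiny CSV and its values are made up for illustration.

```python
import io
import pandas as pd

# A made-up CSV where "House Number" looks numeric until the last row
csv_text = "House Number,Violation Code\n330,47\n799,69\n12-A,7\n"

# Reading only the first rows (like a sample) infers an integer column
sampled = pd.read_csv(io.StringIO(csv_text), nrows=2)
print("Inferred from sample:", sampled["House Number"].dtype)

# Declaring the dtype up front keeps every row as text, including "12-A"
explicit = pd.read_csv(
    io.StringIO(csv_text),
    dtype={"House Number": "object", "Violation Code": "int64"},
)
print("Explicit dtype values:", explicit["House Number"].tolist())
```

With the types declared, every partition of the real file is parsed the same way regardless of which values it happens to contain.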


In [16]:
# Your code here
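filtered_df = df[df["Plate Type"] == 'COM']
# persist() returns a new collection, so reassign it to reuse the in-memory result
filtered_df = filtered_df.persist()
filtered_df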

Unnamed: 0_level_0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,Street Code2,Street Code3,Vehicle Expiration Date,Violation Location,Violation Precinct,Issuer Precinct,Issuer Code,Issuer Command,Issuer Squad,Violation Time,Time First Observed,Violation County,Violation In Front Of Or Opposite,House Number,Street Name,Intersecting Street,Date First Observed,Law Section,Sub Division,Violation Legal Code,Days Parking In Effect,From Hours In Effect,To Hours In Effect,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
npartitions=33
,int64,object,object,object,object,int64,object,object,object,int64,int64,int64,int64,float64,int64,int64,int64,object,object,object,object,object,object,object,object,object,int64,int64,object,object,object,object,object,object,float64,int64,object,int64,object,object,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [17]:
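# Extract the issue year for every commercial-plate summons
issue_date = dd.to_datetime(filtered_df['Issue Date']).dt.year
# Keep only summonses issued in 2016 or 2017 and bring them into memory
filtered_dates = issue_date[(issue_date == 2016) | (issue_date == 2017)].compute()
num_commercial_2017 = filtered_dates.index.size
# Fraction of all summonses in the data set (commercial count / total row count)
pct_commercial = (num_commercial_2017/df.index.size).compute()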

In [18]:
# Percentage relative to all the parking tickets in 2017
print(f'Number of NYC summonses with commercial plates in 2017 was {num_commercial_2017}')
print(f'Percentage {pct_commercial:.0%}')

Number of NYC summonses with commercial plates in 2017 was 1838970
Percentage 17%
