In [1]:
import os
import time, datetime
import math

import numpy as np
import pandas as pd

import missingno
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt

mpl.style.use(['Solarize_Light2'])
print("Using style of 'Solarize_Light2' to plot")


# Plot the Figures Inline
%matplotlib inline

print('All traditional libs imported properly!')

Using style of 'Solarize_Light2' to plot
All traditional libs imported properly!


In [2]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import dask.delayed as delayed

print('Dask lib dependencies imported properly!')

Dask lib dependencies imported properly!


In [3]:
from functools import reduce

# **THE QUESTION WE ARE TRYING TO ANSWER**
**What patterns can we find in the data that are correlated with increases or decreases in the number of parking tickets issued by the New York City parking authority?**

# **LOADING DATA**

1. Creating DataFrames from delimited text files and defining data schemas
2. Extracting data from a SQL relational database and manipulating it using Dask
3. Reading data from distributed filesystems (S3 and HDFS)
4. Working with data stored in Parquet format

One of the unique challenges that data scientists face is our tendency to
study data at rest, or data that wasn’t specifically collected for the purpose of
predictive modeling and analysis. This is quite different from a traditional
academic study in which data is carefully and thoughtfully collected.
Consequently, you’re likely to come across a wide variety of storage media
and data formats throughout your career.

# **Reading data from text files**

Delimited text files come in many flavors, but all
share the common concept of using special characters, called delimiters,
to divide data into logical rows and columns.

Every delimited text file format has two types of delimiters: row delimiters
and column delimiters. A row delimiter is a special character that indicates
that you’ve reached the end of a row, and any additional data to the right of it
should be considered part of the next row. The most common row delimiter
is simply a newline character (\n) or a carriage return followed by a newline
character (\r\n). Delimiting rows by line is a standard choice because it
provides the additional benefit of breaking up the raw data visually and
reflects the layout of a spreadsheet.

Likewise, a column delimiter indicates the end of a column, and any data
to the right of it should be treated as part of the next column. Of all the
popular column delimiters out there, the comma (,) is the most frequently
used. In fact, delimited text files that use commas as column delimiters have
their own format name: comma-separated values, or CSV for short.
Other common options include pipe (|), tab, space, and semicolon.

Two additional attributes of a delimited text file that we haven’t discussed
yet are an optional header row and text qualifiers. A header row is simply
the use of the first row to specify names of columns.

Text qualifiers are yet another type of special character, used to denote that
the contents of a column are a text string. They can be very useful in
instances where the actual data is allowed to contain characters that are also
being used as row or column delimiters. This is a fairly common issue when
working with CSV files that contain text data, because commas normally
show up in text. Surrounding these columns with text qualifiers indicates
that any instances of the column or row delimiters inside the text qualifiers
should be ignored.
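
To make the role of text qualifiers concrete, here is a minimal sketch using Python’s built-in csv module. The two sample rows are invented for illustration and are not taken from the parking ticket files; the double quote is assumed to be the text qualifier.

```python
import csv
import io

# Hypothetical two-row CSV: the second field of the first data row contains
# a comma, so it is wrapped in double quotes (the text qualifier).
raw = 'id,description\n1,"NO PARKING, STREET CLEANING"\n2,EXPIRED METER\n'

rows = list(csv.reader(io.StringIO(raw), delimiter=',', quotechar='"'))

# The quoted comma stays inside the field instead of splitting it.
print(rows[1])     # ['1', 'NO PARKING, STREET CLEANING']

# Ignoring the qualifier and splitting on commas produces an extra column.
naive = raw.splitlines()[1].split(',')
print(len(naive))  # 3
```

The same idea applies to read_csv, which treats double quotes as the qualifier unless told otherwise.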

In [4]:
fy14 = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv')
fy15 = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2015.csv')
fy16 = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2016.csv')
fy17 = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv')

In [5]:
fy17.tail(2)


Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
210393,1415514203,HGU9544,NY,PAS,11/15/2069,40,SUBN,JEEP,P,0,...,BROWN,0.0,2011,-,0,,,,,
210394,1415995370,GPP1608,NY,PAS,11/19/2069,21,SDN,TOYOT,S,38080,...,GRAY,0.0,2011,-,0,,,,,


By default, Dask assumes that your CSV files will have a
header row, and our file indeed has a header row. 

In [6]:
fy17.columns

Index(['Summons Number', 'Plate ID', 'Registration State', 'Plate Type',
       'Issue Date', 'Violation Code', 'Vehicle Body Type', 'Vehicle Make',
       'Issuing Agency', 'Street Code1', 'Street Code2', 'Street Code3',
       'Vehicle Expiration Date', 'Violation Location', 'Violation Precinct',
       'Issuer Precinct', 'Issuer Code', 'Issuer Command', 'Issuer Squad',
       'Violation Time', 'Time First Observed', 'Violation County',
       'Violation In Front Of Or Opposite', 'House Number', 'Street Name',
       'Intersecting Street', 'Date First Observed', 'Law Section',
       'Sub Division', 'Violation Legal Code', 'Days Parking In Effect    ',
       'From Hours In Effect', 'To Hours In Effect', 'Vehicle Color',
       'Unregistered Vehicle?', 'Vehicle Year', 'Meter Number',
       'Feet From Curb', 'Violation Post Code', 'Violation Description',
       'No Standing or Stopping Violation', 'Hydrant Violation',
       'Double Parking Violation'],
      dtype='object')

In [7]:
fy14.columns

Index(['Summons Number', 'Plate ID', 'Registration State', 'Plate Type',
       'Issue Date', 'Violation Code', 'Vehicle Body Type', 'Vehicle Make',
       'Issuing Agency', 'Street Code1', 'Street Code2', 'Street Code3',
       'Vehicle Expiration Date', 'Violation Location', 'Violation Precinct',
       'Issuer Precinct', 'Issuer Code', 'Issuer Command', 'Issuer Squad',
       'Violation Time', 'Time First Observed', 'Violation County',
       'Violation In Front Of Or Opposite', 'House Number', 'Street Name',
       'Intersecting Street', 'Date First Observed', 'Law Section',
       'Sub Division', 'Violation Legal Code', 'Days Parking In Effect    ',
       'From Hours In Effect', 'To Hours In Effect', 'Vehicle Color',
       'Unregistered Vehicle?', 'Vehicle Year', 'Meter Number',
       'Feet From Curb', 'Violation Post Code', 'Violation Description',
       'No Standing or Stopping Violation', 'Hydrant Violation',
       'Double Parking Violation', 'Latitude', 'Longitude', 'Comm

If you happen to take a look at the columns of any other DataFrame, such as
fy14 (Parking Tickets for 2014), you’ll notice that the columns are
different from the fy17 (Parking Tickets for 2017) DataFrame. It looks as
though the set of columns published about parking violations changed
between fiscal years. For example, in the output above fy14 includes
Latitude and Longitude columns that fy17 does not, so these columns won’t
be useful for analyzing year-over-year trends (such as how parking violation
“hotspots” migrate throughout the city). If we simply concatenated the
datasets together as is, we would get a resulting DataFrame with an awful
lot of missing values. Before we combine the datasets, we should find the
columns that all four of the DataFrames have in common. Then we should be
able to simply union the DataFrames together to produce a new DataFrame
that contains all four years of data.

We could manually look at each DataFrame’s columns and deduce which
columns overlap, but that would be terribly inefficient. Instead, we’ll
automate the process by taking advantage of the DataFrames’ columns
attribute and Python’s set operations. 
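
Before applying this to the real files, here is a toy sketch of why the shared columns matter. It uses plain Pandas DataFrames as stand-ins for the Dask ones, and the two tiny frames and their values are hypothetical.

```python
import pandas as pd

# Hypothetical stand-ins for two fiscal years whose columns only partly overlap.
fy_a = pd.DataFrame({"Plate ID": ["AAA1"], "Vehicle Year": [2013], "Latitude": [40.7]})
fy_b = pd.DataFrame({"Plate ID": ["BBB2"], "Vehicle Year": [2011]})

# Stacking as-is fills the column that only one frame has with NaN.
naive = pd.concat([fy_a, fy_b], ignore_index=True)
print(naive["Latitude"].isna().sum())  # 1

# Restricting both frames to the shared columns avoids that.
shared = sorted(set(fy_a.columns) & set(fy_b.columns))
combined = pd.concat([fy_a[shared], fy_b[shared]], ignore_index=True)
print(shared)          # ['Plate ID', 'Vehicle Year']
print(combined.shape)  # (2, 2)
```

Sorting the intersection gives a stable column order, since Python sets do not guarantee one.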

In [6]:
from functools import reduce

In [7]:
columns = [set(fy14.columns),
           set(fy15.columns),
           set(fy16.columns),
           set(fy17.columns)]

In [8]:
columns[3]

{'Date First Observed',
 'Days Parking In Effect    ',
 'Double Parking Violation',
 'Feet From Curb',
 'From Hours In Effect',
 'House Number',
 'Hydrant Violation',
 'Intersecting Street',
 'Issue Date',
 'Issuer Code',
 'Issuer Command',
 'Issuer Precinct',
 'Issuer Squad',
 'Issuing Agency',
 'Law Section',
 'Meter Number',
 'No Standing or Stopping Violation',
 'Plate ID',
 'Plate Type',
 'Registration State',
 'Street Code1',
 'Street Code2',
 'Street Code3',
 'Street Name',
 'Sub Division',
 'Summons Number',
 'Time First Observed',
 'To Hours In Effect',
 'Unregistered Vehicle?',
 'Vehicle Body Type',
 'Vehicle Color',
 'Vehicle Expiration Date',
 'Vehicle Make',
 'Vehicle Year',
 'Violation Code',
 'Violation County',
 'Violation Description',
 'Violation In Front Of Or Opposite',
 'Violation Legal Code',
 'Violation Location',
 'Violation Post Code',
 'Violation Precinct',
 'Violation Time'}

In [9]:
common_columns_for_check = list(reduce(lambda a, i: a.intersection(i), columns))

In [10]:
common_columns_for_check = sorted(common_columns_for_check)

In [11]:
len(common_columns_for_check)

43

In [12]:
common_columns_mine = list(columns[0].intersection(columns[1]).intersection(columns[2]).intersection(columns[3]))

In [13]:
common_columns_mine = sorted(common_columns_mine)

In [14]:
len(common_columns_mine)

43

In [15]:
common_columns_for_check == common_columns_mine

True

In [16]:
common_columns_mine

['Date First Observed',
 'Days Parking In Effect    ',
 'Double Parking Violation',
 'Feet From Curb',
 'From Hours In Effect',
 'House Number',
 'Hydrant Violation',
 'Intersecting Street',
 'Issue Date',
 'Issuer Code',
 'Issuer Command',
 'Issuer Precinct',
 'Issuer Squad',
 'Issuing Agency',
 'Law Section',
 'Meter Number',
 'No Standing or Stopping Violation',
 'Plate ID',
 'Plate Type',
 'Registration State',
 'Street Code1',
 'Street Code2',
 'Street Code3',
 'Street Name',
 'Sub Division',
 'Summons Number',
 'Time First Observed',
 'To Hours In Effect',
 'Unregistered Vehicle?',
 'Vehicle Body Type',
 'Vehicle Color',
 'Vehicle Expiration Date',
 'Vehicle Make',
 'Vehicle Year',
 'Violation Code',
 'Violation County',
 'Violation Description',
 'Violation In Front Of Or Opposite',
 'Violation Legal Code',
 'Violation Location',
 'Violation Post Code',
 'Violation Precinct',
 'Violation Time']

In [17]:
common_columns_for_check

['Date First Observed',
 'Days Parking In Effect    ',
 'Double Parking Violation',
 'Feet From Curb',
 'From Hours In Effect',
 'House Number',
 'Hydrant Violation',
 'Intersecting Street',
 'Issue Date',
 'Issuer Code',
 'Issuer Command',
 'Issuer Precinct',
 'Issuer Squad',
 'Issuing Agency',
 'Law Section',
 'Meter Number',
 'No Standing or Stopping Violation',
 'Plate ID',
 'Plate Type',
 'Registration State',
 'Street Code1',
 'Street Code2',
 'Street Code3',
 'Street Name',
 'Sub Division',
 'Summons Number',
 'Time First Observed',
 'To Hours In Effect',
 'Unregistered Vehicle?',
 'Vehicle Body Type',
 'Vehicle Color',
 'Vehicle Expiration Date',
 'Vehicle Make',
 'Vehicle Year',
 'Violation Code',
 'Violation County',
 'Violation Description',
 'Violation In Front Of Or Opposite',
 'Violation Legal Code',
 'Violation Location',
 'Violation Post Code',
 'Violation Precinct',
 'Violation Time']

In [18]:
fy17[common_columns_for_check].head(3)

Unnamed: 0,Date First Observed,Days Parking In Effect,Double Parking Violation,Feet From Curb,From Hours In Effect,House Number,Hydrant Violation,Intersecting Street,Issue Date,Issuer Code,...,Vehicle Year,Violation Code,Violation County,Violation Description,Violation In Front Of Or Opposite,Violation Legal Code,Violation Location,Violation Post Code,Violation Precinct,Violation Time
0,0,,,0,,,,BARNES AVE,07/10/2016,0,...,2001,7,BX,FAILURE TO STOP AT RED LIGHT,,T,,,0,0143A
1,0,,,0,,,,BARNES AVE,07/08/2016,0,...,2001,7,BX,FAILURE TO STOP AT RED LIGHT,,T,,,0,0400P
2,0,,,0,,,,94TH ST,08/23/2016,0,...,2004,5,BX,BUS LANE VIOLATION,,T,,,0,0233P


In [19]:
common_columns = list(reduce(lambda a, i: a.intersection(i), columns))

In [None]:
fy17[common_columns].columns

In [23]:
fy17['Feet From Curb']

Dask Series Structure:
npartitions=33
    int64
      ...
    ...  
      ...
      ...
Name: Feet From Curb, dtype: int64
Dask Name: getitem, 132 tasks

In [24]:
fy17[common_columns].head(3)

Unnamed: 0,Feet From Curb,Issuer Precinct,Street Name,Issuer Command,Issuer Code,Registration State,To Hours In Effect,Intersecting Street,Vehicle Year,Violation Precinct,...,No Standing or Stopping Violation,Violation Location,Violation Post Code,Hydrant Violation,Plate ID,Issue Date,Days Parking In Effect,Law Section,From Hours In Effect,Unregistered Vehicle?
0,0,0,ALLERTON AVE (W/B) @,,0,NY,,BARNES AVE,2001,0,...,,,,,GZH7067,07/10/2016,,1111,,
1,0,0,ALLERTON AVE (W/B) @,,0,NY,,BARNES AVE,2001,0,...,,,,,GZH7067,07/08/2016,,1111,,
2,0,0,SB WEBSTER AVE @ E 1,,0,NY,,94TH ST,2004,0,...,,,,,FZX9232,08/23/2016,,1111,,


In [20]:
fy14[common_columns].head(3)


ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+-----------------------+---------+----------+
| Column                | Found   | Expected |
+-----------------------+---------+----------+
| Issuer Squad          | object  | int64    |
| Unregistered Vehicle? | float64 | int64    |
| Violation Description | object  | float64  |
| Violation Legal Code  | object  | float64  |
| Violation Post Code   | object  | float64  |
+-----------------------+---------+----------+

The following columns also raised exceptions on conversion:

- Issuer Squad
  ValueError('cannot convert float NaN to integer')
- Violation Description
  ValueError("could not convert string to float: 'BUS LANE VIOLATION'")
- Violation Legal Code
  ValueError("could not convert string to float: 'T'")
- Violation Post Code
  ValueError("could not convert string to float: 'H -'")

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'Issuer Squad': 'object',
       'Unregistered Vehicle?': 'float64',
       'Violation Description': 'object',
       'Violation Legal Code': 'object',
       'Violation Post Code': 'object'}

to the call to `read_csv`/`read_table`.

Looks like Dask ran into trouble when trying to read the fy14 data!
Thankfully, the Dask development team has given us some pretty detailed
information in this error message about what happened. Five columns
(Issuer Squad, Unregistered Vehicle?, Violation Description, Violation Legal
Code, and Violation Post Code) failed to be read correctly because their
datatypes were not what Dask expected.

As we learned in chapter 2, Dask
infers datatypes from a sample of the data to avoid scanning the entire
(potentially massive) DataFrame. Although this usually works well, it can
break down when a large number of values are missing in a column or the
vast majority of data can be classified as one datatype (such as an integer),
but a small number of edge cases break that assumption (such as a random
string or two). When that happens, Dask will throw an exception once it
begins to work on a computation. In order to help Dask read our dataset
correctly, we’ll need to manually define a schema for our data instead of
relying on type inference. Before we get around to doing that, let’s review
what datatypes are available in Dask so we can create an appropriate schema
for our data.
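
The failure mode described above can be reproduced in miniature with Pandas alone. In this sketch the column name and values are made up, and the nrows argument merely stands in for sampling; it is not how Dask implements its inference.

```python
import io
import pandas as pd

# Hypothetical column: mostly integers, with one string edge case at the end.
raw = "squad\n" + "\n".join(["1"] * 5 + ["A"]) + "\n"

# Infer the dtype from only the first rows, as sample-based inference would.
sampled_dtype = pd.read_csv(io.StringIO(raw), nrows=5)["squad"].dtype
print(sampled_dtype)  # int64

# Enforcing that guess on the full data fails once the edge case is reached.
try:
    pd.read_csv(io.StringIO(raw), dtype={"squad": sampled_dtype})
    failed = False
except ValueError as exc:
    failed = True
    print("read failed:", exc)
```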

### **DASK DATATYPES**

Unlike
most collections and objects in Python, Dask DataFrames use explicit typing
rather than duck typing. This means that all values contained in a column
must conform to the same datatype. As we saw already, Dask will throw
errors if values in a column are found that violate the column’s datatype.

Since Dask DataFrames consist of partitions made up of Pandas
DataFrames, which in turn are complex collections of NumPy arrays, Dask
sources its datatypes from NumPy. The NumPy library is a powerful and
important mathematics library for Python. It enables users to perform
advanced operations from linear algebra, calculus, and trigonometry. This
library is important for the needs of data science because it provides the
cornerstone mathematics for many statistical analysis methods and machine
learning algorithms in Python. 

Many of NumPy’s datatypes reflect the primitive types in Python. The
biggest difference is that NumPy datatypes can be explicitly sized with a
specified bit-width. For example, the int32 datatype is a 32-bit integer that
allows any integer between −2,147,483,648 and 2,147,483,647. Python’s
built-in int, by comparison, has no bit-width for you to choose: it is an
arbitrary-precision object, so every value carries more memory overhead than
a fixed-width NumPy value. The advantage of using smaller datatypes where
appropriate is that you can hold more data in RAM and the CPU’s cache at
one time, leading to faster, more efficient computations. This means that
when creating a schema for your data, you should choose the smallest
datatype that can safely hold your data. The risk, however, is that if a value
exceeds the maximum size allowed by the particular datatype, you will
experience overflow errors, so you should think carefully about the range
and domain of your data.
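
As a quick illustration of bit-widths, the following sketch uses np.iinfo to look up the range of a few integer types and compares the memory used by the same values at two widths. The three sample years are arbitrary.

```python
import numpy as np

# Inspect the representable range of a few fixed-width integer types.
for t in (np.uint8, np.uint16, np.int32):
    info = np.iinfo(t)
    print(t.__name__, info.min, info.max)

# A year such as 2013 fits in uint16 but not in uint8.
fits_uint8 = 2013 <= np.iinfo(np.uint8).max
fits_uint16 = 2013 <= np.iinfo(np.uint16).max
print(fits_uint8, fits_uint16)  # False True

# Smaller types use less memory for the same number of values.
years = np.array([2013, 2012, 2010], dtype=np.uint16)
bytes_uint16 = years.nbytes
bytes_int64 = years.astype(np.int64).nbytes
print(bytes_uint16, bytes_int64)  # 6 24
```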

If none of the NumPy datatypes are appropriate for the kind of data you
have, a column can be stored as an object type, which represents any Python
object. This is also the datatype that Dask will default to when its type
inference comes across a column that has a mix of numbers and strings, or
when type inference cannot determine an appropriate datatype to use.
However, one common exception to this rule happens when you have a
column with a high percentage of missing data.

Would you really believe that a column called Violation Description should
be a floating-point number? Probably not! Typically, we can expect
description columns to be text, and therefore Dask should use an object
datatype. Then why did Dask’s type inference think the column holds 64-bit
floating-point numbers? It turns out that a large majority of records in this
DataFrame have missing violation descriptions. In the raw data, they are
simply blank. Dask treats blank records as null values when parsing files, and
by default fills in missing values with NumPy’s NaN (not a number) object
called np.nan. If you use Python’s built-in type function to inspect the
datatype of an object, it reports that np.nan is a float type. So, since the
values Dask’s type inference sampled from the Violation Description column
were all np.nan, it assumed that the column must contain floating-point
numbers.
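
This behavior is easy to confirm on a small scale. The sketch below uses Pandas directly on an invented two-row sample in which every description is blank, then shows that declaring the dtype up front avoids the wrong guess.

```python
import io
import numpy as np
import pandas as pd

# np.nan is an ordinary Python float.
print(type(np.nan))  # <class 'float'>

# Hypothetical sample in which every description happens to be blank.
sample = io.StringIO("code,description\n7,\n5,\n")
inferred = pd.read_csv(sample).dtypes
print(inferred["description"])  # float64

# Declaring the dtype explicitly keeps the column as object.
sample.seek(0)
explicit = pd.read_csv(sample, dtype={"description": "object"}).dtypes
print(explicit["description"])  # object
```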

### **CREATING SCHEMAS FOR DASK DATAFRAMES**

Oftentimes when working with a dataset, you’ll know each column’s
datatype, whether it can contain missing values, and its valid range of values
ahead of time. This information is collectively known as the data’s schema.
You’re especially likely to know the schema for a dataset if it came from a
relational database. Each column in a database table must have a well-known
datatype. If you have this information ahead of time, using it with Dask is as
easy as writing up the schema and applying it to the read_csv method.

However, sometimes you might
not know what the schema is ahead of time, and you’ll need to figure it out
on your own. Perhaps you’re pulling data from a web API which hasn’t been
properly documented or you’re analyzing a data extract and you don’t have
access to the data source. Neither of these approaches is ideal because they
can be tedious and time consuming, but sometimes you may really have no
other option. Here are two methods you can try:

1. Guess-and-check
2. Manually sample the data

The guess-and-check method isn’t complicated. If you have well-named
columns, such as Product Description, Sales Amount, and so on, you can try
to infer what kind of data each column contains using the names. If you run
into a datatype error while running a computation like the ones we’ve seen,
simply update the schema and start over again. 

The manual sampling method aims to be a bit more sophisticated but can
take more time up front since it involves scanning through some of the data
to profile it. However, if you’re planning to analyze the dataset anyway, it’s
not “wasted” time in the sense that you will be familiarizing yourself with the
data while creating the schema. Let's see how to do this:

In [21]:
# The built-in str is used here; np.str was only an alias for it and was removed in NumPy 1.24
dtype_tuples = [(x, str) for x in common_columns]

In [22]:
dtype_tuples

[('Street Name', str),
 ('Issue Date', str),
 ('Violation Description', str),
 ('Violation Post Code', str),
 ('Plate Type', str),
 ('Vehicle Make', str),
 ('From Hours In Effect', str),
 ('Violation Location', str),
 ('Date First Observed', str),
 ('Law Section', str),
 ('Double Parking Violation', str),
 ('Street Code1', str),
 ('Issuer Squad', str),
 ('Unregistered Vehicle?', str),
 ('Violation County', str),
 ('Hydrant Violation', str),
 ('Violation In Front Of Or Opposite', str),
 ('To Hours In Effect', str),
 ('Issuer Command', str),
 ('Time First Observed', str),
 ('Vehicle Color', str),
 ('Registration State', str),
 ('Feet From Curb', str),
 ('Issuer Precinct', str),
 ('Violation Time', str),
 ('Violation Precinct', str),
 ('House Number', str),
 ('Plate ID', str),
 ('Violation Code', str),
 ('No Standing or Stopping Violation', str),
 ('Street Code2', str),
 ('Vehicle Body Type', str),
 ('Summons Number', str),
 ('Issuer Code', str),
 ('Vehicle Expiration Date', str),
 ('Issu

In [23]:
dtypes = dict(dtype_tuples)

In [24]:
dtypes

{'Street Name': str,
 'Issue Date': str,
 'Violation Description': str,
 'Violation Post Code': str,
 'Plate Type': str,
 'Vehicle Make': str,
 'From Hours In Effect': str,
 'Violation Location': str,
 'Date First Observed': str,
 'Law Section': str,
 'Double Parking Violation': str,
 'Street Code1': str,
 'Issuer Squad': str,
 'Unregistered Vehicle?': str,
 'Violation County': str,
 'Hydrant Violation': str,
 'Violation In Front Of Or Opposite': str,
 'To Hours In Effect': str,
 'Issuer Command': str,
 'Time First Observed': str,
 'Vehicle Color': str,
 'Registration State': str,
 'Feet From Curb': str,
 'Issuer Precinct': str,
 'Violation Time': str,
 'Violation Precinct': str,
 'House Number': str,
 'Plate ID': str,
 'Violation Code': str,
 'No Standing or Stopping Violation': str,
 'Street Code2': str,
 'Vehicle Body Type': str,
 'Summons Number': str,
 'Issuer Code': str,
 'Vehicle Expiration Date': str,
 'Issuing Agency': str,
 'Vehicle Year': str,
 'Intersecting Street': str,
 '

First we need to build a dictionary that maps column names to datatypes,
because the dtype argument that we’ll pass it to later expects a dictionary.
To do that, we first walk through the common_columns list that we made
earlier, which holds the column names found in all four DataFrames. We
transform each column name into a tuple containing the column name and
the str datatype, which represents strings (np.str, seen in older NumPy code,
was an alias for the built-in str and has been removed from recent NumPy
releases). In the next cell, we take the list of tuples and convert it into a
dict. Now that we’ve constructed a generic schema, we can apply it to the
read_csv function to load the fy14 data into a DataFrame.

In [25]:
fy14 = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv', dtype = dtypes)

In [26]:
fy14[common_columns].head(3)

Unnamed: 0,Street Name,Issue Date,Violation Description,Violation Post Code,Plate Type,Vehicle Make,From Hours In Effect,Violation Location,Date First Observed,Law Section,...,Issuer Code,Vehicle Expiration Date,Issuing Agency,Vehicle Year,Intersecting Street,Meter Number,Violation Legal Code,Sub Division,Days Parking In Effect,Street Code3
0,W 175 ST,08/04/2013,,,PAS,AUDI,ALL,33,0,408,...,921043,20140831,P,2013,,-,,F1,BBBBBBB,21190
1,W 177 ST,08/04/2013,,,COM,FORD,ALL,33,0,408,...,921043,20140430,P,2012,,-,,C,BBBBBBB,40404
2,W 163 ST,08/05/2013,,,COM,CHEVR,ALL,33,0,408,...,921043,20140228,P,0,,-,,F7,BBBBBBB,13610


This time we specified the dtype argument and passed in our
schema dictionary. What happens under the hood is Dask will disable type
inference for the columns that have matching keys in the dtype dictionary
and use the explicitly specified types instead. While it’s perfectly reasonable
to include only the columns you want to change, **it’s best to not rely on
Dask’s type inference at all whenever possible**. Here I’ve shown you how to
create an explicit schema for all columns in a DataFrame, and **I encourage
you to make this a regular practice when working with big datasets**. With this
particular schema, we’re telling Dask to just assume that all of the columns
are strings. Now if we try to view the first few rows of the DataFrame again,
using fy14[common_columns].head(3), Dask doesn’t throw an error message!
But we’re not done yet. We now need to have a look at each column and pick
a more appropriate datatype (if possible) to maximize efficiency. Let’s have a
look at the Vehicle Year column.

In [27]:
fy14.dtypes

Summons Number                        object
Plate ID                              object
Registration State                    object
Plate Type                            object
Issue Date                            object
Violation Code                        object
Vehicle Body Type                     object
Vehicle Make                          object
Issuing Agency                        object
Street Code1                          object
Street Code2                          object
Street Code3                          object
Vehicle Expiration Date               object
Violation Location                    object
Violation Precinct                    object
Issuer Precinct                       object
Issuer Code                           object
Issuer Command                        object
Issuer Squad                          object
Violation Time                        object
Time First Observed                   object
Violation County                      object
Violation 

In [31]:
fy14['Vehicle Year'].dtype

dtype('O')

In [32]:
fy14['Vehicle Year'].unique().head(10)

0    2013
1    2012
2       0
3    2010
4    2011
5    2001
6    2005
7    1998
8    1995
9    2003
Name: Vehicle Year, dtype: object

It looks like they are all integers that would fit
comfortably in the uint16 datatype. uint16 is the most appropriate because
years can’t be negative values, and these years are too large to be stored in
uint8 (which has a maximum value of 255). If we had seen any letters or
special characters, we would not need to proceed any further with analyzing
this column. The string datatype we had already selected would be the only
datatype suitable for the column.
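
Scanning a sample by eye works, but the check for letters or special characters can also be automated. The following is one possible sketch, not the method used in this notebook: it runs pd.to_numeric with errors="coerce" on a hypothetical sample of string values, so anything that is not a number turns into NaN and can be listed.

```python
import pandas as pd

# Hypothetical sample of unique values that were read in as strings.
sample = pd.Series(["2013", "2012", "0", "2010", "N/A"])

# Values that fail numeric conversion become NaN, exposing the edge cases.
converted = pd.to_numeric(sample, errors="coerce")
non_numeric = sample[converted.isna()].tolist()
print(non_numeric)  # ['N/A']

# Range of the values that did convert, to help pick a bit-width.
print(converted.min(), converted.max())  # 0.0 2013.0
```

On a Dask column, the same check would be applied to a small sample brought back with .head() rather than to the full column.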

One thing to be careful about is that a sample of 10 unique values might
not be large enough to rule out edge cases you need to consider. You could
use .compute() instead of
.head() to bring back all the unique values, but this might not be a good idea
if the particular column you’re looking at has a high degree of uniqueness to
it (such as a primary key or a high-dimensional category). The range of 10–
50 unique samples has served me well in most cases, but sometimes you will
still run into edge cases where you will need to go back and tweak your
datatypes.

Since we’re thinking an integer datatype might be appropriate for this
column, we need to check one more thing: Are there any missing values in
this column? As you learned earlier, Dask represents missing values with
np.nan, which is considered to be a float type object. Unfortunately, np.nan
cannot be cast or coerced to an integer uint16 datatype. In the next chapter
we will learn how to deal with missing values, but for now if we come across
a column with missing values, we will need to ensure that the column will
use a datatype that can support the np.nan object. This means that if the
Vehicle Year column contains any missing values, we’ll be required to use a
float32 datatype and not the uint16 datatype we originally thought
appropriate because uint16 is unable to store np.nan.
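
A small sketch of that limitation, using a made-up three-value Pandas Series in place of the real column: casting to an unsigned integer type fails as soon as a NaN is present, while a float type keeps the missing value.

```python
import numpy as np
import pandas as pd

# Hypothetical miniature of a year column with one missing value.
years = pd.Series([2013.0, 2012.0, np.nan])

# Casting a column that contains NaN to an integer type fails...
try:
    years.astype(np.uint16)
    cast_failed = False
except ValueError as exc:
    cast_failed = True
    print("cast failed:", exc)

# ...while a float type keeps the missing value intact.
as_float = years.astype(np.float32)
print(as_float.isna().sum())  # 1
```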

In [35]:
fy14['Vehicle Year'].isna().sum().compute()

1

In [36]:
with ProgressBar():
    print(fy14['Vehicle Year'].isnull().values.any().compute())

[########################################] | 100% Completed |  1min  4.2s
True


In [37]:
fy14.loc[fy14['Vehicle Year'] == np.nan].compute()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA


In [38]:
%%time
fy14_pdf = pd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv')



Wall time: 54.7 s


In [42]:
fy14_pdf.loc[fy14_pdf['Vehicle Year'] == np.nan]

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
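
The two empty results above are expected rather than a sign that nothing is missing: NaN never compares equal to anything, itself included, so a filter written as `== np.nan` is False for every row. The sketch below shows this on a hypothetical three-value Series and uses isna() as the reliable test.

```python
import numpy as np
import pandas as pd

# Hypothetical miniature of the Vehicle Year column with one missing value.
s = pd.Series([2013.0, np.nan, 2011.0], name="Vehicle Year")

# Equality against np.nan is always False, so this selects no rows.
by_equality = s[s == np.nan]

# isna() is the reliable test for missing values.
by_isna = s[s.isna()]

print(len(by_equality), len(by_isna))  # 0 1
```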


In [43]:
fy14_pdf['Vehicle Year'].isna().sum()

1

In [44]:
fy14_pdf['Vehicle Year'].isna()

0          False
1          False
2          False
3          False
4          False
           ...  
9100273    False
9100274    False
9100275    False
9100276    False
9100277    False
Name: Vehicle Year, Length: 9100278, dtype: bool

In [45]:
np.where(fy14_pdf['Vehicle Year'].isna())

(array([6096292], dtype=int64),)

In [46]:
fy14_pdf.loc[6096292, 'Vehicle Year']

nan

In [47]:
fy14_pdf.shape

(9100278, 51)

Since we have missing
values in the Vehicle Year column, we must use the float32 datatype for the
column instead of uint16.

Now, we should repeat the process for the remaining 42 columns.

In this particular
instance, we could also use the data dictionary posted on the Kaggle webpage
(at https://www.kaggle.com/new-york-city/nyc-parking-tickets/data) to help
speed along this process.

In [28]:
# Text columns use the built-in str; np.str was only an alias for it and was removed in NumPy 1.24.
# Keys must match the column names exactly, including the trailing spaces
# in 'Days Parking In Effect    ' as reported by .columns above.
dtypes = {'Date First Observed': str,
          'Days Parking In Effect    ': str,
          'Double Parking Violation': str,
          'Feet From Curb': np.float32,
          'From Hours In Effect': str,
          'House Number': str,
          'Hydrant Violation': str,
          'Intersecting Street': str,
          'Issue Date': str,
          'Issuer Code': np.float32,
          'Issuer Command': str,
          'Issuer Precinct': np.float32,
          'Issuer Squad': str,
          'Issuing Agency': str,
          'Law Section': np.float32,
          'Meter Number': str,
          'No Standing or Stopping Violation': str,
          'Plate ID': str,
          'Plate Type': str,
          'Registration State': str,
          'Street Code1': np.uint32,
          'Street Code2': np.uint32,
          'Street Code3': np.uint32,
          'Street Name': str,
          'Sub Division': str,
          'Summons Number': np.uint32,
          'Time First Observed': str,
          'To Hours In Effect': str,
          'Unregistered Vehicle?': str,
          'Vehicle Body Type': str,
          'Vehicle Color': str,
          'Vehicle Expiration Date': str,
          'Vehicle Make': str,
          'Vehicle Year': np.float32,
          'Violation Code': np.uint16,
          'Violation County': str,
          'Violation Description': str,
          'Violation In Front Of Or Opposite': str,
          'Violation Legal Code': str,
          'Violation Location': str,
          'Violation Post Code': str,
          'Violation Precinct': np.float32,
          'Violation Time': str,
         }

dtypes now is the final schema for the NYC Parking Ticket data. Let’s
use it to reload all four of the DataFrames, then union all four years of data
together into a final DataFrame.

In [29]:
%%time
fy14_ddf = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv', dtype = dtypes, usecols = common_columns)
fy15_ddf = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2015.csv', dtype = dtypes, usecols = common_columns)
fy16_ddf = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2016.csv', dtype = dtypes, usecols = common_columns)
fy17_ddf = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv', dtype = dtypes, usecols = common_columns)

Wall time: 359 ms


In [30]:
fy14_ddf.tail(2)

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
61642,3711182958,193YUR,CT,PAS,06/25/2014,37,SUBN,CHEVR,T,40930,...,BLACK,,0.0,304-3007,0.0,10 3,37-Expired Muni Meter,,,
61643,3711182982,FPN7411,NY,PAS,06/25/2014,20,4DSD,CHEVR,T,40930,...,RD,,0.0,,0.0,10 3,20A-No Parking (Non-COM),,,


In [36]:
fy15_ddf.tail(2)

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
194060,3749343767,97720MD,NY,COM,06/03/2015,51,DUMP,PETER,T,10540,...,WHITE,,1995.0,,0.0,K 41,51-Sidewalk,,,
194061,3389998556,73040KA,NY,COM,06/16/2015,69,VAN,FORD,T,34090,...,WH,,2006.0,,0.0,03 6,69-Failure to Disp Muni Recpt,,,


In [31]:
fy16_ddf.tail(2)

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
189616,4064533668,68718MG,NY,COM,06/08/2016,82,PICK,DODGE,T,0,...,WH,,2015.0,,0.0,04 2,82-Unaltered Commerc Vehicle,,,
189617,4064533680,FZX4974,NY,PAS,06/08/2016,38,4DSD,HONDA,T,0,...,BK,,2013.0,,0.0,04 2,38-Failure to Display Muni Rec,,,


In [38]:
fy17_ddf.tail(2)

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
210393,1415514203,HGU9544,NY,PAS,11/15/2069,40,SUBN,JEEP,P,0,...,BROWN,0,2011.0,-,0.0,,,,,
210394,1415995370,GPP1608,NY,PAS,11/19/2069,21,SDN,TOYOT,S,38080,...,GRAY,0,2011.0,-,0.0,,,,,


Instead of loading four separate files into four separate DataFrames,
we can now load all CSV files contained in the folder
into a single DataFrame by using the * wildcard. Dask provides this for
convenience since it’s common to split large datasets into multiple files,
especially on distributed filesystems.

As before, we’re passing the final
schema into the dtype argument, and we’re now also passing the list of
columns we want to keep into the usecols argument. usecols takes a list of
column names and drops any columns from the resulting DataFrame that
aren’t specified in the list.

In [33]:
%%time
ddf = dd.read_csv('*.csv', dtype = dtypes, usecols = common_columns)
# ddf = ddf.compute()

Wall time: 410 ms


In [34]:
ddf

Unnamed: 0_level_0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,Street Code2,Street Code3,Vehicle Expiration Date,Violation Location,Violation Precinct,Issuer Precinct,Issuer Code,Issuer Command,Issuer Squad,Violation Time,Time First Observed,Violation County,Violation In Front Of Or Opposite,House Number,Street Name,Intersecting Street,Date First Observed,Law Section,Sub Division,Violation Legal Code,Days Parking In Effect,From Hours In Effect,To Hours In Effect,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
npartitions=142,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1
,uint32,object,object,object,object,uint16,object,object,object,uint32,uint32,uint32,object,object,float32,float32,float32,object,object,object,object,object,object,object,object,object,object,float32,object,object,object,object,object,object,object,float32,object,float32,object,object,object,object,object
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [44]:
ddf.tail(2)

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
210393,1415514203,HGU9544,NY,PAS,11/15/2069,40,SUBN,JEEP,P,0,...,BROWN,0,2011.0,-,0.0,,,,,
210394,1415995370,GPP1608,NY,PAS,11/19/2069,21,SDN,TOYOT,S,38080,...,GRAY,0,2011.0,-,0.0,,,,,


This should load all four datasets and yield about 42.3 million observations, but I cannot do this on my local PC.

In [67]:
%%time
ddf_2 = dd.concat([fy14_ddf, fy15_ddf, fy16_ddf, fy17_ddf], interleave_partitions = True)

Wall time: 84.8 ms


In [73]:
ddf_2.tail(2)

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
210393,1415514203,HGU9544,NY,PAS,11/15/2069,40,SUBN,JEEP,P,0,...,BROWN,0,2011.0,-,0.0,,,,,
210394,1415995370,GPP1608,NY,PAS,11/19/2069,21,SDN,TOYOT,S,38080,...,GRAY,0,2011.0,-,0.0,,,,,


usecols is an interesting argument because if you look at the Dask API
documentation, it’s not listed. It might not be immediately obvious why this
is, but it’s because the argument comes from Pandas. Since each partition of
a Dask DataFrame is a Pandas DataFrame, you can pass along any Pandas
arguments through the *args and **kwargs interfaces and they will control
the underlying Pandas DataFrames that make up each partition. This
interface is also how you can control things like which column delimiter
should be used, whether the data has a header or not, and so on. The Pandas
API documentation for read_csv and its many arguments can be found at
http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html.

# **Reading data from relational databases (Done on Ubuntu+Docker)**

Reading data from a relational database system (RDBMS) into Dask is fairly
easy. In fact, you’re likely to find that the most tedious part of interfacing
with RDBMSs is setting up and configuring your Dask environment to do so.
Because of the wide variety of RDBMSs used in production environments, we
can’t cover the specifics for each one here. But, a substantial amount of
documentation and support is available online for the specific RDBMS you’re
working with. The most important thing to be aware of is that when using
Dask in a multi-node cluster, your client machine is not the only machine
that will need access to the database. Each worker node needs to be able to
access the database server, so it’s important to install the correct software
and configure each node in the cluster to be able to do so.

Dask uses the **SQL Alchemy** library to interface with RDBMSs, and I
recommend using the **pyodbc** library to manage your ODBC drivers. This
means you will need to install and configure SQL Alchemy, pyodbc, and the
ODBC drivers for your specific RDBMS on each machine in your cluster for
Dask to work correctly. To learn more about SQL Alchemy, you can check out
www.sqlalchemy.org/library.html. Likewise, you can learn more about
pyodbc at https://github.com/mkleehammer/pyodbc/wiki.

In [10]:
import pyodbc

In [31]:
pyodbc.drivers()

['ODBC Driver 17 for SQL Server']

In [32]:
[item for item in pyodbc.drivers()][-1]

'ODBC Driver 17 for SQL Server'

In [33]:
username = 'sa'
password = 'hidden'
hostname = 'localhost'
database_name = 'SARS'
odbc_driver = 'ODBC+Driver+17+for+SQL+Server'

In [34]:
connection_string = 'mssql+pyodbc://{0}:{1}@{2}/{3}?driver={4}'.format(username, password, hostname, database_name, odbc_driver)

In [35]:
data = dd.read_sql_table('Inventory', connection_string, index_col = 'id')

In [36]:
data.head()



Unnamed: 0_level_0,name,quantity
id,Unnamed: 1_level_1,Unnamed: 2_level_1
1,banana,150
2,orange,154


The call to read_sql_table above demonstrates how to use
the function to connect to the database and create the
DataFrame. The first argument is the name of the database table you want to
query, the second argument is the connection string, and the third argument
is the column to use as the DataFrame’s index. These are the three required
arguments for this function to work.

In [37]:
data

Unnamed: 0_level_0,name,quantity
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
1.0,object,int64
2.0,...,...


You should be aware of a few important assumptions:
   1. First, concerning datatypes, you might think that Dask gets datatype
information directly from the database server since the database has a
defined schema already. Instead, Dask samples the data and infers datatypes
just like it does when reading a delimited text file. However, Dask
sequentially reads the first five rows from the table instead of randomly
sampling data across the dataset. Because databases indeed have a
well-defined schema, Dask’s type inference is much more reliable when reading
data from an RDBMS versus a delimited text file. However, it’s still not
perfect. Because of the way data might be sorted, edge cases can come up
that cause Dask to choose incorrect datatypes. For example, a string column
might have some rows where the strings contain only numbers (“1456,”
“2986,” and so on.) If the data is sorted in such a way that only these
numeric-like strings appear in the sample Dask takes when inferring
datatypes, it may incorrectly assume the column should be an integer
datatype instead of a string datatype. 

   2. The second assumption is how the data should be partitioned. If the
index_col (currently set to 'id') is a numeric or date/time
datatype, Dask will automatically infer boundaries and partition the data
based on a 256 MB block size (which is larger than read_csv’s 64 MB block
size). However, if the index_col is not a numeric or date/time datatype, you
must either specify the number of partitions or the boundaries to partition
the data by.

For example, the number of partitions can be specified explicitly with npartitions, which is also how you partition evenly on an index that is not a numeric or date/time column:

In [52]:
data = dd.read_sql_table('Inventory', connection_string, index_col = 'quantity', npartitions = 4)

In [53]:
data

Unnamed: 0_level_0,id,name
npartitions=4,Unnamed: 1_level_1,Unnamed: 2_level_1
150.0,int64,object
151.0,...,...
152.0,...,...
153.0,...,...
154.0,...,...


Alternatively, use custom partitioning on an index that is not a numeric or date/time column by passing explicit boundaries to divisions:

In [41]:
partition_boundaries = sorted(['banana', 'orange'])

In [43]:
data = dd.read_sql_table('Inventory', connection_string, index_col = 'name', divisions = partition_boundaries)

In [44]:
data

Unnamed: 0_level_0,id,quantity
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1
banana,int64,int64
orange,...,...


   3. The third assumption that Dask makes when you pass only the minimum
required parameters is that you want to select all columns from the table.
You can limit the columns you get back using the columns argument, which
behaves similarly to the usecols argument in read_csv. While you are
allowed to use SQL Alchemy expressions in the argument, I recommend that
you avoid offloading any computations to the database server, since you lose
the advantages of parallelizing that computation that Dask gives you.

In [54]:
column_filter = ['name']

In [55]:
data = dd.read_sql_table('Inventory', connection_string, index_col='id', columns = column_filter)

In [57]:
data.head()



Unnamed: 0_level_0,name
id,Unnamed: 1_level_1
1,banana
2,orange


   4. The fourth and final assumption made by providing the minimum
arguments is the schema selection. When I say “schema” here, I’m not
referring to the datatypes used by the DataFrame; I’m referring to the
database schema object that RDBMSs use to group tables into logical clusters
(such as dim/fact in a data warehouse or sales, hr, and so on, in a
transactional database). If you don’t provide a schema, the database driver
will use the default for the platform. For SQL Server, this results in Dask
looking for the Inventory table in the dbo schema. If we had put the table in a
different schema, perhaps one called chapterFour, we would receive a “table
not found” error.

Passing
the schema name into the schema argument will cause Dask to use the
provided database schema rather than the default.

Like read_csv, Dask allows you to forward along arguments to the
underlying calls to the Pandas read_sql function being used at the partition
level to create the Pandas DataFrames. We’ve covered all the most important
functions here, but if you need an extra degree of customization, have a look
at the API documentation for the Pandas read_sql function. All its
arguments can be manipulated using the *args and **kwargs interfaces
provided by Dask DataFrames.

# **Reading data from distributed filesystems (HDFS and S3)**

While it’s very likely that many datasets you’ll come across throughout your
work will be stored in relational databases, powerful alternatives are rapidly
growing in popularity. Most notable are the developments in distributed
filesystem technologies from 2006 onward. Powered by technologies like
Apache Hadoop and Amazon’s Simple Storage Service (or S3 for short),
distributed filesystems bring the same benefits to file storage that distributed
computing brings to data processing: increased throughput, scalability, and
robustness. Using a distributed computing framework alongside a distributed
filesystem technology is a harmonious combination: in the most advanced
distributed filesystems, such as the Hadoop Distributed File System (HDFS),
nodes are aware of data locality, allowing computations to be shipped to the
data rather than the data shipped to the compute resources. This saves a lot
of time and back-and-forth communication over the network.

Why can keeping data isolated on a single node have performance consequences?
A significant bottleneck is caused by the need to chunk up and ship data to
the other nodes in the cluster. Under this configuration, when Dask reads in
the data, it will partition the DataFrame as usual, but the other worker nodes
can’t do any work until a partition of data is sent to them. Because it takes
some time to transfer these 64 MB chunks over the network, the total
computation time will be increased by the time it takes to ship data back and
forth between the node that has the data and the other workers. This
becomes even more problematic if the size of the cluster grows by any
significant amount. If we had several hundred (or more) worker nodes vying
for chunks of data all at once, the networking stack on the data node could
easily get saturated with requests and slow to a crawl. Both of these
problems can be mitigated by using a distributed filesystem.

Instead of creating a bottleneck by holding data on only one node, the
distributed filesystem chunks up data ahead of time and spreads it across
multiple machines. It’s standard practice in many distributed filesystems to
store redundant copies of chunks/partitions both for reliability and
performance. From the perspective of reliability, storing each partition in
triplicate (which is a common default configuration) means that all three
machines holding a copy would have to fail before any data loss occurs. The
probability of three machines failing in a short amount of time is much lower
than the probability of one machine failing, so it adds an extra layer of safety
at a nominal cost of additional storage.

From the performance perspective, spreading the data out across the
cluster makes it more likely that a node containing the data will be available
to run a computation when requested. Or, in the event that all worker nodes
that hold that partition are already busy, one of them can ship the data to
another worker node. In this case, spreading out the data avoids any single
node getting saturated by requests for data. If one node is busy serving up a
bunch of data, it can offload some of those requests to other nodes that hold
the requested data. 

The node controlling the orchestration of the distributed computation (called
the driver) knows that the data it wants to process is available in a few
locations because the distributed filesystem maintains a catalogue of the
data held within the system. It will first ask the machines that have the data
locally whether they’re busy or not. If one of the nodes is not busy, the driver
will instruct the worker node to perform the computation. If all the nodes are
busy, the driver can either choose to wait until one of the worker nodes is
free, or instruct another free worker node to get the data remotely and run
the computation.

HDFS and S3 are two of the most popular distributed
filesystems, but they have one key difference for our purposes: HDFS is
designed to allow computations to run on the same nodes that serve up data,
and S3 is not. Amazon designed S3 as a web service dedicated solely to file
storage and retrieval. There’s absolutely no way to execute application code
on S3 servers. This means that when you work with data stored in S3, you
will always have to transmit partitions from S3 to a Dask worker node in
order to process it. 

### **Reading data from HDFS**

Reading from HDFS uses a read_csv call that should look very familiar by now.
In fact, the only thing that changes is the file path. Prefixing the file path
with hdfs:// tells Dask to look for the files on an HDFS cluster instead of the
local filesystem, and a host of localhost indicates that Dask should query the local
HDFS NameNode for information on the whereabouts of the files.

All the arguments for read_csv that you learned before can still be used
here. In this way, Dask makes it extremely easy to work with HDFS. The only
additional requirement is that you install the hdfs3 library on each of your
Dask workers (recent Dask releases use pyarrow for HDFS access instead).
This library allows Dask to communicate with HDFS;
therefore, this functionality won’t work if you haven’t installed the package.
### **Reading data from S3**

A read_csv call against S3 is (again) almost exactly the same as the HDFS one (listing 4.17). This time, however, the file path is prefixed with s3:// to tell Dask
that the data is located on an S3 filesystem, and a bucket name such as my-bucket lets Dask know to
look for the files in the S3 bucket associated with your AWS account named
“my-bucket”.

In order to use the S3 functionality, you must have the s3fs library
installed on each Dask worker. Like hdfs3, this library can be installed simply
through pip or conda (from the conda-forge channel). The final requirement
is that each Dask worker is properly configured for authenticating with S3.
s3fs uses the boto library to communicate with S3. You can learn more about
configuring boto at
http://boto.cloudhackers.com/en/latest/getting_started.html. The most
common S3 authentication configuration consists of using the AWS Access
Key and AWS Secret Access Key. Rather than injecting these keys in your
code, it’s a better idea to set these values using environment variables or a
configuration file. Boto will check both the environment variables and the
default configuration paths automatically, so there’s no need to pass
authentication credentials directly to Dask. Otherwise, as with using HDFS,
the call to read_csv allows you to do all the same things as if you were
operating on a local filesystem. Dask really makes it easy to work with
distributed filesystems!

Next, we try fetching data from an open public S3 bucket:

In [3]:
taxi_data = dd.read_csv('s3://nyc-tlc/trip data/yellow_tripdata_2018-04.csv',
                        storage_options = {'anon': True, 'use_ssl': False})

In [4]:
taxi_data

Unnamed: 0_level_0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
npartitions=13,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1
,int64,object,object,int64,float64,int64,object,int64,int64,int64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [6]:
taxi_data = taxi_data.compute()

In [7]:
taxi_data

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1,2018-04-01 00:22:20,2018-04-01 00:22:26,1,0.00,1,N,145,145,2,2.5,0.5,0.5,0.00,0.0,0.3,3.80
1,1,2018-04-01 00:47:37,2018-04-01 01:08:42,1,6.70,1,N,152,90,2,22.5,0.5,0.5,0.00,0.0,0.3,23.80
2,1,2018-04-01 00:02:13,2018-04-01 00:17:52,2,4.10,1,N,239,158,1,15.5,0.5,0.5,3.35,0.0,0.3,20.15
3,1,2018-04-01 00:46:49,2018-04-01 00:52:05,1,0.70,1,N,90,249,1,5.5,0.5,0.5,1.35,0.0,0.3,8.15
4,1,2018-04-01 00:19:04,2018-04-01 00:19:09,1,0.00,1,N,145,145,2,2.5,0.5,0.5,0.00,0.0,0.3,3.80
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
603234,1,2018-04-30 23:15:20,2018-04-30 23:32:58,1,3.60,1,N,148,112,1,14.5,0.5,0.5,3.15,0.0,0.3,18.95
603235,2,2018-04-30 23:02:02,2018-04-30 23:03:37,5,0.01,1,N,151,151,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30
603236,2,2018-04-30 23:38:18,2018-04-30 23:44:57,1,1.62,1,N,186,125,1,7.5,0.5,0.5,1.76,0.0,0.3,10.56
603237,2,2018-04-30 23:07:08,2018-04-30 23:23:04,1,6.36,1,N,261,162,2,20.0,0.5,0.5,0.00,0.0,0.3,21.30


# **Reading data in Parquet format**

CSV and other delimited text files are great for their simplicity and
portability, but they aren’t really optimized for the best performance,
especially when performing complex data operations such as sorts, merges,
and aggregations. While a wide variety of file formats attempt to increase
efficiency in many different ways, with mixed results, one of the more recent
high-profile file formats is Apache Parquet. Parquet is a high-performance
columnar storage format jointly developed by Twitter and Cloudera that was
designed with use on distributed filesystems in mind. Its design brings
several key advantages to the table over text-based formats: more efficient
use of IO, better compression, and strict typing.

 Figure 4.10 shows the
difference in how data is stored in Parquet format versus a row-oriented
storage scheme like CSV.

With row-oriented formats, values are stored on disk and in memory
sequentially based on the row position of the data. Consider what we’d have
to do if we wanted to perform an aggregate function over x, such as finding
the mean. To collect all the values of x, we’d have to scan over 10 values in
order to get the 4 values we want. This means we spend more time waiting
for IO completion just to throw away over half of the values read from disk.
Compare that with the columnar format: in that format, we’d simply grab the
sequential chunk of x values and have all four values we want. This seeking
operation is much faster and more efficient.
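The difference in values touched can be sketched in plain Python. The toy table below (three columns, four rows) is illustrative only and does not reproduce the numbers in the figure; the flat list and the dictionary merely imitate how the two layouts order values.

```python
# Toy table with three columns (id, x, y) and four rows; the numbers are
# illustrative and are not taken from the figure.
rows = [(1, 0.5, 'a'), (2, 1.5, 'b'), (3, 2.5, 'c'), (4, 3.5, 'd')]

# Row-oriented layout: the values of each row sit next to each other.
row_store = [value for row in rows for value in row]

# Column-oriented layout: the values of each column sit next to each other.
col_store = {
    'id': [r[0] for r in rows],
    'x': [r[1] for r in rows],
    'y': [r[2] for r in rows],
}

# Collecting x from the row store: step through every stored value and
# keep only the ones at the x position.
n_cols = 3
x_from_rows = [v for i, v in enumerate(row_store) if i % n_cols == 1]
values_read_row = len(row_store)

# Collecting x from the column store: read one contiguous chunk.
x_from_cols = col_store['x']
values_read_col = len(x_from_cols)

mean_x = sum(x_from_cols) / len(x_from_cols)
print(values_read_row, values_read_col, mean_x)
```

Both layouts produce the same x values, but the row layout walks over every stored value while the column layout reads only the four it needs.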

Another significant advantage of applying column-oriented chunking of
the data is that the data can now be partitioned and distributed by column.
This leads to much faster and more efficient shuffle operations, since only
the columns that are necessary for an operation can be transmitted over the
network instead of entire rows.

Finally, efficient compression is also a major advantage of Parquet. With
column-oriented data, it’s possible to apply different compression schemes
to individual columns so the data becomes compressed in the most efficient
way possible. Python’s Parquet library supports many of the popular
compression algorithms such as gzip, lzo, and snappy.

To use Parquet with Dask, you need to make sure you have the fastparquet
or pyarrow library installed, both of which can be installed either via pip or
conda (conda-forge). I would generally recommend using pyarrow over
fastparquet, because it has better support for serializing complex nested data
structures. You can also install the compression libraries you want to use,
such as python-snappy or python-lzo, which are also available via pip or
conda (conda-forge). 

As a side note, we will be using
Parquet format extensively throughout the book, and in the next chapter you
will write some of the NYC Parking Ticket dataset to Parquet format.
Therefore, you will see the **read_parquet** method many more times! This
discussion is here to simply give you a first look at how to use the method.
Now, without further ado, here’s how to use the read_parquet method.

The read_parquet method is used to
create a Dask DataFrame from one or more Parquet files, and the only
required argument is the path. One thing that might look strange when
reading a dataset such as nyc-parking-tickets-prq: the path is a directory,
not a file. That’s because datasets stored as Parquet are typically written
to disk pre-partitioned, resulting in potentially hundreds or thousands of
individual files. Dask provides this method for convenience so you don’t have to
manually create a long list of filenames to pass in. You can specify a single
Parquet file in the path if you want to, but it’s much more typical to see
Parquet datasets referenced as a directory of files rather than individual files.

You can also read Parquet from distributed filesystems. Just as
with delimited text files, the only difference is specifying a distributed
filesystem protocol, such as hdfs or s3, and specifying the relevant path to
the data.

Parquet is stored with a predefined schema, so there are no options to
mess with datatypes. The only real relevant options that Dask gives you to
control importing Parquet data are column filters and index selection. These
work the same way as with the other file formats. By default, they will be
inferred from the schema stored alongside the data, but you can override that
selection by manually passing in values to the relevant arguments.

For example, we can pick a few columns that we want to read from the dataset
and put them in a list called columns. We then pass the list to the columns
argument, and we specify Plate ID to be used as the index by passing it to
the index argument. The result of this will be a Dask DataFrame containing
only the selected columns and sorted/indexed by the Plate ID
column.

Parquet format offers good performance because it’s a column-oriented
format and highly compressible. Whenever possible, try to get your dataset
in Parquet format.