# DATA SCIENCE AT-SCALE

# DO WORK
When you get to class ...
- Open DS-DC-14_11_data_science_at_scale.ipynb
- Run `conda install dask` to install the Dask library for Python

## DATA SCIENCE AT SCALE LEARNING OBJECTIVES

- Define Big Data and its challenges
- Learn how to deal with data that doesn't fit into memory
- Understand why Hadoop and Spark exist
- Get an introduction to using Spark and MLlib

# GUIDED PRACTICE

Let's continue the exercise from last class so we have a chance to go through the full data science workflow as a class.

## ACTIVITY: PROJECT PRACTICE
Objective: Review content thus far in the course, bring the material together, find weak areas, and get help in class.

Using the flights data from the last example, try to make it through the data science workflow in the first hour of class.

There are many ways to manipulate this data set: 
- Consider what is a proper "categorical" variable, and keep only what is significant. 
- You will easily have 20+ variables (start with one). Aim to have at least three visuals that clearly explain the relationship between the variables you've used and the outcome you are predicting (whether a flight departs late, `DEP_DEL15`).
- Generate the ROC curve (with its AUC) or the precision-recall curve, whichever you think makes more sense, and write a statement that describes how your model performs compared to a baseline, along with any caveats.
  - For example: "My model on average performs at x rate, but the features under-perform and explain less of the data at these thresholds." Consider this as practice for your own project, since the steps you'll take to present your work will be relatively similar.
  
[Pandas super cheatsheet](http://nbviewer.jupyter.org/github/justmarkham/pandas-videos/blob/master/pandas.ipynb)


In [1]:
import pandas as pd
import sklearn.linear_model as lm
import matplotlib.pyplot as plt

df = pd.read_csv('assets/dataset/flight_delays.csv')

df = df.join(pd.get_dummies(df['DAY_OF_WEEK'], prefix='dow'))
df = df[df.DEP_DEL15.notnull()].copy()

In [None]:
df.info()

In [None]:
df.UNIQUE_CARRIER.value_counts()

In [2]:
df = df.join(pd.get_dummies(df['UNIQUE_CARRIER'], prefix='airline'))
df = df[df.DEP_DEL15.notnull()].copy()


In [3]:
df = df.drop('UNIQUE_CARRIER', axis=1)

In [None]:
df.columns

In [4]:
features = ['airline_AA','airline_AS','airline_DL','airline_EV','airline_F9', 'airline_HA','airline_MQ','airline_NK', 'airline_OO','airline_UA', 'airline_US', 'airline_VX', 'airline_WN' ]

In [5]:
X= df[features]
y= df['DEP_DEL15']

In [6]:
# sklearn.cross_validation was removed in scikit-learn 0.20; model_selection replaces it
from sklearn import model_selection

In [7]:
X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, random_state=77)

In [8]:
from sklearn import metrics
from sklearn import dummy

In [9]:
from sklearn import linear_model

In [10]:
lm = linear_model.LogisticRegression().fit(X_train,y_train)

In [11]:
y_pred = lm.predict(X_test)

In [12]:
print(lm.score(X_test, y_test))

0.802632267975


In [13]:
print(metrics.classification_report(y_test, y_pred))

             precision    recall  f1-score   support

        0.0       0.80      1.00      0.89     91964
        1.0       0.00      0.00      0.00     22614

avg / total       0.64      0.80      0.71    114578





In [14]:
print(metrics.accuracy_score(y_test, y_pred))
print(metrics.roc_auc_score(y_test, y_pred))

0.802632267975
0.5


In [15]:
print(metrics.confusion_matrix(y_test, y_pred))

[[91964     0]
 [22614     0]]


In [None]:
dm = dummy.DummyClassifier()

In [None]:
dm.fit(X_train, y_train)

In [None]:
y_pred1= dm.predict(X_test)

In [None]:
print(metrics.accuracy_score(y_test, y_pred1))
print(metrics.roc_auc_score(y_test, y_pred1))

In [16]:
from sklearn.ensemble import RandomForestClassifier
rf = RandomForestClassifier(n_estimators=10, max_depth=8)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
print(metrics.accuracy_score(y_test, y_pred))
print(metrics.roc_auc_score(y_test, y_pred))

0.802632267975
0.5
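
Both models above report an AUC of exactly 0.5 because `roc_auc_score` was given hard 0/1 predictions, and every prediction was the majority class. AUC is usually computed from predicted probabilities instead. The sketch below illustrates the difference on made-up, imbalanced data (not the flights data); the variable names and the synthetic data-generating process are assumptions chosen only for illustration.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score

# Synthetic, imbalanced data (an assumption for illustration only):
# one weakly informative feature and roughly 20% positives.
rng = np.random.RandomState(77)
n = 5000
X = rng.normal(size=(n, 1))
p = 1.0 / (1.0 + np.exp(-(-1.6 + 0.8 * X[:, 0])))
y = (rng.uniform(size=n) < p).astype(int)

model = LogisticRegression().fit(X, y)

# Hard labels: almost everything is predicted as class 0,
# so the ranking information is thrown away.
auc_from_labels = roc_auc_score(y, model.predict(X))

# Probabilities of the positive class keep the ranking information.
auc_from_scores = roc_auc_score(y, model.predict_proba(X)[:, 1])

print(auc_from_labels)
print(auc_from_scores)
```

On the flights data the equivalent change would be to pass `lm.predict_proba(X_test)[:, 1]` to `roc_auc_score` rather than `y_pred`.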


# BREAK

# WHAT IS OUT OF CORE LEARNING?
Objective: Learn how to deal with the problem of too much data on one computer

Out-of-core learning is a way to train models on data that doesn't fit into main memory

**For our purposes we can separate computer memory into two categories**
- Random Access Memory (RAM) -- Main/Core memory
- Hard Disk (HDD/SSD)

**RAM**
- About 1000x faster than hard disk
- Expensive and comes with space limitations 
  - It's difficult to get more than 200GB of RAM on a single computer/server
  - Much less on most computers (16GB)

**HDD/SSD**
- About 1000x slower than RAM, although newer SSD drives are much better
- Can possibly store up to 5TB on a single computer/server
  - Much less on most computers (1TB)

**Problem: By default we want to work within RAM because this is fast. This poses some issues for larger datasets because we can't fit them into RAM**

**Solutions**
- Get more RAM
  - One hour of a developer's time can buy 1 month of 8GB of computing power.
- Sample your data
  - For most of the data science workflow you can work on a small sample of your data without affecting results
- Use clever algorithms to store data on hard disk and do processing in RAM
  - Unix command-line tools
  - Blaze/Dask
  - Many other options exist; however, they tend to be less user-friendly than in-RAM tools
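
As a minimal sketch of the "store on disk, process in RAM" idea, the example below computes a column mean by streaming a CSV through pandas in fixed-size chunks, so only one chunk is in memory at a time. The toy file, its `value` column, and the chunk size are assumptions made up for illustration.

```python
import os
import tempfile

import numpy as np
import pandas as pd

# Write a small toy CSV to disk (stands in for a file too big for RAM).
tmp_dir = tempfile.mkdtemp()
tmp_path = os.path.join(tmp_dir, 'toy.csv')
pd.DataFrame({'value': np.arange(10000, dtype=float)}).to_csv(tmp_path, index=False)

# Stream the file 1,000 rows at a time and keep only running totals.
total = 0.0
count = 0
for chunk in pd.read_csv(tmp_path, chunksize=1000):
    total += chunk['value'].sum()
    count += chunk['value'].count()

chunked_mean = total / count
print(chunked_mean)

# Clean up the temporary file.
os.remove(tmp_path)
os.rmdir(tmp_dir)
```

This works because a mean can be rebuilt from per-chunk sums and counts; statistics such as exact medians cannot be combined this simply.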

### KNOWLEDGE CHECK

I just received a 100GB CSV file. What should I do with it if I want to explore it?

<!--
ANSWER:
Don't try to read in the whole file. You only need to preview the columns and sample some rows for preview.

You can do this with command line using head, tail, and awk
- To get you started: http://bconnelly.net/working-with-csvs-on-the-command-line/

You can also do this in Python:
- import csv library to read line by line
- Use the chunksize argument for pd.read_csv to read chunk by chunk
- Use Dask, which we will cover today
-->

# DEMO: USE DASK TO FIT A LOGISTIC REGRESSION OUT OF CORE USING STOCHASTIC GRADIENT DESCENT

In [17]:
import dask

In [18]:
import dask.dataframe as dd
# Read 1 MB at a time to keep main memory use low
# The flight delays data is 43 MB on disk so we should have 43 divisions
df = dd.read_csv('assets/dataset/flight_delays.csv', blocksize=1000000)

In [19]:
df.head()

Unnamed: 0,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,UNIQUE_CARRIER,AIRLINE_ID,CARRIER,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,CRS_DEP_TIME,DEP_TIME,DEP_DEL15,Unnamed: 15
0,1,4,2015-01-01,AA,19805,AA,12478,1247802,31703,12892,1289203,32575,900.0,855.0,0.0,
1,1,4,2015-01-01,AA,19805,AA,12892,1289203,32575,12478,1247802,31703,900.0,856.0,0.0,
2,1,4,2015-01-01,AA,19805,AA,12478,1247802,31703,12892,1289203,32575,1230.0,1226.0,0.0,
3,1,4,2015-01-01,AA,19805,AA,12892,1289203,32575,12478,1247802,31703,1220.0,1214.0,0.0,
4,1,4,2015-01-01,AA,19805,AA,11298,1129803,30194,12173,1217302,32134,1305.0,1754.0,1.0,


In [20]:
# Oops, this won't give you results, because you need to bring the results into RAM explicitly
df.DAY_OF_MONTH.describe()

dd.Series<describ..., npartitions=1>

In [24]:
df.DEP_TIME.describe().compute()



count    458311.000000
mean       1333.028542
std         479.639617
min           1.000000
25%                NaN
50%                NaN
75%                NaN
max        2400.000000
dtype: float64

In [25]:
# That took a while; it's smart to use sampling where possible,
# especially for exploratory data analysis
# As a rule of thumb, even ~30 random samples can give a usable estimate of a mean
# A 1% sample (about 4,000 rows) gives nearly the same summary as 400,000+ rows
df.DEP_TIME.sample(frac=0.01).describe().compute()

count    4598.000000
mean     1333.053936
std       476.600229
min         3.000000
25%      1214.000000
50%       905.250000
75%              NaN
max      2358.000000
dtype: float64
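
To see why a 1% sample is usually good enough for exploration, the sketch below compares the mean of a full column with the mean of a 1% random sample, and reports the sample's standard error. The data is synthetic (uniform values between 1 and 2400, loosely mimicking the range of `DEP_TIME`), which is an assumption for illustration and not the flights data itself.

```python
import numpy as np

rng = np.random.RandomState(42)

# Synthetic stand-in for a 400,000-row numeric column.
population = rng.uniform(1, 2400, size=400000)

# Draw a 1% simple random sample without replacement.
sample = rng.choice(population, size=4000, replace=False)

full_mean = population.mean()
sample_mean = sample.mean()

# Standard error of the sample mean: s / sqrt(n).
standard_error = sample.std(ddof=1) / np.sqrt(len(sample))

print(full_mean)
print(sample_mean)
print(standard_error)
```

The standard error shrinks with the square root of the sample size, so going from 4,000 to 400,000 rows only tightens the estimate by a factor of 10 while costing 100 times the work.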

In [26]:
# You may have noticed that took just as long because Dask needed time to sample the data
# You only need to sample once
sample_df = df.sample(frac=0.01).compute()

In [27]:
# Much faster
sample_df.describe()

Unnamed: 0,DAY_OF_MONTH,DAY_OF_WEEK,AIRLINE_ID,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,CRS_DEP_TIME,DEP_TIME,DEP_DEL15,Unnamed: 15
count,4710.0,4710.0,4710.0,4710.0,4710.0,4710.0,4710.0,4710.0,4710.0,4710.0,4577.0,4577.0,0.0
mean,15.850318,4.022505,19975.247771,12664.241401,1266427.0,31722.077707,12685.657325,1268568.0,31722.953079,1328.595541,1338.201224,0.199039,
std,8.944432,1.934784,403.007598,1532.472012,153246.9,1287.284303,1506.319106,150631.6,1286.056786,468.347694,478.183937,0.399321,
min,1.0,1.0,19393.0,10135.0,1013503.0,30113.0,10135.0,1013503.0,30073.0,5.0,2.0,0.0,
25%,8.0,2.0,19790.0,11292.0,1129202.0,30627.0,11292.0,1129202.0,30647.0,930.0,,,
50%,16.0,4.0,19977.0,12889.0,1288903.0,31453.0,12889.0,1288903.0,31453.0,1323.0,,,
75%,23.0,6.0,20366.0,13930.0,1393003.0,32467.0,14027.0,1402702.0,32467.0,1720.0,,,
max,31.0,7.0,21171.0,16218.0,1621801.0,35991.0,15991.0,1599102.0,35991.0,2359.0,2400.0,1.0,


In [28]:
# model fit changes as well, you need to feed in the data chunk by chunk
# This means that some models cannot work out of core
from sklearn import linear_model

In [35]:
# Stochastic Gradient Descent with a log-loss is logistic regression
# (scikit-learn 1.1+ spells this loss 'log_loss'; 'log' was removed in 1.3)
sgd = linear_model.SGDClassifier(loss='log')

In [36]:
df = df[['DEP_TIME', 'DEP_DEL15']]
df = df.dropna(subset=['DEP_DEL15'])

In [37]:
# Get the number of divisions
divs = len(df.divisions) - 1

In [38]:
# Iterate through divisions and update model fit
for n in range(divs):
    # Get a chunk of rows
    ndf = df.get_partition(n).compute()
    
    # Pull out features and outcome
    X = ndf[['DEP_TIME']]
    y = ndf['DEP_DEL15']
    sgd.partial_fit(X, y, classes=[0, 1])
    print('Division:  {} {} {}'.format(n, sgd.intercept_, sgd.coef_))

Division:  0 [-723.89493824] [[ 363.43804538]]
Division:  1 [-984.20131488] [[-498.94129837]]
Division:  2 [-1198.58423557] [[-319.33309038]]
Division:  3 [-1375.23714976] [[ 65.32571009]]
Division:  4 [-1522.42858587] [[-116.25029569]]
Division:  5 [-1642.42131475] [[-27.43357549]]
Division:  6 [-1702.37768073] [[ 239.50904152]]
Division:  7 [-1775.80744095] [[ 221.4808159]]
Division:  8 [-1835.67327596] [[-67.47479507]]
Division:  9 [-1875.84238331] [[ 40.86592996]]
Division:  10 [-1911.73675281] [[ 7.16831207]]
Division:  11 [-1944.47413797] [[-1.67357044]]
Division:  12 [-1972.76390958] [[-23.42563066]]
Division:  13 [-1987.83137173] [[-44.64415282]]
Division:  14 [-2011.22619876] [[-47.88477093]]
Division:  15 [-2042.09682559] [[-20.94050061]]
Division:  16 [-2070.74434542] [[ 116.91780076]]
Division:  17 [-2079.76657809] [[ 26.16944056]]
Division:  18 [-2087.43747343] [[-10.60659786]]
Division:  19 [-2095.57291176] [[-27.59208108]]
Division:  20 [-2103.4961846] [[-65.80390923]]
...
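
To make the chunk-by-chunk update concrete, here is a minimal NumPy sketch of what a `partial_fit`-style loop does for logistic regression: each chunk contributes one gradient step, and the chunk is then discarded. This is not scikit-learn's actual implementation; the helper `partial_fit_step`, the learning rate, and the synthetic data (with a feature already on a unit scale) are assumptions for illustration.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def partial_fit_step(w, b, X, y, lr=0.5):
    """Apply one gradient step of logistic regression using a single chunk."""
    p = sigmoid(X.dot(w) + b)
    grad_w = X.T.dot(p - y) / len(y)
    grad_b = np.mean(p - y)
    return w - lr * grad_w, b - lr * grad_b

rng = np.random.RandomState(0)
true_w, true_b = 1.5, -1.0  # hypothetical "true" parameters

w, b = np.zeros(1), 0.0
history = []
for chunk_id in range(200):
    # Simulate reading one chunk of 500 rows from disk.
    X = rng.normal(size=(500, 1))
    prob = sigmoid(true_w * X[:, 0] + true_b)
    y = (rng.uniform(size=500) < prob).astype(float)

    # Update the model with this chunk only, then move on.
    w, b = partial_fit_step(w, b, X, y)
    history.append(w[0])

print(history[0], history[-1], b)
```

The coefficient moves a lot in the first few chunks and then settles near its final value. In this sketch the feature is on a unit scale, which helps stochastic gradient descent behave; a raw feature such as `DEP_TIME`, with values in the thousands, is on a very different scale.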

### KNOWLEDGE CHECK

1. Why is the coefficient changing as we add more divisions? 
2. What happens to the confidence interval of our estimate?
3. Can we overfit by training on too much data?

**Out-of-core learning in Python can be a bit difficult**

Unfortunately, dask is not fully integrated with scikit-learn yet, so you would need to write custom implementations of `train_test_split` or `GridSearchCV`.
- To get you started:
    - https://github.com/dask/dask-learn
    - http://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html
        
**Some alternatives that take the idea of out of core learning further**
- [MLlib](http://spark.apache.org/mllib/)
- [Mahout](http://mahout.apache.org/)
- [Vowpal Wabbit](https://github.com/JohnLangford/vowpal_wabbit/wiki)

**Sanity Check**
- Most data science and machine learning problems start out larger than main memory and are reduced to fit in memory by the time you begin to build models. So, the lack of dask and scikit-learn integration is unlikely to be a limitation.

# WHAT IS BIG DATA?

Objective: Learn how to deal with the problem of too much data on several computers

We've talked about data that can't fit into RAM. What about data that can't fit on a 5TB hard disk, or that needs to be processed quickly?

**WHAT IS BIG DATA AND ITS CHALLENGES?**
- Big Data is data that is too big to process or store on a single machine

**Processing Challenge:**
- Data is growing faster than CPU speeds

**Storage Challenge:**
- Data is growing faster than single-machine storage

## Solution (at the moment)

Distributed Computing / Cloud Computing / Cluster Computing

- Use a large number of commodity HW
- Not as expensive as premium hardware (super computers)
- Easy to add capacity
- Easy to mix and match HW
- Cheaper per CPU/disk


**Distributed Computing Challenges**

**Hardware challenges:**
- Slow machines
- Uneven capacity and performance
- HW failures
  - Hard drives, memory, etc. all fail
- Increased latency
  - Network speeds slower than bus speeds

**Software challenge:**
- How do you program and distribute work?


### KNOWLEDGE CHECK
Is RAM vs Hard Disk still an issue when we distribute our work to multiple computers?

<!--
Answer:
Definitely. In fact, the trade-off between the two storage types is what inspires Spark
-->

# HADOOP MAP REDUCE

**Map Reduce** is a two-phase divide and conquer algorithm invented and published by Google in 2004.

1. In the mapper phase, data is split into chunks and the same operation is performed on each chunk.
2. In the reducer phase, the mapped results are aggregated to produce a final result.

![Map-Reduce](assets/images/map-reduce.png)

This is actually a really obvious idea from a programming perspective, but making it convenient to use is quite an accomplishment

In [39]:
numbers = [1, 2, 3, 4]
def add_one(x):
    return x + 1

# list(...) is needed in Python 3, where map returns a lazy iterator
print(list(map(lambda x: x + 1, numbers)))
print(list(map(add_one, numbers)))
# What's a lambda?
# Just a syntax for making a short function

[2, 3, 4, 5]
[2, 3, 4, 5]


In [40]:
from functools import reduce  # reduce is only a builtin in Python 2

numbers = [1, 2, 3, 4]
def iter_sum(a, b):
    return a + b

print(reduce(lambda a, b: a + b, numbers))
print(reduce(iter_sum, numbers))

10
10


In [41]:
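from functools import reduce  # reduce is only a builtin in Python 2

numbers = [1, 2, 3, 4]
# Map first (add one to each number), then reduce (sum the results)
print(reduce(
    lambda a, b: a + b,
    map(lambda x: x + 1, numbers)
))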

14


**How does this help?**
- Every mapper can be run on a separate computer
  - Awesome, we can distribute work over several computers
- However, we still need the final result to fit on a single computer after the reducer step

![word-count](assets/images/word-count.png)
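
Word count is the classic Map Reduce example. The sketch below walks through it in plain Python on a single machine: a map step that emits `(word, 1)` pairs per document, a grouping ("shuffle") step that collects pairs by word, and a reduce step that sums each group. The three input documents are made up for illustration, and the function name `mapper` is an assumption; a real Hadoop job would run the mappers and reducers on separate machines.

```python
from collections import defaultdict
from functools import reduce

# Each string stands in for a chunk of text stored on a different machine.
documents = ["deer bear river", "car car river", "deer car bear"]

def mapper(line):
    """Map step: emit a (word, 1) pair for every word in the chunk."""
    return [(word, 1) for word in line.split()]

# Map: every document can be processed independently.
mapped = [mapper(doc) for doc in documents]

# Shuffle: group the emitted values by key (the word).
grouped = defaultdict(list)
for pairs in mapped:
    for word, one in pairs:
        grouped[word].append(one)

# Reduce: sum the values collected for each word.
word_counts = {
    word: reduce(lambda a, b: a + b, ones)
    for word, ones in grouped.items()
}

for word in sorted(word_counts):
    print(word, word_counts[word])
```

Only the grouped counts need to reach the reducer, which is why the final result can be much smaller than the input data.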

# APACHE SPARK

**Apache Spark** is a scalable, efficient improvement to Map Reduce that was developed at UC Berkeley's AMPLab (started in 2009 and open-sourced in 2010)

Uses in-memory computing (RAM) to increase performance

![Apache vs Hadoop](assets/images/apache_vs_hadoop.png)

**MAP REDUCE VS SPARK**

- Disk IO is much slower than memory IO
  - Disk IO is on the order of milliseconds
  - Memory IO is on the order of nanoseconds
- Spark therefore keeps more data in memory

**Other Improvements:**
- Support for operations beyond map and reduce
- Support for interactive and streaming processing in addition to batch
- Support for Scala, R, and Python in addition to Java

# BREAK

# EXERCISE
DEMO AND INDEPENDENT PRACTICE ON DATABRICKS CLOUD

https://github.com/ga-students/DS-DC-14/tree/master/lessons/lesson-11/starter-code

# TOPIC REVIEW: DATA SCIENCE AT SCALE


- What is Big Data and what are its challenges?
- What are the differences between Map Reduce and Spark?
- What are the differences between Spark RDDs and Spark DataFrames?

# UPCOMING WORK
- Final Project Milestone 2 will be due Monday, September 12th

![Syllabus](assets/images/syllabus.png)

![timeline](assets/images/timeline.png)

# LESSON: DATA SCIENCE AT SCALE

EXIT TICKET

DON’T FORGET TO FILL OUT YOUR EXIT TICKET

http://goo.gl/forms/gG5qAw9QljgkHC2q1
