_Lambda School Data Science — Big Data_

# AWS SageMaker

### Links

#### AWS
- The Open Guide to Amazon Web Services: EC2 Basics _(just this one short section!)_ https://github.com/open-guides/og-aws#ec2-basics
- AWS in Plain English https://www.expeditedssl.com/aws-in-plain-english
- Amazon SageMaker » Create an Amazon SageMaker Notebook Instance https://docs.aws.amazon.com/sagemaker/latest/dg/gs-setup-working-env.html
- Amazon SageMaker » Install External Libraries https://docs.aws.amazon.com/sagemaker/latest/dg/nbi-add-external.html 

`conda install -n python3 bokeh dask datashader fastparquet numba python-snappy`

#### Dask
- Why Dask? https://docs.dask.org/en/latest/why.html
- Use Cases https://docs.dask.org/en/latest/use-cases.html
- User Interfaces https://docs.dask.org/en/latest/user-interfaces.html

#### Numba
- A ~5 minute guide http://numba.pydata.org/numba-doc/latest/user/5minguide.html

## 1. Estimate pi
https://en.wikipedia.org/wiki/Approximations_of_π#Summing_a_circle's_area

### With plain Python

In [1]:
import random

def monte_carlo_pi(nsamples):
    acc = 0
    for _ in range(int(nsamples)):
        x = random.random()
        y = random.random()
        if (x**2 + y**2) < 1.0:
            acc += 1
    return 4.0 * acc / nsamples

In [2]:
%%time
monte_carlo_pi(1e7)

CPU times: user 4.17 s, sys: 0 ns, total: 4.17 s
Wall time: 4.17 s


3.142004

### With Numba
http://numba.pydata.org/

In [3]:
from numba import jit

In [4]:
@jit(nopython=True)
def monte_carlo_pi(nsamples):
    acc = 0
    for _ in range(int(nsamples)):
        x = random.random()
        y = random.random()
        if (x**2 + y**2) < 1.0:
            acc += 1
    return 4.0 * acc / nsamples

In [5]:
%%time
monte_carlo_pi(1e7)

CPU times: user 404 ms, sys: 7.61 ms, total: 412 ms
Wall time: 410 ms


3.1421756

## 2. Loop a slow function

### With plain Python

In [6]:
from time import sleep

def slow_square(x):
    sleep(1)
    return x**2

In [7]:
%%time
[slow_square(n) for n in range(16)]

CPU times: user 1.42 ms, sys: 234 µs, total: 1.66 ms
Wall time: 16 s


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225]

### With Dask
- https://examples.dask.org/delayed.html
- http://docs.dask.org/en/latest/setup/single-distributed.html

In [8]:
from dask import compute, delayed

In [9]:
%%time
compute(delayed(slow_square)(n) for n in range(16))

CPU times: user 8.36 ms, sys: 5.39 ms, total: 13.7 ms
Wall time: 1.01 s


([0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225],)

## 3. Analyze millions of Instacart orders

### Download data
https://tech.instacart.com/3-million-instacart-orders-open-sourced-d40d29ead6f2

In [10]:
!wget https://s3.amazonaws.com/instacart-datasets/instacart_online_grocery_shopping_2017_05_01.tar.gz

--2019-06-10 20:43:15--  https://s3.amazonaws.com/instacart-datasets/instacart_online_grocery_shopping_2017_05_01.tar.gz
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.80.131
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.80.131|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 205548478 (196M) [application/x-gzip]
Saving to: ‘instacart_online_grocery_shopping_2017_05_01.tar.gz.2’


2019-06-10 20:43:19 (53.4 MB/s) - ‘instacart_online_grocery_shopping_2017_05_01.tar.gz.2’ saved [205548478/205548478]



In [11]:
!tar --gunzip --extract --verbose --file=instacart_online_grocery_shopping_2017_05_01.tar.gz

instacart_2017_05_01/
instacart_2017_05_01/._aisles.csv
instacart_2017_05_01/aisles.csv
instacart_2017_05_01/._departments.csv
instacart_2017_05_01/departments.csv
instacart_2017_05_01/._order_products__prior.csv
instacart_2017_05_01/order_products__prior.csv
instacart_2017_05_01/._order_products__train.csv
instacart_2017_05_01/order_products__train.csv
instacart_2017_05_01/._orders.csv
instacart_2017_05_01/orders.csv
instacart_2017_05_01/._products.csv
instacart_2017_05_01/products.csv


In [12]:
%cd instacart_2017_05_01

/home/ec2-user/SageMaker/DS-Unit-3-Sprint-3-Big-Data/module1-aws-sagemaker/instacart_2017_05_01


In [13]:
!ls -lh *.csv

-rw-r--r-- 1 ec2-user ec2-user 2.6K May  2  2017 aisles.csv
-rw-r--r-- 1 ec2-user ec2-user  270 May  2  2017 departments.csv
-rw-r--r-- 1 ec2-user ec2-user 551M May  2  2017 order_products__prior.csv
-rw-r--r-- 1 ec2-user ec2-user  24M May  2  2017 order_products__train.csv
-rw-r--r-- 1 ec2-user ec2-user 104M May  2  2017 orders.csv
-rw-r--r-- 1 ec2-user ec2-user 2.1M May  2  2017 products.csv


### With Pandas

#### Load & merge data

In [14]:
import pandas as pd

In [15]:
%%time
order_products = pd.concat([
    pd.read_csv('order_products__prior.csv'), 
    pd.read_csv('order_products__train.csv')])

order_products.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 33819106 entries, 0 to 1384616
Data columns (total 4 columns):
order_id             int64
product_id           int64
add_to_cart_order    int64
reordered            int64
dtypes: int64(4)
memory usage: 1.3 GB
CPU times: user 10.3 s, sys: 2.16 s, total: 12.5 s
Wall time: 12.5 s


In [16]:
order_products.info(memory_usage=True)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 33819106 entries, 0 to 1384616
Data columns (total 4 columns):
order_id             int64
product_id           int64
add_to_cart_order    int64
reordered            int64
dtypes: int64(4)
memory usage: 1.3 GB


In [17]:
order_products.head()

Unnamed: 0,order_id,product_id,add_to_cart_order,reordered
0,2,33120,1,1
1,2,28985,2,1
2,2,9327,3,0
3,2,45918,4,1
4,2,30035,5,0


In [18]:
products = pd.read_csv('products.csv')
products.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 49688 entries, 0 to 49687
Data columns (total 4 columns):
product_id       49688 non-null int64
product_name     49688 non-null object
aisle_id         49688 non-null int64
department_id    49688 non-null int64
dtypes: int64(3), object(1)
memory usage: 1.5+ MB


In [19]:
products.head()

Unnamed: 0,product_id,product_name,aisle_id,department_id
0,1,Chocolate Sandwich Cookies,61,19
1,2,All-Seasons Salt,104,13
2,3,Robust Golden Unsweetened Oolong Tea,94,7
3,4,Smart Ones Classic Favorites Mini Rigatoni Wit...,38,1
4,5,Green Chile Anytime Sauce,5,13


In [20]:
%%time
order_products = pd.merge(order_products, products[['product_id', 'product_name']])

CPU times: user 7.01 s, sys: 1.87 s, total: 8.88 s
Wall time: 8.86 s


In [21]:
order_products.head()

Unnamed: 0,order_id,product_id,add_to_cart_order,reordered,product_name
0,2,33120,1,1,Organic Egg Whites
1,26,33120,5,0,Organic Egg Whites
2,120,33120,13,0,Organic Egg Whites
3,327,33120,5,1,Organic Egg Whites
4,390,33120,28,1,Organic Egg Whites


#### Most popular products?

In [22]:
%%time
order_products['product_name'].value_counts()[:10]

CPU times: user 2.57 s, sys: 48.4 ms, total: 2.61 s
Wall time: 2.6 s


Banana                    491291
Bag of Organic Bananas    394930
Organic Strawberries      275577
Organic Baby Spinach      251705
Organic Hass Avocado      220877
Organic Avocado           184224
Large Lemon               160792
Strawberries              149445
Limes                     146660
Organic Whole Milk        142813
Name: product_name, dtype: int64

#### Organic?

In [23]:
order_products['organic'] = order_products['product_name'].str.contains('Organic')

In [24]:
%%time
order_products['organic'].value_counts(normalize=True)

CPU times: user 303 ms, sys: 137 ms, total: 440 ms
Wall time: 438 ms


False    0.684912
True     0.315088
Name: organic, dtype: float64

### With Dask
https://examples.dask.org/dataframe.html

In [25]:
import dask.dataframe as dd
from dask.distributed import Client

#### Load & merge data
https://examples.dask.org/dataframes/01-data-access.html#Read-CSV-files

In [26]:
%%time
order_products = dd.read_csv('order_products_*.csv')

CPU times: user 17.8 ms, sys: 3.73 ms, total: 21.5 ms
Wall time: 20.5 ms


In [27]:
order_products = dd.merge(order_products, products[['product_id', 'product_name']])

http://docs.dask.org/en/latest/dataframe-performance.html#persist-intelligently

In [28]:
order_products.head()

Unnamed: 0,order_id,product_id,add_to_cart_order,reordered,product_name
0,2,33120,1,1,Organic Egg Whites
1,26,33120,5,0,Organic Egg Whites
2,120,33120,13,0,Organic Egg Whites
3,327,33120,5,1,Organic Egg Whites
4,390,33120,28,1,Organic Egg Whites


#### Most popular products?

In [29]:
# Still faster than Pandas because this time includes reading in the file
%time
order_products['product_name'].value_counts().compute()[:10]

CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 9.54 µs


Banana                    491291
Bag of Organic Bananas    394930
Organic Strawberries      275577
Organic Baby Spinach      251705
Organic Hass Avocado      220877
Organic Avocado           184224
Large Lemon               160792
Strawberries              149445
Limes                     146660
Organic Whole Milk        142813
Name: product_name, dtype: int64

#### Organic?

In [30]:
%%time
order_products['organic'] = order_products['product_name'].str.contains('Organic')

CPU times: user 8.91 ms, sys: 121 µs, total: 9.03 ms
Wall time: 8.17 ms


In [None]:
%%time
order_products['organic'].value_counts().compute()

## 4. Fit a machine learning model

### Load data

In [32]:
%cd ../ds-predictive-modeling-challenge-data

/home/ec2-user/SageMaker/DS-Unit-3-Sprint-3-Big-Data/module1-aws-sagemaker/ds-predictive-modeling-challenge-data


In [33]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

train_features = pd.read_csv('train_features.csv')
train_labels = pd.read_csv('train_labels.csv')

X_train_numeric = train_features.select_dtypes(np.number)
y_train = train_labels['status_group']

### With 2 cores (like Google Colab)
https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html

In [None]:
%%time
model = RandomForestClassifier(n_estimators=200, oob_score=True, n_jobs=2, random_state=42, verbose=1)
model.fit(X_train_numeric, y_train)
print('Out-of-bag score:', model.oob_score_)

### With 16 cores (on AWS m4.4xlarge)

In [35]:
%time
model = RandomForestClassifier(n_estimators=200, oob_score=True, n_jobs=-1, random_state=42, verbose=1)
model.fit(X_train_numeric, y_train)
print('Out-of-bag score:', model.oob_score_)

CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 9.06 µs


[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done  18 tasks      | elapsed:    0.4s
[Parallel(n_jobs=-1)]: Done 168 tasks      | elapsed:    2.2s
[Parallel(n_jobs=-1)]: Done 200 out of 200 | elapsed:    2.5s finished


Out-of-bag score: 0.7206228956228956


## ASSIGNMENT

Revisit a previous assignment or project that had slow speeds or big data.

Make it better with what you've learned today!

You can use `wget` or Kaggle API to get data. Some possibilities include:

- https://www.kaggle.com/c/ds1-predictive-modeling-challenge
- https://www.kaggle.com/ntnu-testimon/paysim1
- https://github.com/mdeff/fma
- https://tech.instacart.com/3-million-instacart-orders-open-sourced-d40d29ead6f2 



Also, you can play with [Datashader](http://datashader.org/) and its [example datasets](https://github.com/pyviz/datashader/blob/master/examples/datasets.yml)!

In [36]:
# Predictive Modeling Challenge Data (Tanzanian water pump data)

features_df = dd.read_csv('train_features.csv')
labels_df = dd.read_csv('train_labels.csv')
compute(features_df.shape, labels_df.shape)

((59400, 40), (59400, 2))

In [37]:
type(features_df)

dask.dataframe.core.DataFrame

In [49]:
features_df.describe().compute()

Unnamed: 0,id,amount_tsh,gps_height,longitude,latitude,num_private,region_code,district_code,population,construction_year
count,59400.0,59400.0,59400.0,59400.0,59400.0,59400.0,59400.0,59400.0,59400.0,59400.0
mean,37115.131768,317.650385,668.297239,34.077427,-5.706033,0.474141,15.297003,5.629747,179.909983,1300.652475
std,21453.128371,2997.574558,693.11635,6.567432,2.946019,12.23623,17.587406,9.633649,471.482176,951.620547
min,0.0,0.0,-90.0,0.0,-11.64944,0.0,1.0,0.0,0.0,0.0
25%,18519.75,0.0,0.0,33.090347,-8.540621,0.0,5.0,2.0,0.0,0.0
50%,37061.5,0.0,369.0,34.908743,-5.021597,0.0,12.0,3.0,25.0,1986.0
75%,55656.5,20.0,1319.25,37.178387,-3.326156,0.0,17.0,5.0,215.0,2004.0
max,74247.0,350000.0,2770.0,40.345193,-2e-08,1776.0,99.0,80.0,30500.0,2013.0


In [123]:
from dask_ml.preprocessing import LabelEncoder
from dask_ml.impute import SimpleImputer

def wrangle(X):
    df = X.copy()

    # Impute missing observations with existing values
    df = df.fillna('pad')
    print('NaN values filled')

    cols_to_drop = [
        "id",
        "recorded_by",
        "wpt_name",
        "extraction_type_group",
        "extraction_type_class",
        "management",
        "waterpoint_type_group",
        "quantity_group",
        "payment_type",
        "subvillage",
        "scheme_name",
        "num_private",
        "source_class",
        "source",
        "quality_group",
        "region_code",
        "date_recorded",
    ]

    label_categoricals = [
        "scheme_management",
        "management_group",
        "region",
        "ward",
        "extraction_type",
        "waterpoint_type",
        "water_quality",
        "installer",
        "funder",
        "lga",
        "basin",
        "public_meeting",
        "permit",
        "payment",
        "source_type",
        "quantity",
    ]

    # Drop unhelpful features
    df = df.drop(cols_to_drop, axis=1)
    print('Collinear features dropped')

    # Convert date_recorded
#     df["date_recorded"] = df["date_recorded"].astype('M8[us]')
#     print('Date converted')

    # Initialize encoder and transform categorical features
    df[label_categoricals].apply(LabelEncoder().fit_transform, axis=1)
    print('features transformed')

    return df

In [124]:
wrangled_features = wrangle(features_df)
compute(wrangled_features.shape)

NaN values filled
Collinear features dropped
features transformed


You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=(None, 'object'))



((59400, 23),)

In [146]:
type(wrangled_features)

dask.dataframe.core.DataFrame

In [125]:
wrangled_features.head(1)

Unnamed: 0,amount_tsh,funder,gps_height,installer,longitude,latitude,basin,region,district_code,lga,...,scheme_management,permit,construction_year,extraction_type,management_group,payment,water_quality,quantity,source_type,waterpoint_type
0,6000.0,Roman,1390,Roman,34.938093,-9.856322,Lake Nyasa,Iringa,5,Ludewa,...,VWC,False,1999,gravity,user-group,pay annually,soft,enough,spring,communal standpipe


In [179]:
type(wrangled_features)

dask.dataframe.core.DataFrame

In [176]:
wrangled_features.shape

(Delayed('int-7d8c7c40-eb5e-464b-9a8c-4ec2f2f9ada0'), 23)

In [178]:
compute(wrangled_features.shape)

((59400, 23),)

In [126]:
# Split features and labels into train & test datasets
# Dask has (and requires) its own implementation of train_test_split
from dask_ml.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(
    wrangled_features,
    labels_df["status_group"],
    test_size=0.3,
    random_state=42,
)

In [127]:
print(compute(X_train.shape, X_val.shape, y_train.shape, y_val.shape))

((41536, 23), (17864, 23), (41536,), (17864,))


In [128]:
# Take a look at the train data
X_train.head(1)

Unnamed: 0,amount_tsh,funder,gps_height,installer,longitude,latitude,basin,region,district_code,lga,...,scheme_management,permit,construction_year,extraction_type,management_group,payment,water_quality,quantity,source_type,waterpoint_type
0,6000.0,Roman,1390,Roman,34.938093,-9.856322,Lake Nyasa,Iringa,5,Ludewa,...,VWC,False,1999,gravity,user-group,pay annually,soft,enough,spring,communal standpipe


In [185]:

# OTOH: can't use sklearn's RandomizedSearchCV either because dask
# dataframes don't implement the necessary methods, like .iloc for row indexing
# Also, dask_ml.xgboost is only a viable option when you need to 
# perform distributed training on, say, many EC2 clusters
from dask_ml.model_selection import RandomizedSearchCV
from xgboost import XGBClassifier

params = {
    "max_depth": [3, 4, 5, 6, 7],
    "learning_rate": [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
    "n_estimators": [100, 150, 200],
    "gamma": [0, 0.1, 0.2, 0.3, 0.5, 0.6, 0.8, 1.0],
    "min_child_weight": [0, 1, 2, 3],
}

xgb = XGBClassifier(
        booster="gbtree",
        objective="multi:softmax",
        random_state=42,
        n_jobs=-1,
)

# Can't call numba's @jit here
opt = RandomizedSearchCV(
    xgb,
    param_distributions=params,
    n_iter=5,
    n_jobs=-1,
    cache_cv=False,
)


In [186]:
opt.fit(X_train, y_train).compute()

AssertionError: 

In [None]:
# WIP