# Tutorial: Scalable ML 01
To read the original guide, follow this [link](https://tomaugspurger.github.io/scalable-ml-01).

The goal here is to tackle the memory and time constraints that appear once a dataset grows larger than RAM.

**Citing the author:**
1. **I'm constrained by size:** My training dataset fits in RAM, but I have to predict for a much larger dataset. Or, my training dataset doesn't even fit in RAM. I'd like to scale out by adopting algorithms that work in batches locally, or on a distributed cluster.
2. **I'm constrained by time:** I'd like to fit more models (think hyper-parameter optimization or ensemble learning) on my dataset in a given amount of time. I'd like to scale out by fitting more models in parallel, either on my laptop by using more cores, or on a cluster.

These aren't mutually exclusive or exhaustive, but they should serve as a nice framework for our discussion. I'll be showing where the usual pandas + scikit-learn for in-memory analytics workflow breaks down, and offer some solutions for scaling out to larger problems.

This post will focus on cases where your training dataset fits in memory, but you must predict on a dataset that's larger than memory. Later posts will explore parallel, out-of-core, and distributed training of machine learning models.



## Libraries

In [3]:
"""
Data in-memory Storage Interface
"""
import pandas as pd

"""
Sklearn
"""
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
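# Note: Imputer was removed in scikit-learn 0.22; newer releases provide
# sklearn.impute.SimpleImputer instead.
from sklearn.preprocessing import Imputer, StandardScaler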
from sklearn.linear_model import LinearRegression, LogisticRegression

# We'll use FunctionTransformer for simple transforms
from sklearn.preprocessing import FunctionTransformer
# TransformerMixin gives us fit_transform for free
from sklearn.base import TransformerMixin

"""
Jupyter Notebook
"""
from IPython.display import display

## Configurations

In [4]:
%matplotlib inline

"""
Jupyter Notebook
"""
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

"""
Pandas
"""
pd.options.display.max_rows = 10

## Data
First we need to download the dataset for the test we are performing.

In [None]:
%load download.py

Check data file structure.

In [6]:
ls -lh data/*.csv

-rw-r--r--  1 solid  staff   1.6G Dec  6 19:45 data/yellow_tripdata_2009-01.csv
-rw-r--r--  1 solid  staff   1.5G Dec  6 19:45 data/yellow_tripdata_2009-02.csv
-rw-r--r--  1 solid  staff   1.5G Dec  6 19:45 data/yellow_tripdata_2009-03.csv
-rw-r--r--  1 solid  staff   1.3G Dec  6 19:45 data/yellow_tripdata_2009-04.csv


Let's load the first DataFrame into memory.

In [7]:
%%time

dtype = {
    'vendor_name': 'category',
    'Payment_Type': 'category',
}

df = pd.read_csv("data/yellow_tripdata_2009-01.csv", dtype=dtype,
                 parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'],)
display(df.head())

| | vendor_name | Trip_Pickup_DateTime | Trip_Dropoff_DateTime | Passenger_Count | Trip_Distance | Start_Lon | Start_Lat | Rate_Code | store_and_forward | End_Lon | End_Lat | Payment_Type | Fare_Amt | surcharge | mta_tax | Tip_Amt | Tolls_Amt | Total_Amt |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | VTS | 2009-01-04 02:52:00 | 2009-01-04 03:02:00 | 1 | 2.63 | -73.991957 | 40.721567 | | | -73.993803 | 40.695922 | CASH | 8.9 | 0.5 | | 0.0 | 0.0 | 9.4 |
| 1 | VTS | 2009-01-04 03:31:00 | 2009-01-04 03:38:00 | 3 | 4.55 | -73.982102 | 40.73629 | | | -73.95585 | 40.76803 | Credit | 12.1 | 0.5 | | 2.0 | 0.0 | 14.6 |
| 2 | VTS | 2009-01-03 15:43:00 | 2009-01-03 15:57:00 | 5 | 10.35 | -74.002587 | 40.739748 | | | -73.869983 | 40.770225 | Credit | 23.7 | 0.0 | | 4.74 | 0.0 | 28.44 |
| 3 | DDS | 2009-01-01 20:52:58 | 2009-01-01 21:14:00 | 1 | 5.0 | -73.974267 | 40.790955 | | | -73.996558 | 40.731849 | CREDIT | 14.9 | 0.5 | | 3.05 | 0.0 | 18.45 |
| 4 | DDS | 2009-01-24 16:18:23 | 2009-01-24 16:24:56 | 1 | 0.4 | -74.00158 | 40.719382 | | | -74.008378 | 40.72035 | CASH | 3.7 | 0.0 | | 0.0 | 0.0 | 3.7 |


CPU times: user 53.3 s, sys: 5.2 s, total: 58.5 s
Wall time: 59.5 s
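
To make the effect of the `dtype` and `parse_dates` arguments concrete, here is a minimal sketch that reads a tiny in-memory CSV instead of the 1.6G file. The two rows are the first two rows of the output above, trimmed to four columns; `csv_text` and `toy` are names made up for this illustration.

```python
import io
import pandas as pd

# First two rows of the taxi data shown above, trimmed to four columns.
csv_text = """vendor_name,Trip_Pickup_DateTime,Payment_Type,Tip_Amt
VTS,2009-01-04 02:52:00,CASH,0.0
VTS,2009-01-04 03:31:00,Credit,2.0
"""

toy = pd.read_csv(
    io.StringIO(csv_text),
    dtype={"vendor_name": "category", "Payment_Type": "category"},
    parse_dates=["Trip_Pickup_DateTime"],
)

# Categorical columns store integer codes plus a small lookup table of labels,
# which is usually much cheaper than one Python string per row.
print(toy.dtypes)

# Parsed datetimes expose the `.dt` accessor, e.g. the hour of the pickup.
print(toy["Trip_Pickup_DateTime"].dt.hour.tolist())
```

Declaring the dtypes up front means pandas does not have to infer them, and the datetime columns are ready for feature extraction later on.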


Now let's define our X and y for the task at hand. We want to predict whether a client tips at all, so the target is the boolean `Tip_Amt > 0` rather than the tip amount itself.

In [8]:
X = df.drop("Tip_Amt", axis=1)
y = df['Tip_Amt'] > 0

X_train, X_test, y_train, y_test = train_test_split(X, y)

Let's check the sizes of the resulting train and test splits.

In [9]:
print('train sample size: {}\ntest sample size: {}'.format(len(X_train), len(X_test)))

train sample size: 7073603
test sample size: 2357868
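
The roughly 75/25 ratio in those sizes comes from the defaults of `train_test_split`. The sketch below shows the same behaviour on synthetic arrays; `X_toy`, `y_toy` and the `random_state=0` argument are assumptions added for the illustration (the original call does not fix a seed).

```python
import numpy as np
from sklearn.model_selection import train_test_split

X_toy = np.arange(200).reshape(100, 2)   # 100 rows, 2 features
y_toy = np.arange(100) % 2 == 0          # boolean target, like `Tip_Amt > 0`

# With no sizes given, scikit-learn holds out 25% of the rows for testing.
# random_state is an addition here so the shuffle is repeatable.
X_tr, X_te, y_tr, y_te = train_test_split(X_toy, y_toy, random_state=0)

print(len(X_tr), len(X_te))  # prints: 75 25
```

Passing `test_size=` or `train_size=` changes the ratio if a different split is wanted.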


Let's fix `Payment_Type`, since there are some inconsistencies in this feature.

In [10]:
df['Payment_Type'].drop_duplicates()

0            CASH
1          Credit
3          CREDIT
9            Cash
96      No Charge
1167      Dispute
Name: Payment_Type, dtype: category
Categories (6, object): [CASH, CREDIT, Cash, Credit, Dispute, No Charge]

In [11]:
df['Payment_Type'].str.lower()

0            cash
1          credit
2          credit
3          credit
4            cash
            ...  
9431466      cash
9431467    credit
9431468    credit
9431469      cash
9431470      cash
Name: Payment_Type, Length: 9431471, dtype: object
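
As a small illustration of what the lowercasing buys us, the sketch below applies it to the six raw spellings listed above and counts the distinct values before and after. The names `raw`, `lowered` and `recat` are made up for the example.

```python
import pandas as pd

# The six raw spellings found by drop_duplicates() above.
raw = pd.Series(
    ["CASH", "Credit", "CREDIT", "Cash", "No Charge", "Dispute"],
    dtype="category",
)

# .str.lower() works on a categorical of strings and returns plain strings.
lowered = raw.str.lower()
print(sorted(lowered.unique()))

# Converting back to category restores the compact representation,
# now with four categories instead of six.
recat = lowered.astype("category")
print(len(raw.cat.categories), "->", len(recat.cat.categories))
```

Note that `.str.lower()` returns a new Series, so the result has to be assigned somewhere to take effect.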

## Build the Pipeline
**Citing the author in this one:**
* The downside of this approach is that we now have to remember which pre-processing steps we did, and in what order. The pipeline from raw data to fit model is spread across multiple python objects. **A better approach is to use scikit-learn's Pipeline object.**

In [12]:
sequence = [
    Imputer(strategy='median'), 
    StandardScaler(), 
    LinearRegression()
]
pipeline = make_pipeline(*sequence)
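
For readers on a recent scikit-learn, here is a minimal sketch of the same three-step pipeline on toy numbers. It assumes `SimpleImputer` from `sklearn.impute` as a stand-in for `Imputer`; `X_toy`, `y_toy` and `toy_pipe` are hypothetical names.

```python
import numpy as np
from sklearn.impute import SimpleImputer  # assumption: scikit-learn >= 0.20
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Toy data with one missing value for the imputer to fill.
X_toy = np.array([[1.0], [2.0], [np.nan], [4.0]])
y_toy = np.array([1.0, 2.0, 3.0, 4.0])

toy_pipe = make_pipeline(
    SimpleImputer(strategy="median"),
    StandardScaler(),
    LinearRegression(),
)
toy_pipe.fit(X_toy, y_toy)

# make_pipeline names each step after its lowercased class name.
print([name for name, _ in toy_pipe.steps])

# New data goes through the same imputing and scaling before prediction.
print(toy_pipe.predict(np.array([[np.nan]])).shape)
```

Because the pre-processing lives inside the pipeline object, the same steps are replayed in the same order at prediction time.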

We will fit the pipeline and check that it works once every pre-processing step is part of it. One problem remains: how do we apply the lowercase transformation to new data? Do not worry, we can add it to the pipeline as a plain function. Check the code below.

In [14]:
def payment_lowerer(X):
    return X.assign(Payment_Type=X.Payment_Type.str.lower())
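
A plain function like this becomes a pipeline step once it is wrapped in `FunctionTransformer`. The sketch below shows the wrapping on a toy frame; `toy`, `lowerer` and `out` are names invented for the illustration.

```python
import pandas as pd
from sklearn.preprocessing import FunctionTransformer

def payment_lowerer(X):
    # assign returns a new frame; the caller's data is left untouched
    return X.assign(Payment_Type=X.Payment_Type.str.lower())

toy = pd.DataFrame({"Payment_Type": ["CASH", "Credit", "CREDIT"]})

# validate=False hands the DataFrame to the function as-is instead of
# converting it to a NumPy array first.
lowerer = FunctionTransformer(payment_lowerer, validate=False)
out = lowerer.fit_transform(toy)

print(out["Payment_Type"].tolist())
print(toy["Payment_Type"].tolist())  # original frame is unchanged
```

The transformer is stateless, so `fit` learns nothing and the same function is applied to training and new data alike.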

We do not need to use all the columns in this dataset, so a column selector would be awesome.

In [15]:
class ColumnSelector(TransformerMixin):
    "Select `columns` from `X`"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return X[self.columns]
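
A quick usage sketch for the selector, on a toy frame with made-up column names. It repeats the class so the block runs on its own, and calls `fit_transform`, which is the method `TransformerMixin` supplies.

```python
import pandas as pd
from sklearn.base import TransformerMixin

class ColumnSelector(TransformerMixin):
    "Select `columns` from `X`"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return X[self.columns]

toy = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6]})

# fit_transform comes from TransformerMixin: it calls fit, then transform.
selected = ColumnSelector(["a", "c"]).fit_transform(toy)
print(list(selected.columns))
```

Placing the selector first in a pipeline keeps every later step independent of columns it does not need.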

**Citing the author:**
* Internally, pandas stores datetimes like Trip_Pickup_DateTime as a 64-bit integer representing the nanoseconds since the Unix epoch (1970-01-01). If we left this untransformed, scikit-learn would happily transform that column to its integer representation, which may not be the most meaningful item to stick in a linear model for predicting tips. A better feature might be the hour of the day:

In [16]:
class HourExtractor(TransformerMixin):
    "Transform each datetime in `columns` to integer hour of the day"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
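        # Bind `col` as a default argument so each lambda keeps its own column
        # (a bare closure would only see the last column when it is called).
        return X.assign(**{col: (lambda x, col=col: x[col].dt.hour)
                           for col in self.columns})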
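
The hour extraction can be sketched on a toy frame as follows. The timestamps are taken from rows 0 and 2 of the sample shown earlier; the column names `pickup` and `dropoff` are shortened for the example. Binding `col` as a default argument is one way to keep each lambda tied to its own column when more than one column is passed.

```python
import pandas as pd

toy = pd.DataFrame({
    "pickup": pd.to_datetime(["2009-01-04 02:52:00", "2009-01-03 15:43:00"]),
    "dropoff": pd.to_datetime(["2009-01-04 03:02:00", "2009-01-03 15:57:00"]),
})

columns = ["pickup", "dropoff"]

# `col=col` freezes the current column name inside each lambda. Without it,
# every lambda would look up `col` only when called and see the last value.
hours = toy.assign(
    **{col: (lambda x, col=col: x[col].dt.hour) for col in columns}
)

print(hours)
```

Each datetime column is replaced by a small integer between 0 and 23, which is a far more useful input for a linear model than a raw timestamp.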

**Citing the author:**
* Likewise, we'll need to ensure the categorical variables (in a statistical sense) are categorical dtype (in a pandas sense). We want categoricals so that we can call get_dummies later on without worrying about missing or extra categories in a subset of the data throwing off our linear algebra.

In [17]:
class CategoricalEncoder(TransformerMixin):
    """
    Convert to Categorical with specific `categories`.

    Examples
    --------
    >>> CategoricalEncoder({"A": ['a', 'b', 'c']}).fit_transform(
    ...     pd.DataFrame({"A": ['a', 'b', 'a', 'a']})
    ... )['A']
    0    a
    1    b
    2    a
    3    a
    Name: A, dtype: category
    Categories (3, object): [a, b, c]
    """
    def __init__(self, categories):
        self.categories = categories

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X = X.copy()
        for col, categories in self.categories.items():
            X[col] = X[col].astype('category').cat.set_categories(categories)
        return X
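
To see why fixing the categories matters for `get_dummies`, consider two hypothetical chunks of data where the second one never sees some vendors. The helper `encode` and the chunk names are made up for this sketch.

```python
import pandas as pd

categories = ["CMT", "DDS", "VTS"]

# Two hypothetical chunks; the second contains only one vendor.
chunk_a = pd.DataFrame({"vendor_name": ["CMT", "DDS", "VTS"]})
chunk_b = pd.DataFrame({"vendor_name": ["VTS", "VTS"]})

def encode(chunk):
    # Pin the full category list before creating the dummy columns.
    fixed = chunk["vendor_name"].astype("category").cat.set_categories(categories)
    return pd.get_dummies(chunk.assign(vendor_name=fixed))

cols_a = list(encode(chunk_a).columns)
cols_b = list(encode(chunk_b).columns)
print(cols_a)
print(cols_b)

# Without the fixed categories, the second chunk yields a single column.
naive_b = list(pd.get_dummies(chunk_b).columns)
print(naive_b)
```

With the categories pinned, every chunk produces the same columns in the same order, so the fitted model always receives a feature matrix of the shape it expects.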

**Citing the author:**
* Finally, we'd like to normalize the quantitative subset of the data. Scikit-learn has a StandardScaler, which we'll mimic here, to just operate on a subset of the columns.

In [18]:
class StandardScaler(TransformerMixin):
    "Scale a subset of the columns in a DataFrame"
    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        # Yes, non-ASCII symbols can be valid identifiers in Python 3
        self.μs = X[self.columns].mean()
        self.σs = X[self.columns].std()
        return self

    def transform(self, X, y=None):
        X = X.copy()
        X[self.columns] = X[self.columns].sub(self.μs).div(self.σs)
        return X
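
The arithmetic inside this scaler can be checked on a handful of values. The sketch below uses the five `Trip_Distance` and `Passenger_Count` values from the sample rows shown earlier; `mu`, `sigma` and `scaled` are names chosen for the example. One detail worth knowing: pandas' `.std()` defaults to the sample standard deviation (`ddof=1`), whereas scikit-learn's own `StandardScaler` divides by the population standard deviation (`ddof=0`), so the two differ slightly on small samples.

```python
import pandas as pd

toy = pd.DataFrame({
    "Trip_Distance": [2.63, 4.55, 10.35, 5.0, 0.4],
    "Passenger_Count": [1, 3, 5, 1, 1],
})
scale = ["Trip_Distance"]

mu = toy[scale].mean()
sigma = toy[scale].std()   # pandas default: sample std (ddof=1)

scaled = toy.copy()
scaled[scale] = toy[scale].sub(mu).div(sigma)

# The scaled column has mean ~0 and (sample) std ~1;
# columns outside `scale` are passed through untouched.
print(scaled["Trip_Distance"].mean(), scaled["Trip_Distance"].std())
print(scaled["Passenger_Count"].tolist())
```

The means and standard deviations are learned once in `fit` and reused in `transform`, so new data is scaled with the training statistics.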

**Time to build the pipeline**, let's define everything now!

In [45]:
# The columns at the start of the pipeline
columns = ['vendor_name', 'Trip_Pickup_DateTime',
           'Passenger_Count', 'Trip_Distance',
           'Payment_Type', 'Fare_Amt', 'surcharge']

# The mapping of {column: set of categories}
categories = {
    'vendor_name': ['CMT', 'DDS', 'VTS'],
    'Payment_Type': ['cash', 'credit', 'dispute', 'no charge'],
}

scale = ['Trip_Distance', 'Fare_Amt', 'surcharge']

pipe = make_pipeline(
    ColumnSelector(columns),
    HourExtractor(['Trip_Pickup_DateTime']),
    FunctionTransformer(payment_lowerer, validate=False),
    CategoricalEncoder(categories),
    FunctionTransformer(pd.get_dummies, validate=False),
    StandardScaler(scale),
    Imputer(strategy='median'),
    LogisticRegression(),
)

In [46]:
pipe.steps

[('columnselector', <__main__.ColumnSelector at 0x17c33c8d0>),
 ('hourextractor', <__main__.HourExtractor at 0x17c33cf98>),
 ('functiontransformer-1', FunctionTransformer(accept_sparse=False,
            func=<function payment_lowerer at 0x18cd47488>, inv_kw_args=None,
            inverse_func=None, kw_args=None, pass_y='deprecated',
            validate=False)),
 ('categoricalencoder', <__main__.CategoricalEncoder at 0x17c33c630>),
 ('functiontransformer-2', FunctionTransformer(accept_sparse=False,
            func=<function get_dummies at 0x105f28378>, inv_kw_args=None,
            inverse_func=None, kw_args=None, pass_y='deprecated',
            validate=False)),
 ('standardscaler', <__main__.StandardScaler at 0x17c33cdd8>),
 ('imputer',
  Imputer(axis=0, copy=True, missing_values='NaN', strategy='median', verbose=0)),
 ('logisticregression',
  LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
            intercept_scaling=1, max_iter=100, multi_class='ovr

In [47]:
%time pipe.fit(X_train, y_train)

CPU times: user 57.7 s, sys: 5.91 s, total: 1min 3s
Wall time: 1min 7s


Pipeline(memory=None,
     steps=[('columnselector', <__main__.ColumnSelector object at 0x17c33c8d0>), ('hourextractor', <__main__.HourExtractor object at 0x17c33cf98>), ('functiontransformer-1', FunctionTransformer(accept_sparse=False,
          func=<function payment_lowerer at 0x18cd47488>, inv_kw_args=None,
          inve...ty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False))])

In [48]:
%time pipe.score(X_train, y_train)

CPU times: user 3.97 s, sys: 1.72 s, total: 5.69 s
Wall time: 5.66 s


0.99310506965120882

In [49]:
%time pipe.score(X_test, y_test)

CPU times: user 1.3 s, sys: 488 ms, total: 1.79 s
Wall time: 1.78 s


0.99313320338543121

It turns out people essentially tip if and only if they're paying with a card, so this isn't a particularly difficult task. Or perhaps more accurately, tips are only recorded when someone pays with a card.

## Scaling Out with Dask

OK, so we've fit our model and it's been basically normal. Maybe we've been overly-dogmatic about doing everything in a pipeline, but that's just good model hygiene anyway.

Now, to scale out to the rest of the dataset. We'll predict the probability of tipping for every cab ride in the dataset (bearing in mind that the full dataset doesn't fit in my laptop's RAM, so we'll do it out-of-core).

To make things a bit easier we'll use dask, though it isn't strictly necessary for this section. It saves us from writing a for loop (big whoop). Later on we'll see that we can reuse this code when we go to scale out to a cluster (that part is pretty cool, actually). Dask can scale down to a single laptop, and up to thousands of cores.

In [50]:
import dask.dataframe as dd

df = dd.read_csv("data/*.csv", dtype=dtype,
                 parse_dates=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'],)

X = df.drop("Tip_Amt", axis=1)

In [51]:
yhat = X.map_partitions(lambda x: pd.Series(pipe.predict_proba(x)[:, 1],
                                            name='yhat'),
                        meta=('yhat', 'f8'))
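
For comparison, the for loop that dask spares us might look roughly like the sketch below, using plain pandas with `chunksize`. Everything here is a stand-in chosen for illustration: a one-feature `LogisticRegression` replaces the fitted pipeline, the training values are the `Fare_Amt` and tip outcomes of the five sample rows shown earlier, and `big_csv` is a small in-memory text pretending to be a file too large to load at once.

```python
import io
import pandas as pd
from sklearn.linear_model import LogisticRegression

# Hypothetical stand-in for the fitted pipeline: one feature, five rows.
train = pd.DataFrame({"Fare_Amt": [3.7, 8.9, 12.1, 14.9, 23.7]})
tipped = [False, False, True, True, True]
model = LogisticRegression().fit(train, tipped)

# Pretend this text is a CSV file that does not fit in memory.
big_csv = io.StringIO("Fare_Amt\n" + "\n".join(str(v) for v in range(1, 11)))

parts = []
for chunk in pd.read_csv(big_csv, chunksize=4):   # 4 rows in memory at a time
    proba = model.predict_proba(chunk)[:, 1]      # column 1 = P(class True)
    parts.append(pd.Series(proba, index=chunk.index, name="yhat"))

yhat_local = pd.concat(parts)
print(yhat_local)
```

Only one chunk of features is held in memory at a time. `map_partitions` expresses the same per-chunk computation, with dask handling the iteration and, later, the distribution across workers.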

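The full dataset is a mighty one: about 34M rows across the four files. Nothing has been computed yet, though; dask only runs the predictions when we ask for the result, as in the next cell.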

In [56]:
%time yhat.to_frame().to_parquet("data/predictions.parq")

CPU times: user 4min 54s, sys: 38.9 s, total: 5min 33s
Wall time: 3min 3s
