# Pipelines
Data pipelines are a series of automated data transformations that ensure the validity of your work for routine data maintenance tasks. Each stage of a pipeline feeds from the previous stage, i.e. the output of a stage is plugged into the input of the next stage and data flows through the pipeline from beginning to end just as water flows through a pipeline. Many organizations rely on data engineering teams to encode common tasks into pipelines.

Examples of data transformations:
- change in scale, units, or base
- text vectorization
- image vectorization
- sound file vectorization
- missing data imputation
- clipping

In [42]:
from sklearn.pipeline import Pipeline
import pandas as pd
import json

data = pd.read_csv("data/stumbleupon.tsv", sep='\t')
data['title'] = data.boilerplate.map(lambda x: json.loads(x).get('title', ''))
data['body'] = data.boilerplate.map(lambda x: json.loads(x).get('body', ''))

# fill NA with empty cells and check data
data2 = data.fillna('')

In [47]:
data2.title.isnull().sum()

0

In [16]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7395 entries, 0 to 7394
Data columns (total 29 columns):
url                               7395 non-null object
urlid                             7395 non-null int64
boilerplate                       7395 non-null object
alchemy_category                  7395 non-null object
alchemy_category_score            7395 non-null object
avglinksize                       7395 non-null float64
commonlinkratio_1                 7395 non-null float64
commonlinkratio_2                 7395 non-null float64
commonlinkratio_3                 7395 non-null float64
commonlinkratio_4                 7395 non-null float64
compression_ratio                 7395 non-null float64
embed_ratio                       7395 non-null float64
framebased                        7395 non-null int64
frameTagRatio                     7395 non-null float64
hasDomainLink                     7395 non-null int64
html_ratio                        7395 non-null float64
image_r

In [48]:
# set label as target
y = data.label
y

0       0
1       1
2       1
3       1
4       0
5       0
6       1
7       0
8       1
9       1
10      0
11      0
12      1
13      1
14      0
15      0
16      0
17      0
18      1
19      1
20      0
21      1
22      0
23      1
24      1
25      0
26      0
27      0
28      1
29      1
       ..
7365    1
7366    0
7367    1
7368    1
7369    1
7370    0
7371    1
7372    0
7373    0
7374    0
7375    0
7376    1
7377    0
7378    1
7379    1
7380    0
7381    0
7382    1
7383    1
7384    1
7385    0
7386    1
7387    1
7388    0
7389    0
7390    0
7391    0
7392    1
7393    1
7394    0
Name: label, dtype: int64

In [51]:
# check target proportion
y.value_counts()/len(y)

1    0.51332
0    0.48668
Name: label, dtype: float64

In [60]:
# countvectorize our first title
from sklearn.feature_extraction.text import CountVectorizer

vectorizer = CountVectorizer(max_features=1000, ngram_range=(1, 2), stop_words='english', binary=True)
#max_features is the most number of columns it will make
#ngram_range is defining one word and two word pairs, 
#stop_words=
#binary True marks 1 if word exists instead of keeping the counts

vectorizer.fit(['IBM Sees Holographic Calls Air Breathing'])

CountVectorizer(analyzer=u'word', binary=True, decode_error=u'strict',
        dtype=<type 'numpy.int64'>, encoding=u'utf-8', input=u'content',
        lowercase=True, max_df=1.0, max_features=1000, min_df=1,
        ngram_range=(1, 2), preprocessor=None, stop_words='english',
        strip_accents=None, token_pattern=u'(?u)\\b\\w\\w+\\b',
        tokenizer=None, vocabulary=None)

In [63]:
vectorizer.vocabulary_

{u'air': 0,
 u'air breathing': 1,
 u'breathing': 2,
 u'calls': 3,
 u'calls air': 4,
 u'holographic': 5,
 u'holographic calls': 6,
 u'ibm': 7,
 u'ibm sees': 8,
 u'sees': 9,
 u'sees holographic': 10}

Example of how Count Vectorizer works:
![Example](assets/CountVectorizer.jpg)

In [65]:
# get n-grams
vectorizer.get_feature_names()

[u'air',
 u'air breathing',
 u'breathing',
 u'calls',
 u'calls air',
 u'holographic',
 u'holographic calls',
 u'ibm',
 u'ibm sees',
 u'sees',
 u'sees holographic']

In [69]:
# vectorize our original training title
vectorizer.transform(["IBM Sees Holograpic Air"]).todense()

matrix([[1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0]])

In [79]:
# Use `fit` to learn the vocabulary of the titles
vectorizer.fit(data2.title)

# Use `transform` to generate the sample X word matrix - one column per feature (word or n-grams)
X = vectorizer.transform(data2.title)


In [90]:
# build Logit and CV score
from sklearn.linear_model import LogisticRegression
from sklearn.cross_validation import cross_val_score

model = LogisticRegression()

cross_val_score(model, X, y)

array([ 0.74574209,  0.75659229,  0.75487013])

In [109]:
# Split the data into a training set
training = data2[:6000]
X_train = training.title
y_train = training.label

# reserve future data, unavailable at training time
# These rows are obtained in the future
X_new = data[:6000]["title"].fillna("")

pipeline = Pipeline([
        ('vec', vectorizer),
        ('model', model)],)

# Fit the full pipeline

# This means we perform the steps laid out above
# First we fit the vectorizer,
# And then feed the output of that into the fit function of the model

pipeline.fit(X_train, y_train)


# Here again we apply the full pipeline for predictions
# The text is transformed automatically to match the features from the pipeline
pipeline.predict_proba(X_new)

array([[ 0.49530115,  0.50469885],
       [ 0.60684225,  0.39315775],
       [ 0.41797412,  0.58202588],
       ..., 
       [ 0.8437683 ,  0.1562317 ],
       [ 0.60684225,  0.39315775],
       [ 0.20297257,  0.79702743]])

In [113]:
from sklearn.preprocessing import FunctionTransformer

def fun(x):
    return x+1
    

# Split the data into a training set
training = data2[:6000]
X_train = training.title
y_train = training.label

# reserve future data, unavailable at training time
# These rows are obtained in the future
X_new = data[:6000]["title"].fillna("")

pipeline = Pipeline([
        ('vec', vectorizer),
        ('model', model)], )

# Fit the full pipeline

# This means we perform the steps laid out above
# First we fit the vectorizer,
# And then feed the output of that into the fit function of the model

pipeline.fit(X_train, y_train)


# Here again we apply the full pipeline for predictions
# The text is transformed automatically to match the features from the pipeline
pipeline.predict_proba(X_new)

array([[ 0.49530115,  0.50469885],
       [ 0.60684225,  0.39315775],
       [ 0.41797412,  0.58202588],
       ..., 
       [ 0.8437683 ,  0.1562317 ],
       [ 0.60684225,  0.39315775],
       [ 0.20297257,  0.79702743]])

In [96]:
pipeline.predict(X_new)

array([1, 0, 1, ..., 0, 0, 1])

In [99]:
#data[6000:].label

### Merging Feature Sets in Pipelines

We may want to merge many different feature sets automatically. Here we can use scikit-learn's `FeatureUnion`.

While scikit-learn pipelines help with managing the transformation from raw data, there may be many steps before this takes place in your pipeline. These pipelines are often referred to as ETL pipelines for (Extract, Transform, Load). In an ETL pipeline, the data is pulled or extracted from some source (like a database), transformed or manipulated, and then loaded into whatever system will analyze the data.

Many data science teams rely on software tools to manage these ETL pipelines. If a transformation step fails, these tools alert you, or ensure that step can be re-run. If these transformation steps need to happen daily or weekly, these tools can manage that timeline.

One of the most popular Python tools for this is [Luigi](https://github.com/spotify/luigi) developed by Spotify.
An alternative is [Airflow](https://github.com/airbnb/airflow) by AirBnB.

In [103]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

# test `make_pipeline` vs `Pipeline`; are they different? NO both do the same thing
pipe1 = make_pipeline(StandardScaler(), LogisticRegression())

pipe2 = Pipeline(steps=[('standardscaler', StandardScaler()),
                     ('logistic_regr', LogisticRegression())])


In [104]:
pipe1

Pipeline(steps=[('standardscaler', StandardScaler(copy=True, with_mean=True, with_std=True)), ('logisticregression', LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,
          penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False))])

In [105]:
pipe2

Pipeline(steps=[('standardscaler', StandardScaler(copy=True, with_mean=True, with_std=True)), ('logistic_regr', LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1,
          penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False))])

### Check
In pairs, assign one function to each pair, they have to read about it in the doc and then explain it to the class.

1. Binarizer
1. KernelCenterer
1. MaxAbsScaler
1. MinMaxScaler
1. Normalizer
1. OneHotEncoder
1. PolynomialFeatures
1. RobustScaler
1. StandardScaler
1. Data Imputation

1. Imputer
1. Function Transformer

1. FunctionTransformer
1. Label Manipulators

1. LabelBinarizer
1. LabelEncoder
1. MultiLabelBinarizer

In [108]:
# implement custom transformers by extending the BaseClass in sklearn
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class FeatureMultiplier(BaseEstimator, TransformerMixin):
    def __init__(self, factor):
        self.factor = factor

    def transform(self, X, *_):
        return X * self.factor

    def fit(self, *_):
        return self

fm = FeatureMultiplier(2)

test = np.diag((1,2,3,4))
print test

fm.transform(test)

[[1 0 0 0]
 [0 2 0 0]
 [0 0 3 0]
 [0 0 0 4]]


array([[2, 0, 0, 0],
       [0, 4, 0, 0],
       [0, 0, 6, 0],
       [0, 0, 0, 8]])

How does this compare with `FunctionTransformer` from the preprocessing module?

Optional: Implement a custom transformer that selects a specific feature from a Pandas dataframe. It should be initialized with the column name or the column index and it should return the selected column when transforming a dataframe.

Revisit the salary prediction lab. How could you use `make_pipeline` and `make_union` to build a pipeline that performs the same steps all in one pass?

You will have to build something like this:

>Data: SelectCategoricalFeaturesTransformer: OneHotEncoder: FeatureUnion: Model: SelectNumericalFeaturesTransformer: Scaler

Students:
- Review lab and identify the steps that were performed
- For each step, determine input and output
- Is the input the whole dataframe or only a subset of the features?
- Is the output new features or a prediction?
- Identify what kind of transformer is needed:
    - Is it a custom transformer?
    - Does scikit-learn provide a transformer like this out of the box?
- If features are treated differently, how do we recombine ([Feature Union](http://scikit-learn.org/stable/modules/pipeline.html)) them?