# Introduction

After partitioining the validation data set and processing with the pipeline, we will now implement our streaming data strategy to both train some online regressors using incremental learning, and process the accumulating training data simultaneously. This way we will also understand benchmark performance of incremental learning regressors. This is intended to link R processing pipeline that is extracting training data real-time.



In [38]:
############################################
# Note that this code will reset the cache
############################################
import pandas as pd
import numpy as np
import pickle

# Cache KNUMBERs to be skipped: initially these are documents in the validation set
disk = "/Volumes/Iomega_HDD/2016/Data science/Predictive-Modeling-510k-decision-time/"
with open(disk+"X_val.pkl", "rb") as f:
    X_val = pickle.load(f)
skip_knumber = list(X_val.KNUMBER)

with open(disk+"y_val.pkl", "rb") as f:
    y_val = pickle.load(f)

# Pickle these Knumbers to use and dynamically update later
with open(disk+"skip_knumber.pkl", "wb") as f:
    pickle.dump(skip_knumber,f)

# Just to obtain column names:
current_chunk = pd.read_csv(disk+"processed510kdata.csv", nrows=1)
column_names = list(current_chunk.columns)

# Initiate the skiprows to use and dynamically update later
skiprows = 0
with open(disk+"skiprows.pkl", "wb") as f:
    pickle.dump(skiprows,f)
    
# Initiate chunk_number to use and update later
chunk_number = 1
with open(disk+"chunk_number.pkl", "wb") as f:
    pickle.dump(chunk_number,f)
    
# A data frame to hold validation results to be updated
results = pd.DataFrame(columns=['chunk', 'nsamples', "SGDRegressor", 'PassiveAggressiveRegressor'])
with open(disk+"processed_chunks/results.pkl", "wb") as f:
    pickle.dump(results,f) 



In [39]:
# Feature extraction pipeline and utility functions
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import MaxAbsScaler,FunctionTransformer, Imputer
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.feature_extraction.text import HashingVectorizer

# First we build two utility functions to parse numeric and text data, 
# and wrap them using FunctionTransformer, so that they can be integrated into a sklearn pipeline:
def text_columns(X_train):
    return X_train.TEXT_FEATURES

def numeric_columns(X_train):
    numeric = ['APPLICANT_PRIOR_CLEARANCE_TO_DATE','DEVICENAME_PRIOR_CLEARANCE_TO_DATE']
    temp = X_train[numeric]
    return temp

get_numeric_data = FunctionTransformer(func = numeric_columns, validate=False) 
get_text_data = FunctionTransformer(func = text_columns,validate=False) 
# Note how we avoid putting any arguments into text_columns and numeric_columns

# We also need to create our regex token pattern to use in HashingVectorizer. 
TOKENS_ALPHANUMERIC = '[A-Za-z0-9]+(?=\\s+)'   
#Note this regex will match either a whitespace or a punctuation to tokenize the string vector on these preferences  

# We also need to redefine the default feature selection function for regression to properly place into our pipeline:
def f_regression(X,Y):
    import sklearn
    return sklearn.feature_selection.f_regression(X,Y,center = False) # default is center = True

# Function to add feature interactions in Sparse matrix
# From: https://github.com/drivendataorg/box-plots-sklearn/blob/master/src/features/SparseInteractions.py
from itertools import combinations

import numpy as np
from scipy import sparse
from sklearn.base import BaseEstimator, TransformerMixin

class SparseInteractions(BaseEstimator, TransformerMixin):
    def __init__(self, degree=2, feature_name_separator="_"):
        self.degree = degree
        self.feature_name_separator = feature_name_separator

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        if not sparse.isspmatrix_csc(X):
            X = sparse.csc_matrix(X)

        if hasattr(X, "columns"):
            self.orig_col_names = X.columns
        else:
            self.orig_col_names = np.array([str(i) for i in range(X.shape[1])])

        spi = self._create_sparse_interactions(X)
        return spi

    def get_feature_names(self):
        return self.feature_names

    def _create_sparse_interactions(self, X):
        out_mat = []
        self.feature_names = self.orig_col_names.tolist()

        for sub_degree in range(2, self.degree + 1):
            for col_ixs in combinations(range(X.shape[1]), sub_degree):
                # add name for new column
                name = self.feature_name_separator.join(self.orig_col_names[list(col_ixs)])
                self.feature_names.append(name)

                # get column multiplications value
                out = X[:, col_ixs[0]]
                for j in col_ixs[1:]:
                    out = out.multiply(X[:, j])

                out_mat.append(out)

        return sparse.hstack([X] + out_mat)

# Definition of the actual pipeline
#  We will try to optimize for these values in feature selection
n_tokens = 1000
n_features = 600

pipeline510k = Pipeline([
    
    ("union",FeatureUnion( # Note that FeatureUnion() accepts list of tuples, the first half of each tuple is the name of the transformer
        
        transformer_list = [
            
            ("numeric_subpipeline", Pipeline([ # Note we have subpipeline branches inside the main pipeline
                
                ("parser",get_numeric_data), # Step1: parse the numeric data (note how we avoid () when using FunctionTransformer objects)
                ("imputer",Imputer()), # Step2: impute missing values (we don't expect any)
            
            ])), # Branching point of the FeatureUnion
            
            ("text_subpipeline",Pipeline([
            
                ("parser",get_text_data), # Step1: parse the text data 
                ("tokenizer",HashingVectorizer(token_pattern= TOKENS_ALPHANUMERIC,n_features= 2 ** 18,
                                             stop_words = "english",# We will remove English stop words before tokenization
                                             ngram_range = (1,1),decode_error='ignore', # We will tokenize to single words only
                                             non_negative=True, norm=None, binary=True,alternate_sign=False  
                                            )) # Step2: use HashingVectorizer for automated tokenization and feature extraction
                                            #('dim_red1', SelectKBest(f_regression, k = n_tokens)) # Step3: use dimension reduction to select n_tokens of best features
                
            ]))
        ]
    
    )),# Branching point to the main pipeline: at this point all features are numeric
    
    #("int", SparseInteractions(degree=2)), # Add polynomial interaction terms 
    ("scaler",MaxAbsScaler()) # Scale the features
    #('dim_red2', SelectKBest(f_regression, k =n_features)) # Add another dimension reduction step at the end
])

In [40]:
# Process the validation set and save into disk
disk = "/Volumes/Iomega_HDD/2016/Data science/Predictive-Modeling-510k-decision-time/"
import warnings
from datetime import datetime
warnings.filterwarnings("ignore")
start = datetime.now()
X_val_proc = pipeline510k.fit_transform(X_val)
process = datetime.now() - start
print("It took: " + str(process.seconds/60) + " minutes.")

It took: 0.26666666666666666 minutes.


In [41]:
X_val_proc.shape

(8083, 262146)

In [42]:
with open(disk+"X_val_proc.pkl","wb") as f:
        pickle.dump(X_val_proc,f)
print("Saved the processed validation set.")  

Saved the processed validation set.


In [43]:
# Instance of regressors
from sklearn.linear_model import SGDRegressor,PassiveAggressiveRegressor
sgd = SGDRegressor()
pagg = PassiveAggressiveRegressor()

In [None]:
import pandas as pd
import pickle
import numpy as np
import warnings
from datetime import datetime
from sklearn.metrics import mean_squared_error

chunksize = 10 # This is the lmiting factor for the memory
iterations = 3400

# Function to return validation rmse scores
def calculate_rmse(X_val_trans,y_val,reg):
    preds = reg.predict(X_val_trans)
    return np.sqrt(mean_squared_error(y_true=y_val,y_pred=preds))    

# Load previously procesed and loacked-down validation data
with open(disk+"X_val_proc.pkl","rb") as f:
    X_val_proc = pickle.load(f)
print("Loaded processed validation set.")


with open(disk+"y_val.pkl","rb") as f:
    y_val = pickle.load(f)
print("Loaded targets for validation set.") 

warnings.filterwarnings("ignore")


for i in range(iterations):
    disk = "/Volumes/Iomega_HDD/2016/Data science/Predictive-Modeling-510k-decision-time/"

    #####################################################################
    # Streaming training data processing section
    #####################################################################
    with open(disk+"chunk_number.pkl", "rb") as f:
        chunk_number = pickle.load(f)
    print(">> Started processing chunk: "+ str(chunk_number))
    start = datetime.now()
    
    # Get the number of rows to be skipped
    with open(disk+"skiprows.pkl", "rb") as f:
        skiprows=pickle.load(f)

    # Read current_chunk from accumluating file: "processed510kdata.csv"
    current_chunk = pd.read_csv(disk+"processed510kdata.csv", header= 0,
                                skiprows= skiprows,nrows= chunksize,
                               names = column_names)
    print(">> Loaded chunk: "+ str(chunk_number))

    # Determine the chunks to be skipped and update current_chunk
    with open(disk+"skip_knumber.pkl", "rb") as f:
        skip_knumber = pickle.load(f)

    difference = pd.DataFrame()
    difference["KNUMBER"] = np.setdiff1d(current_chunk.KNUMBER,skip_knumber)
    current_chunk = pd.merge(current_chunk,difference,on = 'KNUMBER')
    print(">> Filtered chunk: "+ str(chunk_number))


    # Seperate features and target before the pipeline step
    X = current_chunk.drop("DECISIONTIME", axis=1) # Features
    y = current_chunk.DECISIONTIME # Target values

    # Process the current chunk using pipeline
    current_chunk_proc = pipeline510k.transform(X)
    print(">> Pipeline-processed chunk: "+ str(chunk_number))
    print(">> Shape of the processed chunk: "+ str(current_chunk_proc.shape))
    
    # Save the procesed current_chunk features and target into "processed_chunks" folder
    # Note that we will use chunk_number for indexing these chunks so that we can aggregate them in the right order later
    # Note that we are saving all 1048578 features without feature selection at the moment
    
    with open(disk+"processed_chunks/X_train_chunk_proc_"+str(chunk_number)+".pkl", "wb") as f:
        pickle.dump(current_chunk_proc,f)

    with open(disk+"processed_chunks/y_train_chunk_proc_"+str(chunk_number)+".pkl", "wb") as f:
        pickle.dump(y,f)      

    print(">> Saved features and target for chunk: "+ str(chunk_number))    
    print(">> Completed processing chunk: "+ str(chunk_number))
    process = datetime.now() - start
    print("It took: " + str(process.seconds/60) + " minutes to process this chunk.")
    
    #####################################################################
    # Online (incremental learning) section
    #####################################################################
    ###########################
    # Train online regressors
    ##########################
    
    # Container to collect current rmse scores
    scores = pd.DataFrame()
    scores["chunk"] = pd.Series(chunk_number)
    scores["nsamples"] = pd.Series(skiprows+chunksize)
    
    sgd.partial_fit(current_chunk_proc.toarray(),y)
    sgd_val_score = calculate_rmse(X_val_proc.toarray(),y_val,sgd)
    print(" >>> Completed Sgd regressor with a validation rmse of "+str(sgd_val_score))
    pagg.partial_fit(current_chunk_proc.toarray(),y)    
    pagg_val_score = calculate_rmse(X_val_proc.toarray(),y_val,pagg)          
    print(" >>> Completed Pagg regressor with a validation rmse of "+str(pagg_val_score))
    print("*" * 40)
    
    scores["SGD"] = pd.Series(sgd_val_score)
    scores["PAGG"] = pd.Series(pagg_val_score)
    
    # Update results data frame by adding scores
    with open(disk+"processed_chunks/results.pkl", "rb") as f:
        results = pickle.load(f)
    results = pd.concat([results,scores], axis = 0)
    with open(disk+"processed_chunks/results.pkl", "wb") as f:
        pickle.dump(results,f)
        
    print(">> Completed incremental learning for chunk: "+ str(chunk_number))
    
    #######################################
    # Upon each sucessful process, update:
    #######################################

    # skip_knumber
    skip_knumber.extend(current_chunk.KNUMBER.tolist())
    with open(disk+"skip_knumber.pkl", "wb") as f:
        pickle.dump(skip_knumber,f)

    # skiprows
    skiprows += chunksize
    with open(disk+"skiprows.pkl", "wb") as f:
        pickle.dump(skiprows,f)

    # chunk_number
    chunk_number += 1
    with open(disk+"chunk_number.pkl", "wb") as f:
        pickle.dump(chunk_number,f)
    print("*" * 80)

Loaded processed validation set.
Loaded targets for validation set.
>> Started processing chunk: 1
>> Loaded chunk: 1
>> Filtered chunk: 1
>> Pipeline-processed chunk: 1
>> Shape of the processed chunk: (7, 262146)
>> Saved features and target for chunk: 1
>> Completed processing chunk: 1
It took: 0.0 minutes to process this chunk.
 >>> Completed Sgd regressor with a validation rmse of 459.187675774
 >>> Completed Pagg regressor with a validation rmse of 122.803106328
****************************************
>> Completed incremental learning for chunk: 1
********************************************************************************
>> Started processing chunk: 2
>> Loaded chunk: 2
>> Filtered chunk: 2
>> Pipeline-processed chunk: 2
>> Shape of the processed chunk: (8, 262146)
>> Saved features and target for chunk: 2
>> Completed processing chunk: 2
It took: 0.0 minutes to process this chunk.
 >>> Completed Sgd regressor with a validation rmse of 315.535562446
 >>> Completed Pagg reg

In [102]:
with open(disk+"processed_chunks/results.pkl", "rb") as f:
        results = pickle.load(f)
results        

Unnamed: 0,chunk,nsamples,SGDRegressor,PassiveAggressiveRegressor
0,,100,122.006116,106.630278
0,,200,113.2682,155.717459


In [18]:
X_val_proc100 = SelectKBest(score_func=f_regression,k = 100)



TypeError: __init__() got multiple values for argument 'score_func'