# Scikit-learn pipeline

In [4]:
import pandas as pd
import numpy as np
from sklearn.pipeline import make_pipeline
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import FunctionTransformer
from sklearn.model_selection import cross_val_score
from sklearn.feature_extraction import DictVectorizer

ds = pd.read_csv('titanic.csv')
features = ds.drop(['survived', 'alive'], axis=1)

empty_space = FunctionTransformer(
    lambda x: x.replace(r'\s+', np.nan, regex=True), validate=False)
df2dict = FunctionTransformer(
    lambda x: x.to_dict(orient='records'), validate=False)

pl = make_pipeline(
    empty_space,
    df2dict,
    DictVectorizer(sparse=False),
    SimpleImputer(strategy='most_frequent'),
    GradientBoostingClassifier(
        n_estimators=100, learning_rate=0.02, random_state=1, max_depth=3)
)

cv = cross_val_score(pl, features, ds.survived, cv=3, scoring='roc_auc')
cv.mean(), cv.std()

(np.float64(0.8590898923081839), np.float64(0.018875854883421084))

# Spark ML GBT pipeline

In [5]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()

sdf = ss.read.csv('titanic.csv', header=True)

numCols = [
    'pclass', 'age', 'sibsp',
    'parch', 'fare', 'alone'
]

for col in numCols:
    sdf = sdf.withColumn(
        col, sdf[col].astype('decimal'))
    
sdf = sdf.withColumn(
    'survived', sdf['survived'].astype('int'))

categoricalCols =[
    'sex', 'embarked', 'class',
    'deck', 'who', 'embark_town'
]

indexers = [
    StringIndexer(
        inputCol=col,
        outputCol=col+'Idx',
        handleInvalid='skip')
    for col in categoricalCols
]

idxCols = [col+'Idx' for col in categoricalCols]

assembler = VectorAssembler(
    inputCols=idxCols + numCols,
    outputCol="features")

cl = GBTClassifier(
    labelCol="survived",
    maxIter=100, maxDepth=3, stepSize=0.02)

pl = Pipeline(stages=indexers + [assembler, cl])

sdf_fna = sdf.fillna(0).replace('', 'NA')
train_df, test_df = sdf_fna.randomSplit([0.7, 0.3])

m = pl.fit(train_df)

predictions = m.transform(test_df)

evaluator = BinaryClassificationEvaluator(
    labelCol="survived",
    rawPredictionCol="prediction",
    metricName="areaUnderROC"
)

evaluator.evaluate(predictions)

25/03/17 01:30:07 WARN Utils: Your hostname, macbook-dllllb.local resolves to a loopback address: 127.0.0.1; using 192.168.1.9 instead (on interface en0)
25/03/17 01:30:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/17 01:30:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/17 01:30:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/03/17 01:30:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


0.7166666666666668

# Spark ML LR pileline

In [6]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()

sdf = ss.read.csv('titanic.csv', header=True)

numCols = [
    'pclass', 'age', 'sibsp',
    'parch', 'fare', 'alone'
]

for col in numCols:
    sdf = sdf.withColumn(
        col, sdf[col].astype('decimal'))
    
sdf = sdf.withColumn(
    'survived', sdf['survived'].astype('int'))

categoricalCols =[
    'sex', 'embarked', 'class',
    'deck', 'who', 'embark_town'
]

indexers = [
    StringIndexer(
        inputCol=col,
        outputCol=col+'Idx',
        handleInvalid='skip')
    for col in categoricalCols
]

encoders = [
    OneHotEncoder(
        inputCol=col+'Idx',
        outputCol=col+'Bin')
    for col in categoricalCols
]

encCols = [col+'Bin' for col in categoricalCols]

assembler = VectorAssembler(
    inputCols=encCols + numCols,
    outputCol="features")

cl = LogisticRegression(labelCol="survived", maxIter=10, regParam=0.1)

pl = Pipeline(stages=indexers + encoders + [assembler, cl])

sdf_fna = sdf.fillna(0).replace('', 'NA')
train_df, test_df = sdf_fna.randomSplit([0.7, 0.3])

m = pl.fit(train_df)

predictions = m.transform(test_df)

evaluator = BinaryClassificationEvaluator(
    labelCol="survived",
    rawPredictionCol="prediction",
    metricName="areaUnderROC"
)

evaluator.evaluate(predictions)

0.7854651162790697

# Logistic regression feature importace for polynomial features

In [7]:
import numpy as np
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer, PolynomialExpansion
from pyspark.ml.pipeline import Pipeline

from pyspark.sql import SparkSession
ss = SparkSession.builder.config('spark.driver.bindAddress', '127.0.0.1').getOrCreate()

sdf = ss.read.csv('titanic.csv', header=True)

# only 2-category features can be used without binarization
categoricalCols =['sex'] #,'embarked', 'class', 'deck', 'who', 'embark_town']

indexers = [
    StringIndexer(inputCol=col, outputCol=col+'Idx', handleInvalid='skip')
    for col in categoricalCols
]

idxCols = [col+'Idx' for col in categoricalCols]

numCols = ['pclass', 'age', 'sibsp', 'parch', 'fare', 'alone']

for col in numCols:
    sdf = sdf.withColumn(col, sdf[col].astype('decimal'))
    
sdf = sdf.withColumn('survived', sdf['survived'].astype('int'))

assembler = VectorAssembler(
    inputCols=idxCols + numCols,
    outputCol="features")

pe = PolynomialExpansion(degree=2, inputCol='features', outputCol='features_p')

cl = LogisticRegression(featuresCol='features_p', labelCol="survived", maxIter=10, regParam=0.1)

pl = Pipeline(stages=indexers + [assembler, pe, cl])

m = pl.fit(sdf.fillna(0).replace('', 'NA'))

import pandas as pd

fnames = idxCols + numCols
pnames = [
    n+'*'+n2
    for i, n in zip(range(len(fnames)), fnames)
    for n2 in (['1']+fnames)[:i+2]]

weights = m.stages[-1].coefficients.array

pd.DataFrame(
    {'weights': weights, 'importance': np.abs(weights), 'names': pnames}
).sort_values('importance', ascending=False)[:10]

25/03/17 01:30:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Unnamed: 0,weights,importance,names
0,0.719942,0.719942,sexIdx*1
1,0.719942,0.719942,sexIdx*sexIdx
2,-0.272012,0.272012,pclass*1
14,0.136841,0.136841,parch*1
3,0.122216,0.122216,pclass*sexIdx
4,-0.069104,0.069104,pclass*pclass
15,-0.067579,0.067579,parch*sexIdx
10,-0.066205,0.066205,sibsp*sexIdx
18,-0.041047,0.041047,parch*sibsp
11,-0.030314,0.030314,sibsp*pclass


# String Indexer output

In [8]:
from pyspark.ml.feature import StringIndexer
import numpy as np
from pyspark.ml.linalg import Vectors

from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()

si = StringIndexer(inputCol='in', outputCol='out')

rows = [
    {'in': 'm'},
    {'in': 'm'},
    {'in': 'f'},
    {'in': 'f'},
    {'in': 'm'},
]

df = ss.createDataFrame(rows)

si.fit(df).transform(df).toPandas()

Unnamed: 0,in,out
0,m,0.0
1,m,0.0
2,f,1.0
3,f,1.0
4,m,0.0


# Polynomial features order

In [9]:
from pyspark.ml.feature import PolynomialExpansion
import numpy as np
from pyspark.ml.linalg import Vectors

from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()

pe = PolynomialExpansion(degree=2, inputCol='in', outputCol='out')

rows = [
    {'in': Vectors.dense([2, 10, 20])},
]

df = ss.createDataFrame(rows)

pe.transform(df).collect()[0].out.toArray()

array([  2.,   4.,  10.,  20., 100.,  20.,  40., 200., 400.])