<p style="font-family: Arial; font-size:3.75em;color:purple; font-style:bold"><br>
Spark Pipeline </p><br>
                                
Similar to scikit-learn, PySpark has a pipeline API.

A pipeline is a convenient way to keep data preparation structured and repeatable. You push the data into the pipeline, a sequence of operations (stages) is applied inside it, and the output is used to feed the algorithm.

For instance, one universal transformation in machine learning consists of converting a string column into a one-hot encoding, i.e., one column per category. A one-hot encoded matrix is usually sparse: it is mostly zeros, with a single 1 per row.

The steps to transform the data are very similar to scikit-learn. You need to:

1. Index the string column to numeric values
2. Create the one-hot encoder
3. Transform the data

Two APIs do the job: StringIndexer and OneHotEncoder.

First of all, you select the string column to index. The inputCol is the name of the column in the dataset. outputCol is the new name given to the transformed column.
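The two steps can be sketched in plain Python to show what they compute. This is a simplified illustration, not Spark's implementation: the hypothetical helpers fit_string_indexer and one_hot assume Spark's default frequency ordering (the most frequent label gets 0.0; ties are broken alphabetically here) and simply skip nulls.

```python
from collections import Counter


def fit_string_indexer(values):
    """Return {label: index}, most frequent label first (ties broken alphabetically).

    Hypothetical helper mimicking StringIndexer's default frequency ordering;
    None values are skipped.
    """
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot list for a category index.

    With drop_last=True there are num_categories - 1 slots and the last
    category is encoded as all zeros (like OneHotEncoder's dropLast default).
    """
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


embarked = ['S', 'C', 'S', 'S', 'Q', 'S', 'C', None]
mapping = fit_string_indexer(embarked)
print(mapping)                                    # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
print(one_hot(mapping['C'], 3, drop_last=False))  # [0.0, 1.0, 0.0]
print(one_hot(mapping['Q'], 3))                   # [0.0, 0.0]
```

With this ordering S, C and Q map to 0.0, 1.0 and 2.0, the same order the Embarked encoding shows further down.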

In [1]:
import pandas as pd
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.linalg import DenseVector
pd.set_option("display.max_columns", None)


# Note: sc is a SparkSession here, not a SparkContext
sc = SparkSession.builder.appName('Kithavho').getOrCreate()
# SQLContext is deprecated but kept for the createDataFrame call used later
sqlContext = SQLContext(sc.sparkContext)
data_frame = sc.read.csv('titanic_train.csv', header=True, inferSchema=True)
data_frame.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [2]:
indexer = StringIndexer(inputCol='Sex', outputCol='Sex_encoded')

In [3]:
indexed = indexer.fit(data_frame).transform(data_frame)
indexed.select('Sex', 'Cabin', 'Sex_encoded').show(11)

+------+-----+-----------+
|   Sex|Cabin|Sex_encoded|
+------+-----+-----------+
|  male| null|        0.0|
|female|  C85|        1.0|
|female| null|        1.0|
|female| C123|        1.0|
|  male| null|        0.0|
|  male| null|        0.0|
|  male|  E46|        0.0|
|  male| null|        0.0|
|female| null|        1.0|
|female| null|        1.0|
|female|   G6|        1.0|
+------+-----+-----------+
only showing top 11 rows



In [4]:
data_frame.groupBy('Embarked').count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [5]:
indexer_two = StringIndexer(inputCol='Embarked', outputCol='Embarked_indexed')
indexed_two = indexer_two.fit(data_frame).transform(data_frame)
one_hot = OneHotEncoder(dropLast=False, inputCol='Embarked_indexed', outputCol='Embarked_encoded')
one_hotted = one_hot.fit(indexed_two).transform(indexed_two)
one_hotted.select('Embarked', 'Embarked_encoded').show()

+--------+----------------+
|Embarked|Embarked_encoded|
+--------+----------------+
|       S|   (3,[0],[1.0])|
|       C|   (3,[1],[1.0])|
|       S|   (3,[0],[1.0])|
|       S|   (3,[0],[1.0])|
|       S|   (3,[0],[1.0])|
|       Q|   (3,[2],[1.0])|
|       S|   (3,[0],[1.0])|
|       S|   (3,[0],[1.0])|
|       S|   (3,[0],[1.0])|
|       C|   (3,[1],[1.0])|
|       S|   (3,[0],[1.0])|
|       S|   (3,[0],[1.0])|
|       S|   (3,[0],[1.0])|
|       S|   (3,[0],[1.0])|
|       S|   (3,[0],[1.0])|
|       S|   (3,[0],[1.0])|
|       Q|   (3,[2],[1.0])|
|       S|   (3,[0],[1.0])|
|       S|   (3,[0],[1.0])|
|       C|   (3,[1],[1.0])|
+--------+----------------+
only showing top 20 rows
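Each cell such as (3,[0],[1.0]) is a sparse vector printed as (size, [indices of the non-zero entries], [values]). The hypothetical helper below expands that notation into a dense list as a reading aid; in PySpark itself, SparseVector.toArray() does the same job. The all-zero form, e.g. (1,[],[]), is what the default dropLast=True produces for the last category, as the Sex column shows later.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, [indices], [values]) sparse vector into a dense list of floats."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense


print(sparse_to_dense(3, [0], [1.0]))  # S -> [1.0, 0.0, 0.0]
print(sparse_to_dense(3, [1], [1.0]))  # C -> [0.0, 1.0, 0.0]
print(sparse_to_dense(3, [2], [1.0]))  # Q -> [0.0, 0.0, 1.0]
print(sparse_to_dense(1, [], []))      # dropped last category -> [0.0]
```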



In [6]:
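# Fill missing Age and Cabin values with placeholders, then drop the rows that still
# contain nulls (here, the two rows with a missing Embarked value)
one_hotted = one_hotted.fillna({'Age': 35, 'Cabin': 'K7'})
one_hotted = one_hotted.dropna()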

In [7]:
one_hotted = one_hotted.drop('PassengerId', 'Name')
one_hotted.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = false)
 |-- Embarked: string (nullable = true)
 |-- Embarked_indexed: double (nullable = false)
 |-- Embarked_encoded: vector (nullable = true)



### Build the pipeline
You will build a pipeline that converts the selected features and assembles them into the final dataset. The pipeline has four steps, but feel free to add as many as you want:

1. Encode the categorical data
2. Index the label feature
3. Add the continuous variables
4. Assemble the steps

Each step is stored in a list named stages. This list tells the Pipeline which operations to perform, and in which order.
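To see why the order of the stages matters, here is a toy, plain-Python model of what Pipeline.fit does: stages that have a fit method (estimators) are fitted on the data produced by the earlier stages, while pure transformers are applied as they are. ToyIndexer, ToyIndexerModel and ToyAssembler are hypothetical stand-ins for StringIndexer, its fitted model and VectorAssembler; for brevity the toy indexer orders labels alphabetically rather than by frequency.

```python
class ToyIndexer:
    """Estimator: learns a label -> index mapping when fitted."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        # Alphabetical order for brevity (Spark's default orders by frequency)
        labels = sorted({row[self.input_col] for row in rows})
        mapping = {label: float(i) for i, label in enumerate(labels)}
        return ToyIndexerModel(self.input_col, self.output_col, mapping)


class ToyIndexerModel:
    """Transformer returned by ToyIndexer.fit(): applies the learned mapping."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col = input_col
        self.output_col = output_col
        self.mapping = mapping

    def transform(self, rows):
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]


class ToyAssembler:
    """Pure transformer: concatenates numeric columns into one list."""

    def __init__(self, input_cols, output_col):
        self.input_cols = input_cols
        self.output_col = output_col

    def transform(self, rows):
        return [{**row, self.output_col: [float(row[c]) for c in self.input_cols]} for row in rows]


def fit_pipeline(stages, rows):
    """Fit stages in order; each stage sees the columns added by the earlier ones."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, 'fit') else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order, like PipelineModel.transform."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


rows = [{'Sex': 'male', 'Fare': 7.25}, {'Sex': 'female', 'Fare': 71.28}]
toy_stages = [ToyIndexer('Sex', 'SexIndex'), ToyAssembler(['SexIndex', 'Fare'], 'features')]
fitted = fit_pipeline(toy_stages, rows)
out = transform_pipeline(fitted, rows)
print([row['features'] for row in out])  # [[1.0, 7.25], [0.0, 71.28]]
```

If the assembler were listed before the indexer, this toy version would fail with a KeyError because SexIndex would not exist yet; Spark likewise raises an error when a stage's input column is missing.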


#### 1. Encode the categorical data

This step is exactly the same as the example above, except that you loop over all the categorical features.

The empty stages list collects the stages of your pipeline.

In [8]:
categoricals = ['Cabin', 'Sex', 'Ticket']
stages = []

for categorical in categoricals:
    # Map each category to a numeric index, then one-hot encode that index
    string_indexer = StringIndexer(inputCol=categorical, outputCol=categorical + 'Index')
    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[categorical + 'class_vec'])
    stages += [string_indexer, encoder]

#### 2. Index the target variable

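Spark, like most libraries, does not accept string values for the target, so if your dataset has a string target, index it with StringIndexer as in the following code. Here Survived is already an integer, so the indexer only turns it into a numeric (double) label column; casting the column would work too.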
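One caveat: StringIndexer casts a numeric label to a string and, by default (stringOrderType='frequencyDesc'), gives index 0.0 to the most frequent value, so the meaning of the indexed label depends on the class balance. The hypothetical label_index helper below mimics that ordering in plain Python (ties broken alphabetically here):

```python
from collections import Counter


def label_index(labels):
    """Mimic StringIndexer's default ordering on a label column.

    Labels are cast to strings; the most frequent one gets index 0.0.
    """
    counts = Counter(str(v) for v in labels)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


# More 0s than 1s: the indices keep their original meaning
print(label_index([0, 0, 0, 1, 1]))  # {'0': 0.0, '1': 1.0}
# More 1s than 0s: the mapping flips, so index 1.0 would mean label 0
print(label_index([0, 1, 1, 1]))     # {'1': 0.0, '0': 1.0}
```

In the Titanic data non-survivors are the majority, so 0 keeps index 0.0 and 1 keeps 1.0.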

In [9]:
label_string_index = StringIndexer(inputCol='Survived', outputCol='Survived_two')
stages+=[label_string_index]


#### 3. Add the continuous variables

The inputCols parameter of VectorAssembler takes a list of column names. The code below builds that list from the encoded categorical features and the continuous features.

In [10]:
continuous_features = ['Pclass', 'SibSp', 'Parch', 'Fare']
assembler_inputs = [c + 'class_vec' for c in categoricals] + continuous_features

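#### 4. Assemble the steps

Finally, you pass the list of input columns to VectorAssembler, which combines them into a single features vector, and add the assembler as the last stage.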

In [11]:
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')
stages+=[assembler]

Now that all the steps are ready, apply the same cleaning as before to the raw data and push it into the pipeline.

In [12]:
data_frame = data_frame.fillna({'Age': 35, 'Cabin': 'K7'})
data_frame = data_frame.dropna()
data_frame = data_frame.drop('PassengerId', 'Name')

In [13]:
# Create a Pipeline
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(data_frame)
# Despite its name, model holds the transformed DataFrame, not a fitted model
model = pipeline_model.transform(data_frame)

In [14]:
model.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = false)
 |-- Embarked: string (nullable = true)
 |-- CabinIndex: double (nullable = false)
 |-- Cabinclass_vec: vector (nullable = true)
 |-- SexIndex: double (nullable = false)
 |-- Sexclass_vec: vector (nullable = true)
 |-- TicketIndex: double (nullable = false)
 |-- Ticketclass_vec: vector (nullable = true)
 |-- Survived_two: double (nullable = false)
 |-- features: vector (nullable = true)



The schema confirms that the new dataset contains all the columns: the original ones plus the new indexed, encoded, and assembled ones.

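## Build the regressor/classifier depending on the problem
Remember, if you're working with an RDD, try converting it to a DataFrame: DataFrame operations are planned by Spark's Catalyst optimizer and are usually faster on large datasets.

The model dataset is already a DataFrame. To go from an RDD to a DataFrame use toDF(); to go the other way use the rdd attribute. The cell below pulls out the label and the new features column as an RDD of (label, vector) pairs: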

In [15]:
input_data = model.rdd.map(lambda x: (x['Survived_two'], DenseVector(x['features'])))

To turn this RDD back into a training DataFrame you can use sqlContext (or toDF()). First, check the types:

In [16]:
#df_train = input_data.toDF()
type(input_data)

pyspark.rdd.PipelinedRDD

In [17]:
type(model)

pyspark.sql.dataframe.DataFrame

In [18]:
model.select('Sexclass_vec', 'Ticketclass_vec', 'features').show(10)

+-------------+-----------------+--------------------+
| Sexclass_vec|  Ticketclass_vec|            features|
+-------------+-----------------+--------------------+
|(1,[0],[1.0])|(679,[558],[1.0])|(830,[0,146,705,8...|
|    (1,[],[])|(679,[612],[1.0])|(830,[103,759,826...|
|    (1,[],[])|(679,[671],[1.0])|(830,[0,818,826,8...|
|    (1,[],[])| (679,[46],[1.0])|(830,[19,193,826,...|
|(1,[0],[1.0])|(679,[514],[1.0])|(830,[0,146,661,8...|
|(1,[0],[1.0])|(679,[341],[1.0])|(830,[0,146,488,8...|
|(1,[0],[1.0])|(679,[194],[1.0])|(830,[136,146,341...|
|(1,[0],[1.0])| (679,[14],[1.0])|(830,[0,146,161,8...|
|    (1,[],[])| (679,[28],[1.0])|(830,[0,175,826,8...|
|    (1,[],[])| (679,[65],[1.0])|(830,[0,212,826,8...|
+-------------+-----------------+--------------------+
only showing top 10 rows
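The features vectors above are the encoded columns laid end to end. Reading the sizes off the output: Sexclass_vec has 1 slot, Ticketclass_vec has 679, and there are 4 continuous columns, so Cabinclass_vec must have 830 - 1 - 679 - 4 = 146 slots. VectorAssembler shifts each block by the total size of the blocks before it, which is why ticket index 558 in the first row becomes feature index 146 + 1 + 558 = 705. The hypothetical assemble helper below reproduces this offset arithmetic in plain Python; the Cabin index (0) and the continuous values are assumed for illustration.

```python
def assemble(blocks):
    """Concatenate blocks into one sparse vector (size, indices, values).

    Each block is either a sparse triple (size, indices, values) or a plain
    number (a continuous column taking one slot). Zero entries are dropped.
    """
    offset, out_idx, out_val = 0, [], []
    for block in blocks:
        if isinstance(block, tuple):
            size, indices, values = block
        else:
            size, indices, values = 1, [0], [float(block)]
        for i, v in zip(indices, values):
            if v != 0.0:
                out_idx.append(offset + i)  # shift by the sizes of earlier blocks
                out_val.append(v)
        offset += size
    return offset, out_idx, out_val


row = [
    (146, [0], [1.0]),    # Cabinclass_vec (assumed Cabin index 0)
    (1, [0], [1.0]),      # Sexclass_vec of the first row above
    (679, [558], [1.0]),  # Ticketclass_vec of the first row above
    3, 1, 0, 7.25,        # Pclass, SibSp, Parch, Fare (illustrative values)
]
print(assemble(row))
# (830, [0, 146, 705, 826, 827, 829], [1.0, 1.0, 1.0, 3.0, 1.0, 7.25])
```

The result starts with (830,[0,146,705,8..., matching the truncated first row printed above.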



#### Create a DataFrame for quicker processing

In [19]:
df_train = sqlContext.createDataFrame(input_data, ['Survived_three', 'features'])
df_train.show(2)

+--------------+--------------------+
|Survived_three|            features|
+--------------+--------------------+
|           0.0|[1.0,0.0,0.0,0.0,...|
|           1.0|[0.0,0.0,0.0,0.0,...|
+--------------+--------------------+
only showing top 2 rows



#### Split data into train and test data

In [20]:
train_data, test_data = df_train.randomSplit([0.8, 0.2], seed=1234)
train_data.groupby('Survived_three').agg({'Survived_three': 'count'}).show()

+--------------+---------------------+
|Survived_three|count(Survived_three)|
+--------------+---------------------+
|           0.0|                  429|
|           1.0|                  278|
+--------------+---------------------+



In [21]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol='Survived_three', featuresCol='features', maxIter=10, regParam=0.30)
linearModel = lr.fit(train_data)

Now let's print the coefficients and the intercept of the logistic regression model.

In [22]:
print('Coefficients: ' + str(linearModel.coefficients))
print('Intercept: ' + str(linearModel.intercept))

Coefficients: [-0.2811389063904822,0.520199534010019,-0.14796212404392703,0.0017792641670645115,-1.318150425556913,0.25303593189717877,0.6337081990786746,0.3977590396587624,0.6342784165722304,0.5526922459980488,0.6890704290420218,-0.03325509707351257,0.5515850528999431,0.7654130256775779,0.46280940182161134,0.47796339724192116,0.438579689658369,0.43095532129084413,0.540801772861677,0.5720049488240126,-0.5640444174702728,0.6592968596821579,0.0,-0.012500925715604017,0.7926848198337345,0.5380484162680356,-0.27980288458914576,-0.02312147654909265,-0.024269548214975317,0.6699750789157642,0.5496331426740162,0.577475675181702,0.5567793535736245,-0.5976083442535071,0.6621420783366327,0.6862201759151372,0.7831428630255016,0.9013615689498785,0.7956262774521466,0.7955952237285838,0.554508762668509,-0.00504816719683115,0.5556293123321875,0.7887151918795934,-0.46031615051583485,0.7535201382372211,-0.5711555927257356,-0.5783625304132612,0.5801229588238112,-0.5625261003933331,0.0,0.0,-0.5774521363990

#### 5. Train and evaluate the model

To generate predictions for your test set, call the transform() method of linearModel on test_data.

In [23]:
predictions = linearModel.transform(test_data)

You can print the schema of predictions:

In [24]:
predictions.printSchema()

root
 |-- Survived_three: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)
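For binary logistic regression these three columns are tied together: rawPrediction is [-m, m] with the margin m = intercept + dot(coefficients, features), probability is [1 - sigmoid(m), sigmoid(m)], and prediction is 1.0 when sigmoid(m) exceeds the threshold (0.5 by default). That is why a row whose probability starts with 0.4917... is predicted as 1.0 in the output below. The hypothetical lr_outputs helper reproduces the relationship for a single margin:

```python
import math


def lr_outputs(margin, threshold=0.5):
    """Return (rawPrediction, probability, prediction) for one row.

    margin is intercept + dot(coefficients, features); threshold plays the
    role of the model's threshold parameter (0.5 by default).
    """
    raw_prediction = [-margin, margin]
    p1 = 1.0 / (1.0 + math.exp(-margin))  # sigmoid: P(label = 1)
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction


for m in (-1.0, 0.0, 0.0332, 2.0):
    raw, prob, pred = lr_outputs(m)
    print(m, [round(p, 4) for p in prob], pred)
```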



Next, look at the label, the prediction, and the probability:

In [25]:
selected = predictions.select('Survived_three', 'prediction', 'probability')
selected.show()

+--------------+----------+--------------------+
|Survived_three|prediction|         probability|
+--------------+----------+--------------------+
|           0.0|       0.0|[0.69285197803288...|
|           0.0|       0.0|[0.59266210850317...|
|           0.0|       0.0|[0.59668567630403...|
|           0.0|       0.0|[0.59806580809939...|
|           0.0|       0.0|[0.71609531823137...|
|           0.0|       0.0|[0.61190235960121...|
|           0.0|       0.0|[0.70780567179051...|
|           0.0|       0.0|[0.59459792507724...|
|           0.0|       1.0|[0.31325609359218...|
|           0.0|       0.0|[0.81338883913905...|
|           0.0|       1.0|[0.30860478356656...|
|           0.0|       1.0|[0.31905227992710...|
|           0.0|       1.0|[0.28086900041362...|
|           0.0|       1.0|[0.49173058797778...|
|           0.0|       0.0|[0.63791093631553...|
|           0.0|       0.0|[0.54376228840651...|
|           0.0|       0.0|[0.57525427913056...|
|           0.0|    

#### Evaluate the model

Accuracy tells you how often the model is right. BinaryClassificationEvaluator does not offer accuracy: its default metric is the area under the ROC curve (areaUnderROC). MulticlassClassificationEvaluator with metricName='accuracy' does compute it, but here we build the accuracy measure by hand to see how it works.

In [26]:
cm = predictions.select('Survived_three', 'prediction')

In [27]:
cm.groupby('Survived_three').agg({'Survived_three': 'count'}).show()

+--------------+---------------------+
|Survived_three|count(Survived_three)|
+--------------+---------------------+
|           0.0|                  120|
|           1.0|                   62|
+--------------+---------------------+



In [28]:
cm.groupby('prediction').agg({'prediction': 'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|              140|
|       1.0|               42|
+----------+-----------------+



To compute the accuracy, keep the rows where Survived_three matches prediction and divide their count by the total number of rows.

In [29]:
acc = cm.filter(cm.Survived_three == cm.prediction).count() / cm.count()
print(f'The model accuracy is {acc*100}')

The model accuracy is 74.72527472527473
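The counts above are enough to recover the whole confusion matrix. With 182 test rows, 62 actual survivors, 42 predicted survivors and 136 correct predictions (0.7473 × 182 = 136), the identities TP + FN = 62, TP + FP = 42 and TP + TN = 136 fix every cell. The hypothetical confusion_from_counts helper below simply solves those equations:

```python
def confusion_from_counts(actual_pos, pred_pos, correct, total):
    """Recover (TP, FP, FN, TN) of a binary classifier from marginal counts.

    correct = TP + TN and TN = actual_neg - FP = actual_neg - (pred_pos - TP),
    so correct = 2 * TP + actual_neg - pred_pos.
    """
    actual_neg = total - actual_pos
    twice_tp = correct - actual_neg + pred_pos
    if twice_tp % 2:
        raise ValueError('inconsistent counts')
    tp = twice_tp // 2
    fp = pred_pos - tp
    fn = actual_pos - tp
    tn = actual_neg - fp
    return tp, fp, fn, tn


total = 182
tp, fp, fn, tn = confusion_from_counts(actual_pos=62, pred_pos=42, correct=136, total=total)
accuracy = (tp + tn) / total
precision = tp / (tp + fp)
recall = tp / (tp + fn)
print(tp, fp, fn, tn)                                             # 29 13 33 107
print(round(accuracy, 4), round(precision, 4), round(recall, 4))  # 0.7473 0.6905 0.4677
```

Recall on survivors is only 29/62 ≈ 0.47: the model misses more than half of them, which the accuracy figure alone does not reveal.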


You can wrap these steps in a reusable function:

In [30]:
def accuracy_m(model):
    # Score the global test_data set and print the percentage of correct predictions
    predictions = model.transform(test_data)
    cm = predictions.select('Survived_three', 'prediction')
    acc = cm.filter(cm.Survived_three == cm.prediction).count() / cm.count()
    print(f'The model accuracy is {acc*100}')


In [31]:
accuracy_m(linearModel)

The model accuracy is 74.72527472527473


#### ROC metrics

In [32]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [33]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Survived_three')
print(f'The {evaluator.getMetricName()} is {evaluator.evaluate(predictions)}')


The areaUnderROC is 0.8558467741935487
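An area under the ROC curve of about 0.856 has a handy interpretation: up to ties, it is the probability that a randomly chosen survivor in the test set gets a higher score than a randomly chosen non-survivor. The plain-Python sketch below computes AUC from that pairwise definition; auc_rank is a hypothetical helper, and Spark's BinaryClassificationEvaluator instead integrates the ROC curve over score thresholds (and may bin scores on large datasets), so small differences are possible.

```python
def auc_rank(labels, scores):
    """AUC as P(score of a random positive > score of a random negative).

    Ties count as half a win. O(n_pos * n_neg), fine for small examples.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


labels = [0, 0, 1, 1, 0, 1]
scores = [0.1, 0.4, 0.35, 0.8, 0.2, 0.7]
print(auc_rank(labels, scores))  # 8 of the 9 positive/negative pairs are ranked correctly
```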


#### 6. Hyperparameter tuning

In [34]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

Create a parameter grid for cross-validation:

In [35]:
paramGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.5]).build())
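ParamGridBuilder builds the Cartesian product of all the values passed to addGrid, so the number of parameter maps multiplies with every parameter you add. A plain-Python equivalent, with build_grid as a hypothetical helper and parameter names written as plain strings (the second grid, which adds elasticNetParam, is only an illustration):

```python
from itertools import product


def build_grid(grid):
    """Cartesian product of parameter values, one dict per combination."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


print(build_grid({'regParam': [0.01, 0.5]}))  # [{'regParam': 0.01}, {'regParam': 0.5}]
print(len(build_grid({'regParam': [0.01, 0.5], 'elasticNetParam': [0.0, 0.5, 1.0]})))  # 6
```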

Finally, run five-fold cross-validation:

In [36]:
from time import time  # import only time() instead of a wildcard import
start_time = time()

cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(train_data)
end_time = time()
elapsed_time = end_time - start_time
print(f'The model took {elapsed_time} seconds to train')

The model took 169.82509541511536 seconds to train
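CrossValidator assigns training rows to folds at random (controlled by its seed parameter), fits one model per parameter map per fold, averages the evaluator's metric over the folds, and then refits the estimator on the full training set with the best parameters. With 2 values of regParam and numFolds=5 that is 2 × 5 + 1 = 11 logistic regression fits, which explains why this cell takes far longer than a single fit; the parallelism parameter of CrossValidator can run several of those fits concurrently. The plain-Python sketch below shows the same loop; cross_validate, k_fold_indices and the shrunken_mean_score "model" are hypothetical, and fold assignment differs from Spark's.

```python
import random


def k_fold_indices(n, k, seed=0):
    """Shuffle row positions and deal them into k folds of (almost) equal size."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def cross_validate(rows, param_maps, fit_and_score, k=5, seed=0):
    """Return (best parameter map, average score per map); higher scores are better."""
    folds = k_fold_indices(len(rows), k, seed)
    averages = []
    for params in param_maps:
        scores = []
        for fold in folds:
            held_out = set(fold)
            train = [r for i, r in enumerate(rows) if i not in held_out]
            valid = [rows[i] for i in fold]
            scores.append(fit_and_score(train, valid, params))  # one fit per fold
        averages.append(sum(scores) / k)
    best = max(range(len(param_maps)), key=lambda i: averages[i])
    # Spark's CrossValidator would now refit on all rows with param_maps[best]
    return param_maps[best], averages


def shrunken_mean_score(train, valid, params):
    """Hypothetical model: predict the training mean shrunk by 1 / (1 + regParam),
    scored by negative mean squared error on the validation rows."""
    prediction = (sum(train) / len(train)) / (1.0 + params['regParam'])
    return -sum((v - prediction) ** 2 for v in valid) / len(valid)


data = [9.0, 10.0, 11.0, 10.0, 9.5, 10.5, 10.0, 9.0, 11.0, 10.0]
grid = [{'regParam': 0.01}, {'regParam': 0.5}]
best, averages = cross_validate(data, grid, shrunken_mean_score, k=5, seed=1234)
print(best)  # {'regParam': 0.01}
print(averages)
```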


In [37]:
accuracy_m(cvModel)
best_model = cvModel.bestModel
best_model.extractParamMap()

The model accuracy is 77.47252747252747


{Param(parent='LogisticRegression_7297c5e62178', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_7297c5e62178', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_7297c5e62178', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_7297c5e62178', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_7297c5e62178', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_7297c5e62178', name='labelCol', doc='label column name.'): 'Survived_three',
 Param(parent='LogisticRegression_7297c5e62178', name='maxIter', doc='max number of iterations (

In [38]:
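import os
import tempfile
from pyspark.ml.classification import LogisticRegressionModel

# save() takes only the path, and the target directory must not exist yet,
# so save into a new sub-directory of a fresh temporary directory
path = os.path.join(tempfile.mkdtemp(), 'best_lr_model')
best_model.save(path)

# Reload the saved model later with the matching model class
loaded_model = LogisticRegressionModel.load(path)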