# Spark Model Persistence

<font color='steelblue'>
<h3>
<span style="font-family:Comic sans MS; font-size:1.5em;">
Example of persisting a pipeline and a model<br>


 </span>
</h3>
</font>

<font color='gray'>
<span style="font-family:Comic sans MS; font-size:1.2em;">
The following steps are performed:<br>
    <ul>
        <li> Two folders are created in the directory where this notebook is run:</li><ol>  
        <li> One for the fitted pipeline (<code>projPipeline</code>)</li>
        <li> One for the Logistic Regression model (<code>projlrModel</code>)</li>
    </ol><br>
        <li> Most of the code is the same as in the "Spark Project", except for how the label is handled when the pipeline is built.</li><br>
        <li> Code for persisting the pipeline and the model has been added.</li>
    </ul>
</span>
</font>

## Dataset Review

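Attribute Information (the code below refers to these columns with underscores instead of hyphens, e.g. `education_num`, `marital_status`):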

- age: continuous
- workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
- fnlwgt: continuous
- education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc...
- education-num: continuous
- marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent...
- occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners...
- relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
- race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
- sex: Female, Male
- capital-gain: continuous
- capital-loss: continuous
- hours-per-week: continuous
- native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany...

Target/Label (`income`): <=50K, >50K

In [None]:
# Set up the environment for using pyspark
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
spark = SparkSession\
        .builder\
        .appName("Model Persistence")\
        .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

## Load Data

In [None]:
dataset = spark.read.format('csv').options(header='true', inferSchema='true').load('../datasets/agent.csv')

In [None]:
cols = dataset.columns

In [None]:
# Convert only the first 5 rows to Pandas instead of the full Spark DataFrame
dataset.limit(5).toPandas()

## Preprocess Data - Building Pipeline
<br>
<span style="font-family:times, serif; font-size:14pt; font-style:bold">
Since we are going to use algorithms such as Logistic Regression, which accept only numeric inputs, we have to convert the categorical variables in the dataset into numeric variables.<br>
There are two ways to do this:
<ul>
    <li>Category Indexing</li> <ul>
       <li>Each category is assigned a numeric index from {0, 1, 2, ..., numCategories-1}.</li>
       <li>This introduces an implicit ordering among the categories, so it is more suitable for ordinal variables (e.g. Poor: 0, Average: 1, Good: 2).</li>
        </ul><br>
    <li>One-Hot Encoding</li>
    <ul><li>This converts categories into binary vectors with at most one nonzero value (e.g. Blue: [1, 0], Green: [0, 1], Red: [0, 0]; by default the last category maps to the all-zeros vector).</li></ul>
</ul>
    
</span>
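To make the two encodings concrete, here is a pure-Python sketch (not Spark code) of what they do conceptually. It assumes StringIndexer's default ordering, where the most frequent category gets index 0 (ties are broken alphabetically here, an assumption of this sketch), and OneHotEncoder's default `dropLast=True`, which is why Red becomes `[0, 0]` in the example above. The helpers `string_index` and `one_hot` are hypothetical names.

```python
from collections import Counter

def string_index(values):
    """Map each distinct string to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Dense binary list; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0] * size
    if index < size:
        vec[index] = 1
    return vec

# Hypothetical column values: Blue is most frequent, Red least frequent
colors = ["Blue", "Blue", "Blue", "Green", "Green", "Red"]
mapping = string_index(colors)
for color, idx in mapping.items():
    print(color, idx, one_hot(idx, len(mapping)))
```

Spark stores the result as a SparseVector rather than a dense list, but the nonzero positions are the same.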

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
try:
    # Spark 2.3/2.4: the multi-column one-hot estimator is named OneHotEncoderEstimator
    from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder
except ImportError:
    # Spark 3.0+: OneHotEncoderEstimator was renamed to OneHotEncoder
    from pyspark.ml.feature import OneHotEncoder

categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
stages = []  # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert the category indices into binary SparseVectors
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages. These are not run here; they all run at once when the pipeline is fitted.
    stages += [stringIndexer, encoder]

## income (target variable) processing
<br>
<span style="font-family:times, serif; font-size:14pt; font-style:bold">

<ol>
    <li>Convert the income labels into label indices using a StringIndexer.</li>
    <li>Do not add the label indexer to the pipeline stages: the persisted pipeline will be applied to new data, which will not have an income column.</li>
    <li>Fit the label indexer separately, then use it to transform the dataset so that a <code>label</code> column is added to the DataFrame.</li>
</ol>
</span>


In [None]:
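# Index the label separately, outside the feature pipeline
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
lStrModel = label_stringIdx.fit(dataset)
dataset = lStrModel.transform(dataset)
# Intentionally not added to the pipeline stages: new data will not have an income column
# stages += [label_stringIdx]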
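Because the label indexer is kept out of the saved pipeline, the mapping between label indices and income strings lives only in `lStrModel`. The pure-Python sketch below shows both directions, assuming StringIndexer's default frequency ordering (so the majority class gets index 0); the income values and the helpers `fit_label_indexer` and `index_to_string` are hypothetical.

```python
from collections import Counter

def fit_label_indexer(values):
    """Return labels ordered most frequent first (ties alphabetical), like StringIndexer's default."""
    counts = Counter(values)
    return sorted(counts, key=lambda v: (-counts[v], v))

def index_to_string(predictions, labels):
    """Map numeric predictions (0.0, 1.0, ...) back to the original label strings."""
    return [labels[int(p)] for p in predictions]

# Hypothetical income values; "<=50K" is the majority here, so it gets index 0
incomes = ["<=50K", "<=50K", ">50K", "<=50K", ">50K", "<=50K"]
labels = fit_label_indexer(incomes)
print(labels)
print(index_to_string([1.0, 0.0, 1.0], labels))
```

In PySpark, `lStrModel.labels` holds this ordered list, and `IndexToString(inputCol="prediction", outputCol="predictedIncome", labels=lStrModel.labels)` from `pyspark.ml.feature` performs the reverse mapping (`predictedIncome` is an illustrative column name). `lStrModel` can be saved with `write().overwrite().save(...)` like the other fitted models if that mapping is needed later.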

## Vector Assembler<br>
<span style="font-family:times, serif; font-size:14pt; font-style:bold">
Use a `VectorAssembler` to combine all the feature columns into a single vector column.<br>
This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.

</span>


In [None]:
# Transform all features into a vector using VectorAssembler
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
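The `features` column is the concatenation of its inputs in the order of `assemblerInputs`: each one-hot column contributes (number of categories - 1) slots under the default `dropLast=True`, and each numeric column contributes one slot. A pure-Python sketch of that layout for a single hypothetical row; the category counts for `sex` (2) and `race` (5) come from the dataset review above, but the real widths depend on the distinct values actually present in the CSV. The helpers `one_hot_width` and `assemble` are hypothetical.

```python
def one_hot_width(num_categories, drop_last=True):
    """Width of the binary vector OneHotEncoder produces for one column."""
    return num_categories - 1 if drop_last else num_categories

def assemble(parts):
    """Concatenate vector parts and scalar values in input-column order, like VectorAssembler."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(float(v) for v in part)
        else:
            features.append(float(part))
    return features

# Hypothetical row: sex and race one-hot vectors, then age and hours_per_week
sex_vec = [0.0]                  # width one_hot_width(2) == 1; last category -> all zeros
race_vec = [0.0, 0.0, 1.0, 0.0]  # width one_hot_width(5) == 4; category index 2
features = assemble([sex_vec, race_vec, 39, 40])
print(features)
print(len(features) == one_hot_width(2) + one_hot_width(5) + 2)
```

Spark may store the assembled vector as either a dense or a sparse vector, depending on how many entries are zero; the element order is the same.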

## Run Pipeline

In [None]:
# Fit the feature pipeline (indexers, encoders, assembler) and apply it to the data
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)


## Save the pipeline<br>
<span style="font-family:times, serif; font-size:16pt; font-style:bold">
<font color='tomato'>
<ul>
<li><strong>write().overwrite().save(path) is required to replace an existing saved model in the same folder.</strong><br></li>
<li><strong>If you write to a different location on every run of this notebook, .save(path) alone can be used; it fails if the path already exists.</strong></li>
    </ul>  
    </font>
</span>

In [None]:
pipelineModel.write().overwrite().save('projPipeline')
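In PySpark, `model.save(path)` is shorthand for `model.write().save(path)`, which raises an error if `path` already exists; `write().overwrite()` deletes the existing directory before writing. The pure-Python sketch below mimics only that rule. `save_model` and the `model.bin` file are hypothetical and do not reflect Spark's on-disk layout.

```python
import os
import shutil
import tempfile

def save_model(model_bytes, path, overwrite=False):
    """Mimic the MLWriter rule: refuse to write to an existing path unless overwrite is set."""
    if os.path.exists(path):
        if not overwrite:
            raise IOError(f"Path {path} already exists; use overwrite to replace it.")
        shutil.rmtree(path)  # overwrite(): remove the old directory first
    os.makedirs(path)
    with open(os.path.join(path, "model.bin"), "wb") as f:
        f.write(model_bytes)

base = tempfile.mkdtemp()
target = os.path.join(base, "projPipeline")
save_model(b"v1", target)                  # first run: the path is new, so saving succeeds
try:
    save_model(b"v2", target)              # second run without overwrite: fails
except IOError as err:
    print("save failed:", err)
save_model(b"v2", target, overwrite=True)  # analogous to write().overwrite().save(...)
```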

In [None]:
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)

In [None]:
# Convert only the first 5 rows to Pandas instead of the full Spark DataFrame
dataset.limit(5).toPandas()

## Create training and test sets and print their sizes

In [None]:
# Randomly split data into training and test sets; set a seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=2345)
print('Training dataset count: {}'.format(trainingData.count()))
print('Test dataset count:     {}'.format(testData.count()))
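`randomSplit` normalizes the weights and assigns each row independently at random, which is why the counts printed above are close to, but generally not exactly, 70% and 30% of the rows. A pure-Python sketch of that idea; `random_split` is a hypothetical helper. Spark samples per partition, so in Spark the same seed reproduces a split only when the data and its partitioning are unchanged.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total          # cumulative, normalized upper bound for this split
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against rounding of the last bound
    return splits

train, test = random_split(list(range(10_000)), [0.7, 0.3], seed=2345)
print(len(train), len(test))  # close to 7000 / 3000, not exact
```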

## Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [None]:
# Make predictions on test data using the transform() method.
# LogisticRegressionModel.transform() only uses the 'features' column.
predictions = lrModel.transform(testData)
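For a binary `LogisticRegressionModel`, `rawPrediction` holds `[-m, m]`, where the margin `m` is coefficients · features + intercept; `probability` holds `[1 - sigmoid(m), sigmoid(m)]`; and `prediction` is 1.0 when the class-1 probability exceeds the threshold (0.5 by default). A pure-Python sketch of those three columns for one row; `lr_predict` and the coefficients are hypothetical.

```python
import math

def lr_predict(features, coefficients, intercept, threshold=0.5):
    """Mirror rawPrediction, probability and prediction of a binary logistic model for one row."""
    margin = sum(w * x for w, x in zip(coefficients, features)) + intercept
    raw_prediction = [-margin, margin]
    # Numerically stable sigmoid for both signs of the margin
    if margin >= 0:
        p1 = 1.0 / (1.0 + math.exp(-margin))
    else:
        e = math.exp(margin)
        p1 = e / (1.0 + e)
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction

# Hypothetical 3-feature model and row: margin = 0.5 + 0.0 + 0.5 - 0.5 = 0.5
raw, prob, pred = lr_predict([1.0, 0.0, 2.0], coefficients=[0.5, -1.0, 0.25], intercept=-0.5)
print(raw, prob, pred)
```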

In [None]:
# View model's predictions and probabilities of each prediction class
# Any column of the predictions DataFrame can be selected; as an example we show age and occupation
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
selected.show()

## Binary Classification Evaluator<br>
<span style="font-family:times, serif; font-size:14pt; font-style:bold">
We can use `BinaryClassificationEvaluator` to evaluate our model. The column names are set with the `rawPredictionCol` and `labelCol` params, and the metric with the `metricName` param (default: `areaUnderROC`).
</span>


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [None]:
# Print the metric name (areaUnderROC unless metricName is set)
evaluator.getMetricName()
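With the default `areaUnderROC` metric, the evaluator scores each row by its class-1 value in `rawPrediction`. Because the sigmoid is monotonic, ranking by the margin gives the same AUC as ranking by probability. The pure-Python sketch below computes AUC exactly, as the probability that a random positive outscores a random negative (ties count one half). Spark builds the ROC curve from score thresholds instead and may bin scores, so treat this as an illustration rather than Spark's algorithm; `area_under_roc` and the scores are hypothetical.

```python
def area_under_roc(scores, labels):
    """Exact AUC via pairwise comparison of positive and negative scores."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

print(area_under_roc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75
```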

## Now that the model is ready, save it

In [None]:
lrModel.write().overwrite().save('projlrModel')