## **PySpark Machine Learning**
### Pipeline Example ([Docs](https://spark.apache.org/docs/latest/ml-pipeline.html))

In machine learning, it is common to run a sequence of algorithms. We can bundle these algorithms and data processing steps into a Spark ML Pipeline.

A Pipeline is specified as a sequence of stages, and each stage is either a **Transformer** or an **Estimator**. The stages are specified as an ordered array (or DAG).
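To make the Estimator/Transformer distinction concrete, here is a minimal sketch in plain Python. It is not the Spark API: `ToyIndexer` and `ToyIndexerModel` are hypothetical names used only for illustration. It shows an Estimator whose `fit()` learns something from the data and returns a Transformer, whose `transform()` then adds a column. The frequency-based ordering with alphabetical tie-breaking is an assumption of this sketch.

```python
from collections import Counter


class ToyIndexerModel:
    """Transformer: maps each string to the index learned during fit()."""

    def __init__(self, index_by_label):
        self.index_by_label = index_by_label

    def transform(self, rows):
        # Return new rows with an extra column; the input rows are left unchanged.
        return [
            dict(row, purpose_index=self.index_by_label[row["purpose"]])
            for row in rows
        ]


class ToyIndexer:
    """Estimator: learns a label -> index mapping from the data."""

    def fit(self, rows):
        counts = Counter(row["purpose"] for row in rows)
        # Most frequent label gets index 0; ties are broken alphabetically.
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        return ToyIndexerModel({label: i for i, label in enumerate(ordered)})


rows = [
    {"purpose": "other"},
    {"purpose": "debt_consolidation"},
    {"purpose": "debt_consolidation"},
]
indexer_model = ToyIndexer().fit(rows)        # Estimator -> Transformer
indexed_rows = indexer_model.transform(rows)  # Transformer -> new rows
print(indexed_rows)
```

A pipeline repeats this pattern stage by stage: fitting the pipeline fits each Estimator in order, and the fitted result is itself a Transformer that can be applied to new data.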

It is often good practice to save a fitted model or pipeline to disk so it can be reused later without retraining.
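As a rough sketch of what saving and reloading can look like, the helper below persists a fitted pipeline and loads it back. The function name `save_and_reload` and the example path are hypothetical; `fitted_model` is assumed to be the `PipelineModel` returned by `pipeline.fit(...)`, as produced later in this notebook.

```python
def save_and_reload(fitted_model, path):
    """Persist a fitted PipelineModel to `path` and load it back."""
    # Imported lazily so that defining this helper does not itself require Spark.
    from pyspark.ml import PipelineModel

    # overwrite() replaces anything already stored at `path`.
    fitted_model.write().overwrite().save(path)
    return PipelineModel.load(path)


# Example usage once a pipeline has been fitted (hypothetical path):
# reloaded_model = save_and_reload(model, "loan_gbt_pipeline")
# reloaded_model.transform(testdata)
```

Saving the whole fitted pipeline, rather than only the final model, keeps the feature-engineering stages and the model together, so new data can be scored with a single `transform()` call.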

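Below is an example Spark ML Pipeline that chains three feature-engineering stages (StringIndexer, OneHotEncoder, and VectorAssembler) with one model stage (GBTClassifier) to predict loan default. In Spark 3.x, VectorAssembler is a Transformer, while StringIndexer, OneHotEncoder, and GBTClassifier are Estimators.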

## **Install Spark Dependencies**



In [None]:
# Install Spark dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!rm -f spark-3.1.1-bin-hadoop3.2.tgz
!wget --no-cookies --no-check-certificate https://ftp.wayne.edu/apache/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar zxvf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark

In [6]:
!ls -al | grep spark

drwxr-xr-x 13 1000 1000      4096 Feb 22 02:11 spark-3.1.1-bin-hadoop3.2
-rw-r--r--  1 root root 228721937 Feb 22 02:45 spark-3.1.1-bin-hadoop3.2.tgz



## **Download Data**


In [7]:
!wget https://raw.githubusercontent.com/zaratsian/iaa_2020/master/data/loan_200k.csv

--2021-03-25 13:38:48--  https://raw.githubusercontent.com/zaratsian/iaa_2020/master/data/loan_200k.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 18066044 (17M) [text/plain]
Saving to: ‘loan_200k.csv’


2021-03-25 13:38:49 (41.7 MB/s) - ‘loan_200k.csv’ saved [18066044/18066044]



## **Import Python and PySpark Libraries**

In [8]:
# Set up required environment variables
import os
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

# Import Dependencies
import datetime
from pyspark import SparkContext
from pyspark.sql import SparkSession
# Namespaced imports avoid shadowing Python builtins such as sum, min, and max
from pyspark.sql import functions as F, types as T
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import DecisionTreeRegressor, GBTRegressor, LinearRegression, GeneralizedLinearRegression
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator, BinaryClassificationEvaluator

## **Initialize Spark Session**

In [11]:
spark = SparkSession.builder.appName("Spark ML Pipeline Example").master("local[*]").getOrCreate()

## **Read CSV into Spark**

In [12]:
loan_rawdata = spark.read.load('loan_200k.csv', format="csv", header=True, inferSchema=True)

## **Display First Few Records**

In [13]:
loan_rawdata.show(5, truncate=False)

+------+---------+---------+--------------+-------------+-------+-----+---------+-----------------+----------+------+--------+-------+------------------+--------+----------+----------+-----------+
|id    |member_id|loan_amnt|term_in_months|interest_rate|payment|grade|sub_grade|employment_length|home_owner|income|verified|default|purpose           |zip_code|addr_state|open_accts|credit_debt|
+------+---------+---------+--------------+-------------+-------+-----+---------+-----------------+----------+------+--------+-------+------------------+--------+----------+----------+-----------+
|123688|123685   |1800     |36            |17.22        |64.38  |G    |G3       |1                |0         |1896.0|0       |0      |debt_consolidation|853xx   |AZ        |3         |702        |
|139940|139937   |500      |36            |9.01         |15.91  |B    |B2       |1                |0         |2000.0|0       |1      |other             |727xx   |AR        |2         |0          |
|288342|288338 

In [14]:
loan_rawdata.dtypes

[('id', 'int'),
 ('member_id', 'int'),
 ('loan_amnt', 'int'),
 ('term_in_months', 'int'),
 ('interest_rate', 'double'),
 ('payment', 'double'),
 ('grade', 'string'),
 ('sub_grade', 'string'),
 ('employment_length', 'int'),
 ('home_owner', 'int'),
 ('income', 'double'),
 ('verified', 'int'),
 ('default', 'int'),
 ('purpose', 'string'),
 ('zip_code', 'string'),
 ('addr_state', 'string'),
 ('open_accts', 'int'),
 ('credit_debt', 'int')]

## **Light Data Exploration**

In [16]:
loan_rawdata.groupby(loan_rawdata.default).count().show(20,False)


+-------+------+
|default|count |
+-------+------+
|1      |26775 |
|0      |173225|
+-------+------+



## **Split Data into Training and Test**

In [17]:
# Fix the seed so the split is repeatable across runs
traindata, testdata = loan_rawdata.randomSplit([0.80, 0.20], seed=12345)

In [18]:
print('Training Data: {}'.format(traindata.count()))
print('Test Data:     {}'.format(testdata.count()))

Training Data: 160280
Test Data:     39720


In [19]:
traindata.groupby(traindata.default).count().show(20,False)

+-------+------+
|default|count |
+-------+------+
|1      |21484 |
|0      |138796|
+-------+------+



In [20]:
testdata.groupby(testdata.default).count().show(20,False)

+-------+-----+
|default|count|
+-------+-----+
|1      |5291 |
|0      |34429|
+-------+-----+
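
A quick way to confirm that the random split kept the class balance is to compare default rates across the full, training, and test sets. The sketch below uses plain Python with the counts copied from the outputs above; the variable names are illustrative only.

```python
# Counts copied from the groupby outputs in this notebook: (defaults, non-defaults).
class_counts = {
    "all":   (26775, 173225),
    "train": (21484, 138796),
    "test":  (5291, 34429),
}

default_rates = {}
for name, (defaults, non_defaults) in class_counts.items():
    rows = defaults + non_defaults
    default_rates[name] = defaults / rows
    print(f"{name:<5} rows={rows:>6}  default rate={default_rates[name]:.4f}")

# The training and test rows should add up to the full dataset.
train_rows = sum(class_counts["train"])
test_rows = sum(class_counts["test"])
print("split covers all rows:", train_rows + test_rows == sum(class_counts["all"]))
```

All three rates come out at roughly 13%, so the data is imbalanced: a model that always predicts "no default" would already be right about 87% of the time.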



## **Create Transform Objects**
Each object below is a pipeline stage: three feature-engineering steps followed by three candidate models.

In [21]:
# Feature stages: index the categorical "purpose" column, one-hot encode it,
# then assemble all model inputs into a single "features" vector.
si  = StringIndexer(inputCol="purpose", outputCol="purpose_index")
hot = OneHotEncoder(inputCol="purpose_index", outputCol="purpose_features")
va  = VectorAssembler(inputCols=["loan_amnt", "interest_rate", "employment_length", "home_owner", "income", "verified", "open_accts", "credit_debt", "purpose_features"], outputCol="features")

# Candidate models; only gbc is used in the pipeline below.
dtr = DecisionTreeRegressor(featuresCol="features", labelCol="default", predictionCol="prediction", maxDepth=2, varianceCol="variance")
gbr = GBTRegressor(featuresCol="features", labelCol="default", predictionCol="prediction", maxDepth=5, maxBins=32, maxIter=20, seed=12345)
gbc = GBTClassifier(featuresCol="features", labelCol="default", predictionCol="prediction", maxDepth=5, maxIter=20, seed=12345)


In [22]:
pipeline = Pipeline(stages=[si, hot, va, gbc])

model = pipeline.fit(traindata)

In [23]:
predictions = model.transform(testdata)

In [24]:
predictions.select(['id','loan_amnt','default','prediction']).show(10,False)

+------+---------+-------+----------+
|id    |loan_amnt|default|prediction|
+------+---------+-------+----------+
|55521 |1000     |0      |0.0       |
|62102 |3200     |0      |0.0       |
|65426 |4000     |0      |0.0       |
|68926 |2300     |0      |0.0       |
|72323 |5000     |1      |0.0       |
|85818 |5000     |1      |0.0       |
|85961 |1200     |1      |0.0       |
|90665 |8500     |0      |0.0       |
|98984 |3500     |1      |0.0       |
|100214|5000     |0      |0.0       |
+------+---------+-------+----------+
only showing top 10 rows



In [25]:
predictions.groupBy(predictions['default'],predictions['prediction']).count().show()

+-------+----------+-----+
|default|prediction|count|
+-------+----------+-----+
|      1|       0.0| 5282|
|      0|       0.0|34419|
|      1|       1.0|    9|
|      0|       1.0|   10|
+-------+----------+-----+



In [26]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="default")
evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})

0.8667673716012084
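
Accuracy alone can be misleading on imbalanced data, so it helps to work out precision and recall from the confusion counts shown earlier. The sketch below is plain Python arithmetic on those counts (not a Spark evaluator), and it also computes the accuracy of a trivial baseline that always predicts "no default".

```python
# Counts copied from the (default, prediction) table above.
tp = 9      # default=1, prediction=1.0
fp = 10     # default=0, prediction=1.0
fn = 5282   # default=1, prediction=0.0
tn = 34419  # default=0, prediction=0.0

total = tp + fp + fn + tn
accuracy = (tp + tn) / total
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)

# Accuracy of a trivial model that always predicts "no default".
baseline_accuracy = (tn + fp) / total

print(f"accuracy:          {accuracy:.4f}")
print(f"baseline accuracy: {baseline_accuracy:.4f}")
print(f"precision:         {precision:.4f}")
print(f"recall:            {recall:.4f}")
print(f"f1:                {f1:.4f}")
```

The accuracy matches the evaluator output above, but the model flags only 9 of the 5,291 actual defaults in the test set, and its accuracy is marginally below the always-predict-0 baseline. For this dataset, recall, F1, or a ranking metric is a more informative measure than accuracy.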