## Creating a Regression Model

In this exercise, you will implement a regression model that predicts students' final scores.

### Import Spark SQL and Spark ML Libraries

First, import the libraries you will need:

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("yarn").appName("GLIM_PGPBABI").enableHiveSupport().getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1485153670758_0005,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [41]:
# Keep __future__ imports first so the cell also runs as a plain script
from __future__ import division

from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

### Load Source Data
We will load the **studentmaster** Hive table into a Spark DataFrame.

In [3]:
student_df = spark.sql("SELECT * FROM studentmaster")

In [13]:
student_df.printSchema()

root
 |-- studentid: integer (nullable = true)
 |-- school: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- addresstype: integer (nullable = true)
 |-- famsize: integer (nullable = true)
 |-- pstatus: integer (nullable = true)
 |-- medu: integer (nullable = true)
 |-- fedu: integer (nullable = true)
 |-- mjob: integer (nullable = true)
 |-- fjob: integer (nullable = true)
 |-- reason: integer (nullable = true)
 |-- guardian: integer (nullable = true)
 |-- traveltime: integer (nullable = true)
 |-- studytime: integer (nullable = true)
 |-- failures: integer (nullable = true)
 |-- schoolsup: integer (nullable = true)
 |-- famsup: integer (nullable = true)
 |-- paid: integer (nullable = true)
 |-- activities: integer (nullable = true)
 |-- nursery: integer (nullable = true)
 |-- higher: integer (nullable = true)
 |-- internet: integer (nullable = true)
 |-- romantic: integer (nullable = true)
 |-- famrel: integer (nullable = true)
 |-

### Prepare the Data
Most modeling begins with exhaustive exploration and preparation of the data. In this example, the data has been cleaned for you. You will rename the **g3** column, which holds the final score to be predicted, to **label** and drop the **smoteclass** column, which this exercise does not use. The remaining columns are available as *features*.

(Note that in a real scenario, you would perform additional tasks such as handling missing or duplicated data, scaling numeric columns, and using a process called *feature engineering* to create new features for your model).

In [21]:
data = student_df.withColumnRenamed("g3","label").drop("smoteclass")

In [19]:
data.count()

2320

In [22]:
data.printSchema()

root
 |-- studentid: integer (nullable = true)
 |-- school: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- addresstype: integer (nullable = true)
 |-- famsize: integer (nullable = true)
 |-- pstatus: integer (nullable = true)
 |-- medu: integer (nullable = true)
 |-- fedu: integer (nullable = true)
 |-- mjob: integer (nullable = true)
 |-- fjob: integer (nullable = true)
 |-- reason: integer (nullable = true)
 |-- guardian: integer (nullable = true)
 |-- traveltime: integer (nullable = true)
 |-- studytime: integer (nullable = true)
 |-- failures: integer (nullable = true)
 |-- schoolsup: integer (nullable = true)
 |-- famsup: integer (nullable = true)
 |-- paid: integer (nullable = true)
 |-- activities: integer (nullable = true)
 |-- nursery: integer (nullable = true)
 |-- higher: integer (nullable = true)
 |-- internet: integer (nullable = true)
 |-- romantic: integer (nullable = true)
 |-- famrel: integer (nullable = true)
 |-

### Split the Data
It is common practice when building supervised machine learning models to split the source data, using some of it to train the model and reserving some to test the trained model. In this exercise, you will use 80% of the data for training, and reserve 20% for testing.

In [24]:
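# Randomly assign roughly 80% of the rows to training and 20% to testing
splits = data.randomSplit([0.8, 0.2])
train = splits[0]
# Rename the label in the test set so it can be compared with the prediction later
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows: %d  Testing Rows: %d" % (train_rows, test_rows))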

Training Rows: 1887  Testing Rows: 433
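
The row counts above are close to, but not exactly, an 80/20 split of the 2,320 rows. One way to picture why is to assign each row to a split with an independent random draw, so the proportions hold only on average. The following is a minimal pure-Python sketch of that idea, not Spark's implementation; the `random_split` helper, the stand-in rows, and the seed value are assumptions made for illustration.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split using an independent random draw (illustrative helper)."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any draw not claimed by an earlier bound
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(2320))  # stand-in for the 2,320 rows counted earlier
train_part, test_part = random_split(rows, [0.8, 0.2], seed=42)  # seed chosen arbitrarily
print("Training Rows: %d  Testing Rows: %d" % (len(train_part), len(test_part)))
```

Passing a fixed seed makes the split repeatable. Spark's `randomSplit` also accepts an optional `seed` argument for the same purpose.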

### Define the Pipeline
A predictive model often requires multiple stages of feature preparation. For example, it is common when using some algorithms to distinguish between continuous features (which have a calculable numeric value) and categorical features (which are numeric representations of discrete categories). It is also common to *normalize* continuous numeric features to use a common scale (for example, by scaling all numbers to a proportional decimal value between 0 and 1).
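
As a worked example of the normalization described above, the sketch below rescales a list of values to the range 0 to 1 using (x - min) / (max - min). The `min_max_scale` helper and the sample ages are hypothetical, chosen only to illustrate the arithmetic.

```python
def min_max_scale(values):
    """Rescale values linearly so the smallest maps to 0 and the largest to 1."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant column has no spread; map every value to the midpoint
        return [0.5 for _ in values]
    return [(v - lo) / float(hi - lo) for v in values]

ages = [15, 16, 18, 22]  # hypothetical sample values
scaled = min_max_scale(ages)
print(scaled)
```

After scaling, the smallest age maps to 0, the largest to 1, and the others fall proportionally in between.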

A pipeline consists of a series of *transformer* and *estimator* stages that typically prepare a DataFrame for modeling and then train a predictive model. In this case, you will create a pipeline with six stages:
- A **VectorAssembler** that combines categorical features into a single vector
- A **VectorIndexer** that creates indexes for a vector of categorical features
- A **VectorAssembler** that creates a vector of continuous numeric features
- A **MinMaxScaler** that normalizes continuous numeric features
- A **VectorAssembler** that creates a vector of categorical and continuous features
- A **LinearRegression** model.
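
To illustrate what indexing a categorical feature means, the sketch below treats a column as categorical when it has at most a given number of distinct values and replaces each value with a 0-based index. It is a simplified pure-Python illustration rather than Spark's implementation; the `index_column` helper, the threshold of 4, and the sample values are assumptions.

```python
def index_column(values, max_categories):
    """Return (indexed_values, mapping); mapping is None for continuous columns."""
    distinct = sorted(set(values))
    if len(distinct) > max_categories:
        # Too many distinct values: treat as continuous and leave unchanged
        return list(values), None
    # Assign indices in sorted order (a simplification for this sketch)
    mapping = {v: i for i, v in enumerate(distinct)}
    return [mapping[v] for v in values], mapping

reason = [10, 30, 20, 10, 30]   # hypothetical category codes
absences = [0, 4, 7, 2, 11, 6]  # hypothetical continuous values

indexed, mapping = index_column(reason, max_categories=4)
print("indexed=%s mapping=%s" % (indexed, mapping))

unchanged, no_mapping = index_column(absences, max_categories=4)
print("unchanged=%s mapping=%s" % (unchanged, no_mapping))
```

In Spark, the corresponding threshold is the `maxCategories` parameter of **VectorIndexer**.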

In [48]:
# Field names to exclude from the list of categorical variables
remove_from_cat_input = ['studentid','age', 'absences', 'g1', 'g2', 'label']

In [49]:
# Keep every column that is not in the exclusion list, preserving column order
cat_inputs = [c for c in train.columns if c not in remove_from_cat_input]

cat_inputs

['school', 'sex', 'addresstype', 'famsize', 'pstatus', 'medu', 'fedu', 'mjob', 'fjob', 'reason', 'guardian', 'traveltime', 'studytime', 'failures', 'schoolsup', 'famsup', 'paid', 'activities', 'nursery', 'higher', 'internet', 'romantic', 'famrel', 'freetime', 'goout', 'dalc', 'walc', 'health']

In [50]:
num_inputs = ['age','absences', 'g1', 'g2']

In [51]:
# Assemble the categorical columns into one vector, then index its categorical values
catVect = VectorAssembler(inputCols=cat_inputs, outputCol="catFeatures")
catIdx = VectorIndexer(inputCol=catVect.getOutputCol(), outputCol="idxCatFeatures")
# Assemble the continuous columns into one vector, then scale each feature to the range [0, 1]
numVect = VectorAssembler(inputCols=num_inputs, outputCol="numFeatures")
minMax = MinMaxScaler(inputCol=numVect.getOutputCol(), outputCol="normFeatures")
# Combine both vectors into the final feature vector and train a linear regression on it
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
lr = LinearRegression(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[catVect, catIdx, numVect, minMax, featVect, lr])

### Tune Parameters
You can tune parameters to find the best model for your data. To do this, you can use the **CrossValidator** class to evaluate each combination of parameters in a grid built with **ParamGridBuilder** against multiple *folds* of the data, each split into training and validation datasets, in order to find the best-performing parameters. Note that this can take a long time to run because every parameter combination is tried multiple times.

In [53]:
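# Try every combination of two regularization strengths and two iteration limits
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.01]).addGrid(lr.maxIter, [10, 5]).build()
# Score each combination with 2-fold cross-validation (RegressionEvaluator defaults to RMSE on "label")
cv = CrossValidator(estimator=pipeline, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid, numFolds=2)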

In [54]:
model = cv.fit(train)
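
To see why tuning can be slow, it helps to count the model fits. The sketch below enumerates the same parameter values used in the grid above with `itertools.product` and multiplies by the number of folds. The variable names are illustrative, and the single extra refit of the best combination on the full training set is stated here as an assumption about how the search is completed.

```python
from itertools import product

reg_params = [0.3, 0.01]  # values passed to addGrid for regParam
max_iters = [10, 5]       # values passed to addGrid for maxIter
num_folds = 2

# Every combination of the two parameter lists
grid = [{"regParam": r, "maxIter": m} for r, m in product(reg_params, max_iters)]
for params in grid:
    print(params)

fits_during_search = len(grid) * num_folds
total_fits = fits_during_search + 1  # assumed: one refit of the best combination
print("Combinations: %d, fits during search: %d, total fits: %d"
      % (len(grid), fits_during_search, total_fits))
```

Adding a third value to either list, or a third fold, grows the number of fits multiplicatively, which is why large grids become expensive quickly.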

### Test the Model
Now you're ready to apply the model to the test data.

In [55]:
prediction = model.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(20)

+--------------------+------------------+---------+
|            features|        prediction|trueLabel|
+--------------------+------------------+---------+
|[0.0,0.0,1.0,0.0,...| 9.628018866283005|       10|
|[0.0,0.0,1.0,0.0,...|10.452940348838975|       10|
|(32,[2,4,5,6,8,10...|12.714638998809821|       13|
|(32,[2,5,6,7,8,9,...| 11.88654685222408|       11|
|(32,[2,4,5,6,7,8,...|14.139191553579556|       14|
|(32,[2,5,6,7,8,9,...| 9.334267264535855|        9|
|(32,[2,4,5,6,7,8,...|11.441449616039977|       11|
|[0.0,0.0,1.0,0.0,...|  16.0856559485407|       16|
|(32,[2,4,5,6,7,8,...|  15.6423285283139|       15|
|[0.0,0.0,1.0,0.0,...|10.585249122132774|       12|
|[0.0,0.0,1.0,0.0,...|13.836816150581921|       13|
|[0.0,0.0,1.0,0.0,...|12.215546702899967|       15|
|(32,[2,4,5,6,7,8,...|12.044995881019481|       14|
|[0.0,0.0,1.0,0.0,...|14.917163456141093|       17|
|[0.0,0.0,1.0,0.0,...|12.429675508592256|       13|
|[0.0,0.0,1.0,0.0,...| 16.81304403947036|       17|
|[0.0,0.0,1.

### Examine the Predicted and Actual Values
You can plot the predicted values against the actual values to see how accurately the model has predicted. In a perfect model, the resulting scatter plot should form a perfect diagonal line with each predicted value being identical to the actual value - in practice, some variance is to be expected.
Run the cells below to create a temporary table from the **predicted** DataFrame and then retrieve the predicted and actual label values using SQL. You can then display the results as a scatter plot, specifying **-** as the function to show the unaggregated values.

In [56]:
predicted.createOrReplaceTempView("regressionPredictions")

In [57]:
%%sql
SELECT trueLabel, prediction FROM regressionPredictions

### Retrieve the Root Mean Square Error (RMSE)
There are a number of metrics used to measure the difference between predicted and actual values. Of these, the root mean square error (RMSE) is commonly used because it is expressed in the same units as the predicted and actual values. In this case, the RMSE indicates the typical size of the difference between the predicted and actual final scores, with larger errors weighted more heavily than in a simple average. You can use the **RegressionEvaluator** class to retrieve the RMSE.

In [58]:
# Compare the prediction column with the renamed label column of the test set
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(prediction)
print("Root Mean Square Error (RMSE): %s" % rmse)

Root Mean Square Error (RMSE): 1.1372543448
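
To make the metric concrete, the sketch below computes the RMSE by hand as the square root of the mean squared difference, using the first five prediction and trueLabel pairs from the test output shown earlier. It also computes the mean absolute error (MAE) for comparison. The variable names are illustrative. Because only five rows are used, the result will differ from the value reported for the full test set.

```python
import math

# (prediction, trueLabel) pairs copied from the first five rows of the test output
pairs = [
    (9.628018866283005, 10),
    (10.452940348838975, 10),
    (12.714638998809821, 13),
    (11.88654685222408, 11),
    (14.139191553579556, 14),
]

# RMSE: square each error, average the squares, then take the square root
squared_errors = [(pred - actual) ** 2 for pred, actual in pairs]
rmse_sample = math.sqrt(sum(squared_errors) / len(squared_errors))

# MAE: average of the absolute errors, shown for comparison
mae_sample = sum(abs(pred - actual) for pred, actual in pairs) / len(pairs)

print("RMSE on five sample rows: %.4f" % rmse_sample)
print("MAE on five sample rows: %.4f" % mae_sample)
```

The RMSE is never smaller than the MAE on the same data, because squaring gives large errors more weight.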