#### Load Libraries, Data, and Define Evaluation Functions

In [0]:
# SKLearn Metrics
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import median_absolute_error

def get_scores(y_test, predictions):
    RMSE = mean_squared_error(y_test, predictions, squared=False)
    R2 = r2_score(y_test, predictions)

    print('RMSE:    ', int(RMSE))
    print('R2:      ', R2.round(4))

In [0]:
# MLlib Metrics
from pyspark.ml.evaluation import RegressionEvaluator

def mllib_metrics(predictions):
    rmse = RegressionEvaluator(labelCol="Scores", predictionCol="prediction", metricName="rmse")
    rmse = rmse.evaluate(predictions)
    print(rmse)

    r2 = RegressionEvaluator(labelCol="Scores", predictionCol="prediction", metricName="r2")
    r2 = r2.evaluate(predictions)
    print(r2)

In [0]:
# Again, we wanted to use data that we are familiar with from CSE 450.
df = spark.read.options(header='True', inferSchema='True', delimiter=',').csv('dbfs:/FileStore/bitamss_mlib/score.csv')
display(df)

Hours,Scores
2.5,21
5.1,47
3.2,27
8.5,75
3.5,30
1.5,20
9.2,88
5.5,60
8.3,81
2.7,25


#### Data Preprocessing
----

Features: Hours (Spent studying in hours)

Target: Scores (Grade received)

#### Splitting Datasets

[Documentation for Splitting Datasets using MLlib](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.randomSplit.html)

##### Scaling, Normalizing, Bucketizing, etc.

[Documentation for Feature Extraction using MLlib](https://spark.apache.org/docs/1.4.1/ml-features.html)

###### MLlib

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

# 80% for training. 20% for testing.
train_data, test_data = df.randomSplit([0.8, 0.2])

###### SKlearn

In [0]:
from sklearn.preprocessing import MinMaxScaler as MinMaxScalerSK
import pandas as pd

train_sk = train_data.toPandas()
test_sk = test_data.toPandas()

sc = MinMaxScalerSK()
train_sk[['Scaled_Hours']] = sc.fit_transform(train_sk[['Hours']])
test_sk[['Scaled_Hours']] = sc.transform(test_sk[['Hours']])

train_sk = pd.DataFrame(train_sk, columns = train_sk.columns)
test_sk = pd.DataFrame(test_sk, columns = test_sk.columns)

x_train_sk = train_sk[['Scaled_Hours']]
y_train_sk = train_sk[['Scores']]

x_test_sk = test_sk[['Scaled_Hours']]
y_test_sk = test_sk[['Scores']]

#### Gradient Boosted Regression

###### SKlearn

In [0]:
from sklearn.ensemble import GradientBoostingRegressor
model_gb = GradientBoostingRegressor()
model_gb.fit(x_train_sk, y_train_sk)
predictions = model_gb.predict(x_test_sk)
get_scores(y_test_sk, predictions)

RMSE:     7
R2:       0.9204
  return f(*args, **kwargs)


###### MLlib

[Documentation on Gradient Boosted Regression Models using MLlib](https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-regression)

In [0]:
from pyspark.ml.regression import GBTRegressor

# Feature Extractions: https://spark.apache.org/docs/1.4.1/ml-features.html

# VectorAssembler Transformation - Converting column to vector type
vec_assembler = VectorAssembler(inputCols=['Hours'], outputCol="Hours_Vect")

# MinMaxScaler Transformation
scaler = MinMaxScaler(inputCol="Hours_Vect", outputCol="features")

# Model & Parameters
gbt = GBTRegressor(maxDepth=2, labelCol = 'Scores', featuresCol='features')

pipeline = Pipeline(stages=[vec_assembler, scaler, gbt])
model = pipeline.fit(train_data)
pred = model.transform(test_data)

In [0]:
mllib_metrics(pred)

8.713959962618295
0.8995925973816665


#### Random Forest Regression

###### SKlearn

In [0]:
from sklearn.ensemble import RandomForestRegressor as RandomForestRegressorSK
model_rf = RandomForestRegressorSK()
model_rf.fit(x_train_sk, y_train_sk)
predictions = model_rf.predict(x_test_sk)
get_scores(y_test_sk, predictions)

RMSE:     6
R2:       0.9489
  original_result = original(self, *args, **kwargs)


###### MLlib

[Documentation on Random Forest Regression using MLlib](https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-regression)

In [0]:
from pyspark.ml.regression import RandomForestRegressor

# Feature Extractions: https://spark.apache.org/docs/1.4.1/ml-features.html

# VectorAssembler Transformation - Converting column to vector type
vec_assembler = VectorAssembler(inputCols=['Hours'], outputCol="Hours_Vect")

# MinMaxScaler Transformation
scaler = MinMaxScaler(inputCol="Hours_Vect", outputCol="features")

# Model & Parameters
rf = RandomForestRegressor(numTrees=5, maxDepth=2, labelCol = 'Scores', featuresCol='features')

pipeline_rf = Pipeline(stages=[vec_assembler, scaler, rf])
model_rf = pipeline_rf.fit(train_data)
pred_rf = model_rf.transform(test_data)

In [0]:
mllib_metrics(pred_rf)

+--------------------+------+
|            features|Scores|
+--------------------+------+
|[0.04938271604938...|    20|
|[0.1728395061728395]|    30|
|[0.2962962962962963]|    30|
|[0.3333333333333333]|    35|
|[0.9135802469135803]|    75|
|[0.9629629629629631]|    95|
+--------------------+------+

