In [1]:
# Import packages
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession, Row
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import StringIndexer, VectorIndexer

In [2]:
# sprak session
spark = SparkSession\
    .builder\
    .appName("Dating_Regression")\
    .getOrCreate()

In [3]:
# Load data
data = spark.read.csv(
    "o_pair.csv", header=True, mode="DROPMALFORMED", inferSchema=True
)

In [4]:
# Transform data into (features: vectors, label: float) format
import pyspark.sql.functions as psf
from pyspark.ml.feature import VectorAssembler
df = spark.read.csv("o_pair.csv")
droplist = ["iid", "pid", "match"]

In [5]:
df0 = df.select([column for column in df.columns if column not in droplist])
df1 = df0.select([psf.regexp_replace(c, '[\]\[]', '').cast("float").alias(c) for c in df0.columns])
va = VectorAssembler(inputCols=df1.columns, outputCol="features")
df2 = va.transform(df1.na.drop())

In [6]:
data = df2.withColumn('label', df1._c76)

In [7]:
data

DataFrame[_c0: float, _c1: float, _c2: float, _c3: float, _c4: float, _c5: float, _c6: float, _c7: float, _c8: float, _c9: float, _c10: float, _c11: float, _c12: float, _c13: float, _c14: float, _c15: float, _c16: float, _c17: float, _c18: float, _c19: float, _c20: float, _c21: float, _c22: float, _c23: float, _c24: float, _c25: float, _c26: float, _c27: float, _c28: float, _c29: float, _c30: float, _c31: float, _c32: float, _c33: float, _c34: float, _c35: float, _c36: float, _c37: float, _c38: float, _c39: float, _c40: float, _c41: float, _c42: float, _c43: float, _c44: float, _c45: float, _c46: float, _c47: float, _c48: float, _c49: float, _c50: float, _c51: float, _c52: float, _c53: float, _c54: float, _c55: float, _c56: float, _c57: float, _c58: float, _c59: float, _c60: float, _c61: float, _c62: float, _c63: float, _c64: float, _c65: float, _c66: float, _c67: float, _c68: float, _c69: float, _c70: float, _c71: float, _c72: float, _c73: float, _c74: float, _c75: float, _c76: float,

In [8]:
# Identify and index categorical features
# Specify maxCategories s.t. features with > 4 distinct values are treated as continuous
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df2)

# Split the data into training and test sets (9:1)
(trainingData, testData) = data.randomSplit([0.9, 0.1])

In [9]:
# ===== Random Forest Regression Model ======

In [10]:
# RandomForest Regressor
rf = RandomForestRegressor(featuresCol="indexedFeatures")

In [11]:
# Chain indexer and forest in a Pipeline
rfpipeline = Pipeline(stages=[featureIndexer, rf])

In [12]:
# Train model
rfmodel = rfpipeline.fit(trainingData)

In [13]:
# Make prediction
predictions = rfmodel.transform(testData)

In [14]:
predictions.select("prediction", "label", "features").show(10)

+--------------------+-----+--------------------+
|          prediction|label|            features|
+--------------------+-----+--------------------+
|0.008485401459854014|  0.0|[2.0,15.0,0.0,1.0...|
|                 0.0|  0.0|[4.0,10.0,0.0,1.0...|
|              0.9625|  1.0|[4.0,13.0,0.0,1.0...|
|  0.0053475935828877|  0.0|[6.0,12.0,0.0,1.0...|
| 0.07604150904930145|  0.0|[6.0,14.0,0.0,1.0...|
|0.013832995042741714|  0.0|[6.0,17.0,0.0,1.0...|
|  0.9553475935828878|  1.0|[7.0,12.0,0.0,1.0...|
|  0.0053475935828877|  0.0|[8.0,10.0,0.0,1.0...|
|                 0.0|  0.0|[9.0,16.0,0.0,1.0...|
|                 1.0|  1.0|[13.0,5.0,1.0,1.0...|
+--------------------+-----+--------------------+
only showing top 10 rows



In [15]:
# Evaluation: test RMSE
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Test RMSE = %g" % rmse)

rfModel = rfmodel.stages[1]
print(rfModel)

Test RMSE = 0.0480786
RandomForestRegressionModel (uid=RandomForestRegressor_4c0fa13a703143eb543d) with 20 trees


In [16]:
# =========== GBT Regression model ===============

In [17]:
# GBT model
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

In [18]:
# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

In [19]:
# Train model
model = pipeline.fit(trainingData)

In [20]:
# Make prediction
predictions = model.transform(testData)

In [21]:
predictions.select("prediction", "label", "features").show(10)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|[2.0,15.0,0.0,1.0...|
|       0.0|  0.0|[4.0,10.0,0.0,1.0...|
|       1.0|  1.0|[4.0,13.0,0.0,1.0...|
|       0.0|  0.0|[6.0,12.0,0.0,1.0...|
|       0.0|  0.0|[6.0,14.0,0.0,1.0...|
|       0.0|  0.0|[6.0,17.0,0.0,1.0...|
|       1.0|  1.0|[7.0,12.0,0.0,1.0...|
|       0.0|  0.0|[8.0,10.0,0.0,1.0...|
|       0.0|  0.0|[9.0,16.0,0.0,1.0...|
|       1.0|  1.0|[13.0,5.0,1.0,1.0...|
+----------+-----+--------------------+
only showing top 10 rows



In [22]:
# Evaluation: Test RMSE
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Test RMSE = %g" % rmse)

Test RMSE = 0


In [23]:
gbtModel = model.stages[1]
print(gbtModel)

GBTRegressionModel (uid=GBTRegressor_46ce830367c429113ab4) with 10 trees


In [24]:
# =========== GBT Classifier model ===============

In [25]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

In [26]:
# GBT Classifier
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model
model = pipeline.fit(trainingData)

# Make prediction
predictions = model.transform(testData)

In [27]:
predictions.select("prediction", "indexedLabel", "features").show(5)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|[2.0,15.0,0.0,1.0...|
|       0.0|         0.0|[4.0,10.0,0.0,1.0...|
|       1.0|         1.0|[4.0,13.0,0.0,1.0...|
|       0.0|         0.0|[6.0,12.0,0.0,1.0...|
|       0.0|         0.0|[6.0,14.0,0.0,1.0...|
+----------+------------+--------------------+
only showing top 5 rows



In [28]:
# Evaluation: test Acc
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))

gbtModel = model.stages[2]
print(gbtModel)

Accuracy = 1
GBTClassificationModel (uid=GBTClassifier_4176a528078d779b55f0) with 10 trees
