In [6]:
#import findspark and initiate
import findspark
findspark.init()

In [7]:
#create SparkSession using pyspark configuration
import pyspark
from pyspark.sql import SparkSession
conf = pyspark.SparkConf()
spark = SparkSession.builder.appName("project").config(conf = conf).getOrCreate()
spark
sc=spark.sparkContext

In [8]:
#read in merged dataset
df=spark.read.parquet("hdfs://ip-172-31-74-188.ec2.internal:8020/user/hadoop/df")

In [9]:
#use transformers and encoders to perform feature engineering

#import packages
from pyspark.ml.feature import VectorAssembler

#string indices have already been created for calculating correlations
#create vectorizer that makes one column based on all predictor variables
vectorAssembler_features = VectorAssembler(
    inputCols=["authorindex", 
               "subredditindex", 
               "parentindex", 
               "controversiality",
              "timeofday"], 
    outputCol="features")

In [10]:
#split data into training and testing
split_data = df.randomSplit([0.7, 0.3])
train_data = split_data[0]
test_data = split_data[1]

In [11]:
#build linear regression model and train using pipeline

#import packages
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

#define logistic regression model
mod = LinearRegression(labelCol="score", featuresCol="features")

#set up pipeline
pipeline_mod = Pipeline(stages=[vectorAssembler_features,
                                mod])

#train model using pipeline
modelfit = pipeline_mod.fit(train_data)

In [12]:
#make predictions for testing data using model
prediction=modelfit.transform(test_data)

In [13]:
#evaluate performance of model on testing data
selected = prediction.select("score", "prediction")
selected.show()

+-----+------------------+
|score|        prediction|
+-----+------------------+
|    1|0.9458396412692883|
|    1| 0.956243936734744|
|    1|0.9664326674986342|
|    1|0.9784053967623598|
|    1|0.8903783219941612|
|    1| 1.001273574083964|
|    1| 1.017993737946242|
|    1|1.0210840676303992|
|    1|1.0302469285501399|
|    1|1.0544763892063593|
|    1|1.0606832907839443|
|    1| 1.085401399520182|
|    3|1.0844369223066959|
|    1| 1.111419497755214|
|    1|1.1087807907669842|
|    1|1.1523305298585649|
|    1|1.1442318968277834|
|    1| 1.151942101194694|
|   -2|1.0687774363646336|
|    1|1.1729950655630355|
+-----+------------------+
only showing top 20 rows



In [14]:
#convert to double and rdd
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
selected=selected.withColumn('score', col('score').cast(DoubleType()))
selectedrdd=selected.rdd

In [15]:
from pyspark.mllib.evaluation import RegressionMetrics
#create metrics object
metrics = RegressionMetrics(selectedrdd)

#print squared errors
print("MSE = %s" % metrics.meanSquaredError)
print("RMSE = %s" % metrics.rootMeanSquaredError)

# printR-squared
print("R-squared = %s" % metrics.r2)

# print explained variance
print("Explained variance = %s" % metrics.explainedVariance)

MSE = 26.829615697666057
RMSE = 5.179731237976162
R-squared = -423.3891187328598
Explained variance = 26.892454511574584
