In [1]:
# Code Snippet 26
# Single-variable linear regression with Spark ML: predict `price_sold` from
# `house_size`, both read from single_variable_regression.csv.

# Step 1 - Importing the data and essential libraries
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
# VectorAssembler converts plain columns into the vector column Spark ML expects.
# NOTE(review): Vectors is not used in this cell; kept in case later cells need it.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName('SingleVariableLinearReg').getOrCreate()
data = spark.read.csv('single_variable_regression.csv', header=True, inferSchema=True)
print("Initial Data")
data.show(3)

# Step 2 - Data pre-processing and converting the data to Spark-accepted format
# Spark ML estimators take their features as a single vector column, so wrap
# house_size into a one-element vector column named `house_size_feature`.
assembler_object = VectorAssembler(inputCols=['house_size'], outputCol='house_size_feature')
feature_vector_dataframe = assembler_object.transform(data)
print("Data after adding house_size column as a spark accepted feature")
feature_vector_dataframe.show(2)
feature_vector_dataframe.printSchema()
# Keep only the feature vector and the label column used for training.
formatted_data = feature_vector_dataframe.select('house_size_feature', 'price_sold')
print("Consolidated Data with accepted features and labels")
formatted_data.show(3)

# Step 3 - Training the Linear Regression model with a single variable
# Split the rows roughly 70/30 into train/test sets. Without a seed,
# randomSplit produces a different split on every run, so the metrics and
# coefficients below would not be reproducible.
RANDOM_SEED = 42
train_data, test_data = formatted_data.randomSplit([0.7, 0.3], seed=RANDOM_SEED)
lireg = LinearRegression(featuresCol='house_size_feature', labelCol='price_sold')
lireg_model = lireg.fit(train_data)

# Step 4 - Evaluating the trained model on the held-out test data
test_results = lireg_model.evaluate(test_data)
print("Residuals info - distance between data points and fitted regression line")
test_results.residuals.show(4)
print(f"Root Mean Square Error {test_results.rootMeanSquaredError}")
print(f"R square value {test_results.r2}")

# Step 5 - Performing predictions
# Drop the label from the test rows so the model sees only the features.
unlabeled_data = test_data.select('house_size_feature')
predictions = lireg_model.transform(unlabeled_data)
print("\nPredictions for Novel Data")
predictions.show(4)

# Check the model manually on a new value by evaluating the fitted line
#   price = intercept + coefficient * house_size
# There is exactly one feature, so coefficients[0] is its weight.
house_size_coeff = lireg_model.coefficients[0]
intercept = lireg_model.intercept
print(f"Coefficient is {house_size_coeff}")
print(f"Intercept is {intercept}")
new_house_size = 950
price = intercept + house_size_coeff * new_house_size
print(f"\nPredicted house price for house size {new_house_size} is {price}")

Initial Data
+----------+----------+
|house_size|price_sold|
+----------+----------+
|      1490|        60|
|      2500|        95|
|      1200|        55|
+----------+----------+
only showing top 3 rows

Data after adding house_size column as a spark accepted feature
+----------+----------+------------------+
|house_size|price_sold|house_size_feature|
+----------+----------+------------------+
|      1490|        60|          [1490.0]|
|      2500|        95|          [2500.0]|
+----------+----------+------------------+
only showing top 2 rows

root
 |-- house_size: integer (nullable = true)
 |-- price_sold: integer (nullable = true)
 |-- house_size_feature: vector (nullable = true)

Consolidated Data with accepted features and labels
+------------------+----------+
|house_size_feature|price_sold|
+------------------+----------+
|          [1490.0]|        60|
|          [2500.0]|        95|
|          [1200.0]|        55|
+------------------+----------+
only showing top 3 rows

Resi