|Variable     |Description |
|:------------|:----------:|
|id           |A notation for a house           |
|date         |Date house was sold          |
|price        |Price is prediction target           |
|bedrooms     |Number of bedrooms           |
|bathrooms    |Number of bathrooms           |
|sqft_living  |Square footage of the home           |
|sqft_lot     |Square footage of the lot           |
|floors       |Total floors (levels) in house           |
|waterfront   |House which has a view to a waterfront           |
|view         |Has been viewed           |
|condition    |How good the condition is overall           |
|grade        |overall grade given to the housing unit, based on King County grading system           |
|sqft_above   |Square footage of house apart from basement           |
|sqft_basement|Square footage of the basement           |
|yr_built     |Built Year           |
|yr_renovated |Year when house was renovated           |
|zipcode      |Zip code           |
|lat          |Latitude coordinate           |
|long         |Longitude coordinate           |
|sqft_living15|Living room area in 2015(implies-- some renovations) This might or might not have affected the lotsize area|
|sqft_lot15   |LotSize area in 2015(implies-- some renovations)           |

## Data Load

In [1]:
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType, DateType
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import when

# Creating Spark Session
spark = SparkSession.builder.appName("LinearRegression_with_HouseData").getOrCreate()
sc = spark.sparkContext

gcs_bucket='dataproc-staging-us-central1-1012630253058-dtwj7n8q/notebooks/jupyter/'
data_file = "gs://" + gcs_bucket + "kc_house_data.csv"

# reading csv
house_df = spark.read.option("header", True).csv(data_file) 

# changing date rows 20141013T000000 ---> 20141013
house_df = house_df.withColumn("date"
                        ,when(house_df.date.endswith("T000000"), regexp_replace(house_df.date,"T000000","")))

# changing data types required
house_df = house_df.withColumn("id", col("id").cast(IntegerType())) \
                   .withColumn("price", col("price").cast(FloatType())) \
                   .withColumn("bedrooms", col("bedrooms").cast(IntegerType())) \
                   .withColumn("bathrooms", col("bathrooms").cast(FloatType())) \
                   .withColumn("sqft_living", col("sqft_living").cast(IntegerType())) \
                   .withColumn("sqft_lot", col("sqft_lot").cast(IntegerType())) \
                   .withColumn("floors", col("floors").cast(FloatType())) \
                   .withColumn("waterfront", col("waterfront").cast(IntegerType())) \
                   .withColumn("view", col("view").cast(IntegerType())) \
                   .withColumn("condition", col("condition").cast(IntegerType())) \
                   .withColumn("grade", col("grade").cast(IntegerType())) \
                   .withColumn("sqft_above", col("sqft_above").cast(IntegerType())) \
                   .withColumn("sqft_basement", col("sqft_basement").cast(IntegerType())) \
                   .withColumn("yr_built", col("yr_built").cast(DateType())) \
                   .withColumn("yr_renovated", col("yr_renovated").cast(DateType())) \
                   .withColumn("zipcode", col("zipcode").cast(IntegerType())) \
                   .withColumn("lat", col("lat").cast(FloatType())) \
                   .withColumn("long", col("long").cast(FloatType())) \
                   .withColumn("sqft_living15", col("sqft_living15").cast(IntegerType())) \
                   .withColumn("sqft_lot15", col("sqft_lot15").cast(IntegerType()))

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/07 16:14:25 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/02/07 16:14:25 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/02/07 16:14:25 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/02/07 16:14:25 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
                                                                                

In [2]:
sc.setLogLevel("ERROR")

## Data exploration

In [3]:
columns_for_lr = ["floors", "waterfront","lat" ,"bedrooms" ,"sqft_basement" ,"view" ,"bathrooms","sqft_living15","sqft_above","grade","sqft_living", "price"]
print(house_df.printSchema())
print(house_df.select(columns_for_lr).show(5, True))

root
 |-- id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- price: float (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: float (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: float (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: date (nullable = true)
 |-- yr_renovated: date (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: float (nullable = true)
 |-- long: float (nullable = true)
 |-- sqft_living15: integer (nullable = true)
 |-- sqft_lot15: integer (nullable = true)

None
+------+----------+-------+--------+-------------+----+---------+-------------+----------+-----+-----------+--------+
|floors|waterfront|    lat|bedroom

In [4]:
house_df[columns_for_lr].describe().toPandas().T

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
floors,21613,1.4943089807060566,0.5399888951423489,1.0,3.5
waterfront,21613,0.007541757275713691,0.08651719772788748,0,1
lat,21613,47.560052521439594,0.13856371085848315,47.1559,47.7776
bedrooms,21613,3.37084162309721,0.930061831147451,0,33
sqft_basement,21613,291.5090454818859,442.57504267746685,0,4820
view,21613,0.23430342849211122,0.7663175692736114,0,4
bathrooms,21613,2.1147573219821405,0.770163157217741,0.0,8.0
sqft_living15,21613,1986.552491556008,685.3913042527788,399,6210
sqft_above,21613,1788.3906907879516,828.0909776519175,290,9410


In [5]:
import six

for i in house_df[columns_for_lr].columns:
    if not( isinstance(house_df.select(i).take(1)[0][0], six.string_types)):
        print("Correlation to price for", i, house_df.stat.corr('price',i))

Correlation to price for floors 0.25679388755071897


[Stage 8:>                                                          (0 + 1) / 1]                                                                                

Correlation to price for waterfront 0.26636943403060204
Correlation to price for lat 0.307003482637723
Correlation to price for bedrooms 0.30834959814563934
Correlation to price for sqft_basement 0.32381602071198434
Correlation to price for view 0.39729348829450273
Correlation to price for bathrooms 0.5251375054139628
Correlation to price for sqft_living15 0.5853789035795692
Correlation to price for sqft_above 0.6055672983560784
Correlation to price for grade 0.6674342560202353
Correlation to price for sqft_living 0.7020350546118005
Correlation to price for price 1.0


### Preparing data from Linear Regression

In [6]:
from pyspark.ml.feature import VectorAssembler

# Preparing to Linear Regression
features = ["floors", "waterfront","lat" ,"bedrooms" ,"sqft_basement" ,"view" ,"bathrooms","sqft_living15","sqft_above","grade","sqft_living"] 
vectorAssembler = VectorAssembler(inputCols = features, outputCol = 'Features')
vhouse_df = vectorAssembler.transform(house_df)
vhouse_df = vhouse_df.select(['Features', 'price'])
print("\nAfter Vector Assembling")
vhouse_df.show(5)


After Vector Assembling


[Stage 29:>                                                         (0 + 1) / 1]

+--------------------+--------+
|            Features|   price|
+--------------------+--------+
|[1.0,0.0,47.51119...|221900.0|
|[2.0,0.0,47.72100...|538000.0|
|[1.0,0.0,47.73789...|180000.0|
|[1.0,0.0,47.52080...|604000.0|
|[1.0,0.0,47.61679...|510000.0|
+--------------------+--------+
only showing top 5 rows



                                                                                

## Traint Test Split

In [7]:
# Training, Test Split
(trainingData, testData) = vhouse_df.randomSplit([0.7, 0.3])

print("Training data")
print(trainingData.show(5))

print("\nTest data")
print(testData.show(5))

Training data


                                                                                

+--------------------+---------+
|            Features|    price|
+--------------------+---------+
|(11,[0,2,7,8,9,10...| 139950.0|
|(11,[0,2,7,8,9,10...| 235000.0|
|(11,[0,2,7,8,9,10...|1295650.0|
|[1.0,0.0,47.17639...| 245000.0|
|[1.0,0.0,47.17750...| 360000.0|
+--------------------+---------+
only showing top 5 rows

None

Test data
+--------------------+--------+
|            Features|   price|
+--------------------+--------+
|(11,[0,2,7,8,9,10...|142000.0|
|(11,[0,2,7,8,9,10...|355000.0|
|[1.0,0.0,47.16469...|335000.0|
|[1.0,0.0,47.18030...|265000.0|
|[1.0,0.0,47.18080...|350000.0|
+--------------------+--------+
only showing top 5 rows

None


[Stage 31:>                                                         (0 + 1) / 1]                                                                                

In [8]:
print("Total trainingData instance :", trainingData.count(),"\nTotal testData instance :", testData.count())

[Stage 35:>                                                         (0 + 1) / 1]

Total trainingData instance : 15003 
Total testData instance : 6610


                                                                                

# Linear Regression

In [9]:
from pyspark.ml.regression import LinearRegression

print("*** Linear Regression ***")
lr = LinearRegression(featuresCol = 'Features', labelCol='price', maxIter = 20, regParam = 0.3, elasticNetParam = 0.8)
lr_model = lr.fit(trainingData)
print("Coefficients: " + str(lr_model.coefficients))
print("\nIntercept: " + str(lr_model.intercept))

*** Linear Regression ***


[Stage 39:>                                                         (0 + 1) / 1]

Coefficients: [-32456.631336669576,645081.1259244545,672619.7540772442,-27897.911058289792,89.11311510769912,66814.30583828094,-359.5490547669412,4.158007130600411,96.40363220483924,81804.97166813583,100.90403059634717]

Intercept: -32369695.408281278


                                                                                

### Model Summary

In [10]:
trainingSummary = lr_model.summary
print("Model Summary for Linear Regression")
print("-----------------------------------")
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Model Summary for Linear Regression
-----------------------------------
RMSE: 217814.722242
r2: 0.655311


### Prediction

In [11]:
print("Prediction for Linear Regression Model")
print("--------------------------------------")
lr_predictions = lr_model.transform(testData)
lr_predictions.select("features","price","prediction").show(5)

from pyspark.ml.evaluation import RegressionEvaluator

# Evaluator for test data
lr_evaluator = RegressionEvaluator(labelCol="price",
                                   predictionCol="prediction",
                                   metricName="r2")

# R2 score for test data
r2 = lr_evaluator.evaluate(lr_predictions)
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))


Prediction for Linear Regression Model
--------------------------------------


[Stage 40:>                                                         (0 + 1) / 1]                                                                                

+--------------------+--------+------------------+
|            features|   price|        prediction|
+--------------------+--------+------------------+
|(11,[0,2,7,8,9,10...|142000.0|-286236.9574088119|
|(11,[0,2,7,8,9,10...|355000.0| 604252.4438825101|
|[1.0,0.0,47.16469...|335000.0| 189101.6817632541|
|[1.0,0.0,47.18030...|265000.0|188186.58300402015|
|[1.0,0.0,47.18080...|350000.0|310105.72187368944|
+--------------------+--------+------------------+
only showing top 5 rows



[Stage 41:>                                                         (0 + 1) / 1]                                                                                

R Squared (R2) on test data = 0.662859


# Gradient-boosted Tree Regression

In [12]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted Tree Regression
gbt = GBTRegressor(featuresCol = 'Features', labelCol = 'price', maxIter=10)
gbt_model = gbt.fit(trainingData)
gbt_predictions = gbt_model.transform(testData)
print("\n*** Gradient-boosted Tree Regression *** ")
gbt_predictions.select('prediction', 'price', 'features').show(5)

                                                                                


*** Gradient-boosted Tree Regression *** 
+------------------+--------+--------------------+
|        prediction|   price|            features|
+------------------+--------+--------------------+
|234249.21852428862|142000.0|(11,[0,2,7,8,9,10...|
|342141.02785188414|355000.0|(11,[0,2,7,8,9,10...|
|321822.92960840295|335000.0|[1.0,0.0,47.16469...|
|  276647.788726951|265000.0|[1.0,0.0,47.18030...|
|279021.53497661534|350000.0|[1.0,0.0,47.18080...|
+------------------+--------+--------------------+
only showing top 5 rows



In [13]:
# Gradient-boosted Tree Regression Evaluator
gbt_evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="r2")
r2 = gbt_evaluator.evaluate(gbt_predictions)
print("\n*** Gradient-boosted Tree Regression - R Squared (R2) Score *** ")
print("---------------------------------------------------------------")
print("R Squared (R2) on test data = %g" % r2)


*** Gradient-boosted Tree Regression - R Squared (R2) Score *** 
---------------------------------------------------------------
R Squared (R2) on test data = 0.759249


[Stage 148:>                                                        (0 + 1) / 1]                                                                                