In [18]:
import click
import pandas as pd
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, RFormula
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, DecisionTreeRegressor
from pyspark.sql import SparkSession

In [3]:
# One SparkSession for the whole notebook; getOrCreate() hands back the existing
# session instead of building a new one when this cell is re-run.
spark = (
    SparkSession.builder
    .appName('app')
    .getOrCreate()
)

In [5]:
# Load the dataset and preview a handful of columns (only `bedrooms` and `price`
# are used by the model below).
df = spark.read.parquet('./data3.parquet')
df.select(
    ['neighbourhood_cleansed', 'room_type', 'bedrooms', 'bathrooms', 'number_of_reviews', 'price']
).show(n=5)

+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows



In [6]:
# 80/20 train/test split. Without a seed, randomSplit draws a different partition on
# every run, so row counts and every metric below would change under Restart & Run All.
SEED = 42
train, test = df.randomSplit([.8, .2], seed=SEED)
train.count(), test.count()

(5663, 1483)

In [7]:
# Spark ML estimators read a single vector column; pack `bedrooms` into `features`.
va = VectorAssembler(outputCol='features', inputCols=['bedrooms'])
train_df = va.transform(train)
(
    train_df
    .select('bedrooms', 'features', 'price')
    .show(n=5)
)

+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 85.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
+--------+--------+-----+
only showing top 5 rows



In [8]:
# Linear regression of `price` on the assembled `features` vector.
lr = LinearRegression(labelCol='price', featuresCol='features')
lr_model = lr.fit(dataset=train_df)

In [9]:
# Show the fitted single-feature model as an equation, price = m * bedrooms + b,
# with slope and intercept rounded to 2 decimal places.
m = round(lr_model.coefficients[0], 2)
b = round(lr_model.intercept, 2)
# Render the intercept's sign explicitly so a negative value prints as "- x", not "+ -x".
sign = '-' if b < 0 else '+'
print(f"price = {m} * bedrooms {sign} {abs(b)}")

price - 116.22 * bedrooms + 54.35


In [10]:
# Chain the two steps above (assemble features, then regress) into a single pipeline
# fitted on the raw training split.
pipeline = Pipeline().setStages([va, lr])
pipeline_model = pipeline.fit(dataset=train)

In [11]:
# Score the held-out split; the pipeline adds `features` and `prediction` columns.
pred_df = pipeline_model.transform(dataset=test)
pred_df.select(['bedrooms', 'features', 'price', 'prediction']).show(n=10)

+--------+--------+-----+------------------+
|bedrooms|features|price|        prediction|
+--------+--------+-----+------------------+
|     3.0|   [3.0]|250.0|403.00459325657954|
|     1.0|   [1.0]| 45.0|  170.570383059947|
|     1.0|   [1.0]|130.0|  170.570383059947|
|     1.0|   [1.0]|250.0|  170.570383059947|
|     1.0|   [1.0]|100.0|  170.570383059947|
|     2.0|   [2.0]|250.0| 286.7874881582633|
|     1.0|   [1.0]|100.0|  170.570383059947|
|     1.0|   [1.0]|190.0|  170.570383059947|
|     3.0|   [3.0]|300.0|403.00459325657954|
|     1.0|   [1.0]|129.0|  170.570383059947|
+--------+--------+-----+------------------+
only showing top 10 rows



In [12]:
# Root-mean-squared error of the test-set predictions (same units as `price`).
reg_ev = RegressionEvaluator(
    labelCol='price',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = reg_ev.evaluate(dataset=pred_df)
print(f"rmse is {rmse:.1f}")

rmse is 384.8


In [13]:
# R^2 on the same predictions. The metric is passed as a per-call param override;
# setMetricName() would permanently switch `reg_ev` to R^2, so any later
# reg_ev.evaluate(...) expecting RMSE would silently return the wrong metric.
r2 = reg_ev.evaluate(pred_df, {reg_ev.metricName: 'r2'})
r2

0.09109169299356967

In [14]:
# Persist the fitted pipeline; overwrite() replaces any previous save so this cell
# can be re-run without failing on an existing directory.
pipeline_model.write().overwrite().save(path='./saved_pipeline')

In [None]:
# Reload the persisted pipeline (PipelineModel is imported in the top import cell).
saved = PipelineModel.load('./saved_pipeline')

# Round-trip check: the reloaded pipeline must score the test split exactly like the
# in-memory `pipeline_model` did. A fresh evaluator is used so the check does not
# depend on whatever metric `reg_ev` currently holds.
rmse_check_ev = RegressionEvaluator(labelCol='price', predictionCol='prediction', metricName='rmse')
saved_rmse = rmse_check_ev.evaluate(saved.transform(test))
assert abs(saved_rmse - rmse) < 1e-9, f"reloaded pipeline RMSE {saved_rmse} != in-memory RMSE {rmse}"
saved_rmse