In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("ML_Spark").getOrCreate()

In [3]:
spark

In [7]:
filePath = "./sf-airbnb-clean.parquet"
airbnb_df = spark.read.parquet(filePath)
airbnb_df = airbnb_df.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",
"number_of_reviews", "price")

In [8]:
airbnb_df.printSchema()

root
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- number_of_reviews: double (nullable = true)
 |-- price: double (nullable = true)



In [10]:
train_df, test_df, val_df = airbnb_df.randomSplit([0.7, 0.2, 0.1], seed=42)

In [13]:
train_df.count()

5113

In [37]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer

In [41]:
train_df.dtypes

[('neighbourhood_cleansed', 'string'),
 ('room_type', 'string'),
 ('bedrooms', 'double'),
 ('bathrooms', 'double'),
 ('number_of_reviews', 'double'),
 ('price', 'double')]

In [39]:
train_df.select("room_type").distinct().count()

3

In [23]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [60]:
categoricalCols = [field for (field, dataType) in train_df.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")
oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)
# oheEncoder.fit(train_df).transform(train_df).select("neighbourhood_cleansedIndex").distinct().count()
numericCols = [field for (field, dataType) in train_df.dtypes if ((dataType == "double") & (field != "price"))]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [71]:
pipline_transformer = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler])
pipline_transformer.fit(train_df).transform(train_df).select("features").show(1)

+--------------------+
|            features|
+--------------------+
|(40,[14,35,38,39]...|
+--------------------+
only showing top 1 row



In [72]:
lr_model = LinearRegression(featuresCol="features", labelCol="price")
pipe = Pipeline(stages=[pipline_transformer, lr_model])

In [73]:
model = pipe.fit(train_df)

In [74]:
predicted = model.transform(test_df)

In [75]:
predicted.select("price", "prediction").show()

+-----+------------------+
|price|        prediction|
+-----+------------------+
|115.0| 11.81508168008095|
| 95.0|117.92863664099033|
| 88.0|115.54216032219242|
| 90.0|115.02336112245374|
|189.0|113.67448320313318|
| 99.0|110.14664864491019|
|115.0|109.00529040548508|
| 85.0|105.89249520705303|
| 78.0|102.57218032872552|
|120.0|224.24971128179524|
|300.0|224.24971128179524|
|120.0|223.73091208205656|
|225.0| 222.0707546428928|
|167.0|220.20307752383357|
|445.0|218.43916024472205|
|210.0|208.27069592984398|
|110.0|233.63579013469152|
|185.0|242.60682962779688|
|175.0|  241.776750908215|
|120.0|234.92860147166445|
+-----+------------------+
only showing top 20 rows



In [76]:
from pyspark.ml.evaluation import RegressionEvaluator

In [77]:
regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="r2")

In [78]:
regression_evaluator.evaluate(predicted)

0.14878369065869523