In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create the Spark session used by the rest of the notebook (or reuse a running one).
spark = (
    SparkSession.builder
    .appName("MapsaBigData")
    .getOrCreate()
)

In [3]:
# Display the session object as the cell output to confirm it was created.
spark

In [4]:
# Load the sf-airbnb-clean parquet dataset and keep only the columns used below:
# two categorical features, three numeric features, and the "price" label.
filePath = "./databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet/"
airbnbDF = spark.read.parquet(filePath)
airbnbDFFilterd = airbnbDF.select(
    "neighbourhood_cleansed",
    "room_type",
    "bedrooms",
    "bathrooms",
    "number_of_reviews",
    "price",
)

In [5]:
# 80/20 train/test split; the seed is fixed so the split is repeatable.
trainDF, testDF = airbnbDFFilterd.randomSplit(
    [0.8, 0.2],
    seed=42,
)

In [6]:
# Number of rows that landed in the held-out (0.2-weight) split.
testDF.count()

1366

In [7]:
# Turn on eager evaluation so a bare DataFrame expression renders as a table in the notebook.
spark.conf.set(
    "spark.sql.repl.eagerEval.enabled",
    True,
)

In [8]:
# Preview the training split (rendered as a table because eager evaluation was enabled above).
trainDF

neighbourhood_cleansed,room_type,bedrooms,bathrooms,number_of_reviews,price
Bayview,Entire home/apt,0.0,1.0,2.0,99.0
Bayview,Entire home/apt,0.0,1.0,3.0,60.0
Bayview,Entire home/apt,0.0,1.0,20.0,100.0
Bayview,Entire home/apt,0.0,1.0,39.0,110.0
Bayview,Entire home/apt,0.0,1.0,95.0,149.0
Bayview,Entire home/apt,1.0,1.0,1.0,250.0
Bayview,Entire home/apt,1.0,1.0,12.0,95.0
Bayview,Entire home/apt,1.0,1.0,13.0,130.0
Bayview,Entire home/apt,1.0,1.0,15.0,109.0
Bayview,Entire home/apt,1.0,1.0,31.0,127.0


In [9]:
from pyspark.ml.feature import VectorAssembler

In [10]:
# Pack the single "bedrooms" column into a "features" vector column,
# then apply the assembler to the training split.
vecAssembler = VectorAssembler(
    inputCols=["bedrooms"],
    outputCol="features",
)
vecTrainDF = vecAssembler.transform(trainDF)

In [11]:
# Print the first 10 rows: raw input, assembled vector, and label side by side.
(
    vecTrainDF
    .select("bedrooms", "features", "price")
    .show(10)
)

+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     0.0|   [0.0]| 99.0|
|     0.0|   [0.0]| 60.0|
|     0.0|   [0.0]|100.0|
|     0.0|   [0.0]|110.0|
|     0.0|   [0.0]|149.0|
|     1.0|   [1.0]|250.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]|109.0|
|     1.0|   [1.0]|127.0|
+--------+--------+-----+
only showing top 10 rows



In [12]:
from pyspark.ml.regression import LinearRegression

In [13]:
# Linear regression estimator: predict "price" from the "features" vector column.
lr = LinearRegression(
    featuresCol="features",
    labelCol="price",
)
# Fit on the bedrooms-only training vectors.
lrModel = lr.fit(vecTrainDF)

In [14]:
# Intercept of the fitted model; with "bedrooms" as the only feature this is
# the predicted price for a listing with 0 bedrooms.
lrModel.intercept

51.7988030413165

In [15]:
from pyspark.ml import Pipeline
# Chain the assembler and the regression into one pipeline and fit it
# directly on the un-assembled training split.
pipeline = Pipeline(
    stages=[
        vecAssembler,
        lr,
    ]
)
pipelineModel = pipeline.fit(trainDF)

In [16]:
# Score the held-out split with the fitted pipeline and print the first 10 predictions.
predDF = pipelineModel.transform(testDF)
(
    predDF
    .select("bedrooms", "features", "price", "prediction")
    .show(10)
)

+--------+--------+-----+------------------+
|bedrooms|features|price|        prediction|
+--------+--------+-----+------------------+
|     0.0|   [0.0]|115.0|  51.7988030413165|
|     0.0|   [0.0]|199.0|  51.7988030413165|
|     1.0|   [1.0]| 95.0|173.33189308801815|
|     1.0|   [1.0]| 88.0|173.33189308801815|
|     1.0|   [1.0]| 99.0|173.33189308801815|
|     1.0|   [1.0]| 85.0|173.33189308801815|
|     2.0|   [2.0]|300.0| 294.8649831347198|
|     2.0|   [2.0]|445.0| 294.8649831347198|
|     2.0|   [2.0]|175.0| 294.8649831347198|
|     2.0|   [2.0]|120.0| 294.8649831347198|
+--------+--------+-----+------------------+
only showing top 10 rows



In [17]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Columns whose Spark dtype is "string" are treated as categorical features.
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]

# Output column names for the two encoding stages: string -> index -> one-hot.
indexOutputCols = [x + "Index" for x in categoricalCols]

oheOutputCols = [x + "OHE" for x in categoricalCols]

# handleInvalid="skip" filters out rows with invalid data (e.g. a category not
# seen during fit), so a transformed DataFrame may have fewer rows than its input.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)

# Numeric features: every "double" column except the label. Both operands are
# plain Python bools (not Spark Columns), so use the logical `and`, not bitwise `&`.
numericCols = [field for (field, dataType) in trainDF.dtypes if dataType == "double" and field != "price"]

assemblerInputs = oheOutputCols + numericCols

# NOTE: this rebinds `vecAssembler`; from here on it assembles the one-hot and
# numeric columns instead of the single "bedrooms" column used earlier.
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [18]:
# Preprocessing-only pipeline (no estimator at the end), used to inspect the engineered features.
pipelineTransformer = Pipeline(
    stages=[
        stringIndexer,
        oheEncoder,
        vecAssembler,
    ]
)

In [19]:
# Fit the preprocessing stages on the training split only.
piplineObj = pipelineTransformer.fit(trainDF)

In [20]:
# Apply the fitted preprocessing to the training split to obtain the engineered columns.
transformedData = piplineObj.transform(trainDF)

In [21]:
# Preview the original columns alongside the added *Index, *OHE and "features" columns.
transformedData

neighbourhood_cleansed,room_type,bedrooms,bathrooms,number_of_reviews,price,neighbourhood_cleansedIndex,room_typeIndex,neighbourhood_cleansedOHE,room_typeOHE,features
Bayview,Entire home/apt,0.0,1.0,2.0,99.0,15.0,0.0,"(35,[15],[1.0])","(2,[0],[1.0])","(40,[15,35,38,39]..."
Bayview,Entire home/apt,0.0,1.0,3.0,60.0,15.0,0.0,"(35,[15],[1.0])","(2,[0],[1.0])","(40,[15,35,38,39]..."
Bayview,Entire home/apt,0.0,1.0,20.0,100.0,15.0,0.0,"(35,[15],[1.0])","(2,[0],[1.0])","(40,[15,35,38,39]..."
Bayview,Entire home/apt,0.0,1.0,39.0,110.0,15.0,0.0,"(35,[15],[1.0])","(2,[0],[1.0])","(40,[15,35,38,39]..."
Bayview,Entire home/apt,0.0,1.0,95.0,149.0,15.0,0.0,"(35,[15],[1.0])","(2,[0],[1.0])","(40,[15,35,38,39]..."
Bayview,Entire home/apt,1.0,1.0,1.0,250.0,15.0,0.0,"(35,[15],[1.0])","(2,[0],[1.0])","(40,[15,35,37,38,..."
Bayview,Entire home/apt,1.0,1.0,12.0,95.0,15.0,0.0,"(35,[15],[1.0])","(2,[0],[1.0])","(40,[15,35,37,38,..."
Bayview,Entire home/apt,1.0,1.0,13.0,130.0,15.0,0.0,"(35,[15],[1.0])","(2,[0],[1.0])","(40,[15,35,37,38,..."
Bayview,Entire home/apt,1.0,1.0,15.0,109.0,15.0,0.0,"(35,[15],[1.0])","(2,[0],[1.0])","(40,[15,35,37,38,..."
Bayview,Entire home/apt,1.0,1.0,31.0,127.0,15.0,0.0,"(35,[15],[1.0])","(2,[0],[1.0])","(40,[15,35,37,38,..."


In [22]:
# Count how many distinct index values the indexer assigned to room_type.
(
    transformedData
    .select("room_typeIndex")
    .distinct()
    .count()
)

3

In [23]:
# Full pipeline: index -> one-hot -> assemble -> linear regression.
# This rebinds `pipeline`, replacing the earlier two-stage version.
pipeline = Pipeline(
    stages=[
        stringIndexer,
        oheEncoder,
        vecAssembler,
        lr,
    ]
)

In [24]:
# Fit every stage of the current `pipeline` on the training split.
pipelineModel = pipeline.fit(trainDF)

In [25]:
# Score the held-out split with the fitted pipeline.
predDF = pipelineModel.transform(testDF)

In [29]:
# Print the assembled feature vectors in full (truncate=False keeps the whole vector visible).
predDF.select("features").show(truncate=False)

+---------------------------------------------+
|features                                     |
+---------------------------------------------+
|(40,[15,35,38,39],[1.0,1.0,1.0,19.0])        |
|(40,[15,35,38,39],[1.0,1.0,1.0,109.0])       |
|(40,[15,35,37,38,39],[1.0,1.0,1.0,1.0,5.0])  |
|(40,[15,35,37,38,39],[1.0,1.0,1.0,1.0,35.0]) |
|(40,[15,35,37,38,39],[1.0,1.0,1.0,1.0,87.0]) |
|(40,[15,35,37,38,39],[1.0,1.0,1.0,1.0,128.0])|
|(40,[15,35,37,38,39],[1.0,1.0,2.0,1.0,3.0])  |
|(40,[15,35,37,38,39],[1.0,1.0,2.0,1.0,59.0]) |
|(40,[15,35,37,38,39],[1.0,1.0,2.0,2.0,9.0])  |
|(40,[15,35,37,38,39],[1.0,1.0,2.0,2.0,75.0]) |
|(40,[15,35,37,38,39],[1.0,1.0,2.0,2.5,1.0])  |
|(40,[15,35,37,38],[1.0,1.0,3.0,1.0])         |
|(40,[15,35,37,38],[1.0,1.0,3.0,1.0])         |
|(40,[15,35,37,38,39],[1.0,1.0,3.0,1.0,9.0])  |
|(40,[15,35,37,38,39],[1.0,1.0,3.0,3.0,1.0])  |
|(40,[15,36,38,39],[1.0,1.0,1.0,2.0])         |
|(40,[15,36,37,38],[1.0,1.0,1.0,1.0])         |
|(40,[15,36,37,38,39],[1.0,1.0,1.0,1.0,7

In [30]:
from pyspark.ml.evaluation import RegressionEvaluator

In [34]:
# Evaluator that compares the "prediction" column against the "price" label using R².
regressionEvaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="price",
    metricName="r2",
)

In [35]:
# Compute the evaluator's metric on the test-split predictions.
regressionEvaluator.evaluate(predDF)

0.2923250512096982