In [24]:
import pandas as pd
from pyspark.sql import SparkSession

In [3]:
#init SparkSession
spark = SparkSession.builder.appName("Airbnb analysis").getOrCreate()

24/08/20 08:17:26 WARN Utils: Your hostname, Zipcoders-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 192.168.3.169 instead (on interface en0)
24/08/20 08:17:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/20 08:17:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
filepath = "/Users/qian/Desktop/Zipcode/Python/LearningSparkV2/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet/"
airbnbDF = spark.read.parquet(filepath)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms","number_of_reviews", "price").show(5)

+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows



In [5]:
#Creat Train and test dataset
#Ultimate Question of Life - 42
trainDF, testDF= airbnbDF.randomSplit([0.8,0.2],seed = 42)
print(f"There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set")

24/08/20 08:17:32 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


There are 5780 rows in the training set, and 1366 in the test set


In [6]:
from pyspark.ml.feature import VectorAssembler

In [7]:
# Transform, return a new DF
vecAssembler = VectorAssembler(inputCols=['bedrooms'], outputCol = 'features')
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select('bedrooms','features','price').show(10)

+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
|     1.0|   [1.0]|115.0|
|     1.0|   [1.0]|105.0|
|     1.0|   [1.0]| 86.0|
|     1.0|   [1.0]|100.0|
|     2.0|   [2.0]|220.0|
+--------+--------+-----+
only showing top 10 rows



In [8]:
# Linear Regression
from pyspark.ml.regression import LinearRegression

In [9]:
lr = LinearRegression(featuresCol='features',labelCol='price')
lrModel = lr.fit(vecTrainDF)

24/08/20 08:17:37 WARN Instrumentation: [3b28c95d] regParam is zero, which might cause numerical instability and overfitting.
24/08/20 08:17:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/08/20 08:17:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/08/20 08:17:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [10]:
m = round (lrModel.coefficients[0],2)
b = round (lrModel.intercept,2)
print(f"The formula for the linear regression line is price = {m}*bedrooms + {b}")

The formula for the linear regression line is price = 123.68*bedrooms + 47.51


In [11]:
# creating pipeline, using Pipeline API
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vecAssembler,lr])
pipelineModel = pipeline.fit(trainDF)

24/08/20 08:17:39 WARN Instrumentation: [c2e7f981] regParam is zero, which might cause numerical instability and overfitting.
24/08/20 08:17:39 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [12]:
predDF = pipelineModel.transform(testDF)
predDF.select('bedrooms','features','price','prediction').show(10)

+--------+--------+------+------------------+
|bedrooms|features| price|        prediction|
+--------+--------+------+------------------+
|     1.0|   [1.0]|  85.0|171.18598011578285|
|     1.0|   [1.0]|  45.0|171.18598011578285|
|     1.0|   [1.0]|  70.0|171.18598011578285|
|     1.0|   [1.0]| 128.0|171.18598011578285|
|     1.0|   [1.0]| 159.0|171.18598011578285|
|     2.0|   [2.0]| 250.0|294.86172649777757|
|     1.0|   [1.0]|  99.0|171.18598011578285|
|     1.0|   [1.0]|  95.0|171.18598011578285|
|     1.0|   [1.0]| 100.0|171.18598011578285|
|     1.0|   [1.0]|2010.0|171.18598011578285|
+--------+--------+------+------------------+
only showing top 10 rows



In [13]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [14]:
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == 'string']
indexOutputCols = [x + 'Index' for x in categoricalCols]
oheOutputCols = [x + 'OHE' for x in categoricalCols]

stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid='skip')
oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=indexOutputCols)

numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType=='double') & (field != 'price'))]
assemblerInputs = oheOutputCols+numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol = 'features')
                                                                 

In [15]:
from pyspark.ml.feature import RFormula
rFormula = RFormula(formula = 'price ~ .',featuresCol='features',labelCol='price',handleInvalid='skip')

In [16]:
lr = LinearRegression(labelCol="price", featuresCol="features")
#pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])

# Or use RFormula
pipeline = Pipeline(stages = [rFormula, lr])
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)

24/08/20 08:17:44 WARN Instrumentation: [8684e943] regParam is zero, which might cause numerical instability and overfitting.
24/08/20 08:17:44 WARN Instrumentation: [8684e943] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.


+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(98,[0,3,6,7,23,4...| 85.0| 55.30094763354373|
|(98,[0,3,6,7,23,4...| 45.0| 22.70940291742818|
|(98,[0,3,6,7,23,4...| 70.0|27.182906571761578|
|(98,[0,3,6,7,13,4...|128.0|-91.90969569190747|
|(98,[0,3,6,7,13,4...|159.0| 94.54162775821169|
+--------------------+-----+------------------+
only showing top 5 rows



In [17]:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
predictionCol="prediction",
labelCol="price",
metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")

RMSE is 220.6


In [18]:
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {r2}")

R2 is 0.15985154393435386


In [19]:
# Saving the Models
pipelinePath = '/tmp/lr-pipeline-model'
pipelineModel.write().overwrite().save(pipelinePath)

In [20]:
# Loading the Models
from pyspark.ml import PipelineModel
savedPipelineModel = PipelineModel.load(pipelinePath)

                                                                                

In [21]:
# Tree-Based Models
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(labelCol='price')

# Filter for just numeric columns (and exclude price, our label)
numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))]

# Combine output of StringIndexer defined above and numeric columns
assemblerInputs = indexOutputCols + numericCols 
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# Combine stages into pipeline
stages = [stringIndexer, vecAssembler, dt]

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(trainDF)

24/08/20 08:17:51 ERROR Instrumentation: java.lang.IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) to be at least as large as the number of values in each categorical feature, but categorical feature 3 has 36 values. Consider removing this and other categorical features with a large number of values, or add more training examples.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:151)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:274)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.$anonfun$train$1(DecisionTreeRegressor.scala:135)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegr

IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) to be at least as large as the number of values in each categorical feature, but categorical feature 3 has 36 values. Consider removing this and other categorical features with a large number of values, or add more training examples.

In [22]:
dt.setMaxBins(40)
pipelineModel = pipeline.fit(trainDF)

In [23]:
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString)

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_aa29a145f806, depth=5, numNodes=47, numFeatures=33
  If (feature 12 <= 2.5)
   If (feature 12 <= 1.5)
    If (feature 5 in {1.0,2.0})
     If (feature 4 in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
      If (feature 3 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
       Predict: 104.23992784125075
      Else (feature 3 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
       Predict: 250.7111111111111
     Else (feature 4 not in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
      If (feature 3 in {0.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,22.0,23.0,27.0,33.0,35.0})
       Predict: 151.94179894179894
      Else (feat

In [25]:
featureImp =pd.DataFrame(list(zip(vecAssembler.getInputCols(), dtModel.featureImportances)),
                         columns=["feature", "importance"])
featureImp.sort_values(by="importance", ascending=False)

Unnamed: 0,feature,importance
12,bedrooms,0.283406
1,cancellation_policyIndex,0.167893
2,instant_bookableIndex,0.140081
4,property_typeIndex,0.128179
15,number_of_reviews,0.126233
3,neighbourhood_cleansedIndex,0.0562
9,longitude,0.03881
14,minimum_nights,0.029473
13,beds,0.015218
5,room_typeIndex,0.010905


In [26]:
# Random forests
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)

In [28]:
# K-Fold Cross-Validation
pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])

In [29]:
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = (ParamGridBuilder().addGrid(rf.maxDepth, [2, 4, 6]).addGrid(rf.numTrees, [10, 100]).build())

In [30]:
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

In [31]:
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=3, seed=42)
cvModel = cv.fit(trainDF)

24/08/20 09:39:22 WARN DAGScheduler: Broadcasting large task binary with size 1324.3 KiB
24/08/20 09:39:27 WARN DAGScheduler: Broadcasting large task binary with size 1157.9 KiB
24/08/20 09:39:33 WARN DAGScheduler: Broadcasting large task binary with size 1198.7 KiB
24/08/20 09:39:35 WARN DAGScheduler: Broadcasting large task binary with size 1224.9 KiB


In [32]:
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))

[({Param(parent='RandomForestRegressor_4b103387ada4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_4b103387ada4', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
  291.1822640924783),
 ({Param(parent='RandomForestRegressor_4b103387ada4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestRegressor_4b103387ada4', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
  286.7714750274078),
 ({Param(parent='RandomForestRegressor_4b103387ada4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4,
   Param(parent='RandomForestRegressor_4b103387ada4', name

In [34]:
cvModel = cv.setParallelism(4).fit(trainDF)
cvModel

24/08/20 09:45:27 WARN BlockManager: Block rdd_2253_0 already exists on this machine; not re-adding it
24/08/20 09:45:29 WARN DAGScheduler: Broadcasting large task binary with size 1324.3 KiB
24/08/20 09:45:32 WARN DAGScheduler: Broadcasting large task binary with size 1157.9 KiB
24/08/20 09:45:34 WARN DAGScheduler: Broadcasting large task binary with size 1198.7 KiB
24/08/20 09:45:37 WARN DAGScheduler: Broadcasting large task binary with size 1224.9 KiB


CrossValidatorModel_a35e1cac217c

In [36]:
cv = CrossValidator(estimator=rf, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=3, parallelism=4, seed=42)

pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])
pipelineModel = pipeline.fit(trainDF)

24/08/20 09:46:25 WARN DAGScheduler: Broadcasting large task binary with size 1323.7 KiB
24/08/20 09:46:28 WARN DAGScheduler: Broadcasting large task binary with size 1160.5 KiB
24/08/20 09:46:30 WARN DAGScheduler: Broadcasting large task binary with size 1198.2 KiB
24/08/20 09:46:32 WARN DAGScheduler: Broadcasting large task binary with size 1224.9 KiB
