In [16]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
spark = (SparkSession
.builder
.appName("Spark_ML_APP")
.enableHiveSupport() # we need this to create tables
.getOrCreate())

22/07/15 18:45:05 WARN Utils: Your hostname, ivo-Nitro-AN515-57 resolves to a loopback address: 127.0.1.1; using 192.168.1.197 instead (on interface wlp0s20f3)
22/07/15 18:45:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/15 18:45:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/07/15 18:45:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
filePath = """../data/sf-airbnb-clean.parquet/"""
airbnbDF = spark.read.parquet(filePath)

airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",
"number_of_reviews", "price").show(5)

trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)
print(f"""\n\nThere are {trainDF.count()} rows in the training set,
and {testDF.count()} in the test set""")

+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows

22/07/15 18:45:08 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


There are 5780 rows in the training s

In [4]:
# we should cache the trainDF dataframe, since we'll be using it often
# this is important, when in the presence of a distributed system
# I'm just using my localhost as server for the data, instead...

# trainDF.cache()

A linear regression model predicting price given the number of bedrooms, "bathrooms", and "number_of_reviews"

In [5]:
vecAssembler = VectorAssembler(inputCols=["bedrooms","bathrooms", 
                                "number_of_reviews"], outputCol="features")
# to do ML modeling like a Linear regression, we need to put our data into
# a single vector
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms","bathrooms", "number_of_reviews", "features", "price").show(10)
#  column should be 'equal' to the features column

+--------+---------+-----------------+---------------+-----+
|bedrooms|bathrooms|number_of_reviews|       features|price|
+--------+---------+-----------------+---------------+-----+
|     1.0|      1.0|              1.0|  [1.0,1.0,1.0]|200.0|
|     1.0|      1.0|             13.0| [1.0,1.0,13.0]|130.0|
|     1.0|      1.0|             12.0| [1.0,1.0,12.0]| 95.0|
|     1.0|      1.0|              1.0|  [1.0,1.0,1.0]|250.0|
|     3.0|      3.0|              0.0|  [3.0,3.0,0.0]|250.0|
|     1.0|      1.0|            100.0|[1.0,1.0,100.0]|115.0|
|     1.0|      1.5|             36.0| [1.0,1.5,36.0]|105.0|
|     1.0|      1.0|            194.0|[1.0,1.0,194.0]| 86.0|
|     1.0|      1.0|              4.0|  [1.0,1.0,4.0]|100.0|
|     2.0|      1.0|              2.0|  [2.0,1.0,2.0]|220.0|
+--------+---------+-----------------+---------------+-----+
only showing top 10 rows



In [6]:
lr = LinearRegression(featuresCol="features", labelCol="price")
lrModel = lr.fit(vecTrainDF)

22/07/15 18:45:09 WARN Instrumentation: [7564bcb2] regParam is zero, which might cause numerical instability and overfitting.
22/07/15 18:45:09 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
22/07/15 18:45:09 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/07/15 18:45:09 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/07/15 18:45:09 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [7]:
m_0 = round(lrModel.coefficients[0], 2)
m_1 = round(lrModel.coefficients[1], 2)
m_2 = round(lrModel.coefficients[2], 2)
b = round(lrModel.intercept, 2)
print(f"""The formula for the linear regression line is
price = {m_0}*bedrooms + {m_1} * bathrooms + {m_2} * bathrooms + {b}""")

The formula for the linear regression line is
price = 116.65*bedrooms + 14.65 * bathrooms + -0.25 * bathrooms + 48.22


In [8]:
lrModel.coefficients

DenseVector([116.6457, 14.6487, -0.2452])

In [9]:
lrModel.summary.coefficientStandardErrors

[4.6583813605265805,
 5.377873367940104,
 0.055272275442847144,
 8.974083827855292]

Creating a pipeline in pyspark

In [10]:

pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)

22/07/15 18:45:10 WARN Instrumentation: [2437f49f] regParam is zero, which might cause numerical instability and overfitting.


In [11]:
predDF = pipelineModel.transform(testDF)
predDF.select("bedrooms","bathrooms", "number_of_reviews","bedrooms", "features", "price", "prediction").show(10)

+--------+---------+-----------------+--------+---------------+------+------------------+
|bedrooms|bathrooms|number_of_reviews|bedrooms|       features| price|        prediction|
+--------+---------+-----------------+--------+---------------+------+------------------+
|     1.0|      1.0|            128.0|     1.0|[1.0,1.0,128.0]|  85.0|148.12966631945827|
|     1.0|      1.0|              0.0|     1.0|  [1.0,1.0,0.0]|  45.0|179.51913339774154|
|     1.0|      1.0|              0.0|     1.0|  [1.0,1.0,0.0]|  70.0|179.51913339774154|
|     1.0|      1.0|              1.0|     1.0|  [1.0,1.0,1.0]| 128.0|179.27390318619246|
|     1.0|      1.0|              3.0|     1.0|  [1.0,1.0,3.0]| 159.0|178.78344276309429|
|     2.0|      1.0|             15.0|     2.0| [2.0,1.0,15.0]| 250.0| 292.4864106206108|
|     1.0|      1.0|              1.0|     1.0|  [1.0,1.0,1.0]|  99.0|179.27390318619246|
|     1.0|      1.0|              0.0|     1.0|  [1.0,1.0,0.0]|  95.0|179.51913339774154|
|     1.0|

Creating dummy variables

In [12]:
categoricalCols = [field for (field, dataType) in trainDF.dtypes
                    if dataType == "string"]
categoricalCols

['host_is_superhost',
 'cancellation_policy',
 'instant_bookable',
 'neighbourhood_cleansed',
 'property_type',
 'room_type',
 'bed_type']

In [13]:
indexOutputCols = [x + "Index" for x in categoricalCols]

oheOutputCols = [x + "OHE" for x in categoricalCols]

# A common approach is to use the StringIndexer and OneHotEncoder. With this approach, the first step is to
# apply the StringIndexer estimator to convert categorical values into category indices. These category indices 
# are ordered by label frequencies, so the most frequent label gets index 0, which provides us with reproducible 
# results across various runs of the same data. Once you have created your category indices, you can pass those
# as input to the OneHotEncoder.
stringIndexer = StringIndexer(inputCols=categoricalCols,
                                outputCols=indexOutputCols,
                                handleInvalid="keep")
# handleInvalid parameter specifies how to handle categories in test set, but not in the train set. The options
# are skip (filter out rows with invalid data), error (throw an error), or keep (put inva‐
# lid data in a special additional bucket, at index numLabels                               

oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                            outputCols=oheOutputCols)

numericCols = [field for (field, dataType) in trainDF.dtypes
                if ((dataType == "double") & (field != "price"))]

assemblerInputs = oheOutputCols + numericCols
assemblerInputs


['host_is_superhostOHE',
 'cancellation_policyOHE',
 'instant_bookableOHE',
 'neighbourhood_cleansedOHE',
 'property_typeOHE',
 'room_typeOHE',
 'bed_typeOHE',
 'host_total_listings_count',
 'latitude',
 'longitude',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'minimum_nights',
 'number_of_reviews',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value',
 'bedrooms_na',
 'bathrooms_na',
 'beds_na',
 'review_scores_rating_na',
 'review_scores_accuracy_na',
 'review_scores_cleanliness_na',
 'review_scores_checkin_na',
 'review_scores_communication_na',
 'review_scores_location_na',
 'review_scores_value_na']

In [14]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs,
                                outputCol="features")

In [15]:
lr = LinearRegression(labelCol="price", featuresCol="features")
pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(10)

22/07/15 18:46:45 WARN Instrumentation: [3bbd1622] regParam is zero, which might cause numerical instability and overfitting.
22/07/15 18:46:45 WARN Instrumentation: [3bbd1622] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
+--------------------+------+------------------+
|            features| price|        prediction|
+--------------------+------+------------------+
|(105,[0,4,8,25,47...|  85.0| 54.97283005654026|
|(105,[0,4,8,25,47...|  45.0| 23.34747155520563|
|(105,[0,4,8,25,47...|  70.0| 28.46158915392334|
|(105,[0,4,8,15,46...| 128.0|-91.65206355943246|
|(105,[0,4,8,15,47...| 159.0| 95.14003645319463|
|(105,[0,4,8,15,47...| 250.0|263.66204150177145|
|(105,[0,4,8,14,46...|  99.0| 152.5327799254228|
|(105,[0,4,8,34,46...|  95.0|180.84328858035042|
|(105,[0,4,8,31,46...| 100.0|-52.85953832967152|
|(105,[0,4,8,31,47...|2010.0| 261.2786481467865|
+--------------------+------+------------------+
only showing top 10 rows



In [17]:

regressionEvaluator = RegressionEvaluator(predictionCol="prediction",
                                            labelCol="price",
                                            metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")

RMSE is 220.5


In [19]:
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R^2 is {r2}")

R^2 is 0.16055708814862335


Saving our model

In [21]:
pipelinePath = "../data/lr-pipeline-model"
pipelineModel.write().overwrite().save(pipelinePath)

To load our model

In [None]:
from pyspark.ml import PipelineModel
savedPipelineModel = PipelineModel.load(pipelinePath)