In [160]:
from pyspark.sql import SparkSession


In [161]:
# Get the active SparkSession, or create one if none exists yet.
spark = SparkSession.builder.getOrCreate()

In [162]:
# Keep a handle to the SparkContext that backs the session.
sc = spark.sparkContext

In [163]:
# Load the Airbnb dataset from a Parquet directory (path is relative to the working directory).
df = spark.read.parquet('sf-airbnb-clean.parquet/')

In [164]:
# Print each column's name, type and nullability as a tree.
df.printSchema()

root
 |-- host_is_superhost: string (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- instant_bookable: string (nullable = true)
 |-- host_total_listings_count: double (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- minimum_nights: double (nullable = true)
 |-- number_of_reviews: double (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- review_scores_accuracy: double (nullable = true)
 |-- review_scores_cleanliness: double (nullable = true)
 |-- review_scores_checkin: double (nullable = true)
 |-- review_scores_communication: double (nullable = true

In [165]:
# Preview the two predictor columns next to the label ('price').
df.select('bathrooms', 'bedrooms', 'price').show()

+---------+--------+-----+
|bathrooms|bedrooms|price|
+---------+--------+-----+
|      1.0|     1.0|170.0|
|      1.0|     2.0|235.0|
|      4.0|     1.0| 65.0|
|      4.0|     1.0| 65.0|
|      1.5|     2.0|785.0|
|      1.0|     2.0|255.0|
|      1.0|     1.0|139.0|
|      1.0|     1.0|135.0|
|      1.0|     2.0|265.0|
|      1.0|     3.0|177.0|
|      2.0|     3.0|194.0|
|      1.5|     1.0|139.0|
|      4.0|     1.0| 85.0|
|      3.0|     1.0| 85.0|
|      1.0|     1.0| 79.0|
|      1.0|     2.0|136.0|
|      1.0|     1.0|215.0|
|      2.0|     2.0|450.0|
|      1.0|     0.0|107.0|
|      1.0|     1.0|110.0|
+---------+--------+-----+
only showing top 20 rows


In [166]:
# Seeded 80/20 split into a training set and a held-out test set.
trainDF, testDF = df.randomSplit([0.8, 0.2], seed=42)

In [167]:
# Display notebook cell output with a horizontal scroll bar instead of wrapping long lines.
from IPython.display import HTML, display

display(HTML("<style>pre {white-space: pre !important; }</style>"))

In [168]:
# Show the first rows of the training split without truncating cell values.
trainDF.show(truncate=False)

+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+-------------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----+-----------+------------+-------+-----------------------+-------------------------+----------------------------+------------------------+------------------------------+-------------------------+----------------------+
|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude|longitude |property_type|room_type      |accommodates|bathrooms|bedrooms|beds|bed_type     |minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_l

In [169]:
from pyspark.ml.feature import VectorAssembler

In [170]:
# Combine the two numeric predictors into a single vector column named 'features'.
vecAssembler = VectorAssembler(
    inputCols=['bathrooms', 'bedrooms'],
    outputCol='features',
)

In [171]:
# Add the assembled 'features' column to the training split.
vectrainDF = vecAssembler.transform(trainDF)

In [172]:
# Print the schema of the assembled training DataFrame.
vectrainDF.printSchema()

root
 |-- host_is_superhost: string (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- instant_bookable: string (nullable = true)
 |-- host_total_listings_count: double (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- minimum_nights: double (nullable = true)
 |-- number_of_reviews: double (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- review_scores_accuracy: double (nullable = true)
 |-- review_scores_cleanliness: double (nullable = true)
 |-- review_scores_checkin: double (nullable = true)
 |-- review_scores_communication: double (nullable = true

In [173]:
# Check that each 'features' vector is [bathrooms, bedrooms], shown beside the label.
vectrainDF.select('bathrooms', 'bedrooms', 'features', 'price').show()

+---------+--------+---------+-----+
|bathrooms|bedrooms| features|price|
+---------+--------+---------+-----+
|      1.0|     1.0|[1.0,1.0]|200.0|
|      1.0|     1.0|[1.0,1.0]|130.0|
|      1.0|     1.0|[1.0,1.0]| 95.0|
|      1.0|     1.0|[1.0,1.0]|250.0|
|      3.0|     3.0|[3.0,3.0]|250.0|
|      1.0|     1.0|[1.0,1.0]|115.0|
|      1.5|     1.0|[1.5,1.0]|105.0|
|      1.0|     1.0|[1.0,1.0]| 86.0|
|      1.0|     1.0|[1.0,1.0]|100.0|
|      1.0|     2.0|[1.0,2.0]|220.0|
|      1.0|     1.0|[1.0,1.0]|110.0|
|      1.0|     1.0|[1.0,1.0]|130.0|
|      1.0|     1.0|[1.0,1.0]|250.0|
|      1.0|     1.0|[1.0,1.0]|100.0|
|      2.0|     3.0|[2.0,3.0]|350.0|
|      2.0|     2.0|[2.0,2.0]|200.0|
|      2.0|     1.0|[2.0,1.0]|250.0|
|      1.0|     2.0|[1.0,2.0]|299.0|
|      1.5|     1.0|[1.5,1.0]| 95.0|
|      2.5|     3.0|[2.5,3.0]|500.0|
+---------+--------+---------+-----+
only showing top 20 rows


                                                                                

In [174]:
from pyspark.ml.regression import LinearRegression

In [175]:
# Linear regression estimator: predicts 'price' from 'features' and writes 'prediction'.
lr = LinearRegression(
    featuresCol='features',
    labelCol='price',
    predictionCol='prediction',
)

In [176]:
# Fit the regression on the assembled training data.
# Spark logs a warning here because regParam is zero (see the output below).
lrModel = lr.fit(vectrainDF)

25/10/22 01:39:33 WARN Instrumentation: [b9becafb] regParam is zero, which might cause numerical instability and overfitting.


In [177]:
# Score the training rows themselves, i.e. in-sample predictions.
predDF = lrModel.transform(vectrainDF)

In [178]:
# Compare actual price with the in-sample prediction.
predDF.select('price', 'prediction').show()

+-----+------------------+
|price|        prediction|
+-----+------------------+
|200.0|167.92639278679684|
|130.0|167.92639278679684|
| 95.0|167.92639278679684|
|250.0|167.92639278679684|
|250.0| 435.4958619420832|
|115.0|167.92639278679684|
|105.0|175.66710604765103|
| 86.0|167.92639278679684|
|100.0|167.92639278679684|
|220.0|286.22970084273163|
|110.0|167.92639278679684|
|130.0|167.92639278679684|
|250.0|167.92639278679684|
|100.0|167.92639278679684|
|350.0| 420.0144354203748|
|200.0|   301.71112736444|
|250.0|183.40781930850522|
|299.0|286.22970084273163|
| 95.0|175.66710604765103|
|500.0|  427.755148681229|
+-----+------------------+
only showing top 20 rows


In [179]:
# Prediction for test data: assemble the same feature vector on the held-out split.
vectestDF = vecAssembler.transform(testDF)

In [180]:
# Apply the fitted regression to the assembled test split.
predDFTest = lrModel.transform(vectestDF)

In [181]:
# Print the schema of the scored test DataFrame.
predDFTest.printSchema()

root
 |-- host_is_superhost: string (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- instant_bookable: string (nullable = true)
 |-- host_total_listings_count: double (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- minimum_nights: double (nullable = true)
 |-- number_of_reviews: double (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- review_scores_accuracy: double (nullable = true)
 |-- review_scores_cleanliness: double (nullable = true)
 |-- review_scores_checkin: double (nullable = true)
 |-- review_scores_communication: double (nullable = true

In [182]:
# Compare actual price with the out-of-sample prediction.
predDFTest.select('price', 'prediction').show()

+------+------------------+
| price|        prediction|
+------+------------------+
|  85.0|167.92639278679684|
|  45.0|167.92639278679684|
|  70.0|167.92639278679684|
| 128.0|167.92639278679684|
| 159.0|167.92639278679684|
| 250.0|286.22970084273163|
|  99.0|167.92639278679684|
|  95.0|167.92639278679684|
| 100.0|167.92639278679684|
|2010.0|167.92639278679684|
| 270.0|167.92639278679684|
| 500.0| 317.1925538861484|
| 125.0| 49.62308473086201|
| 210.0| 404.5330088986664|
|  60.0|175.66710604765103|
| 170.0| 420.0144354203748|
| 214.0|167.92639278679684|
| 120.0|167.92639278679684|
|  82.0|183.40781930850522|
| 169.0| 404.5330088986664|
+------+------------------+
only showing top 20 rows


In [183]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


In [184]:
# The same two-feature regression, rebuilt end to end in a single cell.
VecAssenmb = VectorAssembler(
    inputCols=['bathrooms', 'bedrooms'],
    outputCol='features',
)
lr = LinearRegression(featuresCol='features', labelCol='price')

# Assemble the training features, then fit the model on them.
trainvecdf = VecAssenmb.transform(trainDF)
MlModel = lr.fit(trainvecdf)

25/10/22 01:39:37 WARN Instrumentation: [6901ee9e] regParam is zero, which might cause numerical instability and overfitting.


In [185]:
# Assemble the feature vector for the held-out split with the same assembler.
testvecdf = VecAssenmb.transform(testDF)

In [186]:
# Score the assembled test split with the fitted model.
pred = MlModel.transform(testvecdf)

In [187]:
# Show inputs, assembled vector, label and prediction side by side.
# The vector order is [bathrooms, bedrooms], which differs from the column order selected here.
pred.select(
    'bedrooms',
    'bathrooms',
    'features',
    'price',
    'prediction',
).show()

+--------+---------+---------+------+------------------+
|bedrooms|bathrooms| features| price|        prediction|
+--------+---------+---------+------+------------------+
|     1.0|      1.0|[1.0,1.0]|  85.0|167.92639278679684|
|     1.0|      1.0|[1.0,1.0]|  45.0|167.92639278679684|
|     1.0|      1.0|[1.0,1.0]|  70.0|167.92639278679684|
|     1.0|      1.0|[1.0,1.0]| 128.0|167.92639278679684|
|     1.0|      1.0|[1.0,1.0]| 159.0|167.92639278679684|
|     2.0|      1.0|[1.0,2.0]| 250.0|286.22970084273163|
|     1.0|      1.0|[1.0,1.0]|  99.0|167.92639278679684|
|     1.0|      1.0|[1.0,1.0]|  95.0|167.92639278679684|
|     1.0|      1.0|[1.0,1.0]| 100.0|167.92639278679684|
|     1.0|      1.0|[1.0,1.0]|2010.0|167.92639278679684|
|     1.0|      1.0|[1.0,1.0]| 270.0|167.92639278679684|
|     2.0|      3.0|[3.0,2.0]| 500.0| 317.1925538861484|
|     0.0|      1.0|[1.0,0.0]| 125.0| 49.62308473086201|
|     3.0|      1.0|[1.0,3.0]| 210.0| 404.5330088986664|
|     1.0|      1.5|[1.5,1.0]| 

In [188]:
from pyspark.ml import Pipeline

# Unfitted stages for the pipeline: feature assembly followed by linear regression.
VecAssenmb = VectorAssembler(
    inputCols=['bathrooms', 'bedrooms'],
    outputCol='features',
)
lr = LinearRegression(featuresCol='features', labelCol='price')

In [189]:
# Chain the assembler and the regression into one estimator.
pl = Pipeline(stages=[VecAssenmb, lr])

In [190]:
# Fit the whole pipeline directly on the un-assembled training split.
plModel = pl.fit(trainDF)

25/10/22 01:39:38 WARN Instrumentation: [73a2ca84] regParam is zero, which might cause numerical instability and overfitting.


In [191]:
# Run the fitted pipeline on the held-out split.
pred = plModel.transform(testDF)

In [192]:
# Show the first rows of the pipeline output.
pred.show()

+-----------------+-------------------+----------------+-------------------------+----------------------+--------+----------+-------------+---------------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+------+-----------+------------+-------+-----------------------+-------------------------+----------------------------+------------------------+------------------------------+-------------------------+----------------------+---------+------------------+
|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude| longitude|property_type|      room_type|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communica

In [193]:
 #----------------------- ML for House Pricing Prediction------------------------
#--------------------------------------------------------------------------------

In [194]:
# List of (column name, type string) pairs for the training split.
trainCol = trainDF.dtypes
trainCol

[('host_is_superhost', 'string'),
 ('cancellation_policy', 'string'),
 ('instant_bookable', 'string'),
 ('host_total_listings_count', 'double'),
 ('neighbourhood_cleansed', 'string'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('property_type', 'string'),
 ('room_type', 'string'),
 ('accommodates', 'double'),
 ('bathrooms', 'double'),
 ('bedrooms', 'double'),
 ('beds', 'double'),
 ('bed_type', 'string'),
 ('minimum_nights', 'double'),
 ('number_of_reviews', 'double'),
 ('review_scores_rating', 'double'),
 ('review_scores_accuracy', 'double'),
 ('review_scores_cleanliness', 'double'),
 ('review_scores_checkin', 'double'),
 ('review_scores_communication', 'double'),
 ('review_scores_location', 'double'),
 ('review_scores_value', 'double'),
 ('price', 'double'),
 ('bedrooms_na', 'double'),
 ('bathrooms_na', 'double'),
 ('beds_na', 'double'),
 ('review_scores_rating_na', 'double'),
 ('review_scores_accuracy_na', 'double'),
 ('review_scores_cleanliness_na', 'double'),
 ('review_sco

In [195]:
# Names of every string-typed column in the training split.
StringColumns = [name for name, dtype in trainCol if dtype == 'string']
StringColumns

['host_is_superhost',
 'cancellation_policy',
 'instant_bookable',
 'neighbourhood_cleansed',
 'property_type',
 'room_type',
 'bed_type']

In [196]:
# Numeric feature columns: every double-typed column except the label ('price').
# Logical `and` replaces bitwise `&`: these are plain Python booleans, so `and`
# is the idiomatic operator, short-circuits, and does not depend on the extra
# parentheses that `&`'s higher precedence would otherwise require.
NumericeColumns = [f for (f, v) in trainCol if v == 'double' and f != 'price']
NumericeColumns

['host_total_listings_count',
 'latitude',
 'longitude',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'minimum_nights',
 'number_of_reviews',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value',
 'bedrooms_na',
 'bathrooms_na',
 'beds_na',
 'review_scores_rating_na',
 'review_scores_accuracy_na',
 'review_scores_cleanliness_na',
 'review_scores_checkin_na',
 'review_scores_communication_na',
 'review_scores_location_na',
 'review_scores_value_na']

In [197]:
# Output column names for the string indexer: '<column>_Index'.
strIndxCols = [f'{column}_Index' for column in StringColumns]
strIndxCols

['host_is_superhost_Index',
 'cancellation_policy_Index',
 'instant_bookable_Index',
 'neighbourhood_cleansed_Index',
 'property_type_Index',
 'room_type_Index',
 'bed_type_Index']

In [198]:
# Output column names for the one-hot encoder: '<column>_OHE'.
oheOut = [f'{column}_OHE' for column in StringColumns]
oheOut

['host_is_superhost_OHE',
 'cancellation_policy_OHE',
 'instant_bookable_OHE',
 'neighbourhood_cleansed_OHE',
 'property_type_OHE',
 'room_type_OHE',
 'bed_type_OHE']

In [199]:
# Full list of assembler inputs: numeric columns first, then the one-hot encoded columns.
AllDataCol = [*NumericeColumns, *oheOut]
AllDataCol

['host_total_listings_count',
 'latitude',
 'longitude',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'minimum_nights',
 'number_of_reviews',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value',
 'bedrooms_na',
 'bathrooms_na',
 'beds_na',
 'review_scores_rating_na',
 'review_scores_accuracy_na',
 'review_scores_cleanliness_na',
 'review_scores_checkin_na',
 'review_scores_communication_na',
 'review_scores_location_na',
 'review_scores_value_na',
 'host_is_superhost_OHE',
 'cancellation_policy_OHE',
 'instant_bookable_OHE',
 'neighbourhood_cleansed_OHE',
 'property_type_OHE',
 'room_type_OHE',
 'bed_type_OHE']

In [200]:
from pyspark.ml.feature import OneHotEncoder , StringIndexer , VectorAssembler

In [201]:
# `Pipeline` (the class) is what this section instantiates; the previous
# lowercase `pipeline` import brought in the submodule instead, so this section
# only ran because an earlier cell had already imported the class.
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

In [202]:



# Index each string column into a numeric index column; handleInvalid='skip'
# filters out rows with invalid data.
strIndx = StringIndexer(
    inputCols=StringColumns,
    outputCols=strIndxCols,
    handleInvalid='skip',
)

# One-hot encode the indexed columns.
ohe = OneHotEncoder(inputCols=strIndxCols, outputCols=oheOut)

# Concatenate the numeric columns and one-hot vectors into a single 'features' vector.
vecAss = VectorAssembler(inputCols=AllDataCol, outputCol='features')

# Linear regression predicting 'price' from 'features'.
lr = LinearRegression(
    featuresCol='features',
    labelCol='price',
    predictionCol='prediction',
)

In [203]:
# Stage order matters: index -> one-hot encode -> assemble -> regress.
stagespl = [strIndx, ohe, vecAss, lr]

In [204]:
# Build the full preprocessing + regression pipeline from the stage list.
pl = Pipeline(stages=stagespl)

In [205]:
# Fit all stages on the training split.
# Spark warns below that regParam is zero and that the Cholesky solver failed
# on a singular covariance matrix, so it retried with the Quasi-Newton solver.
plModel = pl.fit(trainDF)

25/10/22 01:39:40 WARN Instrumentation: [000555d5] regParam is zero, which might cause numerical instability and overfitting.
25/10/22 01:39:41 WARN Instrumentation: [000555d5] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.


In [206]:
# Display the fitted PipelineModel's repr.
plModel

PipelineModel_8273e5111050

In [207]:
# Persist the fitted pipeline to 'plModelReg', overwriting anything already saved at that path.
plModel.write().overwrite().save('plModelReg')

In [208]:
from pyspark.ml  import PipelineModel

In [209]:
# Reload the persisted pipeline model from 'plModelReg'.
savedPlModel = PipelineModel.load('plModelReg')

In [210]:
# Score the held-out split with the reloaded pipeline (rebinds `pred`).
pred = savedPlModel.transform(testDF)

In [211]:
# Inspect a single scored row in full: assembled vector, label and prediction.
pred.select('features', 'price', 'prediction').show(1, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+-----------------+
|features                                                                                                                                                                           |price|prediction       |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+-----------------+
|(98,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,26,29,32,48,69,92,94],[1.0,37.72001,-122.39249,2.0,1.0,1.0,1.0,2.0,128.0,97.0,10.0,10.0,10.0,10.0,9.0,10.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|85.0 |55.41964567628929|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [212]:
from pyspark.ml.evaluation import RegressionEvaluator

In [213]:
# Evaluator for root-mean-squared error; 'rmse' is RegressionEvaluator's default
# metric and is spelled out here so the reported number is unambiguous.
regeval = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='price',
    metricName='rmse',
)

In [214]:
# Compute the evaluator's metric on the test predictions
# (the evaluator defined just above uses its default metric — RMSE in PySpark).
regeval.evaluate(pred)

220.67727485028183

In [215]:
# Rebind the evaluator to report R^2 instead.
regeval = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='price',
    metricName='r2',
)

In [216]:
# Compute the evaluator's metric on the test predictions
# (the evaluator defined just above is configured with metricName='r2').
regeval.evaluate(pred)

0.15956462738252108

In [217]:
#-----------------------------Handle Invalid -----------------------------
#-------------------------------------------------------------------------


In [218]:
# Small in-memory dataset: a string sequence number paired with a full name.
columns = ['seqno', 'Name']
data = [
    (str(seqno), full_name)
    for seqno, full_name in enumerate(
        [
            'John Jones',
            'Mary Johnson',
            'Peter Smith',
            'Linda Davis',
            'James Wilson',
            'Patricia Brown',
            'Michael Garcia',
            'Barbara Martinez',
            'William Rodriguez',
            'Elizabeth Hernandez',
        ],
        start=1,
    )
]

In [219]:
# Build a DataFrame from the in-memory rows.
# NOTE: this rebinds `df`, which earlier in the notebook held the Airbnb data.
df = spark.createDataFrame(data, schema=columns)

In [220]:
# Show the rows of the small demo DataFrame.
df.show()

+-----+-------------------+
|seqno|               Name|
+-----+-------------------+
|    1|         John Jones|
|    2|       Mary Johnson|
|    3|        Peter Smith|
|    4|        Linda Davis|
|    5|       James Wilson|
|    6|     Patricia Brown|
|    7|     Michael Garcia|
|    8|   Barbara Martinez|
|    9|  William Rodriguez|
|   10|Elizabeth Hernandez|
+-----+-------------------+



In [221]:
# '''
# handleInvalid options:
# skip  = filter out rows that contain invalid data
# error = throw an error
# keep  = assign a number to new categories (all new categories share the same number)
# '''