In [144]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
sc = SparkContext.getOrCreate();
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('airbnb_nyc_cleaned.csv')
df=df.drop('latitude','longitude','host_id')
df.take(1)

[Row(neighbourhood_group='Brooklyn', neighbourhood='Kensington', room_type='Private room', price=149, minimum_nights=1, number_of_reviews=9, reviews_per_month=0.21, calculated_host_listings_count=6, availability_365=365)]

In [145]:
import pyspark.sql.functions as F
df.select(   [ F.count( F.when( F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]  ).show()

+-------------------+-------------+---------+-----+--------------+-----------------+-----------------+------------------------------+----------------+
|neighbourhood_group|neighbourhood|room_type|price|minimum_nights|number_of_reviews|reviews_per_month|calculated_host_listings_count|availability_365|
+-------------------+-------------+---------+-----+--------------+-----------------+-----------------+------------------------------+----------------+
|                  0|            0|        0|    0|             0|                0|                0|                             0|               0|
+-------------------+-------------+---------+-----+--------------+-----------------+-----------------+------------------------------+----------------+



In [146]:
df.printSchema()

root
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)



## preprocessing

In [147]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoder

categoricalColumns = ['neighbourhood_group', 'neighbourhood', 'room_type']
stages = []

for categoricalCol in categoricalColumns:
 
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
    
numericCols =['price','minimum_nights', 'number_of_reviews', 'reviews_per_month', 'calculated_host_listings_count','availability_365']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


In [148]:
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(df)
prep_DF = pipelineModel.transform(df)
prep_DF.printSchema()

root
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- neighbourhood_groupIndex: double (nullable = false)
 |-- neighbourhood_groupclassVec: vector (nullable = true)
 |-- neighbourhoodIndex: double (nullable = false)
 |-- neighbourhoodclassVec: vector (nullable = true)
 |-- room_typeIndex: double (nullable = false)
 |-- room_typeclassVec: vector (nullable = true)
 |-- features: vector (nullable = true)



In [149]:
prep_DF.select('features').take(1)


[Row(features=SparseVector(232, {1: 1.0, 56: 1.0, 225: 1.0, 226: 149.0, 227: 1.0, 228: 9.0, 229: 0.21, 230: 6.0, 231: 365.0}))]

In [150]:
prep_DF = prep_DF.select(['features', 'price'])
prep_DF.show(3)

+--------------------+-----+
|            features|price|
+--------------------+-----+
|(232,[1,56,225,22...|  149|
|(232,[0,13,224,22...|  225|
|(232,[0,6,225,226...|  150|
+--------------------+-----+
only showing top 3 rows



In [151]:
splits = prep_DF.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
print(train_df.count())
print(test_df.count())

34174
14667


In [152]:
train_df.show()
train_df.describe().show()

+--------------------+-----+
|            features|price|
+--------------------+-----+
|(232,[0,6,224,226...|   65|
|(232,[0,6,224,226...|   68|
|(232,[0,6,224,226...|   69|
|(232,[0,6,224,226...|   70|
|(232,[0,6,224,226...|   70|
|(232,[0,6,224,226...|   70|
|(232,[0,6,224,226...|   70|
|(232,[0,6,224,226...|   72|
|(232,[0,6,224,226...|   74|
|(232,[0,6,224,226...|   75|
|(232,[0,6,224,226...|   75|
|(232,[0,6,224,226...|   75|
|(232,[0,6,224,226...|   76|
|(232,[0,6,224,226...|   76|
|(232,[0,6,224,226...|   77|
|(232,[0,6,224,226...|   77|
|(232,[0,6,224,226...|   77|
|(232,[0,6,224,226...|   78|
|(232,[0,6,224,226...|   78|
|(232,[0,6,224,226...|   79|
+--------------------+-----+
only showing top 20 rows

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|             34174|
|   mean| 152.0949844911336|
| stddev|231.87775417813512|
|    min|                10|
|    max|             10000|
+-------+------------------+



In [153]:
test_df.show()

+--------------------+-----+
|            features|price|
+--------------------+-----+
|(232,[0,6,224,226...|   50|
|(232,[0,6,224,226...|   55|
|(232,[0,6,224,226...|   65|
|(232,[0,6,224,226...|   67|
|(232,[0,6,224,226...|   68|
|(232,[0,6,224,226...|   70|
|(232,[0,6,224,226...|   75|
|(232,[0,6,224,226...|   75|
|(232,[0,6,224,226...|   75|
|(232,[0,6,224,226...|   76|
|(232,[0,6,224,226...|   77|
|(232,[0,6,224,226...|   78|
|(232,[0,6,224,226...|   80|
|(232,[0,6,224,226...|   80|
|(232,[0,6,224,226...|   80|
|(232,[0,6,224,226...|   80|
|(232,[0,6,224,226...|   89|
|(232,[0,6,224,226...|   93|
|(232,[0,6,224,226...|   95|
|(232,[0,6,224,226...|   95|
+--------------------+-----+
only showing top 20 rows



### Random forest regressor

In [92]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Creation d'un RandomForest .
rf = RandomForestRegressor(labelCol="price", featuresCol="features", maxDepth=20, maxBins=100)

# Training
rfModel = rf.fit(train_df)

predictions = rfModel.transform(test_df)
predictions.select("prediction","price","features").show(5)

# Evaluate model

rf_evaluator = RegressionEvaluator(predictionCol="prediction",labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % rf_evaluator.evaluate(predictions))

rf_evaluator_1 = RegressionEvaluator(predictionCol="prediction",labelCol="price",metricName="rmse")
print("Root Mean Squared Error (RMSE) on test data = %g" % rf_evaluator_1.evaluate(predictions))


+-----------------+-----+--------------------+
|       prediction|price|            features|
+-----------------+-----+--------------------+
|82.87113951139625|   70|(231,[0,6,224,226...|
|85.14834322082487|   70|(231,[0,6,224,226...|
|79.60059355402596|   70|(231,[0,6,224,226...|
|85.26482964210967|   70|(231,[0,6,224,226...|
|86.61939283025036|   75|(231,[0,6,224,226...|
+-----------------+-----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.641317
Root Mean Squared Error (RMSE) on test data = 111.766


### Decision tree regression:


In [162]:
from pyspark.ml.regression import DecisionTreeRegressor


# Creation d'un RandomForest .
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'price')

# Training
dt_model = dt.fit(train_df)

dt_predictions = dt_model.transform(test_df)
dt_predictions.select("prediction","price","features").show(5)

# Evaluate model
dt_evaluatorR2 = RegressionEvaluator(predictionCol="prediction",labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % dt_evaluatorR2.evaluate(predictions))

dt_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


+------------------+-----+--------------------+
|        prediction|price|            features|
+------------------+-----+--------------------+
|44.174886260236576|   50|(232,[0,6,224,226...|
|44.174886260236576|   55|(232,[0,6,224,226...|
| 67.00574393181397|   65|(232,[0,6,224,226...|
| 67.00574393181397|   67|(232,[0,6,224,226...|
| 67.00574393181397|   68|(232,[0,6,224,226...|
+------------------+-----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.621351
Root Mean Squared Error (RMSE) on test data = 177.178


### Linear regression 

In [154]:

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression

# Creation d'un  Regression lineaire
lr = LinearRegression(featuresCol = 'features', labelCol='price', maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Training
lr_Model = lr.fit(train_df)

lr_predictions = lr_Model.transform(test_df)
lr_predictions.select("prediction","price","features").show(5)

# Evaluation

lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

test_result = lr_Model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)


+-----------------+-----+--------------------+
|       prediction|price|            features|
+-----------------+-----+--------------------+
|50.13037626402358|   50|(232,[0,6,224,226...|
|55.12399121657618|   55|(232,[0,6,224,226...|
|65.11122112168137|   65|(232,[0,6,224,226...|
| 67.1086671027024|   67|(232,[0,6,224,226...|
|68.10739009321293|   68|(232,[0,6,224,226...|
+-----------------+-----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.999998
Root Mean Squared Error (RMSE) on test data = 0.328886
