In [117]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [118]:
sc = SparkSession.builder.appName("sql_ex1").getOrCreate()
df = sc.read.csv("file:///home/hduser/programs/dataset/airbnb.csv", header=True)
df.show(3)

+----+--------------------+-------+---------+-------------------+-------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|  id|                name|host_id|host_name|neighbourhood_group|neighbourhood|latitude|longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|
+----+--------------------+-------+---------+-------------------+-------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|2539|Clean & quiet apt...|   2787|     John|           Brooklyn|   Kensington|40.64749|-73.97237|   Private room|  149|             1|                9| 2018-10-19|             0.21|                             6|             365|
|2595|Skylit Midtown Ca...|   2845| Jennifer|          Manhattan|      M

In [119]:
df.dtypes

[('id', 'string'),
 ('name', 'string'),
 ('host_id', 'string'),
 ('host_name', 'string'),
 ('neighbourhood_group', 'string'),
 ('neighbourhood', 'string'),
 ('latitude', 'string'),
 ('longitude', 'string'),
 ('room_type', 'string'),
 ('price', 'string'),
 ('minimum_nights', 'string'),
 ('number_of_reviews', 'string'),
 ('last_review', 'string'),
 ('reviews_per_month', 'string'),
 ('calculated_host_listings_count', 'string'),
 ('availability_365', 'string')]

## Selecting only relevant columns

In [120]:
df2 = df.select('neighbourhood_group', 'neighbourhood', 'room_type', 'price', \
               'minimum_nights','number_of_reviews', 'reviews_per_month', \
               'calculated_host_listings_count', 'availability_365', 'latitude', 'longitude')

## Typecasting all strings to integers and floats

In [121]:
from pyspark.sql.types import FloatType, IntegerType
df2 = df2.withColumn("price", df2["price"].cast(IntegerType()))
df2 = df2.withColumn("minimum_nights", df2["minimum_nights"].cast(IntegerType()))
df2 = df2.withColumn("number_of_reviews", df2["number_of_reviews"].cast(IntegerType()))
df2 = df2.withColumn("reviews_per_month", df2["reviews_per_month"].cast(FloatType()))
df2 = df2.withColumn("calculated_host_listings_count", df2["calculated_host_listings_count"].cast(IntegerType()))
df2 = df2.withColumn("availability_365", df2["availability_365"].cast(IntegerType()))
df2 = df2.withColumn("latitude", df2["latitude"].cast(FloatType()))
df2 = df2.withColumn("longitude", df2["longitude"].cast(FloatType()))

## Replacing null values by 0 for reviews_per_month

In [122]:
df2.na.fill(value=0,subset=["reviews_per_month"]).show(3)

+-------------------+-------------+---------------+-----+--------------+-----------------+-----------------+------------------------------+----------------+--------+---------+
|neighbourhood_group|neighbourhood|      room_type|price|minimum_nights|number_of_reviews|reviews_per_month|calculated_host_listings_count|availability_365|latitude|longitude|
+-------------------+-------------+---------------+-----+--------------+-----------------+-----------------+------------------------------+----------------+--------+---------+
|           Brooklyn|   Kensington|   Private room|  149|             1|                9|             0.21|                             6|             365|40.64749|-73.97237|
|          Manhattan|      Midtown|Entire home/apt|  225|             1|               45|             0.38|                             2|             355|40.75362|-73.98377|
|          Manhattan|       Harlem|   Private room|  150|             3|                0|              0.0|            

In [123]:
df2.dtypes

[('neighbourhood_group', 'string'),
 ('neighbourhood', 'string'),
 ('room_type', 'string'),
 ('price', 'int'),
 ('minimum_nights', 'int'),
 ('number_of_reviews', 'int'),
 ('reviews_per_month', 'float'),
 ('calculated_host_listings_count', 'int'),
 ('availability_365', 'int'),
 ('latitude', 'float'),
 ('longitude', 'float')]

In [124]:
df2 = df2.na.drop(how="any")

## Removing Outliers i.e. price > 250

In [125]:
df2 = df2.filter(df2.price<=250)

## Encoding neighbourhood and roomtype column

In [126]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
ng_indexer = StringIndexer(inputCol="neighbourhood_group", outputCol="ng1")
n_indexer = StringIndexer(inputCol="neighbourhood", outputCol="n1")
rt_indexer = StringIndexer(inputCol="room_type", outputCol="rt1")
ng_onehot = OneHotEncoder(inputCol="ng1", outputCol="ngv")
n_onehot = OneHotEncoder(inputCol="n1", outputCol="nv")
rt_onehot = OneHotEncoder(inputCol="rt1", outputCol="rtv")

pipeline = Pipeline(stages=[ng_indexer, n_indexer, rt_indexer, \
                            ng_onehot, n_onehot, rt_onehot])
df3 = pipeline.fit(df2).transform(df2)
df3.show(5)

+-------------------+-------------+---------------+-----+--------------+-----------------+-----------------+------------------------------+----------------+--------+---------+---+----+---+-------------+----------------+-------------+
|neighbourhood_group|neighbourhood|      room_type|price|minimum_nights|number_of_reviews|reviews_per_month|calculated_host_listings_count|availability_365|latitude|longitude|ng1|  n1|rt1|          ngv|              nv|          rtv|
+-------------------+-------------+---------------+-----+--------------+-----------------+-----------------+------------------------------+----------------+--------+---------+---+----+---+-------------+----------------+-------------+
|           Brooklyn|   Kensington|   Private room|  149|             1|                9|             0.21|                             6|             365|40.64749|-73.97237|0.0|49.0|0.0|(4,[0],[1.0])|(217,[49],[1.0])|(2,[0],[1.0])|
|          Manhattan|      Midtown|Entire home/apt|  225|       

In [127]:
df3.show(3)

+-------------------+-------------+---------------+-----+--------------+-----------------+-----------------+------------------------------+----------------+--------+---------+---+----+---+-------------+----------------+-------------+
|neighbourhood_group|neighbourhood|      room_type|price|minimum_nights|number_of_reviews|reviews_per_month|calculated_host_listings_count|availability_365|latitude|longitude|ng1|  n1|rt1|          ngv|              nv|          rtv|
+-------------------+-------------+---------------+-----+--------------+-----------------+-----------------+------------------------------+----------------+--------+---------+---+----+---+-------------+----------------+-------------+
|           Brooklyn|   Kensington|   Private room|  149|             1|                9|             0.21|                             6|             365|40.64749|-73.97237|0.0|49.0|0.0|(4,[0],[1.0])|(217,[49],[1.0])|(2,[0],[1.0])|
|          Manhattan|      Midtown|Entire home/apt|  225|       

In [128]:
df3.dtypes

[('neighbourhood_group', 'string'),
 ('neighbourhood', 'string'),
 ('room_type', 'string'),
 ('price', 'int'),
 ('minimum_nights', 'int'),
 ('number_of_reviews', 'int'),
 ('reviews_per_month', 'float'),
 ('calculated_host_listings_count', 'int'),
 ('availability_365', 'int'),
 ('latitude', 'float'),
 ('longitude', 'float'),
 ('ng1', 'double'),
 ('n1', 'double'),
 ('rt1', 'double'),
 ('ngv', 'vector'),
 ('nv', 'vector'),
 ('rtv', 'vector')]

## Vectorizing features

In [129]:
va = VectorAssembler(inputCols = \
                     ['minimum_nights', 'number_of_reviews', 'reviews_per_month', \
                      'calculated_host_listings_count', 'availability_365', \
                      'ngv','rtv', 'nv','latitude', 'longitude'], outputCol = 'features')
df4 = va.transform(df3)
df4 = df4.select(['features', 'price'])
df4.show(5)

+--------------------+-----+
|            features|price|
+--------------------+-----+
|(230,[0,1,2,3,4,5...|  149|
|(230,[0,1,2,3,4,6...|  225|
|(230,[0,1,2,3,4,5...|   89|
|(230,[0,1,2,3,6,1...|   80|
|(230,[0,1,2,3,4,6...|  200|
+--------------------+-----+
only showing top 5 rows



In [130]:
splits = df4.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

## Linear Regression

In [131]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='price')
lr_model = lr.fit(train_df)
#print("Coefficients: " + str(lr_model.coefficients))
#print("Intercept: " + str(lr_model.intercept))

In [132]:
trainingSummary = lr_model.summary
print("Linear Regression RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("Linear Regression R2: %f" % trainingSummary.r2)

Linear Regression RMSE: 36.971942
Linear Regression R2: 0.566009


In [133]:
# TEST Results
fullPredictions = lr_model.transform(test_df).cache()
print(fullPredictions)
fullPredictions.select("prediction","price","features").show(5)

DataFrame[features: vector, price: int, prediction: double]
+-----------------+-----+--------------------+
|       prediction|price|            features|
+-----------------+-----+--------------------+
|  81.606265873761|   80|(230,[0,1,2,3,4,5...|
|78.45477234752616|  100|(230,[0,1,2,3,4,5...|
|74.62740454451341|  110|(230,[0,1,2,3,4,5...|
|66.59965735463993|   99|(230,[0,1,2,3,4,5...|
|69.66324051949414|   65|(230,[0,1,2,3,4,5...|
+-----------------+-----+--------------------+
only showing top 5 rows



In [134]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(fullPredictions))
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="rmse")
print("RMSE (R2) on test data = %g" % lr_evaluator.evaluate(fullPredictions))

R Squared (R2) on test data = 0.552981
RMSE (R2) on test data = 37.8103


# Decision Tree Regressor

In [135]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol = 'features', labelCol='price')
dt_model = dt.fit(train_df)
dtPrediction = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % dt_evaluator.evaluate(dtPrediction))
dt_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="rmse")
print("RMSE on test data = %g" % dt_evaluator.evaluate(dtPrediction))

R Squared (R2) on test data = 0.540875
RMSE on test data = 38.3189


In [136]:
dtPrediction.show(5)

+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(230,[0,1,2,3,4,5...|   80| 64.44272445820434|
|(230,[0,1,2,3,4,5...|  100| 64.44272445820434|
|(230,[0,1,2,3,4,5...|  110|60.027386333421966|
|(230,[0,1,2,3,4,5...|   99|60.027386333421966|
|(230,[0,1,2,3,4,5...|   65| 64.44272445820434|
+--------------------+-----+------------------+
only showing top 5 rows



# Gradiant Boosted Tree Regression

In [137]:
from pyspark.ml.regression import GBTRegressor
gb = GBTRegressor(featuresCol = 'features', labelCol='price')
gb_model = gb.fit(train_df)
gbPredictions = gb_model.transform(test_df)
gb_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % gb_evaluator.evaluate(gbPredictions))
gb_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="rmse")
print("RMSE on test data = %g" % gb_evaluator.evaluate(gbPredictions))

R Squared (R2) on test data = 0.574128
RMSE on test data = 36.9051


In [138]:
gbPredictions.show(5)

+--------------------+-----+-----------------+
|            features|price|       prediction|
+--------------------+-----+-----------------+
|(230,[0,1,2,3,4,5...|   80|75.99084275229757|
|(230,[0,1,2,3,4,5...|  100|78.06530814702504|
|(230,[0,1,2,3,4,5...|  110|72.90774273443479|
|(230,[0,1,2,3,4,5...|   99| 70.9711636667254|
|(230,[0,1,2,3,4,5...|   65|83.20356079754676|
+--------------------+-----+-----------------+
only showing top 5 rows



# Random Forest Regressor

In [139]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol = 'features', labelCol='price')
rf_model = rf.fit(train_df)
rfPredictions = rf_model.transform(test_df)
rf_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % rf_evaluator.evaluate(rfPredictions))
rf_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="rmse")
print("RMSE on test data = %g" % rf_evaluator.evaluate(rfPredictions))

R Squared (R2) on test data = 0.543257
RMSE on test data = 38.2193


In [140]:
rfPredictions.show(5)

+--------------------+-----+-----------------+
|            features|price|       prediction|
+--------------------+-----+-----------------+
|(230,[0,1,2,3,4,5...|   80|69.63637301508058|
|(230,[0,1,2,3,4,5...|  100|68.82653986271585|
|(230,[0,1,2,3,4,5...|  110|63.65623787225847|
|(230,[0,1,2,3,4,5...|   99|64.33962521455275|
|(230,[0,1,2,3,4,5...|   65|68.23344498150145|
+--------------------+-----+-----------------+
only showing top 5 rows

