In [1]:
df = spark.sql("select * from ab_nyc_2019_csv")

In [2]:
df.count()

In [3]:
df.describe()

In [4]:
df.printSchema()

In [5]:
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [6]:
df=df.dropna(subset=['neighbourhood_group','neighbourhood','latitude','longitude','room_type','price','minimum_nights','availability_365','calculated_host_listings_count'])

In [7]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [8]:
# we dont really need last review date , name and host_name for our analysis part since this doesnt affect our price in logical way in real world.
df=df.drop('last_review','name','host_name')

In [9]:
df.printSchema()

In [10]:
df.count()

In [11]:
# reviews per month will be zero if the number of reviews are zero ( we can clearly see here the number of reviews and reviews_per_month are of same count when reviews_per_month is zero)
df.select('number_of_reviews','reviews_per_month').filter("reviews_per_month is null").count()

In [12]:
# replacing the null values of reviews_per_month to zero.
df=df.fillna({ 'reviews_per_month':0 } )

In [13]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [14]:
df.count()

In [15]:
df.show()

In [16]:
# lets find the distinct unique valyes in neighbourhood_group
df.select('neighbourhood_group').distinct().show()

In [17]:
# lets find the distinct unique valyes in neighbourhood
df.select('neighbourhood').distinct().show()

In [18]:
# lets  find the distinct unique valyes in neighbourhood
df.select('room_type').distinct().show()

In [19]:
df.createOrReplaceTempView('ab_newyork_view')

In [20]:
spark.sql("select * from ab_newyork_view").show()

In [21]:
spark.sql("select distinct(room_type) from ab_newyork_view").show()

In [22]:
spark.sql("select count(id) from ab_newyork_view where room_type='-73.90783'").show()

In [23]:
spark.sql("select count(id) as number_of_bookings,neighbourhood_group from ab_newyork_view group by neighbourhood_group order by number_of_bookings desc").show()

In [24]:
spark.sql("select count(id) as number_of_bookings,neighbourhood from ab_newyork_view group by neighbourhood order by number_of_bookings desc").show()

In [25]:
spark.sql("select count(id) as number_of_bookings,room_type from ab_newyork_view group by room_type order by number_of_bookings desc").show()

In [26]:
spark.sql("select count(neighbourhood) as number_of_365_neighbourhoods,neighbourhood_group from ab_newyork_view where availability_365 = 365 group by neighbourhood_group\
          order by number_of_365_neighbourhoods desc").show()

In [27]:
spark.sql("select sum(number_of_reviews) as Reviews_Count,neighbourhood,neighbourhood_group,room_type,price from ab_newyork_view group by\
          neighbourhood,neighbourhood_group,room_type,price order by Reviews_Count desc limit 10").show()

### Linear Regression

In [29]:
# Import the required libraries

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline

In [30]:
# Use StringIndexer to convert the categorical columns to hold numerical data

neighbourhood_group_indexer = StringIndexer(inputCol='neighbourhood_group',outputCol='neighbourhood_group_index',handleInvalid='keep')
room_type_indexer = StringIndexer(inputCol='room_type',outputCol='room_type_index',handleInvalid='keep')

In [31]:
assembler = VectorAssembler(inputCols=['neighbourhood_group_index','latitude','longitude',
                                       'room_type_index','minimum_nights','number_of_reviews',
                                       'calculated_host_listings_count','availability_365'],
                            outputCol="features")

In [32]:
# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data

pipe = Pipeline(stages=[neighbourhood_group_indexer,room_type_indexer,assembler])

In [33]:
# Create a 70-30 train test split

train_data,test_data=df.randomSplit([0.7,0.3])

In [34]:
fit_pipe=pipe.fit(train_data)

In [35]:
train_data=fit_pipe.transform(train_data)

In [36]:
lr_model = LinearRegression(labelCol='price')

In [37]:
fit_model = lr_model.fit(train_data.select(['features','price']))

In [38]:
test_data=fit_pipe.transform(test_data)

In [39]:
lr_results = fit_model.transform(test_data)

In [40]:
lr_results.select(['price','prediction']).show()

In [41]:
test_results = fit_model.evaluate(test_data)

In [42]:
test_results.residuals.show()

In [43]:
test_results.rootMeanSquaredError

In [44]:
test_results.r2

In [45]:
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt

In [46]:
pd_df2 = lr_results.toPandas()
print(pd_df2)

In [47]:
fig, ax=plt.subplots(figsize=(10,6))
ax=sns.scatterplot(pd_df2['prediction'],pd_df2['price'])
display(plt.show())

### Decision Tree

In [49]:
# Import the required libraries

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [50]:
neighbourhood_group_indexer = StringIndexer(inputCol='neighbourhood_group',outputCol='neighbourhood_group_index',handleInvalid='keep')
room_type_indexer = StringIndexer(inputCol='room_type',outputCol='room_type_index',handleInvalid='keep')

In [51]:
assembler = VectorAssembler(inputCols=['neighbourhood_group_index','latitude','longitude',
                                       'room_type_index','minimum_nights','number_of_reviews',
                                       'calculated_host_listings_count','availability_365'],
                            outputCol="features")

In [52]:
train,test=df.randomSplit([0.7,0.3])

In [53]:
dt_model=DecisionTreeRegressor(featuresCol="features",labelCol='price')

In [54]:
pipe = Pipeline(stages=[neighbourhood_group_indexer,room_type_indexer,assembler,dt_model])

In [55]:
train_model = pipe.fit(train)

In [56]:
test_result = train_model.transform(test)

In [57]:
test_result.select(['price','prediction']).show()

In [58]:
eval = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")

In [59]:
rmse = eval.evaluate(test_result)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [60]:
eval1 = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="r2")

In [61]:
r2 = eval1.evaluate(test_result)
print("R Squared (R2) on test data = %g" % r2)

In [62]:
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt

In [63]:
pd_df3 = test_result.toPandas()
print(pd_df3)

In [64]:
fig, ax=plt.subplots(figsize=(10,6))
ax=sns.scatterplot(pd_df3['prediction'],pd_df3['price'])
display(plt.show())

### Random Forest

In [66]:
# Import the required libraries

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [67]:
neighbourhood_group_indexer = StringIndexer(inputCol='neighbourhood_group',outputCol='neighbourhood_group_index',handleInvalid='keep')
room_type_indexer = StringIndexer(inputCol='room_type',outputCol='room_type_index',handleInvalid='keep')

In [68]:
assembler = VectorAssembler(inputCols=['neighbourhood_group_index','latitude','longitude',
                                       'room_type_index','minimum_nights','number_of_reviews',
                                       'calculated_host_listings_count','availability_365'],
                            outputCol="features")

In [69]:
train_dt,test_dt=df.randomSplit([0.7,0.3])

In [70]:
rf_model=RandomForestRegressor(featuresCol="features",labelCol='price')

In [71]:
pipe = Pipeline(stages=[neighbourhood_group_indexer,room_type_indexer,assembler,rf_model])

In [72]:
model = pipe.fit(train_dt)

In [73]:
result = model.transform(test_dt)

In [74]:
result.select(['price','prediction']).show()

In [75]:
evaluate = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")

In [76]:
rmse = evaluate.evaluate(result)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [77]:
evaluate1 = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="r2")

In [78]:
rsq = evaluate1.evaluate(test_result)
print("R Squared (R2) on test data = %g" % rsq)

In [79]:
pd_df4 = result.toPandas()
print(pd_df4)

In [80]:
fig, ax=plt.subplots(figsize=(10,6))
ax=sns.scatterplot(pd_df4['prediction'],pd_df4['price'])
display(plt.show())