In [0]:
airbnb_df = spark.read.format('csv').option("header","true").option("inferSchema","true").load('/FileStore/tables/listings-1.csv')
display(airbnb_df.limit(5))

id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
2818,Quiet Garden View Room & Super Fast WiFi,3159,Daniel,,Oostelijk Havengebied - Indische Buurt,52.36575,4.94142,Private room,59,3,278,2020-02-14,1.98,1.0,0
20168,Studio with private bathroom in the centre 1,59484,Alexander,,Centrum-Oost,52.36424,4.89396,Private room,236,1,340,2020-04-09,2.63,2.0,0
25428,Lovely apt in City Centre (w.lift) near Jordaan,56142,Joan,,Centrum-West,52.37297,4.88339,Entire home/apt,125,14,5,2020-02-09,0.15,1.0,58
27886,"Romantic, stylish B&B houseboat in canal district",97647,Flip,,Centrum-West,52.38761,4.89188,Private room,138,2,219,2020-07-25,2.05,1.0,158
28871,Comfortable double room,124245,Edwin,,Centrum-West,52.36719,4.89092,Private room,75,2,336,2020-09-20,2.72,2.0,340


In [0]:
airbnb_df.count()

In [0]:
#changing the schema of original dataset by casting the particular columns

from pyspark.sql.types import IntegerType
airbnb_df = airbnb_df.withColumn("host_id", airbnb_df["host_id"].cast(IntegerType())).withColumn("id",airbnb_df["id"].cast(IntegerType()))
airbnb_df.dtypes

In [0]:
#adding new columns storing true/false values for integerType check on columns -> id/host_id 

import pyspark.sql.functions as F

airbnb_df = airbnb_df.withColumn(
  "value_host_id",
  F.col("host_id").cast("int").isNotNull()).withColumn("value_id",F.col("id").cast("int").isNotNull())

airbnb_df.printSchema()

In [0]:
from pyspark.sql.functions import when, count, col
null_df = airbnb_df.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           airbnb_df.columns])
display(null_df)

id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365,value_host_id,value_id
26,35,53,104,18781,31,26,26,26,26,26,29,2316,2313,26,48,0,0


In [0]:
#filling -1 values for null values in integer columns

temp = airbnb_df.na.fill(-1)
display(temp.filter((temp.id == -1) & (temp.host_id == -1)).limit(3))
#airbnb_df.count()

id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365,value_host_id,value_id
-1,1344947,-1,,De Pijp - Rivierenbuurt,52.34426,4.88722,Private room,155,2,90,2020-08-15,0.87,2,353.0,-1,False,False
-1,29582869,-1,,Noord-West,52.41495,4.88927,Entire home/apt,190,7,1,2016-08-16,0.02,1,0.0,-1,False,False
-1,58563168,-1,,Zuid,52.35344,4.87972,Entire home/apt,280,1,6,2017-05-06,0.11,1,0.0,-1,False,False


In [0]:
#display(temp.limit(5))
cleaned_airbnb_df = temp.filter((temp.id != -1) & (temp.host_id != -1))
cleaned_airbnb_df.count()

In [0]:
#cleaned_airbnb_df.count()

In [0]:
'''airbnb_df.dropna(subset=("id","host_id"))
airbnb_df.count()'''

In [0]:
# creating DFs to store details where "id" & "host_id" is not of integer type
'''
null_ids = cleaned_airbnb_df.filter(F.col("value_id") == False )
null_host_id = cleaned_airbnb_df.filter(F.col("value_host_id") == False )'''


In [0]:
#checking the cleaned data for null values
#decreased to 0 for columns -> id,host_id
#reference cmd-5

from pyspark.sql.functions import when, count, col
null_count_df = cleaned_airbnb_df.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           cleaned_airbnb_df.columns])
display(null_count_df)

id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365,value_host_id,value_id
0,33,0,56,18753,0,0,0,0,0,0,0,2287,2287,0,0,0,0


In [0]:
#filtering required columns 

cleaned_airbnb_df = cleaned_airbnb_df.select("id",\
                                             "name","host_id","host_name","neighbourhood","room_type",\
                                             "price","minimum_nights","number_of_reviews",\
                                             "reviews_per_month","availability_365","value_host_id","value_id")
display(cleaned_airbnb_df.limit(3))

id,name,host_id,host_name,neighbourhood,room_type,price,minimum_nights,number_of_reviews,reviews_per_month,availability_365,value_host_id,value_id
2818,Quiet Garden View Room & Super Fast WiFi,3159,Daniel,Oostelijk Havengebied - Indische Buurt,Private room,59,3,278,1.98,0,True,True
20168,Studio with private bathroom in the centre 1,59484,Alexander,Centrum-Oost,Private room,236,1,340,2.63,0,True,True
25428,Lovely apt in City Centre (w.lift) near Jordaan,56142,Joan,Centrum-West,Entire home/apt,125,14,5,0.15,58,True,True


In [0]:
#multiple bookings for single host_id

cleaned_airbnb_df.createOrReplaceTempView("host_id_count")
host_id_count = spark.sql("select host_id,count(*) as count from host_id_count group by host_id having count > 1 ")
host_id_count.count()

In [0]:
display(host_id_count.limit(5))

host_id,count
1907015,2
6797351,2
9519520,2
2674028,3
7872614,2


In [0]:
#inner join on two DFs to get details about people having more than one bookings

inner_join = cleaned_airbnb_df.join(host_id_count, on=["host_id"],how = "inner")
display(inner_join)

host_id,id,name,host_name,neighbourhood,room_type,price,minimum_nights,number_of_reviews,reviews_per_month,availability_365,value_host_id,value_id,count
59484,20168,Studio with private bathroom in the centre 1,Alexander,Centrum-Oost,Private room,236,1,340,2.63,0,True,True,2
124245,28871,Comfortable double room,Edwin,Centrum-West,Private room,75,2,336,2.72,340,True,True,2
124245,29051,Comfortable single room,Edwin,Centrum-West,Private room,55,2,481,4.13,361,True,True,2
187728,44129,Luxury design with canal view,Tatiana,Centrum-West,Entire home/apt,115,7,177,1.43,167,True,True,5
231806,50518,Perfect central Amsterdam apartment,Nikki,Westerpark,Entire home/apt,120,2,112,1.15,48,True,True,2
246493,53067,Spacious studio at the attic of a townhouse,DoJo,De Pijp - Rivierenbuurt,Private room,40,7,354,2.94,0,True,True,5
246493,53692,Large quiet Studio with gardenview in hip area.,DoJo,De Pijp - Rivierenbuurt,Private room,50,7,334,2.93,0,True,True,5
278253,58211,En Suite Apartment in a monumental canal house,Marcel,Centrum-West,Private room,151,3,119,1.18,346,True,True,2
335166,67841,Amsterdam - The Pijp Apartment 1A,Dene,De Pijp - Rivierenbuurt,Entire home/apt,106,2,16,0.19,0,True,True,2
336950,68290,Rebel - Private Room (only long term),Manuel,Centrum-West,Private room,45,30,659,5.65,142,True,True,2


In [0]:
inner_join.dropDuplicates()
inner_join.count()

In [0]:

inner_join = inner_join.select("id","name","host_id","count","host_name",\
                               "neighbourhood",
                  "room_type","price",
                  "minimum_nights",
                  "number_of_reviews",
                  "reviews_per_month",
                  "availability_365",)

display(inner_join.limit(5))

id,name,host_id,count,host_name,neighbourhood,room_type,price,minimum_nights,number_of_reviews,reviews_per_month,availability_365
20168,Studio with private bathroom in the centre 1,59484,2,Alexander,Centrum-Oost,Private room,236,1,340,2.63,0
28871,Comfortable double room,124245,2,Edwin,Centrum-West,Private room,75,2,336,2.72,340
29051,Comfortable single room,124245,2,Edwin,Centrum-West,Private room,55,2,481,4.13,361
44129,Luxury design with canal view,187728,5,Tatiana,Centrum-West,Entire home/apt,115,7,177,1.43,167
50518,Perfect central Amsterdam apartment,231806,2,Nikki,Westerpark,Entire home/apt,120,2,112,1.15,48


In [0]:
#maximum number of bookings for particular rooms

inner_join.createOrReplaceTempView("inner_join_view")
room_type_count = spark.sql("select room_type,count(*) as count from inner_join_view group by room_type")
display(room_type_count)

room_type,count
Shared room,27
Hotel room,112
Entire home/apt,2060
Private room,1680
