# Part 2: Spark DataFrame API

In [1]:
#making the local Spark installation importable from this notebook
#(findspark.init() does not install PySpark; it only locates an existing Spark install).
#This cell must run before any pyspark import below.

import findspark
findspark.init()

In [2]:
#importing necessary libraries

import csv
import os
import shutil

import pyspark.sql.functions as F
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

In [3]:
#creating (or reusing) the SparkSession.
#getOrCreate() hands back the already-running session when this cell is re-run, whereas
#constructing SparkContext('local') directly fails on a re-run because only one
#SparkContext may be active at a time.
#sc is kept for any code that still needs the underlying SparkContext.

spark = SparkSession.builder.master('local').appName('Part2-DataFrameAPI').getOrCreate()
sc = spark.sparkContext

# Part 2_Task 1

### Download the parquet data file from the URL: (https://github.com/databricks/LearningSparkV2/blob/master/mlflow-project-example/data/sf-airbnb-clean.parquet) and load it into a Spark DataFrame. (The code below expects the downloaded file to be saved locally as `airbnb.parquet`.)

In [4]:
#reading the locally saved AirBnB parquet file into a Spark DataFrame

AIRBNB_PARQUET_PATH = "airbnb.parquet"
airbnbDF = spark.read.format("parquet").load(AIRBNB_PARQUET_PATH)

In [5]:
#showing the first 5 rows of the AirBnB DataFrame (returned as a list of Row objects)

airbnbDF.limit(5).collect()

[Row(host_is_superhost='t', cancellation_policy='moderate', instant_bookable='t', host_total_listings_count=1.0, neighbourhood_cleansed='Western Addition', latitude=37.76931, longitude=-122.43386, property_type='Apartment', room_type='Entire home/apt', accommodates=3.0, bathrooms=1.0, bedrooms=1.0, beds=2.0, bed_type='Real Bed', minimum_nights=1.0, number_of_reviews=180.0, review_scores_rating=97.0, review_scores_accuracy=10.0, review_scores_cleanliness=10.0, review_scores_checkin=10.0, review_scores_communication=10.0, review_scores_location=10.0, review_scores_value=10.0, price=170.0, bedrooms_na=0.0, bathrooms_na=0.0, beds_na=0.0, review_scores_rating_na=0.0, review_scores_accuracy_na=0.0, review_scores_cleanliness_na=0.0, review_scores_checkin_na=0.0, review_scores_communication_na=0.0, review_scores_location_na=0.0, review_scores_value_na=0.0),
 Row(host_is_superhost='f', cancellation_policy='strict_14_with_grace_period', instant_bookable='f', host_total_listings_count=2.0, neighb

In [6]:
#total number of rows in the AirBnB DataFrame
#(left as the cell's last expression so Jupyter displays it; no print() needed)

airbnbDF.count()

7146


In [7]:
#number of columns in the AirBnB DataFrame, i.e. the number of fields in its schema

len(airbnbDF.schema.fields)

34

In [8]:
#printing the schema of the AirBnB DataFrame: every column with its data type and nullability

airbnbDF.printSchema()

root
 |-- host_is_superhost: string (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- instant_bookable: string (nullable = true)
 |-- host_total_listings_count: double (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- minimum_nights: double (nullable = true)
 |-- number_of_reviews: double (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- review_scores_accuracy: double (nullable = true)
 |-- review_scores_cleanliness: double (nullable = true)
 |-- review_scores_checkin: double (nullable = true)
 |-- review_scores_communication: double (nullable = true

In [9]:
import pyspark.sql.functions as F

#counting the missing (null) values in every column of the AirBnB DataFrame.
#F.when(...) yields the non-null literal 1 for a null cell and null otherwise, and
#F.count only counts non-null values, so each aggregate is that column's null count.
nullCountExprs = [
    F.count(F.when(F.col(colName).isNull(), 1)).alias(colName)
    for colName in airbnbDF.columns
]
df_agg = airbnbDF.agg(*nullCountExprs)

In [10]:
#null count per column, shown vertically because all 34 columns do not fit on one line
df_agg.show(vertical=True)

+-----------------+-------------------+----------------+-------------------------+----------------------+--------+---------+-------------+---------+------------+---------+--------+----+--------+--------------+-----------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----+-----------+------------+-------+-----------------------+-------------------------+----------------------------+------------------------+------------------------------+-------------------------+----------------------+
|host_is_superhost|cancellation_policy|instant_bookable|host_total_listings_count|neighbourhood_cleansed|latitude|longitude|property_type|room_type|accommodates|bathrooms|bedrooms|beds|bed_type|minimum_nights|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_va

# Part 2_Task 2

### Create CSV output file: out_2_2.txt that lists the minimum price, maximum price, and total row count from the dataset. Use the following output column names in the resultant file: min_price, max_price, row_count

In [11]:
#getting minimum and maximum price and total row count from the AirBnB DataFrame.
#row_count counts every row (count of the literal 1); count("price") would silently
#skip rows whose price is null and so is not the total row count.
#The F.-qualified functions are used because the star import shadows Python's min/max.

minMaxCountDF = airbnbDF.select(
    F.min("price").alias("min_price"),
    F.max("price").alias("max_price"),
    F.count(F.lit(1)).alias("row_count"),
)

In [12]:
#showing the min/max price and row count (at most 20 rows, long values truncated)

minMaxCountDF.show(n=20, truncate=True)

+---------+---------+---------+
|min_price|max_price|row_count|
+---------+---------+---------+
|     10.0|  10000.0|     7146|
+---------+---------+---------+



In [21]:
#writing the result to a single CSV file (with header row) named out_2_2.txt.
#DataFrameWriter.save() would create a *directory* called out_2_2.txt holding part-*.csv
#files rather than the requested file. The result is a single row, so it is collected to
#the driver and written with the csv module instead.

OUT_2_2_PATH = "out_2_2.txt"
if os.path.isdir(OUT_2_2_PATH):
    # a directory of that name may be left over from an earlier Spark-style write
    shutil.rmtree(OUT_2_2_PATH)

with open(OUT_2_2_PATH, "w", newline="") as outFile:
    csvWriter = csv.writer(outFile)
    csvWriter.writerow(minMaxCountDF.columns)
    csvWriter.writerows(minMaxCountDF.collect())

# Part 2_Task 3

### Calculate the average number of bathrooms and bedrooms across all the properties listed in the data set with a price of > 5000 and a review score exactly equal to 10. Write the results into a CSV file: out_2_3.txt with the following column headers: avg_bathrooms, avg_bedrooms

In [24]:
#calculating the average number of bathrooms and bedrooms across all the properties listed in the data set with a
#price of > 5000 and a review score exactly equal to 10.

#There are several review-score columns and the question doesn't name the exact one.
#review_scores_rating appears to be on a 0-100 scale (97.0 in the first row), while the
#sub-scores such as review_scores_value appear to be on a 0-10 scale.
#So assuming "review_scores_value" column as the review score.
#Column names are spelled exactly as in the schema (lowercase), so this does not depend on
#Spark's case-insensitive column resolution (spark.sql.caseSensitive=false).

avgBathBedDF = (
    airbnbDF
    .filter((F.col("price") > 5000) & (F.col("review_scores_value") == 10))
    .select(
        F.avg("bathrooms").alias("avg_bathrooms"),
        F.avg("bedrooms").alias("avg_bedrooms"),
    )
)

In [25]:
#showing the average bathrooms/bedrooms of the filtered properties (default show settings)

avgBathBedDF.show(n=20, truncate=True)

+-------------+------------+
|avg_bathrooms|avg_bedrooms|
+-------------+------------+
|        2.375|         3.0|
+-------------+------------+



In [16]:
#writing the result to a single CSV file (with header row) named out_2_3.txt.
#DataFrameWriter.save() would create a *directory* called out_2_3.txt holding part-*.csv
#files rather than the requested file. The result is a single row, so it is collected to
#the driver and written with the csv module instead.

OUT_2_3_PATH = "out_2_3.txt"
if os.path.isdir(OUT_2_3_PATH):
    # a directory of that name may be left over from an earlier Spark-style write
    shutil.rmtree(OUT_2_3_PATH)

with open(OUT_2_3_PATH, "w", newline="") as outFile:
    csvWriter = csv.writer(outFile)
    csvWriter.writerow(avgBathBedDF.columns)
    csvWriter.writerows(avgBathBedDF.collect())

In [26]:
# Rounded variant, in case whole numbers of bathrooms and bedrooms are expected.
# It is only displayed below; it is NOT written to an output file.
# Instead of repeating the price/review-score filter, the averages already computed in
# avgBathBedDF are rounded (round here is pyspark's, brought in by the star import).

avgBathBedDF_temp = avgBathBedDF.select(
    round("avg_bathrooms").alias("avg_bathrooms"),
    round("avg_bedrooms").alias("avg_bedrooms"),
)

In [27]:
#showing the rounded averages (default show settings)

avgBathBedDF_temp.show(n=20, truncate=True)

+-------------+------------+
|avg_bathrooms|avg_bedrooms|
+-------------+------------+
|          2.0|         3.0|
+-------------+------------+



# Part 2_Task 4

### How many people can be accommodated by the property with the lowest price and highest rating? Write the resulting number to a text file: out_2_4.txt.

In [17]:
#getting all the accomodates based on the price and rating.

#Assuming "review_scores_rating" column as rating column

groupingDF = airbnbDF.select("accommodates","price","review_scores_rating").groupBy("price","review_scores_rating").sum("accommodates")

In [18]:
#getting number of people can be accomodated by the property with the lowest price and highest rating

accomodateDF = groupingDF.sort(groupingDF.price.asc(),groupingDF.review_scores_rating.desc()).limit(1)

In [19]:
#displaying the result

accomodateDF.show()

+-----+--------------------+-----------------+
|price|review_scores_rating|sum(accommodates)|
+-----+--------------------+-----------------+
| 10.0|               100.0|              2.0|
+-----+--------------------+-----------------+



In [20]:
##writing only the resulting number to a text output file: out_2_3.txt

accomodateDF.select('sum(accommodates)').write.mode("overwrite").format("csv").save("out_2_4.txt")

                                        End of Tasks 1, 2, 3 and 4 of Part 2

                                Task 5 of Part 2 is in a separate task_2_5.py file