In [None]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [1]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [2]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "home_sales_revised.csv"

home_sales=spark.read.format('csv').option("header","true").load(url)

In [3]:
home_sales.show(5)

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
+--------------------+----------+----------+------+--------+---------+----------

In [17]:
home_sales.toPandas().head()

Unnamed: 0,id,date,date_built,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view
0,f8a53099-ba1c-47d6-9c31-7398aa8f6089,2022-04-08,2016,936923,4,3,3167,11733,2,1,76
1,7530a2d8-1ae3-4517-9f4a-befe060c4353,2021-06-13,2013,379628,2,2,2235,14384,1,0,23
2,43de979c-0bf0-4c9f-85ef-96dc27b258d5,2019-04-12,2014,417866,2,2,2127,10575,2,0,0
3,b672c137-b88c-48bf-9f18-d0a4ac62fb8b,2019-10-16,2016,239895,2,2,1631,11149,2,0,0
4,e0726d4d-d595-4074-8283-4139a54d0d63,2022-01-08,2017,424418,3,2,2249,13878,2,0,4


In [6]:
# 2. Create a temporary view of the DataFrame.

home_sales.createOrReplaceTempView("home_sales")

In [6]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?

spark.sql("""
    select year(date) as sold_year,round(avg(price),2) as avg_price
    from home_sales
    where bedrooms=4    
    group by year(date)
    order by year(date)
;
""").toPandas()

Unnamed: 0,sold_year,avg_price
0,2019,300263.7
1,2020,298353.78
2,2021,301819.44
3,2022,296363.88


In [7]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?

spark.sql("""
    select date_built,round(avg(price),2) as avg_price
    from home_sales
    where bedrooms=3
    and bathrooms=3
    group by date_built
    order by date_built
;
""").toPandas()

Unnamed: 0,date_built,avg_price
0,2010,292859.62
1,2011,291117.47
2,2012,293683.19
3,2013,295962.27
4,2014,290852.27
5,2015,288770.3
6,2016,290555.07
7,2017,292676.79


In [10]:
#  5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?

spark.sql("""
    select date_built,round(avg(price),2) as avg_price
    from home_sales
    where bedrooms=3
    and bathrooms=3
    and floors=2
    and sqft_living>=2000
    group by date_built
    order by date_built
;
""").toPandas()

Unnamed: 0,date_built,avg_price
0,2010,285010.22
1,2011,276553.81
2,2012,307539.97
3,2013,303676.79
4,2014,298264.72
5,2015,297609.97
6,2016,293965.1
7,2017,280317.58


In [4]:
home_sales.show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [10]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

start_time = time.time()

result = spark.sql("""
    select view,round(avg(price),2) as avg_price
    from home_sales
    group by view
    having avg_price>=350000
;
""").toPandas()
display(result)

print("--- %s seconds ---" % (time.time() - start_time))
print("--- %s seconds ---" % round((time.time() - start_time),2))

Unnamed: 0,view,avg_price
0,51,788128.21
1,54,798684.82
2,69,750537.94
3,87,1072285.2
4,73,752861.18
5,64,767036.67
6,59,791453.0
7,85,1056336.74
8,52,733780.26
9,71,775651.1


--- 0.28221940994262695 seconds ---
--- 0.28 seconds ---


In [11]:
# 7. Cache the the temporary table home_sales.

home_sales.cache()

DataFrame[id: string, date: string, date_built: string, price: string, bedrooms: string, bathrooms: string, sqft_living: string, sqft_lot: string, floors: string, waterfront: string, view: string]

In [14]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [16]:
# 9. Using the cached data, run the query that filters out the view ratings with average price 
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.

start_time = time.time()

result = spark.sql("""
    select view,round(avg(price),2) as avg_price
    from home_sales
    group by view
    having avg_price>=350000
;
""").toPandas()
display(result)

print("--- %s seconds ---" % (time.time() - start_time))


Unnamed: 0,view,avg_price
0,51,788128.21
1,54,798684.82
2,69,750537.94
3,87,1072285.2
4,73,752861.18
5,64,767036.67
6,59,791453.0
7,85,1056336.74
8,52,733780.26
9,71,775651.1


--- 0.1730031967163086 seconds ---


In [34]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data 

home_sales.write.partitionBy('date_built').format('parquet').mode("overwrite").save("date_built_parquet")

In [17]:
# 11. Read the formatted parquet data.

home_sales=spark.read.parquet('date_built_parquet')
home_sales.show(2)

+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
|                  id|      date| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|date_built|
+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
|2ed8d509-7372-46d...|2021-08-06|258710|       3|        3|       1918|    9666|     1|         0|  25|      2015|
|941bad30-eb49-4a7...|2020-05-09|229896|       3|        3|       2197|    8641|     1|         0|   3|      2015|
+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
only showing top 2 rows



In [18]:
# 12. Create a temporary table for the parquet data.

home_sales.createOrReplaceTempView('home_sales')

In [19]:
# 13. Run the query that filters out the view ratings with average price of greater than or eqaul to $350,000 
# with the parquet DataFrame. Round your average to two decimal places. 
# Determine the runtime and compare it to the cached version. 

start_time = time.time()

result = spark.sql("""
    select view,round(avg(price),2) as avg_price
    from home_sales
    group by view
    having avg_price>=350000
;
""").toPandas()
display(result)


print("--- %s seconds ---" % (time.time() - start_time))

Unnamed: 0,view,avg_price
0,51,788128.21
1,54,798684.82
2,73,752861.18
3,87,1072285.2
4,64,767036.67
5,59,791453.0
6,85,1056336.74
7,52,733780.26
8,71,775651.1
9,98,1053739.33


--- 0.3470034599304199 seconds ---


In [37]:
# 14. Uncache the home_sales temporary table.

home_sales.unpersist()

DataFrame[id: string, date: string, price: string, bedrooms: string, bathrooms: string, sqft_living: string, sqft_lot: string, floors: string, waterfront: string, view: string, date_built: int]

In [38]:
# 15. Check if the home_sales is no longer cached

spark.catalog.isCached('home_sales')

False