<a href="https://colab.research.google.com/github/HeliosRider/Home_Sales/blob/main/Home_Sales.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import Dependencies
import sys
!{sys.executable} -m pip install findspark

# Start a SparkSession
import findspark
findspark.init()


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# 1. Read the home_sales_revised.csv from the provided AWS S3 bucket location into a PySpark DataFrame.
from pyspark import SparkFiles
import pandas as pd

home_sales_url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(home_sales_url)
home_sales_df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True)

# Show the delayed home sales data.
home_sales_df.show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [4]:
# 2. Create a temporary view from the Spark DataFrame
home_sales_df.createOrReplaceTempView("home_sales")

# Now, run SQL queries on the temporary view
spark.sql("SELECT * FROM home_sales").show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [5]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
avgprice_per_4bedrooms = spark.sql("SELECT ROUND(AVG(price),2), YEAR(date) from home_sales where bedrooms == 4 group by YEAR(date) ORDER BY YEAR(date) desc")
avgprice_per_4bedrooms.show()

+--------------------+----------+
|round(avg(price), 2)|year(date)|
+--------------------+----------+
|           296363.88|      2022|
|           301819.44|      2021|
|           298353.78|      2020|
|            300263.7|      2019|
+--------------------+----------+



In [6]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
avgprice_per_4brs_3bthrms = spark.sql("SELECT ROUND(AVG(price),2), date_built from home_sales where bedrooms == 3 AND bathrooms == 3 group by date_built ORDER BY date_built desc")
avgprice_per_4brs_3bthrms .show()

+--------------------+----------+
|round(avg(price), 2)|date_built|
+--------------------+----------+
|           292676.79|      2017|
|           290555.07|      2016|
|            288770.3|      2015|
|           290852.27|      2014|
|           295962.27|      2013|
|           293683.19|      2012|
|           291117.47|      2011|
|           292859.62|      2010|
+--------------------+----------+



In [7]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
avgprice_per_4brs_3bthrms_2flrs = spark.sql("SELECT ROUND(AVG(price),2), date_built from home_sales where bedrooms == 3 AND bathrooms == 3 AND floors == 2 AND sqft_living >= 2000 group by date_built ORDER BY date_built desc")
avgprice_per_4brs_3bthrms_2flrs .show()

+--------------------+----------+
|round(avg(price), 2)|date_built|
+--------------------+----------+
|           280317.58|      2017|
|            293965.1|      2016|
|           297609.97|      2015|
|           298264.72|      2014|
|           303676.79|      2013|
|           307539.97|      2012|
|           276553.81|      2011|
|           285010.22|      2010|
+--------------------+----------+



In [8]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

start_time = time.time()

avgprice_per_view_rating = spark.sql("SELECT view, ROUND(AVG(price),2) from home_sales group by view having ROUND(AVG(price),2) >= 350000 ORDER BY view desc")
avgprice_per_view_rating.show()

original_runtime = print("--- %s seconds ---" % (time.time() - start_time))


+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|  99|          1061201.42|
|  98|          1053739.33|
|  97|          1129040.15|
|  96|          1017815.92|
|  95|           1054325.6|
|  94|           1033536.2|
|  93|          1026006.06|
|  92|           970402.55|
|  91|          1137372.73|
|  90|          1062654.16|
|  89|          1107839.15|
|  88|          1031719.35|
|  87|           1072285.2|
|  86|          1070444.25|
|  85|          1056336.74|
|  84|          1117233.13|
|  83|          1033965.93|
|  82|           1063498.0|
|  81|          1053472.79|
|  80|           991767.38|
+----+--------------------+
only showing top 20 rows

--- 1.7194256782531738 seconds ---


In [9]:
# 7. Cache the the temporary table home_sales.
spark.sql('cache table home_sales')

DataFrame[]

In [10]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [11]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()

avgprice_per_view_rating = spark.sql("SELECT view, ROUND(AVG(price),2) from home_sales group by view having ROUND(AVG(price),2) >= 350000 ORDER BY view desc")
avgprice_per_view_rating.show()

cached_data = print("--- %s seconds ---" % (time.time() - start_time))



+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|  99|          1061201.42|
|  98|          1053739.33|
|  97|          1129040.15|
|  96|          1017815.92|
|  95|           1054325.6|
|  94|           1033536.2|
|  93|          1026006.06|
|  92|           970402.55|
|  91|          1137372.73|
|  90|          1062654.16|
|  89|          1107839.15|
|  88|          1031719.35|
|  87|           1072285.2|
|  86|          1070444.25|
|  85|          1056336.74|
|  84|          1117233.13|
|  83|          1033965.93|
|  82|           1063498.0|
|  81|          1053472.79|
|  80|           991767.38|
+----+--------------------+
only showing top 20 rows

--- 0.625737190246582 seconds ---


In [20]:
# Determine the runtime and compare it to the uncached runtime.
print("The cached runtime of 0.6257 seconds was less than the uncached runtime of 1.7194 seconds")

The cached runtime of 0.6257 seconds was less than the uncached runtime of 1.7194 seconds


In [13]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
home_sales_df.write.partitionBy("date_built").mode("overwrite").parquet("parquet_delayed_partitioned")

In [14]:
# 11. Read the parquet formatted data.
parquet_df=spark.read.parquet('parquet_delayed_partitioned')

In [15]:
# 12. Create a temporary table for the parquet data.
parquet_df.createOrReplaceTempView('temp_parquet_data')

In [16]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

start_time = time.time()

avgprice_per_view_rating = spark.sql("SELECT view, ROUND(AVG(price),2) from temp_parquet_data group by view having ROUND(AVG(price),2) >= 350000 ORDER BY view desc")
avgprice_per_view_rating.show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|  99|          1061201.42|
|  98|          1053739.33|
|  97|          1129040.15|
|  96|          1017815.92|
|  95|           1054325.6|
|  94|           1033536.2|
|  93|          1026006.06|
|  92|           970402.55|
|  91|          1137372.73|
|  90|          1062654.16|
|  89|          1107839.15|
|  88|          1031719.35|
|  87|           1072285.2|
|  86|          1070444.25|
|  85|          1056336.74|
|  84|          1117233.13|
|  83|          1033965.93|
|  82|           1063498.0|
|  81|          1053472.79|
|  80|           991767.38|
+----+--------------------+
only showing top 20 rows

--- 0.9675025939941406 seconds ---


In [21]:
print("The parquet runtime of 0.9675 seconds is higher than the cached runtime of 0.6257 seconds")

The parquet runtime of 0.9675 seconds is higher than the cached runtime of 0.6257 seconds


In [18]:
# 14. Uncache the home_sales temporary table.
spark.sql("uncache table home_sales")

DataFrame[]

In [19]:
# 15. Check if the home_sales is no longer cached
spark.catalog.isCached("home_sales")

False