In [102]:

# Import findspark and initialise. 
import findspark
findspark.init()

# Import packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, round
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

# 1. Read in the home_sales_revised.csv data into a Spark DataFrame.
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv("home_sales_revised.csv", header=True, inferSchema=True)

# 2. Create a temporary table called home_sales.
df.createOrReplaceTempView("home_sales")


                                                                                

In [103]:

# 3. What is the average price for a four-bedroom house sold for each year? Round off your answer to two decimal places.
avg_price_4_bedroom = spark.sql("""
    SELECT year(date) AS year, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 4
    GROUP BY year(date)
    ORDER BY year
""")
avg_price_4_bedroom.show()
avg_price_4_bedroom.show()
    

+----+---------+
|year|avg_price|
+----+---------+
|2019| 300263.7|
|2020|298353.78|
|2021|301819.44|
|2022|296363.88|
+----+---------+

+----+---------+
|year|avg_price|
+----+---------+
|2019| 300263.7|
|2020|298353.78|
|2021|301819.44|
|2022|296363.88|
+----+---------+



In [104]:

# 4. What is the average price of a home for each year the home was built, that has three bedrooms and three bathrooms? Round off your answer to two decimal places.
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
avg_price_3_bed_3_bath = spark.sql("""
    SELECT date_built AS year_built, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 3 AND bathrooms = 3
    GROUP BY date_built
    ORDER BY date_built
""")
avg_price_3_bed_3_bath.show()

+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2010|292859.62|
|      2011|291117.47|
|      2012|293683.19|
|      2013|295962.27|
|      2014|290852.27|
|      2015| 288770.3|
|      2016|290555.07|
|      2017|292676.79|
+----------+---------+



In [105]:

# 5. What is the average price of a home for each year the home was built, that has three bedrooms, three bathrooms, two floors, and is greater than or equal to 2,000 square feet? Round off your answer to two decimal places.
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
avg_price_specific = spark.sql("""
    SELECT date_built AS year_built, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
    GROUP BY date_built
    ORDER BY date_built
""")
avg_price_specific.show()

+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2010|285010.22|
|      2011|276553.81|
|      2012|307539.97|
|      2013|303676.79|
|      2014|298264.72|
|      2015|297609.97|
|      2016| 293965.1|
|      2017|280317.58|
+----------+---------+



In [106]:

# 6. What is the average price of a home per "view" rating having an average home price greater than or equal to $350,000? Determine the runtime for this query, and round off your answer to two decimal places.
# having an average home price greater than or equal to $350,000? Order by descending view rating. 
# Although this is a small dataset, determine the run time for this query.
start_time = time.time()
avg_price_view = spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY avg_price DESC
""")
avg_price_view.show()
uncached_runtime = time.time() - start_time
print(f"Uncached runtime: {uncached_runtime} seconds")

+----+----------+
|view| avg_price|
+----+----------+
|  91|1137372.73|
|  97|1129040.15|
|  84|1117233.13|
|  75|1114042.94|
|  89|1107839.15|
|  78|1080649.37|
|  77|1076205.56|
|  87| 1072285.2|
|  86|1070444.25|
|  82| 1063498.0|
|  90|1062654.16|
|  99|1061201.42|
|  76|1058802.78|
|  85|1056336.74|
|  95| 1054325.6|
|  98|1053739.33|
|  81|1053472.79|
|  83|1033965.93|
|  94| 1033536.2|
|  88|1031719.35|
+----+----------+
only showing top 20 rows

Uncached runtime: 0.24407029151916504 seconds


In [107]:

# 7. Cache the temporary table home_sales.
spark.catalog.cacheTable("home_sales")

# 8. Check if the table is cached.
print(f"Is 'home_sales' cached? {spark.catalog.isCached('home_sales')}")


Is 'home_sales' cached? True


In [108]:

# 9. Using the cached data, run the last query again and determine the runtime.
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000. 
# Determine the runtime and compare it to the uncached runtime.
start_time = time.time()
avg_price_view_cached = spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY avg_price DESC
""")
avg_price_view_cached.show()
cached_runtime = time.time() - start_time
print(f"Cached runtime: {cached_runtime} seconds")


+----+----------+
|view| avg_price|
+----+----------+
|  91|1137372.73|
|  97|1129040.15|
|  84|1117233.13|
|  75|1114042.94|
|  89|1107839.15|
|  78|1080649.37|
|  77|1076205.56|
|  87| 1072285.2|
|  86|1070444.25|
|  82| 1063498.0|
|  90|1062654.16|
|  99|1061201.42|
|  76|1058802.78|
|  85|1056336.74|
|  95| 1054325.6|
|  98|1053739.33|
|  81|1053472.79|
|  83|1033965.93|
|  94| 1033536.2|
|  88|1031719.35|
+----+----------+
only showing top 20 rows

Cached runtime: 0.6052045822143555 seconds


In [109]:

# 10. Partition by the "date_built" field on the formatted parquet home sales data.
df.write.mode("overwrite").partitionBy("date_built").parquet("home_sales_partitioned")



In [110]:

# 11. Read the formatted parquet data.
# 12. Create a temporary table for the parquet data.
parquet_df = spark.read.parquet("home_sales_partitioned")
parquet_df.createOrReplaceTempView("home_sales_parquet")


In [111]:

# 13. Run the last query on the parquet temporary table and determine the runtime.
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000. 
# Determine the runtime and compare it to the cached runtime.
start_time = time.time()
avg_price_view_parquet = spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales_parquet
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY avg_price DESC
""")
avg_price_view_parquet.show()
parquet_runtime = time.time() - start_time
print(f"Parquet runtime: {parquet_runtime} seconds")


+----+----------+
|view| avg_price|
+----+----------+
|  91|1137372.73|
|  97|1129040.15|
|  84|1117233.13|
|  75|1114042.94|
|  89|1107839.15|
|  78|1080649.37|
|  77|1076205.56|
|  87| 1072285.2|
|  86|1070444.25|
|  82| 1063498.0|
|  90|1062654.16|
|  99|1061201.42|
|  76|1058802.78|
|  85|1056336.74|
|  95| 1054325.6|
|  98|1053739.33|
|  81|1053472.79|
|  83|1033965.93|
|  94| 1033536.2|
|  88|1031719.35|
+----+----------+
only showing top 20 rows

Parquet runtime: 0.3876931667327881 seconds


In [112]:

# 14. Uncache the home_sales temporary table.
spark.catalog.uncacheTable("home_sales")

# 15. Verify that the home_sales temporary table is uncached.
print(f"Is 'home_sales' cached after uncache? {spark.catalog.isCached('home_sales')}")


Is 'home_sales' cached after uncache? False
