In [1]:
# Import findspark and initialise it before any pyspark import in the cells below.
# findspark.init() is expected to make the local Spark installation importable
# from this kernel — confirm against your environment's Spark setup.
import findspark
findspark.init()

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType
from pyspark import SparkFiles

# Start (or attach to an already running) Spark session named "SparkSQL".
# Every later cell talks to Spark through this `spark` handle.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.
# SparkFiles is already imported in the package-import cell, so it is not
# imported a second time here.
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

# SparkFiles.get() looks the file up by its bare name, so derive that name from
# the URL instead of repeating it by hand (keeps the two from drifting apart).
csv_file_name = url.rsplit("/", 1)[-1]

# Register the remote CSV with the Spark context, then read the local copy.
spark.sparkContext.addFile(url)

# No schema is supplied and inferSchema is left at its default, so every column
# is loaded as a string; the SQL below relies on implicit casts.
df = spark.read.csv(SparkFiles.get(csv_file_name), sep=",", header=True)


In [4]:
# 2. Create a temporary view of the DataFrame.
# Registers the CSV-backed DataFrame under the name 'home_sales' so the
# spark.sql(...) queries in the following cells can refer to it by name.
df.createOrReplaceTempView('home_sales')


In [5]:
# 3. Average sale price of four-bedroom homes for each year of sale,
# rounded to two decimal places and listed from the most recent year down.
query_avg_price_four_bedroom_per_year = """
SELECT   YEAR(date) as YearSold
        ,ROUND(AVG(price),2) as HouseAveragePrice
  FROM home_sales
 WHERE bedrooms = 4
 GROUP BY YEAR(date)
 ORDER BY YEAR(date) DESC
"""

spark.sql(query_avg_price_four_bedroom_per_year).show()


+--------+-----------------+
|YearSold|HouseAveragePrice|
+--------+-----------------+
|    2022|        296363.88|
|    2021|        301819.44|
|    2020|        298353.78|
|    2019|         300263.7|
+--------+-----------------+



In [6]:
# 4. Average price of homes with 3 bedrooms and 3 bathrooms, grouped by the
# year the home was built, rounded to two decimal places, newest build year first.
query_avg_price_3bed_3bath_per_year_built = """
SELECT   YEAR(date_built) as YearBuilt
        ,ROUND(AVG(price),2) as HouseAveragePrice
  FROM home_sales
 WHERE (bedrooms = 3 AND bathrooms = 3)
 GROUP BY YEAR(date_built)
 ORDER BY YEAR(date_built) DESC
"""

spark.sql(query_avg_price_3bed_3bath_per_year_built).show()


+---------+-----------------+
|YearBuilt|HouseAveragePrice|
+---------+-----------------+
|     2017|        292676.79|
|     2016|        290555.07|
|     2015|         288770.3|
|     2014|        290852.27|
|     2013|        295962.27|
|     2012|        293683.19|
|     2011|        291117.47|
|     2010|        292859.62|
+---------+-----------------+



In [7]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
#
# The size filter uses >= so that homes of exactly 2,000 sq ft are included,
# as the question requires ("greater than or equal to").
spark.sql("""
SELECT   YEAR(date_built) as YearBuilt
        ,ROUND(AVG(price),2) as HouseAveragePrice
  FROM home_sales
 WHERE (bedrooms = 3 AND bathrooms = 3)
   AND floors = 2
   AND sqft_living >= 2000
 GROUP BY YEAR(date_built)
 ORDER BY YEAR(date_built) DESC
""").show()


+---------+-----------------+
|YearBuilt|HouseAveragePrice|
+---------+-----------------+
|     2017|        280527.62|
|     2016|         293965.1|
|     2015|        297609.97|
|     2014|        297619.46|
|     2013|        303676.79|
|     2012|        307539.97|
|     2011|        276553.81|
|     2010|        285010.22|
+---------+-----------------+



In [8]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

# perf_counter() is the clock intended for measuring short elapsed durations;
# unlike time.time() it is not affected by system clock adjustments.
start_time = time.perf_counter()

# The CSV is read without a schema, so `view` is a string column. Ordering by
# the raw column would sort lexicographically (a three-digit rating would land
# below two-digit ones), so the sort key is cast to INT to get a true numeric
# descending order. The displayed ViewRating column itself is left unchanged.
query_avg_house_price_per_view_rating = """
SELECT   view as ViewRating
        ,ROUND(AVG(price),2) as HouseAveragePrice
  FROM home_sales
 GROUP BY view
 HAVING HouseAveragePrice >= 350000
 ORDER BY CAST(view AS INT) DESC
"""

# .show() forces execution, so the elapsed time covers the full query run
# plus rendering of the first 20 rows.
spark.sql(query_avg_house_price_per_view_rating).show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+----------+-----------------+
|ViewRating|HouseAveragePrice|
+----------+-----------------+
|        99|       1061201.42|
|        98|       1053739.33|
|        97|       1129040.15|
|        96|       1017815.92|
|        95|        1054325.6|
|        94|        1033536.2|
|        93|       1026006.06|
|        92|        970402.55|
|        91|       1137372.73|
|        90|       1062654.16|
|        89|       1107839.15|
|        88|       1031719.35|
|        87|        1072285.2|
|        86|       1070444.25|
|        85|       1056336.74|
|        84|       1117233.13|
|        83|       1033965.93|
|        82|        1063498.0|
|        81|       1053472.79|
|        80|        991767.38|
+----------+-----------------+
only showing top 20 rows

--- 2.0894150733947754 seconds ---


In [9]:
# 7. Cache the temporary table home_sales.
# Issued as a SQL statement against the 'home_sales' view registered earlier.
spark.sql("cache table home_sales")

DataFrame[]

In [10]:
# 8. Check if the table is cached.
# The bare expression is the cell's output; True is expected right after the cache step.
spark.catalog.isCached('home_sales')

True

In [11]:
# 9. Using the cached data, run the last query above, that calculates 
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000. 
# Determine the runtime and compare it to the uncached runtime.

# perf_counter() is the clock intended for measuring short elapsed durations;
# unlike time.time() it is not affected by system clock adjustments.
start_time = time.perf_counter()

# Re-runs the query string defined in the uncached timing cell, unchanged,
# so the two timings are directly comparable.
spark.sql(query_avg_house_price_per_view_rating).show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))


+----------+-----------------+
|ViewRating|HouseAveragePrice|
+----------+-----------------+
|        99|       1061201.42|
|        98|       1053739.33|
|        97|       1129040.15|
|        96|       1017815.92|
|        95|        1054325.6|
|        94|        1033536.2|
|        93|       1026006.06|
|        92|        970402.55|
|        91|       1137372.73|
|        90|       1062654.16|
|        89|       1107839.15|
|        88|       1031719.35|
|        87|        1072285.2|
|        86|       1070444.25|
|        85|       1056336.74|
|        84|       1117233.13|
|        83|       1033965.93|
|        82|        1063498.0|
|        81|       1053472.79|
|        80|        991767.38|
+----------+-----------------+
only showing top 20 rows

--- 1.6220743656158447 seconds ---


In [12]:
# 10. Write the home sales DataFrame out as parquet, partitioned on "date_built".
# mode("overwrite") replaces any output left by a previous run, so the cell can be re-run.
(
    df.write
    .partitionBy("date_built")
    .mode("overwrite")
    .parquet("home_sales_partitioned")
)

In [13]:
# 11. Load the partitioned parquet output back into a DataFrame.
p_df_p = spark.read.parquet("home_sales_partitioned")

In [14]:
# 12. Create a temporary table for the parquet data.
# NOTE(review): this registers the parquet DataFrame under 'home_sales', the same
# name used for the CSV-backed view earlier in the notebook, so from here on
# queries against home_sales read the parquet data. Presumably intentional so the
# saved query string can be reused unchanged — confirm, because the later
# uncache / isCached cells then refer to this view rather than the CSV-backed one.
p_df_p.createOrReplaceTempView('home_sales')

In [15]:
# 13. Using the parquet DataFrame, run the last query above, that calculates 
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000. 
# Determine the runtime and compare it to the cached runtime.

# perf_counter() is the clock intended for measuring short elapsed durations;
# unlike time.time() it is not affected by system clock adjustments.
start_time = time.perf_counter()

# Same query string as the earlier timing cells; 'home_sales' now resolves to
# the view registered from the parquet DataFrame.
spark.sql(query_avg_house_price_per_view_rating).show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+----------+-----------------+
|ViewRating|HouseAveragePrice|
+----------+-----------------+
|        99|       1061201.42|
|        98|       1053739.33|
|        97|       1129040.15|
|        96|       1017815.92|
|        95|        1054325.6|
|        94|        1033536.2|
|        93|       1026006.06|
|        92|        970402.55|
|        91|       1137372.73|
|        90|       1062654.16|
|        89|       1107839.15|
|        88|       1031719.35|
|        87|        1072285.2|
|        86|       1070444.25|
|        85|       1056336.74|
|        84|       1117233.13|
|        83|       1033965.93|
|        82|        1063498.0|
|        81|       1053472.79|
|        80|        991767.38|
+----------+-----------------+
only showing top 20 rows

--- 2.1819822788238525 seconds ---


In [16]:
# 14. Uncache the home_sales temporary table.
# NOTE(review): by this point 'home_sales' has been re-registered from the parquet
# DataFrame, so this statement targets that view — confirm this is the table
# that was meant to be released.
spark.sql("uncache table home_sales")

DataFrame[]

In [17]:
# 15. Check that home_sales is no longer cached.
# The bare expression is the cell's output; False is the expected result here.
spark.catalog.isCached('home_sales')


False

In [18]:
# Cleanup / shutdown Spark session.
# Keep this as the final cell: every earlier cell uses the `spark` session.
spark.stop()