In [None]:
# Import findspark and initialize.
import findspark
findspark.init()

# Import packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

# Create a SparkSession in local mode
spark = SparkSession.builder.master("local[*]").appName("SparkSQL").getOrCreate()

# 1. Read the home_sales_revised.csv data into a Spark DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv("file://" + SparkFiles.get("home_sales_revised.csv"), header=True, inferSchema=True)

# 2. Create a temporary table called home_sales.
df.createOrReplaceTempView("home_sales")

# 3. What is the average price for a four-bedroom house sold for each year? Round off your answer to two decimal places.
query_3 = """
SELECT year, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 4
GROUP BY year
ORDER BY year
"""
result_3 = spark.sql(query_3)
result_3.show()

# 4. What is the average price of a home for each year it was built that has three bedrooms and three bathrooms? Round off your answer to two decimal places.
query_4 = """
SELECT year_built, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3
GROUP BY year_built
ORDER BY year_built
"""
result_4 = spark.sql(query_4)
result_4.show()

# 5. What is the average price of a home for each year that has three bedrooms, three bathrooms, two floors, and is greater than or equal to 2,000 square feet? Round off your answer to two decimal places.
query_5 = """
SELECT year_built, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft >= 2000
GROUP BY year_built
ORDER BY year_built
"""
result_5 = spark.sql(query_5)
result_5.show()

# 6. What is the "view" rating for homes costing more than or equal to $350,000? Determine the run time for this query, and round off your answer to two decimal places.
start_time = time.time()
query_6 = """
SELECT view, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE price >= 350000
GROUP BY view
ORDER BY view
"""
result_6 = spark.sql(query_6)
result_6.show()
print("--- %s seconds ---" % (time.time() - start_time))

# 7. Cache your temporary table home_sales.
spark.catalog.cacheTable('home_sales')

# 8. Check if your temporary table is cached.
print("Is home_sales cached?", spark.catalog.isCached('home_sales'))

# 9. Using the cached data, run the query that filters out the view ratings with an average price of greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.
start_time = time.time()
result_9 = spark.sql(query_6)
result_9.show()
print("--- %s seconds ---" % (time.time() - start_time))

# 10. Partition by the "date_built" field on the formatted parquet home sales data.
df.write.partitionBy("date_built").parquet("parquet_home_sales")

# 11. Read the formatted parquet data.
parquet_df = spark.read.parquet("parquet_home_sales")

# 12. Create a temporary table for the parquet data.
parquet_df.createOrReplaceTempView("parquet_home_sales")

# 13. Run the query that filters out the view ratings with an average price of greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.
start_time = time.time()
result_13 = spark.sql(query_6.replace("home_sales", "parquet_home_sales"))
result_13.show()
print("--- %s seconds ---" % (time.time() - start_time))

# 14. Uncache the home_sales temporary table.
spark.catalog.uncacheTable('home_sales')

# 15. Verify that the home_sales temporary table is uncached using PySpark.
print("Is home_sales cached?", spark.catalog.isCached('home_sales'))
