<a href="https://colab.research.google.com/github/aglantzrbc/spark-challenge/blob/main/Home_Sales_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import dependencies

import os
import warnings

# Suppress all warnings
warnings.simplefilter("ignore")

# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.3.1'
spark_version = 'spark-3.4.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,622 B]
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Get:8 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [993 kB]
Get:9 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 Packages [1,257 kB]
Get:10 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [966 kB]
Hit:11 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:12 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 Packages [1,235 kB]
Hit:13 https://ppa.launchpadcontent.net/g

In [None]:
# Import packages
import time
from pyspark.sql import SparkSession

# Get the existing SparkSession, or create one, for the "SparkSQL" application
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [None]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

# Hand the remote CSV to Spark, then load it by file name with the first row
# treated as the header and the column types inferred from the data
spark.sparkContext.addFile(url)
home_sales_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(SparkFiles.get("home_sales_revised.csv"))
)

In [None]:
# 2. Create a temporary view of the DataFrame.
# Print the DataFrame's column names and types, then register it under the
# name "home_sales" so the following cells can query it with Spark SQL.
print(home_sales_df)
home_sales_df.createOrReplaceTempView("home_sales")


DataFrame[id: string, date: date, date_built: int, price: int, bedrooms: int, bathrooms: int, sqft_living: int, sqft_lot: int, floors: int, waterfront: int, view: int]


In [None]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?
print("Average price for a four bedroom house sold in each year:", end="\n\n")

# Keep only four-bedroom sales, then average the price per year of sale
four_bedroom_sql = """
    SELECT YEAR(date) as year,
           ROUND(AVG(price), 2) AS average_price
    FROM home_sales
    WHERE bedrooms = 4
    GROUP BY YEAR(date)
    ORDER BY YEAR(date)
"""

four_bedroom_avg_df = spark.sql(four_bedroom_sql)
four_bedroom_avg_df.show()



Average price for a four bedroom house sold in each year:

+----+-------------+
|year|average_price|
+----+-------------+
|2019|     300263.7|
|2020|    298353.78|
|2021|    301819.44|
|2022|    296363.88|
+----+-------------+



In [None]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?
print("Average price of a home for each year the home was built that have three bedrooms and three bathrooms:", end="\n\n")

# Keep only 3-bedroom / 3-bathroom homes, then average the price per date_built
three_by_three_sql = """
    SELECT date_built,
           ROUND(AVG(price), 2) AS average_price
    FROM home_sales
    WHERE bedrooms = 3
      AND bathrooms = 3
    GROUP BY date_built
    ORDER BY date_built
"""

three_by_three_avg_df = spark.sql(three_by_three_sql)
three_by_three_avg_df.show()



Average price of a home for each year the home was built that have three bedrooms and three bathrooms:

+----------+-------------+
|date_built|average_price|
+----------+-------------+
|      2010|    292859.62|
|      2011|    291117.47|
|      2012|    293683.19|
|      2013|    295962.27|
|      2014|    290852.27|
|      2015|     288770.3|
|      2016|    290555.07|
|      2017|    292676.79|
+----------+-------------+



In [None]:
# 5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?
print("Average price of a home for each year built that have three bedrooms, three bathrooms, with two floors, and are greater than or equal to 2,000 square feet:", end="\n\n")

# Same 3-bedroom / 3-bathroom filter as before, narrowed further to two-floor
# homes with at least 2,000 sqft of living space; averaged per date_built
large_two_floor_sql = """
    SELECT date_built,
           ROUND(AVG(price), 2) AS average_price
    FROM home_sales
    WHERE bedrooms = 3
      AND bathrooms = 3
      AND floors = 2
      AND sqft_living >= 2000
    GROUP BY date_built
    ORDER BY date_built
"""

large_two_floor_avg_df = spark.sql(large_two_floor_sql)
large_two_floor_avg_df.show()



Average price of a home for each year built that have three bedrooms, three bathrooms, with two floors, and are greater than or equal to 2,000 square feet:

+----------+-------------+
|date_built|average_price|
+----------+-------------+
|      2010|    285010.22|
|      2011|    276553.81|
|      2012|    307539.97|
|      2013|    303676.79|
|      2014|    298264.72|
|      2015|    297609.97|
|      2016|     293965.1|
|      2017|    280317.58|
+----------+-------------+



In [None]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

print("The 'view' rating for the average price of a home where the homes are greater than or equal to $350,000:", end="\n\n")

# Start the clock before the query is defined and run
uncached_timer_start = time.time()

# Among homes priced at $350,000 or more, average the price per view rating
# and keep only the rating with the highest average
top_view_sql = """
    SELECT view,
           ROUND(AVG(price), 2) AS average_price
    FROM home_sales
    WHERE price >= 350000
    GROUP BY view
    ORDER BY average_price desc
    LIMIT 1
"""

top_view_df = spark.sql(top_view_sql)
top_view_df.show()

# Elapsed wall-clock seconds, including the time taken to display the result
uncached_elapsed = time.time() - uncached_timer_start
print("Runtime: --- %s seconds ---" % uncached_elapsed)

The 'view' rating for the average price of a home where the homes are greater than or equal to $350,000:

+----+-------------+
|view|average_price|
+----+-------------+
|  91|   1137372.73|
+----+-------------+

Runtime: --- 0.8285889625549316 seconds ---


In [None]:
# 7. Cache the temporary table home_sales.
# NOTE(review): caching through the catalog API is presumably lazy, so the data
# may only be materialized by the first query that runs afterwards — confirm
# against the PySpark documentation before relying on the timing comparison.
spark.catalog.cacheTable("home_sales")

In [None]:
# 8. Check if the table is cached.
is_cached = spark.catalog.isCached('home_sales')

# Choose the message that matches the cache state, then print it with the flag
cache_status_label = (
    "The table 'home_sales' is cached:"
    if is_cached
    else "The table 'home_sales' is not cached:"
)
print(cache_status_label, is_cached)

The table 'home_sales' is cached: True


In [None]:
# 9. Using the cached data, run the query that filters out the view ratings with average price
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.

print("The 'view' ratings with average price greater than or equal to $350,000 using cached data:", end="\n\n")

cached_timer_start = time.time()
# Same query as step 6, now run against the cached home_sales view
cached_view_sql = """
    SELECT view,
           ROUND(AVG(price), 2) AS average_price
    FROM home_sales
    WHERE price >= 350000
    GROUP BY view
    ORDER BY average_price desc
    LIMIT 1
"""
cached_view_df = spark.sql(cached_view_sql)
cached_view_df.show()

# Despite its name, end_time_cached holds the elapsed seconds (not a timestamp);
# the step 13 cell reads it to compare against the parquet run.
end_time_cached = time.time() - cached_timer_start
print("Runtime using cached data: --- %s seconds ---" % end_time_cached)



The 'view' ratings with average price greater than or equal to $350,000 using cached data:

+----+-------------+
|view|average_price|
+----+-------------+
|  91|   1137372.73|
+----+-------------+

Runtime using cached data: --- 0.3559143543243408 seconds ---


In [None]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
# Write the home sales data as parquet under 'p_home_sales', partitioned by
# date_built and overwriting any output left by a previous run
(
    home_sales_df.write
    .mode('overwrite')
    .partitionBy('date_built')
    .parquet('p_home_sales')
)

In [None]:
# 11. Read the parquet formatted data.
# Load the partitioned parquet output written in step 10 back into a DataFrame
p_homes_df = spark.read.format('parquet').load('p_home_sales')

In [None]:
# 12. Create a temporary view for the parquet data.
# Registers the parquet-backed DataFrame as 'parquet_home_sales' (replacing any
# existing view of that name) so it can be queried with Spark SQL.
p_homes_df.createOrReplaceTempView('parquet_home_sales')


In [None]:
# 13. Run the query that filters out the view ratings with average price of greater than or equal to $350,000
# with the parquet DataFrame. Round your average to two decimal places.
# Determine the runtime and compare it to the cached version.

print("The 'view' ratings with average price greater than or equal to $350,000 using the parquet data:", end="\n\n")

parquet_timer_start = time.time()
# Same query as steps 6 and 9, but reading from the parquet-backed view
parquet_view_sql = """
    SELECT view,
           ROUND(AVG(price), 2) AS average_price
    FROM parquet_home_sales
    WHERE price >= 350000
    GROUP BY view
    ORDER BY average_price desc
    LIMIT 1
"""
parquet_view_df = spark.sql(parquet_view_sql)
parquet_view_df.show()

# Elapsed seconds for the parquet run; end_time_cached is the elapsed time
# recorded by the step 9 cell, which must have been run first
end_time_parquet = time.time() - parquet_timer_start
print("Runtime using parquet data: --- %s seconds ---" % end_time_parquet)
print("Runtime using cached data: --- %s seconds ---" % end_time_cached)

# Positive difference: parquet took longer; negative: parquet was quicker
difference = end_time_parquet - end_time_cached
if difference == 0:
    print("Both queries had the same runtime.")
elif difference > 0:
    print("Parquet query was slower by: --- %s seconds ---" % difference)
else:
    print("Parquet query was faster by: --- %s seconds ---" % abs(difference))

# After the initial cache, cached data is faster to query because retrieval
# happens from memory rather than being recomputed or fetched from disk.

The 'view' ratings with average price greater than or equal to $350,000 using the parquet data:

+----+-------------+
|view|average_price|
+----+-------------+
|  91|   1137372.73|
+----+-------------+

Runtime using parquet data: --- 0.502495527267456 seconds ---
Runtime using cached data: --- 0.3559143543243408 seconds ---
Parquet query was slower by: --- 0.14658117294311523 seconds ---


In [None]:
# 14. Uncache the home_sales temporary table.
# Uses the catalog API rather than a raw SQL string so that uncaching matches
# the spark.catalog.cacheTable / spark.catalog.isCached calls this notebook
# uses for the same view. The call returns nothing, so no output is shown.
spark.catalog.uncacheTable('home_sales')


In [None]:
# 15. Check if the home_sales is no longer cached
# Ask the catalog for the current cache state and report it in one line
print(
    'home_sales is cached'
    if spark.catalog.isCached('home_sales')
    else 'home_sales is not cached'
)


home_sales is not cached
