In [33]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.5.5'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
0% [Connecting to archive.ubuntu.com (185.125.190.83)] [Waiting for headers] [1 InRelease 3,632 B/3,0% [Connecting to archive.ubuntu.com (185.125.190.83)] [Waiting for headers] [Waiting for headers] [                                                                                                    Get:2 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,686 kB]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubunt

In [34]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [35]:
# 1. Read the home_sales_revised.csv from the provided AWS S3 bucket location into a PySpark DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"


In [36]:
# 2. Create a temporary view of the DataFrame.
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), header=True, inferSchema=True)
df.createOrReplaceTempView("home_sales")

In [37]:
# display the df
df.show(truncate=False)

+------------------------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|id                                  |date      |date_built|price |bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+------------------------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d6-9c31-7398aa8f6089|2022-04-08|2016      |936923|4       |3        |3167       |11733   |2     |1         |76  |
|7530a2d8-1ae3-4517-9f4a-befe060c4353|2021-06-13|2013      |379628|2       |2        |2235       |14384   |1     |0         |23  |
|43de979c-0bf0-4c9f-85ef-96dc27b258d5|2019-04-12|2014      |417866|2       |2        |2127       |10575   |2     |0         |0   |
|b672c137-b88c-48bf-9f18-d0a4ac62fb8b|2019-10-16|2016      |239895|2       |2        |1631       |11149   |2     |0         |0   |
|e0726d4d-d595-4074-8283-4139a54d0d63|2022-01-08|2017      |424418|3       |2      

In [38]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
query = """
SELECT YEAR(date) AS year, ROUND(AVG(price), 2) as avg_price
FROM home_sales
WHERE bedrooms = 4
GROUP BY year
ORDER BY year
"""
spark.sql(query).show()

+----+---------+
|year|avg_price|
+----+---------+
|2019| 300263.7|
|2020|298353.78|
|2021|301819.44|
|2022|296363.88|
+----+---------+



In [39]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
query = """
SELECT date_built AS year, ROUND (AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3
GROUP BY date_built
ORDER BY date_built
"""
spark.sql(query).show()

+----+---------+
|year|avg_price|
+----+---------+
|2010|292859.62|
|2011|291117.47|
|2012|293683.19|
|2013|295962.27|
|2014|290852.27|
|2015| 288770.3|
|2016|290555.07|
|2017|292676.79|
+----+---------+



In [40]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
query = """
SELECT date_built AS year, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
GROUP BY year
ORDER BY year
"""
spark.sql(query).show()

+----+---------+
|year|avg_price|
+----+---------+
|2010|285010.22|
|2011|276553.81|
|2012|307539.97|
|2013|303676.79|
|2014|298264.72|
|2015|297609.97|
|2016| 293965.1|
|2017|280317.58|
+----+---------+



In [44]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

start_time = time.time()
query = """
SELECT view AS view_rating, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
GROUP BY view_rating
HAVING avg_price >= 350000
ORDER BY view_rating DESC
"""
spark.sql(query).show()



print("--- %s seconds ---" % (time.time() - start_time))

+-----------+----------+
|view_rating| avg_price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 0.46152496337890625 seconds ---


In [45]:
# 7. Cache the the temporary table home_sales.
spark.sql("CACHE TABLE home_sales")

DataFrame[]

In [46]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [47]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()
query = """
SELECT view AS view_rating, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
GROUP BY view_rating
HAVING avg_price >= 350000
ORDER BY view_rating DESC
"""
spark.sql(query).show()

print("--- %s seconds ---" % (time.time() - start_time))


+-----------+----------+
|view_rating| avg_price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 0.2895655632019043 seconds ---


*Analysis*: The query execution time dropped from 0.46 seconds to just 0.28 seconds after caching, which is a massive improvement in speed. This confirms that Spark is now reading from memory (RAM) instead of repeatedly scanning the dataset from disk.

In [48]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
df.write.mode('overwrite').partitionBy('date_built').parquet('p_home_sales')

The data has been saved in Parquet format. This has been confirmed manually by navigating to the file directory and executing !ls p_home_sales from the notebook. The files inside each folder contain the Parquet data corresponding to the rows with that particular date_built value. In this case, the folders are named based on the following date_built values: 2010, 2011, 2012, 2013, 2014, 2015, 2016, and 2017. If we want to query the data later, we can efficiently filter by the date_built column, and Spark will only load the relevant partitions.

In [49]:
# 11. Read the parquet formatted data.
df_parquet = spark.read.parquet("p_home_sales")

In [52]:
# 12. Create a temporary table for the parquet data.
df_parquet.createOrReplaceTempView("p_home_sales")

In [56]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

start_time = time.time()
query = """
SELECT view AS view_rating, ROUND(AVG(price), 2) AS avg_price
FROM p_home_sales
GROUP BY view_rating
HAVING avg_price >= 350000
ORDER BY view_rating DESC
"""
spark.sql(query).show()

print("--- %s seconds ---" % (time.time() - start_time))

+-----------+----------+
|view_rating| avg_price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 0.8122079372406006 seconds ---


In [58]:
# 14. Uncache the home_sales temporary table.
spark.sql("UNCACHE TABLE home_sales")

DataFrame[]

In [59]:
# 15. Check if the home_sales is no longer cached
spark.catalog.isCached("home_sales")

False

**Analysis on query runtime:**


* The first run without caching or partitioning took approximately 0.46 seconds.
* After caching the data using cache table home_sales, the run time decreased to 0.28 seconds
* After partitioning the data by date_built and creating a temporary view (p_home_sales), the run time increased to 0.89 seconds.

Reason: While partitioning is a powerful optimization for large datasets, it may not always lead to an immediate reduction in query time if the dataset is small or the query does not filter by partitioned columns.




