In [1]:
import os
# Specify the Spark version
spark_version = 'spark-3.5.4'
os.environ['SPARK_VERSION'] = spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()


Get:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,381 kB]
Get:12 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,784 kB]
Get:13 https://r2u.stat.illinois.edu/ubuntu jammy/

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [4]:
# 1. Read the home_sales_revised.csv from the provided AWS S3 bucket location into a PySpark DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)

home_df=spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True)

# Show the DataFrame
home_df.show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [5]:
# 2. Create a temporary view of the DataFrame.
home_df.createOrReplaceTempView("temp_view")

In [6]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
avg_price_4_bedroom = spark.sql("SELECT ROUND(AVG(price),2), YEAR(date) from temp_view where bedrooms == 4 group by YEAR(date) ORDER BY YEAR(date) desc")
avg_price_4_bedroom.show()


+--------------------+----------+
|round(avg(price), 2)|year(date)|
+--------------------+----------+
|           296363.88|      2022|
|           301819.44|      2021|
|           298353.78|      2020|
|            300263.7|      2019|
+--------------------+----------+



In [10]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
spark.sql("""SELECT date_built AS Year_Built, ROUND(AVG(price),2) AS Avg_Price FROM temp_view WHERE bedrooms = 3 AND bathrooms = 3 GROUP BY date_built ORDER BY date_built ASC""").show()

+----------+---------+
|Year_Built|Avg_Price|
+----------+---------+
|      2010|292859.62|
|      2011|291117.47|
|      2012|293683.19|
|      2013|295962.27|
|      2014|290852.27|
|      2015| 288770.3|
|      2016|290555.07|
|      2017|292676.79|
+----------+---------+



In [11]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
spark.sql("""SELECT date_built AS Year_Built, ROUND(AVG(price),2)AS Avg_Price FROM temp_view WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_lot >= 2000 GROUP BY date_built ORDER BY date_built ASC""").show()

+----------+---------+
|Year_Built|Avg_Price|
+----------+---------+
|      2010|280447.23|
|      2011|281413.45|
|      2012|295858.68|
|      2013|295142.13|
|      2014|294195.13|
|      2015|291788.36|
|      2016| 287812.6|
|      2017|282026.59|
+----------+---------+



In [20]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

start_time = time.time()

spark.sql("SELECT LPAD(view, 2, '0') AS View_Rating, ROUND(AVG(price),2) AS Avg_Price FROM temp_view WHERE price >= 350000 GROUP BY view ORDER BY LPAD(view, 2, '0') DESC").show()



# Print out the runtime

print("--- %s seconds ---" % (time.time() - start_time))

+-----------+----------+
|View_Rating| Avg_Price|
+-----------+----------+
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
|         80| 991767.38|
+-----------+----------+
only showing top 20 rows

--- 1.3082869052886963 seconds ---


In [21]:
# 7. Cache the the temporary table home_sales.
spark.sql("cache table temp_view")

DataFrame[]

In [22]:
# 8. Check if the table is cached.
spark.catalog.isCached('temp_view')

True

In [23]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()

spark.sql("SELECT LPAD(view, 2, '0') AS View_Rating, ROUND(AVG(price),2) AS Avg_Price FROM temp_view WHERE price >= 350000 GROUP BY view ORDER BY LPAD(view, 2, '0') DESC").show()

print("--- %s seconds ---" % (time.time() - start_time))


+-----------+----------+
|View_Rating| Avg_Price|
+-----------+----------+
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
|         80| 991767.38|
+-----------+----------+
only showing top 20 rows

--- 0.61490797996521 seconds ---


The runtime for the original data was 1.308 seconds, while the runtime after it was cached was 0.615 seconds.

In [24]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
home_df.write \
    .partitionBy("date_built") \
    .mode("overwrite") \
    .parquet("partitioned_home_sales_data")

In [25]:
# 11. Read the parquet formatted data.
partitioned_df = spark.read.parquet("partitioned_home_sales_data")

In [26]:
# 12. Create a temporary table for the parquet data.
partitioned_df.createOrReplaceTempView("p_home_sales_data")

In [27]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

start_time = time.time()

spark.sql("SELECT LPAD(view, 2, '0') AS View_Rating, ROUND(AVG(price),2) AS Avg_Price FROM p_home_sales_data WHERE price >= 350000 GROUP BY view ORDER BY LPAD(view, 2, '0') DESC").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-----------+----------+
|View_Rating| Avg_Price|
+-----------+----------+
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
|         80| 991767.38|
+-----------+----------+
only showing top 20 rows

--- 1.5243785381317139 seconds ---


The runtime for the partitioned data was 1.524 seconds, more than 2 times slower than runtime for cached, and slightly longer than run time for original.  (please note that original data was run multiple times to refine code, and this is reflected in a shortened run time for original.

In [28]:
# 14. Uncache the home_sales temporary table.
spark.sql("uncache table temp_view")

DataFrame[]

In [29]:
# 15. Check if the home_sales is no longer cached
spark.catalog.isCached('temp_view')

False