In [None]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# Set the Spark version you want to install
spark_version = 'spark-3.5.2'
os.environ['SPARK_VERSION'] = spark_version

# Install Java and Spark
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Initialize findspark
import findspark
findspark.init()


0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to r2u.stat.illinois.edu (192.17.190.167)]                                                                                                    Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Waiting for headers] [Connected to r2u.stat.illinois.edu (192.17.190.167)] [Waiting for headers]                                                                                                    Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Connected to r2u.stat.illinois.edu (192.17.190.167)] [Waiting for headers] [Connecting to ppa.la                                                                                                    Hit:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadcont    

In [None]:
# Import packages: `time` is used to benchmark the queries below.
import time
from pyspark.sql import SparkSession

# Build (or reuse the already-active) SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [None]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"



In [None]:
# 2. Create a temporary view of the DataFrame.

spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), header=True, inferSchema=True, quote="\"", escape="\"")

df.createOrReplaceTempView('home_sales')



In [None]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
# year(date) extracts the sale year; rows are listed from the earliest year onward.
spark.sql("""
    SELECT year(date)            AS Year,
           ROUND(AVG(price), 2)  AS Average_Price
      FROM home_sales
     WHERE bedrooms = 4
     GROUP BY year(date)
     ORDER BY Year
""").show(truncate=False)



+----+-------------+
|Year|Average_Price|
+----+-------------+
|2019|300263.7     |
|2020|298353.78    |
|2021|301819.44    |
|2022|296363.88    |
+----+-------------+



In [None]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
spark.sql("""
    SELECT date_built            AS Year_Built,
           ROUND(AVG(price), 2)  AS Average_Price
      FROM home_sales
     WHERE bedrooms = 3
       AND bathrooms = 3
     GROUP BY date_built
     ORDER BY Year_Built
""").show(truncate=False)


+----------+-------------+
|Year_Built|Average_Price|
+----------+-------------+
|2010      |292859.62    |
|2011      |291117.47    |
|2012      |293683.19    |
|2013      |295962.27    |
|2014      |290852.27    |
|2015      |288770.3     |
|2016      |290555.07    |
|2017      |292676.79    |
+----------+-------------+



In [None]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
spark.sql("""
    SELECT date_built            AS Year_Built,
           ROUND(AVG(price), 2)  AS Average_Price
      FROM home_sales
     WHERE bedrooms    = 3
       AND bathrooms   = 3
       AND floors      = 2
       AND sqft_living >= 2000
     GROUP BY date_built
     ORDER BY Year_Built
""").show(truncate=False)


+----------+-------------+
|Year_Built|Average_Price|
+----------+-------------+
|2010      |285010.22    |
|2011      |276553.81    |
|2012      |307539.97    |
|2013      |303676.79    |
|2014      |298264.72    |
|2015      |297609.97    |
|2016      |293965.1     |
|2017      |280317.58    |
+----------+-------------+



In [None]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

# perf_counter is a monotonic, high-resolution clock meant for elapsed-time
# measurement; time.time() follows the system clock and can jump.
start_time = time.perf_counter()
spark.sql("""
SELECT
    view AS View_Rating,
    ROUND(AVG(price), 2) AS Average_Price
FROM
    home_sales
GROUP BY
    View_Rating
HAVING
    Average_Price >= 350000
ORDER BY
    View_Rating DESC
""").show(truncate=False)  # .show() is the action that actually runs the query, so it must be inside the timing

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+-------------+
|View_Rating|Average_Price|
+-----------+-------------+
|100        |1026669.5    |
|99         |1061201.42   |
|98         |1053739.33   |
|97         |1129040.15   |
|96         |1017815.92   |
|95         |1054325.6    |
|94         |1033536.2    |
|93         |1026006.06   |
|92         |970402.55    |
|91         |1137372.73   |
|90         |1062654.16   |
|89         |1107839.15   |
|88         |1031719.35   |
|87         |1072285.2    |
|86         |1070444.25   |
|85         |1056336.74   |
|84         |1117233.13   |
|83         |1033965.93   |
|82         |1063498.0    |
|81         |1053472.79   |
+-----------+-------------+
only showing top 20 rows

--- 1.0432865619659424 seconds ---


In [None]:
# 7. Cache the temporary table home_sales.
spark.catalog.cacheTable("home_sales")
# cacheTable is lazy: nothing is stored until an action touches the table.
# Run a cheap action now so the cached-query benchmark later measures reads
# from the cache rather than the one-time cost of populating it.
spark.table("home_sales").count();


In [None]:
# 8. Check if the table is cached (the cell displays True/False).
spark.catalog.isCached(tableName="home_sales")

True

In [None]:
# 9. Using the cached data, run the last query above, which calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

# NOTE: cacheTable is lazy. If no action materialized the cache before this
# cell, this timing also includes populating the cache, which can make the
# "cached" run slower than the uncached one.
start_time = time.perf_counter()  # monotonic clock intended for elapsed-time measurement
spark.sql("""
SELECT
    view AS View_Rating,
    ROUND(AVG(price), 2) AS Average_Price
FROM
    home_sales
GROUP BY
    View_Rating
HAVING
    Average_Price >= 350000
ORDER BY
    View_Rating DESC
""").show(truncate=False)

print("--- %s seconds ---" % (time.perf_counter() - start_time))


+-----------+-------------+
|View_Rating|Average_Price|
+-----------+-------------+
|100        |1026669.5    |
|99         |1061201.42   |
|98         |1053739.33   |
|97         |1129040.15   |
|96         |1017815.92   |
|95         |1054325.6    |
|94         |1033536.2    |
|93         |1026006.06   |
|92         |970402.55    |
|91         |1137372.73   |
|90         |1062654.16   |
|89         |1107839.15   |
|88         |1031719.35   |
|87         |1072285.2    |
|86         |1070444.25   |
|85         |1056336.74   |
|84         |1117233.13   |
|83         |1033965.93   |
|82         |1063498.0    |
|81         |1053472.79   |
+-----------+-------------+
only showing top 20 rows

--- 2.8917877674102783 seconds ---


In [None]:
# 10. Write the home sales data in Parquet format, partitioned by the "date_built" field
# (one sub-directory per value), replacing any previous output at 'home_sales'.
df.write.mode("overwrite").partitionBy("date_built").parquet("home_sales")


In [None]:
# 11. Read the Parquet-formatted data back into a DataFrame.
p_df = spark.read.format("parquet").load("home_sales")


In [None]:
# 12. Create a temporary view for the Parquet data so it can be queried with SQL.
p_df.createOrReplaceTempView(name="parquet_home_sales")


In [None]:
# 13. Using the parquet DataFrame, run the last query above, which calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

# perf_counter is a monotonic, high-resolution clock meant for elapsed-time
# measurement; time.time() follows the system clock and can jump.
start_time = time.perf_counter()

spark.sql("""
SELECT
    view AS View_Rating,
    ROUND(AVG(price), 2) AS Average_Price
FROM
    parquet_home_sales
GROUP BY
    View_Rating
HAVING
    Average_Price >= 350000
ORDER BY
    View_Rating DESC
""").show(truncate=False)  # .show() triggers execution, so it belongs inside the timed region

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+-------------+
|View_Rating|Average_Price|
+-----------+-------------+
|100        |1026669.5    |
|99         |1061201.42   |
|98         |1053739.33   |
|97         |1129040.15   |
|96         |1017815.92   |
|95         |1054325.6    |
|94         |1033536.2    |
|93         |1026006.06   |
|92         |970402.55    |
|91         |1137372.73   |
|90         |1062654.16   |
|89         |1107839.15   |
|88         |1031719.35   |
|87         |1072285.2    |
|86         |1070444.25   |
|85         |1056336.74   |
|84         |1117233.13   |
|83         |1033965.93   |
|82         |1063498.0    |
|81         |1053472.79   |
+-----------+-------------+
only showing top 20 rows

--- 0.9183156490325928 seconds ---


In [None]:
# 14. Uncache the home_sales temporary view, releasing its cached data.
spark.catalog.uncacheTable(tableName="home_sales")


In [None]:
# 15. Confirm that home_sales is no longer cached.
is_cached = spark.catalog.isCached(tableName="home_sales")
print("Is 'home_sales' table cached? {}".format(is_cached))


Is 'home_sales' table cached? False
