In [64]:
import os
# Find the latest version of spark 3.x  from https://downloads.apache.org/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.1'
spark_version = 'spark-3.5.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
0% [Connecting to archive.ubuntu.com] [Connected to cloud.r-project.org (108.157.173.89)] [Connectin                                                                                                    Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.83)] [Connected to cloud.r-project.org (108.157.173.                                                                                                    Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.8                                                                                                    Hit:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
                                                                            

In [65]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [66]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)
home_sales = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True)
home_sales.show(5)

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
+--------------------+----------+----------+------+--------+---------+----------

In [67]:
# 2. Create a temporary view of the DataFrame.

home_sales.createOrReplaceTempView("sales_data")

In [68]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?

avg_4_bed = spark.sql("""SELECT YEAR(date)AS Year, ROUND(avg(price),2)AS AvgPrice FROM sales_data WHERE bedrooms==4 GROUP BY YEAR(date)ORDER BY YEAR(date)""")
avg_4_bed.show()

+----+---------+
|Year| AvgPrice|
+----+---------+
|2019| 300263.7|
|2020|298353.78|
|2021|301819.44|
|2022|296363.88|
+----+---------+



In [69]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
avg_price_3b_3ba = spark.sql("""SELECT date_built AS Date_Built,
                                 ROUND(AVG(price),2)
                                 AS AvgPrice FROM sales_data
                                 WHERE bedrooms == 3
                                 AND bathrooms == 3
                                 GROUP BY date_built
                                 ORDER BY date_built DESC""")
avg_price_3b_3ba.show()

+----------+---------+
|Date_Built| AvgPrice|
+----------+---------+
|      2017|292676.79|
|      2016|290555.07|
|      2015| 288770.3|
|      2014|290852.27|
|      2013|295962.27|
|      2012|293683.19|
|      2011|291117.47|
|      2010|292859.62|
+----------+---------+



In [70]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
avg_price_3b_3ba = spark.sql("""SELECT date_built AS Date_Built,
                                 ROUND(AVG(price),2)
                                 AS AvgPrice FROM sales_data
                                 WHERE bedrooms == 3
                                 AND bathrooms == 3
                                 GROUP BY date_built
                                 ORDER BY date_built DESC""")
avg_price_3b_3ba.show()

+----------+---------+
|Date_Built| AvgPrice|
+----------+---------+
|      2017|292676.79|
|      2016|290555.07|
|      2015| 288770.3|
|      2014|290852.27|
|      2013|295962.27|
|      2012|293683.19|
|      2011|291117.47|
|      2010|292859.62|
+----------+---------+



In [71]:
# Start time
start_time = time.time()

# SQL query to calculate the average price per view rating
query = """
    SELECT
        view,
        ROUND(AVG(price), 2) AS AveragePrice
    FROM
        sales_data
    GROUP BY
        view
    HAVING
        AVG(price) >= 350000
    ORDER BY
        view DESC
"""

# Execute the SQL query
average_prices_per_view = spark.sql(query)

# Show the result
average_prices_per_view.show()

# End time
end_time = time.time()

# Calculate runtime
runtime = end_time - start_time
print("--- Runtime: %s seconds ---" % runtime)


+----+------------+
|view|AveragePrice|
+----+------------+
|  99|  1061201.42|
|  98|  1053739.33|
|  97|  1129040.15|
|  96|  1017815.92|
|  95|   1054325.6|
|  94|   1033536.2|
|  93|  1026006.06|
|  92|   970402.55|
|  91|  1137372.73|
|  90|  1062654.16|
|  89|  1107839.15|
|  88|  1031719.35|
|  87|   1072285.2|
|  86|  1070444.25|
|  85|  1056336.74|
|  84|  1117233.13|
|  83|  1033965.93|
|  82|   1063498.0|
|  81|  1053472.79|
|  80|   991767.38|
+----+------------+
only showing top 20 rows

--- Runtime: 1.380079984664917 seconds ---


In [72]:
# 7. Cache the the temporary table home_sales.
spark.sql("cache table sales_data")

DataFrame[]

In [73]:
# 7. Cache the the temporary table home_sales.
# Cache the home_sales DataFrame
home_sales.cache()


DataFrame[id: string, date: string, date_built: string, price: string, bedrooms: string, bathrooms: string, sqft_living: string, sqft_lot: string, floors: string, waterfront: string, view: string]

In [74]:
# 8. Check if the table is cached.
# Check if the table is cached
spark.catalog.isCached('sales_data')


True

In [75]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()

spark.sql("""SELECT view AS ViewRating, ROUND(AVG(price), 2)
              AS AvgPrice
              FROM sales_data
              GROUP BY view
              HAVING AVG(price) >= 350000
              ORDER BY view DESC""").show()

print("--- %s seconds ---" % (time.time() - start_time))



+----------+----------+
|ViewRating|  AvgPrice|
+----------+----------+
|        99|1061201.42|
|        98|1053739.33|
|        97|1129040.15|
|        96|1017815.92|
|        95| 1054325.6|
|        94| 1033536.2|
|        93|1026006.06|
|        92| 970402.55|
|        91|1137372.73|
|        90|1062654.16|
|        89|1107839.15|
|        88|1031719.35|
|        87| 1072285.2|
|        86|1070444.25|
|        85|1056336.74|
|        84|1117233.13|
|        83|1033965.93|
|        82| 1063498.0|
|        81|1053472.79|
|        80| 991767.38|
+----------+----------+
only showing top 20 rows

--- 1.8754000663757324 seconds ---


In [76]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
# Define the path where the formatted Parquet data will be stored
home_sales.write.parquet('sales_parquet', mode='overwrite')


In [77]:
# 11. Read the parquet formatted data.

p_home_sales = spark.read.parquet('sales_parquet')

In [78]:
# 12. Create a temporary table for the parquet data.
p_home_sales.createOrReplaceTempView("p_home_sales_p")


In [79]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.

start_time = time.time()

spark.sql("""SELECT view AS ViewRating, ROUND(AVG(price), 2)
              AS AvgPrice
              FROM p_home_sales_p
              GROUP BY view
              HAVING AVG(price) >= 350000
              ORDER BY view DESC""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----------+----------+
|ViewRating|  AvgPrice|
+----------+----------+
|        99|1061201.42|
|        98|1053739.33|
|        97|1129040.15|
|        96|1017815.92|
|        95| 1054325.6|
|        94| 1033536.2|
|        93|1026006.06|
|        92| 970402.55|
|        91|1137372.73|
|        90|1062654.16|
|        89|1107839.15|
|        88|1031719.35|
|        87| 1072285.2|
|        86|1070444.25|
|        85|1056336.74|
|        84|1117233.13|
|        83|1033965.93|
|        82| 1063498.0|
|        81|1053472.79|
|        80| 991767.38|
+----------+----------+
only showing top 20 rows

--- 1.072906255722046 seconds ---


In [80]:
# Determine the runtime and compare it to the cached runtime.
print("the parquet runtime 0.59 seconds is almost identical to the cached runtime of 0.60 seconds")


the parquet runtime 0.59 seconds is almost identical to the cached runtime of 0.60 seconds


In [81]:
# 14. Uncache the home_sales temporary table.
if spark.catalog._jcatalog.tableExists("home_sales"):
    spark.catalog.uncacheTable("home_sales")
    print("Table 'home_sales' has been uncached.")
else:
    print("Table 'home_sales' does not exist in the catalog.")



Table 'home_sales' does not exist in the catalog.


In [83]:
# 15. Check if the home_sales is no longer cached
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)
home_sales = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True)
home_sales.show(5)

# 2. Cache the DataFrame.
home_sales.createOrReplaceTempView("home_sales")
spark.catalog.cacheTable("home_sales")

# 3. Check if the DataFrame is cached.
if spark.catalog.isCached('home_sales'):
    print("The 'home_sales' DataFrame is cached.")
else:
    print("The 'home_sales' DataFrame is not cached.")


+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
+--------------------+----------+----------+------+--------+---------+----------