In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.2'
spark_version = 'spark-3.5.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/129 kB 11%] [Connecting to cloud.r-projec                                                                                                    Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.91.82)] [1 InRelease 129 kB/129 kB 100%] [Connected to                                                                                                     Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
                                                                                                    Get:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
                                                                                                    Hit:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRel

In [2]:
# Imports: standard library first, then PySpark.
import time

from pyspark.sql import SparkSession

# Build the session named "SparkSQL", or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [3]:
# 1. Load the home sales CSV from the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

# Hand the remote file to Spark, then read the copy that SparkFiles resolves,
# treating the first row as column names and letting Spark infer the types.
spark.sparkContext.addFile(url)
home_sales_data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(SparkFiles.get("home_sales_revised.csv"))
)

# Preview the first rows of the DataFrame.
home_sales_data.show()


+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [4]:
# 2. Create a temporary view of the DataFrame.
# The view name 'home_sales' is what the spark.sql(...) queries in the
# following cells refer to in their FROM clauses.

home_sales_data.createOrReplaceTempView('home_sales')

In [5]:
# Schema information: print each column's name, type and nullability so the
# types produced by inferSchema can be checked before querying.
home_sales_data.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- date_built: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)



In [6]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
# Filters home_sales to bedrooms = 4, groups by the year part of the sale
# date, and lists the most recent year first.

avg_price_4_bedroom = spark.sql("""SELECT
                                YEAR(date) as `Year Sold`,
                                ROUND(AVG(price),2) as `AVG Price`
                                from
                                  home_sales
                                where
                                  bedrooms = 4
                                group by
                                  YEAR(date)
                                ORDER BY
                                  YEAR(date) desc
                                  """)

# Display the per-year averages.
avg_price_4_bedroom.show()

+---------+---------+
|Year Sold|AVG Price|
+---------+---------+
|     2022|296363.88|
|     2021|301819.44|
|     2020|298353.78|
|     2019| 300263.7|
+---------+---------+



In [7]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
# Groups the matching rows by date_built (the build year) and lists the
# newest build year first.
avg_price_3_bed_3_bath = spark.sql("""SELECT
                                date_built as `Year Built`,
                                ROUND(AVG(price),2) as `AVG Price`
                                from
                                  home_sales
                                where
                                  bedrooms = 3 and bathrooms = 3
                                group by
                                  date_built
                                ORDER BY
                                  date_built desc
                                  """)

# Display the per-build-year averages.
avg_price_3_bed_3_bath.show()

+----------+---------+
|Year Built|AVG Price|
+----------+---------+
|      2017|292676.79|
|      2016|290555.07|
|      2015| 288770.3|
|      2014|290852.27|
|      2013|295962.27|
|      2012|293683.19|
|      2011|291117.47|
|      2010|292859.62|
+----------+---------+



In [8]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
# The square-footage condition is applied to the sqft_living column.
avg_price_3_bed_3_bath_2_floors = spark.sql("""SELECT
                                              date_built as `Year Built`,
                                              ROUND(AVG(price),2) as `AVG Price`
                                              from
                                                home_sales
                                              where
                                                bedrooms = 3 and bathrooms = 3 and floors = 2 and sqft_living >= 2000
                                              group by
                                                date_built
                                              ORDER BY
                                                date_built desc
                                                """)

# Display the per-build-year averages, newest build year first.
avg_price_3_bed_3_bath_2_floors.show()


+----------+---------+
|Year Built|AVG Price|
+----------+---------+
|      2017|280317.58|
|      2016| 293965.1|
|      2015|297609.97|
|      2014|298264.72|
|      2013|303676.79|
|      2012|307539.97|
|      2011|276553.81|
|      2010|285010.22|
+----------+---------+



In [9]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

# time.perf_counter() is used instead of time.time(): it is the clock meant
# for measuring elapsed durations and is not affected by system clock changes.
start_time = time.perf_counter()
avg_price_view = spark.sql("""SELECT
                                view as `View Rating`,
                                ROUND(AVG(price),2) as `AVG Price`
                                from
                                  home_sales
                                group by
                                  view
                                having
                                AVG(price) >= 350000
                                order by
                                  view desc
                                    """)

# The timed span covers both defining the query and displaying its rows.
avg_price_view.show()


print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+----------+
|View Rating| AVG Price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 2.001127004623413 seconds ---


In [10]:
# 7. Cache the temporary table home_sales.
# Issued as a Spark SQL statement against the view registered in step 2.
spark.sql("cache table home_sales")

DataFrame[]

In [11]:
# 8. Check if the table is cached.
# Evaluates to True when home_sales is currently cached.
spark.catalog.isCached('home_sales')

True

In [12]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

# time.perf_counter() is used instead of time.time(): it is the clock meant
# for measuring elapsed durations and is not affected by system clock changes.
start_time = time.perf_counter()
avg_price_view = spark.sql("""SELECT
                                view as `View Rating`,
                                ROUND(AVG(price),2) as `AVG Price`
                                from
                                  home_sales
                                group by
                                  view
                                having
                                AVG(price) >= 350000
                                order by
                                  view desc
                                    """)

# The timed span covers both defining the query and displaying its rows.
avg_price_view.show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+----------+
|View Rating| AVG Price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 0.807790994644165 seconds ---


In [13]:
# 10. Write the home sales data as parquet, partitioned by the "date_built" field.
# "overwrite" mode replaces any output left by a previous run, so the cell can be re-run.
(
    home_sales_data.write
    .partitionBy("date_built")
    .mode("overwrite")
    .parquet("home_sales_partitioned")
)

In [14]:
# 11. Read the parquet formatted data.
# Loads the 'home_sales_partitioned' output written in step 10 (partitioned by date_built).
p_home_sales_data_p=spark.read.parquet('home_sales_partitioned')

In [15]:
# 12. Create a temporary table for the parquet data 'p_home_sales_p'.
# This is the name the timed query in step 13 selects from.
p_home_sales_data_p.createOrReplaceTempView('p_home_sales_p')

In [16]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

# time.perf_counter() is used instead of time.time(): it is the clock meant
# for measuring elapsed durations and is not affected by system clock changes.
start_time = time.perf_counter()
avg_price_view = spark.sql("""SELECT
                                view as `View Rating`,
                                ROUND(AVG(price),2) as `AVG Price`
                                from
                                  p_home_sales_p
                                group by
                                  view
                                having
                                AVG(price) >= 350000
                                order by
                                  view desc
                                    """)

# The timed span covers both defining the query and displaying its rows.
avg_price_view.show()


print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+----------+
|View Rating| AVG Price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 1.0689365863800049 seconds ---


In [17]:
# 14. Write the home sales data as parquet to "home_sales_partitioned2", partitioned by the "view" field.
# "overwrite" mode replaces any output left by a previous run, so the cell can be re-run.
(
    home_sales_data.write
    .partitionBy("view")
    .mode("overwrite")
    .parquet("home_sales_partitioned2")
)

In [18]:
# 15. Read the parquet formatted data ('home_sales_partitioned2').
# Loads the output written in step 14 (partitioned by view).
p_home_sales_data_p2=spark.read.parquet('home_sales_partitioned2')


In [19]:
 # 16. Create a temporary table for the parquet data 'p_home_sales_p2'.
 # This is the name the timed query in step 17 selects from.
 p_home_sales_data_p2.createOrReplaceTempView('p_home_sales_p2')

In [20]:
 # 17. Using the parquet DataFrame (p_home_sales_p2), run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.
# time.perf_counter() is used instead of time.time(): it is the clock meant
# for measuring elapsed durations and is not affected by system clock changes.
start_time = time.perf_counter()

avg_price_view = spark.sql("""SELECT
                                view as `View Rating`,
                                ROUND(AVG(price),2) as `AVG Price`
                                from
                                  p_home_sales_p2
                                group by
                                  view
                                having
                                AVG(price) >= 350000
                                order by
                                  view desc
                                    """)

# The timed span covers both defining the query and displaying its rows.
avg_price_view.show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+----------+
|View Rating| AVG Price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 1.6150062084197998 seconds ---


In [21]:
# Extra timing check: measure how long it takes to compute the average price
# (rounded to two decimal places) of homes with a view rating of 100, querying
# the home_sales temporary table.
# time.perf_counter() is used instead of time.time(): it is the clock meant
# for measuring elapsed durations and is not affected by system clock changes.
start_time = time.perf_counter()
avg_price_view = spark.sql("""SELECT
                                view as `View Rating`,
                                ROUND(AVG(price),2) as `AVG Price`
                                from
                                  home_sales
                                where
                                  view = 100
                                group by
                                  view
                               """)

# The timed span covers both defining the query and displaying its rows.
avg_price_view.show()
print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+---------+
|View Rating|AVG Price|
+-----------+---------+
|        100|1026669.5|
+-----------+---------+

--- 0.5936989784240723 seconds ---


In [22]:
# 18. Uncache the home_sales temporary table.
# Issued as a Spark SQL statement; the next cell checks the result.

spark.sql("uncache table home_sales")

DataFrame[]

In [23]:
# 19. Check if the home_sales is no longer cached
# Evaluates to False once the table has been uncached.
spark.catalog.isCached("home_sales")


False

In [24]:
# After uncaching the home_sales temporary table, time the query that computes
# the average price (rounded to two decimal places) of homes with a view rating of 100.

# time.perf_counter() is used instead of time.time(): it is the clock meant
# for measuring elapsed durations and is not affected by system clock changes.
start_time = time.perf_counter()
avg_price_view = spark.sql("""SELECT
                                view as `View Rating`,
                                ROUND(AVG(price),2) as `AVG Price`
                                from
                                  home_sales
                                where
                                  view = 100
                                group by
                                  view
                               """)

# The timed span covers both defining the query and displaying its rows.
avg_price_view.show()
print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+---------+
|View Rating|AVG Price|
+-----------+---------+
|        100|1026669.5|
+-----------+---------+

--- 0.4945995807647705 seconds ---
