In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.3.1'
spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

Get:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease [18.1 kB]
Get:5 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ Packages [78.0 kB]
Hit:6 http://archive.ubuntu.com/ubuntu focal InRelease
Get:7 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Get:8 http://security.ubuntu.com/ubuntu focal-security/main amd64 Packages [2,773 kB]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Get:10 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease [18.1 kB]
Get:11 http://security.ubuntu.com/ubuntu focal-security/universe amd64 Packages [1,056 kB]
Get:12 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/u

In [2]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [3]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [11]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

spark.sparkContext.addFile(url)
homesales_df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True, ignoreLeadingWhiteSpace=True)
homesales_df.show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [22]:
# 2. Create a temporary view of the DataFrame.

homesales_df.createOrReplaceTempView('home_sales')

In [23]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?

avg_price_4bedroom = spark.sql("SELECT ROUND(AVG(price),2), date_built from home_data where bedrooms == 4 group by date_built ORDER BY date_built desc")
avg_price_4bedroom.show()

+--------------------+----------+
|round(avg(price), 2)|date_built|
+--------------------+----------+
|           296576.69|      2017|
|           296050.24|      2016|
|           307908.86|      2015|
|           299073.89|      2014|
|           299999.39|      2013|
|           298233.42|      2012|
|            302141.9|      2011|
|           296800.75|      2010|
+--------------------+----------+



In [24]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?

avg_price_3bed_3bath = spark.sql("SELECT ROUND(AVG(price),2), date_built from home_data where bedrooms == 3 AND bathrooms == 3 group by date_built ORDER BY date_built desc")
avg_price_3bed_3bath.show()

+--------------------+----------+
|round(avg(price), 2)|date_built|
+--------------------+----------+
|           292676.79|      2017|
|           290555.07|      2016|
|            288770.3|      2015|
|           290852.27|      2014|
|           295962.27|      2013|
|           293683.19|      2012|
|           291117.47|      2011|
|           292859.62|      2010|
+--------------------+----------+



In [25]:
#  5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?

avg_price_3bed_3bath_2floors = spark.sql("SELECT ROUND(AVG(price),2), date_built from home_data where bedrooms == 3 AND bathrooms == 3 AND floors == 2 AND sqft_living >= 2000 group by date_built ORDER BY date_built desc")
avg_price_3bed_3bath_2floors.show()

+--------------------+----------+
|round(avg(price), 2)|date_built|
+--------------------+----------+
|           280317.58|      2017|
|            293965.1|      2016|
|           297609.97|      2015|
|           298264.72|      2014|
|           303676.79|      2013|
|           307539.97|      2012|
|           276553.81|      2011|
|           285010.22|      2010|
+--------------------+----------+



In [26]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

start_time = time.time()

spark.sql("SELECT ROUND(AVG(price),2) from home_data where price >= 350000").show()

print("--- %s seconds ---" % (time.time() - start_time))

+--------------------+
|round(avg(price), 2)|
+--------------------+
|           473796.26|
+--------------------+

--- 0.28972315788269043 seconds ---


In [27]:
# 7. Cache the the temporary table home_sales.

spark.sql("cache table home_data")

DataFrame[]

In [28]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [29]:
# 9. Using the cached data, run the query that filters out the view ratings with average price 
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.

start_time = time.time()

spark.sql("SELECT ROUND(AVG(price),2) from home_data where price >= 350000").show()

print("--- %s seconds ---" % (time.time() - start_time))


+--------------------+
|round(avg(price), 2)|
+--------------------+
|           473796.26|
+--------------------+

--- 0.22892975807189941 seconds ---


In [30]:
print("the cached runtime of 0.307 seconds was shorter than the original runtime of 1.15 seconds")

the cached runtime of 0.307 seconds was shorter than the original runtime of 1.15 seconds


In [31]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data 

homesales_df.write.parquet('home_parquet', mode='overwrite')


In [32]:
# 11. Read the formatted parquet data.

parquet_home_df = spark.read.parquet('home_parquet')

In [33]:
# 12. Create a temporary table for the parquet data.

parquet_home_df.createOrReplaceTempView('parquet_temp_home')

In [34]:
# 13. Run the query that filters out the view ratings with average price of greater than or eqaul to $350,000 
# with the parquet DataFrame. Round your average to two decimal places. 
# Determine the runtime and compare it to the cached version. 

start_time = time.time()

spark.sql("SELECT ROUND(AVG(price),2) from parquet_temp_home where price >= 350000").show()

print("--- %s seconds ---" % (time.time() - start_time))
print("the parquet runtime is slightly shorter than the cached runtime")

+--------------------+
|round(avg(price), 2)|
+--------------------+
|           473796.26|
+--------------------+

--- 0.5266718864440918 seconds ---
the parquet runtime is slightly shorter than the cached runtime


In [36]:
# 14. Uncache the home_sales temporary table.

spark.sql("uncache table home_sales")

DataFrame[]

In [37]:
# 15. Check if the home_sales is no longer cached

spark.catalog.isCached('home_sales')


False