In [1]:
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.2'
spark_version = 'spark-3.2.4'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu focal-security InRelease
0% [Waiting for headers] [Connected to cloud.r-project.org (52.85.151.129)] [Co                                                                               Hit:2 http://archive.ubuntu.com/ubuntu focal InRelease
                                                                               Hit:3 http://archive.ubuntu.com/ubuntu focal-updates InRelease
                                                                               Hit:4 http://archive.ubuntu.com/ubuntu focal-backports InRelease
                                                                               Hit:5 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Hit:8 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:9 http:/

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

In [4]:

# add the url to the spark
spark.sparkContext.addFile(url)

# make the DataFrame
homeDF = spark.read.csv(
    SparkFiles.get("home_sales_revised.csv"),
    sep = ',',
    header = True
)

# display the DataFrame
homeDF.show(5)



+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
+--------------------+----------+----------+------+--------+---------+----------

In [7]:
# 2. Create a temporary view of the DataFrame
homeDF.createOrReplaceTempView("home_sales")

In [8]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?
fourbedroom = spark.sql("SELECT date_built, ROUND(AVG(price), 2) AS average_price FROM home_sales WHERE bedrooms = 4 GROUP BY date_built")

# display the result
fourbedroom.show()

+----------+-------------+
|date_built|average_price|
+----------+-------------+
|      2016|    296050.24|
|      2012|    298233.42|
|      2017|    296576.69|
|      2014|    299073.89|
|      2013|    299999.39|
|      2011|     302141.9|
|      2015|    307908.86|
|      2010|    296800.75|
+----------+-------------+



In [9]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?
threebedroom = spark.sql("SELECT date_built, ROUND(AVG(price), 2) AS average_price FROM home_sales WHERE bedrooms = 3 AND bathrooms = 3 GROUP BY date_built")

# display the result
threebedroom.show()

+----------+-------------+
|date_built|average_price|
+----------+-------------+
|      2016|    290555.07|
|      2012|    293683.19|
|      2017|    292676.79|
|      2014|    290852.27|
|      2013|    295962.27|
|      2011|    291117.47|
|      2015|     288770.3|
|      2010|    292859.62|
+----------+-------------+



In [10]:
#  5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?

twothousandsqft = spark.sql("SELECT date_built, ROUND(AVG(price), 2) AS average_price FROM home_sales WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000 GROUP BY date_built")

# display the result
twothousandsqft.show()

+----------+-------------+
|date_built|average_price|
+----------+-------------+
|      2016|     293965.1|
|      2012|    307539.97|
|      2017|    280317.58|
|      2014|    298264.72|
|      2013|    303676.79|
|      2011|    276553.81|
|      2015|    297609.97|
|      2010|    285010.22|
+----------+-------------+



In [11]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query..

start_time = time.time()

# create the variable
viewrating = spark.sql("SELECT view, ROUND(AVG(price), 2) FROM home_sales GROUP BY view HAVING ROUND(AVG(price),2) >= 350000")
# show the variable
viewrating.show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|  51|           788128.21|
|  54|           798684.82|
|  69|           750537.94|
|  87|           1072285.2|
|  73|           752861.18|
|  64|           767036.67|
|  59|            791453.0|
|  85|          1056336.74|
|  52|           733780.26|
|  71|            775651.1|
|  98|          1053739.33|
|  99|          1061201.42|
|  96|          1017815.92|
| 100|           1026669.5|
|  70|           695865.58|
|  61|           746877.59|
|  75|          1114042.94|
|  78|          1080649.37|
|  89|          1107839.15|
|  77|          1076205.56|
+----+--------------------+
only showing top 20 rows

--- 1.507678747177124 seconds ---


In [12]:
# 7. Cache the the temporary table home_sales.
spark.catalog.cacheTable("home_sales")

In [13]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [15]:
# 9. Using the cached data, run the query that filters out the view ratings with average price
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.

start_time = time.time()
# create the variable
viewrating2 = spark.sql("SELECT view, ROUND(AVG(price), 2) FROM home_sales GROUP BY view HAVING ROUND(AVG(price),2) >= 350000")
# show the variable
viewrating2.show()


print("--- %s seconds ---" % (time.time() - start_time))


+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|  51|           788128.21|
|  54|           798684.82|
|  69|           750537.94|
|  87|           1072285.2|
|  73|           752861.18|
|  64|           767036.67|
|  59|            791453.0|
|  85|          1056336.74|
|  52|           733780.26|
|  71|            775651.1|
|  98|          1053739.33|
|  99|          1061201.42|
|  96|          1017815.92|
| 100|           1026669.5|
|  70|           695865.58|
|  61|           746877.59|
|  75|          1114042.94|
|  78|          1080649.37|
|  89|          1107839.15|
|  77|          1076205.56|
+----+--------------------+
only showing top 20 rows

--- 0.3867068290710449 seconds ---


In [23]:
# determine the cached time vs original runtime
print(f"The orignal runtime was 1.507678747177124, after cacheing the data it was 0.3867068290710449 seconds.")

The orignal runtime was 1.507678747177124, after cacheing the data it was 0.3867068290710449 seconds.


In [17]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
homeDF.write.partitionBy("date_built").parquet('home_sales', mode='overwrite')

In [18]:
# 11. Read the formatted parquet data.
homeparquetDF = spark.read.parquet('home_sales')

In [19]:
# 12. Create a temporary table for the parquet data.
homeparquetDF.createOrReplaceTempView("homeparquet")

In [21]:
# 13. Run the query that filters out the view ratings with average price of greater than or eqaul to $350,000
# with the parquet DataFrame. Round your average to two decimal places.
# Determine the runtime and compare it to the cached version.

# calculate the start time
start_time = time.time()
# create the variable
viewparquet = spark.sql("SELECT view, ROUND(AVG(price), 2) FROM homeparquet GROUP BY view HAVING ROUND(AVG(price),2) >= 350000")
# show the variable
viewparquet.show()

# print the time
print("--- %s seconds ---" % (time.time() - start_time))

+----+--------------------+
|view|round(avg(price), 2)|
+----+--------------------+
|  51|           788128.21|
|  54|           798684.82|
|  69|           750537.94|
|  73|           752861.18|
|  87|           1072285.2|
|  64|           767036.67|
|  59|            791453.0|
|  85|          1056336.74|
|  52|           733780.26|
|  71|            775651.1|
|  98|          1053739.33|
|  99|          1061201.42|
|  96|          1017815.92|
| 100|           1026669.5|
|  70|           695865.58|
|  61|           746877.59|
|  75|          1114042.94|
|  78|          1080649.37|
|  89|          1107839.15|
|  77|          1076205.56|
+----+--------------------+
only showing top 20 rows

--- 0.49003028869628906 seconds ---


In [24]:
# determine the cached time vs parquet time
print(f"The cached runtime was 0.3867068290710449 vs the parqueted time which was a little higher at 0.49003028869628906 seconds.")

The cached runtime was 0.3867068290710449 vs the parqueted time which was a little higher at 0.49003028869628906 seconds.


In [28]:
# 14. Uncache the home_sales temporary table.
spark.sql("uncache table home_sales")

DataFrame[]

In [29]:
# 15. Check if the home_sales is no longer cached
spark.catalog.isCached('home_sales')

False