In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.4.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Get:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Get:9 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [53.5 kB]
Get:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease [24.3 kB]
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 http://archive.ubuntu.com/ubuntu jammy-updates/multiverse amd64 Packages [51.5 kB]
Get:13 http://archive.ubuntu.com/ubuntu ja

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [4]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), header = True, inferSchema=True)
df.show()


+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [7]:
# Import date time function for pulling the year out of 'date' column (22-2, activity 04)
from pyspark.sql.functions import year

# Create new df with the column year for year sold
df.select(year(df['date']))

# Save the year as a new column 'year_sold'
df = df.withColumn('year_sold', year(df['date']))
df.show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+---------+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|year_sold|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+---------+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|     2022|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|     2021|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|     2019|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|     2019|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|     2022|


In [8]:
# 2. Create a temporary view of the DataFrame.
df.createOrReplaceTempView('home_sales')


In [16]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
spark.sql("""
SELECT
  ROUND (AVG(price),2) as AvgPrice,
  year_sold,
  bedrooms
FROM home_sales
WHERE bedrooms = 4
GROUP BY bedrooms, year_sold
ORDER BY year_sold DESC""").show()



+---------+---------+--------+
| AvgPrice|year_sold|bedrooms|
+---------+---------+--------+
|296363.88|     2022|       4|
|301819.44|     2021|       4|
|298353.78|     2020|       4|
| 300263.7|     2019|       4|
+---------+---------+--------+



In [17]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
spark.sql("""
SELECT
  ROUND (AVG(price),2) as AvgPrice,
  date_built,
  bedrooms,
  bathrooms
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3
GROUP BY bedrooms, bathrooms, date_built
ORDER BY date_built DESC""").show()


+---------+----------+--------+---------+
| AvgPrice|date_built|bedrooms|bathrooms|
+---------+----------+--------+---------+
|292676.79|      2017|       3|        3|
|290555.07|      2016|       3|        3|
| 288770.3|      2015|       3|        3|
|290852.27|      2014|       3|        3|
|295962.27|      2013|       3|        3|
|293683.19|      2012|       3|        3|
|291117.47|      2011|       3|        3|
|292859.62|      2010|       3|        3|
+---------+----------+--------+---------+



In [22]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
spark.sql("""
SELECT
  ROUND (AVG(price),2) as AvgPrice,
  date_built
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
GROUP BY date_built
ORDER BY date_built DESC""").show()


+---------+----------+
| AvgPrice|date_built|
+---------+----------+
|280317.58|      2017|
| 293965.1|      2016|
|297609.97|      2015|
|298264.72|      2014|
|303676.79|      2013|
|307539.97|      2012|
|276553.81|      2011|
|285010.22|      2010|
+---------+----------+



In [23]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

start_time = time.time()

spark.sql("""
SELECT
  ROUND (AVG(price),2) as AvgPrice,
  view
FROM home_sales
GROUP BY view
HAVING AVG(price) >= 350000
ORDER BY view DESC""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----------+----+
|  AvgPrice|view|
+----------+----+
| 1026669.5| 100|
|1061201.42|  99|
|1053739.33|  98|
|1129040.15|  97|
|1017815.92|  96|
| 1054325.6|  95|
| 1033536.2|  94|
|1026006.06|  93|
| 970402.55|  92|
|1137372.73|  91|
|1062654.16|  90|
|1107839.15|  89|
|1031719.35|  88|
| 1072285.2|  87|
|1070444.25|  86|
|1056336.74|  85|
|1117233.13|  84|
|1033965.93|  83|
| 1063498.0|  82|
|1053472.79|  81|
+----------+----+
only showing top 20 rows

--- 1.1995255947113037 seconds ---


In [24]:
# 7. Cache the the temporary table home_sales.
spark.catalog.cacheTable('home_sales')

In [25]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [26]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()

spark.sql("""
SELECT
  ROUND (AVG(price),2) as AvgPrice,
  view
FROM home_sales
GROUP BY view
HAVING AVG(price) >= 350000
ORDER BY view DESC""").show()

print("--- %s seconds ---" % (time.time() - start_time))


+----------+----+
|  AvgPrice|view|
+----------+----+
| 1026669.5| 100|
|1061201.42|  99|
|1053739.33|  98|
|1129040.15|  97|
|1017815.92|  96|
| 1054325.6|  95|
| 1033536.2|  94|
|1026006.06|  93|
| 970402.55|  92|
|1137372.73|  91|
|1062654.16|  90|
|1107839.15|  89|
|1031719.35|  88|
| 1072285.2|  87|
|1070444.25|  86|
|1056336.74|  85|
|1117233.13|  84|
|1033965.93|  83|
| 1063498.0|  82|
|1053472.79|  81|
+----------+----+
only showing top 20 rows

--- 2.937605619430542 seconds ---


The time for the cached query to run is 2.94 seconds, where the uncached query took 1.20 seconds.

In [27]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data

df.write.partitionBy("date_built").mode("overwrite").parquet("home_sales_partitioned")

In [29]:
# 11. Read the parquet formatted data.
p_df = spark.read.parquet('home_sales_partitioned')
p_df.show()

+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+---------+----------+
|                  id|      date| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|year_sold|date_built|
+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+---------+----------+
|2ed8d509-7372-46d...|2021-08-06|258710|       3|        3|       1918|    9666|     1|         0|  25|     2021|      2015|
|941bad30-eb49-4a7...|2020-05-09|229896|       3|        3|       2197|    8641|     1|         0|   3|     2020|      2015|
|c797ca12-52cd-4b1...|2019-06-08|288650|       2|        3|       2100|   10419|     2|         0|   7|     2019|      2015|
|0cfe57f3-28c2-472...|2019-10-04|308313|       3|        3|       1960|    9453|     2|         0|   2|     2019|      2015|
|d715f295-2fbf-4e9...|2021-05-17|391574|       3|        2|       1635|    8040|     2|         0|  10|     2021|      2015|


In [30]:
# 12. Create a temporary table for the parquet data.
p_df.createOrReplaceTempView('home_sales_partioned')

In [33]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

start_time = time.time()

spark.sql("""
SELECT
  ROUND (AVG(price),2) as AvgPrice,
  view
FROM home_sales
GROUP BY view
HAVING AVG(price) >= 350000
ORDER BY view DESC""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----------+----+
|  AvgPrice|view|
+----------+----+
| 1026669.5| 100|
|1061201.42|  99|
|1053739.33|  98|
|1129040.15|  97|
|1017815.92|  96|
| 1054325.6|  95|
| 1033536.2|  94|
|1026006.06|  93|
| 970402.55|  92|
|1137372.73|  91|
|1062654.16|  90|
|1107839.15|  89|
|1031719.35|  88|
| 1072285.2|  87|
|1070444.25|  86|
|1056336.74|  85|
|1117233.13|  84|
|1033965.93|  83|
| 1063498.0|  82|
|1053472.79|  81|
+----------+----+
only showing top 20 rows

--- 0.7819457054138184 seconds ---


The partioned query took 0.78 seconds to run, versus the 2.94 seconds for the cached query. This is because similar data is grouped together and only the data pertinent to the query is evaluated.

In [34]:
# 14. Uncache the home_sales temporary table.
spark.catalog.uncacheTable('home_sales')

In [35]:
# 15. Check if the home_sales is no longer cached
spark.catalog.isCached('home_sales')


False