In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.2'
spark_version = 'spark-3.4.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [673 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,065 kB]

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"



In [4]:
# 2. Create a temporary view of the DataFrame.
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True)
df.show()



+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [5]:
df.createOrReplaceTempView('home_sales')

In [6]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_built: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: string (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: string (nullable = true)



In [7]:
#Converting the data types of teh following columns
from pyspark.sql.functions import col, year
from pyspark.sql.types import DateType
from pyspark.sql.types import IntegerType

df = df.withColumn("date", col("date").cast("date"))
df = df.withColumn("price", df["price"].cast(IntegerType()))
df = df.withColumn("bedrooms", df["bedrooms"].cast(IntegerType()))
df = df.withColumn("bathrooms", df["bathrooms"].cast(IntegerType()))
df = df.withColumn("sqft_living", df["sqft_living"].cast(IntegerType()))
df = df.withColumn("sqft_lot", df["sqft_lot"].cast(IntegerType()))
df = df.withColumn("floors", df["floors"].cast(IntegerType()))
df = df.withColumn("view", df["view"].cast(IntegerType()))

# Extract the year from the "date" column
df = df.withColumn("year", year("date"))


df.printSchema()


root
 |-- id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- date_built: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: integer (nullable = true)
 |-- year: integer (nullable = true)



In [8]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?
from pyspark.sql.functions import mean, format_number


#Filtering data to just look at homes with a four bedroom
four_bedroom = df.filter("bedrooms = 4")

#Calculating the median price of a four bedroom house for each year
four_bedroom_by_year = four_bedroom.groupBy("year").avg("price")
four_bedroom_by_year.show()

# Format the "price" column to two decimal places
formatted_df = four_bedroom_by_year.withColumn("avg(price)", format_number("avg(price)", 2))

# Show the result
formatted_df.show(truncate=False)



+----+------------------+
|year|        avg(price)|
+----+------------------+
|2022| 296363.8845050215|
|2019| 300263.6955128205|
|2020|298353.78003169573|
|2021|   301819.44398864|
+----+------------------+

+----+----------+
|year|avg(price)|
+----+----------+
|2022|296,363.88|
|2019|300,263.70|
|2020|298,353.78|
|2021|301,819.44|
+----+----------+



In [9]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?

#Filtering the data to three bedrooms and 3 bathrooms
three_bed_bath = df.filter((col("bedrooms")==3) & (col("bathrooms")==3))
three_bed_bath.show()

#Calculating the median price of a 3 bedroom and 3 bathroom for each year
three_bed_bath_by_year = three_bed_bath.groupBy("year").avg("price")
three_bed_bath_by_year.show()

#Formatting price column to two decimal places
formatted_price = three_bed_bath_by_year.withColumn("avg(price)", format_number("avg(price)", 2))

#Show the result
formatted_df.show(truncate=False)

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|year|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+----+
|e81aacfe-17fe-46b...|2020-06-16|      2016|181925|       3|        3|       2137|   11709|     2|         0|  22|2020|
|2ed8d509-7372-46d...|2021-08-06|      2015|258710|       3|        3|       1918|    9666|     1|         0|  25|2021|
|f876d86f-3c9f-42b...|2019-02-27|      2011|167864|       3|        3|       2471|   13924|     2|         0|  15|2019|
|941bad30-eb49-4a7...|2020-05-09|      2015|229896|       3|        3|       2197|    8641|     1|         0|   3|2020|
|ea620c7b-c2f7-4c6...|2021-05-31|      2011|437958|       3|        3|       2356|   11052|     1|         0|  26|2021|
|0cfe57f3-28c2-472...|2019-10-04|      2

In [24]:
# 5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?

#Filtering to three bedroom, three bath, with 2 floors and greater than or equal to 2,000 sqft.
two_floors_2000sqft = three_bed_bath.filter((col("floors") == 2) & (col("sqft_living")>=2000))
two_floors_2000sqft.show()

#Calculating the average price of a 3 bed, 3 bath, with 2 floors for each year
two_floors_2000sqft_by_year = two_floors_2000sqft.groupBy("year").avg("price")
two_floors_2000sqft_by_year.show()

#Formatting prices to two places after decimal
formatted_price = two_floors_2000sqft_by_year.withColumn("avg(price)", format_number("avg(price)", 2))

#Show result
formatted_price.show(truncate = False)

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|year|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+----+
|e81aacfe-17fe-46b...|2020-06-16|      2016|181925|       3|        3|       2137|   11709|     2|         0|  22|2020|
|f876d86f-3c9f-42b...|2019-02-27|      2011|167864|       3|        3|       2471|   13924|     2|         0|  15|2019|
|4566cd2a-ac6e-435...|2019-07-15|      2016|177541|       3|        3|       2130|   10517|     2|         0|  25|2019|
|e0fc52aa-c349-4ba...|2019-04-17|      2016|202790|       3|        3|       2025|   10987|     2|         0|  19|2019|
|e252c4ce-b5b2-4ef...|2019-10-28|      2014|168463|       3|        3|       2271|    8317|     2|         0|   2|2019|
|bb2ff269-f08f-4e8...|2022-03-06|      2

In [11]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

start_time = time.time()

query_sql = """
SELECT (ROUND(AVG(price), 2)), view
 FROM home_sales
 GROUP BY view
 HAVING AVG(price)>=350000
 ORDER BY view desc
 """

spark.sql(query_sql).show()


print("--- %s seconds ---" % (time.time() - start_time))

+--------------------+----+
|round(avg(price), 2)|view|
+--------------------+----+
|          1061201.42|  99|
|          1053739.33|  98|
|          1129040.15|  97|
|          1017815.92|  96|
|           1054325.6|  95|
|           1033536.2|  94|
|          1026006.06|  93|
|           970402.55|  92|
|          1137372.73|  91|
|          1062654.16|  90|
|          1107839.15|  89|
|          1031719.35|  88|
|           1072285.2|  87|
|          1070444.25|  86|
|          1056336.74|  85|
|          1117233.13|  84|
|          1033965.93|  83|
|           1063498.0|  82|
|          1053472.79|  81|
|           991767.38|  80|
+--------------------+----+
only showing top 20 rows

--- 1.6999609470367432 seconds ---


In [13]:
# 7. Cache the the temporary table home_sales.
spark.sql('cache table home_sales')


DataFrame[]

In [16]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [17]:
# 9. Using the cached data, run the query that filters out the view ratings with average price
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.

start_time = time.time()

query_sql = """
SELECT (ROUND(AVG(price), 2)), view
 FROM home_sales
 GROUP BY view
 HAVING AVG(price)>=350000
 ORDER BY view desc
 """

print("--- %s seconds ---" % (time.time() - start_time))


--- 0.00011229515075683594 seconds ---


#The run time for the cached data is 0.0001, compared to the original runtime of 1.7000. This is significantly faster than the original runtime indicating that caching the data can make queries much faster.

In [18]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
df.write.partitionBy("date_built").mode("overwrite").parquet("date_built_partitioned")

In [19]:
# 11. Read the parquet formatted data.
p_df=spark.read.parquet('date_built_partitioned')

In [20]:
# 12. Create a temporary table for the parquet data.
p_df.createOrReplaceTempView('p_date_built_p')

In [21]:
# 13. Run the query that filters out the view ratings with average price of greater than or equal to $350,000
# with the parquet DataFrame. Round your average to two decimal places.
# Determine the runtime and compare it to the cached version.

start_time = time.time()

query_sql = """
SELECT (ROUND(AVG(price), 2)), view
 FROM home_sales
 GROUP BY view
 HAVING AVG(price)>=350000
 ORDER BY view desc
 """


print("--- %s seconds ---" % (time.time() - start_time))

--- 8.296966552734375e-05 seconds ---


The runtime for the parquet data is 8.2970 compared to the runtime for the original 1.7000 and .0001 for the cached data, indicating that paraquet data is slower to run compared to both the original and cached data.

In [22]:
# 14. Uncache the home_sales temporary table.
spark.catalog.uncacheTable("home_sales")

In [23]:
# 15. Check if the home_sales is no longer cached
spark.catalog.isCached('home_sales')


False