In [1]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

from pyspark import SparkFiles

from pyspark.sql.functions import *
from pyspark.sql.functions import count, mean, min, max
from pyspark.sql.functions import month, year
from pyspark.sql.functions import date_format
from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField, StringType, DateType

In [22]:
# Create a SparkSession

spark = SparkSession.builder\
    .appName("SparkSQL")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

# Set the partitions to 4 or 8. 
spark.conf.set("spark.sql.shuffle.partitions", 8)

In [4]:
# 1. Read in the AWS S3 bucket into a DataFrame.

homes_url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

# Read in data from s3 bucket

spark.sparkContext.addFile(homes_url)
home_sales_df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True)

# Show the home sales data.
home_sales_df.show()

display(home_sales_df)

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

DataFrame[id: string, date: string, date_built: string, price: string, bedrooms: string, bathrooms: string, sqft_living: string, sqft_lot: string, floors: string, waterfront: string, view: string]

In [5]:
# display the schema
home_sales_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_built: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: string (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: string (nullable = true)



In [6]:
# format datatypes
home_sales_df = home_sales_df.withColumn("bedrooms", home_sales_df["bedrooms"].cast(IntegerType()))
home_sales_df = home_sales_df.withColumn("bathrooms", home_sales_df["bathrooms"].cast(IntegerType()))
home_sales_df = home_sales_df.withColumn("floors", home_sales_df["floors"].cast(IntegerType()))
home_sales_df = home_sales_df.withColumn("waterfront", home_sales_df["waterfront"].cast(IntegerType()))
home_sales_df = home_sales_df.withColumn("view", home_sales_df["view"].cast(IntegerType()))
home_sales_df = home_sales_df.withColumn("price", home_sales_df["price"].cast(DoubleType()))
home_sales_df = home_sales_df.withColumn("sqft_living", home_sales_df["sqft_living"].cast(DoubleType()))
home_sales_df = home_sales_df.withColumn("sqft_lot", home_sales_df["sqft_lot"].cast(DoubleType()))
home_sales_df = home_sales_df.withColumn("date", to_date(home_sales_df["date"], "yyyy-mm-dd"))
home_sales_df = home_sales_df.withColumn("date_built", year(home_sales_df["date_built"]))

# check results
home_sales_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- date_built: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: double (nullable = true)
 |-- sqft_lot: double (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)



In [7]:
# 2. Create a temporary view of the DataFrame.

home_sales_df.createOrReplaceTempView('home_sales')


In [8]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?

spark.sql("""select year(date) as year_sold, format_number(mean(price), '#,###.##') as average_price 
from home_sales 
where bedrooms = 4 
group by 1 
sort by year_sold""").show()


+---------+-------------+
|year_sold|average_price|
+---------+-------------+
|     2019|    300,263.7|
|     2020|   298,353.78|
|     2021|   301,819.44|
|     2022|   296,363.88|
+---------+-------------+



In [9]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?

spark.sql("""select date_built, format_number(mean(price), '#,###.##') as average_price 
from home_sales 
where bedrooms = 3 AND bathrooms = 3 
group by 1 
sort by date_built""").show()


+----------+-------------+
|date_built|average_price|
+----------+-------------+
|      2010|   292,859.62|
|      2011|   291,117.47|
|      2012|   293,683.19|
|      2013|   295,962.27|
|      2014|   290,852.27|
|      2015|    288,770.3|
|      2016|   290,555.07|
|      2017|   292,676.79|
+----------+-------------+



In [10]:
#  5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?

spark.sql("""select date_built, format_number(mean(price), '#,###.##') as average_price 
from home_sales 
where bedrooms = 3 AND bathrooms = 3 AND floors = 2 and sqft_living >= 2000
group by 1 
sort by date_built""").show()


+----------+-------------+
|date_built|average_price|
+----------+-------------+
|      2010|   285,010.22|
|      2011|   276,553.81|
|      2012|   307,539.97|
|      2013|   303,676.79|
|      2014|   298,264.72|
|      2015|   297,609.97|
|      2016|    293,965.1|
|      2017|   280,317.58|
+----------+-------------+



In [11]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

start_time = time.time()

spark.sql("""select view, format_number(mean(price), '#,###.##') as average_price 
from home_sales 
where price >= 350000
group by 1 
sort by view desc""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-------------+
|view|average_price|
+----+-------------+
| 100|  1,026,669.5|
|  99| 1,061,201.42|
|  98| 1,053,739.33|
|  97| 1,129,040.15|
|  96| 1,017,815.92|
|  95|  1,054,325.6|
|  94|  1,033,536.2|
|  93| 1,026,006.06|
|  92|   970,402.55|
|  91| 1,137,372.73|
|  90| 1,062,654.16|
|  89| 1,107,839.15|
|  88| 1,031,719.35|
|  87|  1,072,285.2|
|  86| 1,070,444.25|
|  85| 1,056,336.74|
|  84| 1,117,233.13|
|  83| 1,033,965.93|
|  82|    1,063,498|
|  81| 1,053,472.79|
+----+-------------+
only showing top 20 rows

--- 0.410900354385376 seconds ---


In [12]:
# 7. Cache the temporary table home_sales.

spark.sql("cache table home_sales")

DataFrame[]

In [13]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [14]:
# 9. Using the cached data, run the query that filters out the view ratings with average price 
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.

start_time = time.time()

spark.sql("""select view, format_number(mean(price), '#,###.##') as average_price 
from home_sales 
where price >= 350000
group by 1 
sort by view desc""").show()

print("--- %s seconds ---" % (time.time() - start_time))


+----+-------------+
|view|average_price|
+----+-------------+
| 100|  1,026,669.5|
|  99| 1,061,201.42|
|  98| 1,053,739.33|
|  97| 1,129,040.15|
|  96| 1,017,815.92|
|  95|  1,054,325.6|
|  94|  1,033,536.2|
|  93| 1,026,006.06|
|  92|   970,402.55|
|  91| 1,137,372.73|
|  90| 1,062,654.16|
|  89| 1,107,839.15|
|  88| 1,031,719.35|
|  87|  1,072,285.2|
|  86| 1,070,444.25|
|  85| 1,056,336.74|
|  84| 1,117,233.13|
|  83| 1,033,965.93|
|  82|    1,063,498|
|  81| 1,053,472.79|
+----+-------------+
only showing top 20 rows

--- 0.24933195114135742 seconds ---


In [15]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data 

spark.sql("""select date_built, id, date, price, bedrooms, bathrooms, sqft_living, sqft_lot, floors, waterfront, view
from home_sales 
sort by date_built""").show(truncate=False)

home_sales_df.write.parquet('parquet_sales', mode='overwrite')

+----------+------------------------------------+----------+--------+--------+---------+-----------+--------+------+----------+----+
|date_built|id                                  |date      |price   |bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+----------+------------------------------------+----------+--------+--------+---------+-----------+--------+------+----------+----+
|2010      |8d54a71b-c520-44e5-8ba1-5a84be03ad35|2019-01-21|323956.0|2       |3        |1506.0     |11816.0 |1     |0         |25  |
|2010      |f5e01433-f7b0-44e5-96d9-1e190b721af5|2021-01-10|335326.0|4       |3        |1588.0     |14107.0 |1     |0         |8   |
|2010      |15a5830f-1529-4c57-9030-495d706dda9d|2019-01-06|843191.0|3       |6        |5086.0     |14637.0 |2     |0         |91  |
|2010      |63640c17-973d-4f59-9db6-992609006a22|2019-01-14|148400.0|2       |3        |2338.0     |11275.0 |2     |0         |5   |
|2010      |209c1286-0834-4635-bf48-c84eead14852|2021-01-04|395178.0|

In [16]:
# 11. Read the formatted parquet data.

p_home_sales_df=spark.read.parquet('parquet_sales')

In [17]:
# 12. Create a temporary table for the parquet data.

p_home_sales_df.createOrReplaceTempView('p_home_sales')

In [18]:
# 13. Run the query that filters out the view ratings with average price of greater than or eqaul to $350,000 
# with the parquet DataFrame. Round your average to two decimal places. 
# Determine the runtime and compare it to the cached version. 

start_time = time.time()

spark.sql("""select view, format_number(mean(price), '#,###.##') as average_price 
from p_home_sales 
where price >= 350000
group by 1 
sort by view desc""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-------------+
|view|average_price|
+----+-------------+
| 100|  1,026,669.5|
|  99| 1,061,201.42|
|  98| 1,053,739.33|
|  97| 1,129,040.15|
|  96| 1,017,815.92|
|  95|  1,054,325.6|
|  94|  1,033,536.2|
|  93| 1,026,006.06|
|  92|   970,402.55|
|  91| 1,137,372.73|
|  90| 1,062,654.16|
|  89| 1,107,839.15|
|  88| 1,031,719.35|
|  87|  1,072,285.2|
|  86| 1,070,444.25|
|  85| 1,056,336.74|
|  84| 1,117,233.13|
|  83| 1,033,965.93|
|  82|    1,063,498|
|  81| 1,053,472.79|
+----+-------------+
only showing top 20 rows

--- 0.3899552822113037 seconds ---


In [19]:
# 13a. Running the view query a second time, as in our class examples, to "eliminate the load time."
start_time = time.time()

spark.sql("""select view, format_number(mean(price), '#,###.##') as average_price 
from p_home_sales 
where price >= 350000
group by 1 
sort by view desc""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-------------+
|view|average_price|
+----+-------------+
| 100|  1,026,669.5|
|  99| 1,061,201.42|
|  98| 1,053,739.33|
|  97| 1,129,040.15|
|  96| 1,017,815.92|
|  95|  1,054,325.6|
|  94|  1,033,536.2|
|  93| 1,026,006.06|
|  92|   970,402.55|
|  91| 1,137,372.73|
|  90| 1,062,654.16|
|  89| 1,107,839.15|
|  88| 1,031,719.35|
|  87|  1,072,285.2|
|  86| 1,070,444.25|
|  85| 1,056,336.74|
|  84| 1,117,233.13|
|  83| 1,033,965.93|
|  82|    1,063,498|
|  81| 1,053,472.79|
+----+-------------+
only showing top 20 rows

--- 0.20246005058288574 seconds ---


#### With the load time eliminated, the parquet time of approximately 0.163976 seconds, above, is faster than the time to run the same query on the cached table, which is approximately 0.194511 seconds.  With a small data set, we expect that the differences will be small.

In [20]:
# 14. Uncache the home_sales temporary table.

spark.sql("uncache table home_sales")

DataFrame[]

In [21]:
# 15. Check if the home_sales is no longer cached

if spark.catalog.isCached("home_sales"):
  print("The home sales table is till cached")
else:
  print("All clear, cache removed")


All clear, cache removed
