In [14]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [16]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

24/11/14 12:33:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [18]:
# 1. Read the home sales CSV from the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"


In [20]:
# Download the file using SparkFiles
spark.sparkContext.addFile(url)

In [22]:
# Load the dataset into a DataFrame
file_path = SparkFiles.get("home_sales_revised.csv")
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Display the schema and preview the data
df.printSchema()
df.show(5)

# Save the raw DataFrame as Parquet (overwrite so the cell can be rerun)
df.write.mode("overwrite").parquet("output/home_sales_raw")

root
 |-- id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- date_built: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|

                                                                                

In [24]:
# 2. Create a temporary view of the DataFrame.
df.createOrReplaceTempView("home_sales")


In [26]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
spark.sql("""
    SELECT 
        year(date) AS year_sold, 
        ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 4
    GROUP BY year_sold
    ORDER BY year_sold
""").show()



+---------+---------+
|year_sold|avg_price|
+---------+---------+
|     2019| 300263.7|
|     2020|298353.78|
|     2021|301819.44|
|     2022|296363.88|
+---------+---------+



**Analysis**
The average price for four-bedroom homes fluctuated slightly between 2019 and 2022.
The highest average price occurred in 2021 at $301,819.44.
The lowest average price occurred in 2022 at $296,363.88, a dip of about 1.8% from the 2021 peak.
Prices therefore remained relatively stable over the four years.

In [28]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
spark.sql("""
    SELECT 
        date_built AS year_built,
        ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 3 AND bathrooms = 3
    GROUP BY date_built
    ORDER BY date_built
""").show()



+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2010|292859.62|
|      2011|291117.47|
|      2012|293683.19|
|      2013|295962.27|
|      2014|290852.27|
|      2015| 288770.3|
|      2016|290555.07|
|      2017|292676.79|
+----------+---------+




**Analysis**
Among homes with 3 bedrooms and 3 bathrooms, the average price varied little by year built (2010 to 2017).
The highest average is for homes built in 2013 at $295,962.27.
The lowest average is for homes built in 2015 at $288,770.30.
Averages dip slightly for homes built in 2014 through 2016 and recover for homes built in 2017.
The gap between the highest and lowest averages is about 2.5%, so year built has little effect on price for this group. The grouping is by construction year, not sale year, so the table does not show a price trend over time.

In [30]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
spark.sql("""
    SELECT 
        date_built AS year_built,
        ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 3 
      AND bathrooms = 3
      AND floors = 2
      AND sqft_living >= 2000
    GROUP BY date_built
    ORDER BY date_built
""").show()



+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2010|285010.22|
|      2011|276553.81|
|      2012|307539.97|
|      2013|303676.79|
|      2014|298264.72|
|      2015|297609.97|
|      2016| 293965.1|
|      2017|280317.58|
+----------+---------+



**Analysis**
For homes with 3 bedrooms, 3 bathrooms, two floors, and at least 2,000 square feet, the average price peaks for homes built in 2012 at $307,539.97.
The lowest average is for homes built in 2011 at $276,553.81.
For build years 2012 through 2015, the averages stay close to $300,000 (between $297,609.97 and $307,539.97).
The average falls to $280,317.58 for homes built in 2017.
The spread between the lowest and highest averages is about 11%, wider than in the previous query, likely because the stricter filter leaves fewer homes in each group. The grouping is by construction year, so these figures do not describe a price trend over time.
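
The size of the fluctuations in the two build-year tables can be checked with a few lines of plain Python. The sketch below copies the averages from the query outputs above; the helper name relative_spread is made up for this illustration and is not part of the notebook.

```python
# Average prices by build year, copied from the two result tables above.
three_bed_three_bath = {
    2010: 292859.62, 2011: 291117.47, 2012: 293683.19, 2013: 295962.27,
    2014: 290852.27, 2015: 288770.30, 2016: 290555.07, 2017: 292676.79,
}
two_floor_2000_sqft = {
    2010: 285010.22, 2011: 276553.81, 2012: 307539.97, 2013: 303676.79,
    2014: 298264.72, 2015: 297609.97, 2016: 293965.10, 2017: 280317.58,
}


def relative_spread(avg_by_year):
    """Return (year_of_min, year_of_max, (max - min) / min)."""
    low_year = min(avg_by_year, key=avg_by_year.get)
    high_year = max(avg_by_year, key=avg_by_year.get)
    low, high = avg_by_year[low_year], avg_by_year[high_year]
    return low_year, high_year, (high - low) / low


for label, table in [
    ("3 bed / 3 bath", three_bed_three_bath),
    ("3 bed / 3 bath / 2 floors / >= 2000 sqft", two_floor_2000_sqft),
]:
    low_year, high_year, spread = relative_spread(table)
    print(f"{label}: low {low_year}, high {high_year}, spread {spread:.1%}")
```

The stricter filter produces a noticeably wider spread, which is worth keeping in mind before reading a trend into either table.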

In [32]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating. 
# Although this is a small dataset, determine the run time for this query.

# Start the timer
start_time = time.time()

# SQL query to calculate the average price per view rating
result = spark.sql("""
    SELECT 
        view AS view_rating,
        ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY view_rating DESC
""")

# Show the result
result.show()

# Stop the timer and print the run time
print("--- %s seconds ---" % (time.time() - start_time))

+-----------+----------+
|view_rating| avg_price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 0.2615368366241455 seconds ---


**Analysis**
Among the 20 rows displayed (view ratings 81 to 100), ratings 91 and 97 have the highest average prices at $1,137,372.73 and $1,129,040.15, respectively.
The lowest average among the displayed rows is $970,402.55 for view rating 92.
Every displayed rating except 92 averages more than $1,000,000, so homes with high view ratings command premium prices, although within this range a higher rating does not guarantee a higher average.
The query, including show(), ran in about 0.26 seconds. This is a single measurement on a small dataset, so it says little about performance on large data.
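
A single wall-clock measurement is noisy, so one option is to repeat the action and report the median. The helper below is a minimal sketch, not part of the notebook: time_action is a hypothetical name, it uses time.perf_counter rather than the time.time call used above, and a stand-in workload replaces the Spark query so the snippet runs on its own.

```python
import statistics
import time


def time_action(action, repeats=5):
    """Run `action` several times; return (first, median) wall-clock seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        durations.append(time.perf_counter() - start)
    return durations[0], statistics.median(durations)


# Stand-in workload so the sketch runs without Spark. In the notebook the
# action could be something like: lambda: result.collect()
first, median = time_action(lambda: sum(range(100_000)))
print(f"first run: {first:.4f}s, median of 5: {median:.4f}s")
```

Reporting the first run separately is deliberate: in Spark the first execution often includes one-off costs that later runs do not pay.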

In [38]:
# 7. Cache the temporary table home_sales.
spark.sql("CACHE TABLE home_sales")

# CACHE TABLE is eager by default, so the table is already materialized here;
# this query only previews the cached rows.
spark.sql("SELECT * FROM home_sales").show()

# Verify the table is cached
print("Is 'home_sales' cached? ", spark.catalog.isCached("home_sales"))

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

24/11/14 12:54:35 WARN CacheManager: Asked to cache already cached data.


In [40]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [42]:
# 9. Using the cached data, run the last query above, that calculates 
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000. 
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()

# SQL query to calculate the average price per view rating
result = spark.sql("""
    SELECT 
        view AS view_rating,
        ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY view_rating DESC
""")

# Show the results
result.show()

# Stop the timer and print the runtime
print("--- %s seconds ---" % (time.time() - start_time))


+-----------+----------+
|view_rating| avg_price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 0.12098884582519531 seconds ---


**Analysis**
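The cached query ran approximately 2.2 times faster than the uncached query (0.12 seconds versus 0.26 seconds).
Caching reduces query execution time by keeping the data in memory, so repeated queries avoid rereading and reparsing the source file.
Each figure is a single run on a small dataset, and the CacheManager warning printed by the caching cell shows the data was already cached when that cell ran, so the uncached baseline may itself have benefited from cached data. The result is still consistent with caching helping iterative or frequent queries in Spark.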
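
The speedup can be computed directly from the two run times printed above. This is plain arithmetic on the notebook's own output; the variable names are chosen for this sketch.

```python
# Run times printed by the uncached (step 6) and cached (step 9) cells.
uncached_seconds = 0.2615368366241455
cached_seconds = 0.12098884582519531

speedup = uncached_seconds / cached_seconds
saved_fraction = 1 - cached_seconds / uncached_seconds

print(f"speedup: {speedup:.2f}x")
print(f"time saved: {saved_fraction:.0%}")
```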

In [44]:
# 10. Write the home sales data as Parquet, partitioned by the "date_built" field.
df.write.mode("overwrite").partitionBy("date_built").parquet("home_sales_partitioned")

In [46]:
# List the partitioned directory contents
import os
for root, dirs, files in os.walk("home_sales_partitioned"):
    print(root)

home_sales_partitioned
home_sales_partitioned/date_built=2013
home_sales_partitioned/date_built=2014
home_sales_partitioned/date_built=2015
home_sales_partitioned/date_built=2012
home_sales_partitioned/date_built=2017
home_sales_partitioned/date_built=2010
home_sales_partitioned/date_built=2011
home_sales_partitioned/date_built=2016


Partitioning the data by date_built writes one subdirectory per build year (date_built=2010 through date_built=2017).
When a query filters on date_built, Spark can scan only the matching directories instead of the entire dataset; queries that do not filter on date_built still read every partition.
Parquet's columnar format also lets Spark read only the columns a query needs.
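
The idea of skipping directories can be imitated in plain Python using the directory names listed above. This is only an illustration of the concept under simplifying assumptions, not Spark's implementation; partition_value and dirs_to_scan are hypothetical helpers.

```python
# Directory names as printed by the os.walk cell above.
partition_dirs = [
    "home_sales_partitioned/date_built=2013",
    "home_sales_partitioned/date_built=2014",
    "home_sales_partitioned/date_built=2015",
    "home_sales_partitioned/date_built=2012",
    "home_sales_partitioned/date_built=2017",
    "home_sales_partitioned/date_built=2010",
    "home_sales_partitioned/date_built=2011",
    "home_sales_partitioned/date_built=2016",
]


def partition_value(path, column="date_built"):
    """Extract the integer value from a 'column=value' directory name."""
    name = path.rsplit("/", 1)[-1]
    key, _, value = name.partition("=")
    if key != column:
        raise ValueError(f"{path!r} is not partitioned by {column!r}")
    return int(value)


def dirs_to_scan(paths, predicate):
    """Keep only the directories whose partition value satisfies the filter."""
    return [p for p in paths if predicate(partition_value(p))]


# A filter such as date_built = 2015 needs one directory out of eight.
selected = dirs_to_scan(partition_dirs, lambda year: year == 2015)
print(selected)
```

A query with no condition on date_built corresponds to a predicate that always returns True, in which case every directory is still read.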


In [48]:
# 11. Read the formatted parquet data.
parquet_df = spark.read.parquet("home_sales_partitioned")

# Show the schema to confirm successful loading
parquet_df.printSchema()

# Preview the first few rows
parquet_df.show(5)


root
 |-- id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- price: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- date_built: integer (nullable = true)

+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
|                  id|      date| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|date_built|
+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
|2ed8d509-7372-46d...|2021-08-06|258710|       3|        3|       1918|    9666|     1|         0|  25|      2015|
|941bad30-eb49-4a7...|2020-05-09|229896|       3|        3|       2197|    8641|     1|         0|

The Parquet data was successfully loaded into a new DataFrame, parquet_df.
The schema has the same columns and types as the original, but date_built now appears last because Spark reconstructs partition columns from the directory names and appends them after the data columns.
Queries that filter on date_built can take advantage of this layout.


In [50]:
# 12. Create a temporary table for the parquet data.
parquet_df.createOrReplaceTempView("home_sales_parquet")

# Verify the table was created by listing all tables
print(spark.catalog.listTables())


[Table(name='home_sales', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='home_sales_parquet', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


The temporary view home_sales_parquet was successfully created from the Parquet data, and listTables() shows it alongside home_sales.
The view allows SQL queries on the Parquet data within the Spark session.
Temporary views are session-specific and must be recreated in a new session.

In [52]:
# 13. Using the parquet DataFrame, run the last query above, that calculates 
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000. 
# Determine the runtime and compare it to the cached runtime.

start_time = time.time()

# Run the query on the Parquet DataFrame
result = spark.sql("""
    SELECT 
        view AS view_rating,
        ROUND(AVG(price), 2) AS avg_price
    FROM home_sales_parquet
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY view_rating DESC
""")

# Show the results
result.show()

# Stop the timer and print the runtime
print("--- %s seconds ---" % (time.time() - start_time))

+-----------+----------+
|view_rating| avg_price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 0.3136897087097168 seconds ---


**Analysis**
Querying the cached table was faster (0.12 seconds) than querying the partitioned Parquet data (0.31 seconds). The Parquet run was also slightly slower than the first, uncached run (0.26 seconds).

- Cached table: data is already in memory, so the query avoids disk I/O. This suits frequent or iterative queries within the same session.
- Parquet DataFrame: data is read from disk, which adds I/O overhead. This suits long-term storage or one-off queries where caching is unnecessary.

Because this query groups by view and does not filter on date_built, Spark cannot skip any partitions, so the partitioning gives no benefit here. On a dataset this small the differences are fractions of a second, but the ordering is consistent with caching helping repeated queries.
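
The same view-rating query is typed out three times in this notebook, with only the table name changing. One way to keep the timing runs comparable is to build the SQL text from a single template. The sketch below is an assumption about how that could be organized; view_price_query and VIEW_PRICE_QUERY are hypothetical names, and only the string is built here so the snippet runs without Spark.

```python
VIEW_PRICE_QUERY = """
    SELECT
        view AS view_rating,
        ROUND(AVG(price), 2) AS avg_price
    FROM {table}
    GROUP BY view
    HAVING AVG(price) >= {min_avg_price}
    ORDER BY view_rating DESC
"""


def view_price_query(table, min_avg_price=350000):
    """Return the view-rating query text for the given temporary view."""
    # Only accept simple identifiers, since the name is spliced into SQL text.
    if not table.isidentifier():
        raise ValueError(f"unexpected table name: {table!r}")
    return VIEW_PRICE_QUERY.format(table=table, min_avg_price=int(min_avg_price))


print(view_price_query("home_sales_parquet"))
```

In the notebook, the returned string could be passed to spark.sql(...) for home_sales and home_sales_parquet in turn, so that both timings run exactly the same query.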

In [54]:
# 14. Uncache the home_sales temporary table.
spark.sql("UNCACHE TABLE home_sales")

DataFrame[]

Uncaching removes the home_sales table from memory but does not delete the view or the underlying data.
This frees memory when the cached table is no longer needed.
Rerunning the CACHE TABLE cell restores the cache if it is needed again.
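
To make sure a table is always uncached once the work that needs it is finished, the cache and uncache calls can be paired in a context manager. This is a minimal sketch: cached_table is a hypothetical helper, and it uses the spark.catalog.cacheTable and spark.catalog.uncacheTable methods instead of the SQL statements used in this notebook. Unlike CACHE TABLE, cacheTable is lazy, so the data is only cached when the first action runs.

```python
from contextlib import contextmanager


@contextmanager
def cached_table(spark, table_name):
    """Cache `table_name` for the duration of a with-block, then uncache it."""
    spark.catalog.cacheTable(table_name)  # lazy: filled on the first action
    try:
        yield
    finally:
        # Runs even if a query inside the block raises an exception.
        spark.catalog.uncacheTable(table_name)


# Hypothetical usage inside the notebook, with an active SparkSession `spark`:
#
# with cached_table(spark, "home_sales"):
#     spark.sql("SELECT COUNT(*) FROM home_sales").show()
```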

In [56]:
# 15. Check that home_sales is no longer cached.
print("Is 'home_sales' cached? ", spark.catalog.isCached("home_sales"))


Is 'home_sales' cached?  False


The home_sales table was successfully uncached, freeing up memory.
Subsequent queries on the table will run without the performance benefit of caching.
This step ensures proper memory management, particularly when working with large datasets in Spark.
