In [1]:
# Import findspark and initialize.
import pandas as pd
import os
import findspark
import time

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark import SparkFiles
from pyspark.sql.functions import col
from pyspark.sql.functions import round, avg



In [2]:
# Import packages

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# 1. Read the home_sales_revised.csv from the provided AWS S3 bucket location into a PySpark DataFrame.
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"


In [4]:
# 2. Create a temporary view of the DataFrame.
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [5]:
df.count()

33287

In [6]:
# Get the data types of the columns. 
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_built: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: string (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: string (nullable = true)



In [7]:
from pyspark.sql.functions import year, avg, round, col

# Ensure correct data types
df = df.withColumn("sale_date", col("date").cast("date")) \
       .withColumn("price", col("price").cast("double")) \
       .withColumn("bedrooms", col("bedrooms").cast("int"))


In [8]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_built: string (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: string (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: string (nullable = true)
 |-- sale_date: date (nullable = true)



In [9]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?

# Filter for four-bedroom houses
four_bed_df = df.filter(col("bedrooms") == 4)

# Extract sale year and calculate average price per year
avg_price_per_year = (
    four_bed_df.withColumn("sale_year", year(col("sale_date")))
    .groupBy("sale_year")
    .agg(round(avg("price"), 2).alias("avg_price"))
    .orderBy("sale_year")
)

# Show results
avg_price_per_year.show()


+---------+---------+
|sale_year|avg_price|
+---------+---------+
|     2019| 300263.7|
|     2020|298353.78|
|     2021|301819.44|
|     2022|296363.88|
+---------+---------+



In [10]:
# 3.2 What is the average price for a four bedroom house sold per year, rounded to two decimal places?

# Ensure proper data types
df = df.withColumn("sale_date", col("date").cast("date")) \
       .withColumn("price", col("price").cast("double")) \
       .withColumn("bedrooms", col("bedrooms").cast("int"))

# Register DataFrame as a SQL temporary table
df.createOrReplaceTempView("home_data")

# Run SQL query
avg_price_4_bedroom = spark.sql("""
    SELECT YEAR(sale_date) AS year, ROUND(AVG(price), 2) AS avg_price
    FROM home_data
    WHERE bedrooms = 4
    GROUP BY YEAR(sale_date)
    ORDER BY YEAR(sale_date) DESC
""")

# Show results
avg_price_4_bedroom.show()


+----+---------+
|year|avg_price|
+----+---------+
|2022|296363.88|
|2021|301819.44|
|2020|298353.78|
|2019| 300263.7|
+----+---------+



In [11]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
# Ensure correct data types
df = df.withColumn("date_built", col("date_built").cast("int"))  # Ensure date_built is numeric

# Register DataFrame as a SQL temporary table
df.createOrReplaceTempView("home_data")

# Run SQL query
avg_price_3_bed_3_bath = spark.sql("""
    SELECT date_built, ROUND(AVG(price), 2) AS avg_price
    FROM home_data
    WHERE bedrooms = 3 AND bathrooms = 3
    GROUP BY date_built
    ORDER BY date_built DESC
""")

# Show results
avg_price_3_bed_3_bath.show()



+----------+---------+
|date_built|avg_price|
+----------+---------+
|      2017|292676.79|
|      2016|290555.07|
|      2015| 288770.3|
|      2014|290852.27|
|      2013|295962.27|
|      2012|293683.19|
|      2011|291117.47|
|      2010|292859.62|
+----------+---------+



In [12]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?

# Ensure correct data types
df = df.withColumn("floors", col("floors").cast("int")) \
       .withColumn("sqft_living", col("sqft_living").cast("int")) \

# Register DataFrame as a SQL temporary table
df.createOrReplaceTempView("home_data")

# Run SQL query
avg_price_filtered = spark.sql("""
    SELECT date_built, 
           format_number(ROUND(AVG(price), 2), 2) AS avg_price
    FROM home_data
    WHERE bedrooms = 3 
    AND bathrooms = 3
    AND floors = 2
    AND sqft_living >= 2000
    GROUP BY date_built
    ORDER BY date_built DESC
""")

avg_price_filtered.show(truncate=False)

+----------+----------+
|date_built|avg_price |
+----------+----------+
|2017      |280,317.58|
|2016      |293,965.10|
|2015      |297,609.97|
|2014      |298,264.72|
|2013      |303,676.79|
|2012      |307,539.97|
|2011      |276,553.81|
|2010      |285,010.22|
+----------+----------+



In [13]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.



# Ensure correct data types
df = df.withColumn("view", col("view").cast("int")) 

# Register DataFrame as a SQL temporary table
df.createOrReplaceTempView("home_data")

# Measure query execution time
start_time = time.time()

# Run SQL query
avg_price_filtered = spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_data
    GROUP BY view
    HAVING avg_price >= 350000
    ORDER BY view DESC
""")

# Show results
avg_price_filtered.show()

# Calculate run time
end_time = time.time()
print(f"Query run time: {end_time - start_time:.4f} seconds")


+----+----------+
|view| avg_price|
+----+----------+
| 100| 1026669.5|
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95| 1054325.6|
|  94| 1033536.2|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87| 1072285.2|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82| 1063498.0|
|  81|1053472.79|
+----+----------+
only showing top 20 rows

Query run time: 4.4938 seconds


In [14]:
# 7. Cache the the temporary table home_sales.
# Cache the table using SQL
spark.sql("cache table home_data")


DataFrame[]

In [15]:
# 8. Check if the table is cached.
# Check if the table is cached


In [16]:
df.createOrReplaceTempView("home_sales")
df.show()  # Check if the DataFrame has been loaded properly


+--------------------+----------+----------+--------+--------+---------+-----------+--------+------+----------+----+----------+
|                  id|      date|date_built|   price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view| sale_date|
+--------------------+----------+----------+--------+--------+---------+-----------+--------+------+----------+----+----------+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923.0|       4|        3|       3167|   11733|     2|         1|  76|2022-04-08|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628.0|       2|        2|       2235|   14384|     1|         0|  23|2021-06-13|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866.0|       2|        2|       2127|   10575|     2|         0|   0|2019-04-12|
|b672c137-b88c-48b...|2019-10-16|      2016|239895.0|       2|        2|       1631|   11149|     2|         0|   0|2019-10-16|
|e0726d4d-d595-407...|2022-01-08|      2017|424418.0|       3|        2|       2249|   13878|     2|    

In [17]:
df.createOrReplaceTempView("home_sales")
spark.catalog.listTables()  # List all tables


[Table(name='home_data', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='home_sales', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [18]:
# Create or replace the temporary view
df.createOrReplaceTempView("home_sales")

# Now run your SQL query
result = spark.sql("SELECT * FROM home_sales")
result.show()


+--------------------+----------+----------+--------+--------+---------+-----------+--------+------+----------+----+----------+
|                  id|      date|date_built|   price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view| sale_date|
+--------------------+----------+----------+--------+--------+---------+-----------+--------+------+----------+----+----------+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923.0|       4|        3|       3167|   11733|     2|         1|  76|2022-04-08|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628.0|       2|        2|       2235|   14384|     1|         0|  23|2021-06-13|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866.0|       2|        2|       2127|   10575|     2|         0|   0|2019-04-12|
|b672c137-b88c-48b...|2019-10-16|      2016|239895.0|       2|        2|       1631|   11149|     2|         0|   0|2019-10-16|
|e0726d4d-d595-407...|2022-01-08|      2017|424418.0|       3|        2|       2249|   13878|     2|    

In [19]:
# Check if the 'home_sales' table is cached
is_cached = spark.catalog.isCached("home_sales")
print(f"Is the 'home_sales' table cached? {is_cached}")


Is the 'home_sales' table cached? True


In [20]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.


# Start the timer
start_time = time.time()

# Execute the query
cached_result = spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    GROUP BY view
    HAVING avg_price >= 350000
    ORDER BY view DESC
""")

# Show results
cached_result.show()

# End timer and print execution time
print("--- %s seconds ---" % (time.time() - start_time))



+----+----------+
|view| avg_price|
+----+----------+
| 100| 1026669.5|
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95| 1054325.6|
|  94| 1033536.2|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87| 1072285.2|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82| 1063498.0|
|  81|1053472.79|
+----+----------+
only showing top 20 rows

--- 1.4993374347686768 seconds ---


In [21]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
# Define the output path for the partitioned Parquet data
output_path = "home_sales_partitioned"

# Write the DataFrame as a partitioned Parquet file
df.write.mode("overwrite").partitionBy("date_built").parquet(output_path)

print("Partitioned Parquet file saved successfully.")


Partitioned Parquet file saved successfully.


In [22]:
# 11. Read the formatted parquet data.
# Define the path to the partitioned Parquet data
parquet_path = "home_sales_partitioned"

# Read the Parquet file
df_parquet = spark.read.parquet(parquet_path)

# Show the schema to verify it's correctly loaded
df_parquet.printSchema()

# Display a sample of the data
df_parquet.show(5)


root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: integer (nullable = true)
 |-- sale_date: date (nullable = true)
 |-- date_built: integer (nullable = true)

+--------------------+----------+--------+--------+---------+-----------+--------+------+----------+----+----------+----------+
|                  id|      date|   price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view| sale_date|date_built|
+--------------------+----------+--------+--------+---------+-----------+--------+------+----------+----+----------+----------+
|2ed8d509-7372-46d...|2021-08-06|258710.0|       3|        3|       1918|    9666|     1|         0|  25|2021-08-06|      2015|
|941bad30

In [23]:
# Select the distinct years when homes were built and display them  
df_parquet.select("date_built").distinct().show()


+----------+
|date_built|
+----------+
|      2015|
|      2017|
|      2012|
|      2014|
|      2011|
|      2016|
|      2010|
|      2013|
+----------+



In [24]:
# Read data from the Parquet file located at 'parquet_path'
# and filter the DataFrame to only include rows where 'date_built' is 2016.
df_2000 = spark.read.parquet(parquet_path).filter("date_built == 2016")

# Display the contents of the filtered DataFrame.
df_2000.show()


+--------------------+----------+--------+--------+---------+-----------+--------+------+----------+----+----------+----------+
|                  id|      date|   price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view| sale_date|date_built|
+--------------------+----------+--------+--------+---------+-----------+--------+------+----------+----+----------+----------+
|f8a53099-ba1c-47d...|2022-04-08|936923.0|       4|        3|       3167|   11733|     2|         1|  76|2022-04-08|      2016|
|b672c137-b88c-48b...|2019-10-16|239895.0|       2|        2|       1631|   11149|     2|         0|   0|2019-10-16|      2016|
|e81aacfe-17fe-46b...|2020-06-16|181925.0|       3|        3|       2137|   11709|     2|         0|  22|2020-06-16|      2016|
|dd61eb34-6589-4c0...|2021-07-25|210247.0|       3|        2|       1672|   11986|     2|         0|  28|2021-07-25|      2016|
|f233cb41-6f33-4b0...|2021-07-18|437375.0|       4|        3|       1704|   11721|     2|         0|  34

In [25]:
# 12. Create a temporary table for the parquet data.
# Define the path to the partitioned Parquet data
parquet_path = "home_sales_partitioned"

# Read the Parquet file into a DataFrame
df_parquet = spark.read.parquet(parquet_path)


----------------------------------------------------------

In [27]:
import time
from pyspark.sql.functions import round, avg

# Define the Parquet file path
parquet_path = "home_sales_partitioned"

# Read the Parquet file
df_parquet = spark.read.parquet(parquet_path)


In [28]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

# Start the timer
start_time = time.time()

# Run the query using DataFrame operations
parquet_result = (
    df_parquet.groupBy("view")
    .agg(round(avg("price"), 2).alias("avg_price"))
    .filter("avg_price >= 350000")
    .orderBy("view", ascending=False)
)

# Show results
parquet_result.show()

# End timer and print execution time
print("--- %s seconds (Parquet) ---" % (time.time() - start_time))


+----+----------+
|view| avg_price|
+----+----------+
| 100| 1026669.5|
|  99|1061201.42|
|  98|1053739.33|
|  97|1129040.15|
|  96|1017815.92|
|  95| 1054325.6|
|  94| 1033536.2|
|  93|1026006.06|
|  92| 970402.55|
|  91|1137372.73|
|  90|1062654.16|
|  89|1107839.15|
|  88|1031719.35|
|  87| 1072285.2|
|  86|1070444.25|
|  85|1056336.74|
|  84|1117233.13|
|  83|1033965.93|
|  82| 1063498.0|
|  81|1053472.79|
+----+----------+
only showing top 20 rows

--- 5.0861029624938965 seconds (Parquet) ---


In [29]:
# Define the path to the directory containing the partitioned Parquet files
parquet_path = "home_sales_partitioned"

# List and print all files in the specified directory
print("Files in directory:", os.listdir(parquet_path))

Files in directory: ['._SUCCESS.crc', 'date_built=2010', 'date_built=2011', 'date_built=2012', 'date_built=2013', 'date_built=2014', 'date_built=2015', 'date_built=2016', 'date_built=2017', '_SUCCESS']


In [30]:
# Read the partitioned Parquet files into a Spark DataFrame  
df_parquet = spark.read.parquet("home_sales_partitioned")  

# Display the first 5 rows to verify that the data is loaded correctly  
df_parquet.show(5)  


+--------------------+----------+--------+--------+---------+-----------+--------+------+----------+----+----------+----------+
|                  id|      date|   price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view| sale_date|date_built|
+--------------------+----------+--------+--------+---------+-----------+--------+------+----------+----+----------+----------+
|2ed8d509-7372-46d...|2021-08-06|258710.0|       3|        3|       1918|    9666|     1|         0|  25|2021-08-06|      2015|
|941bad30-eb49-4a7...|2020-05-09|229896.0|       3|        3|       2197|    8641|     1|         0|   3|2020-05-09|      2015|
|c797ca12-52cd-4b1...|2019-06-08|288650.0|       2|        3|       2100|   10419|     2|         0|   7|2019-06-08|      2015|
|0cfe57f3-28c2-472...|2019-10-04|308313.0|       3|        3|       1960|    9453|     2|         0|   2|2019-10-04|      2015|
|d715f295-2fbf-4e9...|2021-05-17|391574.0|       3|        2|       1635|    8040|     2|         0|  10

In [31]:
# Create or replace a temporary SQL table named "home_sales_parquet"  
# This allows querying the DataFrame using SQL queries within Spark  
df_parquet.createOrReplaceTempView("home_sales_parquet")  


In [32]:
# List all the tables currently registered in the Spark catalog
# This will return information about all temporary and permanent tables available for querying
spark.catalog.listTables()


[Table(name='home_data', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='home_sales', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='home_sales_parquet', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [33]:
# 14. Uncache the home_sales temporary table.
# Uncache the temporary table
spark.catalog.uncacheTable("home_sales_parquet")

# Verify if the table is uncached
print("Is home_sales_parquet cached?", spark.catalog.isCached("home_sales_parquet"))


Is home_sales_parquet cached? False


In [34]:
# 15. Check if the home_sales is no longer cached
# Verify if the table is still cached
print("Is home_sales_parquet cached?", spark.catalog.isCached("home_sales_parquet"))


Is home_sales_parquet cached? False
