In [2]:
!pip install pyspark




In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, round

spark = SparkSession.builder.appName("HomeSales").getOrCreate()


In [4]:
from google.colab import files
uploaded = files.upload()


Saving home_sales_revised.csv to home_sales_revised.csv


In [5]:
df = spark.read.csv("home_sales_revised.csv", header=True, inferSchema=True)
df.show(5)


+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
+--------------------+----------+----------+------+--------+---------+----------

In [6]:
# Create a SQL-style table from our DataFrame
df.createOrReplaceTempView("home_sales")


In [7]:
# Average price of 4-bedroom houses sold each year
spark.sql("""
    SELECT year(date) AS year,
           ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 4
    GROUP BY year
    ORDER BY year
""").show()


+----+---------+
|year|avg_price|
+----+---------+
|2019| 300263.7|
|2020|298353.78|
|2021|301819.44|
|2022|296363.88|
+----+---------+



In [8]:
# Average price of homes by year and bedroom count
spark.sql("""
    SELECT year(date) AS year,
           bedrooms,
           ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    GROUP BY year, bedrooms
    ORDER BY year, bedrooms
""").show()


+----+--------+----------+
|year|bedrooms| avg_price|
+----+--------+----------+
|2019|       2| 284652.03|
|2019|       3| 296327.09|
|2019|       4|  300263.7|
|2019|       5| 731305.07|
|2019|       6|1363911.17|
|2019|       7|1397100.52|
|2019|       8|1386350.46|
|2020|       2| 284426.66|
|2020|       3| 302175.49|
|2020|       4| 298353.78|
|2020|       5| 761827.28|
|2020|       6|1380940.83|
|2020|       7|1382564.12|
|2020|       8|1366382.74|
|2021|       2| 284437.43|
|2021|       3|  300962.5|
|2021|       4| 301819.44|
|2021|       5| 753028.72|
|2021|       6|1359829.05|
|2021|       7|1377821.44|
+----+--------+----------+
only showing top 20 rows



In [9]:
# Average price of homes with more than 2 bathrooms by year
spark.sql("""
    SELECT year(date) AS year,
           ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bathrooms > 2
    GROUP BY year
    ORDER BY year
""").show()


+----+---------+
|year|avg_price|
+----+---------+
|2019| 344139.3|
|2020|343361.93|
|2021|338241.58|
|2022|342098.73|
+----+---------+



In [10]:
# Average price of homes over 2000 sqft by year
spark.sql("""
    SELECT year(date) AS year,
           ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE sqft_living > 2000
    GROUP BY year
    ORDER BY year
""").show()


+----+---------+
|year|avg_price|
+----+---------+
|2019|345639.55|
|2020|342693.17|
|2021|337899.58|
|2022|344403.73|
+----+---------+



In [12]:
# Step 12: Write the home sales DataFrame to Parquet format
df.write.parquet("home_sales.parquet", mode="overwrite")


In [13]:
# Step 13: Read the Parquet file into a new DataFrame
parquet_df = spark.read.parquet("home_sales.parquet")

# Create a temporary view from the parquet data
parquet_df.createOrReplaceTempView("parquet_home_sales")


In [15]:
from pyspark.sql.functions import year

# Re-add the 'year' column from the 'date' column
parquet_df = parquet_df.withColumn("year", year("date"))
parquet_df.createOrReplaceTempView("parquet_home_sales")



In [16]:
# Step 14: Run SQL query on Parquet data and measure performance time
import time
start_time = time.time()

spark.sql("""
    SELECT year, ROUND(AVG(view), 2) AS avg_view_rating
    FROM parquet_home_sales
    GROUP BY year
    ORDER BY year
""").show()

print("--- %s seconds ---" % (time.time() - start_time))


+----+---------------+
|year|avg_view_rating|
+----+---------------+
|2019|          27.36|
|2020|          27.55|
|2021|          27.44|
|2022|          27.27|
+----+---------------+

--- 0.983508825302124 seconds ---


In [17]:
# Step 15: Uncache the original table and check if it's still cached
spark.catalog.uncacheTable("home_sales")

# Verify if it's uncached
print("Is 'home_sales' still cached?", spark.catalog.isCached("home_sales"))


Is 'home_sales' still cached? False
