In [1]:
!pip install pyspark



In [2]:
# Import necessary libraries
import os
import shutil
import time

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, round

In [3]:
# Start a Spark session.
# PySpark launches a JVM through a Java gateway. When no Java runtime can be
# located, that launch fails with the opaque JAVA_GATEWAY_EXITED error, so check
# up front and stop with an actionable message instead.
if not os.environ.get("JAVA_HOME") and shutil.which("java") is None:
    raise RuntimeError(
        "No Java runtime found: install a JDK and set JAVA_HOME "
        "(or put `java` on PATH) before starting Spark."
    )

spark = SparkSession.builder.appName("HomeSales").getOrCreate()

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
# Build a small stand-in for home_sales_revised.csv from hand-written sample rows
# and write it to the working directory.
sale_dates = pd.date_range(start="2020-01-01", periods=10, freq="180D")

# One entry per CSV column; keyword order fixes the column order in the file.
data = dict(
    id=range(1, 11),
    date_sold=sale_dates,
    price=[450000, 350000, 520000, 410000, 390000, 620000, 370000, 290000, 680000, 430000],
    bedrooms=[4, 3, 4, 4, 3, 3, 3, 4, 4, 3],
    bathrooms=[3, 3, 3, 2, 3, 3, 2, 3, 2, 3],
    floors=[1, 2, 2, 1, 2, 2, 1, 2, 2, 2],
    sqft_living=[1800, 2200, 2500, 1900, 2000, 2400, 1700, 2100, 3000, 2300],
    view=[0, 1, 0, 2, 4, 3, 2, 1, 0, 4],
    date_built=[2005, 2010, 2005, 2011, 2010, 2005, 2011, 2006, 2008, 2005],
)

df = pd.DataFrame(data)
# index=False keeps the pandas row index out of the file.
df.to_csv("home_sales_revised.csv", index=False)
print("Simulated CSV saved.")

In [None]:
# Read the simulated CSV into Spark (first row is the header, column types are
# inferred) and expose it to Spark SQL as the temp view "home_sales".
csv_options = {"header": True, "inferSchema": True}
home_df = spark.read.csv("home_sales_revised.csv", **csv_options)
home_df.createOrReplaceTempView("home_sales")

In [None]:
# Average sale price of four-bedroom homes, grouped by the year they were sold
four_bedroom_query = """
    SELECT YEAR(date_sold) AS year_sold, 
           ROUND(AVG(price), 2) AS avg_price 
    FROM home_sales 
    WHERE bedrooms = 4 
    GROUP BY year_sold 
    ORDER BY year_sold
"""
four_bedroom_result = spark.sql(four_bedroom_query)
four_bedroom_result.show()

In [None]:
# Average sale price of 3-bedroom / 3-bathroom homes, grouped by year built
three_by_three_query = """
    SELECT date_built, 
           ROUND(AVG(price), 2) AS avg_price 
    FROM home_sales 
    WHERE bedrooms = 3 AND bathrooms = 3 
    GROUP BY date_built 
    ORDER BY date_built
"""
three_by_three_result = spark.sql(three_by_three_query)
three_by_three_result.show()

In [None]:
# Average sale price by year built, restricted to homes with 3 bedrooms,
# 3 bathrooms, 2 floors and a living area of 2000 sqft or more
large_two_floor_query = """
    SELECT date_built, 
           ROUND(AVG(price), 2) AS avg_price 
    FROM home_sales 
    WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000 
    GROUP BY date_built 
    ORDER BY date_built
"""
large_two_floor_result = spark.sql(large_two_floor_query)
large_two_floor_result.show()

In [None]:
# Calculate average price per view rating where the average price is at least $350,000,
# and report how long the query takes.
# perf_counter() is a monotonic, high-resolution clock meant for measuring intervals;
# differences of time.time() can be skewed by system clock adjustments.
start = time.perf_counter()
spark.sql("""
    SELECT view, 
           ROUND(AVG(price), 2) AS avg_price 
    FROM home_sales 
    GROUP BY view 
    HAVING AVG(price) >= 350000 
    ORDER BY view
""").show()
end = time.perf_counter()
print(f"Runtime: {end - start:.2f} seconds")

In [None]:
# Cache the temporary home_sales table by issuing a CACHE TABLE statement
# through Spark SQL (home_sales is the temp view registered from home_df).
spark.sql("CACHE TABLE home_sales")

In [None]:
# Check if the home_sales table is cached by asking the Spark catalog.
# As the last expression of the cell, the result is shown as the cell output.
spark.catalog.isCached("home_sales")

In [None]:
# Run the previous query again, now against the cached home_sales table, and time it.
# perf_counter() is a monotonic, high-resolution clock meant for measuring intervals;
# differences of time.time() can be skewed by system clock adjustments.
start = time.perf_counter()
spark.sql("""
    SELECT view, 
           ROUND(AVG(price), 2) AS avg_price 
    FROM home_sales 
    GROUP BY view 
    HAVING AVG(price) >= 350000 
    ORDER BY view
""").show()
end = time.perf_counter()
print(f"Cached Runtime: {end - start:.2f} seconds")

In [None]:
# Write home_df out as Parquet, one partition directory per date_built value,
# replacing any output left by a previous run.
partitioned_writer = home_df.write.partitionBy("date_built")
partitioned_writer.parquet("home_sales_partitioned.parquet", mode="overwrite")

In [None]:
# Read the partitioned Parquet output back in and register it with Spark SQL
# under its own temp view name, "home_sales_parquet".
parquet_reader = spark.read
parquet_df = parquet_reader.parquet("home_sales_partitioned.parquet")
parquet_df.createOrReplaceTempView("home_sales_parquet")

In [None]:
# Run the same query on the parquet temp table and time it.
# perf_counter() is a monotonic, high-resolution clock meant for measuring intervals;
# differences of time.time() can be skewed by system clock adjustments.
start = time.perf_counter()
spark.sql("""
    SELECT view, 
           ROUND(AVG(price), 2) AS avg_price 
    FROM home_sales_parquet 
    GROUP BY view 
    HAVING AVG(price) >= 350000 
    ORDER BY view
""").show()
end = time.perf_counter()
print(f"Parquet Runtime: {end - start:.2f} seconds")

In [None]:
# Uncache the home_sales table by issuing an UNCACHE TABLE statement through
# Spark SQL, undoing the earlier CACHE TABLE.
spark.sql("UNCACHE TABLE home_sales")

In [None]:
# Verify that the home_sales table is no longer cached by asking the Spark catalog.
# As the last expression of the cell, the result is shown as the cell output.
spark.catalog.isCached("home_sales")