In [13]:
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col


In [14]:
# Start a Spark session named "HomeSales", or attach to one that already exists
spark = (
    SparkSession.builder
    .appName("HomeSales")
    .getOrCreate()
)

# Load the CSV into a DataFrame: the first row supplies the column names and
# Spark infers each column's type from the data
file_path = "Home_Sales/home_sales_revised.csv"
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv(file_path)

# Expose the DataFrame to Spark SQL as the temporary view home_sales
df.createOrReplaceTempView("home_sales")


FileNotFoundError: [Errno 2] No such file or directory: '/usr/local/Cellar/apache-spark/3.x.x/libexec/./bin/spark-submit'

In [None]:
# Average sale price (rounded to two decimals) of four-bedroom homes,
# grouped and ordered by the year taken from the `date` column
query = """
SELECT YEAR(date) AS year_sold, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 4
GROUP BY year_sold
ORDER BY year_sold
"""
four_bedroom_by_year = spark.sql(query)
four_bedroom_by_year.show()


In [None]:
# Average price (rounded to two decimals) of homes with three bedrooms and
# three bathrooms, grouped and ordered by YEAR(date_built)
# NOTE(review): YEAR() expects a date-like value. If date_built already holds
# a plain integer year, select the column directly instead - confirm with
# df.printSchema().
query = """
SELECT YEAR(date_built) AS year_built, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3
GROUP BY year_built
ORDER BY year_built
"""
three_bed_three_bath = spark.sql(query)
three_bed_three_bath.show()


In [None]:
# Average price (rounded to two decimals) of homes with three bedrooms, three
# bathrooms, two floors and at least 2,000 sqft of living space, grouped and
# ordered by YEAR(date_built)
# NOTE(review): YEAR() expects a date-like value. If date_built already holds
# a plain integer year, select the column directly instead - confirm with
# df.printSchema().
query = """
SELECT YEAR(date_built) AS year_built, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
GROUP BY year_built
ORDER BY year_built
"""
large_two_floor_homes = spark.sql(query)
large_two_floor_homes.show()


In [None]:
# Baseline timing: average price (rounded to two decimals) for each value
# of the `view` column, keeping only the groups whose average is at least
# 350,000, highest average first.
# `time` is imported with the other imports in the notebook's first cell.
# Later cells re-run this same `query` string to compare runtimes.
query = """
SELECT view, ROUND(AVG(price), 2) AS avg_price
FROM home_sales
GROUP BY view
HAVING avg_price >= 350000
ORDER BY avg_price DESC
"""

# Track the runtime. perf_counter() is a monotonic, high-resolution clock
# meant for measuring intervals; time.time() follows the wall clock and can
# jump if the system time is adjusted. The timer starts after the query text
# is built so that only the Spark work is measured.
start_time = time.perf_counter()
spark.sql(query).show()

# Print the runtime
print("Runtime: %s seconds" % (time.perf_counter() - start_time))


In [None]:
# Ask Spark SQL to cache the home_sales temporary view
cache_statement = "CACHE TABLE home_sales"
spark.sql(cache_statement)


In [None]:
# Ask the catalog whether home_sales is currently cached; the boolean is the
# cell's displayed result
cached_view_name = "home_sales"
spark.catalog.isCached(cached_view_name)


In [None]:
# Re-run the same query (still held in `query`) against home_sales so its
# runtime can be compared with the earlier, uncached run.
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# intervals; time.time() follows the wall clock and can jump if the system
# time is adjusted.
start_time = time.perf_counter()

spark.sql(query).show()

print("Cached Query Runtime: %s seconds" % (time.perf_counter() - start_time))


In [None]:
# Partition data by 'date_built' and write to parquet.
# mode("overwrite") replaces any output left behind by a previous run; with
# the default save mode the write fails once "home_sales_partitioned" already
# exists, which breaks Restart & Run All.
df.write.mode("overwrite").partitionBy("date_built").parquet("home_sales_partitioned")


In [None]:
# Load the partitioned parquet output back into a DataFrame
parquet_reader = spark.read.format("parquet")
parquet_df = parquet_reader.load("home_sales_partitioned")

# Register it with Spark SQL as the temporary view parquet_home_sales
parquet_df.createOrReplaceTempView("parquet_home_sales")


In [None]:
# Time the view/average-price query against the parquet-backed view.
# The earlier `query` string selects FROM home_sales, so re-running it here
# would never touch the parquet data; this cell therefore uses its own copy of
# the statement that reads FROM parquet_home_sales. It is kept in a separate
# variable so `query` is left unchanged.
parquet_query = """
SELECT view, ROUND(AVG(price), 2) AS avg_price
FROM parquet_home_sales
GROUP BY view
HAVING avg_price >= 350000
ORDER BY avg_price DESC
"""

# perf_counter() is a monotonic, high-resolution clock meant for measuring
# intervals
start_time = time.perf_counter()

spark.sql(parquet_query).show()

print("Parquet Query Runtime: %s seconds" % (time.perf_counter() - start_time))


In [None]:
# Ask Spark SQL to drop the home_sales view from the cache
uncache_statement = "UNCACHE TABLE home_sales"
spark.sql(uncache_statement)


In [None]:
# Confirm the uncache took effect: the catalog's answer for home_sales is the
# cell's displayed result
uncached_view_name = "home_sales"
spark.catalog.isCached(uncached_view_name)


In [None]:
# Add changes
git add Home_Sales.ipynb

# Commit changes
git commit -m "Completed home sales analysis with SparkSQL"

# Push to GitHub
git push origin main
