In [None]:

# Install Java and PySpark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install -q pyspark

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, round, year
import time

spark = SparkSession.builder.appName("HomeSales").getOrCreate()


In [None]:

# Download and read CSV with semicolon delimiter
!wget -q https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv

df = spark.read.option("header", "true").option("sep", ";").option("inferSchema", "true").csv("home_sales_revised.csv")
df.printSchema()
df.show(5)
df.createOrReplaceTempView("home_sales")


In [None]:
# Average sale price (2 dp) of four-bedroom homes, one row per calendar year of sale.
four_bed_by_year_sql = """
    SELECT year(date) AS year_sold, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE CAST(bedrooms AS INT) = 4
    GROUP BY year_sold
    ORDER BY year_sold
"""
four_bed_by_year = spark.sql(four_bed_by_year_sql)
four_bed_by_year.show()

In [None]:
# Average price (2 dp) per build year for homes with exactly 3 bedrooms and 3 bathrooms.
# The columns are compared directly, with no CAST(... AS INT). That cast truncates, so
# if bathrooms holds fractional values, a 3.5-bathroom home would wrongly match `= 3`.
spark.sql("""
    SELECT date_built, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 3 AND bathrooms = 3
    GROUP BY date_built
    ORDER BY date_built
""").show()

In [None]:
# Average price (2 dp) per build year for 3-bed / 3-bath, 2-floor homes with at
# least 2,000 sqft of living space.
# Columns are compared directly, like sqft_living already is. CAST(... AS INT) would
# truncate fractional values, e.g. 3.5 bathrooms or 2.5 floors would wrongly match.
spark.sql("""
    SELECT date_built, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 3
      AND bathrooms = 3
      AND floors = 2
      AND sqft_living >= 2000
    GROUP BY date_built
    ORDER BY date_built
""").show()

In [None]:
# Time the "average price per view rating, avg >= 350,000" query on the uncached view.
# perf_counter() is a monotonic, high-resolution clock intended for measuring intervals.
start_time = time.perf_counter()

spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    GROUP BY view
    HAVING avg_price >= 350000
    ORDER BY view
""").show()

# Formatted with an f-string instead of round(). This avoids depending on which
# `round` is in scope: pyspark.sql.functions.round rejects plain floats.
print(f"Runtime: {time.perf_counter() - start_time:.2f} seconds")

In [None]:
# Cache the `home_sales` view in memory. catalog.cacheTable() is lazy: it only marks
# the table for caching. Run a cheap action now to populate the cache, so the one-time
# build cost is not counted in the "cached" timing that follows.
spark.catalog.cacheTable("home_sales")
cached_row_count = spark.table("home_sales").count()

In [None]:
# Confirm that `home_sales` is now registered with Spark's cache manager.
is_home_sales_cached = spark.catalog.isCached("home_sales")
print("Is 'home_sales' cached?:", is_home_sales_cached)

In [None]:
# Time the same view-rating query now that `home_sales` has been marked for caching.
# NOTE: catalog.cacheTable() is lazy. Unless the cache was materialized beforehand,
# this run also pays the cost of populating it.
start_time = time.perf_counter()

spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    GROUP BY view
    HAVING avg_price >= 350000
    ORDER BY view
""").show()

# Formatted with an f-string instead of round(). This avoids depending on which
# `round` is in scope: pyspark.sql.functions.round rejects plain floats.
print(f"Cached Runtime: {time.perf_counter() - start_time:.2f} seconds")

In [None]:
# Persist the data as Parquet, partitioned into one directory per `date_built` value.
# "overwrite" replaces whatever a previous run left at this path.
df.write.mode("overwrite").partitionBy("date_built").parquet("/tmp/home_sales_partitioned")

In [None]:
# Load the partitioned Parquet output and expose it to SQL as `home_sales_parquet`.
parquet_path = "/tmp/home_sales_partitioned"
parquet_df = spark.read.parquet(parquet_path)
parquet_df.createOrReplaceTempView("home_sales_parquet")

In [None]:
# Time the view-rating query against the partitioned Parquet copy of the data.
# perf_counter() is a monotonic, high-resolution clock intended for measuring intervals.
start_time = time.perf_counter()

spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales_parquet
    GROUP BY view
    HAVING avg_price >= 350000
    ORDER BY view
""").show()

# Formatted with an f-string instead of round(). This avoids depending on which
# `round` is in scope: pyspark.sql.functions.round rejects plain floats.
print(f"Parquet Runtime: {time.perf_counter() - start_time:.2f} seconds")

In [None]:
# Release the in-memory cache held for the `home_sales` view.
table_to_uncache = "home_sales"
spark.catalog.uncacheTable(table_to_uncache)

In [None]:
# Verify the uncache took effect; after uncacheTable() this should report False.
still_cached = spark.catalog.isCached("home_sales")
print("Is 'home_sales' cached after uncache?:", still_cached)