In [21]:
import os

# Point findspark/PySpark (next cell) at a local Spark installation.
# setdefault keeps any SPARK_HOME already configured in the environment, so the
# notebook also runs on other machines; the path below is only a machine-specific
# fallback used when SPARK_HOME is unset.
os.environ.setdefault('SPARK_HOME', '/Users/davidskaff/Downloads/spark-3.5.1-bin-hadoop3')

In [22]:
import findspark

# Initialise findspark so `pyspark` can be imported from this kernel.
# spark_home=None (its default) lets findspark locate Spark on its own --
# presumably via the SPARK_HOME set in the previous cell.
findspark.init(spark_home=None)

In [23]:
# Import packages: stdlib timing helper, then the Spark SQL entry point
import time

from pyspark.sql import SparkSession

# Build (or reuse, if one is already running) the SparkSession for this notebook
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [24]:
# 1. Read in the AWS S3 bucket into a DataFrame.
# Public S3 location of the home-sales CSV.
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

# SparkFiles resolves files shipped to the cluster with SparkContext.addFile().
from pyspark import SparkFiles

In [25]:
# Load the data into a DataFrame from the S3 URL defined in the previous cell.
# addFile() downloads the file for this Spark job. SparkFiles.get() then maps the
# bare *file name* to its local download path. It is not meant to wrap an
# arbitrary absolute local path, which would also tie the notebook to one machine.
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), header=True, inferSchema=True)

# Create a temporary view of the DataFrame so it can be queried with spark.sql
df.createOrReplaceTempView("home_sales")

In [26]:
# Average sale price of four-bedroom homes for each year of sale,
# rounded to two decimals and listed in chronological order.
four_bedroom_sql = """
    SELECT YEAR(date) as year, ROUND(AVG(price), 2) as avg_price
    FROM home_sales
    WHERE bedrooms = 4
    GROUP BY YEAR(date)
    ORDER BY YEAR(date)
"""
result = spark.sql(four_bedroom_sql)

# Display the aggregated rows
result.show()

+----+---------+
|year|avg_price|
+----+---------+
|2019| 300263.7|
|2020|298353.78|
|2021|301819.44|
|2022|296363.88|
+----+---------+



In [27]:
# Run a SQL query to calculate the average price of homes with 3 bedrooms and
# 3 bathrooms for each year the home was built (rounded to two decimals,
# ordered by build year).
result = spark.sql("""
    SELECT date_built as year_built, ROUND(AVG(price), 2) as avg_price
    FROM home_sales
    WHERE bedrooms = 3 AND bathrooms = 3
    GROUP BY date_built
    ORDER BY date_built
""")

# Show the result
result.show()


+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2010|292859.62|
|      2011|291117.47|
|      2012|293683.19|
|      2013|295962.27|
|      2014|290852.27|
|      2015| 288770.3|
|      2016|290555.07|
|      2017|292676.79|
+----------+---------+



In [28]:
# Run a SQL query to calculate the average price of homes with 3 bedrooms,
# 3 bathrooms, 2 floors and at least 2,000 sqft of living space, for each year
# the home was built (rounded to two decimals, ordered by build year).
result = spark.sql("""
    SELECT date_built as year_built, ROUND(AVG(price), 2) as avg_price
    FROM home_sales
    WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
    GROUP BY date_built
    ORDER BY date_built
""")

# Show the result
result.show()

+----------+---------+
|year_built|avg_price|
+----------+---------+
|      2010|285010.22|
|      2011|276553.81|
|      2012|307539.97|
|      2013|303676.79|
|      2014|298264.72|
|      2015|297609.97|
|      2016| 293965.1|
|      2017|280317.58|
+----------+---------+



In [29]:
# Start the timer. perf_counter() is a monotonic, high-resolution clock intended
# for measuring elapsed time; time.time() follows the wall clock and can jump.
start_time = time.perf_counter()

# Run a SQL query to calculate the average price for a home per "view" rating,
# keeping only ratings whose average price is at least 350000
result = spark.sql("""
    SELECT view as view_rating, ROUND(AVG(price), 2) as avg_price
    FROM home_sales
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY view DESC
""")

# Show the result (show() triggers execution, so it is inside the timed region)
result.show()

# Calculate and print the run time
print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+----------+
|view_rating| avg_price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 0.49784326553344727 seconds ---


In [30]:
# Cache the home_sales table. cache() is lazy: it only marks the data for
# caching, and the cache is filled by the first action that scans it. Running
# count() here populates the cache now, so the timed query later measures reads
# from the cache instead of also paying the cost of building it.
home_sales_cached = spark.table("home_sales").cache()
home_sales_cached.count()
home_sales_cached

24/05/05 23:17:21 WARN CacheManager: Asked to cache already cached data.


DataFrame[id: string, date: date, date_built: int, price: int, bedrooms: int, bathrooms: int, sqft_living: int, sqft_lot: int, floors: int, waterfront: int, view: int]

In [31]:
# 8. Ask the catalog whether the home_sales table is now cached (expect True).
spark.catalog.isCached(tableName="home_sales")

True

In [32]:
# Start the timer. perf_counter() is a monotonic, high-resolution clock intended
# for measuring elapsed time; time.time() follows the wall clock and can jump.
start_time = time.perf_counter()

# Run a SQL query to calculate the average price for a home per "view" rating,
# keeping only ratings whose average price is at least 350000 (home_sales is
# cached at this point)
result = spark.sql("""
    SELECT view as view_rating, ROUND(AVG(price), 2) as avg_price
    FROM home_sales
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY view DESC
""")

# Show the result (show() triggers execution, so it is inside the timed region)
result.show()

# Calculate and print the run time
print("--- %s seconds ---" % (time.perf_counter() - start_time))

+-----------+----------+
|view_rating| avg_price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 0.3088369369506836 seconds ---


In [None]:
# Write DataFrame to Parquet format, partitioned by "date_built".
# The default save mode ("errorifexists") raises if the output directory already
# exists, so a second run of this cell would crash. "overwrite" keeps the
# notebook re-runnable. Note that it replaces whatever is in this output directory.
df.write.mode("overwrite").partitionBy("date_built").parquet("/Users/davidskaff/Desktop/home-sales/Parquet")

In [35]:
# Load the partitioned Parquet output back into a DataFrame via the generic
# reader; the date_built column is recovered from the partition directories.
df = spark.read.format("parquet").load("/Users/davidskaff/Desktop/home-sales/Parquet")

In [36]:
# Register the Parquet-backed DataFrame as a SQL view named home_sales_parquet
df.createOrReplaceTempView(name="home_sales_parquet")

In [37]:
# Start the timer. perf_counter() is a monotonic, high-resolution clock intended
# for measuring elapsed time; time.time() follows the wall clock and can jump.
start_time = time.perf_counter()

# Run a SQL query to calculate the average price for a home per "view" rating,
# keeping only ratings whose average price is at least 350000, this time against
# the Parquet-backed view
result = spark.sql("""
    SELECT view as view_rating, ROUND(AVG(price), 2) as avg_price
    FROM home_sales_parquet
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY view DESC
""")

# Show the result (show() triggers execution, so it is inside the timed region)
result.show()

# Calculate and print the run time
print("--- %s seconds ---" % (time.perf_counter() - start_time))



+-----------+----------+
|view_rating| avg_price|
+-----------+----------+
|        100| 1026669.5|
|         99|1061201.42|
|         98|1053739.33|
|         97|1129040.15|
|         96|1017815.92|
|         95| 1054325.6|
|         94| 1033536.2|
|         93|1026006.06|
|         92| 970402.55|
|         91|1137372.73|
|         90|1062654.16|
|         89|1107839.15|
|         88|1031719.35|
|         87| 1072285.2|
|         86|1070444.25|
|         85|1056336.74|
|         84|1117233.13|
|         83|1033965.93|
|         82| 1063498.0|
|         81|1053472.79|
+-----------+----------+
only showing top 20 rows

--- 1.3635649681091309 seconds ---


                                                                                

In [38]:
# Drop the cached copy of the home_sales table; blocking=False is the default,
# spelled out here so the non-blocking behaviour is visible.
home_sales_table = spark.table("home_sales")
home_sales_table.unpersist(blocking=False)

DataFrame[id: string, date: date, date_built: int, price: int, bedrooms: int, bathrooms: int, sqft_living: int, sqft_lot: int, floors: int, waterfront: int, view: int]

In [39]:
# Check whether the home_sales table is still cached.
# DataFrame.is_cached is only a client-side flag on one DataFrame object, set by
# that object's own cache()/persist()/unpersist() calls. A freshly created
# spark.table(...) object always starts with is_cached == False, so it cannot
# tell whether the table is cached. Ask the catalog instead.
is_cached = spark.catalog.isCached("home_sales")

# Print the result
print("Is 'home_sales' cached?:", is_cached)

Is 'home_sales' cached?: False
