In [None]:

# Standard-library imports
import time

# Import findspark and initialize it before importing pyspark
import findspark
findspark.init()

# Import necessary PySpark SQL functions
from pyspark.sql import SparkSession, functions as F

# Build the SparkSession that the rest of the notebook queries through,
# or pick up an already-running one under the same builder settings
spark = (
    SparkSession.builder
    .appName("SparkSQL_HomeSales")
    .getOrCreate()
)


In [None]:

# Read home_sales_revised.csv into a Spark DataFrame: the first row supplies
# the column names and Spark infers each column's type from the data
file_path = '/mnt/data/home_sales_revised.csv'
home_sales_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)


In [None]:

# Register the DataFrame as the temporary view "home_sales" so the cells below
# can query it by name through spark.sql(); a view already registered under
# that name is replaced
home_sales_df.createOrReplaceTempView("home_sales")


In [None]:

# 3. What is the average price for a four-bedroom house sold per year, rounded to two decimal places?
# Keeps only rows with bedrooms = 4, groups them by the year extracted from
# the sale date, and lists the rounded average price in ascending year order.
four_bed_query = """
    SELECT 
        year(date) as year_sold, 
        ROUND(AVG(price), 2) as avg_price 
    FROM 
        home_sales 
    WHERE 
        bedrooms = 4 
    GROUP BY 
        year_sold 
    ORDER BY 
        year_sold
"""
avg_price_4_bed = spark.sql(four_bed_query)
avg_price_4_bed.show()


In [None]:

# 4. What is the average price of a home for each year the home was built, that has three bedrooms and three bathrooms?
# Filters to bedrooms = 3 and bathrooms = 3, groups by build year, and lists
# the average price (rounded to two decimals) in ascending build-year order.
# NOTE(review): year() is applied to date_built here; if that column already
# holds a plain year number rather than a date, confirm this call against the
# CSV schema.
three_bed_three_bath_query = """
    SELECT 
        year(date_built) as year_built, 
        ROUND(AVG(price), 2) as avg_price 
    FROM 
        home_sales 
    WHERE 
        bedrooms = 3 
        AND bathrooms = 3 
    GROUP BY 
        year_built 
    ORDER BY 
        year_built
"""
avg_price_3_bed_3_bath = spark.sql(three_bed_three_bath_query)
avg_price_3_bed_3_bath.show()


In [None]:

# 5. What is the average price of a home for each year the home was built, that has three bedrooms, three bathrooms, two floors, and is >= 2,000 sq ft?
# Same shape as the previous question with two extra filters: floors = 2 and
# sqft_living >= 2000. Results are grouped and ordered by build year.
# NOTE(review): year() is applied to date_built here; if that column already
# holds a plain year number rather than a date, confirm this call against the
# CSV schema.
specific_homes_query = """
    SELECT 
        year(date_built) as year_built, 
        ROUND(AVG(price), 2) as avg_price 
    FROM 
        home_sales 
    WHERE 
        bedrooms = 3 
        AND bathrooms = 3 
        AND floors = 2 
        AND sqft_living >= 2000 
    GROUP BY 
        year_built 
    ORDER BY 
        year_built
"""
avg_price_specific = spark.sql(specific_homes_query)
avg_price_specific.show()


In [None]:

# 6. What is the average price of a home per "view" rating having an average home price >= $350,000?
# This run is timed against the not-yet-cached home_sales view and serves as
# the baseline for the cached and partitioned runs further down.
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# short durations; differences of time.time() can be skewed by system clock
# adjustments.
start_time = time.perf_counter()

avg_price_per_view = spark.sql('''
    SELECT 
        view, 
        ROUND(AVG(price), 2) as avg_price 
    FROM 
        home_sales 
    GROUP BY 
        view 
    HAVING 
        avg_price >= 350000 
    ORDER BY 
        avg_price DESC
''')
# show() sits inside the timed region so the measurement includes producing the rows
avg_price_per_view.show()

print("Runtime: %s seconds" % (time.perf_counter() - start_time))


In [None]:

# 7. Cache the temporary table home_sales
# Issued as a SQL statement against the "home_sales" view so that the repeat of
# the "view" rating query below can be timed against cached data
spark.sql("CACHE TABLE home_sales")


In [None]:

# 8. Verify the table is cached
# Ask the session catalog for the cache status of "home_sales" and print it
is_cached = spark.catalog.isCached("home_sales")
print(f"Is 'home_sales' table cached? {is_cached}")


In [None]:

# 9. Run the last query on cached data and compare the runtime
# Same "view" rating query as the uncached run, now against the cached
# home_sales view. perf_counter() is a monotonic, high-resolution clock
# intended for measuring short durations; differences of time.time() can be
# skewed by system clock adjustments.
start_time_cached = time.perf_counter()

avg_price_per_view_cached = spark.sql('''
    SELECT 
        view, 
        ROUND(AVG(price), 2) as avg_price 
    FROM 
        home_sales 
    GROUP BY 
        view 
    HAVING 
        avg_price >= 350000 
    ORDER BY 
        avg_price DESC
''')
# show() sits inside the timed region so the measurement includes producing the rows
avg_price_per_view_cached.show()

print("Cached Runtime: %s seconds" % (time.perf_counter() - start_time_cached))


In [None]:

# 10. Partition by the "date_built" field on the formatted parquet home sales data
# mode("overwrite") replaces output left behind by an earlier run. With the
# default save mode the write raises as soon as the "home_sales_partitioned"
# directory exists, so the notebook could not be re-run from the top.
home_sales_df.write.mode("overwrite").partitionBy("date_built").parquet("home_sales_partitioned")

# Read the partitioned parquet data back and register it as its own temp view
# so it can be queried with spark.sql() under the name "home_sales_partitioned"
partitioned_df = spark.read.parquet("home_sales_partitioned")
partitioned_df.createOrReplaceTempView("home_sales_partitioned")


In [None]:

# 11. Run the last query on the partitioned parquet temporary table and compare the runtime
# Same "view" rating query, this time reading from the home_sales_partitioned
# view. perf_counter() is a monotonic, high-resolution clock intended for
# measuring short durations; differences of time.time() can be skewed by
# system clock adjustments.
start_time_partitioned = time.perf_counter()

avg_price_per_view_partitioned = spark.sql('''
    SELECT 
        view, 
        ROUND(AVG(price), 2) as avg_price 
    FROM 
        home_sales_partitioned 
    GROUP BY 
        view 
    HAVING 
        avg_price >= 350000 
    ORDER BY 
        avg_price DESC
''')
# show() sits inside the timed region so the measurement includes producing the rows
avg_price_per_view_partitioned.show()

print("Partitioned Runtime: %s seconds" % (time.perf_counter() - start_time_partitioned))


In [None]:

# 12. Uncache the home_sales temporary table
# Issued as a SQL statement, mirroring the CACHE TABLE statement used earlier
spark.sql("UNCACHE TABLE home_sales")

# Verify that the table is uncached
# isCached() reports whether the view is still cached, so its negation is
# True once the uncache has taken effect
is_uncached = not spark.catalog.isCached("home_sales")
print(f"Is 'home_sales' table uncached? {is_uncached}")
