In [0]:
# Import packages
from pyspark.sql import SparkSession
import time

# Start (or attach to) the notebook's SparkSession, registered under the app name "SparkSQL".
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [0]:
# List the files uploaded to the DBFS FileStore tables directory.
# `dbutils` is injected by the Databricks runtime; it is not importable outside Databricks.
tables_dir = '/FileStore/tables/'
dbutils.fs.ls(tables_dir)

Out[2]: [FileInfo(path='dbfs:/FileStore/tables/CT_fires_2015.csv', name='CT_fires_2015.csv', size=23357531, modificationTime=1715900812000),
 FileInfo(path='dbfs:/FileStore/tables/CT_fires_2015Copy.csv', name='CT_fires_2015Copy.csv', size=23357531, modificationTime=1715900813000),
 FileInfo(path='dbfs:/FileStore/tables/food_prices.parquet', name='food_prices.parquet', size=1394, modificationTime=1715900813000),
 FileInfo(path='dbfs:/FileStore/tables/home_sales_revised-1.csv', name='home_sales_revised-1.csv', size=2747874, modificationTime=1716077847000),
 FileInfo(path='dbfs:/FileStore/tables/home_sales_revised.csv', name='home_sales_revised.csv', size=2747874, modificationTime=1716077775000),
 FileInfo(path='dbfs:/FileStore/tables/species.csv', name='species.csv', size=1605, modificationTime=1715907236000),
 FileInfo(path='dbfs:/FileStore/tables/surveys.csv', name='surveys.csv', size=1021588, modificationTime=1715907237000),
 FileInfo(path='dbfs:/FileStore/tables/user_device-1.csv', n

In [0]:
# 1. Read the home sales CSV into a DataFrame.
# NOTE: the data is read from the file uploaded to the DBFS FileStore, not directly
# from an AWS S3 bucket.
# header=True takes column names from the first row; inferSchema=True lets Spark
# choose column types from the data instead of reading everything as strings.
file_path = '/FileStore/tables/home_sales_revised.csv'

df_home_sales = spark.read.csv(file_path, inferSchema=True, header=True)

In [0]:
# 2. Register the DataFrame as a temporary view named `df_temp`
#    so the SQL cells below can query it by name.
df_home_sales.createOrReplaceTempView(name='df_temp')


In [0]:
# Show the column names read from the CSV header, in schema order.
df_home_sales.schema.names

Out[5]: ['id',
 'date',
 'date_built',
 'price',
 'bedrooms',
 'bathrooms',
 'sqft_living',
 'sqft_lot',
 'floors',
 'waterfront',
 'view']

In [0]:
# Show 5 rows with every column and untruncated values.
# show() already prints all columns in schema order, so an explicit select that
# re-types every column name is redundant and would drift if the schema changed.
df_home_sales.show(5, truncate=False)

+------------------------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|id                                  |date      |date_built|price |bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+------------------------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d6-9c31-7398aa8f6089|2022-04-08|2016      |936923|4       |3        |3167       |11733   |2     |1         |76  |
|7530a2d8-1ae3-4517-9f4a-befe060c4353|2021-06-13|2013      |379628|2       |2        |2235       |14384   |1     |0         |23  |
|43de979c-0bf0-4c9f-85ef-96dc27b258d5|2019-04-12|2014      |417866|2       |2        |2127       |10575   |2     |0         |0   |
|b672c137-b88c-48bf-9f18-d0a4ac62fb8b|2019-10-16|2016      |239895|2       |2        |1631       |11149   |2     |0         |0   |
|e0726d4d-d595-4074-8283-4139a54d0d63|2022-01-08|2017      |424418|3       |2      

In [0]:
%sql
-- Preview 5 rows of the home sales view using SQL.
-- NOTE: there is no ORDER BY, so which 5 rows are returned is not guaranteed.
SELECT *
FROM df_temp
LIMIT 5

id,date,date_built,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view
f8a53099-ba1c-47d6-9c31-7398aa8f6089,2022-04-08,2016,936923,4,3,3167,11733,2,1,76
7530a2d8-1ae3-4517-9f4a-befe060c4353,2021-06-13,2013,379628,2,2,2235,14384,1,0,23
43de979c-0bf0-4c9f-85ef-96dc27b258d5,2019-04-12,2014,417866,2,2,2127,10575,2,0,0
b672c137-b88c-48bf-9f18-d0a4ac62fb8b,2019-10-16,2016,239895,2,2,1631,11149,2,0,0
e0726d4d-d595-4074-8283-4139a54d0d63,2022-01-08,2017,424418,3,2,2249,13878,2,0,4


In [0]:

%sql
-- 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
-- The question is ambiguous (year built vs. year sold); this cell groups by year built.
SELECT
    date_built,
    ROUND(AVG(price), 2) AS Average_Price
FROM df_temp
WHERE bedrooms = 4
GROUP BY date_built
ORDER BY date_built



date_built,Average_Price
2010,296800.75
2011,302141.9
2012,298233.42
2013,299999.39
2014,299073.89
2015,307908.86
2016,296050.24
2017,296576.69


In [0]:
%sql
-- 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
-- This cell reads "per year" as the year the house was SOLD (year of the `date` column).
-- The year expression is aliased so the result column is named Year_Sold rather than
-- the raw expression text "year(date)".
SELECT
    YEAR(date) AS Year_Sold,
    ROUND(AVG(price), 2) AS Average_Price
FROM df_temp
WHERE bedrooms = 4
GROUP BY YEAR(date)
ORDER BY Year_Sold

year(date),Average_Price
2019,300263.7
2020,298353.78
2021,301819.44
2022,296363.88


In [0]:
%sql
-- 4. What is the average price of a home for each year the home was built,
--    that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
SELECT
    date_built,
    ROUND(AVG(price), 2) AS Average_Price
FROM df_temp
WHERE bedrooms = 3
  AND bathrooms = 3
GROUP BY date_built
ORDER BY date_built


date_built,Average_Price
2010,292859.62
2011,291117.47
2012,293683.19
2013,295962.27
2014,290852.27
2015,288770.3
2016,290555.07
2017,292676.79


In [0]:

%sql
-- 5. What is the average price of a home for each year the home was built,
--    that have 3 bedrooms, 3 bathrooms, with two floors,
--    and are greater than or equal to 2,000 square feet, rounded to two decimal places?
SELECT
    date_built,
    ROUND(AVG(price), 2) AS Average_Price
FROM df_temp
WHERE bedrooms = 3
  AND bathrooms = 3
  AND floors = 2
  AND sqft_living >= 2000
GROUP BY date_built
ORDER BY date_built



date_built,Average_Price
2010,285010.22
2011,276553.81
2012,307539.97
2013,303676.79
2014,298264.72
2015,297609.97
2016,293965.1
2017,280317.58


In [0]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.
#
# The HAVING clause filters on the UNROUNDED average: filtering on the rounded value
# would wrongly keep ratings whose true average lies in [349999.995, 350000).
# time.perf_counter() is a monotonic clock meant for measuring elapsed time, unlike
# time.time(), which follows the (adjustable) system clock.
# The timing covers running the query and printing show()'s default first 20 rows.
start_time = time.perf_counter()

spark.sql("""
select view, round(avg(price),2) as Average_Price
from df_temp
group by view
having avg(price) >= 350000
order by view desc
""").show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))

+----+-------------+
|view|Average_Price|
+----+-------------+
| 100|    1026669.5|
|  99|   1061201.42|
|  98|   1053739.33|
|  97|   1129040.15|
|  96|   1017815.92|
|  95|    1054325.6|
|  94|    1033536.2|
|  93|   1026006.06|
|  92|    970402.55|
|  91|   1137372.73|
|  90|   1062654.16|
|  89|   1107839.15|
|  88|   1031719.35|
|  87|    1072285.2|
|  86|   1070444.25|
|  85|   1056336.74|
|  84|   1117233.13|
|  83|   1033965.93|
|  82|    1063498.0|
|  81|   1053472.79|
+----+-------------+
only showing top 20 rows

--- 1.1391870975494385 seconds ---


In [0]:
# 7. Cache the temporary view `df_temp` (the "home_sales" table in the assignment text).
spark.sql("cache table df_temp")

Out[13]: DataFrame[]

In [0]:
# 8. Confirm the `df_temp` view is now cached (displays a boolean).
spark.catalog.isCached(tableName='df_temp')

Out[14]: True

In [0]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.
#
# The HAVING clause filters on the UNROUNDED average: filtering on the rounded value
# would wrongly keep ratings whose true average lies in [349999.995, 350000).
# time.perf_counter() is a monotonic clock meant for measuring elapsed time.
start_time = time.perf_counter()

spark.sql("""
select view, round(avg(price),2) as Average_Price
from df_temp
group by view
having avg(price) >= 350000
order by view desc
""").show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))


+----+-------------+
|view|Average_Price|
+----+-------------+
| 100|    1026669.5|
|  99|   1061201.42|
|  98|   1053739.33|
|  97|   1129040.15|
|  96|   1017815.92|
|  95|    1054325.6|
|  94|    1033536.2|
|  93|   1026006.06|
|  92|    970402.55|
|  91|   1137372.73|
|  90|   1062654.16|
|  89|   1107839.15|
|  88|   1031719.35|
|  87|    1072285.2|
|  86|   1070444.25|
|  85|   1056336.74|
|  84|   1117233.13|
|  83|   1033965.93|
|  82|    1063498.0|
|  81|   1053472.79|
+----+-------------+
only showing top 20 rows

--- 1.116837978363037 seconds ---


In [0]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data 
df_home_sales.write.partitionBy("date_built").mode("overwrite").parquet("df_temp_partitioned")

In [0]:
# 11. Read the formatted parquet data.
p_df_home_sales=spark.read.parquet('df_temp_partitioned')

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-2755977413874974>:2[0m
[1;32m      1[0m [38;5;66;03m# 11. Read the formatted parquet data.[39;00m
[0;32m----> 2[0m p_df_home_sales[38;5;241m=[39m[43mspark[49m[38;5;241;43m.[39;49m[43mread[49m[38;5;241;43m.[39;49m[43mparquet[49m[43m([49m[38;5;124;43m'[39;49m[38;5;124;43mdf_temp_partitioned[39;49m[38;5;124;43m'[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m

In [0]:
# 12. Register the parquet-backed DataFrame as the temporary view `df_temp_p`
#     so it can be queried with SQL below.
p_df_home_sales.createOrReplaceTempView(name='df_temp_p')



In [0]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.
#
# The HAVING clause filters on the UNROUNDED average: filtering on the rounded value
# would wrongly keep ratings whose true average lies in [349999.995, 350000).
# time.perf_counter() is a monotonic clock meant for measuring elapsed time.
start_time = time.perf_counter()

spark.sql("""
select view, round(avg(price),2) as Average_Price
from df_temp_p
group by view
having avg(price) >= 350000
order by view desc
""").show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))



In [0]:
# 14. Uncache the temporary view `df_temp` (the "home_sales" table in the assignment text).
# Remember to uncache the table as soon as you are done.
spark.sql("uncache table df_temp")



In [0]:
# 15. Confirm the `df_temp` view is no longer cached (expected to display False).
spark.catalog.isCached(tableName='df_temp')


