In [31]:

!pip install pyspark py4j
!pip install findspark

# Import findspark and initialize.
import findspark
findspark.init()



In [32]:
# Import packages
import time

from pyspark.sql import SparkSession

# Build the SparkSession named "SparkSQL" (or reuse one that already exists
# in this kernel, which getOrCreate() returns instead of creating a new one).
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [33]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
csv_file_name = "home_sales_revised.csv"

# Register the remote file with the Spark job, then resolve the local path
# of the downloaded copy by its file name.
spark.sparkContext.addFile(url)
local_csv_path = SparkFiles.get(csv_file_name)

# header=True takes column names from the first row; inferSchema=True asks
# Spark to detect column types instead of reading everything as strings.
df = spark.read.csv(local_csv_path, sep=",", header=True, inferSchema=True)
df.show()


+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [34]:
# Print the column names and the types Spark inferred (inferSchema=True) when reading the CSV.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- date_built: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)



In [35]:
# 2. Create a temporary view of the DataFrame.
# Registers `df` under the name 'home_sales' so it can be referenced by name
# (e.g. spark.sql / spark.table) and cached via spark.catalog in step 7.
# NOTE: the view is bound to df's plan at this point; columns added to a
# DataFrame derived from df later are not part of the view.
df.createOrReplaceTempView('home_sales')


In [36]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
from pyspark.sql.functions import year
import pyspark.sql.functions as f

# Derive the sale year into a new DataFrame instead of reassigning `df`,
# so `df` keeps the schema printed above and later cells see the same frame.
df_with_year = df.withColumn("year", year(df['date']))

# Filter to only 4 bedroom houses
df_4br = df_with_year.filter(df_with_year['bedrooms'] == 4)

# Average only the price column per sale year. A bare .avg() would average
# every numeric column and leave us depending on the generated 'avg(price)' name.
averages_4br = (
    df_4br.groupBy('year')
    .agg(f.avg('price').alias('avg_price'))
    .withColumn('rounded_avg_price', f.round(f.col('avg_price'), 2))
)

averages_4br.orderBy('year').select('year', 'rounded_avg_price').show()


+----+-----------------+
|year|rounded_avg_price|
+----+-----------------+
|2019|         300263.7|
|2020|        298353.78|
|2021|        301819.44|
|2022|        296363.88|
+----+-----------------+



In [37]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?

# Filter to only 3 bedroom houses with 3 bathrooms
df_3br_3ba = df.filter((df['bedrooms'] == 3) & (df['bathrooms'] == 3))

# Average only the price column per build year (not every numeric column),
# then add a copy rounded to 2 decimal places.
averages_3br_3ba = (
    df_3br_3ba.groupBy('date_built')
    .agg(f.avg('price').alias('avg_price'))
    .withColumn('rounded_avg_price', f.round(f.col('avg_price'), 2))
)

averages_3br_3ba.orderBy('date_built').select('date_built', 'rounded_avg_price').show()


+----------+-----------------+
|date_built|rounded_avg_price|
+----------+-----------------+
|      2010|        292859.62|
|      2011|        291117.47|
|      2012|        293683.19|
|      2013|        295962.27|
|      2014|        290852.27|
|      2015|         288770.3|
|      2016|        290555.07|
|      2017|        292676.79|
+----------+-----------------+



In [38]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
# Narrow the 3 bedroom / 3 bathroom houses from step 4 to 2 floors and at least 2000 sqft
df_3br_3ba_2fl_2k = df_3br_3ba.filter(
    (df_3br_3ba['floors'] == 2) & (df_3br_3ba['sqft_living'] >= 2000)
)

# Average only the price column per build year, then round to 2 decimal places.
averages_3br_3ba_2fl_2k = (
    df_3br_3ba_2fl_2k.groupBy('date_built')
    .agg(f.avg('price').alias('avg_price'))
    .withColumn('rounded_avg_price', f.round(f.col('avg_price'), 2))
)

averages_3br_3ba_2fl_2k.orderBy('date_built').select('date_built', 'rounded_avg_price').show()


+----------+-----------------+
|date_built|rounded_avg_price|
+----------+-----------------+
|      2010|        285010.22|
|      2011|        276553.81|
|      2012|        307539.97|
|      2013|        303676.79|
|      2014|        298264.72|
|      2015|        297609.97|
|      2016|         293965.1|
|      2017|        280317.58|
+----------+-----------------+



In [54]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

start_time = time.time()

# Average only the price column per view rating; a bare .avg() would also
# compute (and time) averages of every other numeric column.
averages_by_view = df.groupBy('view').agg(f.avg('price').alias('avg_price'))

# HAVING avg(price) >= 350000. Filter on the unrounded average so a value just
# below the threshold cannot be rounded up into it.
averages_by_view_rounded = (
    averages_by_view
    .filter(f.col('avg_price') >= 350000)
    .withColumn('rounded_avg_price', f.round(f.col('avg_price'), 2))
)

averages_by_view_rounded.orderBy(f.col('view').desc()).select('rounded_avg_price', 'view').show()
print("--- %s seconds ---" % (time.time() - start_time))

+-----------------+----+
|rounded_avg_price|view|
+-----------------+----+
|        1026669.5| 100|
|       1061201.42|  99|
|       1053739.33|  98|
|       1129040.15|  97|
|       1017815.92|  96|
|        1054325.6|  95|
|        1033536.2|  94|
|       1026006.06|  93|
|        970402.55|  92|
|       1137372.73|  91|
|       1062654.16|  90|
|       1107839.15|  89|
|       1031719.35|  88|
|        1072285.2|  87|
|       1070444.25|  86|
|       1056336.74|  85|
|       1117233.13|  84|
|       1033965.93|  83|
|        1063498.0|  82|
|       1053472.79|  81|
+-----------------+----+
only showing top 20 rows

--- 0.6586861610412598 seconds ---


In [55]:
# 7. Cache the temporary table home_sales.
spark.catalog.cacheTable('home_sales')

# cacheTable() only marks the table for caching; the in-memory data is built by
# the first action that reads it. Run that action now so the timed query in
# step 9 reads from a populated cache instead of also paying to build it.
cached_row_count = spark.table('home_sales').count()

In [56]:
# 8. Check if the table is cached.
# Evaluates to a bool (shown as the cell output) telling whether 'home_sales'
# is registered in Spark's cache after step 7.
spark.catalog.isCached('home_sales')

True

In [57]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

start_time = time.time()

# Query the cached temp view by name rather than `df`, so the query clearly
# reads the table cached in step 7. A query on `df` only hits the cache if
# Spark happens to match its plan to the cached one.
cached_home_sales = spark.table('home_sales')

averages_by_view_cached = (
    cached_home_sales.groupBy('view')
    .agg(f.avg('price').alias('avg_price'))
    # HAVING on the unrounded average, as in step 6.
    .filter(f.col('avg_price') >= 350000)
    .withColumn('rounded_avg_price', f.round(f.col('avg_price'), 2))
)

averages_by_view_cached.orderBy(f.col('view').desc()).select('rounded_avg_price', 'view').show()

print("--- %s seconds ---" % (time.time() - start_time))


+-----------------+----+
|rounded_avg_price|view|
+-----------------+----+
|        1026669.5| 100|
|       1061201.42|  99|
|       1053739.33|  98|
|       1129040.15|  97|
|       1017815.92|  96|
|        1054325.6|  95|
|        1033536.2|  94|
|       1026006.06|  93|
|        970402.55|  92|
|       1137372.73|  91|
|       1062654.16|  90|
|       1107839.15|  89|
|       1031719.35|  88|
|        1072285.2|  87|
|       1070444.25|  86|
|       1056336.74|  85|
|       1117233.13|  84|
|       1033965.93|  83|
|        1063498.0|  82|
|       1053472.79|  81|
+-----------------+----+
only showing top 20 rows

--- 1.7444448471069336 seconds ---


In [None]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data


In [None]:
# 11. Read the formatted parquet data.


In [None]:
# 12. Create a temporary table for the parquet data.


In [None]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

start_time = time.time()



print("--- %s seconds ---" % (time.time() - start_time))

In [58]:
# 14. Uncache the home_sales temporary table.
# Use the Catalog API, matching cacheTable()/isCached() in steps 7, 8 and 15.
# It returns None, so the cell no longer displays an empty "DataFrame[]".
spark.catalog.uncacheTable('home_sales')

DataFrame[]

In [59]:
# 15. Check that home_sales is no longer cached.
# Evaluates to a bool (shown as the cell output); expected to be False after step 14.
spark.catalog.isCached("home_sales")


False