In [1]:
!wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys

# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

import findspark
findspark.init()

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=1f109792af59e4c4aa8df55336b0ab8fc3edb17bb373fad531c81508e09d0edc
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
# Import packages
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import time

schema = StructType([
    StructField("id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("date_built", IntegerType(), True),
    StructField("price", IntegerType(), True),
    StructField("bedrooms", IntegerType(), True),
    StructField("bathrooms", IntegerType(), True),
    StructField("sqft_living", IntegerType(), True),
    StructField("sqft_lot", IntegerType(), True),
    StructField("floors", IntegerType(), True),
    StructField("waterfront", IntegerType(), True),
    StructField("view", IntegerType(), True),
])

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"



In [4]:
# 2. Create a temporary view of the DataFrame.
spark.sparkContext.addFile(url)
file_path = "file://" + SparkFiles.get("home_sales_revised.csv")

df = spark.sql(f"SELECT * FROM csv.`{file_path}`")
df.createOrReplaceTempView("home_sales")

In [5]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)



In [6]:
df = df.withColumnRenamed("_c0", "id") \
    .withColumnRenamed("_c1", "date") \
    .withColumnRenamed("_c2", "date_built") \
    .withColumnRenamed("_c3", "price") \
    .withColumnRenamed("_c4", "bedrooms") \
    .withColumnRenamed("_c5", "bathrooms") \
    .withColumnRenamed("_c6", "sqft_living") \
    .withColumnRenamed("_c7", "sqft_lot") \
    .withColumnRenamed("_c8", "floors") \
    .withColumnRenamed("_c9", "waterfront") \
    .withColumnRenamed("_c10", "view")

In [7]:
df.createOrReplaceTempView("home_sales")

In [8]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- date_built: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- bathrooms: string (nullable = true)
 |-- sqft_living: string (nullable = true)
 |-- sqft_lot: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- waterfront: string (nullable = true)
 |-- view: string (nullable = true)



In [9]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType, IntegerType, DateType

df = df.withColumn("id", col("id").cast(FloatType()))
df = df.withColumn("date", col("date").cast(DateType()))
df = df.withColumn("date_built", col("date_built").cast(DateType()))
df = df.withColumn("price", col("price").cast(IntegerType()))
df = df.withColumn("bedrooms", col("bedrooms").cast(IntegerType()))
df = df.withColumn("bathrooms", col("bathrooms").cast(IntegerType()))
df = df.withColumn("sqft_living", col("sqft_living").cast(IntegerType()))
df = df.withColumn("sqft_lot", col("sqft_lot").cast(IntegerType()))
df = df.withColumn("floors", col("floors").cast(IntegerType()))
df = df.withColumn("waterfront", col("waterfront").cast(IntegerType()))
df = df.withColumn("view", col("view").cast(IntegerType()))

In [10]:
df.createOrReplaceTempView("home_sales")

In [11]:
df.printSchema()

root
 |-- id: float (nullable = true)
 |-- date: date (nullable = true)
 |-- date_built: date (nullable = true)
 |-- price: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: integer (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)



In [12]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?
question_3 = """
SELECT date_built as year, AVG(price) AS avg_price
FROM home_sales
WHERE bedrooms = 4
GROUP BY date_built
ORDER BY date_built
"""

result_3 = spark.sql(question_3)
result_3.show()

+----------+------------------+
|      year|         avg_price|
+----------+------------------+
|2010-01-01| 296800.7544776119|
|2011-01-01|302141.89581749047|
|2012-01-01| 298233.4150805271|
|2013-01-01|299999.38822652755|
|2014-01-01| 299073.8858447489|
|2015-01-01|307908.86020761245|
|2016-01-01|296050.24326347304|
|2017-01-01| 296576.6934782609|
+----------+------------------+



In [15]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?
question_4 = """
SELECT date_built as year_built, AVG(price) AS avg_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3
GROUP BY date_built
ORDER BY date_built
"""

result_4 = spark.sql(question_4)
result_4.show()

+----------+------------------+
|year_built|         avg_price|
+----------+------------------+
|2010-01-01|  292859.615942029|
|2011-01-01|291117.46706586826|
|2012-01-01| 293683.1872074883|
|2013-01-01|295962.27145085804|
|2014-01-01| 290852.2661870504|
|2015-01-01| 288770.2966101695|
|2016-01-01|  290555.073964497|
|2017-01-01| 292676.7887740029|
+----------+------------------+



In [16]:
# 5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?
question_5 = """
SELECT date_built as year_built, AVG(price) AS avg_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
GROUP BY date_built
ORDER BY date_built
"""

result_5 = spark.sql(question_5)
result_5.show()

+----------+------------------+
|year_built|         avg_price|
+----------+------------------+
|2010-01-01| 285010.2215909091|
|2011-01-01| 276553.8128654971|
|2012-01-01|307539.97402597405|
|2013-01-01|      303676.79375|
|2014-01-01| 298264.7183908046|
|2015-01-01| 297609.9679144385|
|2016-01-01| 293965.1046511628|
|2017-01-01|280317.57692307694|
+----------+------------------+



In [17]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

start_time = time.time()

question_6 = """
SELECT view, AVG(price) AS avg_price
FROM home_sales
WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
GROUP BY view
ORDER BY view
"""

result_6 = spark.sql(question_6)
result_6.show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+------------------+
|view|         avg_price|
+----+------------------+
|   0|276161.10714285716|
|   1|         280277.25|
|   2|         254207.04|
|   3|258428.27777777778|
|   4| 287105.1724137931|
|   5|286016.53571428574|
|   6|         323486.72|
|   7|271921.04761904763|
|   8|        298208.875|
|   9| 290356.6923076923|
|  10|          303305.4|
|  11|274758.85714285716|
|  12|280996.63636363635|
|  13|291548.53846153844|
|  14|         318510.96|
|  15| 281520.1515151515|
|  16|290491.17647058825|
|  17| 287555.9166666667|
|  18|293643.92307692306|
|  19| 277739.1935483871|
+----+------------------+
only showing top 20 rows

--- 1.049471139907837 seconds ---


In [18]:
# 7. Cache the the temporary table home_sales.
spark.sql("cache table home_sales")

DataFrame[]

In [19]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

True

In [21]:
# 9. Using the cached data, run the query that filters out the view ratings with average price
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.

start_time = time.time()

question_9_cached = """SELECT view, AVG(price) AS avg_price
                      FROM home_sales
                      WHERE price >= 350000
                      GROUP BY view
                      ORDER BY view"""

result_9_cached = spark.sql(question_9_cached)
result_9_cached.show()

print("--- %s seconds ---" % (time.time() - start_time))

spark.sql("UNCACHE TABLE home_sales")


+----+------------------+
|view|         avg_price|
+----+------------------+
|   0|403848.51461988303|
|   1| 401044.2513368984|
|   2|397389.24752475246|
|   3| 398867.5964912281|
|   4|         399631.89|
|   5| 401471.8248587571|
|   6| 395655.3789473684|
|   7| 403005.7709497207|
|   8|398592.70658682636|
|   9| 401393.3370786517|
|  10|401868.42523364484|
|  11|399548.11891891895|
|  12| 401501.3243243243|
|  13|         398917.98|
|  14|398570.02923976607|
|  15|404673.29545454547|
|  16| 399586.5311004785|
|  17|398474.49029126216|
|  18| 399332.9090909091|
|  19| 398953.1703296703|
+----+------------------+
only showing top 20 rows

--- 0.4988260269165039 seconds ---


DataFrame[]

In [23]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
df.write.partitionBy("date_built").parquet("/content/parquet_home_sales")

In [26]:
# 11. Read the parquet formatted data.
parquet_df = spark.read.parquet("/content/parquet_home_sales")

In [27]:
# 12. Create a temporary table for the parquet data.
parquet_df.createOrReplaceTempView("parquet_home_sales")

In [28]:
# 13. Run the query that filters out the view ratings with average price of greater than or equal to $350,000
# with the parquet DataFrame. Round your average to two decimal places.
# Determine the runtime and compare it to the cached version.

spark.sql("CACHE TABLE parquet_home_sales")

start_time = time.time()

question_13 = """SELECT view, AVG(price) AS avg_price
                 FROM parquet_home_sales
                 WHERE price >= 350000
                 GROUP BY view
                 ORDER BY view"""

result_13 = spark.sql(question_13)
result_13.show()

print("--- %s seconds ---" % (time.time() - start_time))

# Uncache the parquet DataFrame after the query is executed
spark.sql("UNCACHE TABLE parquet_home_sales")

+----+------------------+
|view|         avg_price|
+----+------------------+
|   0|403848.51461988303|
|   1| 401044.2513368984|
|   2|397389.24752475246|
|   3| 398867.5964912281|
|   4|         399631.89|
|   5| 401471.8248587571|
|   6| 395655.3789473684|
|   7| 403005.7709497207|
|   8|398592.70658682636|
|   9| 401393.3370786517|
|  10|401868.42523364484|
|  11|399548.11891891895|
|  12| 401501.3243243243|
|  13|         398917.98|
|  14|398570.02923976607|
|  15|404673.29545454547|
|  16| 399586.5311004785|
|  17|398474.49029126216|
|  18| 399332.9090909091|
|  19| 398953.1703296703|
+----+------------------+
only showing top 20 rows

--- 0.6894721984863281 seconds ---


DataFrame[]

In [29]:
# 14. Uncache the home_sales temporary table.
spark.catalog.uncacheTable("home_sales")

In [31]:
# 15. Check if the home_sales is no longer cached
print("Is home_sales cached?", "Yes" if spark.catalog.isCached("home_sales") else "No")

Is home_sales cached? No
