In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.5.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Connected to cloud.r-project.org (18.160.213                                                                                                    Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Connecting to ppa.launchpadcontent.net] [Wai                                                                                                    Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy In

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"



In [4]:
# Read in data from S3 Buckets

spark.sparkContext.addFile(url)
#df = spark.read.csv(url, sep = ',', inferSchema=True)
#df=spark.read.format("csv").option("Header","true").load(url)
# Show the DataFrame
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep=",", header=True)
# Show DataFrame
df.show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [5]:
# 2. Create a temporary view of the DataFrame.

df.createOrReplaceTempView('homes')

In [6]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?
#https://www.sqltutorial.org/sql-date-functions/how-to-extract-year-from-date-in-sql/
spark.sql('SELECT DISTINCT EXTRACT(YEAR FROM date) AS Year, ROUND(AVG(price), 2) AS Average_Price FROM homes GROUP BY EXTRACT(YEAR FROM date)').show()


+----+-------------+
|Year|Average_Price|
+----+-------------+
|2022|    314421.54|
|2019|    315782.18|
|2020|    315949.81|
|2021|    313218.09|
+----+-------------+



In [7]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?
spark.sql('SELECT DISTINCT EXTRACT(YEAR FROM date) AS Year, AVG(bedrooms) AS Beds, AVG(bathrooms) as Baths,\
  ROUND(AVG(price), 2) AS Average_Price FROM homes\
 WHERE bedrooms = 3 AND bathrooms = 3 GROUP BY EXTRACT(YEAR FROM date)').show()
# remember the average of an infant number of 3s is 3

+----+----+-----+-------------+
|Year|Beds|Baths|Average_Price|
+----+----+-----+-------------+
|2022| 3.0|  3.0|    292725.69|
|2019| 3.0|  3.0|    287287.82|
|2020| 3.0|  3.0|    294204.16|
|2021| 3.0|  3.0|    294211.46|
+----+----+-----+-------------+



In [8]:
# 5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?
spark.sql('SELECT DISTINCT EXTRACT(YEAR FROM date) AS Year, AVG(bedrooms) AS Beds, AVG(bathrooms) as Baths,\
  ROUND(AVG(price), 2) AS Average_Price, AVG(floors) as Floors, ROUND(AVG(sqft_living)) AS Living_Space FROM homes\
 WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000 GROUP BY EXTRACT(YEAR FROM date)').show()



+----+----+-----+-------------+------+------------+
|Year|Beds|Baths|Average_Price|Floors|Living_Space|
+----+----+-----+-------------+------+------------+
|2022| 3.0|  3.0|    290242.99|   2.0|      2268.0|
|2019| 3.0|  3.0|    289859.14|   2.0|      2275.0|
|2020| 3.0|  3.0|    292289.09|   2.0|      2314.0|
|2021| 3.0|  3.0|    296330.96|   2.0|      2282.0|
+----+----+-----+-------------+------+------------+



In [9]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

start_time = time.time()
spark.sql('SELECT DISTINCT view, ROUND(AVG(price), 2) AS Average_Price\
 FROM homes WHERE price >= 350000 GROUP BY view').show()
#https://stackoverflow.com/questions/34629313/how-to-measure-the-execution-time-of-a-query-on-spark

print("--- %s seconds ---" % (time.time() - start_time))

+----+-------------+
|view|Average_Price|
+----+-------------+
|   7|    403005.77|
|  51|    788128.21|
|  15|     404673.3|
|  54|    798684.82|
|  11|    399548.12|
|  29|    397771.65|
|  69|    750537.94|
|  42|     396964.5|
|  87|    1072285.2|
|  73|    752861.18|
|  64|    767036.67|
|   3|     398867.6|
|  30|     397862.0|
|  34|    401419.75|
|  59|     791453.0|
|   8|    398592.71|
|  28|    402124.62|
|  22|    402022.68|
|  85|   1056336.74|
|  16|    399586.53|
+----+-------------+
only showing top 20 rows

--- 1.438124656677246 seconds ---


In [10]:
# 7. Cache the temporary table home_sales.
#spark.sql('DROP TABLE home_sales')
spark.sql("CACHE TABLE home_sales OPTIONS ('storageLevel' 'DISK_ONLY') SELECT * FROM homes;")
#https://spark.apache.org/docs/latest/sql-ref-syntax-aux-cache-cache-table.html

DataFrame[]

In [11]:
spark.sql("SHOW TABLES").show()

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|         |home_sales|       true|
|         |     homes|       true|
+---------+----------+-----------+



In [12]:
# 9. Using the cached data, run the query that filters out the view ratings with average price
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.

start_time = time.time()

spark.sql('SELECT DISTINCT view, ROUND(AVG(price), 2) AS Average_Price\
 FROM home_sales WHERE price >= 350000 GROUP BY view').show()

print("--- %s seconds ---" % (time.time() - start_time))


+----+-------------+
|view|Average_Price|
+----+-------------+
|   7|    403005.77|
|  51|    788128.21|
|  15|     404673.3|
|  54|    798684.82|
|  11|    399548.12|
|  29|    397771.65|
|  69|    750537.94|
|  42|     396964.5|
|  87|    1072285.2|
|  73|    752861.18|
|  64|    767036.67|
|   3|     398867.6|
|  30|     397862.0|
|  34|    401419.75|
|  59|     791453.0|
|   8|    398592.71|
|  28|    402124.62|
|  22|    402022.68|
|  85|   1056336.74|
|  16|    399586.53|
+----+-------------+
only showing top 20 rows

--- 0.8858330249786377 seconds ---


In [13]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
df.write.mode('overwrite').partitionBy('date_built').parquet('home_sales')
#https://kontext.tech/article/1173/pyspark-partitionby-with-examples
#https://www.bing.com/videos/riverview/relatedvideo?q=how+to+format+a+parquet&mid=84248CB10CB649081B1384248CB10CB649081B13&FORM=VIRE

In [14]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=5d9c9f5b34bce431e45cc9dc942a2e5baca67e8a1b8d7c0a06b957df00fda60d
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [15]:
from pyspark.sql import DataFrameWriter

In [19]:
# 11. Read the parquet formatted data.
adf = spark.read.format('parquet').load('home_sales')
adf.show()

+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
|                  id|      date| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|date_built|
+--------------------+----------+------+--------+---------+-----------+--------+------+----------+----+----------+
|2ed8d509-7372-46d...|2021-08-06|258710|       3|        3|       1918|    9666|     1|         0|  25|      2015|
|941bad30-eb49-4a7...|2020-05-09|229896|       3|        3|       2197|    8641|     1|         0|   3|      2015|
|c797ca12-52cd-4b1...|2019-06-08|288650|       2|        3|       2100|   10419|     2|         0|   7|      2015|
|0cfe57f3-28c2-472...|2019-10-04|308313|       3|        3|       1960|    9453|     2|         0|   2|      2015|
|d715f295-2fbf-4e9...|2021-05-17|391574|       3|        2|       1635|    8040|     2|         0|  10|      2015|
|a18515a2-86f3-46b...|2022-02-18|419543|       3|        2|       1642|   12826|

In [20]:
# 12. Create a temporary table for the parquet data.
pdf = adf.createOrReplaceTempView('sales')

In [21]:
# 13. Run the query that filters out the view ratings with average price of greater than or equal to $350,000
# with the parquet DataFrame. Round your average to two decimal places.
# Determine the runtime and compare it to the cached version.

start_time = time.time()
spark.sql('SELECT DISTINCT view, ROUND(AVG(price), 2) AS Average_Price\
 FROM sales WHERE price >= 350000 GROUP BY view').show()


print("--- %s seconds ---" % (time.time() - start_time))

+----+-------------+
|view|Average_Price|
+----+-------------+
|   7|    403005.77|
|  51|    788128.21|
|  15|     404673.3|
|  54|    798684.82|
|  11|    399548.12|
|  29|    397771.65|
|  69|    750537.94|
|  42|     396964.5|
|  73|    752861.18|
|  87|    1072285.2|
|  64|    767036.67|
|   3|     398867.6|
|  30|     397862.0|
|  34|    401419.75|
|  59|     791453.0|
|   8|    398592.71|
|  28|    402124.62|
|  22|    402022.68|
|  85|   1056336.74|
|  35|    401934.21|
+----+-------------+
only showing top 20 rows

--- 0.9603877067565918 seconds ---


In [22]:
# 14. Uncache the home_sales temporary table.
spark.sql('uncache table sales')

DataFrame[]

In [23]:
# 15. Check if the home_sales is no longer cached

if spark.catalog.isCached("sales") or spark.catalog.isCached("sales"):
  print("a table is still cached")
else:
  print("all clear")

all clear


In [24]:
spark.sql('SHOW TABLES').show()

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|         |home_sales|       true|
|         |     homes|       true|
|         |     sales|       true|
+---------+----------+-----------+



In [26]:
# Pick all tables in 'agg' schema which contains word 'customer' in it. Usual pattern matching.(In your case, its oct)
xdf = spark.sql("show tables")

#https://stackoverflow.com/questions/70625290/delete-tables-in-batches-pyspark

# Iterate the dataframe that contains list of tables, and drop one by one.
for row in xdf.rdd.collect():
   print(f'Dropping table {row.tableName}')
   spark.sql(f'drop table {row.tableName}')

Dropping table home_sales
Dropping table homes
Dropping table sales


In [27]:
spark.sql('show tables').show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+

