In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=2914ac6e1bb1a9b8a2247b98098a1e6913f6a51598c5871464da455f6d8191e8
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
# Imports
from pyspark.sql import SparkSession
from pyspark import SparkFiles
import time
from pyspark.sql.functions import year, round
import time

In [4]:
# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()


# Read in the AWS S3 bucket into a DataFrame
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv("file://" + SparkFiles.get("home_sales_revised.csv"), header=True, inferSchema=True)

In [5]:
# Extract 'year' from the 'date' column
df = df.withColumn("year", year("date"))

In [6]:
# Create a temporary table called home_sales
df.createOrReplaceTempView("home_sales")

In [7]:
# Exibir as colunas do DataFrame
df_columns = df.columns
print(df_columns)


['id', 'date', 'date_built', 'price', 'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'view', 'year']


In [8]:
# 3. Query for average price of a four-bedroom house sold in each year
query3 = "SELECT year, ROUND(AVG(price), 2) as avg_price FROM home_sales WHERE bedrooms = 4 GROUP BY year"
result3 = spark.sql(query3)
result3.show()

+----+---------+
|year|avg_price|
+----+---------+
|2022|296363.88|
|2019| 300263.7|
|2020|298353.78|
|2021|301819.44|
+----+---------+



In [9]:
# 4. Query for average price of a home with three bedrooms and three bathrooms
query4 = "SELECT year, ROUND(AVG(price), 2) as avg_price FROM home_sales WHERE bedrooms = 3 AND bathrooms = 3 GROUP BY year"
result4 = spark.sql(query4)
result4.show()

+----+---------+
|year|avg_price|
+----+---------+
|2022|292725.69|
|2019|287287.82|
|2020|294204.16|
|2021|294211.46|
+----+---------+



In [14]:
# 5. Query for average price of a home with specific features for each year
query5 = "SELECT year, ROUND(AVG(price), 2) as avg_price FROM home_sales WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000 GROUP BY year"
result5 = spark.sql(query5)
result5.show()



+----+---------+
|year|avg_price|
+----+---------+
|2022|290242.99|
|2019|289859.14|
|2020|292289.09|
|2021|296330.96|
+----+---------+



In [15]:
# 6. Query for view rating for homes costing more than or equal to $350,000 with runtime
start_time = time.time()
query6 = "SELECT view, ROUND(AVG(price), 2) as avg_price FROM home_sales WHERE price >= 350000 GROUP BY view"
result6 = spark.sql(query6)
end_time = time.time()
runtime_query6 = "{:.2f}".format(end_time - start_time)
result6.show()
print(f"Runtime for query 6: {runtime_query6} seconds")


+----+----------+
|view| avg_price|
+----+----------+
|  31| 399856.95|
|  85|1056336.74|
|  65| 736679.93|
|  53|  755214.8|
|  78|1080649.37|
|  34| 401419.75|
|  81|1053472.79|
|  28| 402124.62|
|  76|1058802.78|
|  26| 401506.97|
|  27| 399537.66|
|  44| 400598.05|
|  12| 401501.32|
|  91|1137372.73|
|  22| 402022.68|
|  93|1026006.06|
|  47|  398447.5|
|   1| 401044.25|
|  52| 733780.26|
|  13| 398917.98|
+----+----------+
only showing top 20 rows

Runtime for query 6: 0.07 seconds


In [10]:
# 7. Cache the temporary table home_sales
spark.catalog.cacheTable("home_sales")

In [16]:
# 8. Run the cached query and measure runtime
start_time_cached = time.time()
result_cached = spark.sql(query6)
end_time_cached = time.time()
runtime_cached = "{:.2f}".format(end_time_cached - start_time_cached)
print(f"Runtime for cached query: {runtime_cached} seconds")


Runtime for cached query: 0.03 seconds


In [17]:
# 9. Partition by the "date_built" field on the formatted parquet home sales data
df.write.partitionBy("date_built").parquet("path/to/formatted_parquet_data")

In [18]:
# 10. Create a temporary table for the parquet data
parquet_df = spark.read.parquet("path/to/formatted_parquet_data")
parquet_df.createOrReplaceTempView("parquet_data")


In [19]:
# 11. Run the query on the parquet temporary table and measure runtime
start_time_parquet = time.time()
result_parquet = spark.sql(query6)
end_time_parquet = time.time()
runtime_parquet = end_time_parquet - start_time_parquet
runtime_parquet_formatted = "{:.2f}".format(runtime_parquet)
print(f"Runtime for parquet query: {runtime_parquet_formatted} seconds")

Runtime for parquet query: 0.07 seconds


In [23]:
# 12. Uncache the home_sales temporary table and verify
spark.catalog.uncacheTable("home_sales")
if spark.catalog.isCached("home_sales"):
    print("home_sales table is still cached.")
else:
    print("home_sales table is uncached.")


home_sales table is uncached.
