In [21]:
import os

# Set Spark version and URL
spark_version = "3.4.1"
spark_filename = f"spark-{spark_version}-bin-hadoop3"
mirror_url = f"https://archive.apache.org/dist/spark/spark-{spark_version}/{spark_filename}.tgz"

# Install Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download Spark
!wget -q {mirror_url}
!tar -xzf {spark_filename}.tgz
!mv {spark_filename} /content/spark

# Install findspark
!pip install -q findspark

# Set environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark"

# Initialize Spark
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ColabSpark").getOrCreate()

# Test
spark.range(5).show()


'apt-get' is not recognized as an internal or external command,
operable program or batch file.
The system cannot find the path specified.
'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open 'spark-3.4.1-bin-hadoop3.tgz'
'mv' is not recognized as an internal or external command,
operable program or batch file.


FileNotFoundError: [WinError 2] The system cannot find the file specified

In [None]:
# Import packages (standard library first, then PySpark)
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

# Create a SparkSession. getOrCreate() returns an already-running session
# if there is one, instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [None]:
# 1. Read the home_sales_revised.csv from the provided AWS S3 bucket location into a PySpark DataFrame.
from pyspark import SparkFiles

# Register the remote CSV with Spark so SparkFiles.get() can resolve a local copy of it
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
spark.sparkContext.addFile(url)

# Parse with a header row and inferred column types, then convert `date` with to_date()
df = (
    spark.read.csv(SparkFiles.get("home_sales_revised.csv"), header=True, inferSchema=True)
    .withColumn("date", to_date(col("date")))
)
df.show()


In [None]:
# 2. Create a temporary view of the DataFrame, queryable from SQL as `home_sales`.
df.createOrReplaceTempView(name="home_sales")


In [None]:
# 3. What is the average price for a four bedroom house sold per year, rounded to two decimal places?
# The sale year is taken from the `date` column; results are ordered oldest year first.
four_bedroom_query = """
    SELECT YEAR(date) AS year, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 4
    GROUP BY year
    ORDER BY year
"""
avg_4bdr_per_year = spark.sql(four_bedroom_query)
avg_4bdr_per_year.show()


In [None]:
# 4. What is the average price of a home for each year the home was built,
# that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
three_bed_three_bath_query = """
    SELECT date_built, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 3 AND bathrooms = 3
    GROUP BY date_built
    ORDER BY date_built
"""
avg_3bdr_3bth_per_year = spark.sql(three_bed_three_bath_query)
avg_3bdr_3bth_per_year.show()

In [None]:
# 5. What is the average price of a home for each year the home was built,
# that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet, rounded to two decimal places?
large_two_floor_query = """
    SELECT date_built, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    WHERE bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
    GROUP BY date_built
    ORDER BY date_built
"""
avg_filtered = spark.sql(large_two_floor_query)
avg_filtered.show()

In [None]:
# 6. What is the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000? Order by descending view rating.
# Although this is a small dataset, determine the run time for this query.

# perf_counter() is a monotonic, high-resolution clock intended for measuring durations,
# unlike the adjustable wall clock returned by time.time().
start_time = time.perf_counter()

avg_price_per_view = spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY view DESC
""")
# Spark evaluates lazily: show() is what actually executes the query, so it belongs inside the timed span.
avg_price_per_view.show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))

In [22]:
# 7. Cache the temporary table home_sales.
# Without the LAZY keyword, CACHE TABLE materializes the cache immediately,
# so later queries against home_sales are served from the cache.
cache_statement = "CACHE TABLE home_sales"
spark.sql(cache_statement)


NameError: name 'spark' is not defined

In [23]:
# 8. Check whether the table is cached (the boolean is displayed as the cell output).
spark.catalog.isCached(tableName="home_sales")

NameError: name 'spark' is not defined

In [24]:
# Save partitioned Parquet files by date_built.
# NOTE(review): this writes the same output as step 10 (same partition column, path and
# overwrite mode); it is only needed if a cell before step 10 reads home_sales_partitioned.
(
    df.write
    .mode("overwrite")
    .partitionBy("date_built")
    .parquet("home_sales_partitioned")
)

NameError: name 'df' is not defined

In [25]:
# 9. Using the cached data, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the uncached runtime.

# Run the same SQL as step 6 against `home_sales` (cached in step 7), so the cache is
# the only difference between the two timings.
start_time = time.perf_counter()

avg_price_per_view_cached = spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY view DESC
""")
# show() triggers execution, so it must fall inside the timed span.
avg_price_per_view_cached.show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))


NameError: name 'spark' is not defined

In [26]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data.
# One sub-directory per date_built value; overwrite makes re-running this cell safe.
(
    df.write
    .mode("overwrite")
    .partitionBy("date_built")
    .parquet("home_sales_partitioned")
)


NameError: name 'df' is not defined

In [27]:
# 11. Read the parquet formatted data, then inspect its schema and first five rows.
parquet_path = "home_sales_partitioned"
partitioned_home_sales = spark.read.parquet(parquet_path)
partitioned_home_sales.printSchema()
partitioned_home_sales.show(n=5)

NameError: name 'spark' is not defined

In [28]:
# 12. Create a temporary table for the parquet data.
# createOrReplaceTempView() only registers the view and returns None,
# so there is no result worth binding to a name.
partitioned_home_sales.createOrReplaceTempView('home_sales_partitioned')


NameError: name 'partitioned_home_sales' is not defined

In [29]:
# 13. Using the parquet DataFrame, run the last query above, that calculates
# the average price of a home per "view" rating, rounded to two decimal places,
# having an average home price greater than or equal to $350,000.
# Determine the runtime and compare it to the cached runtime.

# perf_counter() is a monotonic, high-resolution clock intended for measuring durations.
start_time = time.perf_counter()

# Same aggregation as step 6, run against the parquet-backed view. The result column is
# named `avg_price` so the output lines up with the step-6 query.
avg_price_per_view_parquet = spark.sql("""
    SELECT view, ROUND(AVG(price), 2) AS avg_price
    FROM home_sales_partitioned
    GROUP BY view
    HAVING AVG(price) >= 350000
    ORDER BY view DESC
""")

# show() triggers execution, so it must fall inside the timed span.
avg_price_per_view_parquet.show()

print("--- %s seconds ---" % (time.perf_counter() - start_time))

NameError: name 'spark' is not defined

In [30]:
# 14. Uncache the home_sales temporary table, then display whether it is still cached.
home_sales_table = "home_sales"
spark.sql(f"UNCACHE TABLE {home_sales_table}")
spark.catalog.isCached(tableName=home_sales_table)


NameError: name 'spark' is not defined

In [31]:
# 15. Check that home_sales is no longer cached (the boolean is displayed as the cell output).
spark.catalog.isCached(tableName="home_sales")


NameError: name 'spark' is not defined