In [4]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
     ---------------------------------------- 0.0/310.8 MB ? eta -:--:--
     ---------------------------------------- 0.0/310.8 MB 1.3 MB/s eta 0:04:03
     ---------------------------------------- 0.0/310.8 MB 1.3 MB/s eta 0:04:03
     -------------------------------------- 0.0/310.8 MB 281.8 kB/s eta 0:18:23
     -------------------------------------- 0.1/310.8 MB 525.1 kB/s eta 0:09:52
     -------------------------------------- 0.2/310.8 MB 706.2 kB/s eta 0:07:20
     -------------------------------------- 0.2/310.8 MB 692.9 kB/s eta 0:07:29
     -------------------------------------- 0.2/310.8 MB 692.4 kB/s eta 0:07:29
     -------------------------------------- 0.3/310.8 MB 774.0 kB/s eta 0:06:42
     -------------------------------------- 0.3/310.8 MB 759.5 kB/s eta 0:06:49
     -------------------------------------- 0.4/310.8 MB 796.7 kB/s eta 0:06:30
     -------------------------------------- 0.4/310.8 MB 807.8

In [5]:
!pip install py4j



In [9]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"  # You might need to adjust the Python path here based on your system
os.environ["PYTHONPATH"] = f"{os.environ['SPARK_HOME']}/python/lib/py4j-0.10.9.7-src.zip:{os.environ.get('PYTHONPATH', '')}"
# Start a SparkSession
import findspark
findspark.init()

'apt-get' is not recognized as an internal or external command,
operable program or batch file.
The system cannot find the path specified.
'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open '$SPARK_VERSION-bin-hadoop3.tgz'


Exception: Unable to find py4j in /content/spark-3.4.0-bin-hadoop3\python, your SPARK_HOME may not be configured correctly

In [None]:
# Imports: standard library first, then the Spark SQL entry point
import time
from pyspark.sql import SparkSession

# Build the session in two steps: configure the app name, then get or create the session
session_builder = SparkSession.builder.appName("SparkSQL")
spark = session_builder.getOrCreate()

In [None]:
# 1. Load the home sales CSV hosted in the AWS S3 bucket into a DataFrame.
from pyspark import SparkFiles

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

# Register the remote file with Spark, then resolve the path of its local copy
spark.sparkContext.addFile(url)
home_sales_csv_path = SparkFiles.get("home_sales_revised.csv")

# First row is the header; column types are inferred from the data
df_home_sales = spark.read.csv(home_sales_csv_path, header=True, inferSchema=True)
df_home_sales.show(5)

In [None]:
# 2. Expose the DataFrame to Spark SQL as the temporary view "home_sales".
df_home_sales.createOrReplaceTempView(name="home_sales")

In [None]:
# 3. Average price of four-bedroom homes for each year of sale, rounded to two decimal places.
four_bedroom_sql = """
    select year(date) as year,
           round(avg(price),2) as avg_price
    from home_sales
    where bedrooms = 4
    group by year(date)
    order by year desc
    """
avg_price_4b = spark.sql(four_bedroom_sql)

# Preview the first five yearly averages
avg_price_4b.show(5)

In [None]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal places?
# Group by the build year (date_built), not by the sale year (year(date)), as the question asks.
# NOTE(review): assumes date_built already holds the construction year; wrap it in year() if it is a full date.
avg_price_3b = spark.sql("""
    select date_built,
           round(avg(price),2) as avg_price
    from home_sales
    where bedrooms = 3
    AND bathrooms = 3
    group by date_built
    order by date_built desc
    """)

avg_price_3b.show(5)

In [None]:
# 5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?
# Group by the build year (date_built), not by the sale year (year(date)), and use >= so that
# homes of exactly 2,000 square feet are included, as the question asks.
# NOTE(review): assumes date_built already holds the construction year; wrap it in year() if it is a full date.
avg_price_3b_2floors = spark.sql("""
    select date_built,
           round(avg(price),2) as avg_price
    from home_sales
    where bedrooms = 3
    AND bathrooms = 3
    AND floors = 2
    AND sqft_living >= 2000
    group by date_built
    order by date_built desc
    """)

avg_price_3b_2floors.show(5)

In [None]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.
#
# The threshold is applied to each view rating's average price (HAVING), which is the same
# query the cached and parquet runs execute later, so the three runtimes are comparable.

start_time = time.time()
view_rating = spark.sql("""
  select view,
         round(avg(price),2) as avg_price
  from home_sales
  group by view
  having avg(price) >= 350000
  order by view desc
  """)
view_rating.show(10)

print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# 7. Cache the temporary table home_sales.
cache_statement = "CACHE TABLE home_sales"
spark.sql(cache_statement)

In [None]:
# 8. Confirm whether home_sales is cached (the cell displays the boolean result).
spark.catalog.isCached(tableName='home_sales')

In [None]:
# 9. With home_sales cached, re-run the query that keeps only the view ratings whose average
#  price is greater than or equal to $350,000. Time it and compare with the uncached runtime.

cached_view_sql = """
  select view,
         round(avg(price),2) as avg_price
  from home_sales
  group by view
  having avg(price) >= 350000
  order by view desc
                        """

start_time = time.time()
view_rating = spark.sql(cached_view_sql)
view_rating.show(10)

# Elapsed wall-clock time covers building the query and displaying its result
elapsed_seconds = time.time() - start_time
print("--- %s seconds ---" % elapsed_seconds)

In [None]:
# Create the parquet output directory. exist_ok=True keeps the cell safe to re-run,
# whereas a bare `mkdir` reports an error when the directory already exists.
import os
os.makedirs("/content/output/", exist_ok=True)

In [None]:
# 10. Write the home sales data as parquet, partitioned by the "date_built" field.
# The former .option("header", True) was dropped: "header" is a CSV option and has no
# effect on the parquet writer.
(
    df_home_sales.write
    .partitionBy("date_built")
    .mode("overwrite")  # replace any output left by a previous run
    .parquet('/content/output')
)

In [None]:
# 11. Load the parquet formatted data back into a DataFrame and preview five rows.
parquet_dir = "/content/output"
df_parquet = spark.read.parquet(parquet_dir)
df_parquet.show(5)

In [None]:
# 12. Create a temporary view for the parquet data.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated since Spark 2.0,
# and matches how the home_sales view is created earlier in this notebook.
df_parquet.createOrReplaceTempView("temp_table")

In [None]:
# 13. Against the parquet-backed view, run the query that keeps only the view ratings whose
# average price is greater than or equal to $350,000, with the average rounded to two decimals.
# Time it and compare with the cached version.

parquet_view_sql = """
  select view,
         round(avg(price),2) as avg_price
  from temp_table
  group by view
  having avg(price) >= 350000
  order by view desc
                        """

start_time = time.time()
view_rating_parquet = spark.sql(parquet_view_sql)
view_rating_parquet.show(5)

# Elapsed wall-clock time covers building the query and displaying its result
elapsed_seconds = time.time() - start_time
print("--- %s seconds ---" % elapsed_seconds)

In [None]:
# 14. Uncache the home_sales temporary table.
# UNCACHE TABLE targets only home_sales; CLEAR CACHE would drop every cached table in the session.
spark.sql("UNCACHE TABLE home_sales")

# Verify the result: this should now display False.
spark.catalog.isCached("home_sales")

In [None]:
# Clean up: recursively force-delete the entries matched by /content/output/*
# (the path the parquet data was written to); the directory itself is not removed.
# NOTE(review): a shell `*` glob normally skips dot-files, so hidden files at the top
# level of the output directory may be left behind — confirm whether that matters.
%rm -rf /content/output/*