In [None]:
# Make the local Spark installation importable: findspark.init() has to run
# before any `pyspark` import in the cells that follow.
import findspark

findspark.init()




In [None]:
# Import necessary libraries
import os

from pyspark.sql import SparkSession

# Initialize a Spark session (getOrCreate reuses an already-running session on re-run)
spark_session = SparkSession.builder.appName("SparkSQLExample").getOrCreate()

# Local path of the dataset, and the file:// URI form that Spark is given.
# NOTE(review): absolute, machine-specific path — adjust for your environment.
local_csv_path = "C:/Users/adity/Downloads/home_sales_revised.csv"
local_csv_file = "file:///" + local_csv_path

# Only read the CSV if it is already on disk. On a fresh machine the file does not
# exist until it has been downloaded, and spark.read.csv would raise here instead of
# letting the notebook continue to the download step.
if os.path.exists(local_csv_path):
    df_home_sales = spark_session.read.csv(local_csv_file, header=True, inferSchema=True)
    df_home_sales.show()
else:
    print(f"{local_csv_path} not found; download it before loading df_home_sales.")








In [None]:
import os

import requests
from pyspark.sql import SparkSession

# URL of the CSV file.
# NOTE(review): this host looks like a placeholder — replace it with the real S3 object URL.
s3_url = "https://correct-s3-url/path/to/home_sales_revised.csv"

# Local file path to save the downloaded CSV file
local_csv_file = "C:/Users/adity/Downloads/home_sales_revised.csv"

# Seconds to wait on connect/read. Without a timeout, requests can block indefinitely,
# and the Timeout handler below could never run.
download_timeout_s = 30

# Download the file only when no local copy exists, so re-running the notebook does not
# re-fetch it. Delete the local file to force a fresh download.
if os.path.exists(local_csv_file):
    print(f"Using existing local copy: {local_csv_file}")
else:
    try:
        response = requests.get(s3_url, timeout=download_timeout_s)
        response.raise_for_status()  # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
        with open(local_csv_file, 'wb') as file:
            file.write(response.content)
    except requests.exceptions.HTTPError as errh:
        print(f"Http Error: {errh}")
    except requests.exceptions.ConnectionError as errc:
        print(f"Error Connecting: {errc}")
    except requests.exceptions.Timeout as errt:
        print(f"Timeout Error: {errt}")
    except requests.exceptions.RequestException as err:
        print(f"OOps: Something Else: {err}")
    except OSError as err_io:
        # Local write failure (e.g. the target directory does not exist). This must come
        # after RequestException, which is itself an OSError subclass.
        print(f"File Error: {err_io}")

# Initialize a Spark session
spark_session = SparkSession.builder.appName("SparkSQLExample").getOrCreate()

# Read the local CSV into a DataFrame, but only if the download (or an earlier copy)
# actually produced the file; otherwise Spark would raise on a missing path.
if os.path.exists(local_csv_file):
    df_home_sales = spark_session.read.csv("file:///" + local_csv_file, header=True, inferSchema=True)
    df_home_sales.show()
else:
    print(f"CSV not found at {local_csv_file}; df_home_sales was not (re)loaded.")







In [None]:

# Register the loaded DataFrame as the temporary SQL view `home_sales_view`, which the
# queries below use. take(1) fetches at most one row through the DataFrame API, so
# there is no need to convert to an RDD (df.rdd) just to test for emptiness.
# globals() is used because notebook cells run at module scope.
if 'df_home_sales' in globals() and df_home_sales.take(1):
    df_home_sales.createOrReplaceTempView('home_sales_view')
else:
    print('DataFrame not loaded correctly or is empty!')






In [None]:
# Average sale price of 4-bedroom homes, grouped by the year of `date`,
# rounded to 2 decimals and listed newest year first.
query_avg_price_4_bedrooms = """
SELECT YEAR(date) AS Year,
       ROUND(AVG(price), 2) AS Average_Price
FROM home_sales_view
WHERE bedrooms = 4
GROUP BY Year
ORDER BY Year DESC
"""

result = spark_session.sql(
    query_avg_price_4_bedrooms
)
result.show()






In [None]:
# Average price (2 decimals) of 3-bedroom / 3-bathroom homes, grouped by the
# year the home was built (date_built), most recent first.
query_avg_price_3_bed_3_bath = """
SELECT date_built AS Year_Built,
       ROUND(AVG(price), 2) AS Average_Price
FROM home_sales_view
WHERE bedrooms = 3 AND bathrooms = 3
GROUP BY Year_Built
ORDER BY Year_Built DESC
"""
(
    spark_session
    .sql(query_avg_price_3_bed_3_bath)
    .show()
)




In [None]:
# Same grouping as the 3-bed / 3-bath query, further narrowed to two-floor homes
# with at least 2,000 sqft of living space.
query_specific_homes = """
SELECT date_built AS Year_Built,
       ROUND(AVG(price), 2) AS Average_Price
FROM home_sales_view
WHERE bedrooms = 3 AND bathrooms = 3 AND sqft_living >= 2000 AND floors = 2
GROUP BY Year_Built
ORDER BY Year_Built DESC
"""
(
    spark_session
    .sql(query_specific_homes)
    .show()
)





In [None]:
# Query for average price per view rating (only ratings averaging >= $350,000),
# timed before the view is cached so it can be compared with the cached run.
import time

query_avg_price_view = """
SELECT view, 
       ROUND(AVG(price), 2) AS Average_Price
FROM home_sales_view
GROUP BY view
HAVING AVG(price) >= 350000
ORDER BY view DESC
"""

# perf_counter is a monotonic, high-resolution clock meant for measuring durations;
# time.time() follows the wall clock, which can be adjusted mid-measurement.
start_time_query = time.perf_counter()
spark_session.sql(query_avg_price_view).show()
print("--- %s seconds ---" % (time.perf_counter() - start_time_query))


In [None]:
# Cache the home_sales_view temporary table via Spark SQL ...
spark_session.sql('CACHE TABLE home_sales_view')

# ... then ask the catalog whether the cache took effect and report it.
is_cached = spark_session.catalog.isCached('home_sales_view')
print("Is home_sales_view cached? ", is_cached)




In [None]:
# Check if the table is cached. The bare expression on the last line makes Jupyter
# display the result; a plain assignment would show nothing.
is_cached = spark_session.catalog.isCached('home_sales_view')
is_cached



In [None]:
# Re-run the view-rating query against the cached home_sales_view and time it,
# for comparison with the uncached timing.
# `time` is imported here so this cell does not depend on another cell's import.
import time

query_view_ratings_cached = """
SELECT view, 
       ROUND(AVG(price), 2) AS Average_Price
FROM home_sales_view
GROUP BY view
HAVING AVG(price) >= 350000
ORDER BY view DESC
"""

# perf_counter: monotonic clock intended for measuring elapsed time.
start_time_cached = time.perf_counter()
spark_session.sql(query_view_ratings_cached).show()
print("--- %s seconds ---" % (time.perf_counter() - start_time_cached))



In [None]:
# Persist df_home_sales as Parquet partitioned on the date_built column, replacing
# anything already at the target location. Requires df_home_sales to be loaded.
# Failures are reported rather than raised, so the notebook keeps running.
output_parquet_path = "C:/Users/adity/Downloads/partitioned_home_sales"

try:
    parquet_writer = df_home_sales.write.partitionBy("date_built").mode("overwrite")
    parquet_writer.parquet(output_parquet_path)
    print("DataFrame successfully written to partitioned Parquet format.")
except Exception as write_error:
    print(f"Error occurred: {write_error}")




In [None]:
# Load the Parquet output from output_parquet_path back into a DataFrame and
# preview it; any failure (including in show()) is reported, not raised.
try:
    df_partitioned_home_sales = spark_session.read.parquet(output_parquet_path)
    print("Successfully read from partitioned Parquet format.")
    df_partitioned_home_sales.show()
except Exception as read_error:
    print(f"Error occurred while reading Parquet data: {read_error}")



In [None]:
# Read from the same location the partitioned write used (output_parquet_path).
# The previous placeholder path never existed, so the read always failed and a
# stale DataFrame from an earlier cell was silently reused.
parquet_path = output_parquet_path

# Read the partitioned Parquet data into a DataFrame
try:
    df_partitioned_home_sales = spark_session.read.parquet(parquet_path)
    print("Partitioned Parquet data loaded successfully.")
except Exception as e:
    print(f"Error loading Parquet data: {e}")




In [None]:
# Expose df_partitioned_home_sales to Spark SQL as `partitioned_home_sales_view`;
# an error is printed instead of raised.
try:
    df_partitioned_home_sales.createOrReplaceTempView("partitioned_home_sales_view")
    print("Temporary view created successfully.")
except Exception as view_error:
    print(f"Error creating temporary view: {view_error}")



In [None]:
# Example query on the partitioned temporary view: number of rows per date_built value.
# date_built is the column the Parquet output was partitioned on, so it is guaranteed
# to exist. The previous placeholder `some_column` is not in the data and always failed.
try:
    result = spark_session.sql("""
        SELECT date_built, COUNT(*) AS home_count
        FROM partitioned_home_sales_view
        GROUP BY date_built
        ORDER BY date_built
    """)
    result.show()
except Exception as e:
    print(f"Error executing query: {e}")



In [None]:
# Uncaching the temporary tables.
# home_sales_view is the table cached earlier with CACHE TABLE; the original cell only
# looked at partitioned_home_sales_view (never cached), so the real cache was never
# released. Both views are checked here. Each view gets its own try, so one missing
# view (isCached raises for unknown names) does not stop the other from being handled.
for view_name in ("home_sales_view", "partitioned_home_sales_view"):
    try:
        if spark_session.catalog.isCached(view_name):
            spark_session.catalog.uncacheTable(view_name)
            print(f"{view_name} uncached successfully.")
        else:
            print(f"{view_name} is not cached.")
    except Exception as e:
        print(f"Error uncaching {view_name}: {e}")



