# Read CSV file

In [0]:
# Initial raw load of the Airbnb listings CSV; the first row supplies the column names.
airbnb_raw_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("dbfs:/FileStore/shared_uploads/meghanasharma300@gmail.com/Airbnb_datasets.csv")
)

# Define all libraries here

In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions import *  
     

# EXTRACT
Read with early filtering (Optimization #1): keep only the rows needed downstream (e.g., only listings from Brooklyn).

In [0]:
# Re-read the listings CSV with a header row and schema inference enabled,
# keeping only rows whose neighbourhood_group is "Brooklyn".
airbnb_raw_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", True)
    .load("dbfs:/FileStore/shared_uploads/meghanasharma300@gmail.com/Airbnb_datasets.csv")
    .filter(col("neighbourhood_group") == "Brooklyn")
)


Check data types and preview a few records

In [0]:
# Print the column names and types of the filtered raw DataFrame.
airbnb_raw_df.printSchema()
# Render the first five rows as a table (display() is the Databricks notebook helper).
display(airbnb_raw_df.limit(5))


root
 |-- id: double (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- bedrooms: string (nullable = true)
 |-- beds: integer (nullable = true)
 |-- baths: string (nullable = true)



id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365,number_of_reviews_ltm,license,rating,bedrooms,beds,baths
1312228.0,Rental unit in Brooklyn · ★5.0 · 1 bedroom,7130382,Walter,Brooklyn,Clinton Hill,40.68371,-73.96461,Private room,55.0,30,3,2015-12-20,0.03,1,0,0,No License,5.0,1,1,Not specified
19280212.0,Rental unit in Brooklyn · ★4.79 · 1 bedroom · 1 bed · 1 shared bath,2526182,Margaux,Brooklyn,Williamsburg,40.71153,-73.95312,Private room,90.0,30,19,2023-10-01,0.24,2,5,2,No License,4.79,1,1,1
8.96e+17,Rental unit in Brooklyn · ★4.67 · 1 bedroom · 1 bed · 1 bath,14251313,Kristin,Brooklyn,Sunset Park,40.65891327,-73.98958973,Entire home/apt,292.0,30,12,2023-10-19,1.71,1,365,12,No License,4.67,1,1,1
22066430.0,Rental unit in Brooklyn · ★4.71 · Studio · 1 bed · 1 bath,20604809,Houman,Brooklyn,Williamsburg,40.7132,-73.96176,Entire home/apt,160.0,30,49,2023-10-08,0.67,1,0,7,No License,4.71,Studio,1,1
34511726.0,Rental unit in Brooklyn · ★4.65 · 1 bedroom · 1 bed · 1 bath,260475429,Zahira,Brooklyn,Kensington,40.64304,-73.97964,Entire home/apt,120.0,30,26,2020-03-15,0.47,2,0,0,No License,4.65,1,1,1


# CLEAN
Cast data types and handle nulls\
Clean price, cast columns, and drop rows with nulls in key columns

In [0]:

# Build the cleaned listings frame from the raw Brooklyn rows:
#   price_clean       - price with any "$" and "," characters removed, cast to float
#   reviews_per_month - cast to float
#   last_review       - cast to date
# Rows with a null in any of the columns passed to dropna() are discarded.
airbnb_cleaned_df = (
    airbnb_raw_df
    .withColumn(
        "price_clean",
        regexp_replace(col("price"), "[$,]", "").cast("float"),
    )
    .withColumn("reviews_per_month", col("reviews_per_month").cast("float"))
    .withColumn("last_review", col("last_review").cast("date"))
    .dropna(
        subset=[
            "price_clean",
            "latitude",
            "longitude",
            "reviews_per_month",
            "last_review",
        ]
    )
)


#Business cleaning goals:

Accurate price analysis
Valid reviews for engagement trends

#Transform
Price segmentation

Host performance

Room size comparison

In [0]:
# Derive three analysis columns from the cleaned listings:
#   price_category    - "Low" below 50, "Medium" from 50 to 150 inclusive, otherwise "High"
#   availability_rate - availability_365 divided by 365, cast to float
#   room_capacity     - bedrooms, beds and baths joined with " | "
airbnb_transformed_df = (
    airbnb_cleaned_df
    .withColumn(
        "price_category",
        when(col("price_clean") < 50, "Low")
        # between() is inclusive on both ends, i.e. >= 50 and <= 150.
        .when(col("price_clean").between(50, 150), "Medium")
        .otherwise("High"),
    )
    .withColumn(
        "availability_rate",
        (col("availability_365") / 365).cast("float"),
    )
    .withColumn(
        "room_capacity",
        concat_ws(" | ", col("bedrooms"), col("beds"), col("baths")),
    )
)

print("✅ Job completed successfully!")


✅ Job completed successfully!


#(Optimization #2)
Cache Transformed Data 

In [0]:
# Mark the transformed DataFrame for caching so later actions and SQL queries can reuse it.
# NOTE(review): cache() is lazy in Spark - nothing is stored until an action runs on the DataFrame.
airbnb_transformed_df.cache()

Out[7]: DataFrame[id: double, name: string, host_id: int, host_name: string, neighbourhood_group: string, neighbourhood: string, latitude: double, longitude: double, room_type: string, price: double, minimum_nights: int, number_of_reviews: int, last_review: date, reviews_per_month: float, calculated_host_listings_count: int, availability_365: int, number_of_reviews_ltm: int, license: string, rating: string, bedrooms: string, beds: int, baths: string, price_clean: float, price_category: string, availability_rate: float, room_capacity: string]

#Handling Success and Failure During Transformations

In [0]:
# Rebuild the transformed DataFrame with explicit success/failure reporting.
#
# Adds price_category ("Low" < 50, "Medium" 50-150 inclusive, else "High"),
# availability_rate (availability_365 / 365 as float) and room_capacity
# (bedrooms, beds, baths joined with " | "), then marks the result for caching.
#
# Because Spark transformations are lazy, only errors raised while building the
# plan (e.g. a missing column) surface inside this try block; data-level errors
# appear later, at the first action.
try:
    print("🔹 Transforming columns...")

    airbnb_transformed_df = (
        airbnb_cleaned_df
        .withColumn(
            "price_category",
            when(col("price_clean") < 50, "Low")
            .when((col("price_clean") >= 50) & (col("price_clean") <= 150), "Medium")
            .otherwise("High"),
        )
        .withColumn("availability_rate", (col("availability_365") / 365).cast("float"))
        .withColumn("room_capacity", concat_ws(" | ", col("bedrooms"), col("beds"), col("baths")))
    )

    airbnb_transformed_df.cache()

    print("✅ Transformation job ran successfully.")

except Exception as e:
    print("❌ Transformation job failed.")
    print(f"⚠️ Error details: {e}")
    # Re-raise after reporting: swallowing the error would let later cells
    # (including the append-mode write) run against a stale or undefined
    # airbnb_transformed_df.
    raise

🔹 Transforming columns...
✅ Transformation job ran successfully.


# LOAD - Incremental Load
Save as Parquet, partitioned by price category and compressed (Optimizations #3 & #4)

In [0]:
# Append the transformed listings to the processed Parquet dataset,
# snappy-compressed and partitioned by price_category.
# NOTE(review): mode("append") adds to whatever is already at the target path,
# so re-running this cell writes the same rows again.
(
    airbnb_transformed_df.write
    .mode("append")
    .option("compression", "snappy")
    .partitionBy("price_category")
    .parquet("/FileStore/airbnb/processed_partitioned")
)


# Create Temp View for SQL
TODO: to make the view global, switch to `createOrReplaceGlobalTempView` and query it as `global_temp.airbnb_cleaned`.

In [0]:
# Register the transformed DataFrame as the temp view "airbnb_cleaned" used by the %sql cells;
# any existing temp view with that name is replaced.
airbnb_transformed_df.createOrReplaceTempView("airbnb_cleaned")


Retrieve with SQL

In [0]:
%sql
-- Row counts for every (room_type, price_category) pair, largest groups first.
SELECT
    room_type,
    price_category,
    COUNT(*) AS total
FROM airbnb_cleaned
GROUP BY
    room_type,
    price_category
ORDER BY total DESC


room_type,price_category,total
Private room,Medium,2314
Entire home/apt,High,2278
Entire home/apt,Medium,1831
Private room,Low,654
Private room,High,487
Shared room,Low,46
Shared room,Medium,44
Shared room,High,30
Entire home/apt,Low,6
Hotel room,High,3


In [0]:
%sql
-- Total number of rows exposed by the airbnb_cleaned temp view.
SELECT  COUNT(1)
FROM airbnb_cleaned

count(1)
7694


# SQL-Based Visualizations

**1**. Most popular room types among hosts (bar chart)

In [0]:
%sql
-- Number of rows per room_type, most common room type first (feeds the bar chart).
SELECT
    room_type,
    COUNT(*) AS listing_count
FROM airbnb_cleaned
GROUP BY
    room_type
ORDER BY
    listing_count DESC

room_type,listing_count
Entire home/apt,4115
Private room,3455
Shared room,120
Hotel room,4


Databricks visualization. Run in Databricks to view.

**2**. Monthly Review Count Trend\
Identifies peak booking seasons (each row is counted once, in the month of its `last_review` date)

In [0]:
%sql
-- Rows grouped by the calendar month of last_review, in chronological order.
-- NOTE(review): COUNT(*) counts rows in the view (presumably one per listing),
-- not individual reviews - confirm this matches the intended "total_reviews" metric.
SELECT
    date_trunc('month', last_review) AS review_month,
    COUNT(*) AS total_reviews
FROM airbnb_cleaned
WHERE last_review IS NOT NULL
GROUP BY
    review_month
ORDER BY
    review_month


review_month,total_reviews
2012-06-01T00:00:00.000+0000,1
2012-07-01T00:00:00.000+0000,1
2012-09-01T00:00:00.000+0000,1
2013-03-01T00:00:00.000+0000,1
2013-04-01T00:00:00.000+0000,1
2013-05-01T00:00:00.000+0000,1
2013-09-01T00:00:00.000+0000,1
2013-11-01T00:00:00.000+0000,1
2014-01-01T00:00:00.000+0000,2
2014-05-01T00:00:00.000+0000,1


Databricks visualization. Run in Databricks to view.