In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re
print("starting data cleaning pipeline")

starting data cleaning pipeline


In [2]:
spark = SparkSession.builder \
    .appName("AirbnbPricePredictor") \
    .master("local[*]") \
    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow") \
    .config("spark.executor.extraJavaOptions", "-Djava.security.manager=allow") \
    .getOrCreate()
print(f"spark session connected")


25/11/12 16:01:07 WARN Utils: Your hostname, MacBook-Pro-110.local resolves to a loopback address: 127.0.0.1; using 172.20.10.3 instead (on interface en0)
25/11/12 16:01:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/12 16:01:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/12 16:01:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


spark session connected
active tables: []


## 1. Load Raw Data from Previous Notebook

In [3]:
df_raw = spark.read.parquet("../data/processed/listings_raw.parquet")
print(f"raw data loaded: {df_raw.count():,} rows")
print(f"columns: {len(df_raw.columns)}")

                                                                                

raw data loaded: 173,028 rows
columns: 76


## 2. Clean Price Field

In [4]:
df_cleaned = df_raw.withColumn(
    "price_clean",
    regexp_replace(col("price"), "[$,]", "").cast("float")
)
null_count = df_cleaned.filter(col("price_clean").isNull()).count()
print("price cleaning:")
df_cleaned.select("price", "price_clean").show(10, truncate=False)
print(f"\ntotal rows: {df_cleaned.count():,}")
print(f"rows with null price_clean: {null_count:,}")
print(f"note: null rows will be removed in the next section")

price cleaning:
+---------+-----------+
|price    |price_clean|
+---------+-----------+
|$81.00   |81.0       |
|$110.00  |110.0      |
|NULL     |NULL       |
|NULL     |NULL       |
|$88.00   |88.0       |
|$1,000.00|1000.0     |
|$81.00   |81.0       |
|NULL     |NULL       |
|$90.00   |90.0       |
|$1,520.00|1520.0     |
+---------+-----------+
only showing top 10 rows


total rows: 173,028
rows with null price_clean: 54,788
note: null rows will be removed in the next section


25/11/12 16:01:21 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## 3. Filter Out Invalid Prices

In [6]:
print("filtering invalid prices...")
before_count = df_cleaned.count()
null_price_count = df_cleaned.filter(col("price_clean").isNull()).count()
zero_price_count = df_cleaned.filter(col("price_clean") == 0).count()
high_price_count = df_cleaned.filter(col("price_clean") >= 100000).count()


print(f"before filtering: {before_count:,} rows")
print(f"  null prices: {null_price_count:,}")
print(f"  zero prices: {zero_price_count:,}")
print(f"  prices >= $100,000: {high_price_count:,}")
df_cleaned = df_cleaned.filter(
    col("price_clean").isNotNull() &
    (col("price_clean") > 0) &
    (col("price_clean") < 100000)
)
after_count = df_cleaned.count()
removed_count = before_count - after_count


print(f"\nafter filtering: {after_count:,} rows")
print(f"removed: {removed_count:,} rows ({removed_count/before_count*100:.1f}%)")
print("\nprice statistics by city:")
df_cleaned.groupBy("city").agg(
    avg("price_clean").alias("avg_price"),
    expr("percentile(price_clean, 0.5)").alias("median_price"),
    min("price_clean").alias("min_price"),
    max("price_clean").alias("max_price")
).orderBy("city").show(truncate=False)


filtering invalid prices...
before filtering: 118,240 rows
  null prices: 0
  zero prices: 0
  prices >= $100,000: 0

after filtering: 118,240 rows
removed: 0 rows (0.0%)

price statistics by city:
+-----+-----------------+------------+---------+---------+
|city |avg_price        |median_price|min_price|max_price|
+-----+-----------------+------------+---------+---------+
|LA   |341.9924495505038|155.0       |8.0      |85000.0  |
|NYC  |680.5268192048012|154.0       |10.0     |50104.0  |
|Paris|252.5904847486396|150.0       |7.0      |30400.0  |
+-----+-----------------+------------+---------+---------+



## 4. Parse Bathrooms Field

In [7]:
df_cleaned = df_cleaned.withColumn(
    "bathrooms_clean",
    when(col("bathrooms_text").isNotNull(),
         regexp_extract(col("bathrooms_text"), r"(\d+\.?\d*)", 1).cast("float")
    ).otherwise(1.0)
)
print("bathrooms parsing:")
df_cleaned.select("bathrooms_text", "bathrooms_clean").show(10, truncate=False)

# note: we're not differentiating between shared and private bath

bathrooms parsing:
+--------------+---------------+
|bathrooms_text|bathrooms_clean|
+--------------+---------------+
|1 shared bath |1.0            |
|1 private bath|1.0            |
|1 shared bath |1.0            |
|1.5 baths     |1.5            |
|1 private bath|1.0            |
|1 shared bath |1.0            |
|6.5 baths     |6.5            |
|1 bath        |1.0            |
|1 bath        |1.0            |
|1 bath        |1.0            |
+--------------+---------------+
only showing top 10 rows



## 5. Handle Missing Values in Bedrooms/Beds

In [9]:
from pyspark.sql.window import Window
print("missing beds and bedrooms:")
print(f"bedrooms nulls: {df_cleaned.filter(col('bedrooms').isNull()).count()}")
print(f"beds nulls: {df_cleaned.filter(col('beds').isNull()).count()}")


temp_cols = ["median_bedrooms"]
for temp_col in temp_cols:
    if temp_col in df_cleaned.columns:
        df_cleaned = df_cleaned.drop(temp_col)

bedroom_medians = df_cleaned.groupBy("property_type").agg(
    expr("percentile(bedrooms, 0.5)").alias("median_bedrooms")
)


df_cleaned = df_cleaned.join(bedroom_medians, "property_type", "left")
df_cleaned = df_cleaned.withColumn(
    "bedrooms_clean",
    when(col("bedrooms").isNull(), col("median_bedrooms")).otherwise(col("bedrooms"))
)
df_cleaned = df_cleaned.withColumn(
    "beds_clean",
    when(col("beds").isNull(), (col("accommodates") / 2).cast("int"))
    .otherwise(col("beds"))
)
df_cleaned = df_cleaned.drop("median_bedrooms")
print("\nmissing beds and bedrooms after cleaning:")
print(f"bedrooms nulls: {df_cleaned.filter(col('bedrooms_clean').isNull()).count()}")
print(f"beds nulls: {df_cleaned.filter(col('beds_clean').isNull()).count()}")

missing beds and bedrooms:
bedrooms nulls: 343
beds nulls: 287

missing beds and bedrooms cleaned:
bedrooms nulls: 0
beds nulls: 0


## 6. Parse Host Response Rate

In [10]:
df_cleaned = df_cleaned.withColumn(
    "host_response_rate_clean",
    when(col("host_response_rate").isNotNull(),
         regexp_replace(col("host_response_rate"), "%", "").cast("float") / 100
    ).otherwise(0.0)
)
df_cleaned = df_cleaned.withColumn(
    "host_response_rate_clean",
    when(col("host_response_rate_clean").isNull(), 0.0).otherwise(col("host_response_rate_clean"))
)
print("host response rate parsing:")
df_cleaned.select("host_response_rate", "host_response_rate_clean").show(10, truncate=False)

host response rate parsing:
+------------------+------------------------+
|host_response_rate|host_response_rate_clean|
+------------------+------------------------+
|100%              |1.0                     |
|100%              |1.0                     |
|100%              |1.0                     |
|100%              |1.0                     |
|100%              |1.0                     |
|N/A               |0.0                     |
|100%              |1.0                     |
|N/A               |0.0                     |
|100%              |1.0                     |
|100%              |1.0                     |
+------------------+------------------------+
only showing top 10 rows



## 7. Convert Boolean Fields

In [11]:
boolean_cols = ["host_is_superhost", "instant_bookable", "has_availability"]
for col_name in boolean_cols:
    if col_name in df_cleaned.columns:
        df_cleaned = df_cleaned.withColumn(
            f"{col_name}_clean",
            when(col(col_name) == "t", 1).otherwise(0)
        )
print("boolean conversions complete")
print("post cleaning:")
df_cleaned.select("host_is_superhost", "host_is_superhost_clean").show(10, truncate=False)
df_cleaned.select("instant_bookable", "instant_bookable_clean").show(10, truncate=False)
df_cleaned.select("has_availability", "has_availability_clean").show(10, truncate=False)


boolean conversions complete
post cleaning:
+-----------------+-----------------------+
|host_is_superhost|host_is_superhost_clean|
+-----------------+-----------------------+
|t                |1                      |
|f                |0                      |
|t                |1                      |
|t                |1                      |
|f                |0                      |
|f                |0                      |
|f                |0                      |
|f                |0                      |
|f                |0                      |
|t                |1                      |
+-----------------+-----------------------+
only showing top 10 rows

+----------------+----------------------+
|instant_bookable|instant_bookable_clean|
+----------------+----------------------+
|f               |0                     |
|f               |0                     |
|t               |1                     |
|f               |0                     |
|f               |0 

## 8. Handle Review Scores

In [12]:
review_cols = [
    "review_scores_rating", "review_scores_accuracy",
    "review_scores_cleanliness", "review_scores_checkin",
    "review_scores_communication", "review_scores_location",
    "review_scores_value"
]
for col_name in review_cols:
    if col_name in df_cleaned.columns:
        median_val = df_cleaned.agg(
            expr(f"percentile({col_name}, 0.5)").alias("median")
        ).collect()[0]["median"]
        df_cleaned = df_cleaned.withColumn(
            f"{col_name}_clean",
            when(col(col_name).isNull(), median_val).otherwise(col(col_name))
        )
df_cleaned = df_cleaned.withColumn(
    "reviews_per_month",
    when(col("reviews_per_month").isNull(), 0).otherwise(col("reviews_per_month"))
)
for col_name in review_cols:
    print(f"nulls: {df_cleaned.filter(col(col_name).isNull()).count()}")
    before_df = df_cleaned.select(col_name).limit(10).toPandas()
    after_df = df_cleaned.select(f"{col_name}_clean").limit(10).toPandas()
    side_by_side = before_df.copy()
    side_by_side[f"{col_name}_clean"] = after_df[f"{col_name}_clean"]
    print(f"\n{col_name} (before -> after):")
    print(side_by_side.head(10).to_string(index=False))


nulls: 30951

review_scores_rating (before -> after):
 review_scores_rating  review_scores_rating_clean
                 4.87                        4.87
                 4.41                        4.41
                 4.86                        4.86
                 4.89                        4.89
                 4.83                        4.83
                 4.72                        4.72
                 4.96                        4.96
                 4.77                        4.77
                 4.84                        4.84
                 4.93                        4.93
nulls: 30961

review_scores_accuracy (before -> after):
 review_scores_accuracy  review_scores_accuracy_clean
                   4.82                          4.82
                   4.26                          4.26
                   4.85                          4.85
                   4.84                          4.84
                   4.90                          4.90
                

## 9. Drop Irrelevant Columns

In [13]:
drop_cols = [
    "listing_url", "scrape_id", "last_scraped", "picture_url",
    "host_url", "host_thumbnail_url", "host_picture_url",
    "description", "neighborhood_overview", "host_about",
    "host_neighbourhood", "neighbourhood", "neighbourhood_group_cleansed",
    "license", "bathrooms", "calendar_updated", "calendar_last_scraped"
]
existing_drop_cols = [c for c in drop_cols if c in df_cleaned.columns]
df_cleaned = df_cleaned.drop(*existing_drop_cols)
print(f"dropped {len(existing_drop_cols)} irrelevant columns")
print(f"remaining columns: {len(df_cleaned.columns)}")

dropped 17 irrelevant columns
remaining columns: 74


## 10. Select Final Clean Columns

In [14]:
final_cols = [
    "id", "host_id", "city",
    "latitude", "longitude", "neighbourhood_cleansed",
    "property_type", "room_type", "accommodates",
    "bedrooms_clean", "beds_clean", "bathrooms_clean",
    "price_clean", "minimum_nights", "maximum_nights",
    "availability_30", "availability_60", "availability_90", "availability_365",
    "host_since", "host_response_rate_clean",
    "host_is_superhost_clean", "host_listings_count",
    "number_of_reviews", "reviews_per_month",
    "review_scores_rating_clean",
    "instant_bookable_clean"
]
final_cols = [c for c in final_cols if c in df_cleaned.columns]
df_final = df_cleaned.select(final_cols)
rename_map = {
    "bedrooms_clean": "bedrooms",
    "beds_clean": "beds",
    "bathrooms_clean": "bathrooms",
    "price_clean": "price",
    "host_response_rate_clean": "host_response_rate",
    "host_is_superhost_clean": "host_is_superhost",
    "instant_bookable_clean": "instant_bookable",
    "review_scores_rating_clean": "review_scores_rating"
}
for old_name, new_name in rename_map.items():
    if old_name in df_final.columns:
        df_final = df_final.withColumnRenamed(old_name, new_name)
print(f"final cleaned dataset: {df_final.count():,} rows, {len(df_final.columns)} columns")

final cleaned dataset: 118,240 rows, 27 columns


## 11. Data Quality Summary

In [15]:
from pyspark.sql.functions import col, isnan
print("=" * 70)
print("cleaned data summary")
print("=" * 70)
print("\nlistings by city:")
df_final.groupBy("city").count().orderBy(col("count").desc()).show()
print("\nnull values in final dataset:")
key_cols = ["price", "latitude", "longitude", "bedrooms", "beds", "accommodates"]
null_check = []
for col_name in key_cols:
    if col_name in df_final.columns:
        null_count = df_final.filter(col(col_name).isNull() | isnan(col(col_name))).count()
        null_check.append((col_name, null_count))
print("{:>12} {:>10}".format("column", "nulls"))
for col_name, null_count in null_check:
    print("{:>12} {:>10}".format(col_name, null_count))
print("\nprice statistics:")
df_final.select("price").describe().show()


cleaned data summary

listings by city:
+-----+-----+
| city|count|
+-----+-----+
|Paris|60093|
|   LA|36819|
|  NYC|21328|
+-----+-----+


null values in final dataset:
      column      nulls
       price          0
    latitude          0
   longitude          0
    bedrooms          0
        beds          0
accommodates          0

price statistics:
+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            118240|
|   mean| 357.6202300405954|
| stddev|2197.2840847020575|
|    min|               7.0|
|    max|           85000.0|
+-------+------------------+



## 12. Save Cleaned Data

In [17]:
output_path = "../data/processed/listings_clean.parquet"
df_final.cache()
print("saving cleaned data...")
df_final.write.mode("overwrite").parquet(output_path)
print(f"data saved to {output_path}")
print(f"rows: {df_final.count():,}")
print(f"columns: {len(df_final.columns)}")

25/11/12 16:03:54 WARN CacheManager: Asked to cache already cached data.


saving cleaned data...
data saved to ../data/processed/listings_clean.parquet
rows: 118,240
columns: 27


## 13. Verify Data Quality

In [18]:
print("sample of cleaned data:")
df_final.show(10, truncate=False)
print("columns in cleaned data:")
print(df_final.columns)
num_rows = df_final.count()
num_cols = len(df_final.columns)
print(f"\nshape of cleaned data: ({num_rows:,} rows, {num_cols} columns)")

sample of cleaned data:
+------+-------+----+--------+----------+----------------------+---------------------------+---------------+------------+--------+----+---------+------+--------------+--------------+---------------+---------------+---------------+----------------+----------+------------------+-----------------+-------------------+-----------------+-----------------+--------------------+----------------+
|id    |host_id|city|latitude|longitude |neighbourhood_cleansed|property_type              |room_type      |accommodates|bedrooms|beds|bathrooms|price |minimum_nights|maximum_nights|availability_30|availability_60|availability_90|availability_365|host_since|host_response_rate|host_is_superhost|host_listings_count|number_of_reviews|reviews_per_month|review_scores_rating|instant_bookable|
+------+-------+----+--------+----------+----------------------+---------------------------+---------------+------------+--------+----+---------+------+--------------+--------------+--------------

25/11/12 16:30:48 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 201583 ms exceeds timeout 120000 ms
25/11/12 16:30:48 WARN SparkContext: Killing executors is not supported by current scheduler.
25/11/12 16:30:51 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$