# Notebook 02: Data Cleaning
# Clean and preprocess data, handle missing values, create Silver layer

## Import libraries

In [0]:
%python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import mlflow
from delta.tables import *

## Initialize

In [0]:
%python
# getOrCreate() hands back the already-running session when there is one and
# only builds a new session (under this app name) when none exists.
session_builder = SparkSession.builder.appName("Hotel_Churn_Cleaning")
spark = session_builder.getOrCreate()


## Load dataset from Bronze

In [0]:
%python
# Raw bookings as stored in the Bronze layer; `df` stays untouched so later
# cells can compare the cleaned result against it.
bronze_table_name = "hotel_catalog.bronze.raw_hotel_bookings"
df = spark.table(bronze_table_name)
print(f"Initial record count: {df.count():,}")

## Data quality report - Focus on NULL values

In [0]:
%python
# Data-quality report over the Bronze frame: real NULLs first, then cells whose
# trimmed text is the literal string "NULL".

# The row count does not change inside the loops, so compute it once.
total_rows = df.count()

# count(when(cond, 1)) counts only rows where cond is true (when() without
# otherwise() yields NULL, which count() skips). One aggregation covers every
# column in a single pass instead of launching one Spark job per column.
null_counts = df.agg(
    *[count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]
).first().asDict()

print("Columns with NULL (actual null, not string) values:")
for col_name in df.columns:
    null_count = null_counts[col_name]
    if null_count > 0:
        percentage = (null_count / total_rows) * 100
        print(f"  {col_name}: {null_count:,} NULL values ({percentage:.2f}%)")

# Check for string "NULL" values
print("\n=== Checking for string 'NULL' values ===")
string_null_counts = df.agg(
    *[count(when(trim(col(c)) == "NULL", 1)).alias(c) for c in df.columns]
).first().asDict()

# Maps column name -> number of string "NULL" cells, only for affected columns.
string_null_columns = {}
for col_name in df.columns:
    string_null_count = string_null_counts[col_name]
    if string_null_count > 0:
        string_null_columns[col_name] = string_null_count
        print(f"  {col_name}: {string_null_count:,} string 'NULL' values found")

## Handle the 'children' column properly

In [0]:
%python
print("\n=== Cleaning 'children' column ===")

# Inspect the raw column before touching it: declared type and a sample of values.
children_raw = df.select("children")

print("Current data type of 'children' column:")
children_raw.printSchema()

print("\nSample distinct values in 'children' column:")
children_raw.distinct().show(20)

# A value counts as missing when it is a real NULL or, after trimming, one of
# the placeholder strings "NULL", "NA" or "". Missing becomes 0; everything
# else is cast to integer.
children_text = trim(col("children"))
children_is_missing = col("children").isNull() | children_text.isin("NULL", "NA", "")
children_as_int = when(children_is_missing, 0).otherwise(
    col("children").cast(IntegerType())
)

# Build the cleaned column next to the original, then swap it in under the
# original name.
df_clean = (
    df.withColumn("children_clean", children_as_int)
    .drop("children")
    .withColumnRenamed("children_clean", "children")
)

print("\n'children' column after cleaning:")
df_clean.select("children").summary().show()

## Handle 'agent' and 'company' columns (which have NULL string values)

In [0]:
%python
print("\n=== Cleaning 'agent' and 'company' columns ===")

# Both columns get the same rule: a real NULL or the trimmed string "NULL"
# becomes 0, anything else is cast to integer.
# Maps original column name -> temporary name of its cleaned replacement.
cleaned_name_for = {"agent": "agent_clean", "company": "company_clean"}

for source_name, cleaned_name in cleaned_name_for.items():
    source_is_missing = col(source_name).isNull() | (trim(col(source_name)) == "NULL")
    df_clean = df_clean.withColumn(
        cleaned_name,
        when(source_is_missing, 0).otherwise(col(source_name).cast(IntegerType())),
    )

# Remove the raw columns, then give the cleaned ones the original names.
df_clean = df_clean.drop(*cleaned_name_for.keys())
for source_name, cleaned_name in cleaned_name_for.items():
    df_clean = df_clean.withColumnRenamed(cleaned_name, source_name)

print("\n'agent' column after cleaning:")
df_clean.select("agent").summary().show()

print("\n'company' column after cleaning:")
df_clean.select("company").summary().show()

## Handle 'country' column (has some NULL values)

In [0]:
%python
print("\n=== Cleaning 'country' column ===")

# Missing country = real NULL, or a trimmed value of "NULL" / "".
# Those rows get the marker "UNK"; all other values pass through unchanged.
country_text = trim(col("country"))
country_is_missing = col("country").isNull() | country_text.isin("NULL", "")
country_or_unk = when(country_is_missing, "UNK").otherwise(col("country"))

df_clean = (
    df_clean.withColumn("country_clean", country_or_unk)
    .drop("country")
    .withColumnRenamed("country_clean", "country")
)

print("Distinct countries after cleaning (top 10):")
rows_per_country = df_clean.groupBy("country").count()
rows_per_country.orderBy(desc("count")).show(10)

## Handle other columns with potential NULL values

In [0]:
%python
print("\n=== Handling other columns with NULL values ===")

# List of columns to check for NULL and handle
columns_to_check = ["meal", "market_segment", "distribution_channel", 
                    "deposit_type", "customer_type", "reserved_room_type", 
                    "assigned_room_type"]

# Columns missing from the frame are skipped silently.
present_columns = [c for c in columns_to_check if c in df_clean.columns]

if present_columns:
    # Count NULLs for all target columns in one aggregation, before cleaning,
    # instead of running a separate Spark job per column. (The previous
    # after-cleaning count was never used and has been dropped.)
    null_counts_before = df_clean.agg(
        *[count(when(col(c).isNull(), 1)).alias(c) for c in present_columns]
    ).first().asDict()

    for col_name in present_columns:
        # Replace real NULLs with the label "Unknown"; other values are kept.
        df_clean = df_clean.withColumn(
            col_name,
            when(
                col(col_name).isNull(), "Unknown"
            ).otherwise(col(col_name))
        )

        null_count_before = null_counts_before[col_name]
        if null_count_before > 0:
            print(f"  {col_name}: {null_count_before} NULL values → replaced with 'Unknown'")

## Handle Illogical values

In [0]:
%python
print("\n=== Removing impossible values ===")

# Each rule is (condition a row must satisfy to be kept, label describing the
# rows that get removed). Rules are applied in order. Rows where the tested
# column is NULL are removed too, because a comparison with NULL is not true.
validity_rules = [
    # Adults cannot be 0 or negative
    (col("adults") > 0, "adults <= 0"),
    # ADR (Average Daily Rate) should be positive or zero
    (col("adr") >= 0, "adr < 0"),
    # Lead time should be non-negative
    (col("lead_time") >= 0, "lead_time < 0"),
    # Total nights should be at least 1 (can't have 0 nights stay)
    (
        (col("stays_in_weekend_nights") + col("stays_in_week_nights")) >= 1,
        "total nights < 1",
    ),
]

# The count after one filter is the count before the next, so carry it forward
# instead of recounting the same frame at the start of every step.
initial_count = df_clean.count()
for keep_condition, removed_label in validity_rules:
    df_clean = df_clean.filter(keep_condition)
    remaining_count = df_clean.count()
    removed = initial_count - remaining_count
    print(f"  Removed {removed} rows where {removed_label}")
    initial_count = remaining_count

## Create derived columns

In [0]:
%python
print("\n=== Creating derived columns ===")

# SQL CASE turning the (lower-cased) month name into its number; any value that
# matches no month name maps to 0.
month_number_sql = """
        CASE lower(arrival_date_month)
            WHEN 'january' THEN 1
            WHEN 'february' THEN 2
            WHEN 'march' THEN 3
            WHEN 'april' THEN 4
            WHEN 'may' THEN 5
            WHEN 'june' THEN 6
            WHEN 'july' THEN 7
            WHEN 'august' THEN 8
            WHEN 'september' THEN 9
            WHEN 'october' THEN 10
            WHEN 'november' THEN 11
            WHEN 'december' THEN 12
            ELSE 0
        END
    """

# (new column name, expression). Order matters: total_spend reads the
# total_nights column that an earlier entry creates.
derived_columns = [
    ("total_nights", col("stays_in_weekend_nights") + col("stays_in_week_nights")),
    ("total_guests", col("adults") + col("children") + col("babies")),
    ("arrival_date_month_num", expr(month_number_sql)),
    ("total_spend", col("adr") * col("total_nights")),
    # 1 when the stay includes at least one weekend night, else 0
    ("is_weekend_stay", when(col("stays_in_weekend_nights") > 0, 1).otherwise(0)),
    # 1 when the assigned room type equals the reserved one, else 0
    (
        "room_type_match",
        when(col("reserved_room_type") == col("assigned_room_type"), 1).otherwise(0),
    ),
]

for derived_name, derived_expr in derived_columns:
    df_clean = df_clean.withColumn(derived_name, derived_expr)

print("Derived columns created: total_nights, total_guests, arrival_date_month_num, total_spend, is_weekend_stay, room_type_match")

## Deduplicate records

In [0]:
%python
# Bookings that agree on every key attribute below are treated as the same
# booking; only one row per group is kept.
print("\nMethod: Removing likely duplicates based on key attributes")

duplicate_key_columns = [
    "hotel",
    "lead_time",
    "arrival_date_year",
    "arrival_date_month",
    "arrival_date_day_of_month",
    "adults",
    "children",
    "country",
]

# Within each group rows are numbered by ascending reservation_status_date.
# There is no further tie-breaker, so among rows sharing the same date the one
# that receives number 1 is not fixed by this code.
window_spec = Window.partitionBy(*duplicate_key_columns).orderBy("reservation_status_date")

numbered_rows = df_clean.withColumn("row_num", row_number().over(window_spec))
before_count = numbered_rows.count()

# Keep the first row of every group and discard the helper column.
df_clean = numbered_rows.filter(col("row_num") == 1).drop("row_num")
after_count = df_clean.count()

duplicates_removed = before_count - after_count
print(f"  Records before deduplication: {before_count:,}")
print(f"  Records after deduplication: {after_count:,}")
print(f"  Duplicates removed: {duplicates_removed:,}")


## Define Churn Label

In [0]:
%python
print("\n=== Defining target variable ===")

# The cancellation flag becomes the target column, renamed to "churn".
df_clean = df_clean.withColumnRenamed("is_canceled", "churn")

# Check churn distribution: rows per churn value and their share of all rows.
labelled_row_count = df_clean.count()
share_of_rows = (count("*") / labelled_row_count * 100).alias("percentage")

churn_stats = (
    df_clean.groupBy("churn")
    .agg(count("*").alias("count"), share_of_rows)
    .orderBy("churn")
)

print("Churn distribution:")
churn_stats.show()


## Write Clean Dataset to Silver layer

In [0]:
%python
silver_table_name = "hotel_catalog.silver.cleaned_hotel_bookings"

print(f"\n=== Writing cleaned data to Silver layer ===")
print(f"Table: {silver_table_name}")

# Delta write that replaces the existing table contents; overwriteSchema lets
# the replacement also change the table's schema.
silver_writer = df_clean.write.format("delta").mode("overwrite")
silver_writer = silver_writer.option("overwriteSchema", "true")
silver_writer.saveAsTable(silver_table_name)

print("Data successfully written to Silver layer")

## Final data quality summary

In [0]:
%python
print("\n" + "="*60)
print("FINAL DATA QUALITY SUMMARY")
print("="*60)

# Neither df nor df_clean is cached, so every count() re-executes the pipeline.
# Count Bronze once, and take all cleaned-data counts from a single aggregation
# so the totals and the churn split come from the same execution.
bronze_count = df.count()
cleaned_counts = df_clean.agg(
    count("*").alias("total"),
    count(when(col("churn") == 1, 1)).alias("churned"),
    count(when(col("churn") == 0, 1)).alias("retained"),
).first()
total_count = cleaned_counts["total"]
churn_count = cleaned_counts["churned"]
non_churn_count = cleaned_counts["retained"]

print(f"\nRecord Counts:")
print(f"  Initial records in Bronze: {bronze_count:,}")
print(f"  Final records in Silver: {total_count:,}")
print(f"  Total records removed: {bronze_count - total_count:,}")

print(f"\nTarget Variable (Churn):")
# An empty cleaned frame reports 0.00% instead of raising ZeroDivisionError.
churn_pct = churn_count / total_count * 100 if total_count else 0.0
non_churn_pct = non_churn_count / total_count * 100 if total_count else 0.0
print(f"  Churn cases: {churn_count:,} ({churn_pct:.2f}%)")
print(f"  Non-churn cases: {non_churn_count:,} ({non_churn_pct:.2f}%)")

print(f"\nHotel Distribution:")
# Per hotel: row count, share of all cleaned rows, and mean of the churn flag.
hotel_dist = df_clean.groupBy("hotel").agg(
    count("*").alias("count"),
    (count("*") / total_count * 100).alias("percentage"),
    avg("churn").alias("churn_rate")
).orderBy(desc("count"))
hotel_dist.show(truncate=False)

print(f"\nTime Period:")
# min / max here are the Spark aggregate functions brought in by the star
# import of pyspark.sql.functions, not the Python builtins.
year_and_month_aggs = [
    min("arrival_date_year").alias("min_year"),
    max("arrival_date_year").alias("max_year"),
    countDistinct("arrival_date_month").alias("unique_months"),
]
# An aggregation without groupBy yields exactly one row.
date_stats = df_clean.agg(*year_and_month_aggs).first()
print(f"  Year range: {date_stats['min_year']} to {date_stats['max_year']}")
print(f"  Unique months: {date_stats['unique_months']}")

print(f"\nFinancial Metrics:")
# Result field name -> column whose mean it holds.
averaged_column_for = {
    "avg_daily_rate": "adr",
    "avg_total_spend": "total_spend",
    "avg_lead_time_days": "lead_time",
    "avg_stay_length": "total_nights",
}
financial_aggs = [
    avg(source_column).alias(field_name)
    for field_name, source_column in averaged_column_for.items()
]
financial_stats = df_clean.agg(*financial_aggs).first()
print(f"  Average Daily Rate: ${financial_stats['avg_daily_rate']:.2f}")
print(f"  Average Total Spend: ${financial_stats['avg_total_spend']:.2f}")
print(f"  Average Lead Time: {financial_stats['avg_lead_time_days']:.1f} days")
print(f"  Average Stay Length: {financial_stats['avg_stay_length']:.1f} nights")
print(f"\nNULL Values Check:")
# Check if any NULL values remain.
# One aggregation counts NULLs for every column in a single pass instead of
# running one Spark job per column.
null_totals = df_clean.agg(
    *[count(when(col(c).isNull(), 1)).alias(c) for c in df_clean.columns]
).first().asDict()

# (column name, NULL count) for each column that still contains NULLs.
remaining_nulls = [
    (c, null_totals[c]) for c in df_clean.columns if null_totals[c] > 0
]

if remaining_nulls:
    print("  Warning: Some columns still have NULL values:")
    # The loop variable must not be called `count`: at notebook top level that
    # would rebind the Spark `count` function from the star import and break
    # every later (re-)run of a cell that calls count("*").
    for col_name, n_nulls in remaining_nulls:
        print(f"    {col_name}: {n_nulls} NULLs")
else:
    print(" No NULL values found in cleaned data!")

# Display a sample of the cleaned data
print("\n" + "="*60)
print("SAMPLE OF CLEANED DATA (10 records)")
print("="*60)

# A readable subset of columns: identifiers, target, and key derived fields.
sample_columns = [
    "hotel", "churn", "lead_time", "arrival_date_year", 
    "arrival_date_month", "total_nights", "total_guests",
    "adr", "country", "deposit_type", "customer_type"
]

df_clean.select(sample_columns).show(10, truncate=False)

print("\n" + "="*60)
print("DATA CLEANING COMPLETE!")
print("="*60)
print(f"Data saved to: {silver_table_name}")
# total_count and churn_count were computed earlier in this cell; reusing
# total_count avoids re-executing the whole cleaning pipeline for one more
# count and keeps this total consistent with the churn rate printed below.
print(f"Total records: {total_count:,}")
print(f"Churn rate: {churn_count/total_count*100:.2f}%")
print("="*60)