# Phase 3: Spark Data Processing and Feature Engineering

This notebook demonstrates:
1. Setting up PySpark in Colab
2. Loading data into a Spark DataFrame
3. Feature engineering using Spark transformations
4. Data preprocessing pipeline
5. Preparing data for machine learning

## Why Spark?
- Distributed processing for large datasets
- Scalable feature engineering
- MLlib for machine learning
- Handles data beyond memory limits


In [None]:
# Install PySpark (and findspark) quietly into the kernel's environment.
# NOTE(review): versions are unpinned, so a re-run may resolve to a different
# PySpark release; consider pinning. `findspark` is installed here but is not
# imported by any cell shown in this notebook.
%pip install pyspark findspark -q


In [None]:
# Import libraries
import warnings

import numpy as np
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, Imputer, StandardScaler
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    coalesce,
    col,
    count,
    expr,
    isnan,
    isnull,
    lit,
    trim,
    upper,
    when,
)
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.types import IntegerType, DoubleType, StringType
# Silence every Python warning for the rest of the session.
# NOTE(review): this also hides deprecation and data-quality warnings raised by
# the libraries imported above; consider narrowing the filter to specific
# warning categories.
warnings.filterwarnings('ignore')

print("✓ Libraries imported")


## Step 1: Initialize Spark Session


In [None]:
# Create (or reuse) the Spark session for this notebook.
# getOrCreate() returns an already-running session if one exists in this
# kernel, in which case the settings below may not be re-applied.
spark = (
    SparkSession.builder
    .appName("HotelBookingPreprocessing")
    # Adaptive query execution: let Spark re-plan and coalesce shuffle partitions at runtime
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")

# `uiWebUrl` exists on the SparkContext but can be None (e.g. when the UI is
# disabled), so fall back on the *value*, not merely on the attribute's presence.
spark_ui_url = getattr(spark.sparkContext, "uiWebUrl", None) or "N/A"

print("✓ Spark session created")
print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark_ui_url}")


## Step 2: Load Data into Spark


In [None]:
# Data source, option 1: a CSV file on the local filesystem
csv_path = "/content/hotel_bookings.csv"  # Colab path
# csv_path = "../data/hotel_bookings.csv"  # Local path

# Data source, option 2: MongoDB (needs the MongoDB Spark Connector).
# Kept conceptual here; this notebook reads the CSV instead.

# Read the file with a header row and let Spark infer each column's type
csv_reader = spark.read.options(header=True, inferSchema=True)
df_spark = csv_reader.csv(csv_path)

print("✓ Data loaded into Spark")

partition_count = df_spark.rdd.getNumPartitions()
print(f"Number of partitions: {partition_count}")

record_count = df_spark.count()
print(f"Total records: {record_count:,}")

print("\nSchema:")
df_spark.printSchema()


In [None]:
# Peek at the first five records without truncating long cell values
print("=== Sample Data ===")
df_spark.show(n=5, truncate=False)


## Step 3: Data Quality Check


In [None]:
# Check for missing values
print("=== Missing Values Analysis ===")

# Textual placeholders that stand for "missing" in string-typed columns.
# They are compared case-insensitively after trimming whitespace.
MISSING_TOKENS = ["NA", "NULL", "NONE", ""]

# Build one "is this cell missing?" predicate per column, based on its type:
#   - every column: SQL NULL
#   - float/double: NaN as well (NaN is a regular value in Spark, NOT a null)
#   - string:       placeholder tokens such as "NA" / "NULL"
missing_exprs = []
for name, dtype in df_spark.dtypes:
    is_missing = isnull(col(name))
    if dtype in ("double", "float"):
        is_missing = is_missing | isnan(col(name))
    elif dtype == "string":
        is_missing = is_missing | upper(trim(col(name))).isin(MISSING_TOKENS)
    # when() without otherwise() yields NULL for non-matching rows, and count()
    # ignores NULLs, so this counts exactly the rows flagged as missing.
    missing_exprs.append(count(when(is_missing, 1)).alias(name))

# Single pass over the data: one row holding the missing count of every column
missing_counts = df_spark.select(missing_exprs)

# Convert to pandas for better display (one row per column after transposing)
missing_pd = missing_counts.toPandas().T
missing_pd.columns = ['Missing Count']
missing_pd = missing_pd[missing_pd['Missing Count'] > 0].sort_values('Missing Count', ascending=False)

if len(missing_pd) > 0:
    print("Columns with missing values:")
    display(missing_pd)
else:
    print("✓ No missing values found")
    

In [None]:
# Summary statistics for the loaded DataFrame
print("=== Basic Statistics ===")
summary_stats = df_spark.describe()
summary_stats.show()


## Step 4: Feature Engineering

Create new features using Spark transformations.


In [None]:
# Textual placeholders that stand for "missing" (compared case-insensitively
# after trimming whitespace).
MISSING_TOKENS = ["NA", "NULL", "NONE", ""]


def _count_or_zero(name):
    """Return column `name` as a double, treating nulls and missing markers as 0.

    The guest-count columns are only cleaned in Step 5, so at this point they may
    still hold nulls or textual markers such as "NA". Adding such a value directly
    would make the whole sum null; masking the markers and falling back to 0 keeps
    the remaining guests counted.
    """
    raw = col(name)
    is_marker = upper(trim(raw.cast("string"))).isin(MISSING_TOKENS)
    # when() without otherwise() yields NULL for markers (and for NULL input),
    # which coalesce() then replaces with 0.
    return coalesce(when(~is_marker, raw).cast("double"), lit(0.0))


# Feature 1: Total nights
df_spark = df_spark.withColumn(
    "total_nights",
    col("stays_in_weekend_nights") + col("stays_in_week_nights")
)

# Feature 2: Total guests (missing children/babies count as 0; result is a double)
df_spark = df_spark.withColumn(
    "total_guests",
    col("adults") + _count_or_zero("children") + _count_or_zero("babies")
)

# Feature 3: Total booking value (ADR × total nights)
df_spark = df_spark.withColumn(
    "total_booking_value",
    col("adr") * col("total_nights")
)

# Feature 4: Lead time categories
# The first matching branch wins, so each bucket is (previous bound, this bound].
# NOTE(review): a null lead_time matches no branch and falls into "Very Long".
df_spark = df_spark.withColumn(
    "lead_time_category",
    when(col("lead_time") <= 30, "Very Short")
    .when(col("lead_time") <= 90, "Short")
    .when(col("lead_time") <= 180, "Medium")
    .when(col("lead_time") <= 365, "Long")
    .otherwise("Very Long")
)

# Feature 5: Previous cancellation ratio
# The +1 in the denominator avoids division by zero for guests with no history.
df_spark = df_spark.withColumn(
    "previous_cancellation_ratio",
    col("previous_cancellations") /
    (col("previous_cancellations") + col("previous_bookings_not_canceled") + 1)
)

print("✓ Feature engineering completed")
print("\nNew features created:")
print("  - total_nights")
print("  - total_guests")
print("  - total_booking_value")
print("  - lead_time_category")
print("  - previous_cancellation_ratio")


In [None]:
# Display the engineered columns for the first ten rows as a sanity check
print("=== Sample of New Features ===")
engineered_columns = [
    "total_nights",
    "total_guests",
    "total_booking_value",
    "lead_time_category",
    "previous_cancellation_ratio",
]
df_spark.select(*engineered_columns).show(10)


## Step 5: Handle Missing Values


In [None]:
# Fill missing values
# Handles SQL NULLs as well as textual placeholders such as "NA" / "NULL".

# Placeholders are compared case-insensitively after trimming whitespace.
MISSING_TOKENS = ["NA", "NULL", "NONE", ""]


def _is_missing(name):
    """Boolean Column: True where `name` is NULL or holds a missing-value token."""
    as_text = upper(trim(col(name).cast("string")))
    # isNull() is checked first so the predicate is never NULL itself.
    return col(name).isNull() | as_text.isin(MISSING_TOKENS)


# Children, agent, company: missing -> 0, everything else cast to double
for name in ("children", "agent", "company"):
    df_spark = df_spark.withColumn(
        name,
        when(_is_missing(name), lit(0.0)).otherwise(col(name).cast("double"))
    )

# Country (string column - handle differently): missing -> "Unknown"
df_spark = df_spark.withColumn(
    "country",
    when(_is_missing("country"), "Unknown").otherwise(col("country"))
)

# CRITICAL: Clean is_canceled column early to avoid issues later.
# This is the target variable and must be an integer with no placeholder strings.
# Rows whose target is missing are dropped rather than relabelled as 0: inventing
# a "not canceled" label would silently corrupt the training data.
df_spark = df_spark.withColumn(
    "is_canceled",
    when(~_is_missing("is_canceled"), col("is_canceled").cast("integer"))
).filter(col("is_canceled").isNotNull())

print("✓ Missing values handled (including 'NA' strings)")
print("✓ Target variable (is_canceled) cleaned and cast to integer")
print("✓ Rows with a missing target were dropped (not relabelled as 0)")


## Step 6: Prepare Features for ML

Select and prepare features for machine learning models.


In [None]:
# Candidate numeric inputs for the model (raw columns plus engineered ones)
numerical_candidates = [
    "lead_time",
    "arrival_date_year",
    "arrival_date_week_number",
    "arrival_date_day_of_month",
    "stays_in_weekend_nights",
    "stays_in_week_nights",
    "adults",
    "children",
    "babies",
    "previous_cancellations",
    "previous_bookings_not_canceled",
    "adr",
    "required_car_parking_spaces",
    "total_of_special_requests",
    "total_nights",
    "total_guests",
    "total_booking_value",
    "previous_cancellation_ratio",
]

# Candidate categorical inputs; these are string-indexed in a later cell
categorical_candidates = [
    "hotel",
    "meal",
    "country",
    "market_segment",
    "distribution_channel",
    "reserved_room_type",
    "assigned_room_type",
    "deposit_type",
    "customer_type",
    "lead_time_category",
]

# Keep only the candidates that are actually present in the DataFrame
available_columns = set(df_spark.columns)
numerical_features = [name for name in numerical_candidates if name in available_columns]
categorical_features = [name for name in categorical_candidates if name in available_columns]

print(f"Numerical features: {len(numerical_features)}")
print(f"Categorical features: {len(categorical_features)}")


In [None]:
# One StringIndexer per categorical column, writing to "<column>_indexed".
# handleInvalid="keep" is passed so unseen/invalid labels do not raise at transform time.
indexers = [
    StringIndexer(
        inputCol=name,
        outputCol=f"{name}_indexed",
        handleInvalid="keep",
    )
    for name in categorical_features
]

print(f"✓ Created {len(indexers)} StringIndexers")


In [None]:
# Fit and apply every StringIndexer in order, as stages of a single Pipeline
indexing_pipeline = Pipeline(stages=indexers)
df_indexed = indexing_pipeline.fit(df_spark).transform(df_spark)

print("✓ Categorical features indexed")


In [None]:
# Prepare feature columns for VectorAssembler
feature_columns = numerical_features + [f"{name}_indexed" for name in categorical_features]

# Spellings of "missing" that may survive in columns Spark inferred as strings
NA_STRINGS = ["NA", "na", "Na", "nA", "null", "NULL", "Null", "", "None"]

# Only touch numerical columns that actually exist in the indexed frame
numeric_cols = [name for name in numerical_features if name in df_indexed.columns]

# Final cleaning: turn placeholder strings into NULL and force every numerical
# column to double, so the aggregation below sees clean numeric data.
df_cleaned = df_indexed
for name in numeric_cols:
    is_na_string = col(name).cast("string").isin(NA_STRINGS)
    df_cleaned = df_cleaned.withColumn(
        name,
        when(is_na_string, None).otherwise(col(name)).cast("double"),
    )

print("✓ Cleaned 'NA' strings and ensured numerical columns are double type")

# Mean imputation.
# All column means are computed in ONE aggregation (a single Spark job) instead
# of one job per column. avg() ignores NULLs; a column that is entirely NULL has
# a NULL mean, for which 0.0 is used.
# Errors are deliberately not swallowed here: a failure at this point means the
# data is unreadable, and replacing it with zeros would only hide the problem.
# NOTE(review): the means are computed on the full dataset, i.e. before the
# train/test split, so test rows influence the imputed values.
print("Calculating means and filling missing values...")
if numeric_cols:
    mean_row = df_cleaned.agg(
        *[avg(col(name)).alias(name) for name in numeric_cols]
    ).first()
    column_means = {
        name: (mean_row[name] if mean_row[name] is not None else 0.0)
        for name in numeric_cols
    }
else:
    column_means = {}

# Add one "<column>_imputed" column per numerical feature in a single projection
df_imputed = df_cleaned.select(
    "*",
    *[
        coalesce(col(name), lit(float(column_means[name]))).alias(f"{name}_imputed")
        for name in numeric_cols
    ],
)

print("✓ Missing values filled with column means")

# Update feature columns to use imputed versions
feature_columns_imputed = [f"{name}_imputed" for name in numerical_features] + \
                         [f"{name}_indexed" for name in categorical_features]

print(f"✓ Features prepared: {len(feature_columns_imputed)} total features")


## Step 7: Create Feature Vector


In [None]:
# Combine the imputed numerical and indexed categorical columns into a single
# vector column named "features"; handleInvalid="skip" is passed to the assembler.
assembler_params = {
    "inputCols": feature_columns_imputed,
    "outputCol": "features",
    "handleInvalid": "skip",
}
assembler = VectorAssembler(**assembler_params)

df_features = assembler.transform(df_imputed)

print("✓ Features assembled into vector")
print(f"Feature vector size: {len(feature_columns_imputed)}")


In [None]:
# Select final columns for ML: the feature vector and an integer label.
#
# try_cast returns NULL (instead of raising) when a value cannot be converted to
# an int. Rows whose label cannot be determined are dropped rather than
# defaulted to 0, so no "not canceled" labels are invented. Filtering must
# happen on the un-filled label; filtering after filling would be a no-op.
df_ml = (
    df_features
    .withColumn("label", expr("try_cast(is_canceled as int)"))
    .filter(col("label").isNotNull())
    .select("features", "label")
)

# Cache for faster access: the frame is reused by count/show here and by the
# train/test split afterwards.
df_ml = df_ml.cache()

print("✓ Data prepared for ML")
print(f"Total records: {df_ml.count():,}")
df_ml.show(5)


## Step 8: Train/Test Split


In [None]:
# Split data into training and test sets (80/20); the seed makes the split repeatable
train_df, test_df = df_ml.randomSplit([0.8, 0.2], seed=42)

# Each count() is a Spark action, so evaluate each one exactly once and reuse it.
train_count = train_df.count()
test_count = test_df.count()
total_count = train_count + test_count  # the two splits partition df_ml

# Guard against an empty dataset so the summary does not fail with a division by zero
train_pct = train_count / total_count * 100 if total_count else 0.0
test_pct = test_count / total_count * 100 if total_count else 0.0

print("=== Train/Test Split ===")
print(f"Training set: {train_count:,} records ({train_pct:.1f}%)")
print(f"Test set: {test_count:,} records ({test_pct:.1f}%)")

# Check label distribution in both sets
print("\n=== Label Distribution ===")
print("Training set:")
train_df.groupBy("label").count().show()
print("\nTest set:")
test_df.groupBy("label").count().show()


## Step 9: Save Processed Data (Optional)

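By default nothing is written to disk: `train_df` and `test_df` exist only in this notebook's Spark session. To reuse them in the next notebook, persist them first (for example as Parquet files).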


In [None]:
# `train_df` and `test_df` live only in this notebook's Spark session. A
# separate notebook normally runs in its own kernel, so it cannot see them
# unless they are written to storage first.
# Set SAVE_PROCESSED = True to persist both splits as Parquet (off by default,
# so running this cell writes nothing unless explicitly requested).
SAVE_PROCESSED = False
PROCESSED_DIR = "/content/processed"  # Colab path

if SAVE_PROCESSED:
    # mode("overwrite") keeps the cell re-runnable
    train_df.write.mode("overwrite").parquet(f"{PROCESSED_DIR}/train")
    test_df.write.mode("overwrite").parquet(f"{PROCESSED_DIR}/test")
    print(f"✓ Train/test sets written to {PROCESSED_DIR}")

print("✓ Data preprocessing completed")
print("\nReady for machine learning models!")
print("\nNext steps:")
print("  - Use train_df and test_df for model training")
print("  - Features are in 'features' column")
print("  - Labels are in 'label' column")


## Summary

- ✓ Spark session initialized
- ✓ Data loaded into a Spark DataFrame
- ✓ Feature engineering completed
- ✓ Missing values handled
- ✓ Categorical features encoded
- ✓ Features assembled into vectors
- ✓ Train/test split performed

**Next Steps**: Proceed to `04_ml_models.ipynb` to train machine learning models.
