In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    RandomForestClassifier,
    GBTClassifier
)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Step 1: Start Spark session
spark = SparkSession.builder.appName("HotelCancellationPrediction").getOrCreate()

# Step 2: Read CSV from HDFS
df = spark.read.csv("hdfs://namenode:9000/data.csv", header=True, inferSchema=True)

# Step 3: Select useful columns and drop missing
df = df.select("hotel", "lead_time", "adr", "total_of_special_requests", "is_canceled").na.drop()

# Step 4: StringIndexer and VectorAssembler
indexer = StringIndexer(inputCol="hotel", outputCol="hotel_index")
assembler = VectorAssembler(
    inputCols=["hotel_index", "lead_time", "adr", "total_of_special_requests"],
    outputCol="features"
)

# Step 5: Train/Test Split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Step 6: Define models with names
models = {
    "LogisticRegression": LogisticRegression(labelCol="is_canceled", featuresCol="features"),
    "DecisionTree": DecisionTreeClassifier(labelCol="is_canceled", featuresCol="features"),
    "RandomForest": RandomForestClassifier(labelCol="is_canceled", featuresCol="features", numTrees=50),
    "GBTClassifier": GBTClassifier(labelCol="is_canceled", featuresCol="features", maxIter=20)
}

# Step 7: Evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="is_canceled", metricName="accuracy")

# Step 8: Train, Evaluate, and Save Models
for name, clf in models.items():
    print(f"\n🔍 Training {name}...")

    pipeline = Pipeline(stages=[indexer, assembler, clf])
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)
    
    # Evaluate accuracy
    accuracy = evaluator.evaluate(predictions)
    print(f"✅ {name} Accuracy: {accuracy:.4f}")

    # Confusion matrix
    print("📊 Confusion Matrix:")
    predictions.groupBy("is_canceled", "prediction").count().orderBy("is_canceled", "prediction").show()

    # Save model to HDFS
    model_path = f"hdfs://namenode:9000/models/{name}_model"
    model.write().overwrite().save(model_path)
    print(f"📁 Model saved to: {model_path}")



🔍 Training LogisticRegression...
✅ LogisticRegression Accuracy: 0.6960
📊 Confusion Matrix:
+-----------+----------+-----+
|is_canceled|prediction|count|
+-----------+----------+-----+
|          0|       0.0|12895|
|          0|       1.0| 2063|
|          1|       0.0| 5146|
|          1|       1.0| 3613|
+-----------+----------+-----+

📁 Model saved to: hdfs://namenode:9000/models/LogisticRegression_model

🔍 Training DecisionTree...
✅ DecisionTree Accuracy: 0.7230
📊 Confusion Matrix:
+-----------+----------+-----+
|is_canceled|prediction|count|
+-----------+----------+-----+
|          0|       0.0|13162|
|          0|       1.0| 1796|
|          1|       0.0| 4774|
|          1|       1.0| 3985|
+-----------+----------+-----+

📁 Model saved to: hdfs://namenode:9000/models/DecisionTree_model

🔍 Training RandomForest...
✅ RandomForest Accuracy: 0.7209
📊 Confusion Matrix:
+-----------+----------+-----+
|is_canceled|prediction|count|
+-----------+----------+-----+
|          0|       0