<a href="https://colab.research.google.com/github/VinayD2028/BigData-Inclass-Assignment/blob/main/BigDataInclassAssignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline  # Import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, ChiSqSelector
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd  # Import pandas for .toPandas()

# Every task writes its text report into this directory.
output_dir = "outputs"
os.makedirs(output_dir, exist_ok=True)

# Start (or reuse) the single Spark session shared by all tasks below.
spark = SparkSession.builder.appName("CustomerChurnMLlib").getOrCreate()

# Read the churn dataset (header row, inferred column types) and discard the
# customer identifier, which is not among the feature columns used below.
data_path = "churn_streaming_data.csv"
df = (
    spark.read
    .csv(data_path, header=True, inferSchema=True)
    .drop("customer_id")
)

# Task 1: Data Preprocessing and Feature Engineering
def preprocess_data(df):
    """Encode categorical columns and assemble the model feature vector.

    Fits a Pipeline of StringIndexer -> OneHotEncoder -> VectorAssembler on
    ``df`` and writes a short summary (column lists plus five sample rows) to
    ``<output_dir>/task1_preprocessing_summary.txt``.

    Args:
        df: Raw churn DataFrame containing the categorical and numeric columns
            listed below plus the ``churn_flag`` label column.

    Returns:
        A tuple ``(prepared_df, features_metadata)``.
        ``prepared_df`` has exactly the columns ``features`` and ``label``
        (renamed from ``churn_flag``).
        ``features_metadata`` is the metadata dict of the ``features`` column,
        which Task 3 uses to map vector indices back to feature names.
    """
    categorical_cols = ['region', 'plan_type']
    numeric_cols = [
        'age', 'monthly_fee', 'tenure_months', 'logins_per_week',
        'avg_session_minutes', 'content_watched_per_week',
        'num_support_tickets', 'satisfaction_score', 'used_discount'
    ]

    # handleInvalid="keep" puts invalid (unseen/null) categories into an extra
    # bucket instead of raising.
    indexers = [
        StringIndexer(inputCol=name, outputCol=name + "_Index", handleInvalid="keep")
        for name in categorical_cols
    ]
    # dropLast=False keeps one vector slot per category, which makes the
    # index -> name mapping in Task 3 easier to interpret.
    encoders = [
        OneHotEncoder(inputCol=name + "_Index", outputCol=name + "_Vec", dropLast=False)
        for name in categorical_cols
    ]

    feature_cols = [name + "_Vec" for name in categorical_cols] + numeric_cols
    # handleInvalid="skip" filters out rows with a null in any feature column
    # instead of failing the whole transform.
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")

    # NOTE(review): the pipeline is fitted on the full dataset, before the
    # train/test splits made in later tasks, so the category vocabulary is
    # also learned from rows that later end up in the test sets.
    pipeline_model = Pipeline(stages=indexers + encoders + [assembler]).fit(df)
    transformed = pipeline_model.transform(df)

    # Per-slot attribute names attached to the assembled vector; used by Task 3.
    features_metadata = transformed.schema["features"].metadata

    # churn_flag is already numeric (0/1), so it needs no indexing. A row with
    # a null label survives the assembler's "skip" and would break training and
    # evaluation downstream, so such rows are dropped too, mirroring the
    # null-feature policy above.
    labelled = transformed.select("features", "churn_flag").dropna(subset=["churn_flag"])

    with open(f"{output_dir}/task1_preprocessing_summary.txt", "w", encoding="utf-8") as f:
        f.write("Task 1: Data Preprocessing and Feature Engineering\n")
        f.write(f"Categorical columns: {categorical_cols}\n")
        f.write(f"Numerical columns: {numeric_cols}\n")
        f.write("Sample Output:\n")
        sample = labelled.limit(5).toPandas()
        f.write(sample.to_string(index=False))

    return labelled.withColumnRenamed("churn_flag", "label"), features_metadata

# Task 2: Train and Evaluate Logistic Regression Model
def train_logistic_regression_model(df):
    """Fit a logistic regression on an 80/20 split and report the test-set AUC.

    Args:
        df: DataFrame with a ``features`` vector column and a numeric
            ``label`` column (the output of ``preprocess_data``).

    Side effects:
        Writes ``<output_dir>/task2_logistic_regression_results.txt``.
    """
    auc_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
    training, holdout = df.randomSplit([0.8, 0.2], seed=42)

    fitted = LogisticRegression(featuresCol="features", labelCol="label").fit(training)
    test_auc = auc_evaluator.evaluate(fitted.transform(holdout))

    report = (
        "Task 2: Logistic Regression Evaluation\n"
        f"Logistic Regression Model Accuracy (AUC): {test_auc:.2f}\n"
    )
    with open(f"{output_dir}/task2_logistic_regression_results.txt", "w") as out:
        out.write(report)

def _feature_names_by_index(metadata):
    """Return a ``{vector index: attribute name}`` dict built from column metadata.

    ``metadata`` is expected in VectorAssembler's format:
    ``{"ml_attr": {"attrs": {<group>: [{"idx": int, "name": str, ...}, ...]}}}``.
    Every attribute group ("numeric", "binary", "nominal", ...) is read and
    keyed on each attribute's own ``idx``, so no ordering between groups is
    assumed. The input is not modified. Missing ``ml_attr``/``attrs`` keys or
    unnamed attributes simply produce no entries.
    """
    groups = metadata.get("ml_attr", {}).get("attrs", {})
    return {
        attr["idx"]: attr["name"]
        for attrs_in_group in groups.values()
        for attr in attrs_in_group
        if "idx" in attr and "name" in attr
    }


# Task 3: Feature Selection using Chi-Square
def feature_selection(df, metadata):
    """Select the top 5 features with a chi-square test and report their names.

    Args:
        df: DataFrame with ``features`` and ``label`` columns.
        metadata: Metadata dict of the ``features`` column, as returned by
            ``preprocess_data``; used only to translate the selected vector
            indices into readable names.

    Side effects:
        Writes ``<output_dir>/task3_feature_selection.txt``.
    """
    selector = ChiSqSelector(numTopFeatures=5, featuresCol="features", outputCol="selectedFeatures", labelCol="label")
    model = selector.fit(df)
    selected = model.transform(df)

    selected_indices = model.selectedFeatures
    name_by_index = _feature_names_by_index(metadata)
    # Fall back to a positional label if the metadata lacks an entry for an index.
    selected_feature_names = [name_by_index.get(i, f"feature_{i}") for i in selected_indices]

    with open(f"{output_dir}/task3_feature_selection.txt", "w", encoding="utf-8") as f:
        f.write("Task 3: Feature Selection using Chi-Square\n")
        f.write("Top 5 features selected (from indices):\n")
        for i, name in zip(selected_indices, selected_feature_names):
            f.write(f"- Index {i}: {name}\n")
        f.write("\nSample Output:\n")
        sample = selected.select("selectedFeatures", "label").limit(5).toPandas()
        f.write(sample.to_string(index=False))

# Task 4: Hyperparameter Tuning and Model Comparison
def tune_and_compare_models(df):
    """Tune four classifiers with 3-fold cross-validation and compare test AUC.

    The data is split 80/20 with seed 42 (the same split as Task 2). For each
    model, a CrossValidator searches a small parameter grid on the training
    part. The refitted best model is then scored on the held-out part.

    Args:
        df: DataFrame with a ``features`` vector column and a numeric
            ``label`` column.

    Side effects:
        Writes ``<output_dir>/task4_model_comparison.txt`` with each model's
        test AUC and its winning grid values.
    """
    train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
    evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
    output_lines = ["Task 4: Hyperparameter Tuning and Model Comparison\n"]

    # Explicit seeds (matching the seed=42 split) make tree training and CV
    # fold assignment repeatable from run to run.
    lr = LogisticRegression(labelCol="label")
    dt = DecisionTreeClassifier(labelCol="label", seed=42)
    rf = RandomForestClassifier(labelCol="label", seed=42)
    gbt = GBTClassifier(labelCol="label", seed=42)

    models = {
        "LogisticRegression": (lr, ParamGridBuilder()
                                .addGrid(lr.regParam, [0.01, 0.1])
                                .build()),
        "DecisionTree": (dt, ParamGridBuilder()
                             .addGrid(dt.maxDepth, [5, 10])
                             .build()),
        "RandomForest": (rf, ParamGridBuilder()
                             .addGrid(rf.numTrees, [10, 50])
                             .build()),
        "GBT": (gbt, ParamGridBuilder()
                   .addGrid(gbt.maxIter, [10, 20])
                   .build())
    }

    for name, (estimator, grid) in models.items():
        output_lines.append(f"\nTuning {name}...")
        cv = CrossValidator(estimator=estimator,
                            estimatorParamMaps=grid,
                            evaluator=evaluator,
                            numFolds=3,  # 3 folds keeps runtime down; 5 is also reasonable
                            seed=42)

        cv_model = cv.fit(train_df)
        predictions = cv_model.bestModel.transform(test_df)
        auc = evaluator.evaluate(predictions)
        output_lines.append(f"{name} Best Model Accuracy (AUC): {auc:.2f}")

        # Report the grid entry CrossValidator selected (best average fold
        # metric) directly from the grid. This avoids looking estimator params
        # up on the fitted model, which older PySpark releases may not populate.
        metrics = cv_model.avgMetrics
        choose = max if evaluator.isLargerBetter() else min
        best_index = choose(range(len(metrics)), key=metrics.__getitem__)
        tuned_params = {param.name: value for param, value in grid[best_index].items()}
        output_lines.append(f"Best Params for {name}: {tuned_params}")

    with open(f"{output_dir}/task4_model_comparison.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))

# --- Run all tasks ---
# try/finally guarantees the Spark session is stopped even if a task raises.
try:
    # preprocess_data returns the prepared DataFrame plus the feature metadata
    # that Task 3 needs to name the selected features.
    preprocessed_df, metadata = preprocess_data(df)
    # The same prepared data feeds three more tasks. Cache it so the CSV read
    # and the preprocessing pipeline are not recomputed for each one.
    preprocessed_df = preprocessed_df.cache()

    train_logistic_regression_model(preprocessed_df)
    feature_selection(preprocessed_df, metadata)
    tune_and_compare_models(preprocessed_df)

    print(f"MLlib tasks complete. Output files saved to '{output_dir}' directory.")
finally:
    spark.stop()

MLlib tasks complete. Output files saved to 'outputs' directory.
