In [484]:
!pip install xgboost==1.7.6
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import col, abs
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel, ClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import when
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import lit
from pathlib import Path
from pyspark import StorageLevel
from xgboost.spark import SparkXGBClassifier
import uuid
import warnings
import pandas as pd
import time
from datetime import timedelta
warnings.filterwarnings('ignore')



In [485]:
spark = SparkSession.builder.appName("ChurnXGBoost").master("local[*]").getOrCreate()
cores = spark.sparkContext.defaultParallelism 
target_shuffle_partitions = cores * 2 # set custom partitions for shuffling based on available cores
spark.conf.set("spark.sql.shuffle.partitions", target_shuffle_partitions) # will use above coun during shuffle operations
print("spark.sql.shuffle.partitions ->", spark.conf.get("spark.sql.shuffle.partitions")) # displays partitions

spark.sql.shuffle.partitions -> 24


In [486]:
def concatenate_clean_churn_files() -> DataFrame | None:
    """
    Combines all 'churn_clean*.csv' files in current directory into one DataFrame for processing.
    Returns None if no matching files found. First file determines schema.
    """
    csvFiles = [str(file) for file in Path(".").glob("churn_clean*.csv")]
    print(f"All available cleaned churn csv files: {csvFiles}")

    if not csvFiles:
        print("No churn_cleaned CSV files found...")
        return None
    count = 0
    for fileName in csvFiles:
        if count == 0:
            churnCombinedDf = spark.read.csv(fileName, header=True, inferSchema=True)
            count += 1
        else:
            churnCombinedDf = churnCombinedDf.unionByName(churnCombinedDf)
    return churnCombinedDf

In [487]:
def get_churn_metadata(churnDataFrame: DataFrame) -> None:
    """
    Prints basic DataFrame metadata including column dtypes and non-null counts for user visibility.
    Temporarily converts Spark DataFrame to pandas for clean meta data outut.
    """
    tempPandasDf = churnDataFrame.toPandas() 
    tempPandasDf.info() 

In [488]:
def split_dataframe_by_label(churnDataFrame: DataFrame) -> tuple[DataFrame, DataFrame, bool]:
    """
    Splits DataFrame into labeled and unlabeled sets based on if churn_risk_score contains empty rows.
    If no unlabeled data exists, reserves 1% of labeled data for demonstrative prediction purposes.
    Args:
        churnDataFrame: Input DataFrame containing cleaned churn data
    Returns:
        churnlabeled: Rows with churn_risk_score
        churnunlabeled: Rows without churn_risk_score or reserved data
        reservedFlag: True if data was reserved
    """
    churnlabeled = churnDataFrame.filter(col('churn_risk_score').isNotNull())
    churnunlabeled = churnDataFrame.filter(col('churn_risk_score').isNull())
    reserve = 0.01
    reservedFlag = False
    if churnunlabeled.count() == 0:
        reservedFlag = True
        print('no empty churn_risk_score rows...')
        print(f"Reserving {int(1)}% of labeled data for demo predictions...")
        trainDf, reservedDf = churnlabeled.randomSplit([1 - reserve, reserve], seed=42)
        churnlabeled = trainDf
        churnunlabeled = reservedDf
        print(f"Reserve dataframe with {churnunlabeled.count()} rows")
        churnunlabeled = churnunlabeled.withColumnRenamed("churn_risk_score", "actual_churn_risk_score")
        churnunlabeled = churnunlabeled.withColumn("predicted_churn_risk_score", lit(None))
    return churnlabeled, churnunlabeled, reservedFlag

In [489]:
def separate_feature_target_cols(churnDataFrame: DataFrame) -> tuple[list,str]:
    """
    Separates the input DataFrame columns into features and target variable.
    Uses all columns except the last as features, and the last column as target.
    Args:
        churnDataFrame: Input DataFrame containing cleaned churn data
    Returns:
        featureCols: List of all feature column names
        targetCol: Name of the target column to predict
    """
    featureCols = churnDataFrame.columns[:-1]
    targetCol = churnDataFrame.columns[-1]
    print(f"All Feature Columns: {featureCols}")
    print(f"Target Column: {targetCol}")
    return featureCols, targetCol

In [490]:
def correlation_analysis(churnDataFrame: DataFrame, featureCols: list,
                         targetCol: str, moderateVal: float, strongVal: float) -> list:
    """
    Performs correlation analysis between features and target variable.
    Identifies features with strong, moderate and weak correlation based on threshold values.
    Args:
        churnDataFrame: Input DataFrame containing features and target
        featureCols: List of feature column names to analyze
        targetCol: Name of the target column
        moderateVal: Threshold for moderate correlation
        strongVal: Threshold for strong correlation
    Returns:
        list: Features showing strong/moderate correlation with target
    """
    print("Begin correlation analysis of all features...\n")

    corrVals = []
    for feature in featureCols:
        corr = churnDataFrame.stat.corr(feature, targetCol)
        corrVals.append((feature, corr))

    corrDf = spark.createDataFrame(corrVals, ["feature", "pearson_correlation"]) \
                  .orderBy(abs(col("pearson_correlation")).desc())
    corrDf = corrDf.withColumn("feature_strength",
         when(abs(col("pearson_correlation")) > strongVal, "Strong")
        .when(abs(col("pearson_correlation")) > moderateVal, "Moderate")
        .otherwise("Weak"))
    
    print("All Correlation Scores:")
    corrDf.show(n=corrDf.count(), truncate=False)
    print()
    print("Features displaying the strongest predictive signal:")
    filteredCorrDf = corrDf.filter(col("feature_strength").isin("Strong", "Moderate"))
    filteredCorrDf.orderBy(abs(col("pearson_correlation")).desc()).show(truncate=False)
    strongCorrFeatures = filteredCorrDf.select("feature").rdd.flatMap(lambda x: x).collect()
    return strongCorrFeatures

In [491]:
def feature_assemble(churnDataframe, featureCols, 
                     trainSplit:float, testSplit:float) -> tuple[DataFrame, DataFrame, bool]:
    """
    Combines features into vector format and splits data into training and test sets.
    Creates feature vectors for XGBoost and performs train-test split as specified.
    Args:
        churnDataframe: Input DataFrame containing features
        featureCols: List of feature column names to assemble
        trainSplit: Proportion of data for training (%)
        testSplit: Proportion of data for testing (%)
    Returns:
        trainDf: Training dataset with feature vectors
        testDf: Test dataset with feature vectors
    """
    print("Assembling features...")
    print("Combining all features into single vector...")
    assembler_all = VectorAssembler(inputCols = featureCols, outputCol="features") 
    pipeline = Pipeline(stages=[assembler_all])
    vectorDf = pipeline.fit(churnDataframe).transform(churnDataframe)
    print("Splitting data...")
    print(f"{trainSplit*100}/{testSplit*100} Test Train Split...")
    trainDf, testDf = vectorDf.randomSplit([trainSplit, testSplit], seed=42)
    return trainDf, testDf

In [492]:
def baseline_model(trainDf, testDf, targetCol) -> tuple[DataFrame, str]:
    """
    Trains and evaluates a baseline XGBoost model using all available features.
    Returns the trained model and its F1 score on the test set.
    Args:
        trainDf: Training DataFrame containing feature vectors
        testDf: Test DataFrame for model evaluation
        targetCol: Name of the target/label column
    Returns:
        xgbModel: Trained XGBoost model
        baselineF1Score: F1 score achieved on test data
    """
    xgbClassifier = SparkXGBClassifier(features_col="features", label_col=targetCol, 
                                       prediction_col="prediction", num_workers=cores)
    xgbModel = xgbClassifier.fit(trainDf)
    predictions = xgbModel.transform(testDf)
    evaluator = MulticlassClassificationEvaluator(labelCol=targetCol, 
                                                  predictionCol="prediction",metricName="f1")
    baseF1Score = round(evaluator.evaluate(predictions),5)
    precision = evaluator.setMetricName("weightedPrecision").evaluate(predictions)
    recall = evaluator.setMetricName("weightedRecall").evaluate(predictions)
    
    print(f"Base Model F1 Score: {round(baseF1Score, 5)}")
    print(f"Base Model Precision: {round(precision, 5)}")
    print(f"Base Model Recall: {round(recall, 5)}\n")
    return xgbModel, baseF1Score

In [493]:
def feature_information_gain(model, featureCols) -> list:
    importances = model.get_booster().get_score(importance_type="gain")
    print("All Feature Importances by Information Gain:")

    featureMap = {f"f{i}": name for i, name in enumerate(featureCols)}
    sortedImportances = sorted(importances.items(), key=lambda x: x[1], reverse=True)

    featureDict = {}
    for feature, score in sortedImportances:
        feature = featureMap.get(feature, feature)
        featureDict[feature] = score
    scoreList = [[featureMap.get(feat, feat), score] for feat, score in sortedImportances]
    allFeatureGainDf = spark.createDataFrame(scoreList, ["feature", "gain"])
    allFeatureGainDf = allFeatureGainDf.withColumn("gain_strength",
         when(abs(col("gain")) > 30, "Strong")
        .when(abs(col("gain")) > 10, "Moderate")
        .otherwise("Weak"))
    allFeatureGainDf.show(n=allFeatureGainDf.count(), truncate=False)
    print("Features with stronger information gain scores...")
    strongGainDf = allFeatureGainDf.filter(col("gain_strength").isin("Strong", "Moderate"))
    strongGainDf.orderBy(abs(col("gain_strength")).desc()).show(truncate=False)
    strongGainFeatures = strongGainDf.select("feature").rdd.flatMap(lambda x: x).collect()
    return strongGainFeatures

In [494]:
def get_strong_features(corrFeatures: list, gainFeatures: list) -> list:
        strongFeatures = list(set(corrFeatures + gainFeatures))
        print(f"Strong feature list: {strongFeatures}")
        return strongFeatures

In [495]:
def kfold_cross_validation(trainDf, testDf, featureCols, 
                           TargetCol, modelType = "GBT", k=5)-> tuple[GBTClassificationModel, float]:
    print(f"Running {k}-fold CV on training data using model: {modelType}")

    assembler = VectorAssembler(inputCols= featureCols, outputCol="strongFeatures")
    train = assembler.transform(trainDf).select("strongFeatures", TargetCol)
    test = assembler.transform(testDf).select("strongFeatures", TargetCol)

    model = GBTClassifier(labelCol=TargetCol, featuresCol="strongFeatures")
    paramGrid = ParamGridBuilder() \
        .addGrid(model.maxDepth, [3, 5]) \
        .addGrid(model.maxIter, [10, 20]) \
        .addGrid(model.stepSize, [0.05, 0.1]) \
        .addGrid(model.subsamplingRate, [0.8, 1.0]) \
        .build()

    # Evaluator
    evaluator = MulticlassClassificationEvaluator(
        labelCol=TargetCol,
        predictionCol="prediction",
        metricName="f1")

    # CrossValidator
    cv = CrossValidator(
        estimator=model,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=k,
        parallelism=cores)

    cvModel = cv.fit(train)
    predictions = cvModel.transform(test)
    print(predictions)
    optimisedF1Score = round(evaluator.evaluate(predictions),5)
    precision = evaluator.setMetricName("weightedPrecision").evaluate(predictions)
    recall = evaluator.setMetricName("weightedRecall").evaluate(predictions)

    print(f"Optimised Model F1 Score: {round(optimisedF1Score, 5)}")
    print(f"Optimised Precision: {round(precision, 5)}")
    print(f"Optimised Recall: {round(recall, 5)}\n")
    
    return cvModel.bestModel, optimisedF1Score

In [496]:
def select_best_model(baselineF1, optimisedF1, baseModel, 
                      optimisedModel, strongFeatures, allFeatures) -> tuple[ClassificationModel, str, list]:
    print(f"Base model F1 Score: {baselineF1}")
    print(f"Optimised model F1 Score: {optimisedF1}")
    if optimisedF1 > baselineF1:
        model = optimisedModel
        modelType = "optimised"
        features = strongFeatures
        print(f"Optimised model selected...")
    else:
        model = baseModel
        modelType = "base"
        features = allFeatures
        print(f"Base model selected...")
    return model, modelType, features

In [497]:
def predict_churn_score(modelType, model, 
                        churnDataUnlabelled: DataFrame, features, reservedFlag) -> None:

    if modelType == "optimised":
        feature_vector_col = "strongFeatures"
        selectedFeatures = ['actual_churn_risk_score'] + features
    else:
        feature_vector_col = "features"
        selectedFeatures = ['actual_churn_risk_score'] + features

    churnDataUnlabelled = churnDataUnlabelled.select(*[feature for feature in selectedFeatures if feature in churnDataUnlabelled.columns])

    assembler = VectorAssembler(inputCols=features, outputCol=feature_vector_col)
    churnunlabeledPredict = assembler.transform(churnDataUnlabelled)
    predictions = model.transform(churnunlabeledPredict)

    predictions = predictions.withColumnRenamed("prediction", "predicted_churn_risk_score")
    
    # Determine which label column to use
    if "actual_churn_risk_score" in predictions.columns:
        label_col = "actual_churn_risk_score"
    elif "churn_risk_score" in predictions.columns:
        label_col = "churn_risk_score"

    if label_col:
        selected_df = predictions.select(label_col, "predicted_churn_risk_score")
        print(f"\n----- Prediction Results ({modelType} model using '{label_col}') -----")
        selected_df = selected_df.withColumn("predicted_churn_risk_score", selected_df["predicted_churn_risk_score"].cast(IntegerType()))
        selected_df.show(truncate=False)

        correct = predictions.filter(col(label_col) == col("predicted_churn_risk_score")).count()
        total = predictions.count()
        accuracy = correct / total if total else 0
        print(f"Prediction Accuracy: {accuracy:.2%} ({correct}/{total})")
    else:
        print("\nNo label column found ('actual_churn_risk_score' or 'churn_risk_score') in data.")
        predictions.select("predicted_churn_risk_score").show(truncate=False)

In [498]:
def execute_ml_pipeline() -> None:
    
    startTime = time.time()
    
    print("Spark session created: ChurnXGBoost ")
    print()

    randomId = uuid.uuid4()
    stringID = str(randomId)
    print(f"Executing Machine Learning Pipeline...")
    print() # Creates a gap for for clean logging output
    print(f"Run ID: {stringID}")
    print()

    print("***** DATA PREPARATION *****", end="\n")
    print()
    
    print("===== Combine Cleaned Files and Load Data =====", end="\n")
    churnData = concatenate_clean_churn_files()

    print("===== Cleaned Churn Metadata =====", end="\n")
    get_churn_metadata(churnData)
    print()

    print("===== Split Dataframe by labels =====", end="\n")
    churnDataLabelled, churnDataUnlabelled, reservedFlag = split_dataframe(churnData)
    print()

    print("***** TRAIN AND SCORE BASELINE MODEL *****", end="\n")
    print()

    print("===== Baseline Model: Separate Feature and Target Column/s =====", end="\n")
    allFeatures, targetCol = separate_feature_target_cols(churnDataLabelled)
    print()

    print("===== Baseline Model: Correlation Analysis =====", end="\n")
    strongCorrFeatures = correlation_analysis(churnDataLabelled, allFeatures, targetCol, 0.2, 0.4)
    print()

    print("===== Baseline Model: Assemble Features =====", end="\n")
    trainSetDf, testSetDf = feature_assemble(churnDataLabelled, allFeatures, trainSplit=0.8, testSplit=0.2)
    print()

    print("===== Baseline Model: Create and Score =====",end="\n")
    baseModel, baselineF1Score = baseline_model(trainSetDf, testSetDf, targetCol)
    print()

    print("===== Baseline Model: Feature Information Gain =====",end="\n")
    strongGainFeatures = feature_information_gain(baseModel, allFeatures)
    print()

    print("***** TRAIN AND SCORE OPTIMISED MODEL *****",end="\n")
    print()

    print("===== Optimised Model: Strongest Features =====",end="\n")
    strongFeatures = get_strong_features(strongCorrFeatures, strongGainFeatures)
    print()

    print("===== Optimised Model: K-Fold Cross Validation =====",end="\n")
    optimisedModel, optimisedF1Score = kfold_cross_validation(trainSetDf, testSetDf, strongFeatures, targetCol)
    print()

    print("***** SELECT BEST MODEL AND PREDICT UNSEEN DATA *****",end="\n")
    print()

    print("===== Select Best Model =====",end="\n")
    bestModel, modelType, features = select_best_model(baselineF1Score, optimisedF1Score,
                                             baseModel, optimisedModel, strongFeatures, allFeatures)
    print()

    print("===== Apply Model to Unseen Data =====",end="\n")
    predict_churn_score(modelType, bestModel, churnDataUnlabelled, features, reservedFlag)
    
    endTime = time.time()
    pipelineRunTime = endTime - startTime
    print(f"Machine learning pipeline complete. Run time: {timedelta(seconds=int(pipelineRunTime))}")

In [499]:
execute_ml_pipeline()

Spark session created: ChurnXGBoost 

Executing Machine Learning Pipeline...

Run ID: 5129a1b6-6dc3-4b16-89ac-e5491997482c

***** DATA PREPARATION *****

===== Combine Cleaned Files and Load Data =====
All available cleaned churn csv files: ['churn_clean.csv']
===== Cleaned Churn Metadata =====
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20000 entries, 0 to 19999
Data columns (total 25 columns):
 #   Column                        Non-Null Count  Dtype  
---  ------                        --------------  -----  
 0   age                           20000 non-null  int32  
 1   gender                        20000 non-null  int32  
 2   region_category               20000 non-null  int32  
 3   membership_category           20000 non-null  int32  
 4   joined_through_referral       20000 non-null  int32  
 5   preferred_offer_types         20000 non-null  int32  
 6   medium_of_operation           20000 non-null  int32  
 7   internet_option               20000 non-null  int32  
 8   

DataFrame[strongFeatures: vector, churn_risk_score: int, rawPrediction: vector, probability: vector, prediction: double]
Optimised Model F1 Score: 0.93373
Optimised Precision: 0.93461
Optimised Recall: 0.93385


***** SELECT BEST MODEL AND PREDICT UNSEEN DATA *****

===== Select Best Model =====
Base model F1 Score: 0.93497
Optimised model F1 Score: 0.93373
Base model selected...

===== Apply Model to Unseen Data =====

----- Prediction Results (base model using 'actual_churn_risk_score') -----
+-----------------------+--------------------------+
|actual_churn_risk_score|predicted_churn_risk_score|
+-----------------------+--------------------------+
|1                      |1                         |
|0                      |0                         |
|0                      |0                         |
|1                      |1                         |
|0                      |0                         |
|1                      |1                         |
|1                     