In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, max, when, current_date, datediff, substring, length, expr
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("CustomerChurn").getOrCreate()

In [None]:
customers_df = spark.sql("SELECT * FROM Banking_Data.Customers")
accounts_df = spark.sql("SELECT * FROM Banking_Data.Accounts")
transactions_df = spark.sql("SELECT * FROM Banking_Data.Transactions")
loans_df = spark.sql("SELECT * FROM Banking_Data.Loans")
cards_df = spark.sql("SELECT * FROM Banking_Data.Cards")
support_calls_df = spark.sql("SELECT * FROM Banking_Data.SupportCalls")

In [None]:
customers_df = customers_df.withColumn(
    "state_code",
    substring(col("Address"), -8, 2)
).withColumn(
    "tenure_days",
    datediff(current_date(), col("JoinDate"))
)

customers_features = customers_df.select(
    "CustomerID",
    "state_code",
    "tenure_days"
)

account_agg = accounts_df.groupBy("CustomerID").agg(
    count("AccountID").alias("num_accounts"),
    sum("Balance").alias("total_balance"),
    count(when(col("AccountType") == "Savings", 1)).alias("num_savings_accounts"),
    count(when(col("AccountType") == "Checking", 1)).alias("num_checking_accounts"),
    count(when(col("AccountType") == "Business", 1)).alias("num_business_accounts"),
    avg("Balance").alias("avg_balance"),
    max(datediff(current_date(), col("CreatedDate"))).alias("max_account_age_days"),
    max("CreatedDate").alias("last_balance_update"),
    datediff(current_date(), max("CreatedDate")).alias("days_since_last_balance_update")
)

transaction_agg = transactions_df.groupBy("AccountID").agg(
    count("TransactionID").alias("num_transactions"),
    sum("Amount").alias("total_transaction_amount"),
    count(when(col("TransactionType") == "Deposit", 1)).alias("num_deposits"),
    count(when(col("TransactionType") == "Withdrawal", 1)).alias("num_withdrawals"),
    count(when(col("TransactionType") == "Transfer", 1)).alias("num_transfers"),
    count(when(col("TransactionType") == "Payment", 1)).alias("num_payments"),
    max("TransactionDate").alias("last_transaction_date"),
    datediff(current_date(), max("TransactionDate")).alias("days_since_last_transaction")
)

trans_acc = transaction_agg.join(
    accounts_df.select("AccountID", "CustomerID"),
    "AccountID",
    "inner"
).groupBy("CustomerID").agg(
    sum("num_transactions").alias("total_num_transactions"),
    sum("total_transaction_amount").alias("total_transaction_amount"),
    sum("num_deposits").alias("total_deposits"),
    sum("num_withdrawals").alias("total_withdrawals"),
    sum("num_transfers").alias("total_transfers"),
    sum("num_payments").alias("total_payments"),
    max("last_transaction_date").alias("last_transaction_date"),
    max("days_since_last_transaction").alias("days_since_last_transaction")
)

loan_agg = loans_df.groupBy("CustomerID").agg(
    count("LoanID").alias("num_loans"),
    sum("LoanAmount").alias("total_loan_amount"),
    avg("InterestRate").alias("avg_interest_rate"),
    count(when(col("LoanType") == "Car", 1)).alias("num_car_loans"),
    count(when(col("LoanType") == "Personal", 1)).alias("num_personal_loans"),
    count(when(col("LoanType") == "Home", 1)).alias("num_home_loans"),
    count(when(col("LoanType") == "Education", 1)).alias("num_education_loans"),
    max(datediff(col("LoanEndDate"), col("LoanStartDate"))).alias("max_loan_duration_days"),
    max(when(col("LoanEndDate") <= current_date(), 0).otherwise(datediff(col("LoanEndDate"), current_date()))).alias("days_to_loan_end")
)

card_agg = cards_df.groupBy("CustomerID").agg(
    count("CardID").alias("num_cards"),
    count(when(col("CardType") == "Credit", 1)).alias("num_credit_cards"),
    count(when(col("CardType") == "Debit", 1)).alias("num_debit_cards"),
    count(when(col("CardType") == "Prepaid", 1)).alias("num_prepaid_cards"),
    max(datediff(col("ExpirationDate"), current_date())).alias("max_days_to_card_expiry")
)

support_agg = support_calls_df.groupBy("CustomerID").agg(
    count("CallID").alias("num_support_calls"),
    count(when(col("IssueType") == "Transaction Dispute", 1)).alias("num_transaction_disputes"),
    count(when(col("IssueType") == "Loan Query", 1)).alias("num_loan_queries"),
    count(when(col("IssueType") == "Card Issue", 1)).alias("num_card_issues"),
    count(when(col("IssueType") == "Account Access", 1)).alias("num_account_access_issues"),
    count(when(col("Resolved") == "Yes", 1)).alias("num_resolved_calls"),
    max(when(col("Resolved") == "Yes", datediff(current_date(), col("CallDate")))).alias("days_since_last_resolved_call")
)

features_df = customers_features.join(account_agg, "CustomerID", "left") \
    .join(trans_acc, "CustomerID", "left") \
    .join(loan_agg, "CustomerID", "left") \
    .join(card_agg, "CustomerID", "left") \
    .join(support_agg, "CustomerID", "left")

features_df = features_df.withColumn(
    "loan_to_balance_ratio",
    when(
        (col("total_balance").isNotNull()) & (col("total_balance") > 0),
        col("total_loan_amount") / col("total_balance")
    ).otherwise(0.0)
)
display(features_df)

In [None]:
# Code generated by Data Wrangler for PySpark DataFrame

def clean_data(features_df):
    # Drop columns: 'last_balance_update', 'last_transaction_date'
    features_df = features_df.drop('last_balance_update', 'last_transaction_date')
    # Drop duplicate rows across all columns
    features_df = features_df.dropDuplicates()
    # LLM translation result for custom operation
    features_df = features_df.dropna(subset=['num_accounts'])
    # Replace missing values with 0 in columns: 'num_accounts', 'total_balance' and 35 other columns
    features_df = features_df.fillna(value=0, subset=['num_accounts', 'total_balance', 'num_savings_accounts', 'num_checking_accounts', 'num_business_accounts', 'avg_balance', 'max_account_age_days', 'days_since_last_balance_update', 'total_num_transactions', 'total_transaction_amount', 'total_deposits', 'total_withdrawals', 'total_transfers', 'total_payments', 'days_since_last_transaction', 'num_loans', 'total_loan_amount', 'avg_interest_rate', 'num_car_loans', 'num_personal_loans', 'num_home_loans', 'num_education_loans', 'max_loan_duration_days', 'days_to_loan_end', 'num_cards', 'num_credit_cards', 'num_debit_cards', 'num_prepaid_cards', 'max_days_to_card_expiry', 'num_support_calls', 'num_transaction_disputes', 'num_loan_queries', 'num_card_issues', 'num_account_access_issues', 'num_resolved_calls', 'days_since_last_resolved_call', 'loan_to_balance_ratio'])
    return features_df

features_df = clean_data(features_df)
display(features_df)

In [None]:
df = features_df.withColumn(
    "churn",
    when(
        (col("days_since_last_transaction") > 365) |
        ((col("total_num_transactions") <= 5) & (col("days_since_last_transaction") >= 180)) |
        (col("num_cards") == 0) |
        (col("days_since_last_resolved_call") > 180) |
        (col("num_accounts") == 1) & (col("total_balance") < 3000),
        1
    ).otherwise(0)
)
display(df)

In [None]:
# Code generated by Data Wrangler for PySpark DataFrame

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.functions import vector_to_array

def clean_data(df):
    # LLM translation result for custom operation
    indexer = StringIndexer(inputCol='state_code', outputCol='state_code_index')
    df = indexer.fit(df).transform(df).drop('state_code').withColumnRenamed('state_code_index', 'state_code')
    # Select columns to normalize
    columns_to_normalize = [col for col in df.columns if col != 'CustomerID']
    # Assemble the columns into a feature vector
    assembler = VectorAssembler(inputCols=columns_to_normalize, outputCol="features")
    # Initialize the scaler
    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
    # Create a pipeline to assemble and scale
    pipeline = Pipeline(stages=[assembler, scaler])
    # Fit and transform the data
    scaled_model = pipeline.fit(df)
    scaled_df = scaled_model.transform(df)
    # Split the scaled features back into separate columns
    scaled_df = scaled_df.withColumn("scaled_array", vector_to_array("scaledFeatures"))
    for i, col_name in enumerate(columns_to_normalize):
        scaled_df = scaled_df.withColumn(col_name, scaled_df["scaled_array"][i])
    # Select the final columns including the CustomerID and the scaled columns
    df = scaled_df.drop("features", "scaledFeatures", "scaled_array")
    return df

df_clean = clean_data(df)
display(df_clean)

In [None]:
# Churn: Train-test split and save
train_df, test_df = df_clean.randomSplit([0.9, 0.1], seed=42)
train_df.coalesce(1).write.format("csv").mode("overwrite").option("header", "true").save("Files/customer_churn/train")
test_df.coalesce(1).write.format("csv").mode("overwrite").option("header", "true").save("Files/customer_churn/test")


# train_df.write.format("delta").saveAsTable("customer_churn_train")
test_df.write.format("delta").saveAsTable("customer_churn_test")

# Automated ML
## Introduction

This notebook is automatically generated by the Fabric low-code AutoML wizard based on your selections. Whether you're building a regression model, a classifier, or another machine-learning solution, this tool simplifies the process by transforming your goals into executable code. You can easily modify any settings or code snippets to better align with your requirements.

### What is FLAML?

[FLAML (Fast and Lightweight Automated Machine Learning)](https://aka.ms/fabric-automl) is an open-source AutoML library designed to quickly and efficiently find the best machine learning models and hyperparameters. FLAML optimizes for speed, accuracy, and cost, making it an excellent choice for a wide range of machine-learning tasks.

### Steps in this notebook

1. **Load the data**: Import your dataset.
2. **Generate features**: Automatically transform and preprocess your data to improve model performance.
3. **Use AutoML to find your best model**: Use FLAML to automatically select the most suitable model and optimize its parameters.
4. **Save the final machine learning model**: Store the trained model for future use.
5. **Generate predictions**: Use the saved model to predict outcomes on new data.

> [!IMPORTANT]
> **Automated ML is currently supported on Fabric Runtimes 1.2+ or any Fabric environment with Spark 3.4+.**


In [None]:
%pip install scikit-learn==1.5.1


### Default notebook optimization

This cell configures the logging and warning settings to reduce unnecessary output and focus on critical information. It suppresses specific warnings and logs from the underlying libraries, ensuring a cleaner and more readable notebook experience.

In [None]:
import logging
import warnings
 
logging.getLogger('synapse.ml').setLevel(logging.CRITICAL)
logging.getLogger('mlflow.utils').setLevel(logging.CRITICAL)
warnings.simplefilter('ignore', category=FutureWarning)
warnings.simplefilter('ignore', category=UserWarning)

## Step 1: Load the Data

This cell is responsible for importing the raw data from the specified source into the notebook environment. The data could come from various sources, such as a file or table in your lakehouse.

Once loaded, this data will serve as the input for subsequent steps, such as data transformation, model training, and evaluation.

In [None]:
import re
import pandas as pd
import numpy as np

# Transform to pandas according to the selected models
X = train_df.limit(100000).toPandas() # Use df.toPandas() to use all the data
X = X.rename(columns = lambda c:re.sub('[^A-Za-z0-9_]+', '_', c))  # Replace not supported characters in column name with underscore to avoid invalid character for model training and saving

target_col = re.sub('[^A-Za-z0-9_]+', '_', "churn")


In [None]:
display(X)

## Step 2: Generate features

Featurization is the process of transforming raw data into a format optimized for training a machine learning model. It ensures the model can access the most relevant information, significantly impacting its accuracy and performance.

This step applies various techniques to refine the data, enhance its quality, and make it compatible with the selected algorithms, helping the model learn patterns more effectively.

In [None]:
# Handle class imbalance
import matplotlib.pyplot as plt


distribution = X[target_col].value_counts(normalize=True)
dominant_class_proportion = distribution.max()

distribution.plot(kind='bar')
plt.title("Class Distribution")
plt.xlabel("Class")
plt.ylabel("Proportion")
plt.show()

if dominant_class_proportion > 0.8:
    print(f"The dataset is imbalanced. The dominant class has {dominant_class_proportion * 100:.2f}% of the samples.")
    print("You may need to handle class imbalance before training the model.")
    print("You can use techniques such as oversampling, undersampling, or using class weights to handle class imbalance.")
    print("For more information, see https://aka.ms/smote-example")
else:
    print("The dataset is balanced.")


In [None]:
# Set Functions if needed for Featurization
def create_fillna_processor(
    df, mean_features=None, median_features=None, mode_features=None
):
    """
    Create a ColumnTransformer that fills missing values in a DataFrame using different strategies
    based on the skewness of the numerical features and the specified feature lists.

    Parameters:
    df (pd.DataFrame): The input DataFrame.
    mean_features (list, optional): List of features to impute using the mean strategy. Defaults to None.
    median_features (list, optional): List of features to impute using the median strategy. Defaults to None.
    mode_features (list, optional): List of features to impute using the mode strategy. Defaults to None.

    Returns:
    ColumnTransformer: A fitted ColumnTransformer that can be used to transform the DataFrame.
    list: List of all features supported by SimpleImputer in the DataFrame.
    list: List of datetime features in the DataFrame.
    """
    if mean_features is None:
        mean_features = []
    if median_features is None:
        median_features = []
    if mode_features is None:
        mode_features = []
    all_features = mean_features + median_features + mode_features
    # Group features by their imputation needs
    mean_features = [
        col
        for col in df.select_dtypes(include=["number"]).columns
        if df[col].skew(skipna=True) <= 1 and col not in all_features
    ] + mean_features
    median_features = [
        col
        for col in df.select_dtypes(include=["number"]).columns
        if df[col].skew(skipna=True) > 1 and col not in all_features
    ] + median_features
    all_features = mean_features + median_features
    datetime_features = df.select_dtypes(include=["datetime"]).columns.tolist()
    mode_features = [col for col in df.columns.tolist() if col not in all_features + datetime_features]

    transformers = []

    if mean_features:
        transformers.append(
            ("mean_imputer", SimpleImputer(strategy="mean"), mean_features)
        )
    if median_features:
        transformers.append(
            ("median_imputer", SimpleImputer(strategy="median"), median_features)
        )
    if mode_features:
        transformers.append(
            ("mode_imputer", SimpleImputer(strategy="most_frequent"), mode_features)
        )

    column_transformer = ColumnTransformer(transformers=transformers)
    all_features = mean_features + median_features + mode_features

    return column_transformer.fit(df), all_features, datetime_features


def fillna(df, processor, all_features, datetime_features):
    """
    Fill missing values in a DataFrame using a specified processor and mode imputation.

    Parameters:
    df (pd.DataFrame): The input DataFrame with missing values.
    processor (object): An object with a `transform` method that processes the DataFrame.
    all_features (list): List of all features supported by SimpleImputer in the DataFrame.
    datetime_features (list): List of datetime features in the DataFrame.

    Returns:
    pd.DataFrame: A DataFrame with missing values filled.
    """
    filled_array = processor.transform(df)
    filled_df = pd.DataFrame(filled_array, columns=all_features)
    if datetime_features:
        datetime_data = df[datetime_features]
        datetime_data.ffill()
        filled_df = pd.concat([datetime_data, filled_df], axis=1)
    for col in df.columns:
        filled_df[col].fillna(filled_df[col].mode()[0], inplace=True)

    return filled_df


In [None]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer


# convert object type to nearest dtype
X = X.convert_dtypes()
X = X.dropna(axis=1, how='all')

# select columns for model training
X = X.select_dtypes(include=['number', 'datetime', 'category'])

from sklearn.model_selection import train_test_split

# You may need to update the test_size based on your scenario
X_train, X_test = train_test_split(X, test_size=0.2, random_state=41)

mean_features, median_features, mode_features = [], [], []
 
preprocessor, all_features, datetime_features = create_fillna_processor(X_train, mean_features, median_features, mode_features)
X_train = fillna(X_train, preprocessor, all_features, datetime_features)
X_test = fillna(X_test, preprocessor, all_features, datetime_features)
 
y_train = X_train.pop(target_col)
y_test = X_test.pop(target_col)

display(X_train[:10])


## Step 3: Use AutoML to find your best model

We will now use FLAML's AutoML to automatically find the best machine learning model for our data. AutoML (Automated Machine Learning) simplifies the model selection process by automatically testing and tuning various algorithms and configurations, helping us quickly identify the most effective model with minimal manual effort.

### Tracking results with experiments in Fabric

Experiments in Fabric let you track the results of your AutoML process, providing a comprehensive view of all the metrics and parameters from your trials.

In [None]:
# MLFlow Logging Related

import mlflow

mlflow.autolog(exclusive=False)
mlflow.set_experiment("Customer-Churn-Prediction")


#### Configure the AutoML trial and settings

These configurations are driven by the AutoML mode and task selected in the wizard. For example, if you select "quick prototype", you'll see a setting for time budget.

In [None]:
# Import the AutoML class from the FLAML package
import flaml
from flaml import AutoML

# Define AutoML settings
settings = {
    "time_budget": 3600, # Total running time in seconds
    "task": "binary", 
    "log_file_name": "flaml_experiment.log",  # FLAML log file
    "eval_method": "cv",
    "n_splits": 3,
    "seed": 41 , # Random seed 
    "mlflow_exp_name": "Customer-Churn-Prediction",  # MLflow experiment name
    "use_spark": True, # whether to use Spark for distributed training
    "n_concurrent_trials": 3,  # the maximum number of concurrent trials 
    "verbose": 1, 
    "featurization": "auto", 
}

if flaml.__version__ > "2.3.3":
    settings["entrypoint"] = "low-code"

# Create an AutoML instance
automl = AutoML(**settings)


#### Run the AutoML trial

Run the AutoML trial, with all trials being tracked as experiment runs. The trial is performed on the processed dataset, using the `Exited` variable as the target, and applying the defined configurations for optimal model selection.

In [None]:
with mlflow.start_run(nested=True, run_name="Customer-Churn-Prediction-Model"):
    automl.fit(
        X_train=X_train, 
        y_train=y_train,  # target column of the training data 
    )

## Step 4: Save the final machine learning model

Upon completing the AutoML trial, you can now save the final, tuned model as an ML model in Fabric.

In [None]:
model_path = f"runs:/{automl.best_run_id}/model"

# Register the model to the MLflow registry
registered_model = mlflow.register_model(model_uri=model_path, name="Customer-Churn-Prediction-Model")

# Print the registered model's name and version
print(f"Model '{registered_model.name}' version {registered_model.version} registered successfully.")

## Step 5: Generate predictions

Microsoft Fabric lets you operationalize machine learning models with a scalable function called `PREDICT`, which supports batch scoring (or batch inferencing) in any compute engine. You can generate batch predictions directly from the Microsoft Fabric notebook or from a given ML model's item page. For more information on how to use `PREDICT`, see [Model scoring with PREDICT in Microsoft Fabric](https://aka.ms/fabric-predict).

1. Generate predictions.

In [None]:
model_name = "Customer-Churn-Prediction-Model"
from synapse.ml.predict import MLFlowTransformer

feature_cols = X_train.columns.to_list()
model = MLFlowTransformer(
    inputCols=feature_cols,
    outputCol=target_col,
    modelName=model_name,
    modelVersion=registered_model.version,
)

df_test = spark.createDataFrame(X_test)
batch_predictions = model.transform(df_test)


In [None]:
display(batch_predictions)

2. Save the predictions to a table.

In [None]:
saved_name = "Tables/customer_churn_test_predictions".replace(".", "_")
batch_predictions.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save(saved_name)