In [4]:
# Data Manipulation and Handling
import polars as pl
import pandas as pd
import numpy as np
import psycopg2

# DB Credentials
from dotenv import load_dotenv
import os
import sys
from sqlalchemy import create_engine

# Machine Learning Libraries
import torch
import xgboost as xgb
import lightgbm as lgb
# from catboost import CatBoostClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, roc_auc_score, roc_curve

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Handling Imbalanced Data
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from imblearn.pipeline import Pipeline as ImbPipeline

# Gradient Boosting Libraries
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier

# Model Lifecycle Management
import mlflow
import mlflow.sklearn

# Distributed Computing
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier as SparkRFClassifier

# Model Interpretability
import shap

# Hyperparameter Optimization
import optuna

# Automated Feature Engineering
import featuretools as ft

# Add parent directory to sys.path
current_dir = os.getcwd()
parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir))
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)

# Custom Modules
from fetch_data_hook import fetch_sql_code, fetch_sql_file

IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.


# ML Pipeline Flow

In [1]:
# 1. EDA Analysis
# 2. Feature Engineering
# 3. Train-Test Split
# 4. Feature Scaling: {normalization/standardization, dimension reduction techniques, handling imbalance datasets/sampling}
# 5. Model Training -> Tuning -> Evaluating
# 6. Model Prediction
# 7. Model Deployment

In [6]:
churn_df = fetch_sql_code('''
WITH temp1 AS (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY timestamp) AS rn,
        timestamp::date - ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY timestamp)::int AS streak_id
    FROM
        equity_value_data
),
temp2 AS (
    SELECT
        user_id,
        MIN(timestamp::date) AS start_streak_date,
        MAX(timestamp::date) AS end_streak_date,
        COUNT(*) AS duration_of_above10_streak
    FROM
        temp1
    GROUP BY
        user_id, streak_id
),
temp3 AS (
    SELECT
        *,
        LAG(end_streak_date) OVER (PARTITION BY user_id ORDER BY start_streak_date ASC) AS prev_above10_streak_date,
        start_streak_date - LAG(end_streak_date) OVER (PARTITION BY user_id ORDER BY start_streak_date ASC) AS duration_between_above10_streaks
    FROM
        temp2
)
SELECT distinct user_id
FROM temp3
WHERE duration_between_above10_streaks >= 28
''')
churn_df
churn_users = set(churn_df['user_id'].tolist())

df = fetch_sql_code('''
select * from features_data
''')

df['churn_flag'] = df['user_id'].apply(lambda x: 1 if x in churn_users else 0 )
df


Unnamed: 0,risk_tolerance,investment_experience,liquidity_needs,platform,time_spent,instrument_type_first_traded,first_deposit_amount,time_horizon,user_id,churn_flag
0,high_risk_tolerance,limited_investment_exp,very_important_liq_need,Android,33.129417,stock,40.0,med_time_horizon,895044c23edc821881e87da749c01034,0
1,med_risk_tolerance,limited_investment_exp,very_important_liq_need,Android,16.573517,stock,200.0,short_time_horizon,458b1d95441ced242949deefe8e4b638,0
2,med_risk_tolerance,limited_investment_exp,very_important_liq_need,iOS,10.008367,stock,25.0,long_time_horizon,c7936f653d293479e034865db9bb932f,0
3,med_risk_tolerance,limited_investment_exp,very_important_liq_need,Android,1.031633,stock,100.0,short_time_horizon,b255d4bd6c9ba194d3a350b3e76c6393,0
4,high_risk_tolerance,limited_investment_exp,very_important_liq_need,Android,8.187250,stock,20.0,long_time_horizon,4a168225e89375b8de605cbc0977ae91,0
...,...,...,...,...,...,...,...,...,...,...
5579,high_risk_tolerance,limited_investment_exp,very_important_liq_need,Android,8.339283,stock,300.0,long_time_horizon,03880c726d8a4e5db006afe4119ad974,0
5580,med_risk_tolerance,limited_investment_exp,somewhat_important_liq_need,iOS,7.241383,stock,100.0,short_time_horizon,ae8315109657f44852b24c6bca4decd6,1
5581,med_risk_tolerance,no_investment_exp,very_important_liq_need,both,22.967167,stock,50.0,short_time_horizon,f29c174989f9737058fe808fcf264135,0
5582,med_risk_tolerance,limited_investment_exp,somewhat_important_liq_need,iOS,10.338417,stock,100.0,long_time_horizon,24843497d1de88b2e7233f694436cb3a,0


In [None]:
'''
This pipeline includes the following:

Data Preprocessing: Label encoding, scaling, and feature engineering.
Handling Imbalance: Using SMOTE for oversampling the minority class.
Modeling and Hyperparameter Tuning: Using Optuna to tune the hyperparameters of models like XGBoost, LightGBM, RandomForest, and Logistic Regression.
Evaluation: Automatically selecting the best model with the highest AUC score.
Explainability: Using SHAP to interpret the model.'''

### Step 1: Data Preprocessing and Feature Engineering

In [None]:
# Label encoding for categorical variables
label_enc = LabelEncoder()
df['risk_tolerance'] = label_enc.fit_transform(df['risk_tolerance'])
df['investment_experience'] = label_enc.fit_transform(df['investment_experience'])
df['liquidity_needs'] = label_enc.fit_transform(df['liquidity_needs'])
df['platform'] = label_enc.fit_transform(df['platform'])
df['time_horizon'] = label_enc.fit_transform(df['time_horizon'])

# Scaling the numerical features
scaler = StandardScaler()
df[['time_spent', 'first_deposit_amount']] = scaler.fit_transform(df[['time_spent', 'first_deposit_amount']])

# Feature Engineering: Additional features
df['deposit_per_time'] = df['first_deposit_amount'] / (df['time_spent'] + 1)  # Avoid division by zero
df['is_high_risk'] = (df['risk_tolerance'] == 0).astype(int)  # Assuming 0 = high risk

# Prepare X and y
X = df.drop(columns=['user_id', 'churn_flag'])
y = df['churn_flag']

# Handle class imbalance using SMOTE
smote = SMOTE()
X_resampled, y_resampled = smote.fit_resample(X, y)


### Step 2: Model Definition and Objective Function for Optuna

In [None]:
#Here, we define the models: XGBoost, LightGBM, RandomForest, and Logistic Regression with L1/L2 regularization.
# Optuna will optimize each model's hyperparameters.

def objective(trial):
    model_type = trial.suggest_categorical('model_type', ['xgboost', 'lightgbm', 'randomforest', 'logistic'])

    if model_type == 'xgboost':
        model = xgb.XGBClassifier(
            max_depth=trial.suggest_int('max_depth', 3, 10),
            learning_rate=trial.suggest_loguniform('learning_rate', 0.01, 0.3),
            n_estimators=trial.suggest_int('n_estimators', 50, 300),
            subsample=trial.suggest_uniform('subsample', 0.6, 1.0)
        )

    elif model_type == 'lightgbm':
        model = lgb.LGBMClassifier(
            num_leaves=trial.suggest_int('num_leaves', 20, 150),
            learning_rate=trial.suggest_loguniform('learning_rate', 0.01, 0.3),
            n_estimators=trial.suggest_int('n_estimators', 50, 300),
            feature_fraction=trial.suggest_uniform('feature_fraction', 0.6, 1.0)
        )

    elif model_type == 'randomforest':
        model = RandomForestClassifier(
            max_depth=trial.suggest_int('max_depth', 3, 10),
            n_estimators=trial.suggest_int('n_estimators', 50, 300),
            max_features=trial.suggest_categorical('max_features', ['auto', 'sqrt', 'log2'])
        )

    elif model_type == 'logistic':
        model = LogisticRegression(
            penalty=trial.suggest_categorical('penalty', ['l1', 'l2']),
            C=trial.suggest_loguniform('C', 0.01, 10),
            solver='liblinear'
        )

    # Train-Test split
    X_train, X_test, y_train, y_test = train_test_split(X_resampled, y_resampled, test_size=0.3, random_state=42)

    # Fit the model
    model.fit(X_train, y_train)

    # Predict on test set
    y_pred = model.predict(X_test)

    # Calculate AUC score
    auc = roc_auc_score(y_test, y_pred)

    return auc


### Step 3: Running Optuna for Hyperparameter Tuning

In [None]:
# Run Optuna for hyperparameter tuning
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=100)

# Print the best trial and parameters
print(f"Best Trial: {study.best_trial}")
print(f"Best Parameters: {study.best_params}")


### Step 4: Final Model Selection and Evaluation

In [None]:
#After Optuna identifies the best model and parameters, we will retrain the model and evaluate it on the test set
# with detailed metrics.
# Train the final model based on best parameters
best_params = study.best_params
if best_params['model_type'] == 'xgboost':
    best_model = xgb.XGBClassifier(**best_params)
elif best_params['model_type'] == 'lightgbm':
    best_model = lgb.LGBMClassifier(**best_params)
elif best_params['model_type'] == 'randomforest':
    best_model = RandomForestClassifier(**best_params)
else:
    best_model = LogisticRegression(**best_params)

# Train on the full resampled data
best_model.fit(X_resampled, y_resampled)

# Predict on test set
y_pred = best_model.predict(X_test)

# Evaluate final model performance
accuracy = accuracy_score(y_test, y_pred)
roc_auc = roc_auc_score(y_test, y_pred)
conf_matrix = confusion_matrix(y_test, y_pred)
class_report = classification_report(y_test, y_pred)

print(f'Final Model Accuracy: {accuracy}')
print(f'Final Model ROC AUC: {roc_auc}')
print(conf_matrix)
print(class_report)


### Step 5: Model Explainability with SHAP

In [None]:
# SHAP for feature importance
explainer = shap.TreeExplainer(best_model)
shap_values = explainer.shap_values(X_test)

# SHAP Summary Plot
shap.summary_plot(shap_values, X_test, feature_names=X.columns)


### Step 6: Visualization

In [None]:
# Distribution of time spent
sns.histplot(df['time_spent'], kde=True)
plt.title('Distribution of Time Spent')
plt.show()

# Boxplot of first deposit amount by churn flag
sns.boxplot(x='churn_flag', y='first_deposit_amount', data=df)
plt.title('First Deposit by Churn Flag')
plt.show()

# Correlation heatmap
plt.figure(figsize=(12,8))
sns.heatmap(df.corr(), annot=True, cmap='coolwarm')
plt.title('Correlation Heatmap')
plt.show()


### Step 6: Model Explainability with SHAP

In [None]:
import shap

explainer = shap.TreeExplainer(best_model)
shap_values = explainer.shap_values(X_test)

# Plot SHAP summary
shap.summary_plot(shap_values, X_test)


## EDA

In [None]:
# Distribution of Numerical Features:
sns.histplot(df['time_spent'], kde=True)
plt.title('Distribution of Time Spent in App')
plt.show()

sns.boxplot(x='churn_flag', y='first_deposit_amount', data=df)
plt.title('First Deposit Amount by Churn Flag')
plt.show()


In [None]:
plt.figure(figsize=(12,8))
sns.heatmap(df.corr(), annot=True, fmt='.2f', cmap='coolwarm')
plt.title('Correlation Heatmap')
plt.show()


In [None]:
# categorical features
sns.countplot(x='platform', hue='churn_flag', data=df)
plt.title('Churn by Platform')
plt.show()
