In [2]:
# Churn prediction model with optimized data processing
# Author: Hose I. Rad
# Date: Oct. 19th 2024

"""
This code builds a churn prediction model for an eCommerce platform using user event data.
Due to the large size of the dataset, we tried to optimize data processing steps to handle it efficiently.

Key Features:
- Processes data files in chunks to manage memory usage.
- Utilizes multiprocessing to speed up file processing.
- Extracts meaningful features for churn prediction.
- Implements feature engineering techniques.
- Addresses class imbalance using SMOTE.
- Performs hyperparameter tuning for model optimization.
- Trains an XGBoost classifier for improved accuracy.
"""

# Import necessary libraries
import sys
import subprocess

# Function to install missing libraries
def install_library(package):
    """
    Installs a library with pip, translating known import names to PyPI names.

    The function always invokes pip; it does not check whether the package is
    already present. Some packages are imported under a different name than
    the one they are published under, so those import names are mapped to
    their PyPI distribution names before pip is called.

    Args:
        package (str): The import name or PyPI name of the package to install.

    Raises:
        subprocess.CalledProcessError: If the pip command exits with a non-zero status.
    """
    # Import name -> PyPI distribution name, for packages where the two differ
    pip_names = {
        'sklearn': 'scikit-learn',
        'imblearn': 'imbalanced-learn',
    }
    package = pip_names.get(package, package)
    # Use the running interpreter so the package lands in the kernel's environment
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# Third-party libraries this notebook needs, as {import name: pip package name}.
# Standard-library modules (os, glob, multiprocessing, warnings) are not listed:
# they ship with Python and cannot be installed with pip.
required_libraries = {
    'pandas': 'pandas',
    'numpy': 'numpy',
    'matplotlib': 'matplotlib',
    'seaborn': 'seaborn',
    'sklearn': 'scikit-learn',
    'xgboost': 'xgboost',
    'imblearn': 'imbalanced-learn',
    'joblib': 'joblib',
}

# Check and install any missing libraries
import importlib.util

print("Checking for missing libraries and installing them if necessary...")
for lib, pip_name in required_libraries.items():
    # find_spec looks the module up by its import name without importing it,
    # so packages that are already present are never reinstalled
    if importlib.util.find_spec(lib) is None:
        print(f"Installing {pip_name}...")
        install_library(pip_name)
# Make packages installed a moment ago visible to the import statements that follow
importlib.invalidate_caches()
print("All required libraries are installed.")

# Now import the libraries after installation
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
from glob import glob
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from xgboost import XGBClassifier
from imblearn.pipeline import Pipeline  # Use Pipeline from imblearn
from imblearn.over_sampling import SMOTE
import joblib
from multiprocessing import Pool, cpu_count
from functools import reduce
import warnings

# Ignore warnings
# NOTE(review): this installs a blanket 'ignore' filter, so every Python warning raised
# after this point is hidden, including any emitted by the libraries imported above.
warnings.filterwarnings('ignore')

# Set the plot style for seaborn (white background with grid lines)
sns.set(style='whitegrid')


Checking for missing libraries and installing them if necessary...
Installing scikit-learn...
All required libraries are installed.


Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.


In [3]:
# Function to process a single file and extract features
def process_file(file_path, chunksize=5_000_000):
    """
    Processes a single CSV file to extract per-user, per-month churn features.

    The file is streamed in chunks. Additive statistics (event counts, price
    sum/count, min/max, first/last timestamp) are aggregated per chunk and
    combined afterwards. Distinct counts cannot be added across chunks without
    double counting a user whose events span a chunk boundary, so each chunk
    only contributes its de-duplicated (user, month, value) pairs and the
    distinct values are counted once after all chunks have been read.

    Args:
        file_path (str): The path to the CSV file (optionally gzip-compressed,
            detected from a '.gz' suffix).
        chunksize (int): Number of rows read per chunk. Adjust this number
            based on your system's memory.

    Returns:
        pd.DataFrame: One row per (user_id, month) with the extracted features,
        or an empty DataFrame when the file yields no rows. Notes on columns:
        'session_duration' is the number of seconds between the user's first
        and last event of the month, and 'recency' is the number of whole days
        between the user's last event and the latest event found in this file.
    """
    print(f"Processing file: {file_path}")

    # Determine if the file is compressed
    compression = 'gzip' if file_path.endswith('.gz') else None

    keys = ['user_id', 'month']

    # Output feature -> event_type value it counts
    event_labels = {
        'num_views': 'view',
        'num_carts': 'cart',
        'num_purchases': 'purchase',
        'num_remove_from_cart': 'remove_from_cart',
    }
    # Output feature -> column whose distinct values are counted
    distinct_sources = {
        'unique_event_types': 'event_type',
        'num_unique_products': 'product_id',
        'num_unique_categories': 'category_code',
        'num_sessions': 'user_session',
        'active_days': 'event_date',
    }
    # Only the columns that feed a feature are read, which keeps each chunk small
    column_dtypes = {
        'event_type': 'category',
        'product_id': 'int32',
        'category_code': 'object',
        'price': 'float32',
        'user_id': 'int32',
        'user_session': 'object',
    }

    additive_chunks = []
    distinct_chunks = {feature: [] for feature in distinct_sources}

    reader = pd.read_csv(
        file_path,
        compression=compression,
        usecols=['event_time'] + list(column_dtypes),
        parse_dates=['event_time'],
        low_memory=False,
        chunksize=chunksize,
        dtype=column_dtypes,
    )

    for chunk in reader:
        # Missing category codes count as one shared 'unknown' category
        chunk['category_code'] = chunk['category_code'].fillna('unknown')

        # Normalise timestamps to timezone-naive UTC and derive the calendar fields
        event_time = pd.to_datetime(chunk['event_time'], utc=True).dt.tz_localize(None)
        chunk['event_time'] = event_time
        chunk['event_date'] = event_time.dt.normalize()
        chunk['month'] = event_time.dt.to_period('M')

        # Sum prices in float64 so the weighted average does not lose precision
        chunk['price'] = chunk['price'].astype('float64')

        # Indicator columns let the event-type counts use the fast built-in 'sum'
        for feature, label in event_labels.items():
            chunk[feature] = (chunk['event_type'] == label).astype('int32')

        agg_spec = {'total_events': ('event_type', 'count')}
        agg_spec.update({feature: (feature, 'sum') for feature in event_labels})
        agg_spec.update({
            'price_sum': ('price', 'sum'),
            'price_count': ('price', 'count'),
            'max_price': ('price', 'max'),
            'min_price': ('price', 'min'),
            'first_event_time': ('event_time', 'min'),
            'last_event_time': ('event_time', 'max'),
        })
        additive_chunks.append(chunk.groupby(keys).agg(**agg_spec).reset_index())

        # Keep only the distinct (user, month, value) pairs seen in this chunk
        for feature, column in distinct_sources.items():
            pairs = chunk[keys + [column]].drop_duplicates()
            if isinstance(pairs[column].dtype, pd.CategoricalDtype):
                # Category sets can differ between chunks; plain objects concatenate cleanly
                pairs = pairs.astype({column: object})
            distinct_chunks[feature].append(pairs)

    # If no chunk was read, return an empty DataFrame
    if not additive_chunks:
        return pd.DataFrame()

    # Combine the additive statistics across chunks for the same user and month
    df_features = (
        pd.concat(additive_chunks, ignore_index=True)
        .groupby(keys)
        .agg({
            'total_events': 'sum',
            'num_views': 'sum',
            'num_carts': 'sum',
            'num_purchases': 'sum',
            'num_remove_from_cart': 'sum',
            'price_sum': 'sum',
            'price_count': 'sum',
            'max_price': 'max',
            'min_price': 'min',
            'first_event_time': 'min',
            'last_event_time': 'max',
        })
    )
    del additive_chunks  # Free up memory

    # Count distinct values once over the de-duplicated pairs from all chunks
    for feature, column in distinct_sources.items():
        pairs = pd.concat(distinct_chunks[feature], ignore_index=True).drop_duplicates()
        distinct_counts = pairs.groupby(keys)[column].nunique()
        df_features[feature] = (
            distinct_counts.reindex(df_features.index, fill_value=0).astype('int64')
        )
    del distinct_chunks  # Free up memory

    df_features = df_features.reset_index()

    # Weighted average price: total price divided by the number of priced events
    df_features['avg_price'] = df_features['price_sum'] / df_features['price_count']
    price_cols = ['avg_price', 'max_price', 'min_price']
    # Users without any priced event get 0 instead of NaN
    df_features[price_cols] = df_features[price_cols].fillna(0)

    # Ratios are defined as 0 for users with no purchases
    purchases = df_features['num_purchases'].where(df_features['num_purchases'] > 0)
    df_features['view_to_purchase_ratio'] = (df_features['num_views'] / purchases).fillna(0)
    df_features['cart_to_purchase_ratio'] = (df_features['num_carts'] / purchases).fillna(0)

    # Seconds between the first and last event of the month
    activity_span = df_features['last_event_time'] - df_features['first_event_time']
    df_features['session_duration'] = activity_span.dt.total_seconds().fillna(0)

    # Whole days between each user's last event and the latest event in this file
    max_date = df_features['last_event_time'].max()
    df_features['recency'] = (max_date - df_features['last_event_time']).dt.days

    # Return the features in a fixed column order, without the helper columns
    ordered_columns = [
        'user_id', 'month', 'total_events', 'unique_event_types',
        'num_views', 'num_carts', 'num_purchases', 'num_remove_from_cart',
        'num_unique_products', 'num_unique_categories',
        'avg_price', 'max_price', 'min_price',
        'num_sessions', 'active_days',
        'view_to_purchase_ratio', 'cart_to_purchase_ratio',
        'session_duration', 'recency',
    ]
    return df_features[ordered_columns]

# Function to process all files and prepare the dataset
def process_data(data_path):
    """
    Processes all data files in the specified directory to extract features.

    Every file matching '*.csv*' is handed to process_file in a worker
    process. When the same (user_id, month) pair is produced by more than one
    file, the rows are merged column by column: counts are summed, max/min
    prices take the max/min, the average price is re-weighted by the event
    count, and the purchase ratios are recomputed from the summed counts.

    NOTE(review): distinct counts (products, categories, sessions, active
    days) can only be added across files, so for a user-month that spans
    several files they are an upper bound. 'session_duration' takes the
    largest per-file span and 'recency' the smallest per-file value.

    Args:
        data_path (str): The directory containing the data files.

    Returns:
        pd.DataFrame: A DataFrame containing features from all files, one row
        per (user_id, month), or an empty DataFrame when nothing was processed.
    """
    # Get a sorted list of all CSV files in the data directory
    files = sorted(glob(os.path.join(data_path, '*.csv*')))
    if not files:
        print("No data processed.")
        return pd.DataFrame()

    # One worker per file is enough; never start more workers than files
    worker_count = min(len(files), cpu_count())
    with Pool(worker_count) as pool:
        features_list = pool.map(process_file, files)

    # Filter out any empty DataFrames
    features_list = [df for df in features_list if not df.empty]
    if not features_list:
        print("No data processed.")
        return pd.DataFrame()

    keys = ['user_id', 'month']
    features_df = pd.concat(features_list, ignore_index=True)

    # Common case: every user-month comes from exactly one file, nothing to merge
    if not features_df.duplicated(subset=keys).any():
        return features_df.sort_values(keys).reset_index(drop=True)

    # Columns that must not simply be added up when files overlap
    overrides = {
        'max_price': 'max',
        'min_price': 'min',
        'session_duration': 'max',
        'recency': 'min',
    }
    # Helper column so the average price can be re-weighted by event count
    features_df['_price_total'] = features_df['avg_price'] * features_df['total_events']
    how = {col: overrides.get(col, 'sum') for col in features_df.columns if col not in keys}
    combined = features_df.groupby(keys).agg(how)

    events = combined['total_events']
    combined['avg_price'] = (combined['_price_total'] / events.where(events > 0)).fillna(0)

    # Ratios are recomputed from the summed counts; 0 when there are no purchases
    purchases = combined['num_purchases'].where(combined['num_purchases'] > 0)
    combined['view_to_purchase_ratio'] = (combined['num_views'] / purchases).fillna(0)
    combined['cart_to_purchase_ratio'] = (combined['num_carts'] / purchases).fillna(0)

    return combined.drop(columns='_price_total').reset_index()

# Function to define churn based on user activity
def define_churn(features_df):
    """
    Defines churn for each user based on their activity in the subsequent months.

    For every month except the last two present in the data, a user active in
    that month is labelled churned (1) when they do not appear in either of
    the next two months, and retained (0) otherwise. "Next two months" means
    the next two months that occur in the data, in sorted order.

    Args:
        features_df (pd.DataFrame): The DataFrame containing user features;
            must have 'user_id' and 'month' columns.

    Returns:
        pd.DataFrame: A DataFrame with columns 'user_id', 'month' and 'churn'.
        It has no rows (but still those three columns) when the data covers
        fewer than three months.
    """
    # Get a sorted list of all months in the data
    months = sorted(features_df['month'].unique())

    label_frames = []

    # The last two months have no complete look-ahead window, so they get no labels
    for idx, current_month in enumerate(months[:-2]):
        next_months = months[idx + 1: idx + 3]

        in_current = features_df['month'] == current_month
        in_next = features_df['month'].isin(next_months)

        current_users = features_df.loc[in_current, 'user_id'].drop_duplicates()
        returning_users = features_df.loc[in_next, 'user_id'].unique()

        # A user is considered churned if they are not active in the next two months
        churned = ~current_users.isin(returning_users)
        label_frames.append(pd.DataFrame({
            'user_id': current_users.to_numpy(),
            'month': current_month,
            'churn': churned.to_numpy().astype('int64'),
        }))

    if not label_frames:
        # Keep the key columns (and their dtypes) so a later merge on them still works
        return pd.DataFrame({
            'user_id': features_df['user_id'].iloc[:0],
            'month': features_df['month'].iloc[:0],
            'churn': pd.Series(dtype='int64'),
        })

    return pd.concat(label_frames, ignore_index=True)

# Function to prepare the dataset for modeling
def prepare_data(features_df, churn_df):
    """
    Merges the features and churn labels into a single DataFrame for modeling.

    Only (user_id, month) pairs present in both inputs are kept. Missing
    values are replaced with zero in every column except 'month'.

    Args:
        features_df (pd.DataFrame): The DataFrame containing user features.
        churn_df (pd.DataFrame): The DataFrame containing churn labels.

    Returns:
        pd.DataFrame: The merged DataFrame ready for modeling.
    """
    # Merge features with churn labels
    data = pd.merge(features_df, churn_df, on=['user_id', 'month'], how='inner')

    # 'month' is left out of the fill: it is a merge key that is expected to hold
    # pandas Period values, and a Period column does not accept 0 as a fill value
    fill_cols = [col for col in data.columns if col != 'month']
    data[fill_cols] = data[fill_cols].fillna(0)

    return data

# Function to train the churn prediction model
def train_model(X_train, y_train):
    """
    Trains an XGBoost classifier to predict churn.

    The model is a pipeline of feature scaling, SMOTE oversampling and an
    XGBoost classifier. Hyperparameters are chosen by a randomized search
    scored on ROC-AUC, after which the pipeline is fitted once on the full
    training data with the best parameters.

    NOTE(review): a later cell in this notebook defines train_model again;
    the definition executed last is the one that is used.

    Args:
        X_train (pd.DataFrame): The training features.
        y_train (pd.Series): The training labels.

    Returns:
        Pipeline: The trained model pipeline.
    """
    # Handle class imbalance using SMOTE
    smote = SMOTE(random_state=42)

    # The search below already runs candidates in parallel (n_jobs=-1). The
    # booster stays single-threaded during the search so the two levels of
    # parallelism do not compete for the same CPU cores.
    xgb = XGBClassifier(eval_metric='logloss', random_state=42, n_jobs=1)

    # Create a pipeline with scaling, SMOTE, and the classifier
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('smote', smote),
        ('classifier', xgb)
    ])

    # Hyperparameter tuning
    param_grid = {
        'classifier__n_estimators': [100, 200],
        'classifier__max_depth': [3, 5, 7],
        'classifier__learning_rate': [0.01, 0.1, 0.2],
        'classifier__subsample': [0.8, 1.0],
        'classifier__colsample_bytree': [0.8, 1.0],
    }

    # refit=False: the final fit is done below, where the booster can use all cores
    search = RandomizedSearchCV(
        pipeline,
        param_distributions=param_grid,
        n_iter=10,
        scoring='roc_auc',
        cv=3,
        random_state=42,
        refit=False,
        n_jobs=-1
    )

    # Run the search
    search.fit(X_train, y_train)
    print(f"Best parameters found: {search.best_params_}")

    # The search works on clones, so `pipeline` is still unfitted here. Fit it
    # once with the winning parameters, now letting the booster use every core.
    pipeline.set_params(classifier__n_jobs=-1, **search.best_params_)
    pipeline.fit(X_train, y_train)
    return pipeline


In [10]:
# Function to train the churn prediction model
def train_model(X_train, y_train):
    """
    Trains an XGBoost classifier to predict churn.

    The model is a pipeline of feature scaling, SMOTE oversampling and an
    XGBoost classifier. Hyperparameters are chosen by a randomized search
    scored on ROC-AUC, after which the pipeline is fitted once on the full
    training data with the best parameters.

    NOTE(review): an earlier cell in this notebook also defines train_model;
    this later definition replaces it once this cell has been run.

    Args:
        X_train (pd.DataFrame): The training features.
        y_train (pd.Series): The training labels.

    Returns:
        Pipeline: The trained model pipeline.
    """
    # Handle class imbalance using SMOTE
    smote = SMOTE(random_state=42)

    # The search below already runs candidates in parallel (n_jobs=-1). The
    # booster stays single-threaded during the search so the two levels of
    # parallelism do not compete for the same CPU cores.
    xgb = XGBClassifier(eval_metric='logloss', random_state=42, n_jobs=1)

    # Create a pipeline with scaling, SMOTE, and the classifier
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('smote', smote),
        ('classifier', xgb)
    ])

    # Hyperparameter tuning
    param_grid = {
        'classifier__n_estimators': [100, 200],
        'classifier__max_depth': [3, 5, 7],
        'classifier__learning_rate': [0.01, 0.1, 0.2],
        'classifier__subsample': [0.8, 1.0],
        'classifier__colsample_bytree': [0.8, 1.0],
    }

    # refit=False: the final fit is done below, where the booster can use all cores
    search = RandomizedSearchCV(
        pipeline,
        param_distributions=param_grid,
        n_iter=10,
        scoring='roc_auc',
        cv=3,
        random_state=42,
        refit=False,
        n_jobs=-1
    )

    # Run the search
    search.fit(X_train, y_train)
    print(f"Best parameters found: {search.best_params_}")

    # The search works on clones, so `pipeline` is still unfitted here. Fit it
    # once with the winning parameters, now letting the booster use every core.
    pipeline.set_params(classifier__n_jobs=-1, **search.best_params_)
    pipeline.fit(X_train, y_train)
    return pipeline


In [None]:
# Main Execution Block

print("Starting churn prediction workflow...")
# Set the path to your data directory.
# The CHURN_DATA_PATH environment variable, when set, overrides the default below,
# so the notebook can run on another machine without editing this cell.
data_path = os.environ.get('CHURN_DATA_PATH', '/sas_new/sasuser/CMJP/hrd/churn_predict')
print(f"Data path set to: {data_path}")

# Reuse the cached features when present; otherwise build them from the raw files.
# NOTE: pickle files execute code on load - only load caches you created yourself.
features_file = 'features_df.pkl'
if os.path.exists(features_file):
    print(f"Features DataFrame found at {features_file}. Loading it...")
    features_df = pd.read_pickle(features_file)
else:
    # Process the data files and extract features
    print("Processing data files...")
    features_df = process_data(data_path)
    if features_df.empty:
        # Do not cache an empty result: it would be loaded on every later run
        # and hide the fact that no data was found
        print("No features were extracted; nothing was saved.")
    else:
        # Save the features DataFrame to a file
        features_df.to_pickle(features_file)
        print(f"Features DataFrame saved to {features_file}.")


In [13]:
def prepare_data(features_df, churn_df):
    """
    Builds the modeling table by joining user features with churn labels.

    Rows are matched on ('user_id', 'month') with an inner join, so only
    user-months present in both inputs survive. Missing values are then
    replaced by zero in every column other than 'month'.

    Args:
        features_df (pd.DataFrame): The DataFrame containing user features.
        churn_df (pd.DataFrame): The DataFrame containing churn labels.

    Returns:
        pd.DataFrame: The merged DataFrame ready for modeling.
    """
    merged = features_df.merge(churn_df, how='inner', on=['user_id', 'month'])

    # 'month' is assumed to be the only Period-typed column, so it is the one
    # column skipped when filling gaps with zero
    fillable = [name for name in merged.columns if name != 'month']
    merged[fillable] = merged[fillable].fillna(0)

    return merged

In [None]:
# Check if any data was processed.
# Stop here with a clear error: every step below needs at least one feature row.
if features_df.empty:
    raise RuntimeError("No data available after processing. Exiting.")

print("\nFeatures DataFrame:")
print(features_df.head())

# Check if churn labels DataFrame exists.
# NOTE(review): a cached label file is reused as-is, even if the features have been
# rebuilt since it was written; delete the file to force the labels to be recomputed.
churn_file = 'churn_df.pkl'
if os.path.exists(churn_file):
    print(f"Churn DataFrame found at {churn_file}. Loading it...")
    churn_df = pd.read_pickle(churn_file)
else:
    print("Defining churn labels...")
    churn_df = define_churn(features_df)
    # Save churn labels to file
    churn_df.to_pickle(churn_file)
    print(f"Churn DataFrame saved to {churn_file}.")

# Prepare the final dataset for modeling
print("Preparing final dataset for making the classification model...")
data = prepare_data(features_df, churn_df)
print("\nMerged DataFrame (Features + Churn Labels):")
print(data.head())

# Save the prepared data. `data` is already in memory, so it is used directly
# below instead of being read back from the file that was just written.
prepared_data_file = 'prepared_data.pkl'
data.to_pickle(prepared_data_file)
print(f"Prepared data saved to {prepared_data_file}.")

# Split the data into training and testing sets based on months
print("Splitting data into training and testing sets...")
months = sorted(data['month'].unique())
train_months = months[:-2]  # Use all months except the last two for training
test_months = months[-2:]   # Use the last two months for testing

# Create training and testing datasets
train_data = data[data['month'].isin(train_months)].reset_index(drop=True)
test_data = data[data['month'].isin(test_months)].reset_index(drop=True)

# With two or fewer labelled months the training split is empty; fail early with a
# clear message instead of an obscure error from inside the model fit
if train_data.empty or test_data.empty:
    raise ValueError(
        f"Cannot split {len(months)} labelled month(s) into training and testing sets; "
        "at least three labelled months are required."
    )

# Separate features and target variable
print("Separating features and target variable...")
X_train = train_data.drop(['user_id', 'month', 'churn'], axis=1)
y_train = train_data['churn']
X_test = test_data.drop(['user_id', 'month', 'churn'], axis=1)
y_test = test_data['churn']

# Train the model or load it if already trained.
# NOTE(review): a saved model is reused even when the data has changed since it was
# trained; delete the file to force retraining.
model_file = 'churn_prediction_model.pkl'
if os.path.exists(model_file):
    print(f"Model found at {model_file}. Loading it...")
    model = joblib.load(model_file)
else:
    print("Training the model...")
    model = train_model(X_train, y_train)
    # Save the trained model for future use
    joblib.dump(model, model_file)
    print(f"\nModel training complete and saved as '{model_file}'.")

# Score the held-out months with the fitted pipeline
print("Making predictions on the test set...")
y_pred = model.predict(X_test)

# Per-class precision / recall / F1
print("\nClassification Report:\n")
print(classification_report(y_test, y_pred, target_names=['Retained', 'Churned']))

# ROC-AUC needs class probabilities, which not every classifier exposes
classifier = model.named_steps['classifier']
if not hasattr(classifier, 'predict_proba'):
    print("ROC-AUC Score cannot be calculated for this classifier.")
else:
    y_proba = model.predict_proba(X_test)[:, 1]
    print(f"ROC-AUC Score: {roc_auc_score(y_test, y_proba):.4f}")

# Confusion matrix as an annotated heatmap
print("Plotting confusion matrix...")
cm = confusion_matrix(y_test, y_pred)
class_names = ['Retained', 'Churned']
fig, ax = plt.subplots(figsize=(6, 4))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax
)
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

# Feature importance, when the classifier provides it
if not hasattr(classifier, 'feature_importances_'):
    print("Feature importance is not available for this classifier.")
else:
    print("Displaying feature importance...")
    importances = classifier.feature_importances_
    feat_importances = pd.Series(importances, index=X_train.columns).sort_values(ascending=False)

    # Horizontal bars for the ten highest-scoring features, largest on top
    fig, ax = plt.subplots(figsize=(8, 6))
    feat_importances.head(10).plot(kind='barh', ax=ax)
    ax.set_xlabel('Feature Importance Score')
    ax.set_ylabel('Features')
    ax.set_title('Top 10 Important Features')
    ax.invert_yaxis()
    plt.show()


Features DataFrame:
    user_id    month  total_events  unique_event_types  num_views  num_carts  \
0  10300217  2019-11             1                   1          1          0   
1  12511517  2020-02             1                   1          1          0   
2  12511517  2020-03             1                   1          1          0   
3  22165363  2020-01             2                   1          2          0   
4  22165363  2020-02            10                   4         10          0   

   num_purchases  num_remove_from_cart  num_unique_products  \
0              0                     0                    1   
1              0                     0                    1   
2              0                     0                    1   
3              0                     0                    1   
4              0                     0                    8   

   num_unique_categories   avg_price   max_price   min_price  num_sessions  \
0                      1   40.540001   40