In [1]:
import pandas as pd
import re
from sqlalchemy import create_engine
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, roc_curve, roc_auc_score, classification_report, log_loss
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from scipy.sparse import csr_matrix


# Load Data from PostgreSQL
db_config = {
    'host': 'localhost',
    'database': 'Liberty',
    'user': 'postgres',
    'password': 'abc',
    'port': '5432'
}

connection_string = f"postgresql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}"
engine = create_engine(connection_string)

query = 'SELECT * FROM public.policydata_with_fb_cc_pc_newfea_opti_correct;'
df = pd.read_sql(query, con=engine)

In [2]:
from sklearn.preprocessing import LabelEncoder
from sklearn.impute import SimpleImputer

df_filtered = df[df['Policy Status'].isin(['Renewed', 'Not Renewed'])]

df_filtered['Status_Binary'] = df_filtered['Policy Status'].apply(lambda x: 1 if x == 'Not Renewed' else 0)

# Select relevant columns
selected_columns = ['product name', 'biztype', 'age', 'vehicle idv', 'before gst add-on gwp', 'total od premium', 'total tp premium', 'gst', 
 'total premium payable', 'ncb % previous year', 'applicable discount with ncb', 'tie up',
 'Number of claims', 'approved', 'denied', 'Policy Tenure', 'Customer Tenure', 'New Customers', 'Claim Happaned/Not', 
 'Renewal Rate Status', 'policy_wise_purchase', 'Status_Binary']

df_selected = df_filtered[selected_columns]

# Convert numerical columns to float
numerical_columns = ['age', 'vehicle idv', 'before gst add-on gwp', 'total od premium', 'total tp premium', 'gst', 
 'total premium payable', 'ncb % previous year', 'applicable discount with ncb',
 'Number of claims', 'approved', 'denied', 'Policy Tenure', 'Customer Tenure', 'policy_wise_purchase', 'Status_Binary']


df_selected[numerical_columns] = df_selected[numerical_columns].apply(pd.to_numeric, errors='coerce')

# One-hot encode categorical columns
categorical_columns = ['product name', 'biztype', 'tie up', 'New Customers', 'Claim Happaned/Not', 
 'Renewal Rate Status']

df_selected = pd.get_dummies(df_selected, columns=categorical_columns, drop_first=True)

# Impute missing values
imputer = SimpleImputer(strategy='constant', fill_value=0)
df_selected = pd.DataFrame(imputer.fit_transform(df_selected), columns=df_selected.columns)

# Convert to sparse matrix
X = csr_matrix(df_selected.drop(columns=['Status_Binary']).values)
y = df_selected['Status_Binary']

# Debugging dimensions
print("Shape of X:", X.shape)
print("Shape of y:", y.shape)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_filtered['Status_Binary'] = df_filtered['Policy Status'].apply(lambda x: 1 if x == 'Not Renewed' else 0)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_selected[numerical_columns] = df_selected[numerical_columns].apply(pd.to_numeric, errors='coerce')


Shape of X: (1503014, 42)
Shape of y: (1503014,)


In [3]:
import numpy as np
import json
from collections import defaultdict
from sklearn.ensemble import GradientBoostingClassifier

# Prepare feature names
feature_names = df_selected.drop(columns=['Status_Binary']).columns.tolist()

# Train Gradient Boosting model
model = GradientBoostingClassifier(
    max_depth=6,
    learning_rate=0.1,
    n_estimators=100,
    random_state=42
)

model.fit(X, y)

In [4]:
# Initialize nested dictionaries to hold counts
# Structure: {class: {feature: {operator: {threshold: count}}}}
condition_counts = {
    0: defaultdict(lambda: defaultdict(lambda: defaultdict(int))),
    1: defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
}

# Loop over each tree
for tree_idx, estimator in enumerate(model.estimators_.ravel()):
    tree = estimator.tree_
    n_nodes = tree.node_count

    # For each leaf node
    for node_id in range(n_nodes):
        if tree.children_left[node_id] == tree.children_right[node_id]:
            # Leaf node detected

            # Reconstruct decision path
            path = []
            current_node = node_id

            while current_node != 0:
                parent = np.where(
                    (tree.children_left == current_node) | (tree.children_right == current_node)
                )[0][0]

                threshold = tree.threshold[parent]
                feature_idx = tree.feature[parent]
                feature_name = feature_names[feature_idx]

                if tree.children_left[parent] == current_node:
                    operator = "<="
                else:
                    operator = ">"

                path.append((feature_name, operator, threshold))
                current_node = parent

            # Reverse to root-to-leaf
            path = path[::-1]

            # Get predicted logit and convert to probability
            logit = tree.value[node_id][0][0]
            prob = 1 / (1 + np.exp(-logit))
            predicted_class = 1 if prob > 0.5 else 0

            # Count each unique condition in the path
            for feature_name, operator, threshold in set(path):
                condition_counts[predicted_class][feature_name][operator][threshold] += 1

# Write results to file
output_file = "feature_dependency_per_class_top15(main).txt"
with open(output_file, "w") as file:
    file.write("Top 5 Splits per Feature per Class:\n")
    file.write("=" * 100 + "\n\n")

    for feature in feature_names:
        file.write(f"Feature: {feature}\n")
        file.write("-" * 100 + "\n")
        for cls in (0, 1):
            file.write(f"Class {cls}:\n")

            # Collect splits for this feature and class
            splits = []
            for operator in ("<=", ">"):
                thresholds_counts = condition_counts[cls][feature][operator]
                for threshold, count in thresholds_counts.items():
                    splits.append((operator, threshold, count))

            # Sort by count descending
            splits = sorted(splits, key=lambda x: x[2], reverse=True)

            # Take top 5
            top_splits = splits[:15]

            if not top_splits:
                file.write("  (No splits for this class)\n")
            else:
                for operator, threshold, count in top_splits:
                    file.write(f"  - {feature} {operator} {threshold:.4f}  (Count: {count})\n")

            file.write("\n")
        file.write("=" * 100 + "\n\n")

print(f"\nTop 5 splits per feature per class saved to '{output_file}'")


Top 5 splits per feature per class saved to 'feature_dependency_per_class_top15(main).txt'
