# Shapley Value Analysis for Customer Satisfaction

This notebook implements the Shapley Value analysis to identify key drivers of customer satisfaction (or dissatisfaction) based on the methodology described in the paper "Customer Satisfaction Analysis: Identification of Key Drivers" by Conklin, Powaga, and Lipovetsky.

**Steps:**
1. **Setup & Configuration**: Define file paths, column names, and thresholds.
2. **Data Preprocessing**: Load and binarize data.
3. **Shapley Value Calculation**: Compute Shapley values for each feature.
4. **Key Driver Identification**: Determine the optimal set of key dissatisfiers.
5. **Results**: View the Shapley values and the identified key drivers.

## 1. Import Libraries

In [None]:
import pandas as pd
import numpy as np
import itertools
import math

## 2. Define Helper Functions

These functions are the core logic for preprocessing, calculating coalition values, Shapley values, and determining key drivers.

In [None]:
def preprocess_data(filepath, overall_satisfaction_col, feature_cols, 
                    dissatisfaction_threshold, failure_threshold_map,
                    overall_score_higher_is_better=True, 
                    feature_score_higher_is_better=True):
    """
    Loads and preprocesses the data.
    """
    df = pd.read_csv(filepath)

    # Binarize overall satisfaction
    binarized_overall_col = f"{overall_satisfaction_col}_Dissatisfied"
    if overall_score_higher_is_better:
        df[binarized_overall_col] = (df[overall_satisfaction_col] <= dissatisfaction_threshold).astype(int)
    else:
        df[binarized_overall_col] = (df[overall_satisfaction_col] >= dissatisfaction_threshold).astype(int)

    # Binarize feature columns
    binarized_feature_cols = []
    for col in feature_cols:
        bin_col_name = f"{col}_Failed"
        threshold_info = failure_threshold_map.get(col)
        
        current_feature_higher_is_better = feature_score_higher_is_better # Default
        current_failure_threshold = None

        if isinstance(threshold_info, tuple): # (threshold, specific_higher_is_better)
            current_failure_threshold = threshold_info[0]
            current_feature_higher_is_better = threshold_info[1]
        elif threshold_info is not None: # Just threshold, use global feature_score_higher_is_better
            current_failure_threshold = threshold_info
        else:
            raise ValueError(f"Failure threshold not defined for feature: {col}")

        if current_feature_higher_is_better:
            df[bin_col_name] = (df[col] <= current_failure_threshold).astype(int)
        else:
            df[bin_col_name] = (df[col] >= current_failure_threshold).astype(int)
        binarized_feature_cols.append(bin_col_name)
        
    return df, binarized_overall_col, binarized_feature_cols

In [None]:
def get_value_of_coalition(df, coalition_bin_feature_cols, binarized_overall_col):
    """
    Calculates the value v(M) = Reach_M - Noise_M for a given coalition M.
    M is represented by a list of binarized feature column names.
    """
    if not coalition_bin_feature_cols: # Empty coalition
        return 0.0

    # Mask for rows where at least one feature in the coalition has "Failed" (is 1)
    failed_on_any_in_coalition_mask = df[coalition_bin_feature_cols].any(axis=1)
    
    df_dissatisfied = df[df[binarized_overall_col] == 1]
    df_not_dissatisfied = df[df[binarized_overall_col] == 0]

    num_total_dissatisfied = len(df_dissatisfied)
    num_total_not_dissatisfied = len(df_not_dissatisfied)

    if num_total_dissatisfied == 0 and num_total_not_dissatisfied == 0:
        print("Warning: No data points found for calculating coalition value.")
        return 0.0
    
    if num_total_dissatisfied > 0:
        num_failed_and_dissatisfied = df_dissatisfied[failed_on_any_in_coalition_mask[df_dissatisfied.index]].shape[0]
        reach_M = num_failed_and_dissatisfied / num_total_dissatisfied
    else:
        reach_M = 0.0 

    if num_total_not_dissatisfied > 0:
        num_failed_and_not_dissatisfied = df_not_dissatisfied[failed_on_any_in_coalition_mask[df_not_dissatisfied.index]].shape[0]
        noise_M = num_failed_and_not_dissatisfied / num_total_not_dissatisfied
    else:
        noise_M = 0.0

    value_M = reach_M - noise_M
    return value_M

In [None]:
def calculate_shapley_values(df, binarized_feature_cols, binarized_overall_col):
    """
    Calculates Shapley values for each feature.
    """
    num_features = len(binarized_feature_cols)
    feature_indices = list(range(num_features))
    shapley_values = np.zeros(num_features)

    for i in feature_indices: 
        feature_k_col_name = binarized_feature_cols[i]
        remaining_feature_indices = [idx for idx in feature_indices if idx != i]
        
        for m_size in range(num_features): 
            if num_features - m_size - 1 < 0:
                 pass 

            gamma_weight = (math.factorial(m_size) * math.factorial(num_features - m_size - 1)) / math.factorial(num_features)

            for M_indices_tuple in itertools.combinations(remaining_feature_indices, m_size):
                M_cols = [binarized_feature_cols[idx] for idx in M_indices_tuple]
                M_union_k_cols = M_cols + [feature_k_col_name]

                v_M_union_k = get_value_of_coalition(df, M_union_k_cols, binarized_overall_col)
                v_M = get_value_of_coalition(df, M_cols, binarized_overall_col)
                
                shapley_values[i] += gamma_weight * (v_M_union_k - v_M)
                
    return {binarized_feature_cols[i]: shapley_values[i] for i in range(num_features)}

In [None]:
def get_reach_noise_success_for_set(df, set_bin_feature_cols, binarized_overall_col):
    """Calculates Reach, Noise, and Success for a given set of (binarized) features."""
    if not set_bin_feature_cols:
        return 0.0, 0.0, 0.0

    failed_on_any_in_set_mask = df[set_bin_feature_cols].any(axis=1)
    
    df_dissatisfied = df[df[binarized_overall_col] == 1]
    df_not_dissatisfied = df[df[binarized_overall_col] == 0]

    num_total_dissatisfied = len(df_dissatisfied)
    num_total_not_dissatisfied = len(df_not_dissatisfied)

    if num_total_dissatisfied == 0 and num_total_not_dissatisfied == 0:
        return 0.0, 0.0, 0.0

    if num_total_dissatisfied > 0:
        num_failed_and_dissatisfied = df_dissatisfied[failed_on_any_in_set_mask[df_dissatisfied.index]].shape[0]
        reach = num_failed_and_dissatisfied / num_total_dissatisfied
    else:
        reach = 0.0

    if num_total_not_dissatisfied > 0:
        num_failed_and_not_dissatisfied = df_not_dissatisfied[failed_on_any_in_set_mask[df_not_dissatisfied.index]].shape[0]
        noise = num_failed_and_not_dissatisfied / num_total_not_dissatisfied
    else:
        noise = 0.0
        
    success = reach - noise
    return reach, noise, success

In [None]:
def determine_key_drivers(df, binarized_feature_cols, binarized_overall_col, shapley_values_dict):
    """
    Determines the key dissatisfiers based on Shapley values and the Success metric.
    """
    sorted_features = sorted(shapley_values_dict.items(), key=lambda item: item[1], reverse=True)
    
    print("\n--- Determining Key Dissatisfiers (Cumulative Analysis) ---")
    print(f"{'Step':<5} {'Added Feature':<30} {'Cumulative Set Size':<20} {'Reach':<10} {'Noise':<10} {'Success':<10}")
    
    cumulative_set_cols = []
    optimal_set_cols = []
    max_success_achieved = -float('inf')
    results_log = []

    for i, (feature_col, sv) in enumerate(sorted_features):
        current_cumulative_cols_for_step = cumulative_set_cols + [feature_col]
        reach, noise, success = get_reach_noise_success_for_set(df, current_cumulative_cols_for_step, binarized_overall_col)
        
        results_log.append({
            'step': i + 1,
            'added_feature': feature_col.replace('_Failed', ''),
            'set_size': len(current_cumulative_cols_for_step),
            'reach': reach,
            'noise': noise,
            'success': success
        })
        print(f"{i+1:<5} {feature_col.replace('_Failed', ''):<30} {len(current_cumulative_cols_for_step):<20} {reach:<10.3f} {noise:<10.3f} {success:<10.3f}")

        if success >= max_success_achieved:
            max_success_achieved = success
            optimal_set_cols = list(current_cumulative_cols_for_step)
            cumulative_set_cols.append(feature_col)
        else:
            print(f"Success decreased. Optimal set identified before adding '{feature_col.replace('_Failed', '')}'.")
            break 
            
    return [col.replace('_Failed', '') for col in optimal_set_cols], results_log

## 3. Configuration and Execution

Modify the parameters in the next cell to match your dataset and analysis requirements.

In [None]:
# --- Configuration ---
filepath = 'sample_customer_data.csv' # Replace with your actual file path

overall_satisfaction_col = 'OverallSatisfaction' 
dissatisfaction_threshold = 5 
overall_score_higher_is_better = True

feature_cols = ['FeatureA', 'FeatureB', 'FeatureC', 'FeatureD']
failure_threshold_map = {
    'FeatureA': 3,
    'FeatureB': 3,
    'FeatureC': (7, False), 
    'FeatureD': 2
}
feature_score_higher_is_better = True 

# --- Create a dummy sample_customer_data.csv for testing (Optional) ---
# You can comment this out if you have your own CSV file.
data = {
    'OverallSatisfaction': [2, 8, 5, 10, 3, 6, 1, 9, 4, 7, 2, 5, 8, 3, 6, 10, 1, 4, 9, 7],
    'FeatureA':            [1, 5, 3,  4, 2, 5, 1, 4, 2, 3, 1, 3, 5, 2, 4, 5, 1, 2, 5, 3],
    'FeatureB':            [2, 4, 2,  5, 1, 3, 2, 5, 1, 4, 2, 2, 4, 1, 3, 5, 2, 1, 5, 4],
    'FeatureC':            [8, 3, 6,  2, 9, 4, 10,1, 7, 5, 8, 6, 2, 9, 4, 1, 10,7, 3, 5],
    'FeatureD':            [1, 3, 1,  3, 1, 2, 1, 3, 1, 2, 1, 1, 3, 1, 2, 3, 1, 1, 3, 2] 
}
df_sample = pd.DataFrame(data)
df_sample.to_csv(filepath, index=False)
print(f"Created/Replaced dummy data at {filepath}")
# --- End of dummy data creation ---

# 1. Preprocess data
df_processed, bin_overall_col, bin_feature_cols = preprocess_data(
    filepath,
    overall_satisfaction_col,
    feature_cols,
    dissatisfaction_threshold,
    failure_threshold_map,
    overall_score_higher_is_better,
    feature_score_higher_is_better
)
print("\n--- Processed Data Head ---")
print(df_processed[[bin_overall_col] + bin_feature_cols].head())

if df_processed[bin_overall_col].nunique() < 2:
    print(f"\nWarning: The binarized overall satisfaction column '{bin_overall_col}' has only one unique value.")
    print("This will lead to Reach or Noise (or both) being undefined or zero for all coalitions.")
    print("Please check your dissatisfaction_threshold and data distribution.")
    if df_processed[df_processed[bin_overall_col] == 1].empty:
        print("No customers are marked as 'Dissatisfied'.")
    if df_processed[df_processed[bin_overall_col] == 0].empty:
        print("No customers are marked as 'Not Dissatisfied'.")
else:
    # 2. Calculate Shapley values
    print("\nCalculating Shapley values... (this may take time for many features)")
    shapley_values = calculate_shapley_values(df_processed, bin_feature_cols, bin_overall_col)
    
    print("\n--- Shapley Values ---")
    for feature, sv in sorted(shapley_values.items(), key=lambda item: item[1], reverse=True):
        print(f"{feature.replace('_Failed', ''):<30}: {sv:.4f}")

    # 3. Determine Key Dissatisfiers
    key_dissatisfiers, full_log = determine_key_drivers(df_processed, bin_feature_cols, bin_overall_col, shapley_values)
    
    print("\n--- Final Set of Key Dissatisfiers ---")
    if key_dissatisfiers:
        for kd in key_dissatisfiers:
            print(f"- {kd}")
    else:
        print("No key dissatisfiers identified based on the criteria (or no features provided).")

## 4. How to Use and Interpret

1.  **Modify Configuration**: 
    * Update `filepath` to point to your CSV data file.
    * Set `overall_satisfaction_col` to the name of your overall satisfaction score column.
    * Adjust `dissatisfaction_threshold` and `overall_score_higher_is_better` based on your overall satisfaction scale.
    * List your raw feature column names in `feature_cols`.
    * Carefully define `failure_threshold_map`. For each feature, specify the threshold that denotes a "failure". You can also specify if a lower score is better for a particular feature, e.g., `{'PriceComplaints': (1, False)}` might mean 1 or more complaints is a failure, and fewer complaints are better.
    * Set `feature_score_higher_is_better` as the default for your feature scales.
2.  **Run All Cells**: Execute the cells in order (e.g., by clicking "Run All" in your Jupyter environment).
3.  **Interpret Results**:
    * **Processed Data Head**: Shows the first few rows of your data after binarization. Check if `_Dissatisfied` and `_Failed` columns look correct.
    * **Shapley Values**: Lists each feature and its calculated Shapley value. Higher values indicate a greater contribution to the overall "Success" metric (Reach - Noise), meaning the feature is more important in distinguishing dissatisfied customers.
    * **Determining Key Dissatisfiers (Cumulative Analysis)**: This table shows the step-by-step process of adding features (ordered by Shapley value) to a potential set of key dissatisfiers. It tracks the cumulative Reach, Noise, and Success of the set at each step.
    * **Final Set of Key Dissatisfiers**: This is the primary output. It's the set of features that, when considered together, maximized the `Success = Reach - Noise` metric. These are the features your analysis suggests are the most critical drivers of dissatisfaction.

**Important Considerations:**
* **Computational Cost**: Calculating exact Shapley values is computationally intensive (factorial complexity with the number of features). For more than ~10-12 features, this script might become very slow. The original paper suggests sampling for larger datasets.
* **Data Quality**: The quality of your input data and the appropriateness of your thresholds significantly impact the results.
* **Definition of "Failure" and "Dissatisfaction"**: Carefully consider how you define these for your specific context. The thresholds directly influence the binarization and subsequent calculations.