In [17]:
import numpy as np
import pandas as pd
import os

import tensorflow as tf
import keras
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

from tensorflow.keras import layers

from sklearn.model_selection import train_test_split
from sklearn.metrics import recall_score, f1_score, roc_auc_score
from sklearn.preprocessing import MinMaxScaler

import shap

import warnings
warnings.filterwarnings("ignore", category=UserWarning)

# Seed every RNG the notebook relies on. Seeding NumPy alone leaves Keras
# weight initialisation and batch shuffling unseeded, so the trained networks
# would differ from run to run.
np.random.seed(1)
# Seeds Python's `random`, NumPy and TensorFlow in one call (NumPy is re-seeded
# with the same value, so the NumPy stream is unchanged).
tf.keras.utils.set_random_seed(1)

In [18]:
# Load the dataset (the path is resolved relative to the current working directory)
filepath = os.path.join('..', 'datasets', 'brand_effect_res_cust.csv')
df = pd.read_csv(filepath)

# Remove the `customer_id` identifier column; every remaining column except
# `target` is later used as a model input.
# The label vector `y` is intentionally NOT extracted here: it is taken from
# `df` right before the train/test split, after the synthetic rows have been
# appended. An early copy would be stale and shorter than the final frame.
df = df.drop(columns=['customer_id'])
df.head()

Unnamed: 0,inductively_brand_aware,spontaneously_brand_aware,brand_distinct,brand_quality,intend_to_buy,reason_try,reason_competition,reason_repeat,reason_premium,reason_superiority,after_purchase_satisfiedness,recommend,brandful,target
0,0.0,1.0,1.0,0.0,0.0,0.0,0.5,1.0,0.0,0.0,1.0,1.0,1.0,2
1,1.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.5,0.0,1
2,0.5,0.0,1.0,0.0,1.0,0.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,2
3,0.0,0.5,1.0,1.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,1.0,1.0,2
4,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1


In [19]:
# Row count per label in the `target` column (class balance)
df.loc[:, 'target'].value_counts()

target
2    4086
3    1662
1    1188
4      32
0      32
Name: count, dtype: int64

In [20]:
# Number of synthetic rows to generate for each augmented class
num_samples_per_class = 300

# Weight of each feature in the normalized score (insertion order defines the
# feature/column order used by the functions below)
feature_weights = {
    "inductively_brand_aware": 0.80, "spontaneously_brand_aware": 0.85, "brand_distinct": 0.78, "brand_quality": 0.82,
    "intend_to_buy": 0.88, "reason_try": 0.65, "reason_competition": 0.70, "reason_repeat": 0.90,
    "reason_premium": 0.45, "reason_superiority": 0.75, "after_purchase_satisfiedness": 0.92, "recommend": 0.85, "brandful": 0.87
}


def _weighted_score(values):
    """Return the weighted mean of `values` under `feature_weights`.

    `values` must hold one entry per key of `feature_weights`, in the same
    order as the dictionary.
    """
    weights = np.array(list(feature_weights.values()))
    return np.dot(values, weights) / np.sum(weights)


def compute_normalized_score(row):
    """Compute the normalized weighted score of one record.

    Parameters
    ----------
    row : mapping-like
        Anything indexable by feature name (dict, pandas Series, ...) that
        provides a value for every key of `feature_weights`.

    Returns
    -------
    float
        Sum of value * weight over all features, divided by the sum of weights.
    """
    row_values = np.array([row[feature] for feature in feature_weights])
    return _weighted_score(row_values)


def generate_synthetic_data(num_samples, low_values, high_values, condition, target_label,
                            max_attempts=None):
    """Generate `num_samples` synthetic rows by rejection sampling.

    Each candidate row draws one value per feature uniformly (with
    replacement) from `low_values` when `target_label == 0` and from
    `high_values` for any other label. A candidate is kept only when
    `condition(normalized_score)` is truthy.

    Parameters
    ----------
    num_samples : int
        Number of accepted rows to return.
    low_values, high_values : sequence of numbers
        Pools of feature values for label 0 and for all other labels.
    condition : callable
        Predicate applied to the candidate's normalized score.
    target_label : int
        Label written to the `target` column of every returned row.
    max_attempts : int, optional
        Upper bound on the number of candidates drawn. The default `None`
        keeps the original unbounded behaviour, which never terminates if
        `condition` cannot be satisfied.

    Returns
    -------
    pandas.DataFrame
        One column per key of `feature_weights`, followed by
        `normalized_score` and `target`.

    Raises
    ------
    RuntimeError
        If `max_attempts` candidates were drawn without reaching `num_samples`.
    """
    # Derive the row width from the weight table instead of hard-coding 13,
    # so the sample always lines up with the weights and the column names.
    n_features = len(feature_weights)
    value_pool = low_values if target_label == 0 else high_values
    column_names = list(feature_weights) + ['normalized_score', 'target']

    accepted_rows = []
    attempts = 0
    while len(accepted_rows) < num_samples:
        if max_attempts is not None and attempts >= max_attempts:
            raise RuntimeError(
                f"Only {len(accepted_rows)} of {num_samples} samples accepted "
                f"after {max_attempts} attempts"
            )
        attempts += 1

        sample = np.random.choice(value_pool, size=(n_features,))
        normalized_score = _weighted_score(sample)
        if condition(normalized_score):
            accepted_rows.append(list(sample) + [normalized_score, target_label])

    return pd.DataFrame(accepted_rows, columns=column_names)


# Class 0: features drawn from {0, 0.5}, kept only when the normalized score is below 0.2.
# The generator already writes the label into the `target` column.
synthetic_class_0 = generate_synthetic_data(num_samples_per_class, [0, 0.5], [0.5, 1], lambda x: x < 0.2, 0)

# Class 4: features drawn from {0.5, 1}, kept only when the normalized score is at least 0.8.
synthetic_class_4 = generate_synthetic_data(num_samples_per_class, [0, 0.5], [0.5, 1], lambda x: x >= 0.8, 4)

# Append the synthetic rows and discard the helper `normalized_score` column
# in one expression (no in-place mutation of the merged frame).
# NOTE(review): augmentation happens before the train/validation/test split
# later in the notebook, so synthetic rows also land in the validation and
# test sets and inflate the metrics for classes 0 and 4. Consider augmenting
# the training split only.
# NOTE(review): this cell is not idempotent. Re-running it appends another
# batch of synthetic rows to `df`.
df = pd.concat(
    [df, synthetic_class_0, synthetic_class_4], ignore_index=True
).drop(columns=["normalized_score"])

# Class balance of the `target` column after augmentation
df['target'].value_counts()

target
2    4086
3    1662
1    1188
4     332
0     332
Name: count, dtype: int64

In [21]:
# Label vector and feature matrix (every column except `target`)
y = df['target']
X = df.drop(columns='target')

# Hold out 15% of the rows as the test set, stratified by class
X_tr, X_test, y_tr, y_test = train_test_split(
    X, y, test_size=0.15, stratify=y, random_state=42
)

# Split the remainder into training and validation sets (17% goes to validation)
X_train, X_val, y_train, y_val = train_test_split(
    X_tr, y_tr, test_size=0.17, stratify=y_tr, random_state=42
)

# Report the per-class row counts of each split
for split_name, split_labels in (("Train", y_train), ("Validation", y_val), ("Test", y_test)):
    print(f"{split_name} class distribution:", np.unique(split_labels, return_counts=True))

Train class distribution: (array([0, 1, 2, 3, 4], dtype=int64), array([ 234,  838, 2882, 1173,  234], dtype=int64))
Validation class distribution: (array([0, 1, 2, 3, 4], dtype=int64), array([ 48, 172, 591, 240,  48], dtype=int64))
Test class distribution: (array([0, 1, 2, 3, 4], dtype=int64), array([ 50, 178, 613, 249,  50], dtype=int64))


In [22]:
# Row count of the train, validation and test splits, one per line
print(*(len(split) for split in (X_train, X_val, X_test)), sep="\n")

5361
1099
1140


In [23]:
# Min-max scaling: the scaler is fitted on the training split only and then
# applied unchanged to all three splits.
scaler = MinMaxScaler().fit(X_train)

X_train_scaled, X_val_scaled, X_test_scaled = (
    scaler.transform(split) for split in (X_train, X_val, X_test)
)

# From here on the split names refer to the scaled arrays returned by the scaler
X_train, X_val, X_test = X_train_scaled, X_val_scaled, X_test_scaled

for array_name, array in (
    ("X_train", X_train), ("y_train", y_train),
    ("X_val", X_val), ("y_val", y_val),
    ("X_test", X_test), ("y_test", y_test),
):
    print(f"Shape of {array_name}:", array.shape)

Shape of X_train: (5361, 13)
Shape of y_train: (5361,)
Shape of X_val: (1099, 13)
Shape of y_val: (1099,)
Shape of X_test: (1140, 13)
Shape of y_test: (1140,)


In [24]:
# Network dimensions: number of input features, number of outputs
n_x, n_y = 13, 5

In [25]:
def _report_split_metrics(model, features, labels, split_name):
    """Print and return the evaluation metrics of `model` on one data split.

    Returns a tuple `(f1_macro, f1_weighted, recall_macro, roc_auc_ovr)`.
    """
    pred_probs = model.predict(features)     # per-class probabilities (softmax output)
    pred_labels = pred_probs.argmax(axis=1)  # most probable class per row

    f1_macro = f1_score(labels, pred_labels, average="macro")
    f1_weighted = f1_score(labels, pred_labels, average="weighted")
    recall_macro = recall_score(labels, pred_labels, average="macro")
    # ROC AUC is computed from the probabilities, not from the hard predictions
    roc_auc_ovr = roc_auc_score(labels, pred_probs, average="macro", multi_class="ovr")

    print(f"{split_name} Metrics:")
    print(f"F1 Macro: {f1_macro:.4f}")
    print(f"F1 Weighted: {f1_weighted:.4f}")
    print(f"Recall Macro: {recall_macro:.4f}")
    print(f"ROC AUC OVR: {roc_auc_ovr:.4f}")

    return f1_macro, f1_weighted, recall_macro, roc_auc_ovr


def predict(n_h1, n_h2):
    """Build, train and evaluate a feed-forward network with two hidden layers.

    Despite its name, this function trains a new model on every call. It
    reads the notebook globals `n_x`, `n_y`, `X_train`, `y_train`, `X_val`,
    `y_val`, `X_test` and `y_test`.

    Parameters
    ----------
    n_h1 : int
        Number of units in the first hidden layer.
    n_h2 : int
        Number of units in the second hidden layer.

    Returns
    -------
    tuple
        `(model, f1_macro_test, f1_weighted_test, recall_macro_test,
        roc_auc_ovr_test)`: the trained model followed by its metrics on the
        test split. Training and validation metrics are printed only.
    """
    # Architecture: n_x inputs -> n_h1 ReLU -> n_h2 ReLU -> n_y-way softmax
    input_layer = layers.Input(shape=(n_x,))
    hidden = layers.Dense(n_h1, activation='relu')(input_layer)
    hidden = layers.Dense(n_h2, activation='relu')(hidden)
    output_layer = layers.Dense(n_y, activation='softmax')(hidden)  # multi-class output

    model = tf.keras.Model(inputs=input_layer, outputs=output_layer)

    # Sparse categorical cross-entropy because the labels are integer class ids
    model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(), metrics=['accuracy'])

    # The training history is not used afterwards, so it is not kept
    model.fit(
        X_train, y_train,
        epochs=150,
        batch_size=16,
        validation_data=(X_val, y_val),
        verbose=0
    )

    _report_split_metrics(model, X_train, y_train, "Training")
    _report_split_metrics(model, X_val, y_val, "Validation")
    f1_macro_test, f1_weighted_test, recall_macro_test, roc_auc_ovr_test = _report_split_metrics(
        model, X_test, y_test, "Test"
    )

    return model, f1_macro_test, f1_weighted_test, recall_macro_test, roc_auc_ovr_test

In [35]:
n_h1 = 0
n_h2 = 0
best_results = {
    'f1_macro' : 0,
    'f1_weighted' : 0,
    'recall_macro' : 0,
    'roc_auc_ovr' : 0
}
best_model = None
# Start below any achievable score so the first candidate is always recorded
sum_best_results = float("-inf")

# Candidate architectures: first hidden layer 2x/3x/4x the input width,
# second hidden layer half the size of the first
h1_candidates = [2 * n_x, 3 * n_x, 4 * n_x]
h2_candidates = [h1 // 2 for h1 in h1_candidates]

# NOTE(review): `predict` returns TEST-set metrics, so the architecture is
# selected on the test set and the reported "best" scores are optimistically
# biased. Selecting on validation metrics would avoid this.
for h1, h2 in zip(h1_candidates, h2_candidates):
    print(f"Training FNN with H1={h1}, H2={h2}")

    model, f1_macro, f1_weighted, recall_macro, roc_auc_ovr = predict(h1, h2)

    # Rank candidates by the unweighted sum of the four test metrics
    sum_results = f1_macro + f1_weighted + recall_macro + roc_auc_ovr

    if sum_results > sum_best_results:
        sum_best_results = sum_results
        best_results['f1_macro'] = f1_macro
        best_results['f1_weighted'] = f1_weighted
        best_results['recall_macro'] = recall_macro
        best_results['roc_auc_ovr'] = roc_auc_ovr
        best_model = model
        n_h1 = h1
        n_h2 = h2

# Without this, `model` would be the LAST network trained in the loop, not the
# best one, and later cells that use `model` would analyse the wrong network.
if best_model is not None:
    model = best_model

print("")
print(f'Best test results come from FNN ({n_x},{n_h1},{n_h2},{n_y})')

print(f'F1 Macro: {best_results["f1_macro"]:.4f}')
print(f'F1 Weighted: {best_results["f1_weighted"]:.4f}')
print(f'Recall Macro: {best_results["recall_macro"]:.4f}')
print(f'ROC AUC OVR: {best_results["roc_auc_ovr"]:.4f}')

# Wrap the dict in a list to get a single-row DataFrame
results_df = pd.DataFrame([best_results])

# Export the best metrics to Excel
results_df.to_excel('be_res_best_FNN_results.xlsx', index=False)

# Show the exported row
print(results_df)


Training FNN with H1=26, H2=13
[1m168/168[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step  
Training Metrics:
F1 Macro: 0.9908
F1 Weighted: 0.9927
Recall Macro: 0.9914
ROC AUC OVR: 1.0000
[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Validation Metrics:
F1 Macro: 0.9817
F1 Weighted: 0.9835
Recall Macro: 0.9820
ROC AUC OVR: 0.9997
[1m36/36[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Test Metrics:
F1 Macro: 0.9851
F1 Weighted: 0.9851
Recall Macro: 0.9871
ROC AUC OVR: 0.9997
Training FNN with H1=39, H2=19
[1m168/168[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step
Training Metrics:
F1 Macro: 0.9973
F1 Weighted: 0.9989
Recall Macro: 0.9955
ROC AUC OVR: 1.0000
[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Validation Metrics:
F1 Macro: 0.9935
F1 Weighted: 0.9909
Recall Macro: 0.9927
ROC AUC OVR: 0.9997
[1m36/36[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Test Metri

In [37]:
def shap_to_probability_delta(shap_val):
    """Squash a raw SHAP value (scalar or array) into the open interval (-1, 1).

    The mapping is the element-wise hyperbolic tangent of `shap_val`.
    """
    squashed = np.tanh(shap_val)
    return squashed

In [39]:
# Index of the model output (softmax unit) whose SHAP values are summarised.
# Index 0 is the first output, i.e. target label 0. This is a 5-class model,
# so there is no single "positive class".
shap_class_index = 0

# Reference sample for DeepExplainer: the first 100 rows of the training set
background = X_train[:100]

# NOTE(review): `model` is the global left behind by the architecture-search
# cell. Confirm it is the selected network and not merely the last one trained.
explainer = shap.DeepExplainer(model, background)

# SHAP values for every training row
shap_values = explainer.shap_values(X_train)

# NOTE(review): some SHAP releases return a list with one (samples, features)
# array per output class instead of a single (samples, features, classes)
# array. Normalise to the 3-D layout that the indexing below expects.
if isinstance(shap_values, list):
    shap_values = np.stack(shap_values, axis=-1)

# SHAP values of the selected output class, shape (samples, features)
shap_values_selected = shap_values[:, :, shap_class_index]
# Original variable name, kept in case other cells reference it; despite the
# name it holds the values for `shap_class_index`, not for class 1.
shap_values_class_1 = shap_values_selected

# Signed mean per feature (used for direction) and mean magnitude per feature
# (used for importance), both taken across all samples
mean_shap_values = np.mean(shap_values_selected, axis=0)
mean_abs_shap_values = np.mean(np.abs(shap_values_selected), axis=0)

# 'Inc.' when the feature raises the selected class's output on average, 'Dec.' otherwise
direction = ['Inc.' if val > 0 else 'Dec.' for val in mean_shap_values]

# Tercile bins of the mean absolute SHAP values: Low / Medium / High
impact_strength = pd.qcut(mean_abs_shap_values, q=3, labels=["Low", "Medium", "High"])

# One summary row per feature, sorted by direction and then by impact (both descending)
shap_summary_df = pd.DataFrame({
    'Feature': X.columns,  # feature names from the unscaled feature frame
    'Imp. %': np.round(shap_to_probability_delta(mean_abs_shap_values)*100, 1),  # tanh-squashed mean |SHAP|, as a percentage
    'Imp. Direct.': direction,
    'Imp. Strength': impact_strength
}).sort_values(by=['Imp. Direct.', 'Imp. %'], ascending=False).reset_index(drop=True)

# Save the SHAP summary to Excel for further inspection
shap_summary_df.to_excel('be_res_shap_summary_fnn.xlsx', index=False)

# Display the summary DataFrame
shap_summary_df


Unnamed: 0,Feature,Imp. %,Imp. Direct.,Imp. Strength
0,reason_try,1.1,Inc.,Low
1,reason_competition,1.1,Inc.,Low
2,reason_premium,1.0,Inc.,Low
3,reason_superiority,0.5,Inc.,Low
4,brandful,2.4,Dec.,High
5,inductively_brand_aware,1.9,Dec.,High
6,brand_quality,1.9,Dec.,High
7,after_purchase_satisfiedness,1.9,Dec.,High
8,recommend,1.9,Dec.,Medium
9,spontaneously_brand_aware,1.8,Dec.,Medium
