I wanted to analyse how feedback effect impact the outcome of a model.
A feedback loop is, here, defined as the "contamination" of training data with past model prediction. 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import precision_recall_curve
from sklearn.linear_model import LogisticRegression

import shap

from helpers import *

In [None]:
# generate data using sklearn classification utilities

num_features = 10

X, y = generate_data_sklearn(num_features=num_features)

data = pd.DataFrame()
data["y"] = y
data[[f"feature {i}" for i in range(X.shape[1])]] = X

for col in data.columns:
    if col != "y":
        plt.hist(data[col], label=col, alpha=0.4)
plt.legend()
plt.show()

In [None]:
clf = LogisticRegression().fit(X, y)
clf_pred = clf.predict(X)

print("label 1 count = ", clf_pred.sum())
print("false_positive = ",  ((clf_pred == 1) & (y == 0)).sum().item())
print("false_negative = ", ((clf_pred == 0) & (y == 1)).sum().item())
print("f1 score = ", f1_score(y_true=y, y_pred=clf_pred))
print("accuracy = ", accuracy_score(y_true=y, y_pred=clf_pred))

In [None]:
X_train_tensor = torch.tensor(X, dtype=torch.float32)
y_train_tensor = torch.tensor(y, dtype=torch.float32).unsqueeze(1)

early_stopping(model, X_train_tensor, y_train_tensor, pos_weight=0.9/0.1)

outputs = model(X_train_tensor).round()

print("label 1 count = ", outputs.sum())
print("false_positive = ",  ((outputs == 1) & (y_train_tensor == 0)).sum().item())
print("false_negative = ", ((outputs == 0) & (y_train_tensor == 1)).sum().item())
print("f1 score = ", f1_score(y_true=y_train_tensor.detach().numpy(), y_pred=outputs.detach().numpy()))
print("accuracy = ", accuracy_score(y_true=y_train_tensor.detach().numpy(), y_pred=outputs.detach().numpy()))

In [None]:
def feedback_loop(model_type, X, y, iterations=20, retrain_epochs=100, ground_truth_prop_start_end=[1., 0.05]):
    
    # initilize model
    if model_type == "nn":
        model = SimpleNN()
    else:
        model = LogisticRegression()
    
    # Define the number of samples per iteration
    train_size = X.shape[0] // iterations
        
    # Lists for tracking accuracy
    acc = []
    acc_truth = []
    
    false_positive = []
    false_positive_truth = []
    
    false_negative = []
    false_negative_truth = []
    
    f1 = []
    f1_truth = []
    
    shap_values_over_time = []
    
    confidence = []
    
    # Define linear decreasing proportion of ground truth labels
    coeff = (ground_truth_prop_start_end[0] - ground_truth_prop_start_end[1]) / (iterations-1)
    ground_truth_prop = ground_truth_prop_start_end[0] - coeff * np.arange(iterations)
            
    for i in range(iterations):
        
        # Select the subset of data for the current iteration
        X_train = X[train_size * i : train_size * (i + 1)]
        y_truth = y[train_size * i : train_size * (i + 1)] 
        
        if not isinstance(model, LogisticRegression):
            X_train = torch.tensor(X_train, dtype=torch.float32)
            y_truth = torch.tensor(y_truth, dtype=torch.float32).unsqueeze(1)
        
        # get y_pseudo label from previous iteration
        if i==0:
            if isinstance(model, LogisticRegression):
                y_pseudo = y_truth.copy()
            else:
                y_pseudo = y_truth.clone().detach()
        else:
            if isinstance(model, LogisticRegression):
                y_pseudo = model.predict(X_train)
            else:
                with torch.no_grad():
                    y_pseudo = model(X_train).round()   
                    
            # Retrain the model using the mix of pseudo- and true labels
            idx_truth = np.random.choice(np.arange(len(y_truth)), size=int(ground_truth_prop[i] * len(y_truth)), replace=False)
            y_pseudo[idx_truth] = y_truth[idx_truth]  # Inject some true labels into pseudo-labels
                
        # weigth the loss in case of inbalance data
        pos_weight = (y_pseudo == 0).sum() / (y_pseudo == 1).sum()    
        
        if isinstance(model, LogisticRegression):
            model.fit(X_train, y_pseudo)
            outputs = model.predict_proba(X_train)
            y_pred = model.predict(X_train)
        
            # Accuracy metrics
            acc.append(accuracy_score(y_pseudo, y_pred))
            acc_truth.append(accuracy_score(y_truth, y_pred))
            
            f1.append(f1_score(y_pseudo, y_pred))
            f1_truth.append(f1_score(y_truth, y_pred))
            
            # False positives and false negatives
            fp_pseudo = ((y_pred == 1) & (y_pseudo == 0)).sum() # Predicted 1, but pseudo-label is 0
            fn_pseudo = ((y_pred == 0) & (y_pseudo == 1)).sum() # Predicted 0, but pseudo-label is 1
            fp_truth = ((y_pred == 1) & (y_truth == 0)).sum()    # Predicted 1, but true label is 0
            fn_truth = ((y_pred == 0) & (y_truth == 1)).sum()    # Predicted 0, but true label is 1
            
            confidence.append(outputs[:,1])  # Apply sigmoid for confidence interpretation
            
        else:
            # Train model with early stopping
            early_stopping(model, X_train, y_pseudo, epochs=retrain_epochs, pos_weight=pos_weight)
            with torch.no_grad():
                outputs = model(X_train)  # Model prediction (logits)
            y_pred = outputs.round()  # Predicted labels after rounding the logits
        
            # Accuracy metrics
            acc.append(accuracy_score(y_pseudo.numpy(), y_pred.numpy()))
            acc_truth.append(accuracy_score(y_truth.numpy(), y_pred.numpy()))
            
            f1.append(f1_score(y_pseudo, y_pred))
            f1_truth.append(f1_score(y_truth, y_pred))
            
            # False positives and false negatives
            fp_pseudo = ((y_pred == 1) & (y_pseudo == 0)).sum().item()  # Predicted 1, but pseudo-label is 0
            fn_pseudo = ((y_pred == 0) & (y_pseudo == 1)).sum().item()  # Predicted 0, but pseudo-label is 1
            fp_truth = ((y_pred == 1) & (y_truth == 0)).sum().item()    # Predicted 1, but true label is 0
            fn_truth = ((y_pred == 0) & (y_truth == 1)).sum().item()    # Predicted 0, but true label is 1
            
            confidence.append(outputs)  # Apply sigmoid for confidence interpretation
        
        false_positive.append(fp_pseudo)
        false_negative.append(fn_pseudo)
        false_positive_truth.append(fp_truth)
        false_negative_truth.append(fn_truth)
        
        # SHAP values
        if X_train.shape[0] > 100:  # Avoid index errors when sampling for SHAP
            X_background = X_train[torch.randint(X_train.shape[0], [100])]  # Smaller sample for SHAP background
        else:
            X_background = X_train  # Use full data if too small for 100 samples
        
        X_shap = X_train[torch.randint(X_train.shape[0], [20])]  # Sample 20 instances for SHAP calculation
        
        if isinstance(model, LogisticRegression):
            explainer = shap.LinearExplainer(model, X_background)
        else:
            explainer = shap.GradientExplainer(model, X_background)
        shap_values = explainer.shap_values(X_shap)
        shap_values_over_time.append(np.mean(np.abs(shap_values), axis=0))
                    
    return acc, acc_truth, f1, f1_truth, false_positive, false_positive_truth, false_negative, false_negative_truth, shap_values_over_time, confidence


In [None]:
# Run the feedback loop simulation for several class balance

weight_list = [0.9, 0.6, 0.5]
acc_list = []
acc_truth_list = []
f1_list = []
f1_truth_list = []
false_positive_list = []
false_negative_list = []
false_negative_truth_list = []
false_positive_truth_list = []
confidence_list = []
shap_list = []

for w in weight_list:
        
        X, y = make_data(weigths=[w, 1-w])
        acc, acc_truth,f1, f1_truth, false_positive, false_positive_truth, false_negative, false_negative_truth, shap_values_over_time, confidence = \
        feedback_loop("linear", X, y, iterations=50, ground_truth_prop_start_end=[1.0, 0.05])
        
        acc_list.append(acc)
        acc_truth_list.append(acc_truth)
        f1_list.append(f1)
        f1_truth_list.append(f1_truth)
        
        false_positive_list.append(false_positive)
        false_negative_list.append(false_negative)
        false_negative_truth_list.append(false_negative_truth)
        false_positive_truth_list.append(false_positive_truth)
        
        confidence_list.append(confidence)
        
        shap_list.append(shap_values_over_time)

In [None]:
# shap value
color1 = "#D4CC47"
color2 = "#7C4D8B"
col_list = plt_help.get_color_gradient(color1, color2, len(weight_list))
col_list = ["red", "blue", "black"]

# Plot the deviations from the ground truth over iterations
for a,at,c,w in zip(acc_list, acc_truth_list, col_list, weight_list):
    plt.plot(a, label=str(w), color=c)
    plt.plot(at, color=c, linestyle="dashed")
    
#plt.plot(acc_truth_list, color="red", linestyle="dashed", label="accuracy truth")
plt.xlabel("Iteration")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

# Plot the deviations from the ground truth over iterations
for a,at,c,w in zip(f1_list, f1_truth_list, col_list, weight_list):
    plt.plot(a, label=str(w), color=c)
    plt.plot(at, color=c, linestyle="dashed")
    
#plt.plot(acc_truth_list, color="red", linestyle="dashed", label="accuracy truth")
plt.xlabel("Iteration")
plt.ylabel("f1 score")
plt.legend()
plt.show()

# Plot the deviations from the ground truth over iterations
for a,at,c,w in zip(false_positive_list, false_positive_truth_list, col_list, weight_list):
    plt.plot(a, label=str(w), color=c)
    plt.plot(at, color=c, linestyle="dashed")
    
#plt.plot(acc_truth_list, color="red", linestyle="dashed", label="accuracy truth")
plt.xlabel("Iteration")
plt.ylabel("False positive count")
plt.legend()
plt.show()

# Plot the deviations from the ground truth over iterations
for a,at,c,w in zip(false_negative_list, false_negative_truth_list, col_list, weight_list):
    plt.plot(a, label=str(w), color=c)
    plt.plot(at, color=c, linestyle="dashed")
    
#plt.plot(acc_truth_list, color="red", linestyle="dashed", label="accuracy truth")
plt.xlabel("Iteration")
plt.ylabel("False negative count")
plt.legend()
plt.show()

In [None]:
# shap value analysis

# confidence
color1 = "#D4CC47"
color2 = "#7C4D8B"
col_list_tmp = plt_help.get_color_gradient(color1, color2, len(confidence_list[0]))

for sh,w in zip(shap_list, weight_list):
    for i in range(len(sh)):
        plt.plot(range(len(sh[i])), sh[i], color=col_list_tmp[i])
    plt.xlabel("features")
    plt.ylabel("shap value")
    plt.xticks(ticks=range(len(sh[-1])), labels=[f"feature {i}" for i in range(len(sh[-1]))], rotation=70)
    plt.title(f"Class inbalance = {w}")
    plt.show()

In [None]:
# confidence
color1 = "#D4CC47"
color2 = "#7C4D8B"
col_list_tmp = plt_help.get_color_gradient(color1, color2, len(confidence_list[0]))

for conf,w in zip(confidence_list, weight_list):
    for i in range(len(conf)):
        plt.hist(conf[i], color=col_list_tmp[i], bins=10, alpha=0.4)
    plt.title(f"Class inbalance = {w}")
    plt.show()