# Data Handling, Model Training, and Visualization

In [2]:
import sys

import numpy as np

sys.path.insert(1, '/Users/asifahmed/Documents/Codes/MyRecourseProject')

from models.model_trainer import ModelTrainer
from evaluation.evaluator import Evaluator
from visualization.visualizer import Visualizer
from data_handling.dataset import Dataset

# Load the synthetic dataset; 'y' is the column the models predict.
data_instance = Dataset(target_column='y')
data_instance.load_csv('/Users/asifahmed/Documents/Codes/MyRecourseProject/notebooks/synthetic_data_with_bias.csv')
data_instance.encode_categorical_columns()
# Keep a handle on the sensitive attribute now: select_features() below makes only
# y/X1/X2 active, and later cells group their results by this Gender series.
sensitive_features = data_instance.data['Gender']

# data_instance.remove_outliers()
# data_instance.balanced_sample(500)
data_instance.select_features(['y', 'X1', 'X2'])

# Split into train/test sets, then scale the features. The unscaled X_test is kept
# because later cells use X_test.index to look test rows up in sensitive_features.
trainer = ModelTrainer()
X_train, X_test, y_train, y_test = trainer.split_data(data_instance.data, target_column='y')
X_train_scaled, X_test_scaled = trainer.scale_features(X_train, X_test)

CSV file loaded successfully with delimiter: ','
Selected features are now active: ['y', 'X1', 'X2']
Data split into train and test sets.


In [3]:
# Fit the baseline classifier on the scaled training split and print its metrics.
model = trainer.train('logistic_regression', X_train_scaled, y_train)
evaluator = Evaluator(model, X_test_scaled, y_test)
evaluator.report()

# Per-gender false negatives: true label 1, predicted label 0.
y_pred = model.predict(X_test_scaled)
gender_test = sensitive_features[X_test.index]
actual_positive = y_test == 1
predicted_negative = y_pred == 0
false_negatives = actual_positive & predicted_negative
false_negative_counts = gender_test[false_negatives].value_counts()
print("False Negative Counts by Gender:", false_negative_counts, sep="\n")

# Every test row the model got wrong, with its label and gender value.
misclassified = y_test != y_pred
sensitive_features_test = sensitive_features.loc[X_test.index]
X_misclassified, y_misclassified, gender_misclassified = (
    X_test_scaled[misclassified],
    y_test[misclassified],
    sensitive_features_test[misclassified],
)

# Interactive decision-boundary plot restricted to the misclassified rows.
visualizer = Visualizer(
    model,
    X_misclassified,
    y_misclassified,
    X_original=X_test[misclassified],
    sensitive_attr=gender_misclassified,
)
visualizer.plot_decision_boundary_interactive(('X1', 'X2'), 'Logistic Regression Plot')

logistic_regression model trained successfully.
Accuracy: 0.765
Precision: 0.74
Recall: 0.7789473684210526
F1 Score: 0.7589743589743589
Confusion Matrix:
 [[79 26]
 [21 74]]
Classification Report:
               precision    recall  f1-score   support

           0       0.79      0.75      0.77       105
           1       0.74      0.78      0.76        95

    accuracy                           0.77       200
   macro avg       0.77      0.77      0.76       200
weighted avg       0.77      0.77      0.77       200

False Negative Counts by Gender:
Gender
2    14
1     7
Name: count, dtype: int64


In [None]:
# Gender of each test row, looked up through the test split's index labels.
gender_test = sensitive_features[X_test.index]
y_pred = model.predict(X_test_scaled)

# A false negative is a row whose true label is 1 but whose prediction is 0.
actual_positive = y_test == 1
predicted_negative = y_pred == 0
false_negatives = actual_positive & predicted_negative

# Tally the false negatives per gender value and show the tally.
false_negative_counts = gender_test[false_negatives].value_counts()
print("False Negative Counts by Gender:", false_negative_counts, sep="\n")

In [None]:
# Plot every misclassified test point (false positives and false negatives alike).
y_pred = model.predict(X_test_scaled)

# True wherever the prediction disagrees with the label, in either direction.
misclassified = y_test != y_pred

sensitive_features_test = sensitive_features.loc[X_test.index]

# The mask selects all errors, not only false negatives, so the selections are
# named "misclassified" to say what they actually contain.
X_misclassified = X_test_scaled[misclassified]
y_misclassified = y_test[misclassified]
gender_misclassified = sensitive_features_test[misclassified]

visualizer = Visualizer(model, X_misclassified, y_misclassified,
                        X_original=X_test[misclassified],
                        sensitive_attr=gender_misclassified)
visualizer.plot_decision_boundary_interactive(('X1', 'X2'))

# Recourse Calculation

In [None]:
# NOTE(review): RecourseCostCalculator is not imported anywhere in this notebook;
# import it from its project module before running this cell.

# X_test is split from data restricted to y/X1/X2, so it has no 'Gender' column.
# The sensitive attribute is taken from the series saved before feature selection,
# aligned to the test rows through their index labels.
gender = sensitive_features.loc[X_test.index]

recourse_calculator = RecourseCostCalculator(model)

# Recourse costs of the test rows, grouped by gender value.
group_recourse_costs = recourse_calculator.calculate_group_recourse_costs(
    X_test_scaled,  # Can be a DataFrame or numpy array
    gender           # Can be a Series or numpy array
)

for group, costs in group_recourse_costs.items():
    print(f"Group {group} Recourse Costs: Mean = {np.mean(costs)}, Median = {np.median(costs)}")

# Ungrouped recourse costs for the whole test split.
print(recourse_calculator.calculate_recourse_costs(X_test_scaled))

In [None]:
# Calculate distance for a specific test point
# Calculate distance for a specific test point.
# The misclassified positions are derived here so the cell runs top to bottom
# without needing a variable that is only created further down the notebook.
y_pred = model.predict(X_test_scaled)
misclassified_indices = (y_test != y_pred).to_numpy().nonzero()[0]

EXAMPLE_RANK = 2  # which misclassified sample to inspect (0-based)
if len(misclassified_indices) > EXAMPLE_RANK:
    example_position = misclassified_indices[EXAMPLE_RANK]
    test_point = X_test_scaled[example_position]  # Example test point
    print(X_test.iloc[example_position])
    distance = trainer.distance_to_hyperplane(test_point)
    print("Distance to the decision boundary:", distance)
else:
    # Avoid an IndexError when the model makes fewer errors than the example needs.
    print(f"Fewer than {EXAMPLE_RANK + 1} misclassified samples; nothing to inspect.")

In [None]:
# Prediction and identification of misclassified points
# Locate the test rows the model got wrong (as positions within the test split).
y_pred = model.predict(X_test_scaled)
misclassified = y_test != y_pred
misclassified_indices = np.where(misclassified)[0]

# Scaled features and true labels of those rows.
X_misclassified_scaled = X_test_scaled[misclassified_indices]
y_misclassified = y_test.iloc[misclassified_indices]

# Positions, relative to the misclassified subset, whose true label is class 0,
# and the matching scaled feature rows.
class_0_misclassified_indices = np.where(y_misclassified == 0)[0]
X_class_0_misclassified_scaled = X_misclassified_scaled[class_0_misclassified_indices]

# Translate the relative positions back to test-split positions and measure how
# far each of those samples lies from the decision boundary.
distances = []
for original_index in misclassified_indices[class_0_misclassified_indices]:
    distance = trainer.distance_to_hyperplane(X_test_scaled[original_index])
    distances.append(distance)
    print(f"Distance for test sample index {original_index}: {distance}")

# Report the mean distance, or say so when there was nothing to measure.
if not distances:
    print("No class 0 misclassified points to calculate distance.")
else:
    average_distance = np.mean(distances)
    print(f"Average distance to the decision boundary for class 0 misclassified points: {average_distance}")

# DiCE and Other Explainers

In [None]:
# Generate explanations using DICE
# NOTE(review): get_explainer is not imported anywhere in this notebook; import it
# from its project module before running this cell.
# NOTE(review): 'A2', 'A14' and 'A15' are not among the columns the earlier cells
# select (y, X1, X2) — presumably left over from a different dataset; confirm
# which dataset this cell is meant to explain.
continuous_features = ['A2', 'A14']
explainer = get_explainer('dice', model=model, data=data_instance.data,
                          continuous_features=continuous_features,
                          outcome_name='A15')
# NOTE(review): assumes the trainer object exposes the test split as `X_test` — confirm.
query_instance = trainer.X_test.iloc[0:5]  # Selecting the first five test instances (rows 0-4)
# Request five counterfactuals (total_CFs=5) for the selected query rows.
counterfactuals = explainer.generate_explanation(query_instance, total_CFs=5)

# Automatic Discovery of Numerical Columns in a Real Dataset

In [None]:
import sys  
sys.path.insert(1, '/Users/asifahmed/Documents/Codes/MyRecourseProject')

import pandas as pd
import itertools
from models.model_trainer import ModelTrainer
from data_handling.dataset import Dataset
from evaluation.evaluator import Evaluator

def is_numerical(data, column, target_column, threshold=20):
    """
    Cardinality heuristic for spotting numerical feature columns.

    A column qualifies when it holds more than `threshold` distinct values and
    is not the target column itself.

    Args:
        data: DataFrame-like object supporting `data[column].nunique()`.
        column: Name of the column to inspect.
        target_column: Name of the target column, which never qualifies.
        threshold: Number of distinct values that must be exceeded.

    Returns:
        True when the column is treated as numerical, otherwise False.
    """
    distinct_values = data[column].nunique()
    # Low-cardinality columns are rejected before the target check.
    if not (distinct_values > threshold):
        return False
    return column != target_column

def automated_evaluation(file_path, target_column, model_type='svm', threshold=0.6, sample_size=300,
                         output_path='evaluation_results.txt'):
    """
    Train and score one model per pair of numerical features and record the good ones.

    For every 2-combination of the columns accepted by `is_numerical`, the dataset
    is reloaded, restricted to that pair plus the target, split, scaled, and used
    to train a `model_type` model. Pairs whose accuracy reaches `threshold` are
    reported in full and written to `output_path`; the others get a one-line
    "Failed" message.

    Args:
        file_path: Path of the CSV file to evaluate.
        target_column: Name of the label column.
        model_type: Model identifier passed to `ModelTrainer.train`.
        threshold: Minimum accuracy for a feature pair to pass.
        sample_size: Value passed to `Dataset.balanced_sample`.
        output_path: Text file that receives the passed pairs; it is overwritten.

    Returns:
        None. Results are printed and written to `output_path`.
    """
    import os  # function-scope import keeps this cell self-contained

    # Initialize the model trainer
    trainer = ModelTrainer()

    dataset_name = os.path.basename(file_path)
    # Initialize results list
    passed_pairs = []

    # Get all truly numerical columns from the prepared data
    original_data_instance = _load_prepared_dataset(file_path, target_column, sample_size)
    numerical_columns = [
        col for col in original_data_instance.data.columns
        if is_numerical(original_data_instance.data, col, target_column)
    ]

    # Iterate over all pairs of numerical features
    for feature1, feature2 in itertools.combinations(numerical_columns, 2):
        # Reload the data for each pair, because select_features() narrows the instance
        data_instance = _load_prepared_dataset(file_path, target_column, sample_size)
        data_instance.select_features([feature1, feature2, target_column])

        # Split and scale the data
        X_train, X_test, y_train, y_test = trainer.split_data(data_instance.data, target_column=target_column)

        if X_train.empty or X_test.empty:
            continue

        X_train_scaled, X_test_scaled = trainer.scale_features(X_train, X_test)

        # Train the model and collect its metrics on the test split
        model = trainer.train(model_type, X_train_scaled, y_train)
        evaluator = Evaluator(model, X_test_scaled, y_test)
        metrics = evaluator.get_evaluation_metrics()
        accuracy = metrics['Accuracy']

        # Compare against None explicitly: an accuracy of 0.0 is a valid score and
        # must not be mistaken for "no metric available".
        if accuracy is not None and accuracy >= threshold:
            print(f"\033[1;32mPassed: {feature1}, {feature2} with accuracy {accuracy:.2f}\033[0m")
            evaluator.report()
            passed_pairs.append({
                'Dataset': dataset_name,
                'Feature1': feature1,
                'Feature2': feature2,
                'Accuracy': accuracy,
                'Precision': metrics['Precision'],
                'Recall': metrics['Recall'],
                'F1 Score': metrics['F1 Score'],
                'Confusion Matrix': metrics['Confusion Matrix'].tolist(),
                'Classification Report': metrics['Classification Report']
            })
        else:
            # A missing accuracy cannot be formatted with :.2f, so show a placeholder
            accuracy_text = "n/a" if accuracy is None else f"{accuracy:.2f}"
            print(f"Failed: {feature1}, {feature2} with accuracy {accuracy_text}")

        # Add space between iterations for readability
        print("\n" + "-" * 50 + "\n")

    # Save passed pairs information to a text file
    print(f"Saving passed pairs information to '{output_path}'...")
    with open(output_path, 'w', encoding='utf-8') as file:
        for pair in passed_pairs:
            file.write(f"Dataset: {pair['Dataset']}\n")
            file.write(f"Features: {pair['Feature1']}, {pair['Feature2']}\n")
            file.write(f"Accuracy: {pair['Accuracy']:.2f}\n")
            file.write(f"Precision: {pair['Precision']:.2f}\n")
            file.write(f"Recall: {pair['Recall']:.2f}\n")
            file.write(f"F1 Score: {pair['F1 Score']:.2f}\n")
            file.write(f"Confusion Matrix: {pair['Confusion Matrix']}\n")
            file.write("Classification Report:\n")
            file.write(f"{pair['Classification Report']}\n")
            file.write("-" * 50 + "\n")


def _load_prepared_dataset(file_path, target_column, sample_size):
    """Load the CSV into a Dataset, encode categorical columns, and draw a balanced sample."""
    dataset = Dataset(target_column=target_column)
    dataset.load_csv(file_path)
    dataset.encode_categorical_columns()
    # dataset.remove_outliers()
    dataset.balanced_sample(sample_size)
    return dataset

# Screen every numerical feature pair of the processed credit dataset, keeping
# the pairs whose accuracy reaches 0.65 on a balanced sample of 1000.
CREDIT_DATASET_PATH = '/Users/asifahmed/Documents/Codes/MyRecourseProject/datasets/processed/credit_processed.csv'
automated_evaluation(
    file_path=CREDIT_DATASET_PATH,
    target_column='NoDefaultNextMonth',
    threshold=0.65,
    sample_size=1000,
)

# Synthetic Data Generation

In [None]:
# Synthetic Data Generation Code

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from scipy.stats import chi2_contingency

def generate_synthetic_data(n_samples=1000, seed=None):
    """
    Build a synthetic binary-classification dataset with a gender-dependent bias.

    Gender is drawn as 1 (probability 0.6) or 2 (probability 0.4). The features
    X1 and X2 are drawn from normal distributions whose means depend on gender,
    and the label noise is wider for gender 2 (std 15) than for gender 1 (std 5).
    The label `y` is 1 when the noisy linear score exceeds its own median.

    Args:
        n_samples: Number of rows to generate.
        seed: Optional seed. When given, a private generator is seeded with it, so
            the global NumPy random state is left untouched. When None, the draws
            come from the global `np.random` state, as before.

    Returns:
        pandas.DataFrame with integer columns 'X1', 'X2', 'Gender' and 'y'.
    """
    # A private RandomState yields the same stream as np.random.seed(seed) would,
    # without reseeding the process-wide generator as a side effect.
    rng = np.random.RandomState(seed) if seed is not None else np.random

    genders = rng.choice([1, 2], size=n_samples, p=[0.6, 0.4])

    # (mean, std) of X1 and X2 for each gender value
    feature_params = {
        1: ((55, 10), (65, 15)),
        2: ((45, 12), (55, 15)),
    }

    X1 = np.zeros(n_samples)
    X2 = np.zeros(n_samples)

    # The draws stay interleaved per row (X1 then X2) so a given seed keeps
    # producing the same dataset.
    for i in range(n_samples):
        (x1_mean, x1_std), (x2_mean, x2_std) = feature_params[1 if genders[i] == 1 else 2]
        X1[i] = rng.normal(x1_mean, x1_std)
        X2[i] = rng.normal(x2_mean, x2_std)

    X1 = np.round(X1).astype(int)
    X2 = np.round(X2).astype(int)

    # Both noise vectors are always drawn in full; np.where then picks per row.
    noise = np.where(genders == 1,
                     rng.normal(0, 5, n_samples),
                     rng.normal(0, 15, n_samples))
    noise = np.round(noise).astype(int)

    gender_effect = np.where(genders == 1, .1, -.08)

    decision_boundary = 0.5 * X1 + 0.3 * X2 + 0.2 * gender_effect + noise

    # Thresholding at the median splits the labels roughly in half.
    y = (decision_boundary > np.median(decision_boundary)).astype(int)

    df = pd.DataFrame({
        'X1': X1,
        'X2': X2,
        'Gender': genders,
        'y': y
    })

    return df

def perform_chi_square_tests(data):
    """
    Print a chi-squared independence test between each column and the label 'y'.

    The input frame is only read, never modified. Each column is cross-tabulated
    against 'y' as-is; no dummy encoding is needed because the contingency table
    is built from the raw category values, so columns with more than two
    categories are handled as well.

    Args:
        data: pandas.DataFrame containing a 'y' column and the columns to test.

    Returns:
        None. The statistic and p-value of every test are printed.
    """
    for column in data.columns:
        if column == 'y':
            continue
        # Create a contingency table of the column's values against the label
        contingency_table = pd.crosstab(data[column], data['y'])
        chi2, p, _dof, _expected = chi2_contingency(contingency_table)
        print(f"Chi-squared test for {column}:")
        print(f"  Chi2 Statistic: {chi2:.2f}, P-value: {p:.3f}\n")

# Generate the biased synthetic dataset, sanity-check it with a quick model, and save it.
# Every name here carries a "synthetic_" prefix so the cell does not overwrite the
# X_train / X_test / y_train / y_test / model / y_pred globals that the analysis
# cells use with the data loaded at the top of the notebook.
SYNTHETIC_SEED = 42  # fixed so the saved CSV is identical on every run
synthetic_data = generate_synthetic_data(seed=SYNTHETIC_SEED)

synthetic_X = synthetic_data[['X1', 'X2', 'Gender']]
synthetic_y = synthetic_data['y']
(synthetic_X_train, synthetic_X_test,
 synthetic_y_train, synthetic_y_test) = train_test_split(
    synthetic_X, synthetic_y, test_size=0.3, random_state=42)

# Plain logistic regression on the raw features, used only as a sanity check.
synthetic_model = LogisticRegression()
synthetic_model.fit(synthetic_X_train, synthetic_y_train)

synthetic_y_pred = synthetic_model.predict(synthetic_X_test)
synthetic_accuracy = accuracy_score(synthetic_y_test, synthetic_y_pred)
print(f'Accuracy: {synthetic_accuracy:.2f}')

# The CSV is written to the current working directory.
synthetic_data.to_csv('synthetic_data_with_bias.csv', index=False)
print("Synthetic data with bias generated and saved to 'synthetic_data_with_bias.csv'.")

Accuracy: 0.73
Synthetic data with bias generated and saved to 'synthetic_data_with_bias.csv'.


# Miscellaneous

In [None]:
# Fit an SVM on the scaled training split and print its metrics.
model2 = trainer.train('svm', X_train_scaled, y_train)
evaluator2 = Evaluator(model2, X_test_scaled, y_test)
evaluator2.report()

# Per-gender false negatives: true label 1, predicted label 0.
y_pred = model2.predict(X_test_scaled)
gender_test = sensitive_features[X_test.index]
actual_positive = y_test == 1
predicted_negative = y_pred == 0
false_negatives = actual_positive & predicted_negative
false_negative_counts = gender_test[false_negatives].value_counts()
print("False Negative Counts by Gender:", false_negative_counts, sep="\n")

# Every test row the SVM got wrong, with its label and gender value.
misclassified = y_test != y_pred
sensitive_features_test = sensitive_features.loc[X_test.index]
X_misclassified, y_misclassified, gender_misclassified = (
    X_test_scaled[misclassified],
    y_test[misclassified],
    sensitive_features_test[misclassified],
)

# Interactive decision-boundary plot restricted to the misclassified rows.
visualizer2 = Visualizer(
    model2,
    X_misclassified,
    y_misclassified,
    X_original=X_test[misclassified],
    sensitive_attr=gender_misclassified,
)
visualizer2.plot_decision_boundary_interactive(('X1', 'X2'), 'SVM Plot')

In [None]:
# Fit a naive Bayes model on the scaled training split and print its metrics.
model3 = trainer.train('naive_bayes', X_train_scaled, y_train)
evaluator3 = Evaluator(model3, X_test_scaled, y_test)
evaluator3.report()

# Per-gender false negatives: true label 1, predicted label 0.
y_pred = model3.predict(X_test_scaled)
gender_test = sensitive_features[X_test.index]
actual_positive = y_test == 1
predicted_negative = y_pred == 0
false_negatives = actual_positive & predicted_negative
false_negative_counts = gender_test[false_negatives].value_counts()
print("False Negative Counts by Gender:", false_negative_counts, sep="\n")

# Every test row the naive Bayes model got wrong, with its label and gender value.
misclassified = y_test != y_pred
sensitive_features_test = sensitive_features.loc[X_test.index]
X_misclassified, y_misclassified, gender_misclassified = (
    X_test_scaled[misclassified],
    y_test[misclassified],
    sensitive_features_test[misclassified],
)

# Interactive decision-boundary plot restricted to the misclassified rows.
visualizer3 = Visualizer(
    model3,
    X_misclassified,
    y_misclassified,
    X_original=X_test[misclassified],
    sensitive_attr=gender_misclassified,
)
visualizer3.plot_decision_boundary_interactive(('X1', 'X2'), 'Naive Bayes Plot')