# Dissertation Experiment
# XAI Metrics - Function Implementations
Ciaran Finnegan November 2023

## IDENTITY

### Pseudocode

- Start with first instance in test data
- Search all other instances in test data and calculate distance from first instance (feature distance)
- Select closest other instance to first instance, i
- Generate explanations for all instances in test data
- Calculate distance of first instance explanations from explanations in all other instances
- Select closest other instance to first instance (explanation distance), t
- Generate success if instance id (i) = instance id (t)
- Drop first instance from test data
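
The steps above can be sketched on toy data using only the standard library. This is a minimal illustration, not the implementation used in the experiment: the helper names `nearest_index` and `identity_score` and the toy values are hypothetical, and the sketch compares every instance against all others rather than dropping instances as it goes.

```python
import math

def nearest_index(rows, i):
    """Index of the row closest to rows[i] by Euclidean distance, excluding i itself."""
    others = [j for j in range(len(rows)) if j != i]
    return min(others, key=lambda j: math.dist(rows[i], rows[j]))

def identity_score(features, explanations):
    """Percentage of instances whose nearest neighbour is the same in both spaces."""
    matches = sum(
        nearest_index(features, i) == nearest_index(explanations, i)
        for i in range(len(features))
    )
    return 100.0 * matches / len(features)

# Hypothetical toy data: four instances, two features and two explanation values each.
toy_features = [(0.0, 0.0), (0.1, 0.0), (5.0, 5.0), (5.1, 5.0)]
toy_explanations = [(1.0, 0.0), (1.1, 0.0), (0.0, 1.0), (3.0, 3.0)]

score = identity_score(toy_features, toy_explanations)
print(f"Identity score: {score:.1f}%")
```

Instances 0 and 1 are each other's nearest neighbour in both spaces, while instances 2 and 3 are neighbours in feature space only, so half of the instances satisfy the metric.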

### Implementation

In [1]:
from scipy.spatial import distance

In [2]:
def get_identity_metric(features_df, xai_values_df, XAI_Type):
    """
    For each instance in the feature dataframe, this function identifies the closest instance 
    based on Euclidean distance. It then does the same for the corresponding XAI explainer values. 
    The function checks if the closest instances for both features and XAI explainer values match.
    
    Returns:
        Percentage of instances where the closest feature and XAI explainer value instances match.
    """

    # Initialize match count to zero
    match_count = 0
    
    # Loop through each instance in the feature dataframe
    for idx, instance in features_df.iterrows():
        # Compute the Euclidean distance between the current instance and all other instances
        feature_distances = features_df.drop(index=idx).apply(lambda row: distance.euclidean(row, instance), axis=1)
        
        # Identify the index of the closest instance
        closest_feature_idx = feature_distances.idxmin()
        
        # Repeat the process for XAI Explanations
        xai_instance = xai_values_df.loc[idx]
        xai_distances = xai_values_df.drop(index=idx).apply(lambda row: distance.euclidean(row, xai_instance), axis=1)
        closest_xai_idx = xai_distances.idxmin()
        
        # Check if the closest instances for both features and XAI Explanations match
        if closest_feature_idx == closest_xai_idx:
            match_count += 1
        
        # Print the distances for debugging purposes
        print(f"Instance {idx}:   Current matches: {match_count}")
        print(f"\tClosest feature instance: {closest_feature_idx} (Distance: {feature_distances[closest_feature_idx]:.4f})")
        # Use a single f-string so the index and distance placeholders are interpolated
        print(f"\tClosest {XAI_Type} instance: {closest_xai_idx} (Distance: {xai_distances[closest_xai_idx]:.4f})")

    # Compute the matching percentage
    percentage = (match_count / len(features_df)) * 100
    print(f"\n\nThis is the function in XAI_METRICS_FUNCTIONS -- IDENTITY for {XAI_Type}\n")
    print(f"\n\nPercentage of matches: {percentage:.2f}%   {match_count} Matches of {len(features_df)} Entries")
    
    return percentage

## STABILITY

### Pseudocode

"Stability" - this metric states that instances belonging to the same class must have comparable explanations

- Assume that the dataset has been balanced 50:50 for fraud/non-fraud.
- Cluster explanations of all instances in test data by k-means, include the 'predicted fraud' label.
- Number of clusters equals label values, in this case two (fraud/non-fraud)
- For each instance in test data
	- compare explanation cluster label to predicted class label
	- if match, then stability satisfied
	
	alternatively
	
	- compare explanation cluster label in largest cluster to predicted class label
	- Take ratio of majority predicted class label to minority class as the stability measure (the higher the value, the more closely the explanation clusters map to the predicted results).
	
Question: how do we know which explanation cluster corresponds to 'fraud' and which to 'non-fraud'? If the dataset has a 50:50 label split and we use 
two clusters, then we can just pick one cluster (use the largest).

In practice the training data is balanced but the test data is not. 
The majority class in the test data will be non-fraud, so the largest cluster 
is always assumed to be the non-fraud cluster.
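
The cluster-to-label mapping described above can be sketched as follows. This is a simplified illustration that assumes the cluster ids have already been produced (for example by k-means with two clusters); the function name `stability_score` and the toy values are hypothetical.

```python
from collections import Counter

def stability_score(cluster_ids, class_labels, largest_label=0):
    """Percentage of instances whose relabelled cluster equals their class label."""
    majority_cluster = Counter(cluster_ids).most_common(1)[0][0]
    # Two-cluster assumption: the majority cluster takes largest_label,
    # every other instance takes the opposite binary label.
    relabelled = [
        largest_label if c == majority_cluster else 1 - largest_label
        for c in cluster_ids
    ]
    matches = sum(r == y for r, y in zip(relabelled, class_labels))
    return 100.0 * matches / len(class_labels)

# Hypothetical cluster ids and class labels for eight instances (0 = non-fraud).
toy_clusters = [1, 1, 1, 0, 1, 0, 1, 1]
toy_labels = [0, 0, 0, 1, 0, 0, 1, 0]

score = stability_score(toy_clusters, toy_labels)
print(f"Stability score: {score:.1f}%")
```

Cluster 1 is the larger cluster here, so it is treated as non-fraud even though k-means happened to number it 1. Six of the eight relabelled instances then agree with their class label.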

### Implementation

In [3]:
from sklearn.cluster import KMeans

def get_stability_metric_y(xai_values_df, y_test, largest_label, XAI_Type):
    """
    This function performs the following steps:
    1. Clusters the XAI explainer values into two clusters using the k-means algorithm.
    2. Relabels the clusters so that the majority cluster takes largest_label and the
       other cluster takes the opposite label.
    3. Assigns the actual target value from the test dataset to each instance.
    4. Calculates the percentage of rows where the target class matches the relabelled cluster.
    (Writing the final dataframe to a CSV file is currently commented out.)
    
    Returns:
        Percentage of instances where target class matches cluster value.
    """
    
    # Cluster the XAI Explainor values into two clusters
    kmeans = KMeans(n_clusters=2, random_state=42).fit(xai_values_df)
    
    # Get the cluster labels
    cluster_labels = kmeans.labels_
    
    # Create a new dataframe with an additional column indicating the cluster assignment
    clustered_df = xai_values_df.copy()
    clustered_df['Cluster'] = cluster_labels
    
    # Display Cluster before label change
    # print('\nDisplay Cluster before label change')
    # print(clustered_df)
    
    
    ######################
    
    # Rename clusters so that the largest cluster is always labeled '0'
    # if sum(cluster_labels) > len(cluster_labels) / 2:
    #    clustered_df['Cluster'] = clustered_df['Cluster'].map({0: '1', 1: '0'})
    
    # 1. Determine the majority and minority clusters
    cluster_counts = clustered_df['Cluster'].value_counts()
    majority_cluster = cluster_counts.idxmax()
    minority_cluster = 1 if majority_cluster == 0 else 0

    # 2. Assign largest_label to entries in the majority cluster
    #clustered_df['label'] = clustered_df['Cluster'].apply(lambda x: largest_label if x == majority_cluster else None)
    clustered_df['Cluster_adj'] = clustered_df['Cluster'].apply(lambda x: str(largest_label) if x == majority_cluster else None)

    # 3. Reverse the label in the minority cluster and convert to string
    minority_label = str(1 - largest_label)
    #clustered_df.loc[clustered_df['Cluster'] == minority_cluster, 'label'] = 1 - largest_label
    clustered_df.loc[clustered_df['Cluster'] == minority_cluster, 'Cluster_adj'] = minority_label
    
    
    ###################
    
    
    # Display Cluster after label change
    # print('\nDisplay Cluster after label change')
    # print(clustered_df)        
        
    
    # Print the number of instances assigned to each cluster
    # cluster_0_count = clustered_df[clustered_df['Cluster'] == '0'].shape[0]
    # cluster_1_count = clustered_df[clustered_df['Cluster'] == '1'].shape[0]
    
    cluster_0_count = clustered_df[clustered_df['Cluster_adj'] == '0'].shape[0]
    cluster_1_count = clustered_df[clustered_df['Cluster_adj'] == '1'].shape[0]
    print(f"Number of Instances in Cluster '0': {cluster_0_count}")
    print(f"Number of Instances in Cluster '1': {cluster_1_count}")
    
    # Assign the appropriate subset of y_test values to the dataframe based on the selected indices
    clustered_df['Actual'] = y_test.loc[clustered_df.index].values
    
    #######################
    
    # Display Cluster after 'Actual' value added
    # print('\nDisplay Cluster after -Actual- value added... (switched off for now)')
    #print(clustered_df)        
    
    #######################
    
    
    # Calculate the percentage of rows where the target class '0' matches the cluster value '0'
    # matches_0 = clustered_df[(clustered_df['Cluster'] == '0') & (clustered_df['Actual'] == 0)].shape[0]
    # total_class_0 = clustered_df[clustered_df['Actual'] == 0].shape[0]
    
    # Calculate the percentage of rows where the target class '1' matches the cluster value '1'
    # matches_1 = clustered_df[(clustered_df['Cluster'] == '1') & (clustered_df['Actual'] == 1)].shape[0]
    # total_class_1 = clustered_df[clustered_df['Actual'] == 1].shape[0]
    
    
    # Calculate the percentage of rows where the target class '0' matches the cluster value '0'
    matches_0 = clustered_df[(clustered_df['Cluster_adj'] == '0') & (clustered_df['Actual'] == 0)].shape[0]
    total_class_0 = clustered_df[clustered_df['Actual'] == 0].shape[0]
    
    # Calculate the percentage of rows where the target class '1' matches the cluster value '1'
    matches_1 = clustered_df[(clustered_df['Cluster_adj'] == '1') & (clustered_df['Actual'] == 1)].shape[0]
    total_class_1 = clustered_df[clustered_df['Actual'] == 1].shape[0]
    
    
    # Print the results for class '0'
    print(f"\nFor Class '0':")
    print(f"Total Instances: total_class_0: {total_class_0}")
    print(f"Matching Cluster '0' Instances (matches_0): {matches_0}")
    
    # Print the results for class '1'
    print(f"\nFor Class '1':")
    print(f"Total Instances: total_class_1: {total_class_1}")
    print(f"Matching Cluster '1' Instances (matches_1): {matches_1}")
    
    # Output the final dataframe to a CSV file
    #clustered_df.to_csv('clustered_stability.csv', index=True)
    #print("\nOutput saved to 'clustered_stability.csv'")
    
    # Compute the matching percentage
    #percentage = (matches_0 / total_class_0) * 100
    
    iOverallTotal = total_class_1 + total_class_0
    total_matches = (matches_0 + matches_1)
    percentage = ((total_matches) / iOverallTotal) * 100
    
    print(f"\n\nThis is the function in XAI_METRICS_FUNCTIONS -- STABILITY -- " + XAI_Type + "\n")
    print(f"\n\nPercentage of matches: {percentage:.2f}% : {total_matches} Matches of {iOverallTotal} Entries")
    
    return percentage

## SEPARABILITY

### Pseudocode

"Separability" - two dissimilar instances must have dissimilar explanations

Take subset of test data and determine for each individual instance the number of duplicate
explanations in entire subset, if any.

To measure the separability metric, we choose a subset S of the testing data set that has no duplicates and get
their explanations. Then for every instance s in S, we compare its explanation with all other explanations
of instances in S and if such explanation has no duplicate then it satisfies the separability metric.



 - Choose subset S of test data
		
		-  ensure no duplicate instances exist. This is a comparison of features, 
		   as no explanations have been generated yet.
		-  remove any instances with duplicated features
		-  generate explanations for each remaining instance in the subset of test data
		
 - For every instance in S
 
		- compare explanations with all other instance explanations
		- if no duplicates are found; mark instance as 'success'
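
A minimal sketch of the procedure above, using exact equality to detect duplicates. The function name `separability_score` and the toy values are hypothetical, and exact matching is a simplifying assumption: the implementation that follows compares explanations with a tolerance-based similarity test instead.

```python
from collections import Counter

def separability_score(features, explanations):
    """Percentage of distinct instances whose explanation is unique within the subset."""
    # Build subset S: keep the first occurrence of each distinct feature vector.
    seen = set()
    subset = []
    for feat, expl in zip(features, explanations):
        if feat not in seen:
            seen.add(feat)
            subset.append(expl)
    # An explanation succeeds if no other instance in S shares it exactly.
    counts = Counter(subset)
    successes = sum(counts[expl] == 1 for expl in subset)
    return 100.0 * successes / len(subset)

# Hypothetical data: the second instance duplicates the first and is dropped from S.
toy_features = [(1, 2), (1, 2), (3, 4), (5, 6), (7, 8)]
toy_explanations = [(0.5, 0.1), (0.5, 0.1), (0.2, 0.9), (0.5, 0.1), (0.3, 0.3)]

score = separability_score(toy_features, toy_explanations)
print(f"Separability score: {score:.1f}%")
```

After de-duplication four instances remain. Two of them share the explanation `(0.5, 0.1)` despite having different features, so only two of the four satisfy the metric.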

### Implementation

Create a function to check cell values for levels of difference across XAI results data rows

In [4]:
# Define a function to calculate row similarity
def is_similar_old(row1, row2, threshold=0.85):
    similarity = (row1 == row2).mean()
    print(f'XAI row similarity is: {similarity}')
        
    return similarity >= threshold

In [5]:
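def is_similar(row1, row2, threshold=0.51, tolerance=0.35):  #threshold=0.85, tolerance=0.80
    """
    Return True when at least `threshold` of the element pairs in the two rows are similar.
    Two elements are similar when their relative difference is at most (1 - tolerance).
    """
    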
    # Function to calculate similarity between two elements
    def is_element_similar(element1, element2, tolerance):
        if element1 == 0 and element2 == 0:
            return True  # Avoid division by zero
        relative_difference = abs(element1 - element2) / max(abs(element1), abs(element2))
        #print(f'XAI relative difference (new function) is: {relative_difference}')
        #print(f'Return value from is_element_similar(): {(relative_difference <= (1 - tolerance))}')
        
        return relative_difference <= (1 - tolerance)

    # Calculate similarity for each element pair
    similarities = [is_element_similar(e1, e2, tolerance) for e1, e2 in zip(row1, row2)]

    # Calculate overall row similarity
    similarity = sum(similarities) / len(similarities)
    #print('\n\n###############')
    #print(f'XAI Row similarity (new function) is: {similarity}')
    #print(f'Return value for row from is_similar(): {(similarity >= threshold)}')
    #print('###############\n\n')
    
    return similarity >= threshold
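
As a worked example of the arithmetic in `is_similar`, the sketch below applies the same element rule to two hypothetical rows with the default `tolerance=0.35` and `threshold=0.51`. The helper `relative_difference` and the row values are illustrative only.

```python
def relative_difference(a, b):
    """|a - b| scaled by the larger magnitude; 0.0 when both values are zero."""
    if a == 0 and b == 0:
        return 0.0
    return abs(a - b) / max(abs(a), abs(b))

tolerance, threshold = 0.35, 0.51  # defaults used by is_similar

# Hypothetical explanation rows with four values each.
row1 = [0.10, 0.10, 0.50, 0.50]
row2 = [0.04, -0.02, 0.20, 0.10]

# An element pair is similar when its relative difference is at most 1 - tolerance = 0.65.
flags = [relative_difference(a, b) <= 1 - tolerance for a, b in zip(row1, row2)]
row_similarity = sum(flags) / len(flags)
rows_similar = row_similarity >= threshold

print(flags, row_similarity, rows_similar)
```

The relative differences are roughly 0.6, 1.2, 0.6 and 0.8, so two of the four element pairs pass. A row similarity of 0.5 falls just below the 0.51 threshold, which means rows that agree on exactly half of their elements are not treated as similar.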

In [6]:
def get_seperability_metric(features_df, xai_values_df, XAI_Type, threshold=0.51, tolerance=0.35):
    # Ensure that features_df and xai_values_df have the same number of rows
    if len(features_df) != len(xai_values_df):
        raise ValueError("Separability Metric: The two dataframes must have the same number of rows")
        
    print(f'1:Before Dup check:Length of features df: {len(features_df)}')    
    print(f'1:Before Dup check:Length of xai_values_df: {len(xai_values_df)}\n\n')    

    # Remove duplicate rows in features_df and corresponding rows in xai_values_df
    features_df_no_duplicates = features_df.drop_duplicates(keep='first')
    xai_values_df = xai_values_df.loc[features_df_no_duplicates.index]
    
    # Report the de-duplicated frame, not the original features_df
    print(f'2:After Dup check:Length of features df: {len(features_df_no_duplicates)}')    
    print(f'2:After Dup check:Length of xai_values_df: {len(xai_values_df)}')    
    
    

    # Initialize counters
    iSeperation_success = 0
    iSeperation_failure = 0


    # Iterate over each row in features_df
    for index, _ in features_df_no_duplicates.iterrows():
        # Get the corresponding row in xai_values_df
        xai_row = xai_values_df.loc[index]
        
        # Debug for Feature row
        # print(f'\nFeature Index is: {index}')

        # Check for similarity with other rows in xai_values_df
        similarity_count = sum(is_similar(xai_row, 
                                          other_row,
                                          threshold, 
                                          tolerance) for idx, other_row in xai_values_df.iterrows() if idx != index)

        # Check if similarity_count is zero (no similar rows found)
        if similarity_count == 0:
            iSeperation_success += 1
        else:
            iSeperation_failure += 1
            print('Failure')

    # Calculate the percentage of separation success
    separation_percentage = (iSeperation_success / len(features_df_no_duplicates)) * 100
    
    print(f'Result: {iSeperation_success} entries are shown to have dissimilar explanations in data block of {(len(features_df_no_duplicates))} rows')
    
    # display_text is not defined in this file; it is assumed to be provided by the calling notebook
    display_text("The Y Separability Metric Score for " + XAI_Type + ": " + str(separation_percentage))

    return separation_percentage

## SIMILARITY

### Pseudocode

This metric states that the more similar the instances to be explained, the closer their explanations should be and vice versa.

To measure the similarity metric, we normalize the instances in the testing data set and cluster them
using the DBSCAN algorithm. For each framework, we normalize the explanations
and calculate the mean pairwise Euclidean distance between explanations of testing instances
in the same cluster. The framework with the smallest mean pairwise Euclidean distance across
its clusters best reflects the similarity metric.

 - Pass instances and their respective explanations to a function

 - Normalise instances in the test data (DBSCAN)
 
 - Cluster instances in test data into clusters (Note:- not just two clusters, could be more)
 
 - Group the explanations based on the cluster to which their associated instance has been assigned
 
 - Calculate mean pairwise Euclidean distance between explanations in each of the  groups (Note:- not just two groups, could be more)
 
 - Calculate the average of the per-group distance values just generated
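
The grouping and averaging steps can be sketched as follows, assuming cluster ids have already been assigned to the instances. The helper names and toy values are hypothetical. Note that this sketch averages over distinct pairs only, which is not identical to averaging a full square distance matrix, since the latter also counts the zero diagonal.

```python
import math
from itertools import combinations
from statistics import mean

def mean_pairwise_distance(points):
    """Mean Euclidean distance over all unordered pairs; None if fewer than two points."""
    distances = [math.dist(p, q) for p, q in combinations(points, 2)]
    return mean(distances) if distances else None

def similarity_score(cluster_ids, explanations):
    """Average of the per-cluster mean pairwise explanation distances."""
    groups = {}
    for cid, expl in zip(cluster_ids, explanations):
        groups.setdefault(cid, []).append(expl)
    per_group = [mean_pairwise_distance(g) for g in groups.values()]
    # Clusters with a single member have no pairs and are skipped.
    return mean(d for d in per_group if d is not None)

# Hypothetical cluster assignments and explanation vectors for five instances.
toy_clusters = [0, 0, 0, 1, 1]
toy_explanations = [(0.0, 0.0), (3.0, 4.0), (6.0, 8.0), (1.0, 1.0), (1.0, 3.0)]

score = similarity_score(toy_clusters, toy_explanations)
print(f"Similarity score: {score:.4f}")
```

Cluster 0 has pairwise distances 5, 10 and 5 (mean 20/3) and cluster 1 has a single pair at distance 2, so the overall score is the average of those two group means. A lower score indicates that instances in the same cluster receive closer explanations.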

### Implementation

In [7]:
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans, DBSCAN
from scipy.spatial.distance import pdist, squareform
from sklearn.metrics import davies_bouldin_score

In [8]:
import pandas as pd

def determine_optimal_clusters_davies_bouldin(features_df: pd.DataFrame, max_clusters: int = 10) -> int:
    """
    Determines the optimal number of clusters for a given DataFrame using the Davies-Bouldin Index.

    Parameters:
    features_df (pd.DataFrame): The DataFrame containing the features for clustering.
    max_clusters (int): The maximum number of clusters to consider.

    Returns:
    int: The optimal number of clusters.
    """

    best_db_index = float('inf')
    best_k = 2

    # Include max_clusters itself, as described in the docstring
    for k in range(2, max_clusters + 1):
        kmeans = KMeans(n_clusters=k, init='k-means++', random_state=42)
        cluster_labels = kmeans.fit_predict(features_df)

        db_index = davies_bouldin_score(features_df, cluster_labels)
        if db_index < best_db_index:
            best_db_index = db_index
            best_k = k

    return best_k

# Example usage:
# optimal_clusters = determine_optimal_clusters_davies_bouldin(features_df)
# print(f'Optimal number of clusters: {optimal_clusters}')

In [9]:
def get_similarity_metric(features_df, xai_values_df, XAI_Type, use_dbscan=True, eps=0.5, min_samples=5):
    
    # Ensure that features_df and xai_values_df have the same number of rows
    if len(features_df) != len(xai_values_df):
        raise ValueError("Similarity Metric: The two dataframes must have the same number of rows")
    
    # Step 1: Validate the input, then optionally filter out outliers using DBSCAN
    # Check if features_df is empty
    if features_df.empty:
        raise ValueError("Input features_df is empty.")

    # Optional DBSCAN for outlier detection
    if use_dbscan:
        dbscan = DBSCAN(eps=eps, min_samples=min_samples)
        labels = dbscan.fit_predict(features_df)
        features_df = features_df[labels != -1]
        xai_values_df = xai_values_df[labels != -1]

        # Check if features_df is empty after DBSCAN
        if features_df.empty:
            raise ValueError("All instances were filtered out as outliers by DBSCAN.")

    # Normalizing the features 
    # This is already done as part of pre-processing the feature inputs?
    #scaler = StandardScaler()
    #normalized_features = scaler.fit_transform(features_df)
    
    # Optional code to look at optimising the number of clusters
    optimal_clusters = determine_optimal_clusters_davies_bouldin(features_df)
    print(f'\nOptimal number of clusters: {optimal_clusters}')

    # Step 2: Cluster normalized data using K-Means
    # Here, we assume a fixed number of clusters or use a heuristic
    # This can be determined using methods like the elbow method
    #n_clusters = 3
    n_clusters = optimal_clusters
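    # Fix the seed so the clustering is reproducible and consistent with the cluster search above
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)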
    clusters = kmeans.fit_predict(features_df)
    #clusters = kmeans.fit_predict(normalized_features)
                         
    # Show clusters...
    # print(f'\nClusters... (after kmeans fit): {clusters}')          
                         

    # Step 3: Group explanations in xai_values_df based on clusters
    grouped_explanations = [xai_values_df[clusters == k] for k in range(n_clusters)]
                         
    # Show Grouped Explanations
    # print(f'Grouped Explanations (grouped_explanations): {grouped_explanations}')      
                         

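    # Step 4: Calculate mean pairwise Euclidean distance for each group
    # Note: averaging the full square matrix also includes the zero diagonal, so for a group
    # of n rows the value is (n - 1) / n times the mean over distinct pairs.
    group_distances = [np.mean(squareform(pdist(group))) for group in grouped_explanations]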

    # Step 5: Calculate the overall average distance
    average_distance = np.mean(group_distances)

    
    similarity_measure = average_distance
    
    # display_text is not defined in this file; it is assumed to be provided by the calling notebook
    display_text("The Similarity Metric Score for " + XAI_Type + ": " + str(similarity_measure))
    
    
    return similarity_measure

## COMPUTATIONAL EFFICIENCY

### Decorator Function

To measure the execution time of the Python functions generating the XAI values, a decorator has been created that wraps around the XAI function. 

A decorator is a higher-order function in Python that allows you to extend or modify the behavior of other functions.

### Implementation

In [10]:
import time
from functools import wraps

In [11]:
def timeit(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Function {func.__name__!r} executed in {execution_time:.4f} seconds")
        return result, execution_time
    return wrapper

In [12]:
# Example usage:
@timeit
def example_function():
    # Some time-consuming task
    time.sleep(2)  # Simulating a task taking 2 seconds
    return "Result"

In [13]:
# When calling the function, you get both the result and the execution time
result, exec_time = example_function()
print(f"Result: {result}, Execution Time: {exec_time} seconds")

Function 'example_function' executed in 2.0021 seconds
Result: Result, Execution Time: 2.002138137817383 seconds


## Test Utilities

### Check Euclidean distance

In [14]:
def get_euclidean_distance(instance1, instance2):
    """
    Compute the Euclidean distance between two instances, considering only numerical columns.
    
    Parameters:
    - instance1 (pd.Series): The first instance.
    - instance2 (pd.Series): The second instance.
    
    Returns:
    - float: The Euclidean distance between the two instances.
    """
    # Filter out non-numerical columns
    instance1_numeric = instance1[instance1.apply(lambda x: np.isreal(x) and not isinstance(x, bool))]
    instance2_numeric = instance2[instance2.apply(lambda x: np.isreal(x) and not isinstance(x, bool))]
    
    print("\nCalculating Euclidean distance...\n\n")
    
    return np.sqrt(np.sum((instance1_numeric - instance2_numeric) ** 2))