In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, confusion_matrix
import pygad
import numpy as np
import pickle
import matplotlib.pyplot as plt
from sklearn.tree import DecisionTreeClassifier
from sklearn.tree import export_text

In [None]:
# Folder the input CSV is read from and every generated artefact is written to
result_folder = 'ResidualProducts'

# Feature table; its 'vote' column is binarized separately for each bin below
df = pd.read_csv(f'{result_folder}/ID-Features-Vote.csv')

# For every bin: the first inclusive vote range becomes class 0, the second
# inclusive vote range becomes class 1
bins = [
    dict(name="bin1", thresholds=[(0, 25), (26, 50)]),
    dict(name="bin2", thresholds=[(0, 21), (32, 50)]),
    dict(name="bin3", thresholds=[(0, 16), (37, 50)]),
    dict(name="bin4", thresholds=[(0, 11), (42, 50)]),
]

# One metrics dictionary per processed bin is appended here
metrics_list = list()

In [None]:
# Process each bin: binarize the votes, let a genetic algorithm pick a feature
# subset, train a Random Forest on that subset and record held-out test metrics.
for bin_info in bins:
    bin_name = bin_info["name"]
    thresholds = bin_info["thresholds"]
    (low_min, low_max), (high_min, high_max) = thresholds

    # Binarize the target: inclusive low range -> 0, inclusive high range -> 1,
    # everything else -> NaN so that it can be dropped right after.
    bin_data = df.copy()
    bin_data['vote'] = np.select(
        [
            bin_data['vote'].between(low_min, low_max),
            bin_data['vote'].between(high_min, high_max),
        ],
        [0.0, 1.0],
        default=np.nan,
    )

    # Drop rows where the target is NaN (outside the defined bins)
    bin_data = bin_data.dropna(subset=['vote'])

    # Remove unnecessary columns
    bin_data = bin_data.drop(columns=['ID', 'PAINTING'])

    # Separate features (X) and target (y)
    X = bin_data.drop(columns=['vote'])
    y = bin_data['vote']

    # Split the dataset into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    # Inner split of the training data used ONLY to score candidate feature
    # subsets. Scoring candidates on X_test (as was done before) tunes the
    # selection to the test set and makes the reported metrics optimistic.
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train, y_train, test_size=0.3, random_state=42
    )

    # Define the fitness function for the genetic algorithm
    def fitness_function(ga_instance, solution, solution_idx):
        """Validation accuracy of a Random Forest restricted to the genes set to 1."""
        selected_features = np.where(solution == 1)[0]
        if len(selected_features) == 0:
            return 0  # Avoid empty selections

        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X_fit.iloc[:, selected_features], y_fit)
        predictions = model.predict(X_val.iloc[:, selected_features])
        return accuracy_score(y_val, predictions)

    # Set up the genetic algorithm
    num_generations = 100
    num_parents_mating = 5
    sol_per_pop = 20
    num_genes = X_train.shape[1]

    ga_instance = pygad.GA(
        num_generations=num_generations,
        num_parents_mating=num_parents_mating,
        fitness_func=fitness_function,
        sol_per_pop=sol_per_pop,
        num_genes=num_genes,
        gene_space=[0, 1],
        random_seed=42,  # make the search (and therefore the selection) reproducible
    )

    # Run the genetic algorithm
    ga_instance.run()

    # Save the fitness convergence plot. plot_fitness() creates and shows its
    # own figure, so the file is written through its save_dir argument rather
    # than by a later plt.savefig() (which may hit a different, empty figure).
    # rc_context applies the intended 10x6 size to the figure it creates.
    with plt.rc_context({"figure.figsize": (10, 6)}):
        fitness_fig = ga_instance.plot_fitness(
            title=f"Fitness Convergence for {bin_name}",
            save_dir=f"{result_folder}/{bin_name}_fitness_convergence.png",
        )
    plt.close(fitness_fig)  # Close the plot to free up memory

    # Get the best solution (selected features)
    solution, solution_fitness, solution_idx = ga_instance.best_solution()
    selected_features_indices = np.where(solution == 1)[0]
    selected_features = X.columns[selected_features_indices]

    # Convert selected features to a comma-separated string
    selected_features_str = ", ".join(selected_features)

    # Train the final model on the full training split using the selected features
    X_train_selected = X_train.iloc[:, selected_features_indices]
    X_test_selected = X_test.iloc[:, selected_features_indices]

    final_model = RandomForestClassifier(n_estimators=100, random_state=42)
    final_model.fit(X_train_selected, y_train)

    # Save the final model to a file
    with open(f'{result_folder}/{bin_name}_random_forest_model.pkl', 'wb') as file:
        pickle.dump(final_model, file)

    # Evaluate the final model on the untouched test split
    y_pred = final_model.predict(X_test_selected)

    # Calculate evaluation metrics
    accuracy = accuracy_score(y_test, y_pred)
    # Explicit labels keep the matrix 2x2 even if one class is absent from the
    # test split, so the four-way unpacking below cannot fail.
    conf_matrix = confusion_matrix(y_test, y_pred, labels=[0.0, 1.0])

    # Extract values from the confusion matrix
    TN, FP, FN, TP = conf_matrix.ravel()

    # Calculate Sensitivity, Specificity, PPV, and NPV (NaN when undefined)
    sensitivity = TP / (TP + FN) if (TP + FN) else np.nan
    specificity = TN / (TN + FP) if (TN + FP) else np.nan
    ppv = TP / (TP + FP) if (TP + FP) else np.nan
    npv = TN / (TN + FN) if (TN + FN) else np.nan

    # Store metrics in a dictionary
    metrics = {
        "bin_number": bin_name,
        "accuracy": accuracy,
        "sensitivity": sensitivity,
        "specificity": specificity,
        "ppv": ppv,
        "npv": npv,
        "selected_features": selected_features_str  # Add selected features as a string
    }

    # Append metrics to the list
    metrics_list.append(metrics)

    # Print metrics for the current bin
    print(f"Metrics for {bin_name}:")
    print(f"Accuracy: {accuracy}")
    print(f"Sensitivity: {sensitivity}")
    print(f"Specificity: {specificity}")
    print(f"PPV: {ppv}")
    print(f"NPV: {npv}")
    print("Selected Features:", selected_features_str)
    print("\n")

In [None]:
# Turn the collected per-bin metric dictionaries into a table and write it out
metrics_df = pd.DataFrame(data=metrics_list)
metrics_df.to_csv(path_or_buf=f'{result_folder}/bin_metrics.csv', index=False)

# Tell the reader where the artefacts ended up
print(
    f"Metrics saved to '{result_folder}/bin_metrics.csv'.",
    "Fitness convergence plots saved for each bin.",
    sep="\n",
)

Focusing on the most extreme bin: votes in [0, 11] -> class 0, votes in [42, 50] -> class 1 (votes in between are discarded)

In [None]:
# Reload the feature table so this experiment starts from the raw data
df = pd.read_csv(f'{result_folder}/ID-Features-Vote.csv')

# Inclusive vote ranges of the single bin studied here:
# first range -> class 0, second range -> class 1
bin_thresholds = [(0, 11), (42, 50)]

# How many times the split / selection / training cycle is repeated
num_iterations = 10

# One metrics dictionary per iteration is appended here
metrics_list = list()

In [None]:
# Repeat feature selection + training on the chosen bin with a different
# train/test split per iteration.

# Binarize the target once: the labelled table does not depend on the iteration.
(low_min, low_max), (high_min, high_max) = bin_thresholds
bin_data = df.copy()
bin_data['vote'] = np.select(
    [
        bin_data['vote'].between(low_min, low_max),
        bin_data['vote'].between(high_min, high_max),
    ],
    [0.0, 1.0],
    default=np.nan,
)

# Drop rows where the target is NaN (outside the defined bins)
bin_data = bin_data.dropna(subset=['vote'])

# Remove unnecessary columns
bin_data = bin_data.drop(columns=['ID', 'PAINTING'])

# Separate features (X) and target (y)
X = bin_data.drop(columns=['vote'])
y = bin_data['vote']

# Genetic algorithm hyper-parameters (identical for every iteration)
num_generations = 100
num_parents_mating = 5
sol_per_pop = 20
num_genes = X.shape[1]

# Process the bin for each iteration
for iteration in range(num_iterations):
    split_seed = 42 + iteration  # Change random state for each iteration

    # Split the dataset into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=split_seed)

    # Inner split of the training data used ONLY to score candidate feature
    # subsets. Scoring candidates on X_test (as was done before) tunes the
    # selection to the test set and makes the reported metrics optimistic.
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train, y_train, test_size=0.3, random_state=split_seed
    )

    # Define the fitness function for the genetic algorithm
    def fitness_function(ga_instance, solution, solution_idx):
        """Validation accuracy of a Random Forest restricted to the genes set to 1."""
        selected_features = np.where(solution == 1)[0]
        if len(selected_features) == 0:
            return 0  # Avoid empty selections

        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X_fit.iloc[:, selected_features], y_fit)
        predictions = model.predict(X_val.iloc[:, selected_features])
        return accuracy_score(y_val, predictions)

    ga_instance = pygad.GA(
        num_generations=num_generations,
        num_parents_mating=num_parents_mating,
        fitness_func=fitness_function,
        sol_per_pop=sol_per_pop,
        num_genes=num_genes,
        gene_space=[0, 1],
        random_seed=split_seed,  # reproducible search, still different per iteration
    )

    # Run the genetic algorithm
    ga_instance.run()

    # Get the best solution (selected features)
    solution, solution_fitness, solution_idx = ga_instance.best_solution()
    selected_features_indices = np.where(solution == 1)[0]
    selected_features = X.columns[selected_features_indices]

    # Convert selected features to a comma-separated string
    selected_features_str = ", ".join(selected_features)

    # Train the final model on the full training split using the selected features
    X_train_selected = X_train.iloc[:, selected_features_indices]
    X_test_selected = X_test.iloc[:, selected_features_indices]

    final_model = RandomForestClassifier(n_estimators=100, random_state=42)
    final_model.fit(X_train_selected, y_train)

    # Save the final model to a file
    with open(f'{result_folder}/random_forest_model_iteration_{iteration + 1}.pkl', 'wb') as file:
        pickle.dump(final_model, file)

    # Evaluate the final model on the untouched test split
    y_pred = final_model.predict(X_test_selected)

    # Calculate evaluation metrics
    accuracy = accuracy_score(y_test, y_pred)
    # Explicit labels keep the matrix 2x2 even if one class is absent from the
    # test split, so the four-way unpacking below cannot fail.
    conf_matrix = confusion_matrix(y_test, y_pred, labels=[0.0, 1.0])

    # Extract values from the confusion matrix
    TN, FP, FN, TP = conf_matrix.ravel()

    # Calculate Sensitivity, Specificity, PPV, and NPV (NaN when undefined)
    sensitivity = TP / (TP + FN) if (TP + FN) else np.nan
    specificity = TN / (TN + FP) if (TN + FP) else np.nan
    ppv = TP / (TP + FP) if (TP + FP) else np.nan
    npv = TN / (TN + FN) if (TN + FN) else np.nan

    # Store metrics in a dictionary
    metrics = {
        "iteration": iteration + 1,
        "accuracy": accuracy,
        "sensitivity": sensitivity,
        "specificity": specificity,
        "ppv": ppv,
        "npv": npv,
        "selected_features": selected_features_str,  # Add selected features as a string
    }

    # Append metrics to the list
    metrics_list.append(metrics)

    # Print metrics for the current iteration
    print(f"Metrics for iteration {iteration + 1}:")
    print(f"Accuracy: {accuracy}")
    print(f"Sensitivity: {sensitivity}")
    print(f"Specificity: {specificity}")
    print(f"PPV: {ppv}")
    print(f"NPV: {npv}")
    print("Selected Features:", selected_features_str)
    print("Model saved")
    print("\n")

In [None]:
# Turn the collected per-iteration metric dictionaries into a table and write it out
metrics_df = pd.DataFrame(data=metrics_list)
metrics_df.to_csv(path_or_buf=f'{result_folder}/iteration_metrics.csv', index=False)

# Tell the reader where the table ended up
print(f"Metrics saved to '{result_folder}/iteration_metrics.csv'.")

Knowledge Distillation via Mimicking Decision Tree

In [None]:
# Load the original dataset
df = pd.read_csv(f'{result_folder}/ID-Features-Vote.csv')

# Load the trained Random Forest model
# NOTE: unpickling executes arbitrary code; only load files produced by this notebook.
with open(f'{result_folder}/random_forest_model_iteration_9.pkl', 'rb') as file:
    rf_model = pickle.load(file)

# Features expected by the Random Forest model.
# This hand-written list is only a fallback: it goes stale as soon as the
# genetic algorithm is re-run and picks a different subset for this model.
selected_features = [
    'min_fixations_per_area',  # Minimum number of fixations per area
    'max_fixations_per_area',  # Maximum number of fixations per area
    'mean_fixations_per_area',  # Mean number of fixations per area
    'last_series_num_fixations',  # Number of fixations in the last series
    'last_is_longest_duration',  # Whether the last fixation series has the longest duration
    'total_duration_fixations',  # Total duration of fixations
    'min_dilatation',  # Minimum pupil dilation
    'max_dilatation',  # Maximum pupil dilation
    'mean_dilatation',  # Mean pupil dilation
    'std_dilatation',  # Standard deviation of pupil dilation
    'instance_area_mean_dilatation_min',  # Minimum mean pupil dilation in an instance area
    'instance_area_mean_dilatation_std',  # Standard deviation of mean pupil dilation in an instance area
    'is_max_dilatation_in_first_instance_area',  # Whether the maximum pupil dilation occurs in the first instance area
    'returns_to_initial_area',  # Number of times the gaze returns to the initial area
    'initial_instance_area_duration_ms',  # Duration of the first instance area in milliseconds
    'initial_instance_area_n_fixations'  # Number of fixations in the first instance area
]

# Prefer the column names recorded by the fitted model itself (scikit-learn
# stores them in `feature_names_in_` when the model was fitted on a DataFrame),
# so the columns always match, in name and order, what the model was trained on.
if hasattr(rf_model, 'feature_names_in_'):
    selected_features = list(rf_model.feature_names_in_)


In [None]:
# Step 1: Prepare the dataset (done once: it does not depend on the tree depth)

# Extract the selected features from the dataset
X = df[selected_features]

# Use the pre-trained Random Forest model to predict a label for each instance in the dataset
y_pred = rf_model.predict(X)

# Append the predicted labels as a new column in the dataset
df['RFLabel'] = y_pred

# Save the updated dataset, now including the predicted labels, to a CSV file
df.to_csv(f'{result_folder}/RFLabeled_ID-Features-Vote.csv', index=False)
print(f"Labeled dataset saved in '{result_folder}/RFLabeled_ID-Features-Vote.csv'.")

# Feature matrix and target labels shared by every Decision Tree trained below
X_train = df[selected_features]
y_train = df['RFLabel']

# Iterate over different maximum depths for the Decision Tree (ranging from 3 to 5)
for depth in range(3, 6):

    # Step 2: Train a Decision Tree to mimic the Random Forest model

    # The depth is controlled by the loop variable `depth`, allowing experimentation with different tree depths
    dt_model = DecisionTreeClassifier(random_state=42, max_depth=depth)

    # Train the Decision Tree model on the Random Forest's labels
    dt_model.fit(X_train, y_train)

    # Step 3: Evaluate the Decision Tree's performance

    # Predict labels on the training set to check how well the Decision Tree mimics the Random Forest model
    y_pred_dt = dt_model.predict(X_train)

    # Agreement between the Decision Tree and the Random Forest labels it was trained on
    accuracy = accuracy_score(y_train, y_pred_dt)
    print(f"Decision Tree accuracy on the training set (depth={depth}): {accuracy}")

    # Step 4: Save the trained Decision Tree model

    # Serialize and save the trained Decision Tree model to a file
    with open(f'{result_folder}/dt_mimick_depth{depth}.pkl', 'wb') as file:
        pickle.dump(dt_model, file)

    print(f"Decision Tree model (depth={depth}) saved in '{result_folder}/dt_mimick_depth{depth}.pkl'.")


In [None]:
def load_tree_model(pickle_file):
    """
    Read a pickled object (here: a trained Decision Tree) back from disk.

    The file is opened in binary mode and handed to `pickle.load`; whatever
    object was serialized into it is returned unchanged.

    Args:
        pickle_file (str): Path of the pickle file to read.

    Returns:
        object: The deserialized object stored in `pickle_file`.
    """
    with open(pickle_file, "rb") as handle:
        # The file is closed automatically when the `with` block is left
        return pickle.load(handle)


In [None]:
def extract_text_rules(tree, feature_names=None):
    """
    Render the decision rules of a fitted tree as plain text.

    Thin wrapper around `sklearn.tree.export_text`: both arguments are passed
    through unchanged and the resulting string is returned.

    Args:
        tree: A fitted scikit-learn decision tree accepted by `export_text`.
        feature_names (list of str, optional):
            Names of the input features, forwarded to `export_text`.
            Defaults to None, in which case `export_text` chooses the names.

    Returns:
        str: The textual report produced by `export_text`.
    """
    rules_text = export_text(tree, feature_names=feature_names)
    return rules_text


In [None]:
def extract_rules_cnf(decision_tree, feature_names):
    """
    Extract one rule per leaf of a fitted decision tree.

    The tree is walked recursively from the root. Every split contributes a
    condition "<feature> <= <threshold>" (left branch) or
    "<feature> > <threshold>" (right branch); the conditions met on the way to
    a leaf are joined with " AND " to form that leaf's rule. (Each rule is
    therefore a plain conjunction of threshold tests.)

    Args:
        decision_tree:
            A fitted scikit-learn decision tree exposing `tree_`. When it also
            exposes a one-dimensional `classes_` array (classifiers), leaf
            predictions are reported as class labels.
        feature_names (list of str):
            Feature names indexed by the feature indices stored in the tree.

    Returns:
        dict: A nested dictionary mirroring the tree. Internal nodes have the
        keys `"left"` and `"right"`; each leaf has:
              - `"class"`: the label of the majority class at that leaf
                (integral float labels such as 1.0 are returned as int). If the
                model has no usable `classes_`, the argmax index of the leaf's
                value array is returned instead.
              - `"rule"`: the " AND "-joined conditions leading to the leaf
                (an empty string when the root itself is a leaf).
    """

    # Access the internal tree structure of the trained Decision Tree model
    tree_ = decision_tree.tree_

    # Sentinel found in tree_.feature for leaf nodes (scikit-learn's TREE_UNDEFINED)
    leaf_feature = -2

    # Class labels in the order used by tree_.value; None when not available
    classes = getattr(decision_tree, "classes_", None)
    if not (isinstance(classes, np.ndarray) and classes.ndim == 1):
        classes = None

    def leaf_class(node):
        """Return the label of the majority class stored at leaf `node`."""
        class_index = int(np.argmax(tree_.value[node]))
        if classes is None or class_index >= len(classes):
            return class_index
        # The argmax is a position in classes_, not the label itself: the two
        # differ whenever the labels are not exactly 0, 1, 2, ...
        label = classes[class_index]
        if hasattr(label, "item"):
            label = label.item()  # NumPy scalar -> plain Python value
        if isinstance(label, float) and label.is_integer():
            label = int(label)
        return label

    def recurse(node, path):
        """
        Build the nested rule dictionary for the subtree rooted at `node`.

        Args:
            node (int): The current node index in the Decision Tree structure.
            path (list of str): Conditions accumulated from the root to `node`.

        Returns:
            dict: Nested "left"/"right" dictionary, or a leaf dictionary.
        """
        if tree_.feature[node] == leaf_feature:  # Leaf node
            return {"class": leaf_class(node), "rule": " AND ".join(path)}

        # Internal node: feature name and threshold used for this split
        name = feature_names[tree_.feature[node]]
        threshold = tree_.threshold[node]

        return {
            "left": recurse(tree_.children_left[node], path + [f"{name} <= {threshold}"]),
            "right": recurse(tree_.children_right[node], path + [f"{name} > {threshold}"]),
        }

    # Start recursion from the root node (index 0)
    return recurse(0, [])


In [None]:
def evaluate_rules_on_dataset(tree, dataset, feature_names, label_column=None):
    """
    Compute support and confidence of every leaf rule of a Decision Tree on a dataset.

    This function:
    1. Extracts the per-leaf rules with `extract_rules_cnf`.
    2. Builds, for each rule, a boolean mask of the dataset rows satisfying all
       of its conditions (support).
    3. Measures how many of the matching rows carry the class predicted by the
       rule (confidence).

    Args:
        tree (DecisionTreeClassifier):
            A trained scikit-learn Decision Tree model.
        dataset (pd.DataFrame):
            Feature columns plus a column of class labels.
        feature_names (list of str):
            Feature names used when training the tree; forwarded to
            `extract_rules_cnf`. Names may contain spaces.
        label_column (str, optional):
            Name of the column holding the class labels. Defaults to None,
            meaning the last column of `dataset` is used.

    Returns:
        pd.DataFrame: One row per leaf (left-to-right order) with the columns:
            - `"rule"`: The " AND "-joined conditions defining the rule.
            - `"class"`: The class predicted by the rule.
            - `"support"`: Fraction of dataset rows satisfying the rule
              (0.0 for an empty dataset).
            - `"confidence"`: Fraction of the matching rows whose label equals
              the predicted class (0.0 when no row matches).

    Raises:
        ValueError: If a condition is malformed or uses an operator other than
            "<=" or ">".

    Notes:
        Rows with a missing value in a tested feature satisfy neither "<=" nor
        ">" and therefore never match a rule containing that feature.
    """

    # Extract the nested rule dictionary from the Decision Tree
    rules = extract_rules_cnf(tree, feature_names)

    # Total number of samples in the dataset (for support calculation)
    total_samples = len(dataset)

    # True class labels the rule predictions are compared against
    labels = dataset.iloc[:, -1] if label_column is None else dataset[label_column]

    def to_mask(comparison):
        """Convert a boolean Series to a NumPy mask; missing results count as False."""
        return comparison.fillna(False).to_numpy(dtype=bool)

    def rule_mask(rule_text):
        """Return a boolean array marking the rows that satisfy every condition of the rule."""
        mask = np.ones(total_samples, dtype=bool)
        if not rule_text:
            return mask  # Root-only tree: the empty rule applies to every row

        for condition in rule_text.split(" AND "):
            # Split from the right so feature names containing spaces stay intact
            feature, op, threshold = condition.rsplit(" ", 2)
            threshold = float(threshold)

            if op == "<=":
                mask &= to_mask(dataset[feature] <= threshold)
            elif op == ">":
                mask &= to_mask(dataset[feature] > threshold)
            else:
                raise ValueError(f"Unsupported operator {op!r} in condition {condition!r}")
        return mask

    def iter_leaves(node):
        """Yield the leaf dictionaries of the nested rule structure, left to right."""
        if "class" in node:
            yield node
        else:
            yield from iter_leaves(node["left"])
            yield from iter_leaves(node["right"])

    results = []
    for leaf in iter_leaves(rules):
        mask = rule_mask(leaf["rule"])
        n_matching = int(mask.sum())

        # Support: proportion of dataset instances satisfying the rule
        support = n_matching / total_samples if total_samples else 0.0

        # Confidence: proportion of matching instances that also have the expected class
        if n_matching:
            n_correct = int((mask & to_mask(labels == leaf["class"])).sum())
            confidence = n_correct / n_matching
        else:
            confidence = 0.0  # Avoid division by zero when no rows match

        results.append({
            "rule": leaf["rule"],
            "class": leaf["class"],
            "support": support,
            "confidence": confidence,
        })

    # Return the results as a Pandas DataFrame
    return pd.DataFrame(results, columns=["rule", "class", "support", "confidence"])


In [None]:
# Export the rules of every mimicking Decision Tree together with their
# support / confidence on the Random-Forest-labelled dataset.

# Dataset file containing features and predicted labels from the Random Forest model
dataset_file = f'{result_folder}/RFLabeled_ID-Features-Vote.csv'

# Load the dataset once: it is the same for every tree depth
dataset = pd.read_csv(dataset_file)

# Fallback feature names, used only when a loaded tree does not record the
# names it was trained with (must then match those used during training)
fallback_feature_names = [
    'min_fixations_per_area', 'max_fixations_per_area', 'mean_fixations_per_area',
    'last_series_num_fixations', 'last_is_longest_duration', 'total_duration_fixations',
    'min_dilatation', 'max_dilatation', 'mean_dilatation', 'std_dilatation',
    'instance_area_mean_dilatation_min', 'instance_area_mean_dilatation_std',
    'is_max_dilatation_in_first_instance_area', 'returns_to_initial_area',
    'initial_instance_area_duration_ms', 'initial_instance_area_n_fixations'
]

# Iterate over different depths of the trained Decision Tree models
for d in range(3, 6):
    # File path of the serialized Decision Tree model for the current depth
    pickle_file = f'{result_folder}/dt_mimick_depth{d}.pkl'

    # Load the trained Decision Tree model from the corresponding pickle file
    tree = load_tree_model(pickle_file)

    # Prefer the feature names stored in the fitted tree (scikit-learn records
    # them in `feature_names_in_` when the tree was fitted on a DataFrame) so
    # names and order always match the tree's feature indices.
    tree_feature_names = getattr(tree, 'feature_names_in_', None)
    if tree_feature_names is not None:
        feature_names = list(tree_feature_names)
    else:
        feature_names = fallback_feature_names

    # Evaluate the extracted decision rules on the dataset
    # (the last column of the dataset is used as the class label)
    results = evaluate_rules_on_dataset(tree, dataset, feature_names)

    # Save the extracted rules and their evaluation metrics to a CSV file
    output_file = f"{result_folder}/rules_depth{d}.csv"
    results.to_csv(output_file, index=False)

    # Print confirmation message
    print(f"Rules saved in {output_file}")
