The aim of this notebook will be to find the following data:
* decision tree to plot in the visualization
* the generated neighbourhood generated by LORE

Everything will be tested on the iris dataset for quick running times

### dataset import

In [140]:
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
import json
from sklearn.preprocessing import LabelEncoder

In [141]:
data_file = "iris.json"
# Load and prepare data
data = load_iris()
X = data.data
y = data.target
feature_names = list(data.feature_names)
target_names = list(data.target_names)

### LORE initial

read data

In [142]:
from lore_sa.dataset import TabularDataset
import pandas as pd

In [143]:
data_dict = {name: X[:, i] for i, name in enumerate(feature_names)}
target_name = 'target'
data_dict[target_name] = [target_names[i] for i in y]  # Map numerical targets to names

dataset = TabularDataset.from_dict(data_dict, 'target')
dataset.df.dropna(inplace = True)
dataset.df

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),target
0,5.1,3.5,1.4,0.2,setosa
1,4.9,3.0,1.4,0.2,setosa
2,4.7,3.2,1.3,0.2,setosa
3,4.6,3.1,1.5,0.2,setosa
4,5.0,3.6,1.4,0.2,setosa
...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,virginica
146,6.3,2.5,5.0,1.9,virginica
147,6.5,3.0,5.2,2.0,virginica
148,6.2,3.4,5.4,2.3,virginica


train model

In [144]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from lore_sa.bbox import sklearn_classifier_bbox

In [145]:

def tutorial_train_model_generalized(dataset: TabularDataset, target_name:str, ):
    numeric_indices = [v['index'] for k, v in dataset.descriptor['numeric'].items()]
    categorical_indices = [v['index'] for k, v in dataset.descriptor['categorical'].items()]

    # Create preprocessor using dynamic indices
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numeric_indices),
            ('cat', OrdinalEncoder(), categorical_indices)
        ]
    )

    # Remove rare classes with fewer than 2 instances
    valid_classes = dataset.df[target_name].value_counts()[dataset.df[target_name].value_counts() > 1].index
    dataset.df = dataset.df[dataset.df[target_name].isin(valid_classes)]

        # Select features and target
    X = dataset.df.iloc[:, numeric_indices + categorical_indices]  # Select all features
    y = dataset.df[target_name]

    # Split dataset
    X_train, X_test, y_train, y_test = train_test_split(X, y,
                test_size=0.3, random_state=42, stratify=y)

    model = make_pipeline(preprocessor, RandomForestClassifier(n_estimators=100, random_state=42))
    
    model.fit(X_train, y_train)
    
    return sklearn_classifier_bbox.sklearnBBox(model)

In [146]:
bbox = tutorial_train_model_generalized(dataset, target_name)

### encoding decoding

In [147]:
from lore_sa.encoder_decoder import ColumnTransformerEnc

tabular_enc = ColumnTransformerEnc(dataset.descriptor)
ref_value = dataset.df.iloc[0].values[:-1]
encoded = tabular_enc.encode([ref_value])
decoded = tabular_enc.decode(encoded)

print(f"Original value: {ref_value}")
print(f"Encoded value: {encoded}")
print(f"Decoded value: {decoded}")

Original value: [5.1 3.5 1.4 0.2]
Encoded value: [[5.1 3.5 1.4 0.2]]
Decoded value: [[5.1 3.5 1.4 0.2]]


the encoder is created using the dataset.descriptor, which is a dict so it can be saved to json and the encoder is then created based on the content read from it

In [148]:
dataset.descriptor

{'numeric': {'sepal length (cm)': {'index': 0,
   'min': 4.3,
   'max': 7.9,
   'mean': 5.843333333333334,
   'std': 0.828066127977863,
   'median': 5.8,
   'q1': 5.1,
   'q3': 6.4},
  'sepal width (cm)': {'index': 1,
   'min': 2.0,
   'max': 4.4,
   'mean': 3.0573333333333337,
   'std': 0.4358662849366982,
   'median': 3.0,
   'q1': 2.8,
   'q3': 3.3},
  'petal length (cm)': {'index': 2,
   'min': 1.0,
   'max': 6.9,
   'mean': 3.7580000000000005,
   'std': 1.7652982332594662,
   'median': 4.35,
   'q1': 1.6,
   'q3': 5.1},
  'petal width (cm)': {'index': 3,
   'min': 0.1,
   'max': 2.5,
   'mean': 1.1993333333333336,
   'std': 0.7622376689603465,
   'median': 1.3,
   'q1': 0.3,
   'q3': 1.8}},
 'categorical': {},
 'ordinal': {},
 'target': {'target': {'index': 4,
   'distinct_values': ['setosa', 'versicolor', 'virginica'],
   'count': {'setosa': 50, 'versicolor': 50, 'virginica': 50}}}}

In [149]:
with open ("descriptor.json", "w") as f:
    json.dump(dataset.descriptor, f, indent=4)

the value to use for the prediction can be found via:

In [150]:
ref_value = dataset.df.iloc[0].values[:-1]
list(ref_value)

[5.1, 3.5, 1.4, 0.2]

and decoded using

In [151]:
encoded = tabular_enc.encode([ref_value])
decoded = tabular_enc.decode(encoded)
list(decoded[0])

[5.1, 3.5, 1.4, 0.2]

### neighborhood generation

In [152]:
from lore_sa.neighgen import RandomGenerator

select the istance to explain

In [153]:
num_row = 0
x = dataset.df.iloc[num_row][:-1]
x

sepal length (cm)    5.1
sepal width (cm)     3.5
petal length (cm)    1.4
petal width (cm)     0.2
Name: 0, dtype: object

encode it

In [154]:
z = tabular_enc.encode([x.values])[0] # remove the class feature from the input instance

creates the neighborhood

In [155]:
gen = RandomGenerator(bbox=bbox, dataset=dataset, encoder=tabular_enc, ocr=0.1)
neighbour = gen.generate(z, 100, dataset.descriptor, tabular_enc)
neighbour

array([[5.1, 3.5, 1.4, 0.2],
       [5.1, 3.5, 1.4, 0.2],
       [5.1, 3.5, 1.4, 0.2],
       [5.1, 3.5754504021422635, 1.4, 0.2],
       [5.1, 3.5754504021422635, 1.3605323895586359, 0.2],
       [5.1, 3.5754504021422635, 1.3605323895586359, 0.2],
       [5.1, 3.5754504021422635, 1.3605323895586359, 0.2],
       [4.379710182418424, 3.5754504021422635, 1.3605323895586359, 0.2],
       [4.379710182418424, 3.5754504021422635, 1.3605323895586359, 0.2],
       [4.379710182418424, 3.5754504021422635, 1.3605323895586359, 0.2],
       [5.768193963565839, 2.7885684336779435, 1.3605323895586359, 0.2],
       [5.768193963565839, 2.7885684336779435, 1.3605323895586359, 0.2],
       [5.768193963565839, 2.7885684336779435, 1.3605323895586359, 0.2],
       [5.768193963565839, 2.7885684336779435, 1.3605323895586359, 0.2],
       [5.768193963565839, 2.7885684336779435, 6.8200561196772425, 0.2],
       [5.768193963565839, 2.7885684336779435, 6.8200561196772425, 0.2],
       [5.768193963565839, 2.788568

### surrogate model

In [156]:
from lore_sa.surrogate import DecisionTreeSurrogate

create X, y and yz for the decision tree surrogate

In [157]:
# decode the neighborhood to be labeled by the blackbox model
neighb_train_X = tabular_enc.decode(neighbour)
neighb_train_y = bbox.predict(neighb_train_X)
# encode the target class to the surrogate model
neighb_train_yz = tabular_enc.encode_target_class(neighb_train_y.reshape(-1, 1)).squeeze()

train the surrogate model

In [158]:
dt = DecisionTreeSurrogate()
x = dt.train(neighbour, neighb_train_yz)

### data extraction for the plots

#### decision tree

get the decision tree _tree for plotting

In [159]:
dt.get_tree_structure()

<sklearn.tree._tree.Tree at 0x1945a513420>

In [160]:
#utils file in decision tree visualization prototype folder
from sklearn.tree import DecisionTreeClassifier
from dataclasses import dataclass
from typing import List, Optional
from dataclasses import asdict
import json
from sklearn.tree._tree import Tree 

@dataclass
class TreeNode:
    """Class to store decision tree node information"""
    # Unique identifier for the node in the tree
    node_id: int
    
    # The name of the feature used for the decision at this node. 
    # If the node is a leaf, this will be `None`.
    feature_name: Optional[str]
    
    # The threshold value for the feature used to split the data at this node. 
    # If the node is a leaf, this will be `None`.
    threshold: Optional[float]
    
    # The node ID of the left child node. If the node is a leaf, this will be `None`.
    left_child: Optional[int]
    
    # The node ID of the right child node. If the node is a leaf, this will be `None`.
    right_child: Optional[int]
    
    # Indicates whether this node is a leaf node (`True` if leaf, `False` if internal).
    is_leaf: bool
    
    # The class label predicted by the leaf node. 
    # Only set if the node is a leaf; otherwise, it is `None`.
    class_label: Optional[str]
    
    # The number of samples (data points) that reached this node during training.
    samples: int

def extract_tree_structure(tree_classifier: DecisionTreeClassifier, feature_names: List[str], target_names: List[str]) -> List[TreeNode]: 
    """
    Extract node information from a trained DecisionTreeClassifier

    Parameters:
    -----------
    tree_classifier : DecisionTreeClassifier
        A trained sklearn DecisionTreeClassifier
    feature_names : List[str]
        A list of feature names
    target_names : List[str]
        A list of target class labels

    Returns:
    --------
    List[TreeNode]
        List of TreeNode objects containing the tree structure
    """
    if isinstance(tree_classifier, DecisionTreeClassifier): #account for LORE DecisionTreeSurrogate class
        tree = tree_classifier.tree_
    else:
        tree = tree_classifier

    nodes = []

    for node_id in range(tree.node_count):
        # Check if node is leaf
        is_leaf = tree.children_left[node_id] == -1

        # Get node information
        if is_leaf:
            # Get the class label based on the majority class in the leaf
            class_label_index = int(tree.value[node_id].argmax())
            class_label = target_names[class_label_index]
            
            node = TreeNode(
                node_id=node_id,
                feature_name=None,
                threshold=None,
                left_child=None,
                right_child=None,
                is_leaf=True,
                class_label=class_label,
                samples=int(tree.n_node_samples[node_id])
            )
        else:
            feature_name = feature_names[int(tree.feature[node_id])]
            threshold = float(tree.threshold[node_id])
            left_child = int(tree.children_left[node_id])
            right_child = int(tree.children_right[node_id])

            node = TreeNode(
                node_id=node_id,
                feature_name=feature_name,
                threshold=threshold,
                left_child=left_child,
                right_child=right_child,
                is_leaf=False,
                class_label=None,
                samples=int(tree.n_node_samples[node_id])
            )

        nodes.append(node)

    return nodes

def save_tree_to_json(nodes, filename: str, indent: int = 4):
    """
    Save the tree structure to a JSON file
    
    Parameters:
    -----------
    nodes : List[TreeNode]
        List of TreeNode objects to save
    filename : str
        Path to save the JSON file
    indent : int
        Number of spaces for indentation
    """
    # Convert TreeNodes to dictionaries
    nodes_dict = [asdict(node) for node in nodes]
    
    # Save to file with indentation
    with open(filename, 'w') as f:
        json.dump(nodes_dict, f, indent=indent)

In [161]:
save_tree_to_json(extract_tree_structure(dt.get_tree_structure(), feature_names=['engine_age', 'length', 'power', 'month', 'weight', 'y_month',
       'year', 'surf_temp'], target_names=target_names), filename="loreTreeTest.json")

#### PCA

create the dataset out of the generated neighborhood

In [162]:
df = pd.DataFrame(neighb_train_X, columns=feature_names)  # X as features
df[target_name] = neighb_train_y  # y as target column
df

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),target
0,5.1,3.5,1.4,0.2,setosa
1,5.1,3.5,1.4,0.2,setosa
2,5.1,3.5,1.4,0.2,setosa
3,5.1,3.57545,1.4,0.2,setosa
4,5.1,3.57545,1.360532,0.2,setosa
...,...,...,...,...,...
95,7.600285,2.649756,5.691993,1.704839,virginica
96,5.177167,2.649756,5.691993,1.704839,virginica
97,5.177167,2.649756,5.691993,1.704839,virginica
98,5.177167,2.649756,5.691993,1.704839,virginica


In [163]:
dt.get_dt()

##### PCA steps

In [164]:
step = 0.1

In [165]:
# Standardize the dataset
scaler = StandardScaler()
X_scaled = scaler.fit_transform(neighb_train_X)

# Apply PCA
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

In [166]:
# Split the data and train the tree (following your original script)
X_train_pca, X_test_pca, y_train, y_test = train_test_split(X_pca, neighb_train_y, test_size=0.3, random_state=42)
dt_classifier_pca = DecisionTreeClassifier(random_state=42)  # Removed max_depth constraint
dt_classifier_pca.fit(X_train_pca, y_train)

# Generate decision boundary data with finer grid
x_min, x_max = X_pca[:, 0].min() - 1, X_pca[:, 0].max() + 1
y_min, y_max = X_pca[:, 1].min() - 1, X_pca[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, step), np.arange(y_min, y_max, step))

In [167]:
# Get predictions for the grid
Z = dt_classifier_pca.predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)

In [168]:
from sklearn.cluster import KMeans
import numpy as np

def filter_points_by_class_kmeans(points, labels, threshold=500, thresholdMultiplierForClusteringSet=4, random_state=42):
    """
    For each class, if the number of instances exceeds a given threshold,
    cluster the points into k clusters and use the closest real points to the centroids
    as representative points. Otherwise, keep all points.

    Parameters:
        points (np.ndarray): Array of shape (n_points, 2) with PCA coordinates.
        labels (np.ndarray): Array of shape (n_points,) with class labels.
        threshold (int): Minimum number of points a class must have before applying clustering.
        thresholdMultiplierForClusteringSet (int): Multiplier for determining number of sampled points.
        random_state (int): Random seed for reproducibility.

    Returns:
        filtered_points (np.ndarray): Filtered array of representative points.
        filtered_labels (np.ndarray): Corresponding class labels.
    """
    filtered_points = []
    filtered_labels = []
    unique_classes = np.unique(labels)

    for cls in unique_classes:        # Get indices and points for this class
        class_indices = np.where(labels == cls)[0]
        class_points = points[class_indices]
        n_points = len(class_points)

        if n_points > threshold:
            # Ensure we don't sample more than available
            sample_size = min(threshold * thresholdMultiplierForClusteringSet, n_points)

            rng = np.random.RandomState(random_state)
            sampled_indices = rng.choice(n_points, size=sample_size, replace=False)
            sampled_points = class_points[sampled_indices]

            # Use k-means clustering to find cluster centers
            kmeans = KMeans(n_clusters=threshold, random_state=random_state, n_init=10)
            kmeans.fit(sampled_points)
            centroids = kmeans.cluster_centers_

            # Find the closest real point to each centroid
            selected_points = []
            for centroid in centroids:
                distances = np.linalg.norm(class_points - centroid, axis=1)
                closest_index = np.argmin(distances)
                selected_points.append(class_points[closest_index])

            selected_points = np.array(selected_points)
        else:
            # Keep original points if below threshold
            selected_points = class_points

        filtered_points.append(selected_points)
        filtered_labels.extend([cls] * len(selected_points))

    # Convert to numpy array
    filtered_points = np.vstack(filtered_points)
    filtered_labels = np.array(filtered_labels)

    return filtered_points, filtered_labels

In [169]:
filtered_pca_data, filtered_labels = filter_points_by_class_kmeans(X_pca, neighb_train_y, threshold=2000, thresholdMultiplierForClusteringSet = 5, random_state=42)

print("Original PCA points:", len(X_pca))
print("Filtered PCA points:", len(filtered_pca_data))

Original PCA points: 100
Filtered PCA points: 100


In [170]:
# Function to format all features for axis labels
def format_pc_label(pc_loadings, feature_names, pc_index):
    # Format as string: "PC1: feature1 (+0.62), feature2 (-0.43), ..."
    label = f"PC{pc_index + 1}: " + ", ".join([f"{name} ({value:+.2f})" for name, value in zip(feature_names, pc_loadings)])
    return label

In [171]:
# Generate full labels for PC1 and PC2
pc1_label = format_pc_label(pca.components_[0], feature_names, 0)
pc2_label = format_pc_label(pca.components_[1], feature_names, 1)

In [172]:
from scipy.spatial import Voronoi
from shapely.geometry import Polygon
from shapely.ops import unary_union
import networkx as nx
import numpy as np

# Create a graph to store adjacent Voronoi regions
G = nx.Graph()

# Build the Voronoi diagram
vor = Voronoi(np.c_[xx.ravel(), yy.ravel()])
regions, vertices = vor.regions, vor.vertices

# Create a mapping of region indices to their classes
region_class_map = {}
region_polygons = []
region_class_list = []
region_index_map = {}  # Maps Voronoi region index to polygon list index

polygon_idx = 0
for point_index, region_index in enumerate(vor.point_region):
    region = regions[region_index]
    if not -1 in region and len(region) > 0:  # Ignore infinite regions
        polygon = Polygon([vertices[i] for i in region])
        region_polygons.append(polygon)
        region_class_map[region_index] = Z.ravel()[point_index]
        region_class_list.append(Z.ravel()[point_index])
        region_index_map[region_index] = polygon_idx  # Store index
        G.add_node(region_index)  # Add region as a graph node
        polygon_idx += 1

# Find adjacent regions using Voronoi ridges
for (p1, p2), ridge_vertices in zip(vor.ridge_points, vor.ridge_vertices):
    if -1 in ridge_vertices:
        continue  # Ignore infinite regions
    r1, r2 = vor.point_region[p1], vor.point_region[p2]
    
    # Merge if they belong to the same class
    if region_class_map.get(r1) == region_class_map.get(r2):
        G.add_edge(r1, r2)

# Find connected components (groups of merged regions)
merged_regions = []
merged_classes = []

for component in nx.connected_components(G):
    merged_polygon = unary_union([region_polygons[region_index_map[i]] for i in component if i in region_index_map])
    merged_regions.append(merged_polygon)
    merged_classes.append(region_class_map[list(component)[0]])  # Assign class from any region

# Convert merged regions back to JSON format
merged_region_polygons = [list(p.exterior.coords) for p in merged_regions]

In [173]:
# Encode the merged classes using LabelEncoder
label_encoder = LabelEncoder()
label_encoder.fit(target_names)  # Fit on target names

encoded_merged_classes = label_encoder.transform(merged_classes)

In [175]:
# Save the data for D3 visualization
with open('lorePCATest.json', "w") as f:
    json.dump(
        {
            "pcaData": filtered_pca_data.tolist(),
            "targets": filtered_labels.tolist(),
            "targetNames": list(target_names),
            "decisionBoundary": {
                "regions": merged_region_polygons,
                "regionClasses": encoded_merged_classes.tolist(),  # Convert NumPy int32 to Python int
                "xRange": [float(x_min), float(x_max)],
                "yRange": [float(y_min), float(y_max)],
            },
            "xAxisLabel": pc1_label,
            "yAxisLabel": pc2_label,
        },
        f,
        indent=4
    )