In [1]:
from pgmpy.readwrite import XMLBIFReader

# Parse the XMLBIF file into a pgmpy model object
reader = XMLBIFReader("models/chessTAN.xml")
model = reader.get_model()

# 1. Structure: printed as the model's edge list (per the original note,
#    each tuple is (Parent, Child))
print("--- Structure (Edges) ---")
print(model.edges())

# 2. State names of every variable; their count fixes the CPD dimensions
#    needed later for parameter learning
print("\n--- Variable States ---")
for node in model.nodes():
    node_cpd = model.get_cpds(node)
    states = node_cpd.state_names[node]
    print(f"Variable: {node}, Outcomes: {states}")

# 3. Parent lookup for a single node (the same call the CLL loop relies on)
print("\n--- Specific Parent Retrieval ---")
target_node = 'x1'
parents = model.get_parents(target_node)
print(f"Parents of {target_node}: {parents}")

  from .autonotebook import tqdm as notebook_tqdm


--- Structure (Edges) ---
[('x1', 'x14'), ('x2', 'x16'), ('x2', 'x18'), ('x3', 'x24'), ('x5', 'x12'), ('x5', 'x23'), ('x7', 'x2'), ('x7', 'x5'), ('x8', 'x7'), ('x9', 'x8'), ('x9', 'x22'), ('x10', 'x21'), ('x11', 'x1'), ('x11', 'x15'), ('x11', 'x26'), ('x11', 'x36'), ('x13', 'x31'), ('x18', 'x13'), ('x18', 'x34'), ('x21', 'x33'), ('x22', 'x10'), ('x23', 'x17'), ('x26', 'x35'), ('x27', 'x30'), ('x30', 'x28'), ('x31', 'x11'), ('x31', 'x19'), ('x31', 'x20'), ('x31', 'x25'), ('x32', 'x6'), ('x32', 'x29'), ('x33', 'x27'), ('x34', 'x3'), ('x34', 'x4'), ('x35', 'x32'), ('Class', 'x1'), ('Class', 'x2'), ('Class', 'x3'), ('Class', 'x4'), ('Class', 'x5'), ('Class', 'x6'), ('Class', 'x7'), ('Class', 'x8'), ('Class', 'x9'), ('Class', 'x10'), ('Class', 'x11'), ('Class', 'x12'), ('Class', 'x13'), ('Class', 'x14'), ('Class', 'x15'), ('Class', 'x16'), ('Class', 'x17'), ('Class', 'x18'), ('Class', 'x19'), ('Class', 'x20'), ('Class', 'x21'), ('Class', 'x22'), ('Class', 'x23'), ('Class', 'x24'), ('Class',

In [3]:
import numpy as np
import pandas as pd
from pgmpy.readwrite import XMLBIFReader
import random
import copy

# ==========================================
# 1. The Bayesian Network & Objective Class
# ==========================================
class DiscriminativeBN:
    """Fixed-structure Bayesian network with a softmax ("beta") parameterisation.

    The structure, state names and parent sets are read from an XMLBIF file
    through pgmpy. Every CPD row (one row per parent configuration) is described
    by unconstrained real weights which a row-wise softmax maps to probabilities,
    so the conditional log-likelihood (CLL) of ``target_col`` can be optimised
    without constraints.
    """

    def __init__(self, xml_path, target_col):
        """
        xml_path: Path to the XMLBIF file
        target_col: The name of the variable we want to predict (e.g., 'Class')

        Populates:
            self.model        - the pgmpy model returned by XMLBIFReader.get_model()
            self.nodes        - list of node names, in model order; this order also
                                fixes the layout of the flat beta vector
            self.var_info     - per node: parents, outcomes, cardinality,
                                parent cardinalities and number of parent configs
            self.total_params - length of the flat beta vector
        """
        self.target_col = target_col

        # 1. Parse Structure
        reader = XMLBIFReader(xml_path)
        self.model = reader.get_model()
        self.nodes = list(self.model.nodes())

        # 2. Extract Cardinality (Outcomes) & Parents
        self.var_info = {}   # Stores parents and outcome list for each node
        self.param_map = []  # Kept for backward compatibility; never populated here

        self.total_params = 0

        for node in self.nodes:
            cpd = self.model.get_cpds(node)
            outcomes = cpd.state_names[node]
            parents = self.model.get_parents(node)

            # Number of states of every parent -> number of parent configurations
            parent_cards = [len(self.model.get_cpds(p).state_names[p]) for p in parents]
            total_parent_configs = int(np.prod(parent_cards)) if parent_cards else 1

            self.var_info[node] = {
                'parents': parents,
                'outcomes': outcomes,
                'cardinality': len(outcomes),
                'parent_cardinalities': parent_cards,
                'num_parent_configs': total_parent_configs
            }

            # One weight per (parent configuration, outcome) pair
            self.total_params += total_parent_configs * len(outcomes)

    def _get_parent_config_index(self, row, node):
        """Return the row index of ``node``'s table selected by the parent values in ``row``.

        The index is a mixed-radix number over the parents in ``var_info`` order,
        with the LAST parent varying fastest. Values are compared as strings.
        Raises ValueError if a parent value is not one of that parent's outcomes.

        NOTE(review): when the tables come from pgmpy CPDs (see
        extract_tables_from_model) this presumably requires get_parents() to
        return parents in the CPD's evidence order - confirm.
        """
        parents = self.var_info[node]['parents']
        if not parents:
            return 0

        idx = 0
        stride = 1
        for p in reversed(parents):
            p_outcomes = self.var_info[p]['outcomes']
            p_idx = p_outcomes.index(str(row[p]))  # Ensure string matching
            idx += p_idx * stride
            stride *= len(p_outcomes)
        return idx

    def betas_to_probabilities(self, betas):
        """
        The Greiner Transformation: Beta (Real) -> Theta (Probabilities)
        Uses Softmax per parent-configuration row.

        betas: flat sequence/array of length ``total_params``; nodes are laid out
               consecutively in ``self.nodes`` order, each as a row-major
               (num_parent_configs, cardinality) matrix.
        Returns a dict {node: array of shape (num_parent_configs, cardinality)}
        whose rows each sum to 1.
        """
        # Accept lists/tuples as well as arrays
        betas = np.asarray(betas, dtype=float)

        current_idx = 0
        tables = {}

        for node in self.nodes:
            info = self.var_info[node]
            n_rows = info['num_parent_configs']
            n_cols = info['cardinality']

            # Slice this node's weights out of the flat vector
            node_betas = betas[current_idx: current_idx + (n_rows * n_cols)]
            current_idx += (n_rows * n_cols)

            # Reshape to (Parent_Configs, Outcomes)
            beta_matrix = node_betas.reshape((n_rows, n_cols))

            # Row-wise softmax; subtracting the row max avoids overflow in exp()
            max_b = np.max(beta_matrix, axis=1, keepdims=True)
            exp_b = np.exp(beta_matrix - max_b)
            tables[node] = exp_b / np.sum(exp_b, axis=1, keepdims=True)

        return tables

    def _log_joint(self, row, tables):
        """Return log P(row) = sum over nodes of log P(node | parents) under ``tables``."""
        log_joint = 0.0
        for node in self.nodes:
            p_idx = self._get_parent_config_index(row, node)
            val_idx = self.var_info[node]['outcomes'].index(str(row[node]))
            # Small epsilon keeps log() finite for zero-probability entries
            log_joint += np.log(tables[node][p_idx, val_idx] + 1e-10)
        return log_joint

    def calculate_cll(self, betas, data):
        """
        Calculates Conditional Log Likelihood: P(Target | Evidence)
        CLL = Sum_data [ log( P(Target, Evidence) / Sum_target'(P(Target', Evidence)) ) ]

        betas: flat parameter vector (see betas_to_probabilities)
        data:  pandas DataFrame with one column per node
        Returns the summed conditional log-likelihood over all rows.
        Raises ValueError if a cell value is not a known outcome of its node.

        The normaliser is computed with log-sum-exp, entirely in log space.
        Exponentiating each joint first underflows to 0 once the log joint
        drops below roughly -745 (many nodes and/or sharp tables), which would
        replace the true value with an arbitrary penalty.
        """
        tables = self.betas_to_probabilities(betas)
        target_outcomes = self.var_info[self.target_col]['outcomes']

        # List of dicts is much faster to iterate than DataFrame rows
        records = data.to_dict('records')

        log_likelihood_sum = 0.0
        for row in records:
            actual_idx = target_outcomes.index(str(row[self.target_col]))

            # Log joint for every candidate target value, evidence held fixed
            candidate = dict(row)
            log_joints = np.empty(len(target_outcomes))
            for k, t_val in enumerate(target_outcomes):
                candidate[self.target_col] = t_val
                log_joints[k] = self._log_joint(candidate, tables)

            # log P(evidence) = logsumexp over target values
            log_marginal = np.logaddexp.reduce(log_joints)

            # log P(target | evidence) = log joint(actual) - log marginal
            log_likelihood_sum += log_joints[actual_idx] - log_marginal

        return log_likelihood_sum

# ==========================================
# 2. The Simplex (Nelder-Mead) Optimizer 
# ==========================================
class NelderMeadOptimizer:
    """Minimal Nelder-Mead (downhill simplex) minimiser for a function of ``dim`` reals."""

    def __init__(self, func, dim, max_iter=5, alpha=1.0, gamma=2.0, rho=0.5, sigma=0.5,
                 step=0.05, verbose=True):
        """
        func:     objective taking a 1-D array of length ``dim``; it is MINIMISED
        dim:      number of parameters
        max_iter: number of simplex iterations to run
        alpha, gamma, rho, sigma: reflection, expansion, contraction and shrink
                  coefficients
        step:     offset added to one coordinate at a time to build the initial simplex
        verbose:  when False, suppresses all progress printing
        """
        self.func = func  # The objective function (to MINIMIZE)
        self.dim = dim
        self.max_iter = max_iter
        # Standard coefficients
        self.alpha = alpha  # Reflection
        self.gamma = gamma  # Expansion
        self.rho = rho      # Contraction
        self.sigma = sigma  # Shrink
        self.step = step
        self.verbose = verbose

    def _log(self, *args, **kwargs):
        """print() that honours ``self.verbose``."""
        if self.verbose:
            print(*args, **kwargs)

    def optimize(self, start_point):
        """Run ``max_iter`` simplex iterations from ``start_point`` and return the best vertex.

        start_point: array-like of length ``dim``.
        Returns the vertex (1-D float array) with the lowest objective value seen
        in the final simplex. Printed scores are negated because the notebook
        minimises the negative CLL.
        """
        self._log(f"--- Starting Simplex Optimization (Dim: {self.dim}) ---")

        # Work in floats: on an integer array the perturbation below would be
        # truncated to 0 and every vertex would coincide.
        start = np.asarray(start_point, dtype=float)

        # 1. Initialize Simplex: N+1 points, each perturbing one coordinate
        simplex = [start]
        for i in range(self.dim):
            point = start.copy()
            point[i] += self.step
            simplex.append(point)

        # Evaluate all points initially
        scores = [(self.func(p), p) for p in simplex]
        scores.sort(key=lambda x: x[0])

        self._log(f"Initial Best Score: {-scores[0][0]:.4f} (CLL)")
        self._log(f"Initial Worst Score: {-scores[-1][0]:.4f} (CLL)")
        self._log("-" * 30)

        for it in range(self.max_iter):
            # Sort: Best (lowest score) to Worst
            scores.sort(key=lambda x: x[0])

            best_score, best_point = scores[0]
            worst_score, worst_point = scores[-1]
            second_worst_score = scores[-2][0]

            # Print status at start of iteration
            self._log(f"Iter {it+1}: Best: {-best_score:.4f} | Worst: {-worst_score:.4f}", end=" | ")

            # Centroid of all vertices except the worst
            points_matrix = np.array([x[1] for x in scores[:-1]])
            centroid = np.mean(points_matrix, axis=0)

            # --- Attempt Reflection ---
            xr = centroid + self.alpha * (centroid - worst_point)
            r_score = self.func(xr)

            if best_score <= r_score < second_worst_score:
                scores[-1] = (r_score, xr)
                self._log(f"Action: REFLECTION (Accepted). New Score: {-r_score:.4f}")
                continue

            # --- Attempt Expansion ---
            if r_score < best_score:
                xe = centroid + self.gamma * (xr - centroid)
                e_score = self.func(xe)

                if e_score < r_score:
                    scores[-1] = (e_score, xe)
                    self._log(f"Action: EXPANSION (Accepted). New Score: {-e_score:.4f}")
                else:
                    scores[-1] = (r_score, xr)
                    self._log(f"Action: EXPANSION (Reverted to Reflection). New Score: {-r_score:.4f}")
                continue

            # --- Attempt Contraction ---
            # Reflection was no better than the second worst point: contract the
            # worst vertex towards the centroid (inside contraction only).
            xc = centroid + self.rho * (worst_point - centroid)
            c_score = self.func(xc)

            if c_score < worst_score:
                scores[-1] = (c_score, xc)
                self._log(f"Action: CONTRACTION (Accepted). New Score: {-c_score:.4f}")
                continue

            # --- Shrink ---
            # If all else fails, shrink the whole simplex towards the best point
            self._log(f"Action: SHRINK (Simplex Reduction)")
            new_scores = [(scores[0][0], scores[0][1])]
            for i in range(1, len(scores)):
                p = scores[0][1] + self.sigma * (scores[i][1] - scores[0][1])
                new_scores.append((self.func(p), p))
            scores = new_scores

        # The last iteration may have replaced the worst vertex (or shrunk the
        # simplex) with something better than scores[0]; re-sort so the true
        # best vertex is reported and returned.
        scores.sort(key=lambda x: x[0])

        self._log("-" * 30)
        self._log(f"Optimization Finished. Best Score found: {-scores[0][0]:.4f}")
        return scores[0][1]  # Return best point


In [3]:
if __name__ == "__main__":
    # Training driver: learn softmax parameters of the fixed TAN structure by
    # maximising the conditional log-likelihood with the simplex optimiser.

    # Settings
    xml_file = "models/chessTAN.xml"
    csv_file = "datasets/chess_data_3196.csv"
    target_var = "Class"

    # 1. Load BN Wrapper
    # Note: Ensure your XML and CSV have matching column names/outcomes
    bn = DiscriminativeBN(xml_file, target_var)

    # 2. Load Data
    # For demo purposes, create dummy data if file doesn't exist.
    # Only a missing file triggers the fallback; any other failure (malformed
    # CSV, permission error, interrupt) should surface instead of silently
    # training on fake data.
    try:
        df = pd.read_csv(csv_file)
    except FileNotFoundError:
        print("CSV not found, creating dummy data for test...")
        # Create fake data matching the structure logic
        data_dict = {
            'x1': ['t', 'f', 't', 't'],
            'x2': ['f', 't', 'f', 'f'],
            'Class': ['won', 'nowin', 'won', 'won']
        }
        # Add other nodes if they exist in your XML
        for n in bn.nodes:
            if n not in data_dict:
                data_dict[n] = ['f'] * 4
        df = pd.DataFrame(data_dict)

    # 3. Define Objective Function
    # We want to Maximize CLL, but Optimizer Minimizes. So minimize Negative CLL.
    def objective(betas):
        """Negative CLL of ``betas`` on ``df`` (the optimiser minimises)."""
        cll = bn.calculate_cll(betas, df)
        return -cll  # Negative because we minimize

    # 4. Initialize Parameters (Betas)
    # Start with random small values close to 0 (implies roughly uniform probs).
    # NOTE(review): no random seed is set, so runs are not reproducible.
    start_betas = np.random.normal(0, 0.1, bn.total_params)

    print(f"Starting Optimization with {bn.total_params} parameters...")

    # 5. Run Simplex
    # Each iteration evaluates the objective over the full dataset at least once
    # (and the initial simplex needs dim + 1 evaluations), so this is slow.
    optimizer = NelderMeadOptimizer(objective, dim=bn.total_params, max_iter=1000)
    best_betas = optimizer.optimize(start_betas)

    # 6. Result
    print("\nOptimization Complete.")
    final_cll = bn.calculate_cll(best_betas, df)
    print(f"Final Conditional Log Likelihood: {final_cll:.4f}")

    # Show one resulting table
    final_tables = bn.betas_to_probabilities(best_betas)
    print(f"\nLearned Table for {target_var}:")
    print(final_tables[target_var])

Starting Optimization with 290 parameters...
--- Starting Simplex Optimization (Dim: 290) ---
Initial Best Score: -2595.6080 (CLL)
Initial Worst Score: -2641.1760 (CLL)
------------------------------
Iter 1: Best: -2595.6080 | Worst: -2641.1760 | Action: EXPANSION (Accepted). New Score: -2575.2924
Iter 2: Best: -2575.2924 | Worst: -2635.8633 | Action: REFLECTION (Accepted). New Score: -2598.9636
Iter 3: Best: -2575.2924 | Worst: -2635.7199 | Action: REFLECTION (Accepted). New Score: -2598.7642
Iter 4: Best: -2575.2924 | Worst: -2635.0637 | Action: REFLECTION (Accepted). New Score: -2599.6242
Iter 5: Best: -2575.2924 | Worst: -2633.8663 | Action: REFLECTION (Accepted). New Score: -2600.5158
Iter 6: Best: -2575.2924 | Worst: -2633.6575 | Action: REFLECTION (Accepted). New Score: -2600.5172
Iter 7: Best: -2575.2924 | Worst: -2633.4047 | Action: REFLECTION (Accepted). New Score: -2600.6906
Iter 8: Best: -2575.2924 | Worst: -2632.2870 | Action: REFLECTION (Accepted). New Score: -2600.9716
I

In [4]:
# Display the learned tables: the dict returned by bn.betas_to_probabilities(best_betas)
# in the training cell, mapping each node name to its
# (num_parent_configs, num_outcomes) probability array.
final_tables

{'x1': array([[0.53329908, 0.46670092],
        [0.51498592, 0.48501408],
        [0.52617098, 0.47382902],
        [0.49041015, 0.50958985]]),
 'x2': array([[0.39775364, 0.60224636],
        [0.49547003, 0.50452997],
        [0.54815427, 0.45184573],
        [0.51311901, 0.48688099]]),
 'x3': array([[0.50874917, 0.49125083],
        [0.52477916, 0.47522084],
        [0.51068908, 0.48931092],
        [0.46887745, 0.53112255]]),
 'x4': array([[0.48486197, 0.51513803],
        [0.48994339, 0.51005661],
        [0.48997242, 0.51002758],
        [0.5379462 , 0.4620538 ]]),
 'x5': array([[0.51640856, 0.48359144],
        [0.50069394, 0.49930606],
        [0.54813606, 0.45186394],
        [0.49734513, 0.50265487]]),
 'x6': array([[0.53438849, 0.46561151],
        [0.49178851, 0.50821149],
        [0.5093142 , 0.4906858 ],
        [0.50312918, 0.49687082]]),
 'x7': array([[0.40000517, 0.59999483],
        [0.47107968, 0.52892032],
        [0.53214135, 0.46785865],
        [0.53899771, 0.46100

In [5]:
import pickle

# Serialize the learned tables dict to disk (binary pickle, overwrites any existing file)
with open("final_tablesChessTAN.pkl", "wb") as pkl_file:
    pickle.dump(final_tables, pkl_file)

In [5]:
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix

def predict_row(bn, tables, row):
    """
    Return the most probable target value for one data row.

    bn: object exposing ``target_col``, ``nodes``, ``var_info`` and
        ``_get_parent_config_index`` (e.g. a DiscriminativeBN)
    tables: dict {node: 2-D array indexed [parent_config, outcome]}
    row: dict or pandas Series holding the feature values (x1, x2, ...);
         it is copied, never modified

    Every target outcome is substituted in turn, the log joint probability of
    the completed row is evaluated, and the outcome with the highest score wins
    (the first one on ties). Returns None if the target has no outcomes.
    """
    target = bn.target_col

    def _log_joint(candidate):
        # Log P(all variables) = sum over nodes of log P(node | parents)
        total = 0
        for node in bn.nodes:
            # Table row: chosen by the parent values of this node
            cfg = bn._get_parent_config_index(candidate, node)

            # Table column: position of the observed value among the outcomes;
            # values absent from the outcome list fall back to column 0
            labels = bn.var_info[node]['outcomes']
            observed = str(candidate[node])
            col = labels.index(observed) if observed in labels else 0

            # Epsilon avoids log(0)
            total += np.log(tables[node][cfg, col] + 1e-10)
        return total

    winner = None
    winner_score = -float('inf')

    for label in bn.var_info[target]['outcomes']:
        # Hypothesise this class value on a private copy of the row
        candidate = row.copy()
        candidate[target] = label

        score = _log_joint(candidate)
        if score > winner_score:
            winner, winner_score = label, score

    return winner

def calculate_accuracy(bn, tables, df):
    """
    Predict the target for every row of ``df`` and score the predictions.

    bn: network wrapper passed through to predict_row
    tables: dict of probability tables passed through to predict_row
    df: pandas DataFrame containing the target column and all feature columns

    Returns (accuracy, confusion_matrix, predictions). True labels are compared
    as strings, and the confusion matrix rows/columns follow the target's
    outcome order in ``bn.var_info``.
    """
    target = bn.target_col
    y_true = df[target].astype(str).tolist()

    # One prediction per record, in DataFrame order
    y_pred = [predict_row(bn, tables, record) for record in df.to_dict('records')]

    acc = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred, labels=bn.var_info[target]['outcomes'])

    return acc, cm, y_pred

# ==========================================
# 3. Usage Example
# ==========================================

# Assuming you ran the optimization from the previous step:
# bn = DiscriminativeBN(...)
# best_betas = ... from optimizer
# final_tables = bn.betas_to_probabilities(best_betas)
# df = pd.read_csv("chess_data.csv")

# Evaluate the tables learned by the optimiser.
# Precondition: `bn`, `final_tables` and `df` must already exist in the kernel
# (they are created by the training cell). On a fresh kernel this cell fails
# with NameError, as the recorded output below shows.
print("\n--- Starting Evaluation ---")

# Run prediction over every row of df; returns accuracy, confusion matrix and
# the list of predicted labels
accuracy, conf_matrix, predictions = calculate_accuracy(bn, final_tables, df)

print(f"Accuracy: {accuracy * 100:.2f}%")
print("\nConfusion Matrix:")
print(conf_matrix)

# Example of a single prediction comparison (first row: true label vs. prediction)
print(f"\nRow 0 Actual: {df.iloc[0][bn.target_col]}")
print(f"Row 0 Pred  : {predictions[0]}")


--- Starting Evaluation ---


NameError: name 'bn' is not defined

In [1]:
def extract_tables_from_model(pgmpy_model):
    """
    Extracts CPDs from a pgmpy model and formats them for the custom
    predict_row function.

    Args:
        pgmpy_model: object exposing ``nodes()`` and ``get_cpds(node)``; each
            returned CPD must expose ``values`` and ``variable_card``.

    Returns:
        tables (dict): { node_name: numpy_array of shape (num_parent_configs, num_outcomes) }
        A node without a CPD maps to None and a warning is printed.
    """
    tables = {}
    for node in pgmpy_model.nodes():
        # A missing CPD is reported by get_cpds() as a None return value, not
        # as an exception, so both signals have to be checked.
        try:
            cpd = pgmpy_model.get_cpds(node)
        except ValueError:
            cpd = None

        if cpd is None:
            print(f"Warning: No CPD found for node {node}")
            tables[node] = None
            continue

        # pgmpy stores values as (num_outcomes, parent_card_1, parent_card_2, ...)
        # We reshape it to (num_outcomes, total_parent_configs) to flatten parent dims
        # Then Transpose (.T) so rows = parent_configs and cols = outcomes
        tables[node] = cpd.values.reshape(cpd.variable_card, -1).T

    return tables

In [6]:
import pandas as pd
import numpy as np
from pgmpy.readwrite import XMLBIFReader
from sklearn.metrics import accuracy_score, confusion_matrix

# --- [Insert your DiscriminativeBN class and predict functions here] ---
# (I am assuming the class and functions you provided are defined above this block)

# ==========================================
# Main Execution Flow
# ==========================================

# 1. Configuration
# NOTE(review): test_data_file is the same path the training cell reads
# (datasets/chess_data_3196.csv), so the accuracy below is presumably measured
# on training data rather than a held-out set - confirm.
xml_file = "models/chessTAN.xml"
test_data_file = "datasets/chess_data_3196.csv" 
target_variable = "Class"

# 2. Load the Model & Structure
# DiscriminativeBN is used here only for its structure metadata (nodes, parents,
# outcomes); no beta optimisation happens in this cell.
print("Loading XML Model...")
bn_structure = DiscriminativeBN(xml_file, target_variable)

# 3. Extract the Parameters (The Tables)
# We use the internal pgmpy model from your class to get the values,
# i.e. the CPDs stored in the XML file itself, not the simplex-learned tables.
print("Extracting Parameters...")
tables = extract_tables_from_model(bn_structure.model)

# 4. Load Data
# Assuming your data has headers matching the node names in the XML
print("Loading Data...")
# Example: reading a CSV. Ensure columns match your BN node names exactly.
df = pd.read_csv(test_data_file) 

# 5. Compute Accuracy
# Requires calculate_accuracy / predict_row to be defined earlier in the kernel.
print("Computing Accuracy...")
acc, cm, predictions = calculate_accuracy(bn_structure, tables, df)

# 6. Results
print("\n" + "="*30)
print(f"Final Accuracy: {acc:.4f}")
print("="*30)
print("\nConfusion Matrix:")
print(cm)

# Optional: Show a few predictions (true label next to the predicted one)
print("\nSample Predictions:")
print(df[[target_variable]].assign(Predicted=predictions).head())

Loading XML Model...
Extracting Parameters...
Loading Data...
Computing Accuracy...

Final Accuracy: 0.9224

Confusion Matrix:
[[1526  143]
 [ 105 1422]]

Sample Predictions:
  Class Predicted
0   won       won
1   won       won
2   won       won
3   won       won
4   won       won
