In [1]:
import pandas as pd
import numpy as np

In [2]:
def encode_data(df):
    encoded = df.copy()
    for col in df.columns:
        encoded[col] = encoded[col].factorize()[0]
    return encoded

In [3]:
# Load and preprocess data
train_df = pd.read_csv('mushroom_train.data', header=None)
test_df = pd.read_csv('mushroom_test.data', header=None)

train_encoded = encode_data(train_df)
test_encoded = encode_data(test_df)

X_train = train_encoded.iloc[:, 1:].values
y_train = train_encoded.iloc[:, 0].values
X_test = test_encoded.iloc[:, 1:].values
y_test = test_encoded.iloc[:, 0].values

n_features = X_train.shape[1]

In [4]:
cmi_matrix = np.zeros((n_features, n_features))
for i in range(n_features):
    for j in range(i + 1, n_features):
        cmi = 0
        for y in np.unique(y_train):
            mask = y_train == y
            x_i = X_train[mask, i]
            x_j = X_train[mask, j]

            joint = pd.crosstab(x_i, x_j).values / len(x_i)

            p_i = np.bincount(x_i) / len(x_i)
            p_j = np.bincount(x_j) / len(x_j)

            mi = 0
            for xi in range(joint.shape[0]):
                for xj in range(joint.shape[1]):
                    if joint[xi, xj] > 0:
                        mi += joint[xi, xj] * np.log(joint[xi, xj] / (p_i[xi] * p_j[xj] + 1e-10))
            cmi += (np.sum(mask) / len(y_train)) * mi
        cmi_matrix[i, j] = cmi_matrix[j, i] = cmi

In [5]:
def minimum_spanning_tree(neg_cmi):
    n = neg_cmi.shape[0]
    edges = []
    for i in range(n):
        for j in range(i + 1, n):
            if neg_cmi[i, j] != 0:
                edges.append((-neg_cmi[i, j], i, j))

    edges.sort(reverse=True)
    parent = list(range(n))

    def find(u):
        while parent[u] != u:
            parent[u] = parent[parent[u]]
            u = parent[u]
        return u

    mst = np.zeros((n, n))
    for weight, u, v in edges:
        root_u = find(u)
        root_v = find(v)
        if root_u != root_v:
            mst[u, v] = -weight
            parent[root_v] = root_u
    return mst

In [6]:
mst = minimum_spanning_tree(-cmi_matrix)

In [7]:
mi_with_y = []
for i in range(n_features):
    contingency = pd.crosstab(y_train, X_train[:, i]).values
    p_joint = contingency / contingency.sum()
    p_y = p_joint.sum(axis=1)
    p_x = p_joint.sum(axis=0)
    mi = np.sum(p_joint * np.log((p_joint + 1e-10) / (np.outer(p_y, p_x) + 1e-10)))
    mi_with_y.append(mi)
root = np.argmax(mi_with_y)

parent = np.full(n_features, -1)
visited = set([root])
queue = [root]

while queue:
    current = queue.pop(0)
    neighbors = np.where(mst[current] > 0)[0]
    for neighbor in neighbors:
        if neighbor not in visited:
            parent[neighbor] = current
            visited.add(neighbor)
            queue.append(neighbor)

In [8]:
cpt = {}
for feat in range(n_features):
    cpt[feat] = {}
    parent_feat = parent[feat]

    for y in np.unique(y_train):
        mask = y_train == y
        if parent_feat == -1:  # Root node depends only on Y
            counts = np.bincount(X_train[mask, feat], minlength=len(np.unique(X_train[:, feat])))
            cpt[feat][y] = (counts + 1) / (counts.sum() + len(np.unique(X_train[:, feat])))
        else:
            parent_vals = X_train[mask, parent_feat]
            unique_parents = np.unique(parent_vals)
            cpt[feat][y] = {}
            for p_val in unique_parents:
                sub_mask = parent_vals == p_val
                counts = np.bincount(X_train[mask, feat][sub_mask],
                                     minlength=len(np.unique(X_train[:, feat])))
                cpt[feat][y][p_val] = (counts + 1) / (counts.sum() + len(np.unique(X_train[:, feat])))

In [9]:
def predict(X):
    log_probs = []
    for y in np.unique(y_train):
        log_prob = np.log((np.sum(y_train == y) + 1) / (len(y_train) + len(np.unique(y_train))))

        for feat in range(n_features):
            if parent[feat] == -1:
                prob = cpt[feat][y][X[feat]]
            else:
                p_val = X[parent[feat]]
                num_categories = len(np.unique(X_train[:, feat]))
                default_prob = np.ones(num_categories) / num_categories
                prob = cpt[feat][y].get(p_val, default_prob)[X[feat]]

            log_prob += np.log(prob + 1e-10)

        log_probs.append(log_prob)
    return np.unique(y_train)[np.argmax(log_probs)]

In [10]:
y_pred = np.array([predict(x) for x in X_test])
accuracy = np.mean(y_pred == y_test)
print(f"Chow-Liu Accuracy: {accuracy:.4f}")

Chow-Liu Accuracy: 0.8949
