In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score

# Read the input table from Adult_clean.csv.
df = pd.read_csv("Adult_clean.csv")

# Every feature is handled as a categorical label, so cast the whole frame
# to strings in a single call.
df = df.astype(str)

# Keep 20% of the rows aside for evaluation; the fixed seed makes the split
# reproducible across runs.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)


# Network structure: each node name maps to the ordered list of its parents.
# The order inside each list matters because it fixes the layout of the
# parent-value tuples used as CPT keys.
parents = dict(
    age_bin=[],
    education=['age_bin'],
    workclass=['education'],
    occupation=['education'],
    hours_bin=['occupation', 'workclass'],
    income=['education', 'occupation', 'hours_bin'],
)

# Node names in declaration order (each parent is declared before its children).
nodes = [*parents]


# Conditional probability tables, filled in below from the training split.
# Layout: CPT[node][parent_key][value] -> probability, where parent_key is
# None for a node without parents and otherwise a tuple of the parent values
# in the order given by parents[node].
CPT = {}

def get_counts(subset, col):
    """Compute the relative frequency of each value in one column.

    Args:
        subset: DataFrame holding the rows to summarise.
        col: Name of the column whose values are tallied.

    Returns:
        dict mapping each distinct value of ``subset[col]`` (in order of
        first appearance) to its share of the counted rows.
    """
    column = subset[col]
    tally = column.value_counts()
    denominator = tally.sum()

    table = {}
    for value in column.unique():
        table[value] = tally.get(value, 0) / denominator
    return table

def _build_cpt(data, parent_map, node_order):
    """Estimate every node's conditional probability table from `data`.

    Args:
        data: DataFrame with one column per node.
        parent_map: dict mapping node name -> list of parent node names.
        node_order: iterable of node names to build tables for.

    Returns:
        dict ``{node: {parent_key: {value: probability}}}``. ``parent_key`` is
        ``None`` for a node without parents, otherwise a tuple of the parent
        values in the order listed in ``parent_map[node]``. Only parent
        configurations that occur in `data` get an entry.
    """
    tables = {}
    for node in node_order:
        node_parents = parent_map[node]

        if not node_parents:
            # No parents -> plain marginal distribution.
            tables[node] = {None: data[node].value_counts(normalize=True).to_dict()}
            continue

        # One grouped pass over the data instead of re-filtering the whole
        # frame once per distinct parent configuration. sort=False keeps the
        # configurations in order of first appearance, matching the order the
        # previous drop_duplicates()-based loop produced.
        node_table = {}
        for key, group in data.groupby(node_parents, sort=False):
            # Older pandas yields a scalar key when grouping by a single
            # column; CPT keys are always tuples.
            if not isinstance(key, tuple):
                key = (key,)
            node_table[key] = group[node].value_counts(normalize=True).to_dict()
        tables[node] = node_table

    return tables


# Fill the notebook-level CPT in place so existing references stay valid.
CPT.update(_build_cpt(train_df, parents, nodes))


def predict_row(row, cpt=None, parent_map=None, node_order=None, unseen_prob=1e-9):
    """Predict the most probable income label for one record.

    For each candidate income state the joint probability of the record is
    computed as the product of the per-node CPT entries, and the state with
    the largest product is returned.

    Args:
        row: Mapping (e.g. a DataFrame row) from node name to observed value.
            Any value stored under 'income' is ignored.
        cpt: Conditional probability tables; defaults to the notebook-level
            ``CPT``.
        parent_map: Node -> list of parents; defaults to the notebook-level
            ``parents``.
        node_order: Iterable of node names; defaults to the notebook-level
            ``nodes``.
        unseen_prob: Factor used when a parent configuration or a value was
            never observed during training.

    Returns:
        The income label with the highest score. Ties go to the label that
        was encountered first in ``cpt['income']``.

    Raises:
        ValueError: if ``cpt['income']`` holds no income state at all.
    """
    # Fall back to the notebook-level network when none is passed explicitly.
    cpt = CPT if cpt is None else cpt
    parent_map = parents if parent_map is None else parent_map
    node_order = nodes if node_order is None else node_order

    # Collect the income labels from EVERY parent configuration. Reading only
    # the first entry would silently drop a class whenever that particular
    # configuration happened to see a single label in training.
    income_states = list(dict.fromkeys(
        state for dist in cpt['income'].values() for state in dist
    ))
    if not income_states:
        raise ValueError("CPT['income'] contains no income states")

    scores = {}
    for income_state in income_states:
        prob = 1.0

        for node in node_order:
            # The income node takes the hypothesised state, not the observed one.
            node_value = income_state if node == 'income' else row[node]

            node_parents = parent_map[node]
            if node_parents:
                key = tuple(
                    income_state if p == 'income' else row[p]
                    for p in node_parents
                )
            else:
                key = None

            factor = cpt[node].get(key, {}).get(node_value)
            # Unseen combination -> small constant instead of zeroing the product.
            prob *= unseen_prob if factor is None else factor

        scores[income_state] = prob

    return max(scores, key=scores.get)


# Classify every held-out row and store the label next to the ground truth.
predictions = test_df.apply(predict_row, axis=1)
test_df['pred'] = predictions

# Score the predictions against the true income column.
y_true, y_pred = test_df['income'], test_df['pred']
acc = accuracy_score(y_true, y_pred)
cm = confusion_matrix(y_true, y_pred)

print("Accuracy:", acc)
print("\nConfusion matrix:", cm, sep="\n")


Accuracy: 0.7908690756474562

Confusion matrix:
[[7018  396]
 [1647  708]]


In [4]:
import math

def posterior_income(row, CPT, parents, nodes):
    """Return the normalized posterior over income states for one record.

    The joint log-probability of the record is accumulated for every
    candidate income state and the results are normalized so that they sum
    to one.

    Args:
        row: Mapping (e.g. a DataFrame row) from node name to observed value.
            Any value stored under 'income' is ignored.
        CPT: Tables laid out as ``CPT[node][parent_key][value] -> probability``
            with ``parent_key`` being ``None`` for parentless nodes and a
            tuple of parent values otherwise.
        parents: dict mapping node name -> list of parent node names.
        nodes: Iterable of node names to include in the factorization.

    Returns:
        dict mapping each income state to its posterior probability.

    Raises:
        ValueError: if ``CPT['income']`` holds no income state at all.
    """
    # Collect the income labels from EVERY parent configuration. Reading only
    # the first entry would drop a class whenever that configuration saw a
    # single label in training, and callers indexing the result by that class
    # would then fail with a KeyError.
    income_states = list(dict.fromkeys(
        state for dist in CPT['income'].values() for state in dist
    ))
    if not income_states:
        raise ValueError("CPT['income'] contains no income states")

    # Log-penalty for a parent configuration or value never seen in training.
    unseen_logp = math.log(1e-9)

    log_scores = {}
    for income_state in income_states:
        logp = 0.0

        for node in nodes:
            # The income node takes the hypothesised state, not the observed one.
            node_value = income_state if node == 'income' else row[node]

            node_parents = parents[node]
            if node_parents:
                key = tuple(
                    income_state if p == 'income' else row[p]
                    for p in node_parents
                )
            else:
                key = None

            prob = CPT[node].get(key, {}).get(node_value)
            if prob is None:
                logp += unseen_logp
            else:
                # Floor guards against log(0) should a table hold an exact zero.
                logp += math.log(max(prob, 1e-12))

        log_scores[income_state] = logp

    # Subtract the maximum before exponentiating for numerical stability.
    max_log = max(log_scores.values())
    exps = {state: math.exp(value - max_log) for state, value in log_scores.items()}
    Z = sum(exps.values())
    return {state: value / Z for state, value in exps.items()}

In [5]:
def postTest(filter_cond, target_state='>50K'):
    """Average the posterior of one income state over a slice of the test set.

    Args:
        filter_cond: Callable that receives ``test_df`` and returns a boolean
            mask selecting the rows to evaluate.
        target_state: Income label whose posterior probability is averaged.

    Returns:
        ``(n_rows, mean_probability)``. When the filter selects no rows the
        result is ``(0, nan)`` rather than a ZeroDivisionError.
    """
    subset = test_df[filter_cond(test_df)]

    # Nothing matched: there is no mean to report.
    if len(subset) == 0:
        return 0, float('nan')

    probs = [
        posterior_income(row, CPT, parents, nodes)[target_state]
        for _, row in subset.iterrows()
    ]
    return len(subset), sum(probs) / len(probs)

def _column_equals(column, value):
    """Build a row filter selecting the rows where `column` equals `value`."""
    return lambda frame: frame[column] == value


# Mean posterior of the high-income class for a handful of evidence slices.
n_bach, p_bach = postTest(_column_equals('education', 'Bachelors'))
n_grad, p_grad = postTest(_column_equals('education', 'Grad'))
n_ft, p_ft = postTest(_column_equals('hours_bin', 'Full-time'))
n_ext, p_ext = postTest(_column_equals('hours_bin', 'Extreme'))
n_exec, p_exec = postTest(_column_equals('occupation', 'Exec-managerial'))

# Report the row count and mean posterior for each slice.
for label, count, mean_prob in (
    ("education=Bachelors:", n_bach, p_bach),
    ("education=Grad:", n_grad, p_grad),
    ("hours_bin=Full-time:", n_ft, p_ft),
    ("hours_bin=Extreme:", n_ext, p_ext),
    ("occupation=Exec-managerial:", n_exec, p_exec),
):
    print(label, count, mean_prob)


education=Bachelors: 1604 0.4171271607077439
education=Grad: 816 0.6129614051864863
hours_bin=Full-time: 5304 0.20246576558217527
hours_bin=Extreme: 337 0.36230667959652
occupation=Exec-managerial: 1233 0.4830993152903306
