# Loading Dataset

In [1]:
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings("ignore")
# Load dataset
df = pd.read_csv("winequality-white.csv", sep=";")  # replace with actual filename

df

  from pandas.core import (


Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.00100,3.00,0.45,8.8,6
1,6.3,0.30,0.34,1.6,0.049,14.0,132.0,0.99400,3.30,0.49,9.5,6
2,8.1,0.28,0.40,6.9,0.050,30.0,97.0,0.99510,3.26,0.44,10.1,6
3,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6
4,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6
...,...,...,...,...,...,...,...,...,...,...,...,...
4893,6.2,0.21,0.29,1.6,0.039,24.0,92.0,0.99114,3.27,0.50,11.2,6
4894,6.6,0.32,0.36,8.0,0.047,57.0,168.0,0.99490,3.15,0.46,9.6,5
4895,6.5,0.24,0.19,1.2,0.041,30.0,111.0,0.99254,2.99,0.46,9.4,6
4896,5.5,0.29,0.30,1.1,0.022,20.0,110.0,0.98869,3.34,0.38,12.8,7


In [2]:
df["quality"].value_counts()

quality
6    2198
5    1457
7     880
8     175
4     163
3      20
9       5
Name: count, dtype: int64

# target transform: quality -> {0,1,2}

In [3]:

df = pd.read_csv("winequality-white.csv", sep=";")

# 1a) 
def quality_to_class(q):
    if q < 5:
        return 0
    elif q in (5, 6):
        return 1
    else:
        return 2

df['quality'] = df['quality'].apply(quality_to_class)



# Z-score normalize all other attributes (exclude 'quality')

In [4]:

features = [c for c in df.columns if c != 'quality']
df_z = df[features].apply(lambda col: (col - col.mean()) / (col.std(ddof=0) if col.std(ddof=0) != 0 else 1.0))

# Discretize each normalized column into 4 equal-width bins indexed 0 to 3

In [5]:

def discretize_to_4_equal_width_bins(col):
    mn, mx = col.min(), col.max()
    if mx == mn:
        return pd.Series(np.zeros(len(col), dtype=int), index=col.index)
    width = (mx - mn) / 4.0
    idx = np.floor((col - mn) / width).astype(int)
    idx = np.clip(idx, 0, 3)
    return pd.Series(idx.values, index=col.index)

df_binned = pd.DataFrame({c: discretize_to_4_equal_width_bins(df_z[c]) for c in df_z.columns})
df_binned['quality'] = df['quality'].values

X = df_binned.drop('quality', axis=1).values.astype(int)
y = df_binned['quality'].values.astype(int)

print("Preprocessing done. X shape:", X.shape, "class dist:", np.bincount(y))

Preprocessing done. X shape: (4898, 11) class dist: [ 183 3655 1060]


# Custom ID3-like Decision Tree (discrete attributes) 

In [6]:

class Node:
    def __init__(self, attribute=None, children=None, label=None, samples_idx=None):
        self.attribute = attribute
        self.children = children or {}
        self.label = label
        self.samples_idx = samples_idx  

    def is_leaf(self):
        return self.label is not None

# Entropy and Gini calculation

In [7]:
def entropy(arr):
    vals, counts = np.unique(arr, return_counts=True)
    p = counts / counts.sum()
    return -np.sum(p * np.log2(p + 1e-12))

def gini(arr):
    vals, counts = np.unique(arr, return_counts=True)
    p = counts / counts.sum()
    return 1.0 - np.sum(p**2)

#  Best attribute (col index) and its gain Calculation

In [8]:
def best_attribute(X_all, y_all, samples_idx, candidate_attrs, criterion):
    
    X_sub = X_all[samples_idx]
    y_sub = y_all[samples_idx]
    if criterion == 'entropy':
        base_imp = entropy(y_sub)
    else:
        base_imp = gini(y_sub)
    best_gain = -np.inf
    best_attr = None
    for a in candidate_attrs:
        vals, counts = np.unique(X_sub[:, a], return_counts=True)
        weighted = 0.0
        for v, c in zip(vals, counts):
            idx = samples_idx[X_sub[:, a] == v]
            yv = y_all[idx]
            imp = entropy(yv) if criterion == 'entropy' else gini(yv)
            weighted += (len(yv) / len(y_sub)) * imp
        gain = base_imp - weighted
        if gain > best_gain:
            best_gain = gain
            best_attr = a
    return best_attr, best_gain

In [9]:
def build_tree(X_all, y_all, samples_idx=None, candidate_attrs=None, min_samples=10, criterion='entropy'):
    if samples_idx is None:
        samples_idx = np.arange(len(y_all))
    if candidate_attrs is None:
        candidate_attrs = list(range(X_all.shape[1]))

    y_sub = y_all[samples_idx]
    majority = Counter(y_sub).most_common(1)[0][0]

    # stopping conditions
    if len(y_sub) == 0:
        return Node(label=majority, samples_idx=samples_idx)
    if len(np.unique(y_sub)) == 1:
        return Node(label=y_sub[0], samples_idx=samples_idx)
    if len(y_sub) < min_samples:
        return Node(label=majority, samples_idx=samples_idx)
    if len(candidate_attrs) == 0:
        return Node(label=majority, samples_idx=samples_idx)

    attr, gain = best_attribute(X_all, y_all, samples_idx, candidate_attrs, criterion)
    if attr is None:
        return Node(label=majority, samples_idx=samples_idx)

    node = Node(attribute=attr, samples_idx=samples_idx)
    node.children = {}
    X_sub = X_all[samples_idx]
    for val in np.unique(X_sub[:, attr]):
        child_idx = samples_idx[X_sub[:, attr] == val]
        if child_idx.size == 0:
            node.children[val] = Node(label=majority, samples_idx=child_idx)
        else:
            new_attrs = [a for a in candidate_attrs if a != attr]
            node.children[val] = build_tree(X_all, y_all, samples_idx=child_idx, candidate_attrs=new_attrs, min_samples=min_samples, criterion=criterion)
    return node


In [10]:
def predict_one(node, x):
    while not node.is_leaf():
        a = node.attribute
        v = x[a]
        if v in node.children:
            node = node.children[v]
        else:
            # unseen value -> majority label among node.samples_idx
            return Counter(y[node.samples_idx]).most_common(1)[0][0]
    return node.label

def predict_batch(node, X_input):
    return np.array([predict_one(node, x) for x in X_input])

# Reducing error Pruning (custom tree)

In [11]:

def collect_nonleaf_nodes_postorder(node):
    nodes = []
    if node is None or node.is_leaf():
        return nodes
    for c in node.children.values():
        nodes.extend(collect_nonleaf_nodes_postorder(c))
    nodes.append(node)
    return nodes

def prune_custom_tree(root, X_val, y_val):
    root = copy.deepcopy(root)
    while True:
        changed = False
        nodes = collect_nonleaf_nodes_postorder(root)
        for node in nodes:
            # compute current accuracy on validation
            y_pred_before = predict_batch(root, X_val)
            acc_before = accuracy_score(y_val, y_pred_before)

            # try converting node to leaf with majority label among node.samples_idx
            maj = Counter(y[node.samples_idx]).most_common(1)[0][0]
            backup_children = node.children
            backup_attr = node.attribute
            backup_label = node.label

            node.children = {}
            node.attribute = None
            node.label = maj

            y_pred_after = predict_batch(root, X_val)
            acc_after = accuracy_score(y_val, y_pred_after)

            if acc_after >= acc_before:
                changed = True  # keep the pruning
            else:
                # revert
                node.children = backup_children
                node.attribute = backup_attr
                node.label = backup_label
        if not changed:
            break
    return root

# Reduced-error pruning for sklearn DecisionTreeClassifier

In [12]:

def get_node_sample_indices_sklearn(clf, X_train):
    node_indicator = clf.decision_path(X_train)  # sparse matrix
    n_nodes = clf.tree_.node_count
    samples_per_node = [ [] for _ in range(n_nodes) ]
    for i in range(X_train.shape[0]):
        node_ids = node_indicator[i].nonzero()[1]
        for nid in node_ids:
            samples_per_node[nid].append(i)
    return [np.array(li, dtype=int) for li in samples_per_node]


In [13]:
def prune_sklearn_tree_reduced_error(clf, X_val, y_val, X_train_for_nodes, y_train_for_nodes):
    # work on copy to be safe
    clf = copy.deepcopy(clf)
    tree = clf.tree_
    children_left = tree.children_left
    children_right = tree.children_right

    # postorder nodes
    postorder = []
    def recurse(n):
        if children_left[n] != -1:
            recurse(children_left[n])
        if children_right[n] != -1:
            recurse(children_right[n])
        postorder.append(n)
    recurse(0)

    # samples per node (indices into X_train_for_nodes)
    samples_per_node = get_node_sample_indices_sklearn(clf, X_train_for_nodes)

    changed_any = True
    while True:
        changed = False
        for node in reversed(postorder):
            # skip leaves
            if children_left[node] == -1 and children_right[node] == -1:
                continue

            acc_before = accuracy_score(y_val, clf.predict(X_val))

            # majority class for that node using y_train_for_nodes
            idx = samples_per_node[node]
            if idx.size == 0:
                maj = Counter(y_train_for_nodes).most_common(1)[0][0]
            else:
                maj = Counter(y_train_for_nodes[idx]).most_common(1)[0][0]

            # backup
            left_b, right_b = children_left[node], children_right[node]
            value_b = tree.value[node].copy()

            # mutate to leaf
            tree.children_left[node] = -1
            tree.children_right[node] = -1
            # set node.value to majority class counts (approx)
            # tree.value shape (n_nodes, 1, n_classes)
            class_idx = np.where(clf.classes_ == maj)[0][0]
            tree.value[node][0, :] = 0.0
            tree.value[node][0, class_idx] = 1.0

            acc_after = accuracy_score(y_val, clf.predict(X_val))

            if acc_after >= acc_before:
                changed = True  # keep change
            else:
                # revert
                tree.children_left[node] = left_b
                tree.children_right[node] = right_b
                tree.value[node] = value_b
        if not changed:
            break
        changed_any = changed_any or changed
    return clf


# Using full pipeline

In [14]:

import numpy as np
import pandas as pd
from collections import Counter
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.tree import DecisionTreeClassifier
import copy

# Parameters 
MIN_SAMPLES_SPLIT = 10
CV_FOLDS = 3
PRUNING_VAL_RATIO = 0.2       # inside each CV fold: 80% train, 20% val for pruning
RANDOM_STATE = 42

#  3-fold CV loop (train/prune/eval) 
kf = KFold(n_splits=CV_FOLDS, shuffle=True, random_state=RANDOM_STATE)

results = {
    'ID3_entropy': {'acc': [], 'prec': [], 'rec': []},
    'ID3_gini': {'acc': [], 'prec': [], 'rec': []},
    'SK_entropy': {'acc': [], 'prec': [], 'rec': []},
    'SK_gini': {'acc': [], 'prec': [], 'rec': []}
}

fold = 0
for train_idx, test_idx in kf.split(X):
    fold += 1
    print(f"\n--- Fold {fold}/{CV_FOLDS} ---")
    X_train_full, X_test = X[train_idx], X[test_idx]
    y_train_full, y_test = y[train_idx], y[test_idx]

    # shuffle training before split for pruning
    rng = np.random.RandomState(RANDOM_STATE + fold)
    perm = rng.permutation(len(X_train_full))
    X_train_full = X_train_full[perm]
    y_train_full = y_train_full[perm]

    # split train -> train_main (80%) and val (20%) for pruning
    n_train_full = len(X_train_full)
    split_i = int((1 - PRUNING_VAL_RATIO) * n_train_full)
    X_tr_main = X_train_full[:split_i]
    y_tr_main = y_train_full[:split_i]
    X_val = X_train_full[split_i:]
    y_val = y_train_full[split_i:]

    print("sizes -> train_main:", X_tr_main.shape, "val:", X_val.shape, "test:", X_test.shape)

    # Custom ID3 (entropy)
    # build on the TRAIN MAIN portion; but build_tree expects samples_idx relative to full array X_all.
    # To avoid complexity, we will pass X_all = X_tr_main and y_all = y_tr_main and use full indices there.
    tree_ent = build_tree(X_tr_main, y_tr_main, samples_idx=np.arange(len(y_tr_main)), candidate_attrs=list(range(X.shape[1])), min_samples=MIN_SAMPLES_SPLIT, criterion='entropy')
    tree_ent_pruned = prune_custom_tree(tree_ent, X_val, y_val)
    y_pred_ent = predict_batch(tree_ent_pruned, X_test)
    results['ID3_entropy']['acc'].append(accuracy_score(y_test, y_pred_ent))
    results['ID3_entropy']['prec'].append(precision_score(y_test, y_pred_ent, average='macro', zero_division=0))
    results['ID3_entropy']['rec'].append(recall_score(y_test, y_pred_ent, average='macro', zero_division=0))
    print("ID3 entropy fold acc:", results['ID3_entropy']['acc'][-1])

    # Custom ID3 (gini)
    tree_gini = build_tree(X_tr_main, y_tr_main, samples_idx=np.arange(len(y_tr_main)), candidate_attrs=list(range(X.shape[1])), min_samples=MIN_SAMPLES_SPLIT, criterion='gini')
    tree_gini_pruned = prune_custom_tree(tree_gini, X_val, y_val)
    y_pred_gini = predict_batch(tree_gini_pruned, X_test)
    results['ID3_gini']['acc'].append(accuracy_score(y_test, y_pred_gini))
    results['ID3_gini']['prec'].append(precision_score(y_test, y_pred_gini, average='macro', zero_division=0))
    results['ID3_gini']['rec'].append(recall_score(y_test, y_pred_gini, average='macro', zero_division=0))
    print("ID3 gini fold acc:", results['ID3_gini']['acc'][-1])

    # sklearn DecisionTree (entropy) 
    clf_ent = DecisionTreeClassifier(criterion='entropy', min_samples_split=MIN_SAMPLES_SPLIT, random_state=RANDOM_STATE)
    clf_ent.fit(X_tr_main, y_tr_main)
    clf_ent_pruned = prune_sklearn_tree_reduced_error(clf_ent, X_val, y_val, X_tr_main, y_tr_main)
    y_pred_sk_ent = clf_ent_pruned.predict(X_test)
    results['SK_entropy']['acc'].append(accuracy_score(y_test, y_pred_sk_ent))
    results['SK_entropy']['prec'].append(precision_score(y_test, y_pred_sk_ent, average='macro', zero_division=0))
    results['SK_entropy']['rec'].append(recall_score(y_test, y_pred_sk_ent, average='macro', zero_division=0))
    print("SK entropy fold acc:", results['SK_entropy']['acc'][-1])

    # sklearn DecisionTree (gini) 
    clf_g = DecisionTreeClassifier(criterion='gini', min_samples_split=MIN_SAMPLES_SPLIT, random_state=RANDOM_STATE)
    clf_g.fit(X_tr_main, y_tr_main)
    clf_g_pruned = prune_sklearn_tree_reduced_error(clf_g, X_val, y_val, X_tr_main, y_tr_main)
    y_pred_sk_g = clf_g_pruned.predict(X_test)
    results['SK_gini']['acc'].append(accuracy_score(y_test, y_pred_sk_g))
    results['SK_gini']['prec'].append(precision_score(y_test, y_pred_sk_g, average='macro', zero_division=0))
    results['SK_gini']['rec'].append(recall_score(y_test, y_pred_sk_g, average='macro', zero_division=0))
    print("SK gini fold acc:", results['SK_gini']['acc'][-1])


print("\nCross-Validation Results (mean across folds) ")
for key in results:
    print(f"{key}: Accuracy={np.mean(results[key]['acc']):.4f}, Macro-Precision={np.mean(results[key]['prec']):.4f}, Macro-Recall={np.mean(results[key]['rec']):.4f}")



--- Fold 1/3 ---
sizes -> train_main: (2612, 11) val: (653, 11) test: (1633, 11)
ID3 entropy fold acc: 0.7672994488671158
ID3 gini fold acc: 0.7685241886099204
SK entropy fold acc: 0.7605633802816901
SK gini fold acc: 0.7617881200244948

--- Fold 2/3 ---
sizes -> train_main: (2612, 11) val: (653, 11) test: (1633, 11)
ID3 entropy fold acc: 0.7519902020820576
ID3 gini fold acc: 0.7513778322106552
SK entropy fold acc: 0.7372933251684017
SK gini fold acc: 0.7372933251684017

--- Fold 3/3 ---
sizes -> train_main: (2612, 11) val: (654, 11) test: (1632, 11)
ID3 entropy fold acc: 0.7720588235294118
ID3 gini fold acc: 0.7757352941176471
SK entropy fold acc: 0.7665441176470589
SK gini fold acc: 0.7714460784313726

Cross-Validation Results (mean across folds) 
ID3_entropy: Accuracy=0.7638, Macro-Precision=0.5039, Macro-Recall=0.3885
ID3_gini: Accuracy=0.7652, Macro-Precision=0.4987, Macro-Recall=0.3939
SK_entropy: Accuracy=0.7548, Macro-Precision=0.3788, Macro-Recall=0.3788
SK_gini: Accuracy=0.7