In [4]:
import time as tm
import numpy as num_py
import pandas as pan_lib
from sklearn.model_selection import StratifiedShuffleSplit as LayeredSplit

C_CYAN = '\033[96m'
C_GREEN = '\033[92m'
C_YELLOW = '\033[93m'
C_RED = '\033[91m'
C_BOLD = '\033[1m'
C_END = '\033[0m'

FEAT_SRC = "data.csv"
TARG_SRC = "label.csv"

data_arr = pan_lib.read_csv(FEAT_SRC, header=None).values.astype(num_py.float64)
target_vec = pan_lib.read_csv(TARG_SRC, header=None).values.squeeze().astype(int)

n_grp = num_py.unique(target_vec).size
pos_adj = max(0.0, -data_arr.min())
data_adj = data_arr + pos_adj
TOL = 1e-12

def calc_vec_len(mat: num_py.ndarray) -> num_py.ndarray:
    """Calculates row-wise vector norms."""
    return num_py.sqrt((mat * mat).sum(axis=1, keepdims=True)) + TOL

def pdist_sq_euc(m1: num_py.ndarray, m2: num_py.ndarray) -> num_py.ndarray:
    """Pairwise squared Euclidean distance."""
    m1_sq = (m1 * m1).sum(axis=1, keepdims=True)
    m2_sq = (m2 * m2).sum(axis=1)[None, :]
    dist_matrix = m1_sq + m2_sq - 2.0 * (m1 @ m2.T)
    return num_py.maximum(dist_matrix, 0.0)

def pdist_cos_dist(m1: num_py.ndarray, m2: num_py.ndarray) -> num_py.ndarray:
    """Pairwise 1 - Cosine Similarity."""
    m1_norm = m1 / calc_vec_len(m1)
    m2_norm = m2 / calc_vec_len(m2)
    sim_matrix = m1_norm @ m2_norm.T
    sim_matrix = num_py.clip(sim_matrix, -1.0, 1.0)
    dist_matrix = 1.0 - sim_matrix
    return num_py.clip(dist_matrix, 0.0, 2.0)

def pdist_gen_jac(m1: num_py.ndarray, m2: num_py.ndarray) -> num_py.ndarray:
    """Pairwise 1 - Generalized Jaccard."""
    n_rows, k_rows = m1.shape[0], m2.shape[0]
    res_mat = num_py.empty((n_rows, k_rows), dtype=num_py.float64)
    for idx in range(k_rows):
        numerator = num_py.minimum(m1, m2[idx:idx+1]).sum(axis=1) + TOL
        denominator = num_py.maximum(m1, m2[idx:idx+1]).sum(axis=1) + TOL
        res_mat[:, idx] = 1.0 - (numerator / denominator)
    res_mat = num_py.clip(res_mat, 0.0, 1.0)
    res_mat[~num_py.isfinite(res_mat)] = 0.0
    return res_mat

def get_dist_mat(m1: num_py.ndarray, m2: num_py.ndarray, m_type: str) -> num_py.ndarray:
    """Dispatcher for pairwise distance calculation."""
    if m_type == "euclidean":
        d_mat = pdist_sq_euc(m1, m2)
    elif m_type == "cosine":
        d_mat = pdist_cos_dist(m1, m2)
    elif m_type == "jaccard":
        d_mat = pdist_gen_jac(m1, m2)
    else:
        raise ValueError("m_type must be 'euclidean', 'cosine', or 'jaccard'")
    d_mat[~num_py.isfinite(d_mat)] = 0.0
    return num_py.maximum(d_mat, 0.0)

def update_centers(d_arr: num_py.ndarray, assigns: num_py.ndarray, n_grp: int,
                     m_type: str, rand_gen: num_py.random.Generator) -> num_py.ndarray:
    """Recomputes cluster centers based on current assignments."""
    n_feat = d_arr.shape[1]
    centers = num_py.empty((n_grp, n_feat), dtype=num_py.float64)
    for grp_idx in range(n_grp):
        grp_mask = (assigns == grp_idx)
        if not num_py.any(grp_mask):
            centers[grp_idx] = d_arr[rand_gen.integers(0, d_arr.shape[0])]
            continue
        if m_type == "cosine":
            vec_sum = d_arr[grp_mask].sum(axis=0)
            centers[grp_idx] = vec_sum / (num_py.linalg.norm(vec_sum) + TOL)
        else:
            centers[grp_idx] = d_arr[grp_mask].mean(axis=0)
    return centers

def init_centers_pp(d_arr: num_py.ndarray, n_grp: int, m_type: str, 
                      rand_gen: num_py.random.Generator) -> num_py.ndarray:
    """Initializes centers using K-Means++."""
    n_pts, n_dim = d_arr.shape
    if n_grp > n_pts:
        raise ValueError(f"n_grp={n_grp} cannot exceed n_pts={n_pts}.")
    
    centers = num_py.empty((n_grp, n_dim), dtype=num_py.float64)
    
    first_idx = rand_gen.integers(0, n_pts)
    centers[0] = d_arr[first_idx]
    
    min_dists = get_dist_mat(d_arr, centers[:1], m_type).squeeze()
    min_dists = num_py.maximum(min_dists, 0.0)
    
    for i_center in range(1, n_grp):
        dist_probs = num_py.maximum(min_dists.copy(), 0.0)
        prob_sum = dist_probs.sum()
        
        if not num_py.isfinite(prob_sum) or prob_sum <= 0.0:
            next_idx = rand_gen.integers(0, n_pts)
        else:
            dist_probs = dist_probs / prob_sum
            dist_probs = num_py.maximum(dist_probs, 0.0)
            s_check = dist_probs.sum()
            if s_check <= 0.0 or not num_py.isfinite(s_check):
                next_idx = rand_gen.integers(0, n_pts)
            else:
                dist_probs = dist_probs / s_check
                next_idx = rand_gen.choice(n_pts, p=dist_probs)
        
        centers[i_center] = d_arr[next_idx]
        new_dists = get_dist_mat(d_arr, centers[i_center:i_center+1], m_type).squeeze()
        new_dists = num_py.maximum(new_dists, 0.0)
        min_dists = num_py.minimum(min_dists, new_dists)
        
    return centers

def run_cluster_algo(d_arr: num_py.ndarray,
                       n_grp: int,
                       m_type: str = "euclidean",
                       max_loops: int = 200,
                       rand_seed: int = 0,
                       stop_on_no_move: bool = True,
                       stop_on_obj_rise: bool = True) -> dict:
    """Performs K-Means clustering."""
    rand_gen = num_py.random.default_rng(rand_seed)
    centers = init_centers_pp(d_arr, n_grp, m_type, rand_gen)
    
    obj_history = []
    start_t = tm.time()
    loop_num = 0
    
    for loop_num in range(1, max_loops + 1):
        dist_mat = get_dist_mat(d_arr, centers, m_type)
        assignments = dist_mat.argmin(axis=1)
        
        curr_obj = float(dist_mat[num_py.arange(d_arr.shape[0]), assignments].sum())
        obj_history.append(curr_obj)
        
        new_centers = update_centers(d_arr, assignments, n_grp, m_type, rand_gen)
        
        is_converged = stop_on_no_move and num_py.allclose(new_centers, centers, rtol=1e-7, atol=1e-9)
        is_worse = stop_on_obj_rise and (loop_num > 1) and (obj_history[-1] > obj_history[-2] + TOL)
        
        centers = new_centers
        
        if is_converged or is_worse:
            break
            
    end_t = tm.time()
    
    return {
        "centers": centers,
        "assignments": assignments,
        "final_obj": obj_history[-1],
        "obj_hist": obj_history,
        "total_loops": loop_num,
        "exec_time": end_t - start_t,
    }

def get_cluster_label_map(assigns: num_py.ndarray, targets: num_py.ndarray, n_grp: int) -> dict:
    """Maps cluster index to the majority true label within that cluster."""
    label_map = {}
    for grp_idx in range(n_grp):
        grp_mask = (assigns == grp_idx)
        if not num_py.any(grp_mask):
            label_map[grp_idx] = None
        else:
            uniq_labels, label_counts = num_py.unique(targets[grp_mask], return_counts=True)
            label_map[grp_idx] = uniq_labels[label_counts.argmax()]
    return label_map

def calc_in_sample_acc(assigns: num_py.ndarray, targets: num_py.ndarray, n_grp: int) -> float:
    """Calculates in-sample accuracy using the majority map."""
    c_map = get_cluster_label_map(assigns, targets, n_grp)
    pred_vec = num_py.array([c_map[lab] for lab in assigns])
    valid_mask = pred_vec != None
    return float((pred_vec[valid_mask] == targets[valid_mask]).mean())

def assign_clusters(new_data: num_py.ndarray, centers: num_py.ndarray, m_type: str) -> num_py.ndarray:
    """Assigns new data points to the closest cluster centers."""
    dist_mat = get_dist_mat(new_data, centers, m_type)
    return dist_mat.argmin(axis=1)

R_STATE = 42
dist_types = ["euclidean", "cosine", "jaccard"]
data_map = {"euclidean": data_arr, "cosine": data_arr, "jaccard": data_adj}

res1_list = []
for m_name in dist_types:
    run_result = run_cluster_algo(data_map[m_name], n_grp, m_type=m_name, max_loops=200, rand_seed=R_STATE)
    res1_list.append({"dist_metric": m_name, "final_obj_val": run_result["final_obj"]})

res1_df = pan_lib.DataFrame(res1_list).sort_values("final_obj_val")
print(f"\n{C_CYAN}{C_BOLD}Q1: Objective by Method (Lower is Better){C_END}")
print(res1_df.to_string(index=False))

res2_in_list = []
for m_name in dist_types:
    run_result = run_cluster_algo(data_map[m_name], n_grp, m_type=m_name, max_loops=200, rand_seed=R_STATE)
    accuracy_val = calc_in_sample_acc(run_result["assignments"], target_vec, n_grp)
    res2_in_list.append({"dist_metric": m_name, "in_sample_acc": accuracy_val})

res2_in_df = pan_lib.DataFrame(res2_in_list).sort_values("in_sample_acc", ascending=False)

splitter = LayeredSplit(n_splits=1, test_size=0.2, random_state=R_STATE)
(tr_idx, te_idx) = next(splitter.split(data_arr, target_vec))

data_train, target_train = data_arr[tr_idx], target_vec[tr_idx]
data_test, target_test = data_arr[te_idx], target_vec[te_idx]

pos_adj_train = max(0.0, -data_train.min())
data_train_adj = data_train + pos_adj_train  
data_test_adj = data_test + pos_adj_train

train_data_map = {"euclidean": data_train, "cosine": data_train, "jaccard": data_train_adj}
test_data_map = {"euclidean": data_test, "cosine": data_test, "jaccard": data_test_adj}

res2_pred_list = []
for m_name in dist_types:
    run_result = run_cluster_algo(train_data_map[m_name], n_grp, m_type=m_name, max_loops=200, rand_seed=R_STATE)
    c_map = get_cluster_label_map(run_result["assignments"], target_train, n_grp)
    
    test_assigns = assign_clusters(test_data_map[m_name], run_result["centers"], m_name)
    pred_labels = num_py.array([c_map.get(lbl, None) for lbl in test_assigns])
    
    valid_mask = pred_labels != None
    accuracy_val = float((pred_labels[valid_mask] == target_test[valid_mask]).mean())
    res2_pred_list.append({"dist_metric": m_name, "predict_acc": accuracy_val})

res2_pred_df = pan_lib.DataFrame(res2_pred_list).sort_values("predict_acc", ascending=False)

print(f"\n{C_GREEN}{C_BOLD}Q2: In-Sample Majority Vote Accuracy{C_END}")
print(res2_in_df.to_string(index=False))
print(f"\n{C_YELLOW}{C_BOLD}Q2: Predictive Accuracy (80/20 Split){C_END}")
print(res2_pred_df.to_string(index=False))

res3_list = []
for m_name in dist_types:
    run_result = run_cluster_algo(
        data_map[m_name], n_grp, m_type=m_name, max_loops=500, rand_seed=R_STATE,
        stop_on_no_move=True, stop_on_obj_rise=True
    )
    res3_list.append({
        "dist_metric": m_name, 
        "loop_count": run_result["total_loops"], 
        "time_elapsed": run_result["exec_time"]
    })

res3_df = pan_lib.DataFrame(res3_list).sort_values("loop_count", ascending=False)
print(f"\n{C_RED}{C_BOLD}Q3: Runtime and Iteration Analysis (Early Stop){C_END}")
print(res3_df.to_string(index=False))


[96m[1mQ1: Objective by Method (Lower is Better)[0m
dist_metric  final_obj_val
     cosine   2.456332e+03
    jaccard   6.445167e+03
  euclidean   2.532360e+10

[92m[1mQ2: In-Sample Majority Vote Accuracy[0m
dist_metric  in_sample_acc
     cosine         0.6139
  euclidean         0.6018
    jaccard         0.4480

[93m[1mQ2: Predictive Accuracy (80/20 Split)[0m
dist_metric  predict_acc
     cosine       0.6155
  euclidean       0.6105
    jaccard       0.5455

[91m[1mQ3: Runtime and Iteration Analysis (Early Stop)[0m
dist_metric  loop_count  time_elapsed
     cosine          99      9.920131
  euclidean          32      2.401999
    jaccard           2      0.903490


In [5]:
term_criteria = [
    ("no-change",          dict(stop_on_no_move=True,  stop_on_obj_rise=False, max_loops=500)),
    ("objective-increase", dict(stop_on_no_move=False, stop_on_obj_rise=True,  max_loops=500)),
    ("max-iter",           dict(stop_on_no_move=False, stop_on_obj_rise=False, max_loops=100)),
]

res4_list = []
for name, config in term_criteria:
    for m_name in dist_types:
        run_result = run_cluster_algo(
            data_map[m_name], n_grp, m_type=m_name, rand_seed=R_STATE,
            max_loops=config["max_loops"],
            stop_on_no_move=config["stop_on_no_move"],
            stop_on_obj_rise=config["stop_on_obj_rise"],
        )
        res4_list.append({
            "term_rule": name,
            "dist_metric": m_name,
            "final_obj_val": run_result["final_obj"],
            "loop_count": run_result["total_loops"],
        })

res4_df = pan_lib.DataFrame(res4_list).sort_values(["term_rule", "final_obj_val"])

print(f"\n{C_CYAN}{C_BOLD}Q4: Termination Rule Comparison{C_END}")
print(res4_df.to_string(index=False))


[96m[1mQ4: Termination Rule Comparison[0m
         term_rule dist_metric  final_obj_val  loop_count
          max-iter      cosine   2.456342e+03         100
          max-iter     jaccard   6.017834e+03         100
          max-iter   euclidean   2.532360e+10         100
         no-change      cosine   2.456339e+03         105
         no-change     jaccard   6.017834e+03          30
         no-change   euclidean   2.532360e+10          32
objective-increase      cosine   2.456332e+03          99
objective-increase     jaccard   6.445167e+03           2
objective-increase   euclidean   2.532360e+10         500
