## + metrik evaluasi

In [2]:
from __future__ import annotations
from typing import Any, Dict, List, Optional, Tuple
from collections import Counter
from pathlib import Path

import numpy as np
import pandas as pd

from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import silhouette_score
import warnings

In [None]:

def _impute_numeric(
    s: pd.Series,
    strategy: str = "median"
) -> pd.Series:
    if strategy == "median":
        return s.fillna(s.median())
    elif strategy == "mean":
        return s.fillna(s.mean())
    elif strategy == "zero":
        return s.fillna(0)
    else:
        return s

def _clip_outliers_quantile(
    s: pd.Series,
    q_low: float = 0.01,
    q_high: float = 0.99
) -> pd.Series:
    lo, hi = s.quantile(q_low), s.quantile(q_high)
    if pd.isna(lo) or pd.isna(hi):
        return s
    return s.clip(lo, hi)

def _consolidate_rare_categories(
    s: pd.Series,
    min_ratio: float = 0.01,
    other_label: str = "OTHER"
) -> pd.Series:
    counts = s.value_counts(dropna=False)
    total = counts.sum()
    rare = set(counts[counts / max(total, 1) <= min_ratio].index)
    return s.apply(lambda v: other_label if v in rare else v)

def _scale_numeric(
    df_num: pd.DataFrame,
    method: str = "minmax"
):
    if df_num.empty:
        return df_num, None
    if method == "standard":
        scaler = StandardScaler()
    else:
        scaler = MinMaxScaler()
    scaled = pd.DataFrame(
        scaler.fit_transform(df_num.values),
        columns=df_num.columns,
        index=df_num.index
    )
    return scaled, scaler

def preprocess_for_kmedoids_gower(
    df: pd.DataFrame,
    categorical_cols: List[str],
    numeric_cols: List[str],
    *,
    fill_num: str = "median",
    fill_cat_label: str = "MISSING",
    rare_thresh: float = 0.01,
    other_label: str = "OTHER",
    clip_outliers: bool = True,
    q_low: float = 0.01,
    q_high: float = 0.99,
    scale_method: str = "minmax",
    force_upper: bool = False,
    strip_space: bool = True
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Clean numeric and categorical columns ahead of Gower / K-Medoids.

    Works on a copy of ``df``. Column names that are not present in the
    frame are silently ignored.

    Numeric columns are coerced with ``pd.to_numeric(errors="coerce")``,
    imputed (unless ``fill_num == "none"``), optionally clipped to the
    ``[q_low, q_high]`` quantile range and finally scaled together.

    Categorical columns are converted to strings, optionally stripped and
    upper-cased, missing and empty values become ``fill_cat_label``, rare
    values are merged into ``other_label`` (when ``rare_thresh`` is a
    positive number) and the column is stored with ``object`` dtype.

    Returns:
        ``(processed_frame, artifacts)`` where ``artifacts`` records the
        fitted scaler, the columns actually used and every option value.
    """
    work = df.copy()

    # Keep only the requested columns that exist in this frame.
    cat_used = [name for name in categorical_cols if name in work.columns]
    num_used = [name for name in numeric_cols if name in work.columns]

    # --- numeric columns -------------------------------------------------
    scaler = None
    if num_used:
        for col in num_used:
            series = pd.to_numeric(work[col], errors="coerce")
            if fill_num != "none":
                series = _impute_numeric(series, strategy=fill_num)
            if clip_outliers:
                series = _clip_outliers_quantile(series, q_low=q_low, q_high=q_high)
            work[col] = series

        scaled_block, scaler = _scale_numeric(work[num_used], method=scale_method)
        work[num_used] = scaled_block.values

    # --- categorical columns ---------------------------------------------
    merge_rare = rare_thresh is not None and rare_thresh > 0
    for col in cat_used:
        text = work[col].astype("string")
        if strip_space:
            text = text.str.strip()
        if force_upper:
            text = text.str.upper()
        # Missing values and empty strings share one explicit label.
        text = text.fillna(fill_cat_label).replace({"": fill_cat_label})
        if merge_rare:
            text = _consolidate_rare_categories(text, min_ratio=rare_thresh, other_label=other_label)
        # Store as plain Python objects rather than the pandas string dtype.
        work[col] = text.astype(object)

    artifacts = dict(
        scaler=scaler,
        numeric_cols=num_used,
        categorical_cols=cat_used,
        scale_method=scale_method,
        fill_num=fill_num,
        fill_cat_label=fill_cat_label,
        rare_thresh=rare_thresh,
        other_label=other_label,
        clip_outliers=clip_outliers,
        q_low=q_low,
        q_high=q_high,
        force_upper=force_upper,
        strip_space=strip_space,
    )
    return work, artifacts


def prepare_for_kmedoids_gower(
    df: pd.DataFrame,
    categorical_cols: Optional[List[str]] = None,
    numeric_cols: Optional[List[str]] = None,
    force_cat_as_str: bool = True,
    *,

    preprocess: bool = True,
    fill_num: str = "median",
    fill_cat_label: str = "MISSING",
    rare_thresh: float = 0.01,
    other_label: str = "OTHER",
    clip_outliers: bool = True,
    q_low: float = 0.01,
    q_high: float = 0.99,
    scale_method: str = "minmax",
    force_upper: bool = False,
    strip_space: bool = True,
) -> Tuple[pd.DataFrame, List[str], List[str]]:
    """Decide column roles and (optionally) preprocess a frame for Gower.

    Args:
        df: Input frame; it is copied, never modified.
        categorical_cols: Categorical column names. When ``None`` they are
            inferred from the object / string / category dtypes.
        numeric_cols: Numeric column names. When ``None`` they are inferred
            from the numeric dtypes.
        force_cat_as_str: Convert every categorical column to strings stored
            with ``object`` dtype.
        preprocess: Run ``preprocess_for_kmedoids_gower`` with the remaining
            keyword options, which are forwarded unchanged.

    Returns:
        ``(frame, categorical_cols, numeric_cols)``: the prepared frame and
        the final column role lists. The returned lists are new objects; the
        lists passed in by the caller are never modified.
    """
    df_local = df.copy()

    # Work on private copies: the heuristic below moves columns between the
    # two lists and must not leak those edits into the caller's lists.
    if categorical_cols is None:
        categorical_cols = df_local.select_dtypes(
            include=["object", "string", "category"]
        ).columns.tolist()
    else:
        categorical_cols = list(categorical_cols)
    if numeric_cols is None:
        numeric_cols = df_local.select_dtypes(include=[np.number]).columns.tolist()
    else:
        numeric_cols = list(numeric_cols)

    # Integer columns with at most 20 distinct values are treated as
    # categorical codes rather than as quantities.
    for c in df_local.columns:
        if c not in numeric_cols and c not in categorical_cols:
            continue
        if pd.api.types.is_integer_dtype(df_local[c]) and df_local[c].nunique(dropna=True) <= 20:
            if c in numeric_cols:
                numeric_cols.remove(c)
            if c not in categorical_cols:
                categorical_cols.append(c)

    if force_cat_as_str:
        for c in categorical_cols:
            if c in df_local.columns:
                df_local[c] = df_local[c].astype("string").astype(object)

    if preprocess:
        df_local, _art = preprocess_for_kmedoids_gower(
            df_local,
            categorical_cols=categorical_cols,
            numeric_cols=numeric_cols,
            fill_num=fill_num,
            fill_cat_label=fill_cat_label,
            rare_thresh=rare_thresh,
            other_label=other_label,
            clip_outliers=clip_outliers,
            q_low=q_low,
            q_high=q_high,
            scale_method=scale_method,
            force_upper=force_upper,
            strip_space=strip_space,
        )

    return df_local, categorical_cols, numeric_cols

def choose_k_by_elbow(k_vals: np.ndarray, costs: np.ndarray) -> int:
    """Pick the number of clusters at the elbow of a cost curve.

    Every ``(k, cost)`` pair is printed first. The selection rule depends on
    how many points are available:

    * one point: that ``k`` is returned;
    * two points: the ``k`` following the largest cost drop;
    * three or more: both axes are rescaled to ``[0, 1]`` and the interior
      point farthest from the straight line joining the first and last
      points is chosen.

    Args:
        k_vals: Candidate cluster counts.
        costs: Clustering cost for each candidate, in the same order.

    Returns:
        The selected ``k`` as an ``int``.
    """
    k_arr = np.asarray(k_vals, dtype=float)
    cost_arr = np.asarray(costs, dtype=float)

    # Log every candidate.
    print("Iterasi K dan Cost:")
    for k_i, cost_i in zip(k_arr, cost_arr):
        print(f"  k = {k_i:.0f}, cost = {cost_i}")
    print("-----------------------------------")

    n_points = k_arr.size
    if n_points < 2:
        return int(k_arr[0])

    if n_points == 2:
        # Too few points for a chord: take the largest cost decrease.
        drops = -np.diff(cost_arr)
        pos = int(np.nanargmax(drops))
        print(f"Elbow (2 titik) pada k = {int(k_arr[pos + 1])}")
        return int(k_arr[pos + 1])

    # Rescale both axes so that neither dominates the distance.
    k_unit = (k_arr - k_arr.min()) / (k_arr.max() - k_arr.min() + 1e-12)
    c_unit = (cost_arr - cost_arr.min()) / (cost_arr.max() - cost_arr.min() + 1e-12)

    chord_start = np.array([k_unit[0], c_unit[0]])
    chord_end = np.array([k_unit[-1], c_unit[-1]])
    chord = chord_end - chord_start
    chord_len = np.linalg.norm(chord) + 1e-12

    curve = np.stack([k_unit, c_unit], axis=1)

    # Perpendicular distance of each point to the chord (2-D cross product).
    cross = (
        chord[0] * (chord_start[1] - curve[:, 1]) -
        chord[1] * (chord_start[0] - curve[:, 0])
    )
    dists = np.abs(cross) / chord_len

    # The endpoints lie on the chord and must never be selected.
    dists[[0, -1]] = -np.inf

    pos = int(np.nanargmax(dists))
    print(f"Elbow ditemukan pada k = {int(k_arr[pos])} dengan jarak = {dists[pos]}")
    return int(k_arr[pos])


# Gower Distance 
def _gower_numeric_block(X: np.ndarray, Y: np.ndarray, minv: np.ndarray, maxv: np.ndarray) -> np.ndarray:
    rng = np.where((maxv - minv) > 0, (maxv - minv), 1.0)
    diff = np.abs(X[:, None, :] - Y[None, :, :]) / rng
    return diff

def _gower_categorical_block(X: np.ndarray, Y: np.ndarray) -> np.ndarray:
    eq = (X[:, None, :] == Y[None, :, :])
    return (~eq).astype(float)

def gower_distance_matrix(
    df: pd.DataFrame,
    cat_cols: List[str],
    num_cols: List[str],
    *,
    chunk_size: int = 1000
) -> np.ndarray:
    """Build the full pairwise Gower distance matrix of a frame.

    Numeric features contribute their range-normalised absolute difference
    and categorical features contribute 0/1 mismatches; the distance is the
    sum over all features divided by the feature count. Rows are processed
    in blocks of ``chunk_size`` to bound the size of the intermediate
    3-D arrays.

    Args:
        df: Prepared data frame.
        cat_cols: Categorical column names.
        num_cols: Numeric column names.
        chunk_size: Number of rows handled per block.

    Returns:
        Symmetric ``(m, m)`` float matrix with a zero diagonal.

    Raises:
        ValueError: If both column lists are empty.
    """
    n_rows = df.shape[0]
    n_features = len(cat_cols) + len(num_cols)
    if n_features == 0:
        raise ValueError("Tidak ada kolom untuk menghitung jarak.")

    num_data = df[num_cols].to_numpy(dtype=float) if num_cols else None
    cat_data = df[cat_cols].to_numpy(dtype=object) if cat_cols else None

    if num_cols:
        # Per-feature range used for normalisation; NaNs are ignored here.
        minv = np.nanmin(num_data, axis=0)
        maxv = np.nanmax(num_data, axis=0)

    D = np.zeros((n_rows, n_rows), dtype=float)
    for lo in range(0, n_rows, chunk_size):
        hi = min(n_rows, lo + chunk_size)
        contributions = []
        if num_cols:
            contributions.append(
                _gower_numeric_block(num_data[lo:hi, :], num_data, minv, maxv).sum(axis=2)
            )
        if cat_cols:
            contributions.append(
                _gower_categorical_block(cat_data[lo:hi, :], cat_data).sum(axis=2)
            )
        D[lo:hi, :] = sum(contributions, 0.0) / float(n_features)

    # Enforce exact symmetry and a zero diagonal.
    D = (D + D.T) / 2.0
    np.fill_diagonal(D, 0.0)
    return D

# K-Medoids 

def _kmedoids_plus_plus_init(D: np.ndarray, k: int, rng: np.random.RandomState) -> List[int]:
    n = D.shape[0]
    centers = [rng.randint(0, n)]
    for _ in range(1, k):
        dmin = np.min(D[:, centers], axis=1)
        probs = dmin ** 2
        s = probs.sum()
        if s <= 0 or not np.isfinite(s):
            cand = [i for i in range(n) if i not in centers]
            centers.append(rng.choice(cand))
        else:
            probs = probs / s
            centers.append(rng.choice(np.arange(n), p=probs))
    return centers

def _assign_labels(D: np.ndarray, medoids: List[int]) -> Tuple[np.ndarray, np.ndarray]:
    # k= len(medoids); untuk tiap i, pilih medoid terdekat
    dist_to_medoids = D[:, medoids]  
    labels = np.argmin(dist_to_medoids, axis=1)
    dists = dist_to_medoids[np.arange(D.shape[0]), labels]
    return labels, dists

def _pam_swap(D: np.ndarray, medoids: List[int], labels: np.ndarray, cur_cost: float, max_iter: int) -> Tuple[List[int], np.ndarray, float]:
    n = D.shape[0]
    medoids = medoids.copy()
    k = len(medoids)
    for _ in range(max_iter):
        improved = False
        dist_to_medoids = D[:, medoids]  
        best = np.argmin(dist_to_medoids, axis=1)
        best_dist = dist_to_medoids[np.arange(n), best]
        second_dist = np.partition(dist_to_medoids + np.eye(k)[best] * 1e12, 0, axis=1)[:, 1]

        for mi_idx in range(k):
            mi = medoids[mi_idx]
            # kandidat non-medoid
            for h in range(n):
                if h in medoids:
                    continue
                d_ih = D[:, h]
                new_dist = np.where(
                    best == mi_idx,
                    np.minimum(second_dist, d_ih),
                    np.minimum(best_dist, d_ih)
                )
                new_cost = float(new_dist.sum())
                if new_cost + 1e-12 < cur_cost:
                    medoids[mi_idx] = h
                    labels = np.argmin(D[:, medoids], axis=1)
                    cur_cost = new_cost
                    improved = True
                    break
            if improved:
                break
        if not improved:
            break
    return medoids, labels, cur_cost

def kmedoids_fit(
    D: np.ndarray,
    k: int,
    n_init: int = 5,
    max_iter: int = 100,
    random_state: int = 42
) -> Tuple[np.ndarray, List[int], float]:
    """Run K-Medoids (PAM) on a precomputed distance matrix.

    The algorithm is restarted ``n_init`` times, each with its own random
    state derived from ``random_state``, and the lowest-cost run is kept.

    Args:
        D: Square pairwise distance matrix.
        k: Number of clusters.
        n_init: Number of random restarts.
        max_iter: Maximum number of swap passes per restart.
        random_state: Seed of the master random state.

    Returns:
        ``(labels, medoids, total_cost)`` of the best run: the label of each
        point, the medoid row indices and the summed distance of every point
        to its medoid. With ``n_init == 0`` this is ``(None, None, inf)``.
    """
    seed_source = np.random.RandomState(random_state)
    winner = (None, None, np.inf)

    for _ in range(n_init):
        # Independent, reproducible stream for this restart.
        run_rng = np.random.RandomState(seed_source.randint(0, 10**9))
        start_medoids = _kmedoids_plus_plus_init(D, k, run_rng)
        start_labels, start_dists = _assign_labels(D, start_medoids)
        run_medoids, run_labels, run_cost = _pam_swap(
            D, start_medoids, start_labels, float(start_dists.sum()), max_iter=max_iter
        )
        if run_cost < winner[2]:
            winner = (run_labels.copy(), run_medoids.copy(), float(run_cost))
    return winner

# METRICS untuk medoids (Silhouette & DB)
def davies_bouldin_medoids(D: np.ndarray, labels: np.ndarray, medoids: List[int]) -> float:
    """Davies-Bouldin index computed from medoids and a distance matrix.

    Args:
        D: Square pairwise distance matrix.
        labels: Cluster label of every point.
        medoids: Medoid row indices; ``medoids[i]`` must belong to the
            i-th label in sorted order (the order of ``np.unique(labels)``).

    Returns:
        The mean, over clusters, of the largest ``(S_i + S_j) / M_ij`` ratio,
        where ``S`` is the mean member-to-medoid distance and ``M`` the
        distance between medoids. Pairs whose medoid distance is ``<= 0``
        are skipped. ``nan`` is returned when fewer than two clusters exist.
    """
    labels = np.asarray(labels)
    cluster_ids = np.unique(labels)
    n_clusters = cluster_ids.size
    if n_clusters < 2:
        return np.nan

    # Scatter: mean distance of each cluster's members to its own medoid.
    scatter = {}
    for pos, cid in enumerate(cluster_ids):
        members = np.nonzero(labels == cid)[0]
        if members.size == 0:
            scatter[cid] = 0.0
        else:
            scatter[cid] = float(D[members[:, None], medoids[pos]].mean())

    # Separation: distance between every pair of medoids.
    separation = np.zeros((n_clusters, n_clusters), dtype=float)
    for a in range(n_clusters):
        for b in range(a + 1, n_clusters):
            gap = float(D[medoids[a], medoids[b]])
            separation[a, b] = gap
            separation[b, a] = gap

    # Worst (largest) similarity ratio of each cluster against all others.
    worst_ratios = []
    for a, cid_a in enumerate(cluster_ids):
        ratios = [
            (scatter[cid_a] + scatter[cid_b]) / separation[a, b]
            for b, cid_b in enumerate(cluster_ids)
            if b != a and not (separation[a, b] <= 0)
        ]
        worst_ratios.append(max(ratios) if ratios else 0.0)
    return float(np.mean(worst_ratios))

# Preprocessing configuration, forwarded as keyword arguments to
# prepare_for_kmedoids_gower.
PREPROCESS_OPTS = {
    "preprocess": True,
    "scale_method": "minmax",
    "clip_outliers": True,
    "q_low": 0.01,
    "q_high": 0.99,
    "fill_num": "median",
    "fill_cat_label": "MISSING",
    "rare_thresh": 0.01,
    "other_label": "OTHER",
    "force_upper": True,
    "strip_space": True,
}



In [None]:
from openpyxl import Workbook 

# Main driver (one pass per sheet): K-Medoids clustering on a Gower distance matrix.
# For every sheet listed in PERIOD_SHEETS the script reads the data, prepares it,
# picks K with the elbow rule, fits the final model, computes quality metrics and
# writes the results to a new workbook next to the input file.
processed_any = False  # becomes True once at least one sheet is fully processed
errors = []            # one {"sheet", "error"} record per failed sheet

# Configuration
FILE_PATH = r"D:/DATA SKRIPSI/kontrak_sewa_bersih.xlsx"
PERIOD_SHEETS = ["Monthly_Fixed", "Daily_Fixed"]
CATEGORICAL_COLS = [
    "BusinessType","LeaseYearStart","LeaseMonthStart","LeaseDayStart",
    "LeaseYearEnd","LeaseMonthEnd","LeaseDayEnd","TranCode",
    "ContractPeriod","ContractType","Building","UnitArea","UnitFloor"
]
NUMERIC_COLS = ["BuildingArea","LeaseDurationDays","LeaseDurationMonths","n_subunit"]
K_RANGE = range(2, 7)    # candidate cluster counts: 2..6
DROP_COLS = ["UnitNum"]  # columns removed before clustering

# Upper bound on the number of rows used for the silhouette score
MAX_SILHOUETTE_SAMPLES = 2000
RANDOM_STATE_SIL = 42

# Process each sheet (period)
in_path = Path(FILE_PATH)
# Output workbook: same folder and extension, with a suffix appended to the stem.
out_path = in_path.with_name(f"{in_path.stem}_clustered_kmedoids_gower{in_path.suffix}")

with pd.ExcelWriter(out_path, engine="openpyxl") as ew:
    for SHEET_NAME in PERIOD_SHEETS:
        # Any failure inside this try is recorded and the loop moves on to
        # the next sheet.
        try:
            print(f"\n=== Proses periode: {SHEET_NAME} ===")

            df_raw = pd.read_excel(FILE_PATH, sheet_name=SHEET_NAME)

            # Minimum validation: at least two rows are required
            if df_raw.shape[0] < 2:
                raise ValueError("Baris data < 2")

            # Drop unused columns
            df_raw = df_raw.drop(columns=[c for c in DROP_COLS if c in df_raw.columns], errors="ignore")

            # Early numeric coercion so the numeric columns are consistently float
            # (unparseable values become NaN)
            for col in NUMERIC_COLS:
                if col in df_raw.columns:
                    df_raw[col] = pd.to_numeric(df_raw[col], errors="coerce").astype(float)

            # Keep only the configured columns that exist in this sheet
            CATEG_COLS_USED = [c for c in CATEGORICAL_COLS if c in df_raw.columns]
            NUMERIC_COLS_USED = [c for c in NUMERIC_COLS if c in df_raw.columns]

            if len(CATEG_COLS_USED) == 0 and len(NUMERIC_COLS_USED) == 0:
                raise ValueError("Tidak ada kolom dari daftar yang ditemukan")

            # Prepare + preprocess for Gower & K-Medoids
            df_prep, cat_cols, num_cols = prepare_for_kmedoids_gower(
                df_raw,
                categorical_cols=CATEG_COLS_USED,
                numeric_cols=NUMERIC_COLS_USED,
                force_cat_as_str=True,
                **PREPROCESS_OPTS
            )

            # The row count must exceed the largest candidate K
            if max(K_RANGE) >= df_prep.shape[0]:
                raise ValueError(f"Jumlah baris ({df_prep.shape[0]}) harus > max K ({max(K_RANGE)})")

            # Gower distance matrix (precomputed); RuntimeWarnings raised
            # while building it are suppressed
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=RuntimeWarning)
                D = gower_distance_matrix(df_prep, cat_cols=cat_cols, num_cols=num_cols, chunk_size=1000)

            # Find K (elbow) from the K-Medoids cost (total distance to medoids)
            elbow_rows = []
            for k in K_RANGE:
                labels_k, medoids_k, cost_k = kmedoids_fit(D, k=int(k), n_init=10, max_iter=100, random_state=42)
                elbow_rows.append({"k": int(k), "cost": float(cost_k)})

            elbow_tbl = pd.DataFrame(elbow_rows).sort_values("k").reset_index(drop=True)
            FINAL_K = choose_k_by_elbow(elbow_tbl["k"].to_numpy(), elbow_tbl["cost"].to_numpy())

            # Final model with the selected K (more restarts and iterations
            # than in the elbow search)
            labels, medoids, total_cost = kmedoids_fit(D, k=int(FINAL_K), n_init=15, max_iter=200, random_state=42)

            # Check for tiny clusters (relevant for the silhouette score)
            counts = np.bincount(labels, minlength=FINAL_K)
            if np.any(counts < 2):
                print(f"[INFO] Ada kluster berisi <2 baris: {counts}. Silhouette mungkin di-skip.")

            # Store the cluster assignment next to the raw (unscaled) data
            df_out = df_raw.copy()
            df_out["cluster"] = labels

            # Metrics
            # Silhouette: computed on a random subsample when the data has
            # more than MAX_SILHOUETTE_SAMPLES rows
            try:
                n_rows = df_prep.shape[0]
                if n_rows > MAX_SILHOUETTE_SAMPLES:
                    rng = np.random.RandomState(RANDOM_STATE_SIL)
                    sample_idx = np.sort(rng.choice(n_rows, size=MAX_SILHOUETTE_SAMPLES, replace=False))
                    D_samp = D[np.ix_(sample_idx, sample_idx)]
                    labels_samp = np.asarray(labels)[sample_idx]
                else:
                    D_samp = D
                    labels_samp = np.asarray(labels)

                # Skip the score unless every cluster has at least two
                # rows in the (sub)sample
                ok_for_sil = np.all(np.bincount(labels_samp, minlength=FINAL_K) >= 2)
                if ok_for_sil:
                    sil_score = silhouette_score(D_samp, labels_samp, metric='precomputed')
                else:
                    sil_score = np.nan
                    print("[INFO] Silhouette di-skip (ada kluster sample berisi <2).")
            except Exception as e:
                sil_score = np.nan
                print(f"[WARN] Gagal hitung Silhouette: {e}")

            # Davies-Bouldin: a medoid is re-derived for each cluster (the
            # member with the smallest summed distance to the other members),
            # listed in sorted-label order as davies_bouldin_medoids expects
            try:
                uniq = np.unique(labels)
                medoids_ordered = []
                for cid in uniq:
                    idx = np.where(labels == cid)[0]
                    sums = D[np.ix_(idx, idx)].sum(axis=1)
                    medoids_ordered.append(idx[int(np.argmin(sums))])
                db_index = davies_bouldin_medoids(D, labels, medoids_ordered)
            except Exception as e:
                db_index = np.nan
                print(f"[WARN] Gagal hitung Davies–Bouldin: {e}")

            # Write the output sheets: clustered data, metrics, medoids
            df_out.to_excel(ew, index=False, sheet_name=f"{SHEET_NAME}_clustered")
            pd.DataFrame({
                "Sheet": [SHEET_NAME],
                "FINAL_K": [FINAL_K],
                "Silhouette": [sil_score],
                "DaviesBouldin": [db_index],
                "TotalCost": [total_cost],
                "Rows": [df_prep.shape[0]]
            }).to_excel(ew, index=False, sheet_name=f"{SHEET_NAME}_metrics")

            # Medoids of the fitted model, as row positions in the distance matrix
            medoid_rows = pd.DataFrame({"medoid_row_index": medoids})
            medoid_rows.to_excel(ew, index=False, sheet_name=f"{SHEET_NAME}_medoids")

            # When present, export the unit price with the row index as RowID
            if "CuryUnitPrice" in df_raw.columns:
                df_raw.reset_index()[["index", "CuryUnitPrice"]].rename(columns={"index": "RowID"}).to_excel(
                    ew, index=False, sheet_name=f"{SHEET_NAME}_CuryUnitPrice"
                )

            processed_any = True

        except Exception as e:
            msg = f"{SHEET_NAME}: {type(e).__name__} - {e}"
            print("[ERROR]", msg)
            errors.append({"sheet": SHEET_NAME, "error": msg})

    # Always leave an "error_info" sheet when something went wrong: a status
    # row if no sheet succeeded, otherwise the list of per-sheet errors
    if not processed_any:
        pd.DataFrame({"status": ["No valid sheets processed"]}).to_excel(ew, index=False, sheet_name="error_info")
    elif errors:
        pd.DataFrame(errors).to_excel(ew, index=False, sheet_name="error_info")

print("Selesai. Output:", out_path)



=== Proses periode: Monthly_Fixed ===
Iterasi K dan Cost:
  k = 2, cost = 1756.1488568428522
  k = 3, cost = 1623.8029381144336
  k = 4, cost = 1572.9048612737229
  k = 5, cost = 1534.9092339698827
  k = 6, cost = 1495.2415679936444
-----------------------------------
Elbow ditemukan pada k = 3 dengan jarak = 0.18190510699396664

=== Proses periode: Daily_Fixed ===
Iterasi K dan Cost:
  k = 2, cost = 1156.4646102343377
  k = 3, cost = 1097.7486848397896
  k = 4, cost = 1047.942763358115
  k = 5, cost = 1006.0227164036347
  k = 6, cost = 970.6829485674536
-----------------------------------
Elbow ditemukan pada k = 4 dengan jarak = 0.0594934792891184
Selesai. Output: D:\DATA SKRIPSI\kontrak_sewa_bersih_clustered_kmedoids_gower.xlsx
