In [None]:
import pandas as pd
import numpy as np
import re
from scipy.stats import entropy
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import StandardScaler

In [None]:
DATA_PATH = "../data/champions_group_data.csv"

# low_memory=False makes pandas infer each column's dtype from the whole file
# instead of chunk by chunk, so a column cannot end up holding a mix of parsed
# numbers and raw strings.
# NOTE(review): the saved output of this cell echoes the read_csv line the way
# Python warnings do -- presumably a mixed-dtype DtypeWarning; confirm it is
# gone after this change.
df_raw = pd.read_csv(DATA_PATH, low_memory=False)

# Untouched backup of the raw frame; later cells clean from this copy.
df_raw_copy = df_raw.copy()

df_raw.shape, df_raw.columns[:10]

  df_raw = pd.read_csv(DATA_PATH)


((8559, 72),
 Index(['DUNS Number ', 'Company Sites', 'Website', 'Address Line 1', 'City',
        'State', 'State Or Province Abbreviation', 'Postal Code', 'Country',
        'Phone Number'],
       dtype='object'))

In [None]:
def is_abbreviation(text: str) -> bool:
    """Heuristically decide whether ``text`` looks like an abbreviation or code.

    Non-string input and strings that are empty after stripping return False.
    Otherwise the stripped text is treated as an abbreviation when any of the
    following holds:

    1. it is entirely upper-case and at most 12 characters long;
    2. more than 60% of its characters are upper-case and it is at most
       18 characters long;
    3. it is code-like (upper-case letters, digits, ``-``, ``.`` and ``/`` only,
       starting with a letter or digit) and at most 20 characters long.
    """
    if not isinstance(text, str):
        return False

    token = text.strip()
    if token == "":
        return False
    length = len(token)

    # Rule 1: short, fully upper-case token.
    short_all_caps = length <= 12 and token.isupper()

    # Rule 2: short token dominated by upper-case characters.
    upper_count = sum(1 for ch in token if ch.isupper())
    mostly_caps = length <= 18 and upper_count / max(1, length) > 0.6

    # Rule 3: code-like token (upper-case letters + digits + symbols).
    code_like = (
        length <= 20
        and re.match(r"^[A-Z0-9][A-Z0-9\-\./]*$", token) is not None
    )

    return short_all_caps or mostly_caps or code_like


In [None]:
# Textual placeholders that stand for "no value"; each one maps to pd.NA.
PSEUDO_MISSING = dict.fromkeys(("", "na", "n/a", "none", "null", "-"), pd.NA)

# Geographic fields (lower-cased column names) that are rendered in Title Case.
GEO_COLUMNS = {
    "city",
    "region",
    "country",
    "country name",
    "country/region",
    "parent country",
    "parent country/region",
    "global ultimate country",
    "global ultimate country name",
}

def is_abbreviation(text: str) -> bool:
    """Return True when ``text`` looks like an abbreviation or a code.

    NOTE(review): an earlier cell already defines ``is_abbreviation`` with the
    same rules; this later definition is the one in effect after it runs.

    Rules, checked in order on the stripped text (non-strings and blank
    strings are never abbreviations):

    * all upper-case and no longer than 12 characters;
    * upper-case share above 0.6 and no longer than 18 characters;
    * only ``A-Z``, ``0-9``, ``-``, ``.``, ``/`` (not starting with a symbol)
      and no longer than 20 characters.
    """
    if not isinstance(text, str):
        return False

    stripped = text.strip()
    n_chars = len(stripped)
    if n_chars == 0:
        return False

    if n_chars <= 12 and stripped.isupper():
        return True

    n_upper = len([c for c in stripped if c.isupper()])
    if n_chars <= 18 and n_upper / max(1, n_chars) > 0.6:
        return True

    return n_chars <= 20 and bool(re.match(r"^[A-Z0-9][A-Z0-9\-\./]*$", stripped))

def normalize_franchise_status(v):
    """Normalise a Franchise Status value to ``"yes"`` / ``"no"``.

    Non-string values are returned unchanged. Strings are stripped and
    lower-cased; the usual truthy spellings map to ``"yes"``, the falsy ones to
    ``"no"``, and anything else is returned in its stripped lower-case form.
    """
    if not isinstance(v, str):
        return v

    canonical = {
        "yes": "yes", "y": "yes", "true": "yes", "1": "yes",
        "no": "no", "n": "no", "false": "no", "0": "no",
    }
    key = v.strip().lower()
    # Unknown values fall through as the lower-cased text itself.
    return canonical.get(key, key)

def clean_preserve_abbrev(df: pd.DataFrame) -> pd.DataFrame:
    """Return a cleaned copy of ``df``; the input frame is not modified.

    Relies on the module-level ``PSEUDO_MISSING``, ``GEO_COLUMNS``,
    ``is_abbreviation`` and ``normalize_franchise_status``.

    Steps, applied in order:

    1. For every column that is object-typed on input: cast to pandas'
       ``string`` dtype, strip, collapse whitespace runs to a single space and
       replace pseudo-missing markers (the keys of ``PSEUDO_MISSING``, matched
       case-insensitively) with ``pd.NA``.
    2. For the same columns: convert to numeric when more than 85% of *all*
       non-null values look like plain numbers **and** the conversion would
       not turn any non-null value into a missing one. Columns that fail
       either test stay as strings.
    3. For every remaining object/string column, normalise case:
       geographic columns and "SIC Description" -> Title Case;
       "Franchise Status" -> yes/no; other short categorical columns ->
       lower-case, except values recognised by ``is_abbreviation``.

    Parameters
    ----------
    df : pd.DataFrame
        Raw input frame (anything accepted by the ``pd.DataFrame`` constructor).

    Returns
    -------
    pd.DataFrame
        The cleaned copy.
    """
    df = pd.DataFrame(df).copy()

    # Only columns that are object-typed on input go through steps 1 and 2.
    obj_cols = df.select_dtypes(include="object").columns.tolist()

    # (1) Whitespace normalisation + pseudo-missing markers.
    # Markers are compared on the lower-cased text so that spellings such as
    # "None", "NONE" or "Na" are caught as well as the lower-case keys.
    missing_markers = [str(marker).lower() for marker in PSEUDO_MISSING]
    for col in obj_cols:
        s = df[col].astype("string")
        s = s.str.strip()
        s = s.str.replace(r"\s+", " ", regex=True)
        is_marker = s.str.lower().isin(missing_markers).fillna(False).astype(bool)
        s = s.mask(is_marker, pd.NA)
        df[col] = s

    # (2) Safe numeric coercion: convert only columns that are clearly numeric.
    # The ratio is measured on every non-null value (not on a leading sample,
    # which depends on row order), and the converted column is kept only when
    # no previously non-null value was coerced to missing.
    # NOTE(review): a lossless conversion can still drop formatting such as
    # leading zeros ("01234" -> 1234); confirm this is acceptable for code-like
    # columns (postal codes, registration numbers).
    for col in obj_cols:
        non_null = df[col].dropna().astype(str)
        if non_null.empty:
            continue
        numeric_like_ratio = non_null.str.match(r"^-?\d+(\.\d+)?$").mean()
        if numeric_like_ratio <= 0.85:
            continue
        converted = pd.to_numeric(df[col], errors="coerce")
        if converted.notna().sum() == df[col].notna().sum():
            df[col] = converted

    # (3) Case normalisation.
    # Both "object" and "string" must be selected here, otherwise the columns
    # converted to the string dtype in step 1 (e.g. Country) would be skipped.
    case_cols = df.select_dtypes(include=["object", "string"]).columns.tolist()

    for col in case_cols:
        s = df[col]
        non_null = s.dropna()
        if non_null.empty:
            continue

        # str() guards against non-string column labels.
        col_lc = str(col).lower().strip()

        # 3A / 3C: geographic fields and SIC Description -> forced Title Case.
        if col_lc in GEO_COLUMNS or col_lc == "sic description":
            df[col] = s.apply(lambda v: v.title() if isinstance(v, str) else v)
            continue

        # 3B: Franchise Status -> unified lower-case yes/no.
        if col_lc == "franchise status":
            df[col] = s.apply(normalize_franchise_status)
            continue

        # 3D: other short categorical fields -> lower-case, with abbreviations
        # left untouched.
        str_non_null = non_null[non_null.apply(lambda x: isinstance(x, str))]
        if str_non_null.empty:
            continue

        avg_len = str_non_null.map(len).mean()
        max_words = (str_non_null.str.count(" ").max() or 0) + 1

        # Only short values (mean length <= 30, at most 3 words) are treated
        # as categorical labels; longer free text is left as is.
        if avg_len <= 30 and max_words <= 3:
            new_vals = [
                v.lower() if isinstance(v, str) and not is_abbreviation(v) else v
                for v in s
            ]
            df[col] = pd.Series(new_vals, index=df.index, dtype="string")

    return df

# Always re-run the cleaning from the untouched raw backup (`df_raw_copy`);
# do not apply it on top of a previously produced `df_clean`.
df_clean = clean_preserve_abbrev(df_raw_copy)

In [None]:
# Change audit: for every column that is object-typed in the raw backup, count
# the cells whose string representation differs between raw and cleaned data.
# Missing values on either side are compared through the "<<NA>>" placeholder.
audit_rows = []
n_rows = len(df_raw_copy)

for col in df_raw_copy.columns:
    if df_raw_copy[col].dtype != "object":
        continue
    before = df_raw_copy[col].astype("string").fillna("<<NA>>")
    after = df_clean[col].astype("string").fillna("<<NA>>")
    changed = (before != after).sum()
    audit_rows.append([col, int(changed), float(changed / n_rows)])

audit = (
    pd.DataFrame(
        audit_rows,
        columns=["column", "cells_changed_count", "cells_changed_ratio"],
    )
    .sort_values("cells_changed_ratio", ascending=False)
)

audit.head(15)


Unnamed: 0,column,cells_changed_count,cells_changed_ratio
8,Entity Type,8559,1.0
36,Company Status (Active/Inactive),8559,1.0
10,SIC Description,8556,0.999649
6,Country,8556,0.999649
20,Parent Country/Region,8556,0.999649
27,Global Ultimate Country Name,8556,0.999649
15,Parent Company,8523,0.995794
28,Domestic Ultimate Company,8518,0.99521
21,Global Ultimate Company,8518,0.99521
18,Parent State/Province,8418,0.983526


In [None]:
def information_density(col: pd.Series) -> float:
    """Score a column by normalised entropy weighted by its non-missing share.

    The entropy of the value distribution of the non-null entries is divided by
    the log of the number of distinct values (so it lies in [0, 1]) and then
    multiplied by the fraction of non-missing entries in ``col``.

    Returns 0.0 when the column has no non-null values or at most one distinct
    non-null value.
    """
    observed = col.dropna()
    if observed.empty or observed.nunique() <= 1:
        return 0.0

    # Relative frequency of each distinct value; at least two entries here.
    freqs = observed.value_counts(normalize=True)
    normalised_entropy = entropy(freqs) / np.log(len(freqs))

    coverage = 1 - col.isna().mean()
    return float(coverage * normalised_entropy)

# Per-column profile of the cleaned frame: dtype, share of missing values,
# number of distinct non-null values and the information-density score (IDS).
summary = pd.DataFrame(
    {
        "dtype": df_clean.dtypes.astype(str),
        "missing_ratio": df_clean.isna().mean(),
        "n_unique": df_clean.nunique(dropna=True),
        "IDS": df_clean.apply(information_density),
    }
)

summary.sort_values("IDS", ascending=False).head(10)


Unnamed: 0,dtype,missing_ratio,n_unique,IDS
DUNS Number,int64,0.0,8559,1.0
Company Sites,string,0.0,8557,0.99999
Parent Company,string,0.0,7476,0.984701
Domestic Ultimate Company,string,0.0,7203,0.979771
Global Ultimate Company,string,0.0,7201,0.979698
Parent Street Address,string,0.012735,7339,0.971538
Address Line 1,string,0.040075,8156,0.95954
Entity Type,string,0.0,3,0.952594
Domestic Ultimate Street Address,string,0.03879,6986,0.94338
Global Ultimate Street Address,string,0.039958,6967,0.941937


In [None]:
numeric_cols = df_clean.select_dtypes(include="number").columns.tolist()

# Quintile-bin every usable numeric column once up front. The binning depends
# only on the numeric column, so it is computed here instead of once per
# analysed column. Columns with 50 or fewer non-null values are skipped, and so
# are columns for which the binning fails (best effort).
binned_numeric = {}
for nc in numeric_cols:
    valid = df_clean[nc].notna()
    if valid.sum() <= 50:
        continue
    try:
        binned = pd.qcut(df_clean.loc[valid, nc], q=5, duplicates="drop")
    except Exception:  # not a bare except: Ctrl-C must still interrupt the cell
        continue
    binned_numeric[nc] = (valid, binned)

# For each column with at least 10% missing values, score how strongly its
# missingness pattern is associated with any numeric column: the maximum mutual
# information between the missing-flag and the binned numeric values.
# Columns below the 10% threshold get 0.0.
missing_structure = {}

for col in df_clean.columns:
    mr = df_clean[col].isna().mean()
    if mr < 0.10 or len(numeric_cols) == 0:
        missing_structure[col] = 0.0
        continue

    miss_flag = df_clean[col].isna().astype(int)
    mi_scores = []

    for valid, binned in binned_numeric.values():
        try:
            mi_scores.append(mutual_info_score(miss_flag.loc[valid], binned))
        except Exception:  # best effort: skip pairs that cannot be scored
            continue

    missing_structure[col] = float(max(mi_scores)) if mi_scores else 0.0

summary["missing_structure_MI"] = pd.Series(missing_structure)
summary.sort_values("missing_structure_MI", ascending=False).head(10)


Unnamed: 0,dtype,missing_ratio,n_unique,IDS,missing_structure_MI
No. of Routers,string,0.385442,4,0.105415,0.364904
Phone Number,float64,0.804416,1670,0.195559,0.303823
Ownership Type,string,0.126417,5,0.011801,0.248285
No. of Servers,string,0.391284,3,0.005972,0.221281
No. of Storage Devices,string,0.401098,4,0.04851,0.216307
ANZSIC Code,float64,0.833392,136,0.130571,0.201196
ANZSIC Description,string,0.833158,137,0.130736,0.200499
Is Headquarters,float64,0.828601,2,0.09754,0.18865
NACE Rev 2 Description,string,0.823344,177,0.13834,0.179933
ISIC Rev 4 Description,string,0.82311,147,0.137654,0.179612


In [None]:
# Flag numeric columns that are near-duplicates of an earlier numeric column:
# a column is marked redundant when its absolute correlation with any column
# that precedes it in `numeric_cols` exceeds 0.95.
redundant = set()
if len(numeric_cols) >= 2:
    corr = df_clean[numeric_cols].corr().abs()
    for pos, name in enumerate(corr.columns):
        # Correlations with the columns that come before this one.
        with_earlier = corr.iloc[pos, :pos]
        if (with_earlier > 0.95).any():
            redundant.add(name)

summary["redundant"] = summary.index.isin(redundant)
summary[summary["redundant"]].head(20)


Unnamed: 0,dtype,missing_ratio,n_unique,IDS,missing_structure_MI,redundant
Employees Total,int64,0.0,247,0.553791,0.0,True
8-Digit SIC Code,float64,0.620283,445,0.299423,0.065698,True
Ticker,float64,0.999533,4,0.000467,0.002335,True
Longitude,float64,0.776609,326,0.179177,0.146916,True
Parent Postal Code,float64,0.022783,1583,0.779617,0.0,True
Global Ultimate Postal Code,Int64,0.056665,1563,0.758436,0.0,True
Domestic Ultimate Postal Code,float64,0.046618,1581,0.766902,0.0,True
Registration Number,Float64,0.301087,5976,0.698882,0.135618,True
Is Domestic Ultimate,float64,0.82311,2,0.094748,0.179612,True
IT spend,int64,0.0,1935,0.589199,0.0,True


In [None]:
def decide(row):
    """Map one row of the column summary to a keep/drop decision label.

    ``row`` must support item access for the keys ``n_unique``,
    ``missing_ratio``, ``IDS``, ``missing_structure_MI`` and ``redundant``.
    Rules are evaluated in order and the first match wins; when no rule
    matches, the column is flagged for manual review.
    """
    # (label, predicate) pairs, highest priority first. Predicates are lambdas
    # so that a key is only read when all earlier rules failed to match.
    rules = (
        # At most one distinct value.
        ("DROP: constant", lambda r: r["n_unique"] <= 1),
        # Dropped only when all three hold: extremely high missingness, almost
        # no information, and missingness without structure.
        (
            "DROP: no information",
            lambda r: (r["missing_ratio"] > 0.85)
            and (r["IDS"] < 0.05)
            and (r["missing_structure_MI"] < 0.01),
        ),
        # Redundant column -> drop (or pick one of the group to keep later).
        ("DROP: redundant", lambda r: bool(r["redundant"])),
        # Structured missingness -> keep (the missingness itself is a signal).
        (
            "KEEP: informative missingness",
            lambda r: r["missing_structure_MI"] > 0.02,
        ),
        # High information density -> keep.
        ("KEEP: informative content", lambda r: r["IDS"] > 0.15),
    )

    for label, matches in rules:
        if matches(row):
            return label
    return "CONDITIONAL: review"

# One decision label per summary row, in summary order.
row_decisions = summary.apply(decide, axis=1).values

# Turn the column-name index into a regular "column" field and attach the
# decisions positionally.
decision_table = (
    summary
    .reset_index()
    .rename(columns={"index": "column"})
    .copy()
)
decision_table["decision"] = row_decisions

decision_table["decision"].value_counts()


decision
KEEP: informative content        33
KEEP: informative missingness    18
DROP: redundant                   9
DROP: constant                    6
CONDITIONAL: review               5
DROP: no information              1
Name: count, dtype: int64

In [None]:
# Output artefacts, written relative to the notebook's working directory.
CLEAN_OUT = "cleaned_base.csv"
AUDIT_OUT = "cleaning_audit_changes.xlsx"
DECISION_OUT = "ids_decision_table_no_dictionary.xlsx"

# Cleaned data as CSV; the two report tables as Excel workbooks (no index).
df_clean.to_csv(CLEAN_OUT, index=False)
for report, out_path in ((audit, AUDIT_OUT), (decision_table, DECISION_OUT)):
    report.to_excel(out_path, index=False)