KaggleViznet_95CI_StatAudit_UniverseAndSample.xlsx

In [19]:
import math
import re
from datetime import datetime

import pandas as pd


# ---------------------------------------------------------
# 1. Clean VizNet-style names in column "name"
# ---------------------------------------------------------

def clean_viznet_name(raw_name: object) -> object:
    """
    Normalise VizNet / SATO names such as:
      0_1438...json.gz_481-APC USB cable schematic pi_ZFCQZC7DYMRKS2PV

    Keep only the useful part between '.gz_' and the last '_CODE', and
    also strip trailing '(CODE)' patterns.

    Examples:
      "0_...json.gz_481-APC USB cable schematic pi_ZFCQZC7DYMRKS2PV"
        -> "481-APC USB cable schematic pi"

      "iTunes - Music - Cosmic L (TQWNRUTARQEBIXOH)"
        -> "iTunes - Music - Cosmic L"
    """
    if not isinstance(raw_name, str):
        return raw_name

    name = raw_name.strip()

    # Case 1 - CC-MAIN pattern with ".gz_####-Title_CODE"
    m = re.search(r"\.gz_(\d+)-(.+)", name)
    if m:
        num = m.group(1)
        rest = m.group(2)

        # Remove final underscore code (e.g. _ZFCQZC7DYMRKS2PV)
        rest = re.sub(r"_[A-Z0-9]{10,}$", "", rest).strip()
        cleaned = f"{num}-{rest}"
    else:
        cleaned = name

    # Case 2 - remove trailing "(CODE)" used in some titles
    cleaned = re.sub(r"\s*\([A-Z0-9]{10,}\)\s*$", "", cleaned).strip()

    return cleaned


# ---------------------------------------------------------
# 2. Abbreviation usage
# ---------------------------------------------------------

def compute_abbrev_used(df: pd.DataFrame) -> pd.DataFrame:
    """
    Create boolean column 'abbrev_used' based on any non-null
    abbreviation-log columns present in the DataFrame:
      - abbrev_clean
      - abbrev_desc
      - abbrev_clean_analysis
    """
    abbrev_cols = [
        col for col in ["abbrev_clean", "abbrev_desc", "abbrev_clean_analysis"]
        if col in df.columns
    ]

    if abbrev_cols:
        df["abbrev_used"] = df[abbrev_cols].notna().any(axis=1)
    else:
        df["abbrev_used"] = False

    return df


# ---------------------------------------------------------
# 3. Original token statistics (now with numeric-only flag)
# ---------------------------------------------------------

def add_original_token_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    Analyse the ORIGINAL column header (column 'Original Column') and compute:

      - orig_token_count: number of alphabetic tokens after removing the
        numeric prefix "NN.".
      - orig_single_token: True when there is exactly one alphabetic token.
      - orig_zero_token: True when there are no alphabetic tokens at all
        (years like 2003, 1991/92, symbols like %, #, 1977.0, etc).
      - orig_numeric_only: True when the tail has at least one digit and
        no letters (pure numeric labels such as 2003, 1977.0, 1991/92).

    This separates:
      - well-formed word headers,
      - numeric-only headers, and
      - symbol-only or mixed headers.
    """

    def analyse_original(original: object):
        if not isinstance(original, str):
            return 0, False, False, False

        # Remove leading "NN." or "NN.   "
        m = re.match(r"\s*\d+\.\s*(.*)", original)
        tail = m.group(1) if m else original

        # Alphabetic tokens for orig_token_count
        letter_tokens = re.findall(r"[A-Za-z]+", tail)
        token_count = len(letter_tokens)

        has_alpha = bool(re.search(r"[A-Za-z]", tail))
        has_digit = bool(re.search(r"\d", tail))

        orig_single = token_count == 1
        orig_zero = token_count == 0
        # numeric-only: at least one digit and no letters
        orig_numeric_only = has_digit and (not has_alpha)

        return token_count, orig_single, orig_zero, orig_numeric_only

    analysed = df["Original Column"].apply(analyse_original)

    df["orig_token_count"] = analysed.apply(lambda t: t[0])
    df["orig_single_token"] = analysed.apply(lambda t: t[1])
    df["orig_zero_token"] = analysed.apply(lambda t: t[2])
    df["orig_numeric_only"] = analysed.apply(lambda t: t[3])

    return df


# ---------------------------------------------------------
# 4. Trivial single-token detection on CleanedColumn
# ---------------------------------------------------------

def is_trivial_single_token(row: pd.Series) -> bool:
    """
    Mark row as trivial_single_token when:

      - CleanedColumn has exactly 1 token;
      - no abbreviation was used;
      - CleanedColumn, SourceKeyword and ColumnKeyword are identical
        (case-insensitive);
      - FinalFormat equals ColumnFormat.

    Typical examples:
      3. Name  -> name
      2. Type  -> categorical
      1. ID    -> IDcolumn

    Note: this is based on the cleaned header, not on the original text.
          We separately track whether the original header had a single
          token (orig_single_token) and whether it was numeric-only
          (orig_numeric_only).
    """
    cleaned = str(row.get("CleanedColumn", "")).strip()
    if not cleaned or pd.isna(row.get("FinalFormat")):
        return False

    tokens = cleaned.split()
    if len(tokens) != 1:
        return False

    cleaned_l = cleaned.lower()

    source_kw = (
        str(row.get("SourceKeyword", "")).strip().lower()
        if pd.notna(row.get("SourceKeyword"))
        else ""
    )
    col_kw = (
        str(row.get("ColumnKeyword", "")).strip().lower()
        if pd.notna(row.get("ColumnKeyword"))
        else ""
    )

    ff = row.get("FinalFormat")
    col_fmt = row.get("ColumnFormat")

    abbrev_used = bool(row.get("abbrev_used", False))
    if abbrev_used:
        return False

    cond_keywords = bool(source_kw) and bool(col_kw) and (
        cleaned_l == source_kw == col_kw
    )
    cond_formats = pd.notna(ff) and pd.notna(col_fmt) and (str(ff) == str(col_fmt))

    return cond_keywords and cond_formats


def add_trivial_flag(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add boolean column 'trivial_single_token' to the DataFrame
    based on CleanedColumn and the rules in is_trivial_single_token.
    """
    df["trivial_single_token"] = df.apply(is_trivial_single_token, axis=1)
    return df


# ---------------------------------------------------------
# 5. Sample size for finite population
# ---------------------------------------------------------

def compute_sample_size(
    N: int,
    z: float = 1.96,
    e: float = 0.02,
    p: float = 0.5
) -> int:
    """
    Finite population sample size for a proportion with
    confidence level z, margin of error e, and true proportion p.
    """
    if N <= 0:
        return 0

    n0 = (z ** 2) * p * (1 - p) / (e ** 2)
    n = (N * n0) / (N + n0 - 1)
    return int(round(n))


# ---------------------------------------------------------
# 6. Stratified sampling by area + abbrev_used (non-trivial)
# ---------------------------------------------------------

def build_stratified_sample_by_area_abbrev(
    df_nontrivial: pd.DataFrame,
    random_state: int = 42,
    z: float = 1.96,
    e: float = 0.02,
    p: float = 0.5,
):
    """
    Given the non-trivial universe:

      - Build stratum = "area_with_abbrev" or "area_no_abbrev".
      - Compute total sample size at 95 percent confidence and 2 percent error.
      - Allocate sample size proportionally across strata.
      - Return:
          sample_df   -> concatenated stratified sample
          strata_info -> population and sample counts per stratum
    """
    df = df_nontrivial.copy()

    N = len(df)
    if N == 0:
        raise ValueError("No non-trivial rows available for sampling.")

    n_total = compute_sample_size(N, z=z, e=e, p=p)

    df["stratum"] = df["area"].astype(str) + "_" + df["abbrev_used"].map(
        {True: "with_abbrev", False: "no_abbrev"}
    )

    strata_pop = (
        df.groupby("stratum")
        .size()
        .rename("population_n")
        .reset_index()
        .sort_values("population_n", ascending=False)
    )

    strata_pop["sample_n"] = (
        strata_pop["population_n"] * n_total / N
    ).round().astype(int)

    # ensure at least 1 sample per non-empty stratum
    mask_zero = (strata_pop["population_n"] > 0) & (strata_pop["sample_n"] == 0)
    strata_pop.loc[mask_zero, "sample_n"] = 1

    # adjust rounding to match total n_total
    diff = n_total - strata_pop["sample_n"].sum()
    while diff != 0 and len(strata_pop) > 0:
        if diff > 0:
            idx = strata_pop["population_n"].idxmax()
            strata_pop.at[idx, "sample_n"] += 1
            diff -= 1
        else:
            mask_can_reduce = strata_pop["sample_n"] > 1
            if not mask_can_reduce.any():
                break
            idx = strata_pop.loc[mask_can_reduce, "population_n"].idxmax()
            strata_pop.at[idx, "sample_n"] -= 1
            diff += 1

    samples = []
    for _, row in strata_pop.iterrows():
        stratum = row["stratum"]
        k = int(row["sample_n"])
        sub = df[df["stratum"] == stratum]

        if k >= len(sub):
            samp = sub.copy()
        else:
            samp = sub.sample(n=k, random_state=random_state)

        samples.append(samp)

    sample_df = pd.concat(samples).sort_index()

    return sample_df, strata_pop


# ---------------------------------------------------------
# 7. Area-level summary
# ---------------------------------------------------------

def build_area_summary(
    df_nontrivial: pd.DataFrame,
    sample_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Build an area-level table with:

      - area_population_n
      - area_sample_n
      - pop_abbrev_true / pop_abbrev_false
      - sample_abbrev_true / sample_abbrev_false
    """
    pop_area = (
        df_nontrivial.groupby("area")
        .agg(
            area_population_n=("area", "size"),
            pop_abbrev_true=("abbrev_used", lambda x: int(x.sum())),
            pop_abbrev_false=("abbrev_used", lambda x: int((~x).sum())),
        )
        .reset_index()
    )

    samp_area = (
        sample_df.groupby("area")
        .agg(
            area_sample_n=("area", "size"),
            sample_abbrev_true=("abbrev_used", lambda x: int(x.sum())),
            sample_abbrev_false=("abbrev_used", lambda x: int((~x).sum())),
        )
        .reset_index()
    )

    area_summary = pop_area.merge(samp_area, on="area", how="left").fillna(0)

    int_cols = [
        "area_population_n",
        "pop_abbrev_true",
        "pop_abbrev_false",
        "area_sample_n",
        "sample_abbrev_true",
        "sample_abbrev_false",
    ]
    area_summary[int_cols] = area_summary[int_cols].astype(int)

    return area_summary


# ---------------------------------------------------------
# 8A. Formats summary (population vs sample)
# ---------------------------------------------------------

def build_formats_summary(
    df_nontrivial: pd.DataFrame,
    sample_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Build a FinalFormat-level table with both population and sample distributions.

    Output columns:
      - FinalFormat
      - population_n
      - population_percent
      - sample_n
      - sample_percent
      - percent_diff (sample_percent - population_percent)

    Notes:
      - Percent columns are in percentage points (0-100), not proportions.
      - Missing formats in either side are filled with 0.
    """
    pop_total = int(len(df_nontrivial))
    samp_total = int(len(sample_df))

    pop_counts = (
        df_nontrivial["FinalFormat"]
        .dropna()
        .astype(str)
        .value_counts()
        .rename("population_n")
        .reset_index()
        .rename(columns={"index": "FinalFormat"})
    )

    samp_counts = (
        sample_df["FinalFormat"]
        .dropna()
        .astype(str)
        .value_counts()
        .rename("sample_n")
        .reset_index()
        .rename(columns={"index": "FinalFormat"})
    )

    summary = pop_counts.merge(samp_counts, on="FinalFormat", how="outer").fillna(0)

    summary["population_n"] = summary["population_n"].astype(int)
    summary["sample_n"] = summary["sample_n"].astype(int)

    summary["population_percent"] = (
        summary["population_n"] / pop_total * 100.0
        if pop_total > 0 else 0.0
    )
    summary["sample_percent"] = (
        summary["sample_n"] / samp_total * 100.0
        if samp_total > 0 else 0.0
    )
    summary["percent_diff"] = summary["sample_percent"] - summary["population_percent"]

    # Sort by population size (descending), then by FinalFormat for stability
    summary = summary.sort_values(
        by=["population_n", "FinalFormat"],
        ascending=[False, True]
    ).reset_index(drop=True)

    return summary


# ---------------------------------------------------------
# 8B. Ensure coverage of all FinalFormats in the sample
# ---------------------------------------------------------

def enforce_format_coverage(
    sample_df: pd.DataFrame,
    df_nontrivial: pd.DataFrame,
    random_state: int = 42,
):
    """
    Ensure that every FinalFormat that appears in the non-trivial universe
    also appears at least once in the sample.

    If some formats are missing, top up by adding 1 row for each missing format
    (without replacement from the universe).

    Returns:
      sample_with_formats: final sample after top-up
      added_rows: DataFrame of rows added by the coverage step (can be empty)
      missing_formats: list[str] of formats that were missing before top-up
    """
    sample = sample_df.copy()

    universe_formats = (
        df_nontrivial["FinalFormat"]
        .dropna()
        .astype(str)
        .unique()
        .tolist()
    )

    present_counts = (
        sample["FinalFormat"]
        .dropna()
        .astype(str)
        .value_counts()
    )

    missing_formats = [fmt for fmt in universe_formats if present_counts.get(fmt, 0) == 0]

    added_rows_list = []
    for fmt in missing_formats:
        candidates = df_nontrivial[df_nontrivial["FinalFormat"].astype(str) == fmt]
        candidates = candidates.loc[~candidates.index.isin(sample.index)]  # avoid duplicates
        if len(candidates) > 0:
            extra = candidates.sample(n=1, random_state=random_state)
            added_rows_list.append(extra)

    if added_rows_list:
        added_rows = pd.concat(added_rows_list)
        sample = pd.concat([sample, added_rows])
    else:
        added_rows = pd.DataFrame(columns=sample.columns)

    return sample, added_rows, missing_formats


# ---------------------------------------------------------
# 9. Main pipeline
# ---------------------------------------------------------

def main():
    input_file = "AnalysedColumnsKaggleViznetFiltered.xlsx"
    output_file = "KaggleViznet_95CI_StatAudit_UniverseAndSample.xlsx"

    # 9.1 load data
    df = pd.read_excel(input_file)
    print(f"Total rows in file: {len(df)}")

    # 9.2 clean VizNet-style names in 'name'
    if "name" in df.columns:
        df["name"] = df["name"].apply(clean_viznet_name)

    # 9.3 restrict to rows with non-null FinalFormat
    df_universe = df[df["FinalFormat"].notna()].copy()
    print(f"Universe (FinalFormat not NaN): {len(df_universe)}")

    # 9.4 abbreviation flag
    df_universe = compute_abbrev_used(df_universe)

    # 9.5 original token stats (includes orig_numeric_only)
    df_universe = add_original_token_stats(df_universe)

    # 9.6 trivial flag based on CleanedColumn
    df_universe = add_trivial_flag(df_universe)

    # 9.7 define "trivial_strict":
    #     - trivial_single_token (CleanedColumn)
    #     - AND (orig_single_token OR orig_numeric_only)
    #       -> includes pure numeric headers (years, 1991/92, etc)
    #          but keeps symbol-only headers (#, %, %+/-) as non-trivial.
    df_universe["trivial_strict"] = (
        df_universe["trivial_single_token"]
        & (df_universe["orig_single_token"] | df_universe["orig_numeric_only"])
    )

    # 9.8 split strict trivial and non-trivial
    df_trivial_strict = df_universe[df_universe["trivial_strict"]].copy()
    df_nontrivial = df_universe[~df_universe["trivial_strict"]].copy()

    # For information: trivial_single_token but original multi-token
    mask_triv_cleaned_orig_multi = (
        df_universe["trivial_single_token"]
        & (~df_universe["orig_single_token"])
        & (~df_universe["orig_numeric_only"])
    )
    df_trivial_cleaned_orig_multi = df_universe[mask_triv_cleaned_orig_multi].copy()

    print(f"Strict trivial (Cleaned single token + orig word or numeric-only): {len(df_trivial_strict)}")
    print(f"trivial_single_token but original multi-token (non-trivial): {len(df_trivial_cleaned_orig_multi)}")
    print(f"Non-trivial rows for sampling: {len(df_nontrivial)}")

    # 9.9 stratified sample on non-trivial rows
    sample_df_raw, strata_info = build_stratified_sample_by_area_abbrev(df_nontrivial)
    print(f"Raw stratified sample size (before coverage correction): {len(sample_df_raw)}")

    # 9.10 formats summary before enforcing coverage
    formats_summary_before = build_formats_summary(df_nontrivial, sample_df_raw)

    # 9.11 enforce FinalFormat coverage
    sample_df, added_rows, missing_formats = enforce_format_coverage(sample_df_raw, df_nontrivial)
    print(f"Final sample size (after coverage correction): {len(sample_df)}")
    print(f"Rows added by coverage correction: {len(added_rows)}")
    if missing_formats:
        print("Formats added by coverage correction:")
        print(", ".join(missing_formats))

    # 9.12 formats summary after enforcing coverage
    formats_summary_after = build_formats_summary(df_nontrivial, sample_df)

    # 9.13 area summary
    area_summary = build_area_summary(df_nontrivial, sample_df)

    # 9.14 write outputs to Excel
    with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
        sample_df.to_excel(writer, sheet_name="Nontrivial_Sample", index=False)
        df_nontrivial.to_excel(writer, sheet_name="Nontrivial_Universe", index=False)
        df_trivial_strict.to_excel(writer, sheet_name="Trivial_Universe", index=False)
        df_trivial_cleaned_orig_multi.to_excel(
            writer, sheet_name="Trivial_cleaned_orig_multi", index=False
        )
        area_summary.to_excel(writer, sheet_name="Area_Summary", index=False)
        strata_info.to_excel(writer, sheet_name="Strata_Summary", index=False)

        # Updated formats outputs
        formats_summary_before.to_excel(writer, sheet_name="Formats_Summary_BeforeCoverage", index=False)
        formats_summary_after.to_excel(writer, sheet_name="Formats_Summary_AfterCoverage", index=False)
        added_rows.to_excel(writer, sheet_name="Formats_Added_ByCoverage", index=False)

    print(f"\nResults written to: {output_file}")

    from datetime import datetime
    print(f"Last run on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


if __name__ == "__main__":
    main()


Total rows in file: 118639
Universe (FinalFormat not NaN): 102205
Strict trivial (Cleaned single token + orig word or numeric-only): 53763
trivial_single_token but original multi-token (non-trivial): 1492
Non-trivial rows for sampling: 48442
Raw stratified sample size (before coverage correction): 2288
Final sample size (after coverage correction): 2295
Rows added by coverage correction: 7
Formats added by coverage correction:
postalcode, normalized, acidity, modelname, ph, alkalinity, saltness

Results written to: KaggleViznet_95CI_StatAudit_UniverseAndSample.xlsx
Last run on: 2025-12-15 13:51:59


Creates GPTAnalysisOn2295.xlsx and GPTOutputsOn2295.xlsx is just a manual copy of the first sheet of KaggleViznet_95CI_StatAudit_UniverseAndSample.xlsx.


In [13]:
import pandas as pd


def _to_bool(value):
    """
    Robust conversion to boolean for mixed columns like
    TRUE/FALSE, 1/0, yes/no, etc.
    """
    if pd.isna(value):
        return False
    if isinstance(value, (int, float)):
        return bool(value)
    if isinstance(value, str):
        v = value.strip().lower()
        return v in {"true", "t", "1", "yes", "y"}
    return bool(value)


def analyze_gpt_outputs_to_excel(input_path: str, output_path: str) -> None:
    """
    Load GPTOutputsOn2295.xlsx and write all analyses into a new
    Excel workbook with multiple sheets instead of printing.
    """

    # -------------------
    # 1. Load data
    # -------------------
    df = pd.read_excel(input_path)

    n_rows = len(df)
    n_cols = len(df.columns)

    # Convenience flags
    df["problem"] = df["GPT_approves_format"] != 1
    df["wrong_abbrev"] = df["GPT_found_wrong_abbrev_expansions"].apply(_to_bool)
    df["notes_nonnull"] = df["GPT_notes"].notna()

    # -------------------
    # 2. Global summaries
    # -------------------
    approves_true = int((df["GPT_approves_format"] == 1).sum())
    approves_false = int((df["GPT_approves_format"] == 0).sum())

    global_summary = pd.DataFrame(
        [
            ["rows_total", n_rows],
            ["columns_total", n_cols],
            ["approves_true", approves_true],
            ["approves_true_rate", approves_true / n_rows if n_rows else 0],
            ["approves_false", approves_false],
            ["approves_false_rate", approves_false / n_rows if n_rows else 0],
        ],
        columns=["metric", "value"],
    )

    no_finalformat_counts = (
        df["no_FinalFormat_found"].value_counts(dropna=False)
        .rename_axis("no_FinalFormat_found")
        .reset_index(name="count")
    )
    no_finalformat_counts["proportion"] = (
        no_finalformat_counts["count"] / n_rows if n_rows else 0
    )

    wrong_abbrev_total = int(df["wrong_abbrev"].sum())
    wrong_abbrev_summary = pd.DataFrame(
        [
            ["wrong_abbrev_true", wrong_abbrev_total],
            [
                "wrong_abbrev_rate",
                wrong_abbrev_total / n_rows if n_rows else 0,
            ],
        ],
        columns=["metric", "value"],
    )

    # -------------------
    # 3. FinalFormat distribution and problems
    # -------------------
    ff_counts = df["FinalFormat"].value_counts().sort_values(ascending=False)
    ff_table = ff_counts.to_frame("count")
    ff_table["proportion"] = ff_table["count"] / n_rows if n_rows else 0
    ff_table = ff_table.reset_index().rename(columns={"index": "FinalFormat"})

    problem_df = df[df["problem"]]
    total_per_ff = df.groupby("FinalFormat").size()
    problems_per_ff = problem_df.groupby("FinalFormat").size()

    problem_stats = (
        pd.DataFrame({"total": total_per_ff, "problems": problems_per_ff})
        .fillna(0)
        .astype({"problems": "int"})
    )
    problem_stats["problem_rate"] = (
        problem_stats["problems"] / problem_stats["total"]
    )
    problem_stats = problem_stats.reset_index()

    # -------------------
    # 4. Format transitions (for problem cases only)
    # -------------------
    if not problem_df.empty:
        fmt_sugg = (
            problem_df.groupby(
                ["FinalFormat", "GPT_suggested_correct_format"]
            )
            .size()
            .reset_index(name="count")
            .sort_values("count", ascending=False)
        )
    else:
        fmt_sugg = pd.DataFrame(
            columns=["FinalFormat", "GPT_suggested_correct_format", "count"]
        )

    # -------------------
    # 5. Keyword suggestions
    # -------------------
    kw_df = df[df["GPT_suggested_correct_keyword"].notna()]
    if not kw_df.empty:
        kw_pairs = (
            kw_df.groupby(["SourceKeyword", "GPT_suggested_correct_keyword"])
            .size()
            .reset_index(name="count")
            .sort_values("count", ascending=False)
        )
    else:
        kw_pairs = pd.DataFrame(
            columns=["SourceKeyword", "GPT_suggested_correct_keyword", "count"]
        )

    # -------------------
    # 6. Abbreviation impact
    # -------------------
    abbrev_group = df.groupby("abbrev_used")["problem"].agg(
        total="size", problems="sum"
    )
    abbrev_group["problem_rate"] = (
        abbrev_group["problems"] / abbrev_group["total"]
    )
    abbrev_group = abbrev_group.reset_index()

    wrong_abbrev_by_area = df.groupby("area")["wrong_abbrev"].sum()
    total_by_area = df.groupby("area").size()
    abbrev_area = (
        pd.DataFrame(
            {"total": total_by_area, "wrong_abbrev": wrong_abbrev_by_area}
        )
        .fillna(0)
        .astype({"wrong_abbrev": "int"})
    )
    abbrev_area["wrong_abbrev_rate"] = (
        abbrev_area["wrong_abbrev"] / abbrev_area["total"]
    )
    abbrev_area = abbrev_area.reset_index()

    # -------------------
    # 7. Problems by area
    # -------------------
    problems_per_area = problem_df.groupby("area").size()
    area_stats = (
        pd.DataFrame(
            {"total": total_by_area, "problems": problems_per_area}
        )
        .fillna(0)
        .astype({"problems": "int"})
    )
    area_stats["problem_rate"] = (
        area_stats["problems"] / area_stats["total"]
    )
    area_stats = area_stats.reset_index()

    # -------------------
    # 8. GPT_notes coverage
    # -------------------
    notes_nonnull = int(df["notes_nonnull"].sum())
    notes_summary = pd.DataFrame(
        [
            ["rows_with_notes", notes_nonnull],
            [
                "notes_coverage_rate",
                notes_nonnull / n_rows if n_rows else 0,
            ],
        ],
        columns=["metric", "value"],
    )

    notes_ff = (
        df.groupby("FinalFormat")["notes_nonnull"]
        .mean()
        .rename("notes_coverage_rate")
        .reset_index()
        .sort_values("notes_coverage_rate", ascending=False)
    )

    notes_area = (
        df.groupby("area")["notes_nonnull"]
        .mean()
        .rename("notes_coverage_rate")
        .reset_index()
        .sort_values("notes_coverage_rate", ascending=False)
    )

    # -------------------
    # 9. Write everything to Excel
    # -------------------
    # Let pandas choose the engine (usually openpyxl), so we do not need xlsxwriter.
    with pd.ExcelWriter(output_path) as writer:
        # Global sheet
        global_summary.to_excel(
            writer, sheet_name="GlobalSummary", index=False
        )
        no_finalformat_counts.to_excel(
            writer, sheet_name="GlobalSummary", index=False, startrow=8
        )
        wrong_abbrev_summary.to_excel(
            writer,
            sheet_name="GlobalSummary",
            index=False,
            startrow=8 + len(no_finalformat_counts) + 3,
        )

        # FinalFormat-related
        ff_table.to_excel(
            writer, sheet_name="FinalFormatDistribution", index=False
        )
        problem_stats.to_excel(
            writer, sheet_name="ProblemsByFinalFormat", index=False
        )
        fmt_sugg.to_excel(
            writer, sheet_name="FormatTransitions", index=False
        )
        kw_pairs.to_excel(
            writer, sheet_name="KeywordSuggestions", index=False
        )

        # Abbreviation impact
        abbrev_group.to_excel(
            writer, sheet_name="AbbrevGlobal", index=False
        )
        abbrev_area.to_excel(
            writer, sheet_name="AbbrevByArea", index=False
        )

        # Area stats
        area_stats.to_excel(
            writer, sheet_name="ProblemsByArea", index=False
        )

        # Notes coverage
        notes_summary.to_excel(
            writer, sheet_name="NotesSummary", index=False
        )
        notes_ff.to_excel(
            writer, sheet_name="NotesByFinalFormat", index=False
        )
        notes_area.to_excel(
            writer, sheet_name="NotesByArea", index=False
        )


if __name__ == "__main__":
    input_path = "GPTOutputsOn2295.xlsx"
    output_path = "GPTAnalysisOn2295.xlsx"
    analyze_gpt_outputs_to_excel(input_path, output_path)

    from datetime import datetime
    print(f"Last run on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


Last run on: 2025-12-13 15:32:03


full 795 items with the 11 problematic expansions

In [14]:
import pandas as pd
from datetime import datetime

def extract_suspicious_abbrev_rows(
    superset_path: str,
    output_path: str = "K_V_superset_suspicious_abbrev.xlsx"
) -> pd.DataFrame:
    """
    Extract all rows from the Kaggle+VizNet superset that contain any of a given
    set of suspicious abbreviation expansions in one of the abbreviation columns.

    It searches the following columns:
      - abbrev_clean
      - abbrev_desc
      - abbrev_clean_analysis

    And filters rows where at least one of these columns contains one of the
    suspicious expansions.

    Parameters
    ----------
    superset_path : str
        Path to the full K+V superset file (118k headers) with abbreviation columns.
    output_path : str, optional
        Path of the Excel file that will contain the filtered rows.

    Returns
    -------
    pd.DataFrame
        DataFrame with all matching rows, including a helper column
        'suspicious_abbrev_hits' listing which abbrev(s) were found.
    """
    # 1. Suspicious expansions to trace
    suspicious_expansions = [
        "e->errors",
        "hr->heart rate",
        "ip->internet protocol",
        "co->carbon oxide",
        "int->interception",
        "sl->soil",
        "acc->accuracy",
        "cmp->completions",
        "direct->direction",
        "uk->unique key",
        "reg->region",
    ]

    # 2. Columns that may contain abbreviation expansions
    abbrev_cols = ["abbrev_clean", "abbrev_desc", "abbrev_clean_analysis"]

    print(f"Loading superset from: {superset_path}")
    df = pd.read_excel(superset_path)

    # 3. Sanity check: all required columns must exist
    missing = [c for c in abbrev_cols if c not in df.columns]
    if missing:
        raise ValueError(
            f"The following required columns are missing in the superset file: {missing}"
        )

    # 4. Build mask of rows that contain at least one suspicious expansion
    mask_any = None
    for col in abbrev_cols:
        col_mask = df[col].isin(suspicious_expansions)
        mask_any = col_mask if mask_any is None else (mask_any | col_mask)

    # If no rows match, bail out gracefully
    if mask_any is None:
        print("No abbreviation columns processed. Check the column list.")
        return pd.DataFrame()

    matched = df[mask_any].copy()

    # 5. Build a helper column listing which expansions were hit and where
    def collect_hits(row) -> str:
        hits = []
        for col in abbrev_cols:
            val = row[col]
            if isinstance(val, str) and val in suspicious_expansions:
                hits.append(f"{col}:{val}")
        return "; ".join(hits)

    matched["suspicious_abbrev_hits"] = matched.apply(collect_hits, axis=1)

    # 6. Report basic stats
    print(f"Total rows in superset: {len(df)}")
    print(f"Rows with at least one suspicious abbreviation: {len(matched)}")

    # 7. Write to Excel (single sheet)
    matched.to_excel(output_path, index=False)
    print(f"Filtered rows written to: {output_path}")

    return matched


if __name__ == "__main__":
    # TODO: update this path to your real 118k+V superset file
    superset_file = r"AnalysedColumnsKaggleViznetFiltered.xlsx"

    output_file = "K_V_superset_suspicious_abbrev.xlsx"

    result_df = extract_suspicious_abbrev_rows(
        superset_path=superset_file,
        output_path=output_file,
    )

    print(f"Last run on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


Loading superset from: AnalysedColumnsKaggleViznetFiltered.xlsx
Total rows in superset: 118639
Rows with at least one suspicious abbreviation: 795
Filtered rows written to: K_V_superset_suspicious_abbrev.xlsx
Last run on: 2025-12-14 14:24:13


Analysing 795 for correctness of abbreviations

In [None]:
import pandas as pd
from datetime import datetime


def load_abbrev_review_excel(
    file_path: str,
    sheet_name: str | int | None = 0
) -> pd.DataFrame:
    """
    Load the suspicious abbreviation review Excel file into a pandas DataFrame.

    Parameters
    ----------
    file_path : str
        Path to the Excel file, for example:
        'K_V_superset_suspicious_abbrev.xlsx'.
    sheet_name : str or int or None, optional
        Sheet name or index to read. Default is 0 (first sheet).

    Returns
    -------
    pd.DataFrame
        DataFrame with all columns from the Excel sheet.
    """
    df = pd.read_excel(file_path, sheet_name=sheet_name)
    return df


def normalise_abbrev_values(series: pd.Series) -> pd.Series:
    """
    Take a Series with values like 'abbrev_clean:ip->internet protocol'
    or 'abbrev_desc:e->errors' and return only the part after the colon,
    for example 'ip->internet protocol' or 'e->errors'.

    If there is no colon, returns the stripped original value.
    """
    s = series.fillna("").astype(str)
    # Split only once, keep the last part so:
    # 'abbrev_clean:ip->internet protocol' -> ['abbrev_clean', 'ip->internet protocol'] -> take index -1
    s = s.str.split(":", n=1).str[-1].str.strip()
    return s


def summarise_decisions_per_abbrev(
    df: pd.DataFrame,
    abbrev_col: str = "suspicious_abbrev_hits",
    decision_col: str = "GPT_abbrev_decision"
) -> pd.DataFrame:
    """
    For each suspicious abbreviation, count how many times it received
    each GPT_abbrev_decision value (OK, WRONG_ABBREV, UNCERTAIN).

    The abbreviation values are normalised to keep only what is after ':'.
    For example:
        'abbrev_clean:ip->internet protocol'
    becomes:
        'ip->internet protocol'

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing at least abbrev_col and decision_col.
    abbrev_col : str
        Column name with the raw abbreviation pattern.
    decision_col : str
        Column name with the GPT decision.

    Returns
    -------
    pd.DataFrame
        A pivot table with one row per normalised abbreviation and one column
        per decision.
        Example columns: ['abbrev', 'OK', 'WRONG_ABBREV', 'UNCERTAIN', 'total'].
    """
    temp_df = df.copy()
    # Create a cleaned abbreviation column that only keeps what is after ':'
    temp_df["abbrev"] = normalise_abbrev_values(temp_df[abbrev_col])

    pivot = (
        temp_df
        .pivot_table(
            index="abbrev",
            columns=decision_col,
            aggfunc="size",
            fill_value=0
        )
        .reset_index()
    )

    for col in ["OK", "WRONG_ABBREV", "UNCERTAIN"]:
        if col not in pivot.columns:
            pivot[col] = 0

    decision_cols = [c for c in pivot.columns if c in ["OK", "WRONG_ABBREV", "UNCERTAIN"]]
    pivot["total"] = pivot[decision_cols].sum(axis=1)

    pivot = pivot.sort_values(by="abbrev").reset_index(drop=True)
    return pivot


def summarise_suggested_expansions(
    df: pd.DataFrame,
    abbrev_col: str = "suspicious_abbrev_hits",
    suggested_col: str = "GPT_suggested_abbrev"
) -> pd.DataFrame:
    """
    For each suspicious abbreviation, count how many times each
    GPT_suggested_abbrev string appears.

    The abbreviation values are normalised to keep only what is after ':'.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing at least abbrev_col and suggested_col.
    abbrev_col : str
        Column name with the raw abbreviation pattern.
    suggested_col : str
        Column name with GPT suggested expansion.

    Returns
    -------
    pd.DataFrame
        DataFrame with columns:
        ['abbrev', suggested_col, 'count'], sorted for readability.
    """
    temp_df = df.copy()
    temp_df["abbrev"] = normalise_abbrev_values(temp_df[abbrev_col])
    temp_df[suggested_col] = temp_df[suggested_col].fillna("").astype(str).str.strip()

    grouped = (
        temp_df
        .groupby(["abbrev", suggested_col], dropna=False)
        .size()
        .reset_index(name="count")
    )

    grouped = grouped.sort_values(
        by=["abbrev", "count", suggested_col],
        ascending=[True, False, True]
    ).reset_index(drop=True)

    return grouped


def main():
    """
    Example usage:
    - Load your Excel file.
    - Compute decision counts per normalised abbreviation.
    - Compute suggested expansion counts per normalised abbreviation.
    - Print and optionally write to Excel.
    """
    input_file = "K_V_superset_suspicious_abbrev.xlsx"

    abbrev_col_name = "suspicious_abbrev_hits"
    decision_col_name = "GPT_abbrev_decision"
    suggested_col_name = "GPT_suggested_abbrev"

    df = load_abbrev_review_excel(input_file)

    decision_summary = summarise_decisions_per_abbrev(
        df,
        abbrev_col=abbrev_col_name,
        decision_col=decision_col_name
    )

    suggested_summary = summarise_suggested_expansions(
        df,
        abbrev_col=abbrev_col_name,
        suggested_col=suggested_col_name
    )

    print("Decision counts per abbreviation (only part after ':'):")
    print(decision_summary)

    print("\nSuggested expansions per abbreviation (only part after ':'):")
    print(suggested_summary)

    output_file = "abbrev_review_summary.xlsx"

    try:
        with pd.ExcelWriter(output_file) as writer:
            decision_summary.to_excel(writer, sheet_name="decision_counts", index=False)
            suggested_summary.to_excel(writer, sheet_name="suggested_counts", index=False)
        print(f"\nSummaries written to: {output_file}")
    except ModuleNotFoundError as e:
        print("\nCould not write Excel file because an engine is missing.")
        print(f"Details: {e}")
        print("The printed tables above are still correct. "
              "To generate Excel output, install an engine such as 'openpyxl' or 'xlsxwriter'.")


if __name__ == "__main__":
    main()

print(f"Last run on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


Decision counts per abbreviation (only part after ':'):
GPT_abbrev_decision                 abbrev  OK  UNCERTAIN  WRONG_ABBREV  total
0                            acc->accuracy  20          4            15     39
1                         cmp->completions  14          0            16     30
2                         co->carbon oxide  20          2            51     73
3                        direct->direction   0          0            27     27
4                                e->errors  35          8           128    171
5                           hr->heart rate   2          1           159    162
6                        int->interception  48          0            19     67
7                    ip->internet protocol  49          4            81    134
8                              reg->region   0          5            11     16
9                                 sl->soil   0          0            56     56
10                          uk->unique key   0          0            20    