In [None]:
# ==========================================
# SETUP BLOCK
# ==========================================

import os
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
from scipy.stats import chi2_contingency, fisher_exact, spearmanr

sys.path.append(os.path.abspath(".."))

from Helper_functions import (
    clean_up_subjects,
    calculate_true_false_score,
    calculate_internet_terms_understanding_score,
    group_internet_understanding,
    explode_column,
    prepare_pair,
    order_crosstab,
)

from lists import (
    multiple_choice_questions,
    LIKERT_VALUE_MAPS,
    comparison_pairs_by_demo,
    cross_tab_titles_and_colors,
    nominal_posthoc_pairs_demo,
    ordinal_posthoc_pairs_demo,
)

from answer_categories import question_orders, COLUMN_ALIASES

# Plot defaults shared by every figure produced in this notebook.
plt.rcParams["figure.figsize"] = (10, 5)
sns.set(style="whitegrid")

# Read the survey workbook, trim whitespace around the header names and
# apply the project's column aliases.
DATA_FILE = os.path.join("..", "Data", "Fertige Tabelle.xlsx")
df = pd.read_excel(DATA_FILE)
df.columns = df.columns.astype(str).str.strip()
df = df.rename(columns=COLUMN_ALIASES)

# Run the subject clean-up helper on each subject column that is present.
# Presence is re-checked on every pass because `df` is rebound by the helper.
for col in ("Most used subjects", "Preferred Subjects", "Least preferred Subjects"):
    if col not in df.columns:
        continue
    df = clean_up_subjects(df, col)

# The True/False score is only computed when all six item columns exist.
true_false_cols = ["True/False_" + str(i) for i in range(1, 7)]
if not any(c not in df.columns for c in true_false_cols):
    df = calculate_true_false_score(df)

# Internet-terms score and grouping run as soon as at least one
# "Internet terms_*" column is available.
for c in df.columns:
    if c.startswith("Internet terms_"):
        df = calculate_internet_terms_understanding_score(df)
        df = group_internet_understanding(df)
        break

print(
    "Setup complete – DataFrame loaded and preprocessed",
    f"Rows: {len(df)}, Columns: {len(df.columns)}",
    sep="\n",
)


In [None]:
# --------- Shows distribution in table and graph form -----------

# Demographic columns that are handled as multi-select when a pair is prepared.
multiple_choice_demographics = {
    "Preferred Subjects",
    "Least preferred Subjects",
    "Most used subjects",
}

# Walk every (demographic, question) combination configured in
# `comparison_pairs_by_demo`, print the count / row-percentage tables and
# draw one stacked bar chart per combination.
for demo, question_list in comparison_pairs_by_demo.items():
    for question in question_list:

        q_is_multi = question in multiple_choice_questions
        d_is_multi = demo in multiple_choice_demographics

        data = prepare_pair(df, question, demo, q_is_multi, d_is_multi)
        if data.empty:
            print(f"No overlapping data for '{question}' x '{demo}'. Skipping.")
            continue

        # Rows are answers to the question, columns are demographic groups.
        ct = order_crosstab(pd.crosstab(data[question], data[demo]), question, demo)
        if ct.empty:
            print(f"Empty crosstab for '{question}' x '{demo}'. Skipping.")
            continue

        # Share of each demographic group within every answer row.
        ct_percent = ct.div(ct.sum(axis=1), axis="index").mul(100)

        print(f"\n{question} – stacked by {demo}")
        print("Counts:")
        print(ct)
        print("\nRow %:")
        print(ct_percent.round(1))

        # Configured entry is [title, colour, colour, ...]; without one, fall
        # back to a generated title and the "Set3" colormap.
        title_and_colors = cross_tab_titles_and_colors.get(
            (demo, question),
            [f"{question} – distribution of {demo} within each answer"]
        )
        plot_title = title_and_colors[0]
        palette = title_and_colors[1:]
        color_kwargs = {"color": palette} if len(palette) > 0 else {"colormap": "Set3"}

        ax = ct.plot(
            kind="bar",
            stacked=True,
            figsize=(10, 6),
            width=0.9,
            **color_kwargs,
        )

        ax.set_title(plot_title)
        ax.set_ylabel("Count")
        ax.set_xlabel(question)
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
        ax.legend(title=demo, bbox_to_anchor=(1.01, 1), loc="upper left")
        ax.figure.tight_layout()
        plt.show()


In [None]:
# ==========================================
# CHI-SQUARE BLOCK 
# ==========================================


def cramers_v(chi2, ct):
    """Return the bias-corrected Cramér's V for a contingency table.

    Parameters
    ----------
    chi2 : float
        Chi-square statistic computed for ``ct``.
    ct : pandas.DataFrame
        Contingency table of counts; only ``ct.values.sum()`` and
        ``ct.shape`` are read.

    Returns
    -------
    float
        The corrected effect size, or ``np.nan`` when the table holds no
        observations or the corrected smaller dimension is not positive.
    """
    total = ct.values.sum()
    if total == 0:
        return np.nan

    n_rows, n_cols = ct.shape
    if total > 1:
        shrink = total - 1
        # Subtract the expected bias from phi² and clamp at zero.
        phi2_adj = max(0, chi2 / total - ((n_cols - 1) * (n_rows - 1)) / shrink)
        rows_adj = n_rows - ((n_rows - 1) ** 2) / shrink
        cols_adj = n_cols - ((n_cols - 1) ** 2) / shrink
    else:
        # A single observation leaves the correction undefined.
        phi2_adj, rows_adj, cols_adj = np.nan, n_rows, n_cols

    smaller_dim = min(rows_adj - 1, cols_adj - 1)
    if smaller_dim > 0:
        return np.sqrt(phi2_adj / smaller_dim)
    return np.nan


def chi_square_with_checks(ct):
    """Run an uncorrected chi-square test of independence on a crosstab.

    Rows and columns whose total is zero are removed first. If fewer than
    two rows or two columns remain, the test is not run.

    Parameters
    ----------
    ct : pandas.DataFrame
        Contingency table of counts.

    Returns
    -------
    dict or None
        ``None`` when the trimmed table is smaller than 2×2; otherwise a
        dict with the keys ``chi2``, ``p``, ``dof``, ``cramers_v``,
        ``min_expected`` (smallest expected count) and
        ``prop_expected_lt5`` (share of cells with expected count below 5).
    """
    keep_rows = ct.sum(axis=1) > 0
    keep_cols = ct.sum(axis=0) > 0
    trimmed = ct.loc[keep_rows, keep_cols]

    n_rows, n_cols = trimmed.shape
    if trimmed.empty or n_rows < 2 or n_cols < 2:
        return None

    stat, p_value, dof, expected = chi2_contingency(trimmed.values, correction=False)

    return dict(
        chi2=stat,
        p=p_value,
        dof=dof,
        cramers_v=cramers_v(stat, trimmed),
        min_expected=expected.min(),
        prop_expected_lt5=(expected < 5).mean(),
    )


# Run the chi-square test for every configured (demographic, question) pair
# and collect one result record per pair that yields a testable table.
all_tests = []

for demo, question_list in comparison_pairs_by_demo.items():
    for question in question_list:

        q_is_multi = question in multiple_choice_questions
        d_is_multi = demo in multiple_choice_demographics

        data = prepare_pair(df, question, demo, q_is_multi, d_is_multi)
        if data.empty:
            continue

        ct = pd.crosstab(data[question], data[demo])
        ct = order_crosstab(ct, question, demo)
        if ct.empty:
            continue

        # None means the table is smaller than 2×2 after dropping empty
        # rows/columns, so there is nothing to test.
        test = chi_square_with_checks(ct)
        if test is None:
            continue

        all_tests.append({
            "demo": demo,
            "question": question,
            "n": int(ct.values.sum()),
            "rows": ct.shape[0],
            "cols": ct.shape[1],
            **test,
        })

chi2_results = pd.DataFrame(all_tests)
print(f"Chi-square tests completed: {len(chi2_results)}")

EXPORT = True
EXPORT_PATH = os.path.join(
    "..",
    "Data",
    "test_results",
    "ai_crosstabs_and_chi_square.xlsx"
)

if EXPORT:
    # The writer does not create missing folders; make sure the target
    # directory exists so a fresh checkout can run this cell.
    os.makedirs(os.path.dirname(EXPORT_PATH), exist_ok=True)
    with pd.ExcelWriter(EXPORT_PATH, engine="xlsxwriter") as writer:
        chi2_results.to_excel(
            writer,
            index=False,
            sheet_name="chi_square_results"
        )
    print(f"Exported to: {EXPORT_PATH}")


In [None]:
# ==========================================
# POST-HOC ANALYSES (2×2 & Spearman)
# ==========================================

# Demographic columns treated as multi-select: membership in this set is
# used by posthoc_nominal to set the "is multi" flag passed to prepare_pair.
# NOTE(review): this repeats the identical set defined in the distribution
# cell earlier in the notebook — consider defining it once in the setup cell.
multiple_choice_demographics = {
    "Preferred Subjects",
    "Least preferred Subjects",
    "Most used subjects",
}

def test_2x2(a, b, c, d, alternative="two-sided", prefer="auto", yates=False):
    table = np.array([[int(a), int(b)], [int(c), int(d)]], dtype=int)

    row_sums = table.sum(axis=1)
    col_sums = table.sum(axis=0)
    if (row_sums[0] == 0) or (row_sums[1] == 0) or (col_sums[0] == 0) or (col_sums[1] == 0):
        return {
            "method": "skip",
            "reason": "degenerate margins",
            "p": np.nan,
            "chi2": np.nan,
            "dof": 1,
            "odds_ratio": np.nan,
            "min_expected": 0.0,
            "table": table,
        }

    N = table.sum()
    expected = np.outer(row_sums, col_sums) / N
    min_exp = float(expected.min())

    method = "chi2"
    if prefer == "fisher" or (prefer == "auto" and (min_exp < 5 or (table == 0).any())):
        method = "fisher"

    if method == "fisher":
        orat, p = fisher_exact(table, alternative=alternative)
        odds_ratio = float(orat) if np.isfinite(orat) else np.nan
        return {
            "method": "fisher",
            "reason": "",
            "p": float(p),
            "chi2": np.nan,
            "dof": 1,
            "odds_ratio": odds_ratio,
            "min_expected": min_exp,
            "table": table,
        }

    chi2_val, p, dof, _ = chi2_contingency(table, correction=yates)

    a_, b_, c_, d_ = table.astype(float).ravel()
    if 0 in (a_, b_, c_, d_):
        a_, b_, c_, d_ = a_ + 0.5, b_ + 0.5, c_ + 0.5, d_ + 0.5
    odds_ratio = float((a_ * d_) / (b_ * c_))

    return {
        "method": "chi2",
        "reason": "",
        "p": float(p),
        "chi2": float(chi2_val),
        "dof": int(dof),
        "odds_ratio": odds_ratio,
        "min_expected": min_exp,
        "table": table,
    }


def posthoc_nominal(question, group, prefer="auto", yates=False, alternative="two-sided"):
    """Pairwise 2×2 post-hoc tests for one question against one grouping column.

    For every answer category (crosstab row) and every pair of groups
    (crosstab columns), the count of that answer versus all other counts in
    the group's column is compared between the two groups with ``test_2x2``.

    Parameters
    ----------
    question, group : str
        Column names; the crosstab has ``question`` on the rows and
        ``group`` on the columns.
    prefer, yates, alternative
        Forwarded unchanged to ``test_2x2``.

    Returns
    -------
    pandas.DataFrame
        One row per (answer category, group pair). The frame always carries
        the same columns, including when there is no data or fewer than two
        groups, in which case it is empty.

    Notes
    -----
    NOTE(review): the per-group totals are column sums of the crosstab. If
    prepare_pair yields several rows per respondent for multi-select
    columns, these totals count selections rather than respondents —
    confirm this is the intended denominator.
    """
    result_columns = [
        "question", "group", "row_cat", "g1", "g2", "method", "p", "chi2", "dof",
        "odds_ratio", "min_expected", "n_g1", "n_g2", "x_g1", "x_g2", "note",
    ]

    q_is_multi = question in multiple_choice_questions
    g_is_multi = (group in multiple_choice_demographics) or (group in multiple_choice_questions)

    data = prepare_pair(df, question, group, q_is_multi, g_is_multi)
    if data.empty:
        return pd.DataFrame(columns=result_columns)

    ct = pd.crosstab(data[question], data[group])

    # Collapse duplicate labels, should any be present, by summing counts.
    if not ct.columns.is_unique:
        ct = ct.T.groupby(level=0).sum().T
    if not ct.index.is_unique:
        ct = ct.groupby(level=0).sum()

    ct = order_crosstab(ct, question, group)

    totals = ct.sum(axis=0)
    # The set of group pairs does not depend on the row, so build it once.
    group_pairs = list(itertools.combinations(ct.columns, 2))
    results = []

    for row_cat in ct.index:
        for g1, g2 in group_pairs:
            # a / c: count of this answer in each group; b / d: the rest.
            a = int(ct.loc[row_cat, g1]); n1 = int(totals[g1]); b = n1 - a
            c = int(ct.loc[row_cat, g2]); n2 = int(totals[g2]); d = n2 - c

            res = test_2x2(a, b, c, d, alternative=alternative, prefer=prefer, yates=yates)

            results.append({
                "question": question,
                "group": group,
                "row_cat": row_cat,
                "g1": g1,
                "g2": g2,
                "method": res["method"],
                "p": res["p"],
                "chi2": res["chi2"],
                "dof": res["dof"],
                "odds_ratio": res["odds_ratio"],
                "min_expected": res["min_expected"],
                "n_g1": n1,
                "n_g2": n2,
                "x_g1": a,
                "x_g2": c,
                "note": res.get("reason", ""),
            })

    # Passing the column list keeps the schema identical when no tests ran
    # (for example when the crosstab has a single group column).
    return pd.DataFrame(results, columns=result_columns)


def map_to_numeric_flexible(series, question):
    """Convert a column of answers to numbers using the first scheme that fits.

    Lookup order:
      1. ``LIKERT_VALUE_MAPS[question]`` (keys compared as strings),
      2. position of the answer within ``question_orders[question]``,
      3. ``pd.to_numeric`` on the untouched input with unparseable values
         coerced to NaN.

    A lookup table is accepted as soon as it maps at least one value;
    values it does not cover become NaN.

    Parameters
    ----------
    series : pandas.Series
        Raw answers.
    question : str
        Key used to look up the mapping tables.

    Returns
    -------
    pandas.Series
        Numeric representation of ``series``.
    """
    text = series.astype("string").str.strip()

    def _translate(lookup):
        # None signals "this table matched nothing, try the next scheme".
        mapped = text.map(lookup)
        return mapped.astype(float) if mapped.notna().any() else None

    if question in LIKERT_VALUE_MAPS:
        scored = _translate(
            {str(label): value for label, value in LIKERT_VALUE_MAPS[question].items()}
        )
        if scored is not None:
            return scored

    if question in question_orders:
        ranked = _translate(
            {str(cat): rank for rank, cat in enumerate(question_orders[question])}
        )
        if ranked is not None:
            return ranked

    return pd.to_numeric(series, errors="coerce")


def posthoc_trend_spearman(question, group):
    """Spearman rank correlation between two columns of the global ``df``.

    Both columns are converted with ``map_to_numeric_flexible``; only rows
    where both converted values are present enter the correlation.

    Parameters
    ----------
    question, group : str
        Column names in ``df``.

    Returns
    -------
    pandas.DataFrame
        Empty (columns only) when a column is missing or no row has both
        values. Otherwise a single row with ``question``, ``group``, ``n``,
        ``rho``, ``p`` and ``direction``. ``direction`` is "positive",
        "negative" or "zero"; it is "undefined" (with NaN ``rho`` / ``p``)
        when the correlation cannot be computed, i.e. fewer than two valid
        pairs or a NaN coefficient such as for a constant column.
    """
    result_columns = ["question", "group", "n", "rho", "p", "direction"]

    if question not in df.columns or group not in df.columns:
        return pd.DataFrame(columns=result_columns)

    data = df[[question, group]].dropna().copy()
    if data.empty:
        return pd.DataFrame(columns=result_columns)

    x = map_to_numeric_flexible(data[question], question)
    y = map_to_numeric_flexible(data[group], group)

    valid = (~x.isna()) & (~y.isna())
    n_valid = int(valid.sum())
    if n_valid == 0:
        return pd.DataFrame(columns=result_columns)

    # A rank correlation needs at least two observations.
    rho, p = np.nan, np.nan
    if n_valid >= 2:
        rho, p = spearmanr(x[valid].values, y[valid].values)

    # NaN compares false against 0 in both directions, so it must be
    # handled explicitly instead of falling through to "zero".
    if np.isnan(rho):
        direction = "undefined"
    elif rho > 0:
        direction = "positive"
    elif rho < 0:
        direction = "negative"
    else:
        direction = "zero"

    return pd.DataFrame([{
        "question": question,
        "group": group,
        "n": n_valid,
        "rho": float(rho),
        "p": float(p),
        "direction": direction,
    }])


# Pairwise 2×2 tests for every configured nominal (question, group) pair.
# A failure in one pair is reported and does not stop the remaining pairs.
all_nominal = []
for (q, g) in nominal_posthoc_pairs_demo:
    try:
        df_res = posthoc_nominal(q, g, prefer="auto", yates=False, alternative="two-sided")
        if not df_res.empty:
            all_nominal.append(df_res)
            print(f"Nominal 2×2 computed for {q} × {g} ({len(df_res)} tests).")
        else:
            print(f"No data for {q} × {g}.")
    except Exception as e:
        print(f"Failed nominal 2×2 for {q} × {g}: {e}")

# Fall back to an empty frame with the expected columns when nothing ran.
nominal_posthoc_results = (
    pd.concat(all_nominal, ignore_index=True) if all_nominal
    else pd.DataFrame(columns=[
        "question","group","row_cat","g1","g2","method","p","chi2","dof",
        "odds_ratio","min_expected","n_g1","n_g2","x_g1","x_g2","note"
    ])
)

# Spearman trend for every configured ordinal (question, group) pair.
all_trend = []
for (q, g) in ordinal_posthoc_pairs_demo:
    try:
        trend = posthoc_trend_spearman(q, g)
        if not trend.empty:
            all_trend.append(trend)
            print(f"Spearman trend computed for {q} × {g}.")
        else:
            print(f"No data for Spearman {q} × {g}.")
    except Exception as e:
        print(f"Failed Spearman for {q} × {g}: {e}")

trend_results = (
    pd.concat(all_trend, ignore_index=True) if all_trend
    else pd.DataFrame(columns=["question","group","n","rho","p","direction"])
)

print(f"Nominal 2×2 tests: {len(nominal_posthoc_results)} Spearman trends: {len(trend_results)}")


EXPORT = True

if EXPORT:
    export_path_nominal = os.path.join("..", "Data", "test_results", "ai_posthoc_nominal_2x2.xlsx")
    # The writer does not create missing folders; make sure the target
    # directory exists so a fresh checkout can run this cell.
    os.makedirs(os.path.dirname(export_path_nominal), exist_ok=True)
    with pd.ExcelWriter(export_path_nominal, engine="xlsxwriter") as writer:
        nominal_posthoc_results.to_excel(writer, index=False, sheet_name="pairwise_2x2_tests")
    print(f"Exported to: {export_path_nominal}")

    export_path_spearman = os.path.join("..", "Data", "test_results", "ai_posthoc_spearman_trends.xlsx")
    os.makedirs(os.path.dirname(export_path_spearman), exist_ok=True)
    with pd.ExcelWriter(export_path_spearman, engine="xlsxwriter") as writer:
        trend_results.to_excel(writer, index=False, sheet_name="spearman_trends")
    print(f"Exported to: {export_path_spearman}")
