# All the statistics tests in a single file with a long log...

In [1]:
# Importe dependencies
import time
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
# statistics
from scipy.stats import shapiro, kstest
from scipy.stats import levene
from scipy.stats import f_oneway
from statsmodels.stats.anova import anova_lm
from scipy.stats import boxcox
from scipy.stats import yeojohnson
from pingouin import welch_anova as pg_welch_anova
from scipy import stats
import scikit_posthocs as sp
from statsmodels.formula.api import ols
from statsmodels.stats.multicomp import pairwise_tukeyhsd

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Iterable, Tuple

import glob
import os
import numpy as np

import logging

import statsmodels.api as sm
import statsmodels.formula.api as smf
from statsmodels.stats.anova import anova_lm


In [2]:

@dataclass
class LoadData:
    num_rows: Optional[int]
    s_folder: str
    algo: str

    def __post_init__(self):
        self._root = Path(self.s_folder)

    # ---------- helpers ----------
    def _read_csv_safe(self, path: Path, usecols: Optional[Iterable[str]] = None) -> Optional[pd.DataFrame]:
        try:
            if not path.exists():
                return None
            return pd.read_csv(path, nrows=self.num_rows, usecols=usecols)
        except Exception as e:
            logger.info(f"[read_csv] Failed '{path}': {e}")
            return None

    def _glob_one(self, pattern: str) -> Optional[Path]:
        hits = list(self._root.glob(pattern))
        return hits[0] if hits else None

    def _per_run_csv(self, run_id: str, filename: str) -> Path:
        return self._root / run_id / filename

    # ---------- general ----------
    def load_run_table(self) -> pd.DataFrame:
        df = self._read_csv_safe(self._root / "run_table.csv")
        return df if df is not None else pd.DataFrame()

    # transforms
    def log_transform(self, df: pd.DataFrame, label: str) -> pd.Series:
        # keep zeros; NaNs stay NaN
        s = pd.to_numeric(df[label], errors="coerce")
        return np.log1p(s)

    def boxcox_transform(self, df: pd.DataFrame, label: str) -> Tuple[pd.Series, float]:
        s = pd.to_numeric(df[label], errors="coerce").dropna()
        # ensure strictly positive
        s = s[s > 0]
        if s.empty:
            raise ValueError(f"boxcox_transform: '{label}' has no positive values")
        transformed, lam = boxcox(s)
        out = pd.Series(index=s.index, data=transformed, name=label)
        return out, lam

    def filter_df(self, df: pd.DataFrame, label: str, value) -> pd.DataFrame:
        return df[df[label] == value]

    def group_df_by(self, df: pd.DataFrame, key: str, outliers: bool, column_for_outlier: str = "avg_energy_pct"):
        g = df.groupby(key, dropna=False)
        if not outliers:
            return g
        # apply outlier removal per group, then return a grouped view again
        cleaned = g.apply(self.remove_outliers, column_name=column_for_outlier)
        cleaned = cleaned.reset_index(level=0, drop=True)  # flatten index from .apply
        return cleaned.groupby(key, dropna=False)

    # ---------- nav2 aggregates ----------
    def load_nav2_success(self, component: Optional[str], outliers: bool) -> pd.DataFrame:
        run_table = self.load_run_table()
        if run_table.empty:
            return run_table

        rows = []
        for _, row in run_table.iterrows():
            run_id = row["__run_id"]
            if row.get("__done") == "TODO":
                continue
            path = self._per_run_csv(run_id, "nav2_performance.csv")
            nav2 = self._read_csv_safe(path)
            if nav2 is None:
                success = 0
            else:
                s = pd.to_numeric(nav2.get("success"), errors="coerce")
                success = int(np.nansum(s)) if s is not None else 0
            rows.append({"__run_id": run_id, "success": success})

        agg = pd.DataFrame(rows)
        return run_table.merge(agg, on="__run_id").dropna(subset=["__run_id"])

    def load_nav2_time(self, component: Optional[str], outliers: bool, timeout_fallback: float = 120.0) -> pd.DataFrame:
        run_table = self.load_run_table()
        if run_table.empty:
            return run_table

        rows = []
        for _, row in run_table.iterrows():
            run_id = row["__run_id"]
            if row.get("__done") == "TODO":
                continue
            path = self._per_run_csv(run_id, "nav2_performance.csv")
            nav2 = self._read_csv_safe(path)
            if nav2 is None:
                nav_time = timeout_fallback
            else:
                s = pd.to_numeric(nav2.get("navigation_time"), errors="coerce")
                nav_time = float(np.nansum(s)) if s is not None else timeout_fallback
                if nav_time == 0:
                    nav_time = timeout_fallback
            rows.append({"__run_id": run_id, "navigation_time": nav_time})

        agg = pd.DataFrame(rows)
        return run_table.merge(agg, on="__run_id").dropna(subset=["__run_id"])

    def load_nav2_path_length(self, component: Optional[str], outliers: bool) -> pd.DataFrame:
        run_table = self.load_run_table()
        if run_table.empty:
            return run_table

        rows = []
        for _, row in run_table.iterrows():
            run_id = row["__run_id"]
            if row.get("__done") == "TODO":
                continue
            path = self._per_run_csv(run_id, "nav2_performance.csv")
            nav2 = self._read_csv_safe(path)
            if nav2 is None:
                planned = np.nan
            else:
                s = pd.to_numeric(nav2.get("planned_distance_m"), errors="coerce")
                planned = float(np.nansum(s)) if s is not None else np.nan
            rows.append({"__run_id": run_id, "planned_distance_m": planned})

        agg = pd.DataFrame(rows)
        return run_table.merge(agg, on="__run_id").dropna(subset=["__run_id"])

    def load_nav2_recoveries(self, component: Optional[str], outliers: bool) -> pd.DataFrame:
        run_table = self.load_run_table()
        if run_table.empty:
            return run_table

        rows = []
        for _, row in run_table.iterrows():
            run_id = row["__run_id"]
            if row.get("__done") == "TODO":
                continue
            path = self._per_run_csv(run_id, "nav2_performance.csv")
            nav2 = self._read_csv_safe(path)
            if nav2 is None:
                rec = 0
            else:
                s = pd.to_numeric(nav2.get("recoveries"), errors="coerce")
                rec = int(np.nansum(s)) if s is not None else 0
            rows.append({"__run_id": run_id, "recoveries": rec})

        agg = pd.DataFrame(rows)
        return run_table.merge(agg, on="__run_id").dropna(subset=["__run_id"])

    # ---------- power/CPU/machine ----------
    def load_power(self, component: str, transform: bool, outliers: bool) -> pd.DataFrame:
        run_table = self.load_run_table()
        if run_table.empty:
            return run_table

        rows = []
        for _, row in run_table.iterrows():
            run_id = row["__run_id"]
            pattern = str(self._per_run_csv(run_id, f"pj_{component}_server.csv-*.csv"))
            hits = glob.glob(pattern)
            if not hits:
                continue
            df = self._read_csv_safe(Path(hits[0]))
            if df is None or "CPU Power" not in df:
                continue
            df["CPU Power"] = pd.to_numeric(df["CPU Power"], errors="coerce")
            df = df[df["CPU Power"] > 0.001]
            if df.empty:
                continue
            if transform:
                transformed, _ = self.boxcox_transform(df, "CPU Power")
                df.loc[transformed.index, "CPU Power"] = transformed
            avg = float(df["CPU Power"].mean())
            rows.append({"__run_id": run_id, "avg_energy_pct": avg})

        agg = pd.DataFrame(rows)
        if agg.empty:
            return run_table
        return run_table.merge(agg, on="__run_id").dropna(subset=["__run_id"])

    def load_machine_power(self, transform: bool, outliers: bool) -> pd.DataFrame:
        # Keep as "controller" file unless you actually have a machine-level file elsewhere.
        return self.load_power(component="controller", transform=transform, outliers=outliers)

    def load_machine_cpu(self, transform: bool, outliers: bool) -> pd.DataFrame:
        run_table = self.load_run_table()
        if run_table.empty:
            return run_table

        rows = []
        for _, row in run_table.iterrows():
            run_id = row["__run_id"]
            path = self._per_run_csv(run_id, "pj_controller_server.csv")
            df = self._read_csv_safe(path)
            if df is None or "CPU Utilization" not in df:
                continue
            df["CPU Utilization"] = pd.to_numeric(df["CPU Utilization"], errors="coerce")
            df = df[df["CPU Utilization"] > 0.001]
            if df.empty:
                continue
            if transform:
                transformed, _ = self.boxcox_transform(df, "CPU Utilization")
                df.loc[transformed.index, "CPU Utilization"] = transformed
            avg = float(df["CPU Utilization"].mean())
            rows.append({"__run_id": run_id, "avg_energy_pct": avg})

        agg = pd.DataFrame(rows)
        if agg.empty:
            return run_table
        return run_table.merge(agg, on="__run_id").dropna(subset=["__run_id"])

    def load_machine_mem(self, transform: bool, outliers: bool) -> pd.DataFrame:
        # If you truly want memory, change the column below (e.g., 'Memory Used (MB)')
        run_table = self.load_run_table()
        if run_table.empty:
            return run_table

        rows = []
        for _, row in run_table.iterrows():
            run_id = row["__run_id"]
            path = self._per_run_csv(run_id, "global.csv")
            df = self._read_csv_safe(path)
            if df is None or "cpu_cycles_est" not in df:
                continue
            df["cpu_cycles_est"] = pd.to_numeric(df["cpu_cycles_est"], errors="coerce")
            df = df[df["cpu_cycles_est"] > 0.001]
            if df.empty:
                continue
            if transform:
                transformed, _ = self.boxcox_transform(df, "cpu_cycles_est")
                df.loc[transformed.index, "cpu_cycles_est"] = transformed
            avg = float(df["cpu_cycles_est"].mean())
            rows.append({"__run_id": run_id, "avg_energy_pct": avg})

        agg = pd.DataFrame(rows)
        if agg.empty:
            return run_table
        return run_table.merge(agg, on="__run_id").dropna(subset=["__run_id"])

    # ---------- outliers ----------
    def remove_outliers(self, group: pd.DataFrame, column_name: str) -> pd.DataFrame:
        s = pd.to_numeric(group[column_name], errors="coerce")
        q1 = s.quantile(0.25)
        q3 = s.quantile(0.75)
        iqr = q3 - q1
        lo = q1 - 1.5 * iqr
        hi = q3 + 1.5 * iqr
        return group[(s >= lo) & (s <= hi)]


In [3]:
import numpy as np
import pandas as pd
from scipy import stats
from scipy.stats import shapiro
from statsmodels.stats.oneway import anova_oneway   # Welch-capable
from statsmodels.stats.multicomp import pairwise_tukeyhsd
import pingouin as pg                               # for Games-Howell & Dunn
import scikit_posthocs as sp

def setup_logger(log_file="stats_tests.log"):
    logger = logging.getLogger("stats")
    logger.setLevel(logging.INFO)

    # Avoid duplicate handlers if re-run in notebooks
    if not logger.handlers:
        fh = logging.FileHandler(log_file, mode="w", encoding="utf-8")
        fh.setLevel(logging.INFO)
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)

        formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)

        logger.addHandler(fh)
        logger.addHandler(ch)

    return logger

logger = setup_logger("stats_tests.log")

def _ensure_groups(df: pd.DataFrame, key: str, value_col: str):
    if key not in df.columns:
        raise ValueError(f"Grouping key '{key}' not in dataframe.")
    if value_col not in df.columns:
        raise ValueError(f"Value column '{value_col}' not in dataframe.")
    grouped = df[[key, value_col]].dropna().groupby(key)
    groups = [g[value_col].values for _, g in grouped]
    labels = [name for name, _ in grouped]
    return grouped, groups, labels

def shapiro_wilk(df: pd.DataFrame, column_name: str, key: str, max_per_group: int = 5000):
    """
    Runs Shapiro per group (downsamples if needed). Returns dict with per-group p-values and overall flag.
    """
    grouped = df.groupby(key)
    all_normal = True
    results = []
    for group_name, g in grouped:
        x = pd.to_numeric(g[column_name], errors="coerce").dropna().values
        if len(x) < 3:
            results.append((group_name, np.nan, np.nan, "insufficient"))
            continue
        # Shapiro becomes unstable for very large n; subsample
        if len(x) > max_per_group:
            rng = np.random.default_rng(42)
            x = rng.choice(x, size=max_per_group, replace=False)
        stat, p = shapiro(x)
        verdict = "normal" if p > 0.05 else "non-normal"
        if p <= 0.05:
            all_normal = False
        results.append((group_name, stat, p, verdict))
        logger.info(f"[Shapiro] {group_name}: W={stat:.3f}, p={p:.4g} → {verdict}")
    return {"all_normal": all_normal, "per_group": results}

def levene_test(df: pd.DataFrame, key: str, value_col: str, center: str = "median"):
    """
    Brown–Forsythe (Levene with center='median') for homogeneity of variances.
    """
    grouped, groups, labels = _ensure_groups(df, key, value_col)
    if len(groups) < 2:
        raise ValueError("Levene requires ≥2 groups.")
    stat, p = stats.levene(*groups, center=center)
    logger.info(f"[Levene/Brown–Forsythe] W={stat:.3f}, p={p:.4g} (center={center})")
    return {"stat": stat, "p": p}

def one_way_anova(df: pd.DataFrame, key: str, value_col: str):
    """
    Classical one-way ANOVA (equal variances). Also returns Tukey HSD post-hoc if significant.
    """
    grouped, groups, labels = _ensure_groups(df, key, value_col)
    if len(groups) < 2:
        raise ValueError("ANOVA requires ≥2 groups.")
    # stats.f_oneway expects arrays
    f_stat, p = stats.f_oneway(*groups)
    logger.info(f"[ANOVA] F={f_stat:.3f}, p={p:.4g}")
    posthoc = None
    if p < 0.05:
        values = np.concatenate(groups)
        group_labels = np.concatenate([[lbl]*len(g) for lbl, g in zip(labels, groups)])
        posthoc = pairwise_tukeyhsd(values, group_labels, alpha=0.05).summary()
        logger.info("[Post-hoc] Tukey HSD:")
        logger.info(posthoc.to_string())
    return {"test": "ANOVA", "F": f_stat, "p": p, "posthoc": posthoc}

def welch_anova(df: pd.DataFrame, key: str, value_col: str):
    """
    Welch's ANOVA using statsmodels (correct F and dfs).
    """
    # statsmodels takes a long-form DataFrame
    sub = df[[key, value_col]].dropna()
    res = anova_oneway(sub, dv=value_col, between=key, use_var="unequal", welch_correction=True)
    # res has columns: 'stat', 'pvalue', 'df', etc.
    f_stat = res.loc["overall", "stat"]
    p = res.loc["overall", "pvalue"]
    logger.info(f"[Welch ANOVA] F={f_stat:.3f}, p={p:.4g}")
    # Games–Howell post-hoc (unequal variances, unequal n)
    gh = pg.pairwise_gameshowell(dv=value_col, between=key, data=sub)
    logger.info("[Post-hoc] Games–Howell:")
    logger.info(gh.to_string())
    return {"test": "Welch ANOVA", "F": f_stat, "p": p, "posthoc": gh}

def kruskal_wallis_test(df: pd.DataFrame, key: str, value_col: str):
    grouped, groups, labels = _ensure_groups(df, key, value_col)
    if len(groups) < 2:
        raise ValueError("Kruskal–Wallis requires ≥2 groups.")
    stat, p = stats.kruskal(*groups)
    logger.info(f"[Kruskal–Wallis] H={stat:.3f}, p={p:.4g}")
    posthoc = None
    if p < 0.05:
        sub = df[[key, value_col]].dropna()        
        posthoc = sp.posthoc_dunn(sub, val_col=value_col,
                          group_col=key, p_adjust="bonferroni")
        # posthoc = pg.pairwise_dunn(sub, dv=value_col, between=key, p_adjust="bonferroni")
        logger.info("[Post-hoc] Dunn (Bonferroni) computed:")
        logger.info(posthoc.to_string())
    return {"test": "Kruskal–Wallis", "H": stat, "p": p, "posthoc": posthoc}

def choose_and_run_tests(df: pd.DataFrame, key: str, value_col: str):
    """
    Orchestrates:
      1) normality per group (Shapiro)
      2) variance homogeneity (Brown–Forsythe)
      3) picks test + post-hoc
    Returns a dict with everything.
    """
    res = {"normality": None, "homogeneity": None, "main": None}

    # 1) Normality
    norm = shapiro_wilk(df, value_col, key)
    res["normality"] = norm

    # 2) Homogeneity
    homo = levene_test(df, key, value_col, center="median")
    res["homogeneity"] = homo

    k = df[key].nunique()
    if k < 2:
        raise ValueError("Need at least 2 groups to compare.")

    # 3) Branch
    if norm["all_normal"] and homo["p"] >= 0.05:
        # Parametric, equal variances
        main = one_way_anova(df, key, value_col)
    elif norm["all_normal"] and homo["p"] < 0.05:
        # Parametric but unequal variances
        main = welch_anova(df, key, value_col)
    else:
        # Non-parametric
        main = kruskal_wallis_test(df, key, value_col)

    res["main"] = main
    return res

def build_power_df_for_runs(experiment_run: List[str], components: List[str], prefix_f: dict, algo: str = "nav2"):
    """
    Returns a dataframe with columns:
      ['__run_id', 'configuration', 'number_obstacles', 'avg_energy_pct', 'component', 'experiment_run', ...]
    """
    rows = []
    for run in experiment_run:
        s_folder = prefix_f[run]
        loader = LoadData(num_rows=None, s_folder=s_folder, algo=algo)
        for comp in components:
            dfp = loader.load_power(component=comp, transform=False, outliers=False)
            if dfp is None or dfp.empty:
                continue
            dfp = dfp.copy()
            dfp["component"] = comp
            dfp["experiment_run"] = run
            rows.append(dfp)
    return pd.concat(rows, ignore_index=True) if rows else pd.DataFrame()

def build_overall_energy_df_for_runs(experiment_run: List[str], prefix_f: dict, algo: str = "nav2"):
    rows = []
    for run in experiment_run:
        s_folder = prefix_f[run]
        loader = LoadData(num_rows=None, s_folder=s_folder, algo=algo)
        dfe = loader.load_machine_power(transform=False, outliers=False)
        if dfe is None or dfe.empty:
            continue
        dfe = dfe.copy()
        dfe["experiment_run"] = run
        rows.append(dfe)
    return pd.concat(rows, ignore_index=True) if rows else pd.DataFrame()

def compare_runs_by_component(experiment_run, components, prefix_f, logger):
    all_df = build_power_df_for_runs(experiment_run, components, prefix_f)
    if all_df.empty:
        logger.warning("[compare_runs_by_component] No power data found.")
        return {}

    results = {}
    for comp in components:
        dfc = all_df[all_df["component"] == comp]
        if dfc["experiment_run"].nunique() < 2:
            logger.info(f"[compare_runs_by_component] Skipping {comp}: need both runs.")
            continue

        logger.info(f"\n=== Compare runs (small_map vs large_map) | component={comp} ===")
        res = choose_and_run_tests(dfc, key="experiment_run", value_col="avg_energy_pct")
        results[comp] = res
    return results

def compare_runs_overall_energy(experiment_run, prefix_f, logger):
    all_energy = build_overall_energy_df_for_runs(experiment_run, prefix_f)
    if all_energy.empty or all_energy["experiment_run"].nunique() < 2:
        logger.warning("[compare_runs_overall_energy] Need overall energy from both runs.")
        return None

    logger.info("\n=== Compare runs (small_map vs large_map) | overall energy ===")
    return choose_and_run_tests(all_energy, key="experiment_run", value_col="avg_energy_pct")

def two_way_anova_map_obstacles_power(experiment_run, components, prefix_f, logger):
    """
    For each component, fits: avg_energy_pct ~ C(experiment_run) * C(number_obstacles)
    Returns dict of ANOVA tables.
    """
    all_df = build_power_df_for_runs(experiment_run, components, prefix_f)
    if all_df.empty:
        logger.warning("[two_way_anova_map_obstacles_power] No power data.")
        return {}

    out = {}
    for comp in components:
        dfc = all_df[all_df["component"] == comp].dropna(subset=["avg_energy_pct", "experiment_run", "number_obstacles"])
        if dfc.empty or dfc["experiment_run"].nunique() < 2 or dfc["number_obstacles"].nunique() < 2:
            logger.info(f"[two_way_anova] Skipping {comp}: need ≥2 levels for both factors.")
            continue

        # OLS with interaction
        model = smf.ols("avg_energy_pct ~ C(experiment_run) * C(number_obstacles)", data=dfc).fit()
        aov = anova_lm(model, typ=2)  # Type II ANOVA
        logger.info(f"\n=== Two-way ANOVA (map × obstacles) | component={comp} ===\n{aov}")
        out[comp] = aov
    return out

def two_way_anova_map_obstacles_overall(experiment_run, prefix_f, logger):
    df = build_overall_energy_df_for_runs(experiment_run, prefix_f)
    if df.empty or df["experiment_run"].nunique() < 2 or df["number_obstacles"].nunique() < 2:
        logger.info("[two_way_anova_overall] Need ≥2 levels for both factors.")
        return None
    model = smf.ols("avg_energy_pct ~ C(experiment_run) * C(number_obstacles)", data=df).fit()
    aov = anova_lm(model, typ=2)
    logger.info("\n=== Two-way ANOVA (map × obstacles) | overall energy ===\n%s", aov)
    return aov


def statistical_tests():
    global component
    global algo
    global d_folder

    # Setting Up Environment Variables
    experiment_run = ["small_map", "large_map"]
    components = ["controller", "planner"]
    prefix_f = {"small_map": "data/greenros_reconf_world_small_voxel/",
                "default": "data/greenros_reconf_world_large_voxel_default/",
                "large_map": "data/greenros_reconf_world_large_voxel/"}
    number_obstacles = [0, 2]

    # for run in experiment_run:
    #     logger.info(f'Map: {run}')
    #     s_folder = prefix_f[run]
    #     loader = LoadData(num_rows=None, s_folder=s_folder, algo=None)
    #     df_ctrl = loader.load_power("controller", transform=False, outliers=False)
    #     df_plan = loader.load_power("planner", transform=False, outliers=False)
    for run in experiment_run:
        s_folder = prefix_f[run]
        loader = LoadData(num_rows=None, s_folder=s_folder, algo="nav2")

        for comp in components:
            df_power = loader.load_power(component=comp, transform=False, outliers=False)
            if df_power.empty:
                logger.info(f"[skip] no power data for {run} / {comp}")
                continue

            # Effect of configuration
            logger.info(f"\n=== {run} | {comp} | effect of configuration ===")
            choose_and_run_tests(df_power, key="configuration", value_col="avg_energy_pct")

            # Effect of obstacles
            if "number_obstacles" in df_power.columns:
                logger.info(f"\n=== {run} | {comp} | effect of number_obstacles ===")
                choose_and_run_tests(df_power, key="number_obstacles", value_col="avg_energy_pct")

        # Overall energy (machine-level)
        df_machine = loader.load_machine_power(transform=False, outliers=False)
        if not df_machine.empty:
            logger.info(f"\n=== {run} | overall energy | configuration ===")
            choose_and_run_tests(df_machine, key="configuration", value_col="avg_energy_pct")
            if "number_obstacles" in df_machine.columns:
                logger.info(f"\n=== {run} | overall energy | number_obstacles ===")
                choose_and_run_tests(df_machine, key="number_obstacles", value_col="avg_energy_pct")

    logger.info("### SMALL vs LARGE comparisons ###")
    _ = compare_runs_by_component(experiment_run, components, prefix_f, logger)
    _ = compare_runs_overall_energy(experiment_run, prefix_f, logger)

    logger.info("### Two-way ANOVA (map × obstacles) ###")
    _ = two_way_anova_map_obstacles_power(experiment_run, components, prefix_f, logger)
    _ = two_way_anova_map_obstacles_overall(experiment_run, prefix_f, logger)


if __name__ == "__main__":
    logger.info('Running statistical tests...')
    statistical_tests()
        

2025-09-08 17:37:04,424 - INFO - Running statistical tests...
2025-09-08 17:37:04,993 - INFO - 
=== small_map | controller | effect of configuration ===
2025-09-08 17:37:04,995 - INFO - [Shapiro] 0: W=0.778, p=2.472e-06 → non-normal
2025-09-08 17:37:04,996 - INFO - [Shapiro] 1: W=0.727, p=2.733e-07 → non-normal
2025-09-08 17:37:04,996 - INFO - [Shapiro] 2: W=0.690, p=6.816e-08 → non-normal
2025-09-08 17:37:04,997 - INFO - [Shapiro] 3: W=0.708, p=1.332e-07 → non-normal
2025-09-08 17:37:04,997 - INFO - [Shapiro] 4: W=0.800, p=6.732e-06 → non-normal
2025-09-08 17:37:04,997 - INFO - [Shapiro] 5: W=0.941, p=0.03731 → non-normal
2025-09-08 17:37:04,998 - INFO - [Shapiro] 6: W=0.978, p=0.6029 → normal
2025-09-08 17:37:04,998 - INFO - [Shapiro] 7: W=0.671, p=3.347e-08 → non-normal
2025-09-08 17:37:04,999 - INFO - [Shapiro] 8: W=0.591, p=2.348e-09 → non-normal
2025-09-08 17:37:04,999 - INFO - [Shapiro] 9: W=0.680, p=4.599e-08 → non-normal
2025-09-08 17:37:04,999 - INFO - [Shapiro] 10: W=0.743, 

AttributeError: 'SimpleTable' object has no attribute 'to_string'