In [1]:
import numpy as np
import pandas as pd
from scipy import stats

In [2]:
from google.colab import files

# Ask the user to upload a file; the keys of `uploaded` are the uploaded file names.
uploaded = files.upload()

# Use the first uploaded Excel workbook. Collect the candidates first so that an
# upload without any .xlsx/.xls file fails with a readable message instead of a
# bare StopIteration from next().
excel_names = [fn for fn in uploaded.keys() if fn.lower().endswith(('.xlsx', '.xls'))]
if not excel_names:
    raise ValueError(
        f"업로드된 파일 중 엑셀(.xlsx/.xls) 파일이 없습니다: {list(uploaded.keys())}"
    )
INPUT_PATH = excel_names[0]
print(f"[INFO] 사용 파일: {INPUT_PATH}")

Saving 202501_clean2.xlsx to 202501_clean2.xlsx
[INFO] 사용 파일: 202501_clean2.xlsx


In [3]:
# Metal concentration columns to analyse
TARGET_METALS = [
    'Cr(ng/m3)',
    'Co(ng/m3)',
    'Ni(ng/m3)',
    'As(ng/m3)',
    'Cd(ng/m3)',
    'Sb(ng/m3)',
    'Pb(ng/m3)',
]

# Distribution labels to evaluate, in evaluation order
DIST_LIST = [
    "로그 정규",
    "감마",
    "최대 극값",
    "와이블",
    "로지스틱",
    "삼각형",
    "스튜던트의 t",
    "정규",
    "최소 극값",
    "지수",
    "파레토",
    "균일",
    "BetaPERT",
    "베타",
]

# File name of the exported workbook
OUTPUT_PATH = "Tx-적합도.xlsx"

In [4]:
# 유틸/검정 함수
def get_distributions_extended():
    """Return a dict mapping each distribution label to its scipy.stats object.

    BetaPERT has no entry: it is not a scipy distribution and is fitted by a
    dedicated helper instead.
    """
    labelled = [
        ("로그 정규", stats.lognorm),
        ("감마", stats.gamma),
        ("최대 극값", stats.gumbel_r),
        ("와이블", stats.weibull_min),
        ("로지스틱", stats.logistic),
        ("삼각형", stats.triang),
        ("스튜던트의 t", stats.t),
        ("정규", stats.norm),
        ("최소 극값", stats.gumbel_l),
        ("지수", stats.expon),
        ("파레토", stats.pareto),
        ("균일", stats.uniform),
        ("베타", stats.beta),
    ]
    return dict(labelled)

def select_data_for_dist(x: np.ndarray, name: str) -> np.ndarray:
    """Return the observations of `x` that may be used to fit distribution `name`.

    NaNs are always dropped. For the labels listed in `positive_only`, only
    strictly positive values are kept. The input array is not modified.
    """
    positive_only = ("로그 정규", "감마", "와이블", "지수", "파레토", "베타", "BetaPERT")
    observed = x[~np.isnan(x)]
    if name not in positive_only:
        return observed
    return observed[observed > 0]

def fit_params(dist, x: np.ndarray, name: str):
    """Fit `dist` to `x` and return `(params, error)`.

    For the labels in `loc_pinned` the fit is first tried with the location
    fixed at 0 (`floc=0`); if that raises, an unconstrained fit is tried.
    On success `error` is None. If the last attempt raises, the result is
    `(None, "fit_error: <message>")`.
    """
    loc_pinned = ("로그 정규", "감마", "와이블", "지수", "파레토")
    attempts = ({"floc": 0}, {}) if name in loc_pinned else ({},)

    failure = None
    for fit_kwargs in attempts:
        try:
            return dist.fit(x, **fit_kwargs), None
        except Exception as exc:
            # Remember the most recent failure; only the last one is reported.
            failure = exc
    return None, f"fit_error: {failure}"

def ks_test(dist, params, x: np.ndarray):
    """Run a Kolmogorov-Smirnov test of `x` against `dist.cdf` with `params`.

    Returns `(D, p_value, None)` on success, or
    `(nan, nan, "ks_error: <message>")` if anything raises.
    """
    try:
        statistic, p_value = stats.kstest(x, dist.cdf, args=params)
        outcome = (float(statistic), float(p_value), None)
    except Exception as exc:
        outcome = (np.nan, np.nan, f"ks_error: {exc}")
    return outcome

def chi_square_gof(dist, params, x: np.ndarray, min_expected=5, bins_min=5, bins_max=20):
    """Pearson chi-square goodness-of-fit test of `x` against a fitted distribution.

    The data are binned into `clip(int(sqrt(n)), bins_min, bins_max)` equal-width
    bins. Expected counts come from `dist.cdf(edges, *params)`, and bins whose
    expected count is below `min_expected` are merged into a neighbour.

    The two outermost bins are treated as open-ended (CDF 0 on the left, 1 on
    the right). Without this the expected counts only cover [min(x), max(x)],
    never sum to n like the observed counts do, and the probability mass the
    fitted distribution places outside the sample range is ignored.

    Parameters
    ----------
    dist : object exposing `cdf(values, *params)` (e.g. a scipy.stats distribution)
    params : sequence of fitted parameters passed positionally to `dist.cdf`
    x : 1-D array of observations
    min_expected : minimum expected count allowed per bin after merging
    bins_min, bins_max : bounds for the initial number of bins

    Returns
    -------
    (chi2_statistic, p_value, error)
        `error` is None on success. On failure both numbers are NaN and `error`
        is a short message starting with "chi_error" or "chi_hist_error".
        Nothing is raised for computational failures.
    """
    n = x.size
    if n < 10:
        return np.nan, np.nan, "chi_error: n<10"
    bins0 = int(np.clip(int(np.sqrt(n)), bins_min, bins_max))
    try:
        hist, edges = np.histogram(x, bins=bins0)
    except Exception as e:
        return np.nan, np.nan, f"chi_hist_error: {e}"

    def merge_sparse_bins(counts, expct):
        """Merge, in place, every bin whose expected count is below `min_expected`."""
        i = 0
        # Stop when a single bin is left: there is no neighbour to merge into.
        while i < len(expct) and len(expct) > 1:
            if expct[i] >= min_expected:
                i += 1
                continue
            if i == 0:
                target = 1
            elif i == len(expct) - 1:
                target = i - 1
            else:
                # Interior bin: merge into the neighbour with the smaller expectation.
                target = i - 1 if expct[i - 1] < expct[i + 1] else i + 1
            counts[target] += counts[i]
            expct[target] += expct[i]
            del counts[i], expct[i]
            if target < i:
                # The merged bin now sits one position to the left; re-check it.
                i -= 1

    try:
        # The CDF call is inside the try so a failing distribution yields an
        # error tuple rather than an exception in the caller's loop.
        cdfs = np.array(dist.cdf(edges, *params), dtype=float)
        cdfs[0], cdfs[-1] = 0.0, 1.0  # open-ended outer bins, see docstring
        expected = n * np.diff(cdfs)
        if not np.all(np.isfinite(expected)):
            return np.nan, np.nan, "chi_error: non-finite expected counts"

        counts = hist.astype(float).tolist()
        expct = expected.tolist()
        merge_sparse_bins(counts, expct)
        if len(expct) < 2 or any(e < min_expected for e in expct):
            return np.nan, np.nan, "chi_error: insufficient expected after merge"

        obs = np.array(counts, dtype=float)
        exp = np.array(expct, dtype=float)
        chi_stat = ((obs - exp) ** 2 / exp).sum()
        # NOTE(review): len(params) also counts parameters that were held fixed
        # during fitting (e.g. floc=0), which makes the dof conservative.
        dof = max(len(exp) - 1 - len(params), 1)
        # sf() avoids the cancellation of 1 - cdf() for very small p-values.
        pval = stats.chi2.sf(chi_stat, dof)
        return float(chi_stat), float(pval), None
    except Exception as e:
        return np.nan, np.nan, f"chi_error: {e}"

def anderson_darling_stat(dist, params, x: np.ndarray):
    """Compute the Anderson-Darling statistic A^2 of `x` against `dist` with `params`.

    Returns `(A2, nan, None)`; no p-value is computed, so the second element is
    always NaN. With fewer than 5 observations the result is
    `(nan, nan, "ad_error: n<5")`. CDF values are clipped to
    [1e-12, 1 - 1e-12] so the logarithms stay finite.
    """
    ordered = np.sort(x)
    count = ordered.size
    if count < 5:
        return np.nan, np.nan, "ad_error: n<5"

    cdf_vals = np.clip(dist.cdf(ordered, *params), 1e-12, 1 - 1e-12)
    rank = np.arange(1, count + 1)
    # Pair the i-th smallest CDF value with the i-th largest one.
    log_terms = np.log(cdf_vals) + np.log(1 - cdf_vals[::-1])
    a_squared = -count - np.mean((2 * rank - 1) * log_terms)
    return float(a_squared), np.nan, None  # p-value is not computed

# BetaPERT: map (min, mode, max) onto Beta(loc=a, scale=b-a)
def betapert_alpha_beta(a, m, b, lamb=4.0):
    """Return the Beta shape parameters `(alpha, beta)` for a PERT(a, m, b).

    `a`, `m`, `b` are the minimum, mode and maximum; `lamb` is the PERT weight.
    Returns `(None, None)` unless `a < m < b` holds, or when either resulting
    shape is not positive.
    """
    if a < m < b:
        width = b - a
        alpha = 1.0 + lamb * (m - a) / width
        beta = 1.0 + lamb * (b - m) / width
        if not (alpha <= 0 or beta <= 0):
            return alpha, beta
    return None, None

def fit_betapert(x: np.ndarray, lamb=4.0):
    """Estimate a BetaPERT for `x` and return `(params, error, mode)`.

    The minimum and maximum are the sample extremes. The mode is the midpoint
    of the fullest bin of an `bins='auto'` histogram. On success `params` is
    `(alpha, beta, loc, scale)` in scipy beta order with `loc = min` and
    `scale = max - min`, and `error` is None. If the (min, mode, max) triple is
    rejected by `betapert_alpha_beta`, the result is
    `(None, "betapert_error: invalid (a,m,b)", None)`.
    """
    lower, upper = float(np.min(x)), float(np.max(x))

    bin_counts, bin_edges = np.histogram(x, bins='auto')
    peak = int(np.argmax(bin_counts))
    m = float(0.5 * (bin_edges[peak] + bin_edges[peak + 1]))

    alpha, beta = betapert_alpha_beta(lower, m, upper, lamb=lamb)
    if alpha is not None:
        # Beta parameter layout, no error, estimated mode
        return (alpha, beta, lower, (upper - lower)), None, m
    return None, "betapert_error: invalid (a,m,b)", None

In [5]:
# 매개변수 라벨링
import math
def fmt(x, digits=6):
    """Format `x` with `digits` significant digits ('g' format).

    Falls back to `str(x)` when `x` cannot be converted to float or formatted.
    """
    try:
        return f"{float(x):.{digits}g}"
    except Exception:
        return str(x)

def describe_params_leftstyle(name: str, params: tuple, extra=None) -> str:
    """Render fitted parameters as a labelled, comma-separated string.

    Parameters
    ----------
    name : distribution label; selects which labels and conversions are used
    params : fitted parameters in scipy order (shape(s), loc, scale)
    extra : optional dict. For "BetaPERT", `extra["mode_est"]` (when present
        and not None) is reported as the mode. Otherwise the mode is recovered
        from alpha under the assumption that the PERT weight was 4.

    Returns
    -------
    str
        For labels without a dedicated branch, the generic `p1=..., p2=...` form.
    """
    if name == "정규":
        loc, scale = params
        return f"평균={fmt(loc)}, 표준 편 차={fmt(scale)}"
    if name == "로지스틱":
        loc, scale = params
        return f"평균={fmt(loc)}, 스케일={fmt(scale)}"
    if name in ("최대 극값","최소 극값"):
        loc, scale = params
        return f"최고가능성={fmt(loc)}, 스케일={fmt(scale)}"
    if name == "로그 정규":
        s, loc, scale = params  # scale = exp(mu)
        mu_log = math.log(scale) if scale > 0 else float("nan")
        # Mean includes the location shift; the standard deviation is shift-invariant.
        mean = loc + math.exp(mu_log + s*s/2.0)
        std  = math.sqrt((math.exp(s*s)-1.0) * math.exp(2*mu_log + s*s))
        return f"평균={fmt(mean)}, 표준 편 차={fmt(std)}, 위치={fmt(loc)}"
    if name == "와이블":
        c, loc, scale = params
        return f"위치={fmt(loc)}, 스케일={fmt(scale)}, 형태={fmt(c)}"
    if name == "감마":
        a, loc, scale = params
        return f"위치={fmt(loc)}, 스케일={fmt(scale)}, 형태={fmt(a)}"
    if name == "지수":
        loc, scale = params
        lam = (1.0/scale) if (scale not in (0, float('inf'))) else float('nan')
        return f"비율={fmt(lam)}"
    if name == "파레토":
        b, loc, scale = params
        # The scale must be reported: when the fit pins loc at 0, the scale is
        # the only fitted location-type parameter, and omitting it makes the
        # distribution impossible to reconstruct from this string.
        return f"위치={fmt(loc)}, 스케일={fmt(scale)}, 형태={fmt(b)}"
    if name == "균일":
        loc, scale = params
        a = loc; b = loc + scale
        return f"최소={fmt(a)}, 최대={fmt(b)}"
    if name == "스튜던트의 t":
        df, loc, scale = params
        return f"중간점={fmt(loc)}, 스케일={fmt(scale)}, 자유도={fmt(df)}"
    if name == "삼각형":
        c, loc, scale = params
        a = loc; b = loc + scale; mode = a + c*(b-a)
        return f"최소={fmt(a)}, 최고가능성={fmt(mode)}, 최대={fmt(b)}"
    if name == "베타":
        a, b, loc, scale = params
        left = loc; right = loc + scale
        return f"최소={fmt(left)}, 최대={fmt(right)}, 알파={fmt(a)}, 베타={fmt(b)}"
    if name == "BetaPERT":
        a, b, loc, scale = params
        left = loc; right = loc + scale

        # Prefer the mode estimated at fit time: recomputing it from alpha is
        # only valid when the PERT weight used for the fit was exactly 4.
        mode = extra.get("mode_est") if isinstance(extra, dict) else None
        if mode is None:
            lamb = 4.0
            mode = left + (a - 1.0)/lamb * (right - left)
        return f"최소={fmt(left)}, 최고가능성={fmt(mode)}, 최대={fmt(right)}"
    return ", ".join([f"p{i}={fmt(p)}" for i,p in enumerate(params, start=1)])

In [6]:
def sort_by_goodness(res_df: pd.DataFrame, alpha_ks: float = 0.05, alpha_chi: float = 0.05) -> pd.DataFrame:
    """Return a copy of `res_df` ordered from best to worst fit.

    The six statistic columns are coerced to numeric (unparseable -> NaN).
    Rows are grouped as 0 = K-S and chi-square both pass, 1 = K-S passes only,
    2 = everything else (K-S fails or is missing). Within a group the order is
    K-S p (descending), chi-square p (descending), then A-D, K-S and chi-square
    statistics (ascending). Missing p-values count as -1 and missing statistics
    as +inf, so they sort last. Helper columns are removed before returning.
    """
    stat_cols = ["A-D", "A-D P-값", "K-S", "K-S P-값", "카이제곱", "카이제곱 P-값"]
    helper_cols = ["_group", "_ks_p", "_chi_p", "_ad", "_ks", "_chi"]

    numeric = res_df.assign(
        **{col: pd.to_numeric(res_df[col], errors="coerce") for col in stat_cols}
    )

    # Penalise missing values so they can never pass a test or outrank a real value.
    ks_p = numeric["K-S P-값"].fillna(-1)
    chi_p = numeric["카이제곱 P-값"].fillna(-1)
    ks_ok = ks_p >= alpha_ks
    both_ok = ks_ok & (chi_p >= alpha_chi)

    return (
        numeric
        .assign(
            _ks_p=ks_p,
            _chi_p=chi_p,
            _ad=numeric["A-D"].fillna(np.inf),
            _ks=numeric["K-S"].fillna(np.inf),
            _chi=numeric["카이제곱"].fillna(np.inf),
            _group=np.where(both_ok, 0, np.where(ks_ok, 1, 2)),
        )
        .sort_values(by=helper_cols, ascending=[True, False, False, True, True, True])
        .drop(columns=helper_cols)
    )

In [7]:
# Load the data and validate the columns
df = pd.read_excel(INPUT_PATH)

# Remove zero-width spaces and surrounding whitespace from textual headers.
# Non-string headers (e.g. numeric column labels) are left untouched; the
# `.str` accessor would have replaced them with NaN.
df.columns = [
    col.replace('\u200b', '').strip() if isinstance(col, str) else col
    for col in df.columns
]

# Build the alias -> canonical map from TARGET_METALS so the two cannot drift
# apart. For each target such as 'Cr(ng/m3)' the accepted spellings are: with
# or without a space before '(', and with 'm3' or 'm³' (including both at once).
name_map = {}
for target in TARGET_METALS:
    symbol, paren, unit = target.partition("(")
    if not paren:
        continue  # no unit part, so there is nothing to vary
    for sep in ("", " "):
        for unit_variant in (unit, unit.replace("m3", "m³")):
            alias = f"{symbol}{sep}({unit_variant}"
            if alias != target:
                name_map[alias] = target

# Apply the mapping
df = df.rename(columns=name_map)

# Fail early if any target column is still missing
missing = [m for m in TARGET_METALS if m not in df.columns]
if missing:
    raise ValueError(f"원본에 없는 컬럼: {missing}")

In [8]:
# Compute goodness-of-fit for every (metal, distribution) pair and rank per metal
dist_map = get_distributions_extended()
results_by_metal = {}

for metal in TARGET_METALS:
    raw = pd.to_numeric(df[metal], errors='coerce').to_numpy()
    recs = []

    for name in DIST_LIST:
        x = select_data_for_dist(raw.copy(), name)

        # Too few usable observations: record a placeholder row.
        if x.size < 5:
            recs.append([name, *([np.nan] * 6), "표본수 부족(n<5)"])
            continue

        # BetaPERT is evaluated through scipy's beta with parameters from fit_betapert.
        if name != "BetaPERT":
            dist = dist_map[name]
            params, fit_err = fit_params(dist, x, name)
            mode_est = None
        else:
            params, fit_err, mode_est = fit_betapert(x, lamb=4.0)
            dist = stats.beta

        # A failed fit puts its message in the parameter column.
        if fit_err or params is None:
            recs.append([name, *([np.nan] * 6), fit_err or "fit_error"])
            continue

        # Only the statistics and p-values are kept; per-test error messages are discarded.
        ad_stat = anderson_darling_stat(dist, params, x)[0]
        ks_D, ks_p = ks_test(dist, params, x)[:2]
        chi_stat, chi_p = chi_square_gof(dist, params, x)[:2]

        param_str = describe_params_leftstyle(name, params, extra={"mode_est": mode_est})

        # The A-D p-value is not computed; NaN is shown as '---' in the export.
        recs.append([name, ad_stat, np.nan, ks_D, ks_p, chi_stat, chi_p, param_str])

    res_df = pd.DataFrame(
        recs,
        columns=["분포","A-D","A-D P-값","K-S","K-S P-값","카이제곱","카이제곱 P-값","매개 변수"])

    # Rank the distributions for this metal
    res_df_sorted = sort_by_goodness(res_df, alpha_ks=0.05, alpha_chi=0.05)
    results_by_metal[metal] = res_df_sorted

  x = np.asarray((x - loc)/scale, dtype=dtyp)
  x = np.asarray((x - loc)/scale, dtype=dtyp)
  sk = 2*(b-a)*np.sqrt(a + b + 1) / (a + b + 2) / np.sqrt(a*b)


In [9]:
# 엑셀 레이아웃 생성
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.worksheet.table import Table, TableStyleInfo

# Workbook with a single sheet that receives every metal's table
wb = Workbook()
ws = wb.active
ws.title = "Tx-적합도"

# Shared cell styles
thin = Side(style="thin", color="000000")
border_all = Border(top=thin, bottom=thin, left=thin, right=thin)

header_fill = PatternFill("solid", fgColor="305496")  # navy header background
header_font = Font(bold=True, color="FFFFFF")
metal_font  = Font(bold=True)
wrap = Alignment(wrapText=True, vertical="top")

# Column widths: B = distribution, C-H = statistics, I = parameters, A = metal label
column_widths = {
    "B": 14,
    "C": 9,
    "D": 11,
    "E": 9,
    "F": 11,
    "G": 11,
    "H": 13,
    "I": 55,
    "A": 9,
}
for col_letter, col_width in column_widths.items():
    ws.column_dimensions[col_letter].width = col_width

r = 1  # current row
table_idx = 1

def metal_short(m):
    """Return the text before the first "(" with whitespace stripped, e.g. "Fe(ng/m3)" -> "Fe"."""
    symbol, _paren, _unit = m.partition("(")
    return symbol.strip()

# Write one section per metal: a label row, a header row, the data rows,
# then a blank spacer row. `r` tracks the current sheet row throughout.
for metal in TARGET_METALS:
    # Metal label in column A, on its own row above the table
    ws.cell(row=r, column=1, value=metal_short(metal)).font = metal_font
    r += 1

    # Section header row (columns B..I)
    headers = ["분포","A-D","A-D P-값","K-S","K-S P-값","카이제곱","카이제곱 P-값","매개 변수"]
    for j, h in enumerate(headers, start=2):
        c = ws.cell(row=r, column=j, value=h)
        c.fill = header_fill; c.font = header_font; c.alignment = Alignment(horizontal="center")
        c.border = border_all

    # Data rows
    df_sec = results_by_metal[metal].copy()
    # Replace NaN with '---' (display only)
    df_sec = df_sec.replace({np.nan: '---'})

    start_row = r + 1
    for i in range(len(df_sec)):
        rowvals = df_sec.iloc[i].tolist()
        for j, v in enumerate(rowvals, start=2):
            cell = ws.cell(row=start_row + i, column=j, value=v)
            cell.border = border_all
            # Column 9 (I) holds the parameter string; wrap its text
            if j == 9:
                cell.alignment = wrap

    end_row = start_row + len(df_sec) - 1

    # One table per section, spanning the header row through the last data row;
    # the conditional keeps the range valid when there are no data rows
    tbl_ref = f"B{r}:I{end_row if end_row>=r else r}"
    table = Table(displayName=f"T_{table_idx}", ref=tbl_ref)
    style = TableStyleInfo(name="TableStyleMedium9", showFirstColumn=False,
                           showLastColumn=False, showRowStripes=True, showColumnStripes=False)
    table.tableStyleInfo = style
    ws.add_table(table)
    table_idx += 1

    # Leave one blank row between sections
    r = end_row + 2

# Number format: six decimals for every numeric cell in columns C..H
numeric_cells = (
    cell
    for row in ws.iter_rows(min_row=1, max_row=ws.max_row, min_col=3, max_col=8)
    for cell in row
    if isinstance(cell.value, (float, int))
)
for cell in numeric_cells:
    cell.number_format = "0.000000"

In [10]:
# Save the workbook to OUTPUT_PATH and hand it to the Colab download helper
wb.save(OUTPUT_PATH)
print("[완료] 저장:", OUTPUT_PATH)
# `files` is the google.colab module imported in the upload cell
files.download(OUTPUT_PATH)

[완료] 저장: Tx-적합도.xlsx


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>