In [None]:
## 데이터 전처리 ##

In [6]:
import pandas as pd
import numpy as np
import io, os, re
from google.colab import files

pd.options.display.float_format = "{:.6g}".format

# =========================
# 설정값
# =========================
CONFIRM_DETECTED_COLUMNS = True

# 음수→NaN
NEGATIVE_TO_NAN = True

# IQR 셀-마스킹 적용 + 완화 기준(IQR_K=3.0)
APPLY_IQR_FILTER = True
IQR_K = 3.0  # Tukey 기본 1.5보다 완화 → 피크 이벤트 보존

# 대상 전부 NaN인 행만 삭제
DROP_EMPTY_TARGET = True

# =========================
# 파일 업로드(Colab 위젯) + 메모리 직접 로드
# =========================
def _safe_basename(name: str) -> str:
    base, ext = os.path.splitext(name)
    base = re.sub(r"[^\w\-]+", "_", base).strip("_")
    return base, ext

print("엑셀 파일(.xlsx) 1개 이상 업로드하세요.")
uploaded = files.upload()
if not uploaded:
    raise RuntimeError("업로드된 파일이 없습니다. 다시 실행하여 업로드하세요.")

xlsx_names = [k for k in uploaded.keys() if k.lower().endswith(".xlsx")]
if not xlsx_names:
    raise RuntimeError("업로드된 파일 중 .xlsx 확장자가 없습니다.")

src_name = xlsx_names[0]
print("Uploaded (in-memory):", src_name)

# 메모리에서 바로 DataFrame 로드
df_raw = pd.read_excel(io.BytesIO(uploaded[src_name]))
df_raw.columns = [str(c).strip() for c in df_raw.columns]

# 원본 파일명 기반 산출물 경로 자동 설정
_base, _ = _safe_basename(src_name)
RAW_REPORT_PATH   = f"{_base}_QA_raw.xlsx"     # 전처리 전 QA
CLEAN_REPORT_PATH = f"{_base}_QA_clean.xlsx"   # 전처리 후 QA
CLEAN_DATA_PATH   = f"{_base}_clean.xlsx"      # 전처리 후 데이터(엑셀)
CLEAN_DATA_CSV    = f"{_base}_clean.csv"       # 전처리 후 데이터(CSV)

print("Output paths set:")
print(" RAW_REPORT_PATH   =", RAW_REPORT_PATH)
print(" CLEAN_REPORT_PATH =", CLEAN_REPORT_PATH)
print(" CLEAN_DATA_PATH   =", CLEAN_DATA_PATH)
print(" CLEAN_DATA_CSV    =", CLEAN_DATA_CSV)

# =========================
# 유틸: 컬럼 자동 검출
# =========================
def detect_columns(df: pd.DataFrame):
    df = df.copy()
    df.columns = [str(c).strip() for c in df.columns]

    # 시간 후보
    time_candidates = [c for c in df.columns
                       if any(k in c.lower() for k in ["pump-begin", "date", "datetime", "time", "측정일시"])]
    time_col = time_candidates[0] if len(time_candidates) > 0 else None

    # PM2.5 후보
    pm25_candidates = [c for c in df.columns
                       if ("pm2.5" in c.lower())
                       or ("con(ug/m3)" in c.lower())
                       or ("ug/m3" in c.lower() and "con" in c.lower())]
    pm25_col = pm25_candidates[0] if len(pm25_candidates) > 0 else None

    # 금속 단위 패턴
    metal_markers = ["(ng/m3)", "(ng/㎥)", "ng/m3", "ng/㎥"]
    metal_cols = [c for c in df.columns if any(m.lower() in c.lower() for m in metal_markers)]

    # 숫자형 후보(참고용)
    numeric_cols = []
    for c in df.columns:
        ser = pd.to_numeric(df[c], errors="coerce")
        if ser.notna().mean() >= 0.7:
            numeric_cols.append(c)

    return {"time_col": time_col, "pm25_col": pm25_col,
            "metal_cols": metal_cols, "numeric_cols": numeric_cols}

# =========================
# 로드 & 자동 검출 결과 출력
# =========================
detected = detect_columns(df_raw)
time_col   = detected["time_col"]
pm25_col   = detected["pm25_col"]
metal_cols = detected["metal_cols"]
numeric_cols = detected["numeric_cols"]

print("Detected time_col:", time_col)
print("Detected pm25_col:", pm25_col)
print("Detected metal_cols (first 10):", metal_cols[:10])
print("Detected numeric_cols (first 10):", numeric_cols[:10])

if not CONFIRM_DETECTED_COLUMNS:
    raise RuntimeError("자동 검출된 컬럼 확인 후, 맞으면 CONFIRM_DETECTED_COLUMNS=True로 설정하세요.")

# =========================
# QA 리포트 함수
# =========================
def summarize_series(s: pd.Series, name: str):
    s_num = pd.to_numeric(s, errors="coerce")
    return pd.Series({
        "col": name,
        "count_total": int(s_num.shape[0]),
        "count_valid": int(s_num.notna().sum()),
        "valid_pct": float(100.0 * s_num.notna().mean()),
        "mean": float(s_num.mean(skipna=True)) if s_num.notna().any() else np.nan,
        "std": float(s_num.std(skipna=True)) if s_num.notna().any() else np.nan,
        "p5": float(s_num.quantile(0.05)) if s_num.notna().any() else np.nan,
        "p50": float(s_num.quantile(0.50)) if s_num.notna().any() else np.nan,
        "p95": float(s_num.quantile(0.95)) if s_num.notna().any() else np.nan,
        "min": float(s_num.min(skipna=True)) if s_num.notna().any() else np.nan,
        "max": float(s_num.max(skipna=True)) if s_num.notna().any() else np.nan})

def build_qc_table(df: pd.DataFrame, targets: list):
    rows = []
    for c in targets:
        if c in df.columns:
            rows.append(summarize_series(df[c], c))
    return pd.DataFrame(rows)

# =========================
# 대상 컬럼 정의
# =========================
target_cols = []
if pm25_col:
    target_cols.append(pm25_col)
if isinstance(metal_cols, (list, tuple)):
    target_cols += list(metal_cols)
target_cols = [c for c in target_cols if c in df_raw.columns]
assert len(target_cols) > 0, "PM2.5 또는 금속 대상 컬럼을 찾지 못했습니다."

# =========================
# QA 리포트(전처리 전)
# =========================
qc_raw = build_qc_table(df_raw, target_cols)
with pd.ExcelWriter(RAW_REPORT_PATH, engine="openpyxl") as w:
    qc_raw.to_excel(w, index=False, sheet_name="RAW_QA")

# =========================
# 전처리
# =========================
df_clean = df_raw.copy()

# 시간 정렬
if (time_col is not None) and (time_col in df_clean.columns):
    df_clean[time_col] = pd.to_datetime(df_clean[time_col], errors="coerce")
    df_clean = df_clean.sort_values(time_col).reset_index(drop=True)

# 대상만 숫자화(별도 프레임 보관)
num_targets = df_clean[target_cols].apply(pd.to_numeric, errors="coerce")

# --- 음수 → NaN ---
neg_before = int((num_targets < 0).sum().sum())
if NEGATIVE_TO_NAN:
    num_targets = num_targets.mask(num_targets < 0, np.nan)

# --- IQR 셀-마스킹 (행 삭제 금지) ---
cells_masked = 0
if APPLY_IQR_FILTER:
    Q1 = num_targets.quantile(0.25)
    Q3 = num_targets.quantile(0.75)
    IQR = Q3 - Q1
    lower = Q1 - IQR_K * IQR
    upper = Q3 + IQR_K * IQR

    low_mask  = num_targets.lt(lower, axis=1)
    high_mask = num_targets.gt(upper, axis=1)
    outlier_mask = low_mask | high_mask

    cells_masked = int(outlier_mask.sum().sum())
    num_targets = num_targets.mask(outlier_mask, np.nan)

# 대상 컬럼 적용
df_clean[target_cols] = num_targets

# --- 대상 전부 NaN 행 삭제 ---
rows_before_drop_empty = len(df_clean)
if DROP_EMPTY_TARGET:
    df_clean = df_clean.dropna(subset=target_cols, how="all").reset_index(drop=True)
rows_dropped_empty = rows_before_drop_empty - len(df_clean)

neg_after = int((df_clean[target_cols].apply(pd.to_numeric, errors="coerce") < 0).sum().sum())

# =========================
# 요약 출력
# =========================
print("=== Cleaning Summary ===")
print(f"Target columns (n={len(target_cols)}): {target_cols[:6]}{' ...' if len(target_cols)>6 else ''}")
print(f"Negatives (before -> after): {neg_before} -> {neg_after}")
print(f"IQR masked cells (k={IQR_K}): {cells_masked}")
print(f"Empty-target dropped rows: {rows_dropped_empty}")
print(f"Shape: raw {df_raw.shape} -> clean {df_clean.shape}")

# =========================
# QA 리포트(전처리 후)
# =========================
qc_clean = build_qc_table(df_clean, target_cols)
with pd.ExcelWriter(CLEAN_REPORT_PATH, engine="openpyxl") as w:
    qc_clean.to_excel(w, index=False, sheet_name="CLEAN_QA")

# =========================
# 저장 + 다운로드
# =========================
df_clean.to_excel(CLEAN_DATA_PATH, index=False)
df_clean.to_csv(CLEAN_DATA_CSV, index=False)

files.download(CLEAN_DATA_PATH)
files.download(CLEAN_REPORT_PATH)
files.download(RAW_REPORT_PATH)

# CSV 필요 시 활성화
# files.download(CLEAN_DATA_CSV)

엑셀 파일(.xlsx) 1개 이상 업로드하세요.


Saving 202501_04.xlsx to 202501_04 (1).xlsx
Uploaded (in-memory): 202501_04 (1).xlsx
Output paths set:
 RAW_REPORT_PATH   = 202501_04_1_QA_raw.xlsx
 CLEAN_REPORT_PATH = 202501_04_1_QA_clean.xlsx
 CLEAN_DATA_PATH   = 202501_04_1_clean.xlsx
 CLEAN_DATA_CSV    = 202501_04_1_clean.csv
Detected time_col: Pump-Begin
Detected pm25_col: Conc(ug/m3)
Detected metal_cols (first 10): ['Cr(ng/m3)', 'Co(ng/m3)', 'Ni(ng/m3)', 'As(ng/m3)', 'Cd(ng/m3)', 'Sb(ng/m3)', 'Pb(ng/m3)']
Detected numeric_cols (first 10): ['Pump-Begin', 'Pump-End', 'MassResetTime', 'Conc(ug/m3)', 'Cr(ng/m3)', 'Co(ng/m3)', 'Ni(ng/m3)', 'As(ng/m3)', 'Cd(ng/m3)', 'Sb(ng/m3)']
=== Cleaning Summary ===
Target columns (n=8): ['Conc(ug/m3)', 'Cr(ng/m3)', 'Co(ng/m3)', 'Ni(ng/m3)', 'As(ng/m3)', 'Cd(ng/m3)'] ...
Negatives (before -> after): 1817 -> 0
IQR masked cells (k=3.0): 62
Empty-target dropped rows: 1
Shape: raw (1683, 11) -> clean (1682, 11)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
## 데이터 전처리 파일 여러개일 때 ##

In [10]:
import pandas as pd
import numpy as np
import io, os, re
from google.colab import files

pd.options.display.float_format = "{:.6g}".format

# =========================
# 금속 선택 옵션
# =========================
USE_METAL_FILTER = True  # True면 아래 목록에 있는 금속만 사용
METAL_NAME_FILTER = ["Cr", "Co", "Ni", "As", "Sb", "Pb"]  # 원하는 금속 기호만 적기

# =========================
# 설정값
# =========================
CONFIRM_DETECTED_COLUMNS = True

# 음수→NaN
NEGATIVE_TO_NAN = True

# IQR 셀-마스킹 적용 + 완화 기준(IQR_K=3.0)
APPLY_IQR_FILTER = True
IQR_K = 3.0  # Tukey 기본 1.5보다 완화 → 피크 이벤트 보존

# 대상 전부 NaN인 행만 삭제
DROP_EMPTY_TARGET = True

# =========================
# 유틸 함수들
# =========================
def _safe_basename(name: str) -> str:
    base, ext = os.path.splitext(name)
    base = re.sub(r"[^\w\-]+", "_", base).strip("_")
    return base, ext

def normalize_colname(c: str) -> str:
    """
    컬럼명 정규화:
    - 양쪽 공백 제거
    - 중간 공백 제거 (As (ng/m3) → As(ng/m3))
    - ㎥를 m3로 통일 (ng/㎥ → ng/m3)
    """
    c2 = str(c).strip()
    c2 = c2.replace(" ", "")
    c2 = c2.replace("㎥", "m3")
    return c2

# =========================
# 파일 업로드 + 여러 파일 병합
# =========================
print("엑셀 파일(.xlsx) 1개 이상 업로드하세요.")
uploaded = files.upload()
if not uploaded:
    raise RuntimeError("업로드된 파일이 없습니다. 다시 실행하여 업로드하세요.")

# .xlsx 파일 목록 추출
xlsx_names = [k for k in uploaded.keys() if k.lower().endswith(".xlsx")]
if not xlsx_names:
    raise RuntimeError("업로드된 파일 중 .xlsx 확장자가 없습니다.")

print(f"업로드된 .xlsx 파일 목록 ({len(xlsx_names)}개):")
for name in xlsx_names:
    print(" -", name)

# 여러 파일을 모두 읽어서 하나의 DataFrame으로 병합
df_list = []
for name in xlsx_names:
    print(f"Reading: {name}")
    bytes_io = io.BytesIO(uploaded[name])
    df_tmp = pd.read_excel(bytes_io)
    # 컬럼명 정규화 적용
    df_tmp.columns = [normalize_colname(c) for c in df_tmp.columns]
    df_list.append(df_tmp)

# 세로 병합 (row-wise)
df_raw = pd.concat(df_list, axis=0, ignore_index=True)

# =========================
# 불필요 컬럼 지정 삭제
#  - 예: "Number-of-Split", "Alarms" 등
# =========================
DROP_COL_EXACT = ["Number-of-split","Alarms", "ElemError", "BP_Mass(ugC)", "BP_Conc(ugC/m3)"] # 여기에 더 추가하면 됨

# 부분 문자열로 걸러서 지우고 싶을 때 사용
DROP_COL_CONTAINS = ["number-of-split","alarms", "elemerror", "bp_mass(ugC)", "bp_conc(ugC/m3)"]

cols_to_drop = []
for c in df_raw.columns:
    c_lower = c.lower()
    if (c in DROP_COL_EXACT) or any(key in c_lower for key in DROP_COL_CONTAINS):
        cols_to_drop.append(c)

print("Dropping extra non-needed columns:", cols_to_drop)
df_raw = df_raw.drop(columns=cols_to_drop, errors="ignore")


# 원본 파일명 기반 산출물 경로 자동 설정
# - 1개면 그 파일명 기준
# - 2개 이상이면 첫 파일명 + "_merged"
if len(xlsx_names) == 1:
    base0, _ = _safe_basename(xlsx_names[0])
    _base = base0
else:
    base0, _ = _safe_basename(xlsx_names[0])
    _base = f"{base0}_merged"

RAW_REPORT_PATH   = f"{_base}_QA_raw.xlsx"     # 전처리 전 QA
CLEAN_REPORT_PATH = f"{_base}_QA_clean.xlsx"   # 전처리 후 QA
CLEAN_DATA_PATH   = f"{_base}_clean.xlsx"      # 전처리 후 데이터(엑셀)
CLEAN_DATA_CSV    = f"{_base}_clean.csv"       # 전처리 후 데이터(CSV)

print("Output paths set:")
print(" RAW_REPORT_PATH   =", RAW_REPORT_PATH)
print(" CLEAN_REPORT_PATH =", CLEAN_REPORT_PATH)
print(" CLEAN_DATA_PATH   =", CLEAN_DATA_PATH)
print(" CLEAN_DATA_CSV    =", CLEAN_DATA_CSV)

# =========================
# 유틸: 컬럼 자동 검출
# =========================
def detect_columns(df: pd.DataFrame):
    df = df.copy()
    df.columns = [str(c).strip() for c in df.columns]

    # 시간 후보
    time_candidates = [
        c for c in df.columns
        if any(k in c.lower() for k in ["pump-begin", "date", "datetime", "time", "측정일시"])]
    time_col = time_candidates[0] if len(time_candidates) > 0 else None

    # PM2.5 후보
    pm25_candidates = [
        c for c in df.columns
        if ("pm2.5" in c.lower())
        or ("con(ug/m3)" in c.lower())
        or ("ug/m3" in c.lower() and "con" in c.lower())]
    pm25_col = pm25_candidates[0] if len(pm25_candidates) > 0 else None

    # 금속 단위 패턴
    metal_markers = ["(ng/m3)", "ng/m3", "(ng/㎥)", "ng/㎥"]
    metal_cols = [c for c in df.columns if any(m.lower() in c.lower() for m in metal_markers)]

    # 숫자형 후보(참고용)
    numeric_cols = []
    for c in df.columns:
        ser = pd.to_numeric(df[c], errors="coerce")
        if ser.notna().mean() >= 0.7:
            numeric_cols.append(c)

    return {
        "time_col": time_col,
        "pm25_col": pm25_col,
        "metal_cols": metal_cols,
        "numeric_cols": numeric_cols}

# =========================
# 로드 & 자동 검출 결과 출력
# =========================
detected = detect_columns(df_raw)
time_col   = detected["time_col"]
pm25_col   = detected["pm25_col"]
metal_cols = detected["metal_cols"]
numeric_cols = detected["numeric_cols"]

# 전체 금속 후보를 따로 보관
all_metal_candidates = metal_cols.copy()

# 지정 금속 필터 적용
if USE_METAL_FILTER and METAL_NAME_FILTER:
    metal_cols = [
        c for c in metal_cols
        if any(c.lower().startswith(sym.lower() + "(") for sym in METAL_NAME_FILTER)]

print("Detected time_col:", time_col)
print("Detected pm25_col:", pm25_col)
print("Detected metal_cols (first 10):", metal_cols[:10])
print("Detected numeric_cols (first 10):", numeric_cols[:10])

if not CONFIRM_DETECTED_COLUMNS:
    raise RuntimeError("자동 검출된 컬럼 확인 후, 맞으면 CONFIRM_DETECTED_COLUMNS=True로 설정하세요.")

# =========================
# QA 리포트 함수
# =========================
def summarize_series(s: pd.Series, name: str):
    s_num = pd.to_numeric(s, errors="coerce")
    return pd.Series({
        "col": name,
        "count_total": int(s_num.shape[0]),
        "count_valid": int(s_num.notna().sum()),
        "valid_pct": float(s_num.notna().mean()),
        "mean": float(s_num.mean(skipna=True)) if s_num.notna().any() else np.nan,
        "std": float(s_num.std(skipna=True)) if s_num.notna().any() else np.nan,
        "p5": float(s_num.quantile(0.05)) if s_num.notna().any() else np.nan,
        "p50": float(s_num.quantile(0.50)) if s_num.notna().any() else np.nan,
        "p95": float(s_num.quantile(0.95)) if s_num.notna().any() else np.nan,
        "min": float(s_num.min(skipna=True)) if s_num.notna().any() else np.nan,
        "max": float(s_num.max(skipna=True)) if s_num.notna().any() else np.nan})

def build_qc_table(df: pd.DataFrame, targets: list):
    rows = []
    for c in targets:
        if c in df.columns:
            rows.append(summarize_series(df[c], c))
    return pd.DataFrame(rows)

# =========================
# 대상 컬럼 정의
# =========================
target_cols = []
if pm25_col:
    target_cols.append(pm25_col)
if isinstance(metal_cols, (list, tuple)):
    target_cols += list(metal_cols)
target_cols = [c for c in target_cols if c in df_raw.columns]
assert len(target_cols) > 0, "PM2.5 또는 금속 대상 컬럼을 찾지 못했습니다."

# =========================
# 선택 금속 + PM2.5 외 모든 금속/컬럼 제거
# =========================
drop_metal_cols = [c for c in all_metal_candidates if c not in target_cols]

df_raw = df_raw.drop(columns=drop_metal_cols, errors="ignore")

print("Removed non-target metal columns:", drop_metal_cols)

# =========================
# QA 리포트(전처리 전)
# =========================
qc_raw = build_qc_table(df_raw, target_cols)
with pd.ExcelWriter(RAW_REPORT_PATH, engine="openpyxl") as w:
    qc_raw.to_excel(w, index=False, sheet_name="RAW_QA")

# =========================
# 전처리
# =========================
df_clean = df_raw.copy()

# 시간 정렬
if (time_col is not None) and (time_col in df_clean.columns):
    df_clean[time_col] = pd.to_datetime(df_clean[time_col], errors="coerce")
    df_clean = df_clean.sort_values(time_col).reset_index(drop=True)

# 대상만 숫자화(별도 프레임 보관)
num_targets = df_clean[target_cols].apply(pd.to_numeric, errors="coerce")

# --- 음수 → NaN ---
neg_before = int((num_targets < 0).sum().sum())
if NEGATIVE_TO_NAN:
    num_targets = num_targets.mask(num_targets < 0, np.nan)

# --- IQR 셀-마스킹 (행 삭제 금지) ---
cells_masked = 0
if APPLY_IQR_FILTER:
    Q1 = num_targets.quantile(0.25)
    Q3 = num_targets.quantile(0.75)
    IQR = Q3 - Q1
    lower = Q1 - IQR_K * IQR
    upper = Q3 + IQR_K * IQR

    low_mask  = num_targets.lt(lower, axis=1)
    high_mask = num_targets.gt(upper, axis=1)
    outlier_mask = low_mask | high_mask

    cells_masked = int(outlier_mask.sum().sum())
    num_targets = num_targets.mask(outlier_mask, np.nan)

# 대상 컬럼 적용
df_clean[target_cols] = num_targets

# --- 대상 전부 NaN 행 삭제 ---
rows_before_drop_empty = len(df_clean)
if DROP_EMPTY_TARGET:
    df_clean = df_clean.dropna(subset=target_cols, how="all").reset_index(drop=True)
rows_dropped_empty = rows_before_drop_empty - len(df_clean)

neg_after = int((df_clean[target_cols].apply(pd.to_numeric, errors="coerce") < 0).sum().sum())

# =========================
# 요약 출력
# =========================
print("=== Cleaning Summary ===")
print(f"Target columns (n={len(target_cols)}): {target_cols[:6]}{' ...' if len(target_cols)>6 else ''}")
print(f"Negatives (before -> after): {neg_before} -> {neg_after}")
print(f"IQR masked cells (k={IQR_K}): {cells_masked}")
print(f"Empty-target dropped rows: {rows_dropped_empty}")
print(f"Shape: raw {df_raw.shape} -> clean {df_clean.shape}")

# =========================
# QA 리포트(전처리 후)
# =========================
qc_clean = build_qc_table(df_clean, target_cols)
with pd.ExcelWriter(CLEAN_REPORT_PATH, engine="openpyxl") as w:
    qc_clean.to_excel(w, index=False, sheet_name="CLEAN_QA")

# =========================
# 저장 + 다운로드
# =========================
df_clean.to_excel(CLEAN_DATA_PATH, index=False)
df_clean.to_csv(CLEAN_DATA_CSV, index=False)

files.download(CLEAN_DATA_PATH)
files.download(CLEAN_REPORT_PATH)
files.download(RAW_REPORT_PATH)

# CSV 필요 시 활성화
# files.download(CLEAN_DATA_CSV)

엑셀 파일(.xlsx) 1개 이상 업로드하세요.


Saving 202501.xlsx to 202501 (4).xlsx
Saving 202502.xlsx to 202502 (4).xlsx
Saving 202503.xlsx to 202503 (4).xlsx
Saving 202504.xlsx to 202504 (4).xlsx
업로드된 .xlsx 파일 목록 (4개):
 - 202501 (4).xlsx
 - 202502 (4).xlsx
 - 202503 (4).xlsx
 - 202504 (4).xlsx
Reading: 202501 (4).xlsx
Reading: 202502 (4).xlsx
Reading: 202503 (4).xlsx
Reading: 202504 (4).xlsx
Dropping extra non-needed columns: ['Number-of-Split', 'Alarms']
Output paths set:
 RAW_REPORT_PATH   = 202501_4_merged_QA_raw.xlsx
 CLEAN_REPORT_PATH = 202501_4_merged_QA_clean.xlsx
 CLEAN_DATA_PATH   = 202501_4_merged_clean.xlsx
 CLEAN_DATA_CSV    = 202501_4_merged_clean.csv
Detected time_col: Pump-Begin
Detected pm25_col: Conc(ug/m3)
Detected metal_cols (first 10): ['Cr(ng/m3)', 'Co(ng/m3)', 'Ni(ng/m3)', 'As(ng/m3)', 'Sb(ng/m3)', 'Pb(ng/m3)']
Detected numeric_cols (first 10): ['Pump-Begin', 'Pump-End', 'MassResetTime', 'Analysis-Id', 'Mass(ug)', 'Conc(ug/m3)', 'Al(ng/m3)', 'Si(ng/m3)', 'S(ng/m3)', 'K(ng/m3)']
Removed non-target metal colu

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
## 산점도 그래프 생성 ##

In [None]:
import math
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression
from scipy.stats import pearsonr
from google.colab import files

# ===== 파일 업로드 =====
uploaded = files.upload()
fname = next(iter(uploaded.keys()))
df = pd.read_excel(fname)

# ===== 분석 대상 컬럼 추출 =====
unit_patterns = ['(ng/m3)', '(ug/m3)', '(µg/m³)', '(μg/m3)']
columns_to_analyze = [c for c in df.columns if any(p in str(c) for p in unit_patterns)]

# X축 후보
x_candidates = ['Conc(ug/m3)', 'PM2.5(ug/m3)', 'PM2.5 (ug/m3)', 'Con(ug/m3)']
xcol = next((c for c in x_candidates if c in df.columns), None)
if xcol is None:
    raise ValueError("X-axis column not found. Check the actual column name for PM2.5 mass.")

# Y축 후보
ycols = [c for c in columns_to_analyze if c != xcol]
if len(ycols) == 0:
    raise ValueError("No Y columns to plot. Check filter logic.")

# ===== 데이터 클린 함수 =====
def clean_series(s: pd.Series) -> pd.Series:
    s_num = pd.to_numeric(s, errors='coerce')
    return s_num.mask(s_num < 0)

# ===== 전역 스타일 =====
TITLE_FT = 18   # subplot 제목
LABEL_FT = 16   # 축 라벨
TICK_FT  = 16   # 축 숫자
ANN_FT   = 12   # 주석 폰트

plt.rcParams.update({
    "axes.titlesize": TITLE_FT,
    "axes.titleweight": "bold",
    "axes.labelsize": LABEL_FT,
    "axes.labelweight": "bold",
    "xtick.labelsize": TICK_FT,
    "ytick.labelsize": TICK_FT
})

# ===== 산점도 + 회귀 함수 =====
def plot_scatter_with_regression(x: str, y: str, data: pd.DataFrame, ax):
    title = f"{y}"
    x_ser = clean_series(data[x])
    y_ser = clean_series(data[y])
    df_xy = pd.concat([x_ser.rename(x), y_ser.rename(y)], axis=1).dropna()

    # 예외 처리
    if len(df_xy) < 3 or df_xy[x].std() == 0 or df_xy[y].std() == 0:
        sns.scatterplot(data=df_xy, x=x, y=y, ax=ax, s=30, alpha=0.8, edgecolor=None)
        ax.set_xlabel(x, fontweight='bold'); ax.set_ylabel(y, fontweight='bold')
        ax.set_title(title, fontweight='bold')
        # 축 숫자 크기/볼드
        ax.tick_params(axis='both', which='both', labelsize=TICK_FT)
        for label in ax.get_xticklabels() + ax.get_yticklabels():
            label.set_fontsize(TICK_FT)
            label.set_fontweight('bold')
        return None

    # 산점도
    sns.scatterplot(data=df_xy, x=x, y=y, ax=ax, s=30, alpha=0.8, edgecolor=None)

    # 선형 회귀
    X_vals = df_xy[[x]].to_numpy()
    y_vals = df_xy[y].to_numpy()
    model = LinearRegression().fit(X_vals, y_vals)
    y_pred = model.predict(X_vals)
    r2 = float(model.score(X_vals, y_vals))
    slope = float(model.coef_[0])
    r, p = pearsonr(df_xy[x].to_numpy(), df_xy[y].to_numpy())

    # 회귀선
    order = np.argsort(X_vals.ravel())
    ax.plot(X_vals.ravel()[order], y_pred[order], linewidth=2.5, color='C1')

    # 라벨/제목
    ax.set_xlabel(x, fontsize=LABEL_FT, fontweight='bold')
    ax.set_ylabel(y, fontsize=LABEL_FT, fontweight='bold')
    ax.set_title(title, fontsize=TITLE_FT, fontweight='bold')

    # 축 숫자 크기/볼드
    ax.tick_params(axis='both', which='both', labelsize=TICK_FT)
    for label in ax.get_xticklabels() + ax.get_yticklabels():
        label.set_fontsize(TICK_FT)
        label.set_fontweight('bold')

    # 라벨/제목
    ax.set_xlabel(x, fontsize=LABEL_FT, fontweight='bold')
    ax.set_ylabel(y, fontsize=LABEL_FT, fontweight='bold')
    ax.set_title(title, fontsize=TITLE_FT, fontweight='bold')

    # 축 숫자 크기/볼드
    ax.tick_params(axis='both', which='both', labelsize=TICK_FT)
    for label in ax.get_xticklabels() + ax.get_yticklabels():
        label.set_fontsize(TICK_FT)
        label.set_fontweight('bold')

    # 주석 박스
    note = (f"R²={r2:.3f}\nr={r:.3f}\np={p:.3g}\n"
            f"slope={slope:.3f}")
    ax.text(0.98, 0.98, note, transform=ax.transAxes,
            ha='right', va='top',
            fontsize=ANN_FT, fontweight='bold',
            bbox=dict(facecolor='white', edgecolor='0.5',
                      boxstyle='round,pad=0.3', alpha=0.6))

# ===== 서브플롯 배치 =====
n = len(ycols)
ncols = 4
nrows = math.ceil(n / ncols)
fig, axes = plt.subplots(nrows, ncols, figsize=(6.2*ncols, 5.2*nrows))
axes = np.atleast_1d(axes).ravel()

# 여백 조정
fig.subplots_adjust(right=0.96, wspace=0.40, hspace=0.55)

# 플롯 생성
for i, metal in enumerate(ycols):
    plot_scatter_with_regression(xcol, metal, df, axes[i])

# 남는 축 숨김
for j in range(i + 1, len(axes)):
    axes[j].set_visible(False)

plt.tight_layout()
plt.show()

In [None]:
## Pearson 상관관계 분석 + 상관관계 분석표 생성 ##

In [None]:
import re
import numpy as np
import pandas as pd
from scipy.stats import pearsonr
from google.colab import files
from IPython.display import display
import matplotlib.pyplot as plt

# --- 업로드 & 로드 ---
uploaded = files.upload()
fname = next(iter(uploaded.keys()))
df = pd.read_excel(fname)

# 1) 중복 열 제거(첫 번째만 유지)
if df.columns.duplicated().any():
    dup_names = df.columns[df.columns.duplicated()].unique().tolist()
    print(f"[warn] duplicated columns dropped (keeping first): {dup_names}")
    df = df.loc[:, ~df.columns.duplicated()].copy()

# 2) 대상 열 선택: (ng|ug|µg|μg)/(m3|m³|㎥) 단위 포함
unit_pat = re.compile(r"µ³㎥", flags=re.I)
target_cols = [c for c in df.columns if unit_pat.search(str(c))]
target_cols = pd.Index(target_cols).drop_duplicates().tolist()
if not target_cols:
    raise ValueError("단위 패턴에 맞는 열이 없습니다. 열명/단위를 확인하세요.")

# 3) 숫자화 (전처리 데이터 신뢰: 추가 마스킹/클리닝 금지)
df_num = df.loc[:, target_cols].apply(pd.to_numeric, errors="coerce")

# 4) 상관계수/유의확률/표본수 테이블
rmat = pd.DataFrame(index=target_cols, columns=target_cols, dtype=float)
pmat = pd.DataFrame(index=target_cols, columns=target_cols, dtype=float)
nmat = pd.DataFrame(index=target_cols, columns=target_cols, dtype=int)

for i, c1 in enumerate(target_cols):
    s1 = df_num[c1]
    for j, c2 in enumerate(target_cols):
        if j > i:
            continue  # 하삼각만 채움
        if i == j:
            rmat.iat[i, j] = 1.0
            pmat.iat[i, j] = 0.0
            nmat.iat[i, j] = s1.notna().sum()
            continue

        pair = pd.concat([s1, df_num[c2]], axis=1).dropna()
        n = len(pair)
        nmat.iat[i, j] = n

        if n < 3 or pair.iloc[:, 0].nunique() < 2 or pair.iloc[:, 1].nunique() < 2:
            rmat.iat[i, j] = np.nan
            pmat.iat[i, j] = np.nan
            continue

        r, p = pearsonr(pair.iloc[:, 0].values, pair.iloc[:, 1].values)
        rmat.iat[i, j] = r
        pmat.iat[i, j] = p

# 하삼각만 채운 행렬을 대칭 보정(표/히트맵에서 NaN처럼 보이는 문제 방지)
rmat = rmat.combine_first(rmat.T)
pmat = pmat.combine_first(pmat.T)
nmat = nmat.combine_first(nmat.T)

# 5) 포맷/라운드
DECIMALS = 3
rmat_round = rmat.round(DECIMALS)
pmat_round = pmat.round(DECIMALS)

def stars(p):
    if pd.isna(p): return ""
    return "**" if p < 0.01 else ("*" if p < 0.05 else "")

out_fmt = pd.DataFrame("", index=rmat.index, columns=rmat.columns, dtype=object)
for i in range(len(rmat.index)):
    for j in range(len(rmat.columns)):
        if j > i:                      # 상삼각은 비움
            out_fmt.iat[i, j] = ""
        elif i == j:                   # 대각선은 1
            out_fmt.iat[i, j] = "1"
        else:                          # 하삼각만 값/별표 표시
            rij = rmat.iat[i, j]
            pij = pmat.iat[i, j]
            out_fmt.iat[i, j] = "NaN" if pd.isna(rij) else f"{rij:.{DECIMALS}f}{stars(pij)}"

# (선택) p값 0.000을 "<0.001"로 표시하고 싶으면:
pmat_disp = pmat.map(lambda x: "<0.001" if pd.notna(x) and x < 0.001 else (f"{x:.3f}" if pd.notna(x) else "NaN"))

# 6) 결과 확인/표시
try:
    from google.colab import data_table
    data_table.enable_dataframe_formatter()
except Exception as e:
    print("[info] data_table 사용 불가:", e)

print("▶ 상관관계 분석표 (*: p<0.05, **: p<0.01)")
display(out_fmt)

print("▶ 상관계수 행렬(r)")
display(rmat_round)

print("▶ p값 행렬(p)")
display(pmat_round)  # 또는 display(pmat_disp)

print("▶ 쌍별 표본수(N)")
display(nmat)

# 7) 납작화 요약 / 유의쌍
def melt_lower(rmat: pd.DataFrame, pmat: pd.DataFrame, nmat: pd.DataFrame) -> pd.DataFrame:
    cols = rmat.columns.tolist()
    recs = []
    for i in range(len(cols)):
        for j in range(i):
            r = rmat.iat[i, j]; p = pmat.iat[i, j]; n = nmat.iat[i, j]
            recs.append({
                "var1": cols[i], "var2": cols[j],
                "r": r, "p": p, "n": int(n) if pd.notna(n) else np.nan,
                "abs_r": abs(r) if pd.notna(r) else np.nan})
    return pd.DataFrame(recs)

pairs = melt_lower(rmat, pmat, nmat)

ALPHA = 0.05
MIN_N = 30
sig_pairs = pairs[(pairs["p"] < ALPHA) & (pairs["n"] >= MIN_N)].sort_values("abs_r", ascending=False)

pairs_round = pairs.copy()
for c in ["r", "p", "abs_r"]:
    if c in pairs_round.columns:
        pairs_round[c] = pairs_round[c].round(DECIMALS)

sig_pairs_round = sig_pairs.copy()
for c in ["r", "p", "abs_r"]:
    if c in sig_pairs_round.columns:
        sig_pairs_round[c] = sig_pairs_round[c].round(DECIMALS)

print(f"▶ 유의(p<{ALPHA}) & 표본수≥{MIN_N} 상위 20쌍")
display(sig_pairs_round.head(20))

# 8) 히트맵
fig, ax = plt.subplots(figsize=(10, 8))
im = ax.imshow(rmat_round.values, aspect='auto', cmap='coolwarm', vmin=-1, vmax=1)
ax.set_title("PM2.5-Metals Correlation Matrix (Pearson)", fontsize=18, fontweight='bold')
ax.set_xticks(range(len(target_cols))); ax.set_yticks(range(len(target_cols)))
ax.set_xticklabels(target_cols, rotation=90); ax.set_yticklabels(target_cols)
fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
plt.tight_layout(); plt.show()

# 9) 저장
with pd.ExcelWriter("상관관계분석표.xlsx") as w:
    out_fmt.to_excel(w, sheet_name="pearson_r_fmt")
    rmat.to_excel(w, sheet_name="pearson_r_raw")
    pmat.to_excel(w, sheet_name="p_values")
    nmat.to_excel(w, sheet_name="pair_N")

with pd.ExcelWriter("상관분석_요약.xlsx") as w:
    pairs.to_excel(w, index=False, sheet_name="all_pairs")
    sig_pairs.to_excel(w, index=False, sheet_name=f"sig_pairs_p<{ALPHA}_N>={MIN_N}")

# 다운로드 (필요시 활성화)
#files.download("상관관계분석표.xlsx")
#files.download("상관분석_요약.xlsx")