# 찐막코드

In [5]:
# -*- coding: utf-8 -*-
import os
import re
import glob
import warnings
from datetime import datetime

import numpy as np
import pandas as pd
from tqdm import tqdm

warnings.filterwarnings("ignore")

# =========================
# Settings
# =========================
BASE_DIR = r"G:/SYNTHEA_NEW"          # every *.csv directly inside this folder is discovered by run()
OUTPUT_DIR = os.path.join(BASE_DIR, "QUIQ")
os.makedirs(OUTPUT_DIR, exist_ok=True)  # side effect at import time: creates the output folder

USE_SAMPLE = False                     # when True, tables are filtered to a sample of patient ids
SAMPLE_N = 10000                       # maximum number of patient ids kept in the sample
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# =========================
# Common columns / utilities
# =========================
# Column order of every converted (long-format) output frame and CSV.
QUIQ_COLS = [
    "Primary_key", "Variable_ID", "Original_table_name", "Variable_name", "Event_date",
    "Value", "Unit", "Variable_type", "Is_categorical", "Recorder",
    "Recorder_position", "Recorder_affiliation", "Patient_id", "Admission_id",
    "Ground_truth", "Mapping_info_1", "Mapping_info_2"
]

# Code-like variable names: rows with these names are forced to
# Variable_type "string" / Is_categorical 1 by name alone
# (see finalize_common_columns).
CODE_LIKE_NAMES = {
    "CODE","REASONCODE","PROCEDURECODE","BODYSITE_CODE","MODALITY_CODE","SOP_CODE",
    "PROCEDURE_CODE","DIAGNOSIS1","DIAGNOSIS2","DIAGNOSIS3","DIAGNOSIS4",
    "DIAGNOSIS5","DIAGNOSIS6","DIAGNOSIS7","DIAGNOSIS8"
}

# ── Value cleaner: whitespace stripping only
def strip_only(x):
    """Return *x* with surrounding whitespace removed, and nothing else.

    Missing values, plain numbers and ``pd.Timestamp`` objects are handed
    back unchanged; anything else is converted with ``str()`` and stripped.
    """
    untouched_types = (int, float, np.number, pd.Timestamp)
    if pd.isna(x) or isinstance(x, untouched_types):
        return x
    text = str(x)
    return text.strip()

# ── Timestamp detection
# Accepted textual layouts: ISO date, ISO date + time (space or "T"
# separator, optional seconds), ISO date-time with a trailing "Z",
# and slash-separated dates with the year first or last.
_DATE_REGEXES = [
    r"^\d{4}-\d{2}-\d{2}$",
    r"^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}(:\d{2})?$",
    r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$",
    r"^\d{4}/\d{2}/\d{2}$",
    r"^\d{2}/\d{2}/\d{4}$",
]


def looks_like_date_str(s: str) -> bool:
    """Tell whether the stripped text matches one of ``_DATE_REGEXES``.

    Strings shorter than eight characters are rejected without trying any
    pattern. Only the layout is checked here, not calendar validity.
    """
    candidate = s.strip()
    if len(candidate) < 8:
        return False
    return any(re.match(pattern, candidate) for pattern in _DATE_REGEXES)

def safe_parse_timestamp(s):
    """Parse *s* as a timestamp, returning ``None`` when that is not possible.

    Args:
        s: Value to parse (normally a string already screened by
            ``looks_like_date_str``).

    Returns:
        The value produced by ``pd.to_datetime`` when parsing succeeds and
        its year lies in the inclusive range 1900-2100; otherwise ``None``.
    """
    try:
        # ``infer_datetime_format`` is intentionally not passed: it has been a
        # deprecated no-op since pandas 2.0, and a pandas that rejects the
        # keyword would make every call fail and silently return None.
        ts = pd.to_datetime(s, errors="raise", utc=False)
        year = getattr(ts, "year", None)
        if year is not None and 1900 <= int(year) <= 2100:
            return ts
    except (ValueError, TypeError, OverflowError):
        # Unparseable text, out-of-bounds dates, NaT (year is NaN) and
        # non-scalar results all mean "not a usable timestamp".
        return None
    return None

def infer_variable_type(val):
    """Classify a single value as "timestamp", "numeric" or "string".

    Missing values yield ``np.nan``. A ``pd.Timestamp`` is a timestamp
    outright; text is a timestamp only if it both looks like a date and
    parses to one. Otherwise a plain signed decimal number is "numeric"
    and everything else is "string".
    """
    if pd.isna(val):
        return np.nan
    if isinstance(val, pd.Timestamp):
        return "timestamp"

    text = str(val).strip()
    is_date = looks_like_date_str(text) and safe_parse_timestamp(text) is not None
    if is_date:
        return "timestamp"

    is_number = re.fullmatch(r"^[+-]?(\d+(\.\d*)?|\.\d+)$", text) is not None
    return "numeric" if is_number else "string"

def normalize_vartype(v):
    """Keep *v* if it is one of the three known type labels, else ``np.nan``."""
    known_labels = ["numeric", "string", "timestamp"]
    if v in known_labels:
        return v
    return np.nan

def finalize_common_columns(df, table_name, patient_col, admission_col):
    """Fill in and normalize the shared output columns of a long-format frame.

    Works on a copy of *df*; the input frame is not modified.

    Args:
        df: Long-format frame (one row per variable value). Missing
            "Is_categorical", "Mapping_info_1", "Mapping_info_2", "Value",
            "Variable_type" and "Variable_name" columns are created as NaN.
        table_name: Written to "Original_table_name" on every row.
        patient_col: Name of the column copied into "Patient_id"; if falsy or
            absent, an existing "Patient_id" is kept, else NaN is used.
        admission_col: Same as *patient_col*, for "Admission_id".

    Returns:
        The completed copy. Columns are added/overwritten but the frame is
        not reordered or restricted to QUIQ_COLS here.
    """
    df = df.copy()
    df["Original_table_name"] = table_name
    df["Variable_ID"] = np.nan

    for col in ["Is_categorical", "Mapping_info_1", "Mapping_info_2", "Value", "Variable_type", "Variable_name"]:
        if col not in df.columns:
            df[col] = np.nan

    # Value is only ever stripped of surrounding whitespace, never otherwise cleaned
    df["Value"] = df["Value"].apply(strip_only)

    # Normalize type labels: anything other than numeric/string/timestamp becomes NaN
    df["Variable_type"] = df["Variable_type"].apply(normalize_vartype)

    # Both masks are computed once, before any of the assignments below.
    mask_valid = ~df["Value"].isna()
    mask_na = ~mask_valid

    # Detect code-like variables, either by variable name or by an
    # already-assigned "medical_code" mapping
    varnames = df["Variable_name"].astype(str)
    mask_code_name = mask_valid & varnames.isin(CODE_LIKE_NAMES)
    mask_medcode_map = mask_valid & (df["Mapping_info_1"].astype(str) == "medical_code")
    mask_code_any = mask_code_name | mask_medcode_map

    # Codes are always categorical strings, even when they look numeric.
    if mask_code_any.any():
        df.loc[mask_code_any, "Variable_type"] = "string"
        df.loc[mask_code_any, "Is_categorical"] = 1

    # Rows without a value carry no categorical flag and no mapping info.
    if len(df) > 0:
        idx = mask_na[mask_na].index
        if len(idx) > 0:
            df.loc[idx, "Is_categorical"] = np.nan
            df.loc[idx, "Mapping_info_1"] = np.nan
            df.loc[idx, "Mapping_info_2"] = np.nan

    # Infer the categorical flag for valued rows that do not have one yet
    CATEGORICAL_THRESHOLD = 10
    df["Is_categorical"] = df["Is_categorical"].astype("float")
    to_infer = mask_valid & df["Is_categorical"].isna()
    if to_infer.any():
        # Distinct non-null values per variable, counted over all valued rows of this frame.
        vc = df.loc[mask_valid].groupby("Variable_name")["Value"].nunique(dropna=True)
        cat_vars = set(vc[vc <= CATEGORICAL_THRESHOLD].index)
        # Categorical if the variable has few distinct values, or if the row is a string.
        df.loc[to_infer & df["Variable_name"].isin(cat_vars), "Is_categorical"] = 1
        df.loc[to_infer & (df["Variable_type"] == "string"), "Is_categorical"] = 1
        # Whatever is still undecided is treated as non-categorical.
        df.loc[to_infer & df["Is_categorical"].isna(), "Is_categorical"] = 0

    if patient_col and (patient_col in df.columns):
        df["Patient_id"] = df[patient_col]
    else:
        if "Patient_id" not in df.columns:
            df["Patient_id"] = np.nan

    if admission_col and (admission_col in df.columns):
        df["Admission_id"] = df[admission_col]
    else:
        if "Admission_id" not in df.columns:
            df["Admission_id"] = np.nan

    # These columns are always emitted empty by this converter.
    df["Recorder"] = np.nan
    df["Recorder_position"] = np.nan
    df["Recorder_affiliation"] = np.nan
    df["Ground_truth"] = np.nan

    return df


def _apply_mapping_rules(long_df, mapping_rules):
    """Write Mapping_info_1/2 into *long_df* in place, one mask per rule."""
    names = long_df["Variable_name"]
    for var_name, rule in mapping_rules.items():
        rows = (names == var_name).to_numpy(dtype=bool)
        if not rows.any():
            continue
        if callable(rule):
            # Callable rules still see each matching row; the selection is
            # non-empty here, so ``expand`` yields the two result columns.
            resolved = long_df.loc[rows].apply(rule, axis=1, result_type="expand")
            long_df.loc[rows, ["Mapping_info_1", "Mapping_info_2"]] = resolved.to_numpy()
        else:
            info_1, info_2 = rule
            long_df.loc[rows, "Mapping_info_1"] = info_1
            long_df.loc[rows, "Mapping_info_2"] = info_2


def melt_generic(df, table_name, exclude_cols, event_date_col, patient_col, admission_col,
                 mapping_rules=None, primary_key_start=1):
    """Convert a wide source table into the long QUIQ layout.

    Every source row receives one primary key; each non-excluded, non-id
    column of that row becomes one output row sharing that key.

    Args:
        df: Wide source table (not modified).
        table_name: Value written to "Original_table_name".
        exclude_cols: Column names that must not become variables.
        event_date_col: Source column holding the event date, or None. The
            date is copied to "Event_date" only on rows whose variable is
            "DESCRIPTION", and only if the source has a DESCRIPTION column.
        patient_col: Source column copied to "Patient_id" (if present).
        admission_col: Source column copied to "Admission_id" (if present).
        mapping_rules: Optional dict mapping a variable name to either a
            ``(Mapping_info_1, Mapping_info_2)`` tuple or a callable that
            takes the melted row and returns such a tuple.
        primary_key_start: Primary key given to the first source row.

    Returns:
        ``(long_df, next_pk)``: the long frame restricted to ``QUIQ_COLS``
        and the first primary key not used by this table.
    """
    _df = df.copy()

    # _row_idx links melted rows back to their source row; _row_pk is the
    # primary key shared by all variables of one source row.
    _df["_row_idx"] = np.arange(len(_df))
    _df["_row_pk"] = np.arange(primary_key_start, primary_key_start + len(_df))

    event_src = None
    if event_date_col and (event_date_col in _df.columns):
        event_src = _df[["_row_idx", event_date_col]].rename(columns={event_date_col: "_event_src"})

    drop_cols = set(exclude_cols or [])
    keep_id_cols = []
    if patient_col and (patient_col in _df.columns):
        keep_id_cols.append(patient_col)
    if admission_col and (admission_col in _df.columns):
        keep_id_cols.append(admission_col)
    keep_id_cols += ["_row_idx", "_row_pk"]

    # De-duplicate while preserving order (patient and admission may coincide).
    id_vars = list(dict.fromkeys(keep_id_cols))
    value_vars = [c for c in _df.columns if c not in drop_cols and c not in id_vars]

    long_df = _df[id_vars + value_vars].melt(
        id_vars=id_vars,
        value_vars=value_vars,
        var_name="Variable_name",
        value_name="Value"
    ).reset_index(drop=True)

    long_df["Primary_key"] = long_df["_row_pk"].values
    long_df["Unit"] = np.nan
    long_df["Variable_type"] = long_df["Value"].map(infer_variable_type)

    # Created with object dtype so that text labels can be written into them
    # without relying on pandas silently upcasting a float64 NaN column.
    long_df["Mapping_info_1"] = pd.Series(np.nan, index=long_df.index, dtype=object)
    long_df["Mapping_info_2"] = pd.Series(np.nan, index=long_df.index, dtype=object)
    long_df["Is_categorical"] = np.nan

    # Skipped for empty frames: there is nothing to label.
    if mapping_rules and not long_df.empty:
        _apply_mapping_rules(long_df, mapping_rules)

    long_df["Event_date"] = pd.Series(np.nan, index=long_df.index, dtype=object)
    if event_src is not None and "DESCRIPTION" in df.columns:
        long_df = long_df.merge(event_src, on="_row_idx", how="left")
        desc_mask = long_df["Variable_name"].astype(str).eq("DESCRIPTION")
        long_df.loc[desc_mask, "Event_date"] = long_df.loc[desc_mask, "_event_src"]
        long_df.drop(columns=["_event_src"], inplace=True)

    long_df = finalize_common_columns(
        long_df, table_name, patient_col, admission_col
    )
    long_df = long_df.drop(columns=["_row_idx", "_row_pk"], errors="ignore")

    long_df = long_df[QUIQ_COLS]
    next_pk = primary_key_start + len(_df)
    return long_df, next_pk

def _observation_frame(index, row_pk, patient_id, admission_id, variable_name, value,
                       event_date=np.nan, unit=np.nan, variable_type="string",
                       is_categorical=np.nan, mapping_info_1=np.nan, mapping_info_2=np.nan):
    """Build one QUIQ-shaped frame holding a single variable per observation row."""
    return pd.DataFrame(
        {
            "Primary_key": row_pk,
            "Variable_ID": np.nan,
            "Original_table_name": "observations",
            "Variable_name": variable_name,
            "Event_date": event_date,
            "Value": value,
            "Unit": unit,
            "Variable_type": variable_type,
            "Is_categorical": is_categorical,
            "Recorder": np.nan,
            "Recorder_position": np.nan,
            "Recorder_affiliation": np.nan,
            "Patient_id": patient_id,
            "Admission_id": admission_id,
            "Ground_truth": np.nan,
            "Mapping_info_1": mapping_info_1,
            "Mapping_info_2": mapping_info_2,
        },
        # Explicit index: every piece then lines up with the source rows even
        # when the source was filtered and no longer has a 0..n-1 index.
        index=index,
    )


def convert_observations(df, primary_key_start):
    """Convert the observations table into the long QUIQ layout.

    Each source row yields three output rows that share one primary key:
      * the measurement itself (variable name taken from DESCRIPTION, value
        from VALUE, unit from UNITS, event date from DATE);
      * a "CODE" row holding the CODE column, tagged "medical_code";
      * a "CATEGORY" row holding the CATEGORY column.

    Args:
        df: Observations table; missing source columns are tolerated and
            produce empty output fields. The frame is not modified.
        primary_key_start: Primary key given to the first source row.

    Returns:
        ``(out, next_pk)``: the long frame restricted to ``QUIQ_COLS`` (all
        measurement rows, then all CODE rows, then all CATEGORY rows) and
        the first primary key not used by this table.
    """
    n = len(df)
    row_pk = np.arange(primary_key_start, primary_key_start + n)
    patient_id = df.get("PATIENT")
    admission_id = df.get("ENCOUNTER")

    # Type of the measured value, inferred per row ...
    if "VALUE" in df.columns:
        value_type = df["VALUE"].map(infer_variable_type)
    else:
        value_type = pd.Series(index=df.index, dtype=object)
    # ... unless the source explicitly declares the observation as text.
    if "TYPE" in df.columns:
        declared_text = df["TYPE"].astype(str).str.lower().eq("text")
        value_type = value_type.astype(object)
        value_type[declared_text] = "string"

    # Measurements are "event" rows; laboratory observations are lab events
    # and every other category (including a missing one) is a chart event.
    if "CATEGORY" in df.columns:
        is_lab = df["CATEGORY"].astype(str).str.lower().eq("laboratory").to_numpy(dtype=bool)
        desc_map_1 = "event"
        desc_map_2 = np.where(is_lab, "lab_event", "chart_event").astype(object)
    else:
        desc_map_1 = np.nan
        desc_map_2 = np.nan

    desc_df = _observation_frame(
        df.index, row_pk, patient_id, admission_id,
        variable_name=df.get("DESCRIPTION"),
        value=df.get("VALUE"),
        event_date=df.get("DATE"),
        unit=df.get("UNITS"),
        variable_type=value_type,
        mapping_info_1=desc_map_1,
        mapping_info_2=desc_map_2,
    )
    code_df = _observation_frame(
        df.index, row_pk, patient_id, admission_id,
        variable_name="CODE",
        value=df.get("CODE"),
        is_categorical=1,
        mapping_info_1="medical_code",
    )
    category_df = _observation_frame(
        df.index, row_pk, patient_id, admission_id,
        variable_name="CATEGORY",
        value=df.get("CATEGORY"),
    )

    out = pd.concat([desc_df, code_df, category_df], ignore_index=True)
    # Patient_id / Admission_id are already populated, so no id columns are passed.
    out = finalize_common_columns(out, "observations", patient_col=None, admission_col=None)
    out = out[QUIQ_COLS]
    next_pk = primary_key_start + n
    return out, next_pk

# =========================
# Table specs
# =========================
# Per-table conversion settings, keyed by lower-cased CSV file name without
# the extension. run() passes the entries to melt_generic as follows:
#   exclude    -> exclude_cols   (columns that do not become variables)
#   event_date -> event_date_col (source of Event_date)
#   patient    -> patient_col    (source of Patient_id)
#   admission  -> admission_col  (source of Admission_id)
#   mapping    -> mapping_rules  (variable name -> (Mapping_info_1, Mapping_info_2))
# patient/admission names that are not present in the CSV are ignored by
# melt_generic. "observations" is handled by convert_observations instead.
TABLE_SPECS = {
    "allergies": dict(
        exclude={"PATIENT", "ENCOUNTER"},
        event_date="START",
        patient="PATIENT",
        admission="ENCOUNTER",
        mapping={
            "CODE": ("medical_code", None),
            "REACTION1": ("medical_code", None),
            "REACTION2": ("medical_code", None),
            "START": ("date", None),
            "STOP": ("date", None),
            "DESCRIPTION": ("diagnosis", None),
            "DESCRIPTION1": ("diagnosis", None),
            "DESCRIPTION2": ("diagnosis", None),
        }
    ),
    "careplans": dict(
        exclude={"Id", "PATIENT", "ENCOUNTER"},
        event_date="START",
        patient="PATIENT",
        admission="ENCOUNTER",
        mapping={
            "CODE": ("medical_code", None),
            "REASONCODE": ("medical_code", None),
            "START": ("date", None),
            "STOP": ("date", None),
            "DESCRIPTION": ("procedure", None),
            "REASONDESCRIPTION": ("diagnosis", None),
        }
    ),
    "claims": dict(
        exclude={"Id", "PATIENTID"},
        event_date="SERVICEDATE",
        patient="PATIENTID",
        admission=None,
        # DIAGNOSIS1..DIAGNOSIS8 are medical codes; the listed columns are dates.
        mapping={**{f"DIAGNOSIS{i}": ("medical_code", None) for i in range(1, 9)},
                 **{k: ("date", None) for k in ["CURRENTILLNESSDATE","SERVICEDATE","LASTBILLEDDATE1","LASTBILLEDDATE2","LASTBILLEDDATEP"]}}
    ),
    "claims_transactions": dict(
        exclude={"ID", "PATIENTID"},
        event_date="FROMDATE",
        patient="PATIENTID",
        admission=None,
        mapping={
            "PROCEDURECODE": ("medical_code", None),
            "FROMDATE": ("date", None),
            "TODATE": ("date", None),
        }
    ),
    "conditions": dict(
        exclude={"PATIENT", "ENCOUNTER"},
        event_date="START",
        patient="PATIENT",
        admission="ENCOUNTER",
        mapping={
            "CODE": ("medical_code", None),
            "START": ("date", None),
            "STOP": ("date", None),
            "DESCRIPTION": ("event", "chart_event"),
        }
    ),
    "devices": dict(
        exclude={"PATIENT", "ENCOUNTER"},
        event_date="START",
        patient="PATIENT",
        admission="ENCOUNTER",
        mapping={
            "CODE": ("medical_code", None),
            "START": ("date", None),
            "STOP": ("date", None),
        }
    ),
    # NOTE(review): admission="ENCOUNTER" only takes effect if encounters.csv
    # has an ENCOUNTER column; "Id" is excluded and not used as Admission_id.
    # Confirm whether "Id" was meant here.
    "encounters": dict(
        exclude={"Id", "PATIENT", "ENCOUNTER"},
        event_date="START",
        patient="PATIENT",
        admission="ENCOUNTER",
        mapping={
            "CODE": ("medical_code", None),
            "REASONCODE": ("medical_code", None),
            "START": ("date", None),
            "STOP": ("date", None),
            "REASONDESCRIPTION": ("diagnosis", None),
        }
    ),
    "imaging_studies": dict(
        exclude={"Id", "PATIENT", "ENCOUNTER"},
        event_date="DATE",
        patient="PATIENT",
        admission="ENCOUNTER",
        mapping={
            "PROCEDURE_CODE": ("medical_code", None),
            "BODYSITE_CODE": ("medical_code", None),
        }
    ),
    # NOTE(review): ENCOUNTER is excluded from the variables while admission
    # is None, so an ENCOUNTER column in this CSV would be dropped entirely.
    # Every other table that excludes ENCOUNTER uses it as the admission
    # column; confirm this difference is intended.
    "immunizations": dict(
        exclude={"PATIENT", "ENCOUNTER"},
        event_date="DATE",
        patient="PATIENT",
        admission=None,
        mapping={
            "CODE": ("medical_code", None),
            "DESCRIPTION": ("prescription", "drug"),
        }
    ),
    "medications": dict(
        exclude={"PATIENT", "ENCOUNTER"},
        event_date="START",
        patient="PATIENT",
        admission="ENCOUNTER",
        mapping={
            "CODE": ("medical_code", None),
            "REASONCODE": ("medical_code", None),
            "START": ("date", None),
            "STOP": ("date", None),
            "DESCRIPTION": ("prescription", "drug"),
            "REASONDESCRIPTION": ("diagnosis", None),
        }
    ),
    # Marker entry only: run() dispatches this table to convert_observations by name.
    "observations": dict(_custom="observations"),
    "organizations": dict(
        exclude={"Id"},
        event_date=None,
        patient="Id",
        admission=None,
        mapping={}
    ),
    "patients": dict(
        exclude={"Id"},
        event_date=None,
        patient="Id",
        admission=None,
        mapping={
            "BIRTHDATE": ("date", None),
            "DEATHDATE": ("date", None),
        }
    ),
    "payers": dict(
        exclude=set(),
        event_date=None,
        patient=None,
        admission=None,
        mapping={}
    ),
    "payer_transitions": dict(
        exclude={"PATIENT"},
        event_date=None,
        patient="PATIENT",
        admission="ENCOUNTER",
        mapping={
            "START_DATE": ("date", None),
            "END_DATE": ("date", None),
        }
    ),
    "procedures": dict(
        exclude={"PATIENT", "ENCOUNTER"},
        event_date="START",
        patient="PATIENT",
        admission="ENCOUNTER",
        mapping={
            "CODE": ("medical_code", None),
            "REASONCODE": ("medical_code", None),
            "START": ("date", None),
            "STOP": ("date", None),
            "DESCRIPTION": ("procedure", None),
            "REASONDESCRIPTION": ("diagnosis", None),
        }
    ),
    "providers": dict(
        exclude={"Id"},
        event_date=None,
        patient="Id",
        admission=None,
        mapping={}
    ),
    "supplies": dict(
        exclude={"PATIENT", "ENCOUNTER"},
        event_date="DATE",
        patient="PATIENT",
        admission="ENCOUNTER",
        mapping={
            "CODE": ("medical_code", None),
        }
    ),
}

def apply_custom_claims_tx_mappings(df):
    """Label the free-text note variables of claims_transactions.

    Rows whose "Variable_name" is "NOTES" or "LINENOTE" and that carry a
    value are re-labelled:
      * text containing both "(" and ")" -> ("procedure", None)
      * any other non-missing value       -> ("prescription", "drug")

    Rows of other variables are left as they are, and so are note rows
    without a value, so the "no value, no mapping" state produced by
    ``finalize_common_columns`` is preserved.

    Args:
        df: Long-format frame with "Variable_name", "Value",
            "Mapping_info_1" and "Mapping_info_2" columns. Modified in place.

    Returns:
        The same frame object, for call chaining. Empty frames are returned
        unchanged.
    """
    note_rows = (
        df["Variable_name"].isin(["NOTES", "LINENOTE"]) & df["Value"].notna()
    ).to_numpy(dtype=bool)
    if not note_rows.any():
        return df

    # Only the note values are inspected; everything else stays vectorized.
    values = df["Value"].to_numpy(dtype=object)
    has_parens = np.zeros(len(df), dtype=bool)
    has_parens[note_rows] = [
        isinstance(v, str) and "(" in v and ")" in v for v in values[note_rows]
    ]
    procedure_rows = note_rows & has_parens
    drug_rows = note_rows & ~has_parens

    # Work on object arrays so text labels never hit a float64 NaN column.
    info_1 = df["Mapping_info_1"].to_numpy(dtype=object, copy=True)
    info_2 = df["Mapping_info_2"].to_numpy(dtype=object, copy=True)
    info_1[procedure_rows] = "procedure"
    info_2[procedure_rows] = None
    info_1[drug_rows] = "prescription"
    info_2[drug_rows] = "drug"

    df["Mapping_info_1"] = info_1
    df["Mapping_info_2"] = info_2
    return df

# =========================
# Sampling setup
# =========================
# Set of patient ids (as strings) to keep, or None when sampling is off or
# no usable patients file / id column was found.
sample_patient_ids = None
if USE_SAMPLE:
    patient_csv = None
    # Try the common capitalizations of the patients file name.
    for nm in ["patients.csv", "Patients.csv", "PATIENTS.csv"]:
        p = os.path.join(BASE_DIR, nm)
        if os.path.exists(p):
            patient_csv = p
            break
    if patient_csv:
        pdf = pd.read_csv(patient_csv, low_memory=False)
        id_col = "Id" if "Id" in pdf.columns else ("ID" if "ID" in pdf.columns else None)
        if id_col:
            cand = pdf[id_col].dropna().drop_duplicates().astype(str)
            # Down-sample only when there are more patients than requested.
            if len(cand) > SAMPLE_N:
                cand = cand.sample(n=SAMPLE_N, random_state=RANDOM_SEED)
            sample_patient_ids = set(cand)

def apply_sampling(df):
    """Restrict *df* to the sampled patients, when sampling is active.

    Filtering is done on patient-identifier columns only: "PATIENT" is used
    when present, otherwise "PATIENTID". A bare "Id" column is never used,
    so tables without either column are passed through unfiltered.

    Returns *df* itself when sampling is off (or no sample exists);
    otherwise a filtered or unfiltered copy.
    """
    if sample_patient_ids is None or not USE_SAMPLE:
        return df

    frame = df.copy()
    for patient_col in ("PATIENT", "PATIENTID"):
        if patient_col in frame.columns:
            in_sample = frame[patient_col].astype(str).isin(sample_patient_ids)
            return frame[in_sample]
    # No patient column: this table is not sampled.
    return frame

# =========================
# Main loop (tqdm progress bars)
# =========================
def run():
    """Convert every CSV in BASE_DIR to a ``<table>_QUIQ.csv`` in OUTPUT_DIR.

    Files are handled in sorted order and share one running primary-key
    counter. Each file goes through four steps (read, sample, convert,
    save); a failure in any step is recorded and the loop moves on to the
    next file. A summary of successes and failures is printed at the end.
    """
    next_pk = 1
    succeeded = []
    failed = []

    csv_paths = sorted(glob.glob(os.path.join(BASE_DIR, "*.csv")))
    if len(csv_paths) == 0:
        print("⚠️ No CSV files found in:", BASE_DIR)
        return

    file_bar = tqdm(csv_paths, desc="🔄 Converting CSV → QUIQ", unit="file")

    for csv_path in file_bar:
        filename = os.path.basename(csv_path)
        if filename.lower().endswith(".csv"):
            table = filename[:-4].lower()
        else:
            table = filename.lower()
        file_bar.set_postfix_str(f"reading {filename}")

        try:
            with tqdm(total=4, desc=f"  {table}", leave=False, unit="step") as step_bar:
                # 1) Read
                df = pd.read_csv(csv_path, low_memory=False)
                step_bar.update(1)
                step_bar.set_postfix_str("loaded")

                # 2) Sample
                df = apply_sampling(df)
                step_bar.update(1)
                step_bar.set_postfix_str(f"sampled n={len(df)}")

                # 3) Convert
                spec = TABLE_SPECS.get(table, None)

                if table == "observations":
                    out_df, next_pk = convert_observations(df, next_pk)
                else:
                    if not spec:
                        # Unknown table: guess the id columns, melt everything.
                        if "PATIENT" in df.columns:
                            guessed_patient = "PATIENT"
                        elif "PATIENTID" in df.columns:
                            guessed_patient = "PATIENTID"
                        else:
                            guessed_patient = None
                        guessed_admission = "ENCOUNTER" if "ENCOUNTER" in df.columns else None
                        melt_options = dict(
                            exclude_cols=set(),
                            event_date_col=None,
                            patient_col=guessed_patient,
                            admission_col=guessed_admission,
                            mapping_rules={},
                        )
                    else:
                        melt_options = dict(
                            exclude_cols=spec.get("exclude", set()),
                            event_date_col=spec.get("event_date"),
                            patient_col=spec.get("patient"),
                            admission_col=spec.get("admission"),
                            mapping_rules=dict(spec.get("mapping", {})),
                        )

                    out_df, next_pk = melt_generic(
                        df=df, table_name=table,
                        primary_key_start=next_pk,
                        **melt_options
                    )
                    if spec and table == "claims_transactions":
                        out_df = apply_custom_claims_tx_mappings(out_df)

                step_bar.update(1)
                step_bar.set_postfix_str(f"converted rows={len(out_df)}")

                # 4) Save
                out_path = os.path.join(OUTPUT_DIR, f"{table}_QUIQ.csv")
                out_df = out_df[QUIQ_COLS]
                out_df.to_csv(out_path, index=False)
                step_bar.update(1)
                step_bar.set_postfix_str("saved")

            succeeded.append((filename, len(out_df)))
            file_bar.set_postfix_str(f"✅ {filename} ({len(out_df)} rows)")

        except Exception as e:
            # Per-file boundary: record the failure and keep converting the rest.
            failed.append((filename, str(e)))
            file_bar.set_postfix_str(f"❌ {filename} error")
            tqdm.write(f"❌ Error in {filename}: {e}")

    # Summary
    rule = "="*70
    print("\n" + rule)
    print(f"📊 SUMMARY @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"   Output dir: {OUTPUT_DIR}")
    print(f"   Total files: {len(csv_paths)} | Success: {len(succeeded)} | Fail: {len(failed)}")
    if succeeded:
        print("\n✅ Successes:")
        for name, row_count in succeeded:
            print(f"  - {name:30s} → rows: {row_count}")
    if failed:
        print("\n❌ Failures:")
        for name, message in failed:
            print(f"  - {name:30s} → {message}")
    print(rule)

# Start the conversion only when this module is executed directly.
if __name__ == "__main__":
    run()


🔄 Converting CSV → QUIQ:   0%|                               | 0/18 [00:00<?, ?file/s, reading allergies.csv]
  allergies:   0%|                                                                    | 0/4 [00:00<?, ?step/s][A
  allergies:  25%|████████████▊                                      | 1/4 [00:00<00:00, 499.98step/s, loaded][A
  allergies:  50%|██████████████████████                      | 2/4 [00:00<00:00, 666.82step/s, sampled n=111][A
  allergies:  75%|█████████████████████████████▎         | 3/4 [00:00<00:00, 32.26step/s, converted rows=1443][A
  allergies: 100%|███████████████████████████████████████| 4/4 [00:00<00:00, 38.83step/s, converted rows=1443][A
  allergies: 100%|█████████████████████████████████████████████████████| 4/4 [00:00<00:00, 38.83step/s, saved][A
🔄 Converting CSV → QUIQ:   6%|█▎                     | 1/18 [00:00<00:01,  9.26file/s, reading careplans.csv][A
  careplans:   0%|                                                                    | 0/4 [


📊 SUMMARY @ 2025-08-25 15:37:24
   Output dir: G:/SYNTHEA_NEW\QUIQ
   Total files: 18 | Success: 18 | Fail: 0

✅ Successes:
  - allergies.csv                  → rows: 1443
  - careplans.csv                  → rows: 2154
  - claims.csv                     → rows: 314650
  - claims_transactions.csv        → rows: 2989113
  - conditions.csv                 → rows: 20700
  - devices.csv                    → rows: 3475
  - encounters.csv                 → rows: 77012
  - imaging_studies.csv            → rows: 90710
  - immunizations.csv              → rows: 6196
  - medications.csv                → rows: 54186
  - observations.csv               → rows: 264468
  - organizations.csv              → rows: 2810
  - patients.csv                   → rows: 2997
  - payer_transitions.csv          → rows: 7910
  - payers.csv                     → rows: 220
  - procedures.csv                 → rows: 143944
  - providers.csv                  → rows: 3372
  - supplies.csv                   → rows: 1463




# VIA

In [7]:
import pandas as pd

# ===============================================
# 1) Source CSV schemas: table name -> documented column names
# ===============================================
schema = {
    "allergies": ["START", "STOP", "CODE", "DESCRIPTION"],
    "careplans": ["START", "STOP", "CODE", "DESCRIPTION", "REASONCODE", "REASONDESCRIPTION"],
    "conditions": ["START", "STOP", "CODE", "DESCRIPTION"],
    "encounters": ["DATE", "CODE", "DESCRIPTION", "REASONCODE", "REASONDESCRIPTION"],
    "immunizations": ["DATE", "CODE", "DESCRIPTION"],
    "medications": ["DATE", "CODE", "DESCRIPTION"],
    "observations": ["DATE", "CODE", "DESCRIPTION", "VALUE"],
    "patients": [
        "BIRTHDATE", "DEATHDATE", "SSN", "DRIVERS", "PASSPORT", "PREFIX",
        "FIRST", "LAST", "SUFFIX", "MAIDEN", "MARITAL", "RACE", "ETHNICITY",
        "GENDER", "BIRTHPLACE", "ADDRESS",
    ],
    "procedures": ["DATE", "CODE", "DESCRIPTION", "REASONCODE", "REASONDESCRIPTION"],
}

# ===============================================
# 2) Column name -> English description
#    (shared by every table that has a column of that name)
# ===============================================
_generic_descriptions = {
    "START": "Start date/time of condition, allergy, or plan.",
    "STOP": "End date/time of condition, allergy, or plan.",
    "DATE": "Event date.",
    "CODE": "Standard code (diagnosis, procedure, encounter, etc.).",
    "DESCRIPTION": "Textual description of the code or record.",
    "REASONCODE": "Code indicating the reason for the event/procedure.",
    "REASONDESCRIPTION": "Text description of the reason for the event/procedure.",
    "VALUE": "Observed or measured value.",
}
_patient_descriptions = {
    "BIRTHDATE": "Patient’s birth date.",
    "DEATHDATE": "Patient’s death date (if applicable).",
    "SSN": "Social Security Number (masked).",
    "DRIVERS": "Driver’s license number (masked).",
    "PASSPORT": "Passport number (masked).",
    "PREFIX": "Name prefix (e.g., Mr, Dr).",
    "FIRST": "First/given name.",
    "LAST": "Last/family name.",
    "SUFFIX": "Name suffix (e.g., Jr, Sr).",
    "MAIDEN": "Maiden name.",
    "MARITAL": "Marital status.",
    "RACE": "Race.",
    "ETHNICITY": "Ethnicity.",
    "GENDER": "Gender.",
    "BIRTHPLACE": "Place of birth.",
    "ADDRESS": "Address (masked).",
}
variable_desc = {**_generic_descriptions, **_patient_descriptions}

# ===============================================
# 3) Build the VIA table: one row per (table, column) pair
# ===============================================
rows = [
    {
        "Original_table_name": table_name,
        "Variable_name": column_name,
        # Columns without a known description get an empty Description.
        "Description": variable_desc.get(column_name),
    }
    for table_name, column_names in schema.items()
    for column_name in column_names
]

_via_unsorted = pd.DataFrame(rows).drop_duplicates()
via = _via_unsorted.sort_values(["Original_table_name", "Variable_name"])
via = via.reset_index(drop=True)

# Save (utf-8-sig writes a BOM in front of the CSV content)
via.to_csv("VIA_table.csv", index=False, encoding="utf-8-sig")

print("✅ VIA_table.csv created with exactly these columns: Original_table_name, Variable_name, Description")
via

✅ VIA_table.csv created with exactly these columns: Original_table_name, Variable_name, Description


Unnamed: 0,Original_table_name,Variable_name,Description
0,allergies,CODE,"Standard code (diagnosis, procedure, encounter..."
1,allergies,DESCRIPTION,Textual description of the code or record.
2,allergies,START,"Start date/time of condition, allergy, or plan."
3,allergies,STOP,"End date/time of condition, allergy, or plan."
4,careplans,CODE,"Standard code (diagnosis, procedure, encounter..."
5,careplans,DESCRIPTION,Textual description of the code or record.
6,careplans,REASONCODE,Code indicating the reason for the event/proce...
7,careplans,REASONDESCRIPTION,Text description of the reason for the event/p...
8,careplans,START,"Start date/time of condition, allergy, or plan."
9,careplans,STOP,"End date/time of condition, allergy, or plan."


In [32]:
import os
import glob
import pandas as pd

# Folder that holds the per-table *_QUIQ.csv outputs
folder_path = r"G:\SYNTHEA_NEW\QUIQ"

# Collect every per-table output; sorted so that the row order of the merged
# file is the same on every run (glob order is not guaranteed).
all_csv_files = sorted(glob.glob(os.path.join(folder_path, "*_QUIQ.csv")))
if not all_csv_files:
    raise FileNotFoundError(f"No *_QUIQ.csv files found in {folder_path}")

# Read every cell as raw text. Without this, pandas re-infers types per file:
# blank cells become NaN (and later the literal text "nan"), and integer
# codes in a column with blanks are rewritten as floats ("123" -> "123.0").
dfs = [
    pd.read_csv(csv_path, dtype=str, keep_default_na=False)
    for csv_path in all_csv_files
]

# Stack the tables row-wise into one frame
merged_df = pd.concat(dfs, ignore_index=True)
# Replace en dashes (U+2013) with plain ASCII hyphens in every cell
merged_df = merged_df.replace('\u2013', '-', regex=True)

In [33]:
# Write the merged table next to the per-table outputs
merged_path = os.path.join(folder_path, "merged.csv")
merged_df.to_csv(merged_path, index=False)
print("CSV 합치기 완료! -> merged.csv")


CSV 합치기 완료! -> merged.csv
