In [1]:
from google.colab import drive

# 구글 드라이브 마운트
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# -*- coding: utf-8 -*-
import os, json, csv, io, re, zipfile
from pathlib import Path
from collections import Counter, defaultdict

import pandas as pd

# ================== 사용자 설정 ==================
BASE_DIR = Path("/content/drive/MyDrive/DILAB/OK/DI_LAB/MARS_Datathon/Datasets/AI_HUB")  # Train/ Valid/ 가 있는 루트
OUT_JSONL = BASE_DIR / "qwen_sft_all.jsonl"                    # 최종 통합 파일
ALLOWED_EXT = {".json", ".jsonl", ".csv", ".tsv", ".xlsx", ".xls"}  # 압축 내 파싱 대상
# =================================================

def unzip_all(src_dir: Path, out_dir: Path) -> list[Path]:
    out_dir.mkdir(parents=True, exist_ok=True)
    extracted_files = []
    for zpath in sorted(src_dir.rglob("*.zip")):
        target = out_dir / zpath.stem
        target.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zpath, 'r') as zf:
            zf.extractall(target)
        extracted_files.extend([p for p in target.rglob("*") if p.is_file()])
    return extracted_files

def read_any_file(path: Path) -> list[dict]:
    """여러 포맷을 파싱해 '레코드 딕트' 리스트로 반환. (아직 messages 변환 전, 원본 dict 유지)"""
    ext = path.suffix.lower()
    try:
        if ext == ".jsonl":
            rows = []
            with path.open("r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    rows.append(json.loads(line))
            return rows
        elif ext == ".json":
            with path.open("r", encoding="utf-8") as f:
                obj = json.load(f)
            if isinstance(obj, list):
                return obj
            elif isinstance(obj, dict):
                # dict 하나만 있으면 리스트로 감싸서 반환
                # 종종 {"data":[...]} 형태 -> data 키 펼치기
                if "data" in obj and isinstance(obj["data"], list):
                    return obj["data"]
                else:
                    return [obj]
        elif ext in {".csv", ".tsv"}:
            sep = "\t" if ext == ".tsv" else ","
            return list(pd.read_csv(path, sep=sep, dtype=str, keep_default_na=False).to_dict(orient="records"))
        elif ext in {".xlsx", ".xls"}:
            # 모든 시트 읽어서 이어붙임
            xls = pd.read_excel(path, sheet_name=None, dtype=str, engine="openpyxl" if ext == ".xlsx" else None)
            frames = []
            for _, df in xls.items():
                frames.append(df)
            if not frames:
                return []
            df_all = pd.concat(frames, ignore_index=True)
            df_all = df_all.fillna("")
            return list(df_all.to_dict(orient="records"))
        else:
            return []
    except Exception as e:
        print(f"[WARN] read fail: {path} -> {e}")
        return []

# 휴리스틱: 다양한 키 이름을 표준 ‘질문/입력/정답/요약/본문’으로 매핑
KEY_ALIASES = {
    "user": ["instruction","query","question","input","prompt","질문","원문","문장","본문","context","source","text"],
    "assistant": ["output","answer","response","label","summary","요약","응답","정답","답변","라벨"],
    # 추가적인 보조 키(메타)
    "id": ["id","sample_id","doc_id","문서ID","문항ID","index"],
    "title": ["title","제목"],
}

def pick_first(d: dict, keys: list[str]) -> str | None:
    for k in keys:
        if k in d and isinstance(d[k], str) and d[k].strip():
            return d[k].strip()
    return None

def normalize_record(raw: dict) -> dict | None:
    """원시 레코드(raw dict) -> {'messages':[...]} or None (미매핑)"""
    # 1) 흔한 QA/요약 포맷: (user, assistant) 바로 뽑기
    user = pick_first(raw, KEY_ALIASES["user"])
    assistant = pick_first(raw, KEY_ALIASES["assistant"])

    # 2) 일부는 instruction + input 분리형
    instr = raw.get("instruction") or raw.get("지시문")
    inp   = raw.get("input") or raw.get("입력") or raw.get("context") or raw.get("원문") or raw.get("본문")

    # 3) 아주 흔한 조합 처리
    if instr and assistant:
        user_msg = instr if not inp else f"{instr}\n\n[추가정보]\n{inp}"
        return {"messages":[{"role":"user","content":user_msg},{"role":"assistant","content":assistant}]}
    if user and assistant:
        return {"messages":[{"role":"user","content":user},{"role":"assistant","content":assistant}]}

    # 4) 요약형(본문 -> 요약) 추정
    if inp and not instr and assistant:
        return {"messages":[{"role":"user","content":inp},{"role":"assistant","content":assistant}]}

    # 5) 문장/태그 시퀀스 라벨링형(토큰열+태그열)일 수 있음 -> 스킵(별도 태스크)
    #    여기서는 SFT 대화 포맷만 생성하므로, 매칭 실패로 판단
    return None

def build_jsonl_from_dir(extract_root: Path, out_jsonl: Path):
    rec_ok, rec_bad = 0, 0
    bad_samples = []
    files = [p for p in extract_root.rglob("*") if p.is_file() and p.suffix.lower() in ALLOWED_EXT]
    files = sorted(files, key=lambda p: (p.suffix, str(p)))
    print(f"[INFO] parse targets: {len(files)} files")

    with out_jsonl.open("w", encoding="utf-8") as out:
        for fp in files:
            rows = read_any_file(fp)
            if not rows:
                continue
            for r in rows:
                norm = normalize_record(r)
                if norm is None:
                    rec_bad += 1
                    if len(bad_samples) < 10:
                        bad_samples.append({"file": str(fp), "raw_keys": list(r.keys())[:20]})
                    continue
                out.write(json.dumps(norm, ensure_ascii=False) + "\n")
                rec_ok += 1

    print(f"[DONE] written={rec_ok}, unmapped={rec_bad}, out={out_jsonl}")
    if bad_samples:
        print("[HINT] 첫 몇 개 미매핑 레코드 키:")
        for s in bad_samples:
            print(" - file:", s["file"], "| keys:", s["raw_keys"])

# ================= 실행 파트 =================
# 0) 작업 폴더 준비
EXTRACT_ROOT = BASE_DIR / "_extracted"
EXTRACT_ROOT.mkdir(parents=True, exist_ok=True)

# 1) 모든 zip 풀기 (Train/Valid 각각의 SourceData, LabelingData 하위)
for split in ["Train", "Valid"]:
    for sub in ["SourceData", "LabelingData"]:
        src_dir = BASE_DIR / split / sub
        if not src_dir.exists():
            continue
        out_dir = EXTRACT_ROOT / split / sub
        extracted = unzip_all(src_dir, out_dir)
        print(f"[UNZIP] {split}/{sub}: {len(extracted)} files extracted")

# 2) 풀린 전체 파일에서 파싱 & 통합 JSONL 생성
build_jsonl_from_dir(EXTRACT_ROOT, OUT_JSONL)

# 3) 샘플 확인
print("\n[CHECK] preview 3 lines:")
with OUT_JSONL.open("r", encoding="utf-8") as f:
    for i, line in enumerate(f):
        print(line.strip())
        if i >= 2: break


[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
[WARN] read fail: /content/drive/MyDrive/DILAB/OK/DI_LAB/MARS_Datathon/Datasets/AI_HUB/_extracted/Train/SourceData/TS_영문_온라인 의료 정보 제공 사이트/cid_428610_1.json -> Unexpected UTF-8 BOM (decode using utf-8-sig): line 1 column 1 (char 0)
[WARN] read fail: /content/drive/MyDrive/DILAB/OK/DI_LAB/MARS_Datathon/Datasets/AI_HUB/_extracted/Train/SourceData/TS_영문_온라인 의료 정보 제공 사이트/cid_428610_2.json -> Unexpected UTF-8 BOM (decode using utf-8-sig): line 1 column 1 (char 0)
[WARN] read fail: /content/drive/MyDrive/DILAB/OK/DI_LAB/MARS_Datathon/Datasets/AI_HUB/_extracted/Train/SourceData/TS_영문_온라인 의료 정보 제공 사이트/cid_428611_1.json -> Unexpected UTF-8 BOM (decode using utf-8-sig): line 1 column 1 (char 0)
[WARN] read fail: /content/drive/MyDrive/DILAB/OK/DI_LAB/MARS_Datathon/Datasets/AI_HUB/_extracted/Train/SourceData/TS_영문_온라인 의료 정보 제공 사이트/cid_428612_1.json -> Unexpected UTF-8 B

KeyboardInterrupt: 