Imports

In [38]:
import os
import glob
import json
import re
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict, Any

import numpy as np
import torch
from datasets import Dataset
from transformers import AutoTokenizer, AutoModel

Config

In [39]:
@dataclass(frozen=True)
class PipelineConfig:
    """Immutable settings shared by the JSON-to-embedding pipeline steps."""
    data_dirs: Tuple[str, ...]           # local data folders (each one is searched for *.json)
    out_root: str = ""                   # output root (out_root/source_folder/*.npy + metadata.csv)

    # Identifier passed to AutoTokenizer/AutoModel.from_pretrained.
    model_name: str = "beomi/KcELECTRA-base-v2022"
    # Tokenizer max_length; inputs are padded and truncated to this length.
    max_len: int = 128
    # Batch size used when mapping the embedding function over the dataset.
    batch_size: int = 32

    # Keys tried in order on each utterance dict; the first non-None value is used as text.
    text_keys: Tuple[str, ...] = ("text", "utterance", "transcript", "sentence", "content")
    # When True, *.json files in subfolders are collected as well.
    recursive: bool = False
    # File name of the metadata CSV written directly under out_root.
    metadata_csv_name: str = "metadata.csv"

Text / JSON Utils

In [40]:
def clean_text(text: str) -> str:
    """Strip unsupported characters from *text* and normalise its whitespace.

    The input is coerced with ``str()``. Every character that is not a Hangul
    syllable (가-힣), an ASCII letter, an ASCII digit or whitespace is removed;
    each whitespace run is then collapsed into a single space and the ends
    are trimmed.
    """
    stripped = re.sub(r"[^가-힣a-zA-Z0-9\s]", "", str(text))
    collapsed = re.sub(r"\s+", " ", stripped)
    return collapsed.strip()


def dict_of_lists_to_list_of_dicts(d: dict) -> List[dict]:
    """Convert a column-oriented dict into a list of row dicts.

    List values are treated as columns and indexed per row; any non-list
    value is copied unchanged into every row. The number of rows is the
    length of the shortest list value, so longer lists are truncated.

    Args:
        d: Mapping whose values are lists (columns) and/or scalars.

    Returns:
        One dict per row, each with the same keys as ``d``. An empty list is
        returned when ``d`` is empty or holds no list values, instead of
        failing on ``min()`` of an empty sequence.
    """
    column_lengths = [len(v) for v in d.values() if isinstance(v, list)]
    if not column_lengths:
        return []

    n_rows = min(column_lengths)
    # Every list has at least n_rows items, so no per-item bounds check is needed.
    return [
        {k: (v[i] if isinstance(v, list) else v) for k, v in d.items()}
        for i in range(n_rows)
    ]


def find_utter_list(obj) -> Optional[List[dict]]:
    """Locate the list of utterance dicts inside a parsed JSON object.

    Lookup order:
      1. ``obj`` itself, if it is a non-empty list whose first item is a dict.
      2. The first value of ``obj`` (when it is a dict) that is such a list.
      3. The first value of ``obj`` that is a dict containing at least one
         list, converted with ``dict_of_lists_to_list_of_dicts``.

    Only the first element of a candidate list is inspected. Returns ``None``
    when nothing matches.
    """

    def _is_record_list(candidate) -> bool:
        return (
            isinstance(candidate, list)
            and bool(candidate)
            and isinstance(candidate[0], dict)
        )

    if _is_record_list(obj):
        return obj
    if not isinstance(obj, dict):
        return None

    # 1) Prefer a value that already is a list of dicts.
    direct = next((val for val in obj.values() if _is_record_list(val)), None)
    if direct is not None:
        return direct

    # 2) Otherwise try to convert a dict-of-lists value into a list of dicts.
    for val in obj.values():
        if not isinstance(val, dict):
            continue
        if not any(isinstance(col, list) for col in val.values()):
            continue
        rows = dict_of_lists_to_list_of_dicts(val)
        if rows and isinstance(rows[0], dict):
            return rows

    return None

Local File Collection

In [41]:
def collect_json_files(cfg: PipelineConfig) -> List[str]:
    """Collect the ``*.json`` paths found under every folder in ``cfg.data_dirs``.

    Args:
        cfg: Pipeline settings; ``data_dirs`` lists the folders to scan and
            ``recursive`` selects whether subfolders are scanned too.

    Returns:
        The de-duplicated paths in sorted order (sorted for reproducibility).
    """
    found = set()
    for d in cfg.data_dirs:
        # Escape the folder so glob metacharacters in its name ("[", "]", "*", "?")
        # are matched literally instead of being read as a pattern.
        base = glob.escape(d)
        if cfg.recursive:
            pattern = os.path.join(base, "**", "*.json")
        else:
            pattern = os.path.join(base, "*.json")
        found.update(glob.glob(pattern, recursive=cfg.recursive))

    # Sorted for reproducibility.
    return sorted(found)

Build Dataset (parse/clean)

In [42]:
def build_dataset_from_local(cfg: PipelineConfig) -> Dataset:
    """Parse the local JSON files into a ``Dataset`` with one row per file.

    For each file the utterance list is located with ``find_utter_list``, the
    first non-None value among ``cfg.text_keys`` is taken from every utterance,
    cleaned with ``clean_text``, and the non-empty results are joined with
    single spaces into one text.

    Each row holds: ``id`` (file name), ``path`` (original path),
    ``source_folder`` (name of the parent folder), ``text`` and
    ``text_len_chars``.

    Files that cannot be read or parsed, or that yield no text, are skipped
    and counted; the reason is printed for the first few of them so failures
    are not silently lost.

    Args:
        cfg: Pipeline settings (``data_dirs``, ``recursive``, ``text_keys``).

    Returns:
        A ``Dataset`` built from the successfully parsed files.

    Raises:
        RuntimeError: If no ``.json`` file is found, or if no file produced a
            usable sample.
    """
    files = collect_json_files(cfg)
    print("로컬 JSON 파일 수:", len(files))
    if not files:
        raise RuntimeError("지정한 폴더에서 .json 파일을 찾지 못했습니다. 경로를 확인하세요.")

    # Cap per-file skip messages so a broken folder cannot flood the output.
    max_reported = 10
    examples: List[Dict[str, Any]] = []
    bad = 0

    def report_skip(skipped_path: str, reason: str) -> None:
        # Reads the current value of `bad`, which is incremented before each call.
        if bad <= max_reported:
            print(f"[스킵] {skipped_path}: {reason}")

    for fp in files:
        try:
            # utf-8-sig decodes plain UTF-8 unchanged and also accepts a leading
            # BOM, which json.load would otherwise reject.
            with open(fp, "r", encoding="utf-8-sig") as f:
                obj = json.load(f)

            utter_list = find_utter_list(obj)
            if utter_list is None:
                bad += 1
                report_skip(fp, "발화 리스트 없음")
                continue

            parts = []
            for u in utter_list:
                if not isinstance(u, dict):
                    continue
                t = next((u.get(k) for k in cfg.text_keys if u.get(k) is not None), None)
                if t is None:
                    continue
                t = clean_text(t)
                if t:
                    parts.append(t)

            if not parts:
                bad += 1
                report_skip(fp, "유효한 텍스트 없음")
                continue

            merged_text = " ".join(parts)

            examples.append({
                "id": os.path.basename(fp),  # file name
                "path": fp,  # original path
                "source_folder": os.path.basename(os.path.dirname(fp)),  # parent folder name
                "text": merged_text,
                "text_len_chars": len(merged_text),
            })

        except Exception as exc:
            # Best effort: one unreadable or malformed file must not abort the run,
            # but the cause is reported instead of being swallowed.
            bad += 1
            report_skip(fp, f"{type(exc).__name__}: {exc}")

    print("생성 샘플:", len(examples), "| 스킵:", bad)
    if not examples:
        raise RuntimeError("파싱 성공 샘플이 0개입니다. JSON 구조/키를 확인하세요.")

    return Dataset.from_list(examples)

Model Loader

In [43]:
def load_text_encoder(cfg: PipelineConfig):
    """Load the tokenizer and encoder named by ``cfg.model_name``.

    The model is moved to CUDA when ``torch.cuda.is_available()`` is true,
    otherwise to the CPU, and is switched to eval mode.

    Returns:
        A ``(tokenizer, model, device)`` tuple.
    """
    target = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(target)

    tokenizer = AutoTokenizer.from_pretrained(cfg.model_name)

    model = AutoModel.from_pretrained(cfg.model_name)
    model = model.to(device)
    model = model.eval()

    return tokenizer, model, device

Embedding (CLS) 및 text_len_tokens

In [44]:
# add_embedding optimisation: uses set_format(torch) (fast/stable)
def add_embedding(cfg: PipelineConfig, ds: Dataset, tokenizer, model, device) -> Dataset:
    """Tokenize ``ds["text"]`` and attach a first-token embedding to every row.

    Columns added:
      * the tokenizer outputs (including ``input_ids`` and ``attention_mask``),
        padded and truncated to ``cfg.max_len``;
      * ``text_len_tokens``: the sum of each row's attention mask;
      * ``text_embedding``: the ``last_hidden_state`` vector at position 0,
        stored as float32.

    Args:
        cfg: Pipeline settings (``max_len``, ``batch_size``).
        ds: Dataset with a ``text`` column.
        tokenizer: Callable tokenizer applied to batches of text.
        model: Encoder called with ``input_ids`` and ``attention_mask``.
        device: Device the input tensors are moved to before the forward pass.

    Returns:
        The mapped dataset containing the new columns.
    """

    # Tokenize: existing columns are kept and the new ones are added.
    def tokenize(examples):
        encoded = tokenizer(
            examples["text"],
            padding="max_length",
            truncation=True,
            max_length=cfg.max_len,
            return_attention_mask=True,
        )
        encoded["text_len_tokens"] = [
            int(sum(mask)) for mask in encoded["attention_mask"]
        ]
        return encoded

    tokenized = ds.map(tokenize, batched=True)

    # Expose the model inputs as torch tensors (fast and stable).
    tokenized.set_format(type="torch", columns=["input_ids", "attention_mask"])

    def encode(examples):
        ids = examples["input_ids"].to(device)
        mask = examples["attention_mask"].to(device)

        with torch.no_grad():
            hidden = model(input_ids=ids, attention_mask=mask).last_hidden_state
            first_token = hidden[:, 0, :]  # one vector per row, taken at position 0 (CLS)

        examples["text_embedding"] = first_token.detach().cpu().numpy().astype(np.float32)
        return examples

    return tokenized.map(encode, batched=True, batch_size=cfg.batch_size)

Save .npy per file 및 metadata.csv

In [45]:
def safe_stem(filename: str) -> str:
    """Return *filename* without its final extension.

    Only the last extension is dropped (``"a.b.json"`` -> ``"a.b"``); no other
    sanitising is performed.
    """
    stem, _extension = os.path.splitext(filename)
    return stem


def ensure_dir(path: str):
    """Create directory *path* (and missing parents) if it does not exist.

    An empty *path* denotes the current directory and is a no-op;
    ``os.makedirs("")`` would otherwise raise ``FileNotFoundError``.
    """
    if not path:
        return
    os.makedirs(path, exist_ok=True)


def save_embeddings_per_file_and_metadata(ds_emb: Dataset, out_root: str, metadata_csv_name: str = "metadata.csv"):
    """Write one ``.npy`` embedding per row plus a metadata CSV.

    Each row's ``text_embedding`` is saved as float32 to
    ``out_root/<source_folder>/<file stem>.npy``, and one CSV line describing
    it is written to ``out_root/<metadata_csv_name>``. The CSV always gets its
    header row, even when ``ds_emb`` has no rows.

    Note: ``ds_emb.reset_format()`` is called, which changes the format of the
    dataset object passed in.

    Args:
        ds_emb: Dataset with ``id``, ``path`` and ``text_embedding`` columns;
            ``source_folder``, ``text_len_chars`` and ``text_len_tokens`` are
            used when present.
        out_root: Root output directory.
        metadata_csv_name: File name of the CSV created under ``out_root``.

    Raises:
        RuntimeError: If a row has no ``id`` or ``path``.
    """
    import csv

    # Fixed column order, so the header can be written even with zero rows.
    fieldnames = [
        "id",
        "path",
        "source_folder",
        "text_len_chars",
        "text_len_tokens",
        "npy_path",
        "embedding_dim",
        "embedding_dtype",
    ]

    ensure_dir(out_root)
    rows = []
    prepared_dirs = set()  # output folders already created during this call

    # Drop any column-restricting format so the metadata columns are visible.
    ds_emb.reset_format()

    for ex in ds_emb:
        file_id = ex.get("id")
        path = ex.get("path")

        if not file_id or not path:
            raise RuntimeError(
                "저장에 필요한 메타 컬럼(id/path)이 ds_emb에 없습니다. "
                "ds_emb.reset_format()을 했는지, build_dataset_from_local에서 id/path를 넣었는지 확인하세요."
            )

        folder = ex.get("source_folder") or os.path.basename(os.path.dirname(path)) or "unknown"
        out_dir = os.path.join(out_root, folder)
        if out_dir not in prepared_dirs:
            ensure_dir(out_dir)
            prepared_dirs.add(out_dir)

        npy_name = safe_stem(file_id) + ".npy"
        npy_path = os.path.join(out_dir, npy_name)

        emb = np.asarray(ex["text_embedding"], dtype=np.float32)
        np.save(npy_path, emb)

        rows.append({
            "id": file_id,
            "path": path,
            "source_folder": folder,
            "text_len_chars": int(ex.get("text_len_chars", -1)),
            "text_len_tokens": int(ex.get("text_len_tokens", -1)),
            "npy_path": npy_path,
            "embedding_dim": int(emb.shape[0]),
            "embedding_dtype": str(emb.dtype),
        })

    csv_path = os.path.join(out_root, metadata_csv_name)
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    print("파일별 npy 저장 완료:", out_root)
    print("metadata.csv 저장 완료:", csv_path)
    print("총 저장된 샘플 수:", len(rows))


NumPy Sanity Check

In [46]:
def sanity_check_embeddings_from_dataset(ds_emb: Dataset, n: int = 5):
    """Print basic statistics for the first ``n`` embeddings of ``ds_emb``.

    Stacks the ``text_embedding`` of the first ``min(n, len(ds_emb))`` rows as
    float32 and prints the stacked shape, NaN/Inf presence, min/max, mean/std
    and the per-row L2 norms. Prints a notice and returns when there is
    nothing to check.
    """
    count = min(n, len(ds_emb))
    if count == 0:
        print("체크할 샘플이 없습니다.")
        return

    vectors = []
    for idx in range(count):
        vectors.append(np.asarray(ds_emb[idx]["text_embedding"], dtype=np.float32))
    embs = np.stack(vectors, axis=0)

    has_nan = np.isnan(embs).any()
    has_inf = np.isinf(embs).any()

    print("샘플 스택 shape:", embs.shape)  # (rows, embedding dim)
    print("NaN:", has_nan, "| Inf:", has_inf)
    print("min/max:", float(embs.min()), float(embs.max()))
    print("mean/std:", float(embs.mean()), float(embs.std()))
    print("L2 norms:", np.linalg.norm(embs, axis=1))

Main

In [47]:
# Script entry point: build the dataset, embed it, run a sanity check and save the results.
if __name__ == "__main__":
    cfg = PipelineConfig(
        data_dirs=(
            "/content/drive/MyDrive/TL_서울_화재",
            "/content/drive/MyDrive/TL_서울_구급",
            "/content/drive/MyDrive/TL_서울_구조",
            "/content/drive/MyDrive/TL_서울_기타",
        ),
        out_root="/content/output/ex_data",
        max_len=128,
        batch_size=32,
        recursive=False,
    )

    # Local JSON -> Dataset
    ds = build_dataset_from_local(cfg)

    # Load the model/tokenizer
    tokenizer, model, device = load_text_encoder(cfg)

    # Generate embeddings (CLS)
    ds_emb = add_embedding(cfg, ds, tokenizer, model, device)

    # Embedding quality check
    print("최종 샘플 수:", len(ds_emb))
    print("임베딩 shape:", ds_emb[0]["text_embedding"].shape)
    sanity_check_embeddings_from_dataset(ds_emb, n=5)

    # Reset the dataset format before saving (the save function resets it again itself).
    ds_emb.reset_format()

    # Save per-file .npy + metadata.csv
    save_embeddings_per_file_and_metadata(
        ds_emb,
        out_root=cfg.out_root,
        metadata_csv_name=cfg.metadata_csv_name
    )

# --- Debugging step to verify data path and files ---
# NOTE: the snippet below is wrapped in a triple-quoted string, so it is a plain
# string expression and none of it executes. Remove the quotes to run it.
'''
data_path = "/content/drive/MyDrive/TL_서울_화재_small"
print(f"Checking path: {data_path}")

if not os.path.exists(data_path):
    print(f"Error: Directory does not exist at {data_path}")
else:
    print(f"Directory exists: {data_path}")
    json_files_found = glob.glob(os.path.join(data_path, "**", "*.json"), recursive=True)
    print(f"Found {len(json_files_found)} JSON files in total (including subdirectories).")
    if json_files_found:
        print("First 5 found JSON files:")
        for f in json_files_found[:5]:
            print(f)
    else:
        print("No JSON files found in the directory or its subdirectories.")
'''

로컬 JSON 파일 수: 10000
생성 샘플: 10000 | 스킵: 0


Map:   0%|          | 0/10000 [00:00<?, ? examples/s]

Map:   0%|          | 0/10000 [00:00<?, ? examples/s]

최종 샘플 수: 10000
임베딩 shape: torch.Size([768])
샘플 스택 shape: (5, 768)
NaN: False | Inf: False
min/max: -3.4013233184814453 5.190710067749023
mean/std: -0.071928009390831 0.5128692984580994
L2 norms: [14.58765   14.406179  13.974519  14.356531  14.4286585]
파일별 npy 저장 완료: /content/output/ex_data
metadata.csv 저장 완료: /content/output/ex_data/metadata.csv
총 저장된 샘플 수: 10000
Checking path: /content/drive/MyDrive/TL_서울_화재_small
Directory exists: /content/drive/MyDrive/TL_서울_화재_small
Found 276 JSON files in total (including subdirectories).
First 5 found JSON files:
/content/drive/MyDrive/TL_서울_화재_small/651e5029d029fc04ba3a708f_20221003.json
/content/drive/MyDrive/TL_서울_화재_small/651e5029d029fc04ba3a6fd0_20221003.json
/content/drive/MyDrive/TL_서울_화재_small/651e50c72f06ed4a6e31e021_20221006.json
/content/drive/MyDrive/TL_서울_화재_small/651e5029d029fc04ba3a6db8_20221003.json
/content/drive/MyDrive/TL_서울_화재_small/651e5013d7e77fe70c0b01bc_20221002.json


Drive에 추가

In [None]:
'''
from google.colab import drive
drive.mount("/content/drive")

cp -r /content/output/data /content/drive/MyDrive/multimodal_text_embeddings
'''

'\nfrom google.colab import drive\ndrive.mount("/content/drive")\n\ncp -r /content/output/data /content/drive/MyDrive/multimodal_text_embeddings\n'