In [1]:
pip install scikit-learn numpy

Collecting scikit-learn
  Downloading scikit_learn-1.7.2-cp311-cp311-macosx_12_0_arm64.whl.metadata (11 kB)
Collecting numpy
  Downloading numpy-2.3.3-cp311-cp311-macosx_14_0_arm64.whl.metadata (62 kB)
Collecting scipy>=1.8.0 (from scikit-learn)
  Downloading scipy-1.16.2-cp311-cp311-macosx_14_0_arm64.whl.metadata (62 kB)
Collecting joblib>=1.2.0 (from scikit-learn)
  Downloading joblib-1.5.2-py3-none-any.whl.metadata (5.6 kB)
Collecting threadpoolctl>=3.1.0 (from scikit-learn)
  Downloading threadpoolctl-3.6.0-py3-none-any.whl.metadata (13 kB)
Downloading scikit_learn-1.7.2-cp311-cp311-macosx_12_0_arm64.whl (8.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.6/8.6 MB[0m [31m42.8 MB/s[0m  [33m0:00:00[0m
[?25hDownloading numpy-2.3.3-cp311-cp311-macosx_14_0_arm64.whl (5.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.4/5.4 MB[0m [31m40.1 MB/s[0m  [33m0:00:00[0m
[?25hDownloading joblib-1.5.2-py3-none-any.whl (308 kB)
Downloading scipy

In [3]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
중복검사 + 주제별 재배치 스크립트 (수정판)
- 기준: SEQ_THRESH (SequenceMatcher), TFIDF_THRESH (Cosine-TFIDF)
- 입력:  input.txt  (각 라인 하나의 문장, (O)/(X) 표기 허용)
- 출력:  output.txt  (주제별 정리본)
        removed_output.txt (임계치 근처로 판단되는 제거 그룹 2개: 평균·쌍별 유사도 표기)
"""
import re
from pathlib import Path
from collections import OrderedDict
from difflib import SequenceMatcher
import math

# --- 설정 (필요 시 조정) ---
INPUT_FILE = Path("input.txt")
OUTPUT_FILE = Path("output.txt")
REMOVED_FILE = Path("removed_output.txt")

SEQ_THRESH = 0.70     # SequenceMatcher 임계값
TFIDF_THRESH = 0.75   # Cosine-TFIDF 임계값
TOP_K_REMOVED = 2     # removed_output에 기록할 군집 수 (임계치 근접 순)
# ----------------------------

def read_input(path):
    raw = path.read_text(encoding="utf-8")
    lines = []
    for ln in raw.splitlines():
        s = ln.strip().rstrip('",')
        if not s:
            continue
        if s in ("[", "]", "{", "},"):
            continue
        if s.startswith('"') and s.endswith('"'):
            s = s[1:-1]
        # detect trailing (O) or (X) possibly with *n
        m = re.search(r"\((O|X)(?:\*\d+)?\)\s*$", s)
        if m:
            mark = m.group(1)
            core = s[:m.start()].strip()
        else:
            m2 = re.search(r"\((O|X)\)", s)
            if m2:
                mark = m2.group(1)
                core = re.sub(r"\((O|X)\)", "", s).strip()
            else:
                mark = ""
                core = s.strip()
        lines.append({"orig": s, "core": core, "mark": mark})
    return lines

def normalize_for_seq(s):
    t = re.sub(r"\{\@.*?\}", "", s)
    t = re.sub(r"[^\w\s가-힣]", " ", t)
    t = re.sub(r"\s+", " ", t).strip().lower()
    return t

def compute_similarities(texts):
    """TF-IDF 코사인 유사도 행렬과 정규화된 문자열 목록(SequenceMatcher용)을 반환."""
    try:
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.metrics.pairwise import cosine_similarity
    except Exception as e:
        raise RuntimeError("scikit-learn 필요: `pip install scikit-learn` 후 재실행하세요.") from e

    vectorizer = TfidfVectorizer(token_pattern=r"(?u)\b\w+\b")
    X = vectorizer.fit_transform(texts)
    sim_tfidf = cosine_similarity(X)   # (n,n) numpy array

    norm_texts = [normalize_for_seq(t) for t in texts]
    return sim_tfidf, norm_texts

def build_clusters(sim_tfidf, norm_texts, seq_thresh, tfidf_thresh):
    n = len(norm_texts)
    adj = [[] for _ in range(n)]
    # TF-IDF 먼저 필터 → SequenceMatcher 적용
    for i in range(n):
        for j in range(i+1, n):
            if sim_tfidf[i, j] >= tfidf_thresh:
                seq_sim = SequenceMatcher(None, norm_texts[i], norm_texts[j]).ratio()
                if seq_sim >= seq_thresh:
                    adj[i].append(j)
                    adj[j].append(i)
    # 연결성분 추출
    visited = [False]*n
    clusters = []
    for i in range(n):
        if visited[i]:
            continue
        stack = [i]
        comp = []
        while stack:
            v = stack.pop()
            if visited[v]:
                continue
            visited[v] = True
            comp.append(v)
            for nb in adj[v]:
                if not visited[nb]:
                    stack.append(nb)
        clusters.append(sorted(comp))
    return clusters

def choose_representatives(clusters, sim_tfidf):
    """각 군집의 대표: 군집 내 TF-IDF 평균 유사도 가장 높은 인덱스 선택."""
    reps = []
    for comp in clusters:
        if len(comp) == 1:
            reps.append((comp[0], comp))
            continue
        best_idx = comp[0]
        best_avg = -1.0
        for i in comp:
            ssum = sum(sim_tfidf[i,j] for j in comp if j!=i)
            avg = ssum / (len(comp)-1) if len(comp)>1 else 0.0
            if avg > best_avg:
                best_avg = avg
                best_idx = i
        reps.append((best_idx, comp))
    return reps

def topic_classify(dedup_entries):
    topic_keywords = OrderedDict([
        ("지방자치", ["지방자치", "지방자치단체", "지방자치법", "주민", "지방의회", "조례","주민소송","자치사무","기관위임사무"]),
        ("권한위임·위탁·내부위임", ["위임","위탁","재위임","수임","수탁","내부위임","권한위임"]),
        ("회계·변상·배상", ["회계","변상","배상","감사원","예산","공유재산","국고"]),
        ("인사·징계·공무원법", ["징계","소청","임용","직위해제","임용결격","공무원","소청심사","공무원법"]),
        ("행정절차·행정소송·행정심판", ["행정소송","항고소송","행정심판","처분","취소소송","항고","행정절차","재결"]),
        ("행정행위·처분·대집행·주민소송", ["행정행위","처분","무효","취소","대집행","점용허가","주민소송"]),
        ("감독·시정·분쟁조정", ["시정명령","감독","분쟁조정","주무부장관","분쟁조정결정","시정요구"]),
        ("공용재산·점용·재산관리", ["공물","점용","공유재산","재산의 취득","관리","점용허가","공유수면"]),
        ("기타", [])
    ])
    topics_out = {k: [] for k in topic_keywords.keys()}
    topics_out["기타"] = []
    for rep_text, members in dedup_entries:
        low = rep_text.lower()
        assigned = False
        for topic, keywords in topic_keywords.items():
            for kw in keywords:
                if kw.lower() in low:
                    topics_out[topic].append((rep_text, members))
                    assigned = True
                    break
            if assigned:
                break
        if not assigned:
            topics_out["기타"].append((rep_text, members))
    return topics_out

def write_output_file(dedup_entries, lines_info, out_path):
    topics_out = topic_classify(dedup_entries)
    out_lines = []
    for topic, items in topics_out.items():
        if not items:
            continue
        out_lines.append(f"## {topic} (총 {len(items)}개)")
        out_lines.append("")
        o_items, x_items, other_items = [], [], []
        for rep_text, members in items:
            countO = sum(1 for m in members if lines_info[m]["mark"] == "O")
            countX = sum(1 for m in members if lines_info[m]["mark"] == "X")
            suffix = []
            if countO: suffix.append(f"(O*{countO})")
            if countX: suffix.append(f"(X*{countX})")
            text_with_suffix = rep_text + (" " + " ".join(suffix) if suffix else "")
            if countO:
                o_items.append(text_with_suffix)
            elif countX:
                x_items.append(text_with_suffix)
            else:
                other_items.append(text_with_suffix)
        out_lines.append(f"### O (표기 O) — {len(o_items)}개")
        if o_items:
            for i, it in enumerate(o_items, start=1):
                out_lines.append(f"{i}-O{i} {it}")
        else:
            out_lines.append("(없음)")
        out_lines.append("")
        out_lines.append(f"### X (표기 X) — {len(x_items)}개")
        if x_items:
            for i, it in enumerate(x_items, start=1):
                out_lines.append(f"{i}-X{i} {it}")
        else:
            out_lines.append("(없음)")
        out_lines.append("")
        if other_items:
            out_lines.append(f"### 기타 (표기 없음) — {len(other_items)}개")
            for i, it in enumerate(other_items, start=1):
                out_lines.append(f"{i}-A{i} {it}")
            out_lines.append("")
    out_path.write_text("\n".join(out_lines), encoding="utf-8")

def build_removed_report(clusters, sim_tfidf, norm_texts, lines_info, removed_path, top_k=TOP_K_REMOVED):
    """
    제거(병합)된 군집 중에서 '임계치 근접(경계)' 군집 top_k 를 골라서 저장합니다.
    - 군집의 (평균 TF-IDF, 평균 Seq) 값을 계산하고, (TFIDF_THRESH, SEQ_THRESH)와의 거리합이 작은 순으로 선택.
    - 출력: 대표문장, 멤버 목록, 평균값, 쌍별 tfidf/seq 행렬
    """
    merged = [c for c in clusters if len(c) > 1]
    if not merged:
        removed_path.write_text("병합(제거)된 군집이 없습니다.\n", encoding="utf-8")
        return

    def cluster_metrics(comp):
        pairs = [(i, j) for i in comp for j in comp if i < j]
        if not pairs:
            return (0.0, 0.0)
        avg_tf = sum(sim_tfidf[i, j] for i, j in pairs) / len(pairs)
        avg_seq = sum(SequenceMatcher(None, norm_texts[i], norm_texts[j]).ratio() for i, j in pairs) / len(pairs)
        return (avg_tf, avg_seq)

    # 거리 측정 함수: 임계치와의 거리합
    def distance_to_thresholds(comp):
        avg_tf, avg_seq = cluster_metrics(comp)
        return abs(avg_tf - TFIDF_THRESH) + abs(avg_seq - SEQ_THRESH)

    merged_sorted = sorted(merged, key=distance_to_thresholds)
    chosen = merged_sorted[:top_k]

    lines = []
    for idx, comp in enumerate(chosen, start=1):
        avg_tf, avg_seq = cluster_metrics(comp)
        lines.append(f"=== Removed Group #{idx}  (size={len(comp)}) ===\n")
        # 대표(최대 average tfidf)
        best = None
        best_avg = -1.0
        for i in comp:
            ssum = sum(sim_tfidf[i,j] for j in comp if j!=i)
            avg = ssum / (len(comp)-1) if len(comp)>1 else 0.0
            if avg > best_avg:
                best_avg = avg
                best = i
        lines.append(f"Representative (index {best}):\n{lines_info[best]['core']}  [{lines_info[best]['mark']}]\n")
        lines.append(f"Average TF-IDF = {avg_tf:.4f}, Average Sequence = {avg_seq:.4f}\n")
        lines.append("Members:\n")
        for m in comp:
            lines.append(f"- ({m}) {lines_info[m]['core']}  [{lines_info[m]['mark']}]\n")
        lines.append("\nPairwise matrix (tfidf/seq):\n")
        for i in comp:
            row = []
            for j in comp:
                if i == j:
                    row.append("1.000/1.000")
                else:
                    tf = sim_tfidf[i,j]
                    seq = SequenceMatcher(None, norm_texts[i], norm_texts[j]).ratio()
                    row.append(f"{tf:.4f}/{seq:.4f}")
            lines.append("\t".join(row) + "\n")
        lines.append("\n")
    removed_path.write_text("\n".join(lines), encoding="utf-8")

def main():
    if not INPUT_FILE.exists():
        print(f"입력파일 {INPUT_FILE}이 없습니다. 동일폴더에 input.txt를 넣고 재실행하세요.")
        return

    lines_info = read_input(INPUT_FILE)
    texts = [li["core"] for li in lines_info]

    # 1) 유사도 계산
    sim_tfidf, norm_texts = compute_similarities(texts)

    # 2) 군집화 (TF-IDF 필터 -> SequenceMatcher)
    clusters = build_clusters(sim_tfidf, norm_texts, SEQ_THRESH, TFIDF_THRESH)

    # 3) 대표 선정 (인덱스 기준)
    dedup_entries_idx = choose_representatives(clusters, sim_tfidf)

    # 4) 인덱스 -> (문장, 멤버) 포맷으로 변환
    dedup_entries = [(lines_info[rep_idx]["core"], comp) for rep_idx, comp in dedup_entries_idx]

    # 5) 출력 파일 생성
    write_output_file(dedup_entries, lines_info, OUTPUT_FILE)

    # 6) removed report (임계치 근처 그룹 top K)
    build_removed_report(clusters, sim_tfidf, norm_texts, lines_info, REMOVED_FILE, top_k=TOP_K_REMOVED)

    total_in = len(texts)
    total_out = len(dedup_entries)
    removed_count = total_in - total_out
    print(f"완료: 입력 {total_in} → 출력(대표문장) {total_out} (제거된 항목 수 {removed_count})")
    print(f"결과 파일: {OUTPUT_FILE.resolve()}")
    print(f"임계치 근접 그룹 리포트: {REMOVED_FILE.resolve()}")

if __name__ == "__main__":
    main()


완료: 입력 3992 → 출력(대표문장) 3128 (제거된 항목 수 864)
결과 파일: /Users/esomin/runtorun/output.txt
임계치 근접 그룹 리포트: /Users/esomin/runtorun/removed_output.txt


In [4]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
중복검사 + 대표문장 선정 스크립트 (콘솔 출력 버전)
- 기준: SEQ_THRESH (SequenceMatcher), TFIDF_THRESH (Cosine-TFIDF)
- 입력: input.txt (각 라인 하나의 문장, (O)/(X) 표기 허용)
- 출력: 콘솔 (대표문장 및 군집 정보)
"""
import re
from pathlib import Path
from difflib import SequenceMatcher

# --- 설정 ---
INPUT_FILE = Path("input.txt")
SEQ_THRESH = 0.70
TFIDF_THRESH = 0.75
# -------------

def read_input(path):
    raw = path.read_text(encoding="utf-8")
    lines = []
    for ln in raw.splitlines():
        s = ln.strip().rstrip('",')
        if not s:
            continue
        if s in ("[", "]", "{", "},"):
            continue
        if s.startswith('"') and s.endswith('"'):
            s = s[1:-1]
        # detect trailing (O) or (X) possibly with *n
        m = re.search(r"\((O|X)(?:\*\d+)?\)\s*$", s)
        if m:
            mark = m.group(1)
            core = s[:m.start()].strip()
        else:
            m2 = re.search(r"\((O|X)\)", s)
            if m2:
                mark = m2.group(1)
                core = re.sub(r"\((O|X)\)", "", s).strip()
            else:
                mark = ""
                core = s.strip()
        lines.append({"orig": s, "core": core, "mark": mark})
    return lines

def normalize_for_seq(s):
    t = re.sub(r"\{\@.*?\}", "", s)
    t = re.sub(r"[^\w\s가-힣]", " ", t)
    t = re.sub(r"\s+", " ", t).strip().lower()
    return t

def compute_similarities(texts):
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity
    vectorizer = TfidfVectorizer(token_pattern=r"(?u)\b\w+\b")
    X = vectorizer.fit_transform(texts)
    sim_tfidf = cosine_similarity(X)
    norm_texts = [normalize_for_seq(t) for t in texts]
    return sim_tfidf, norm_texts

def build_clusters(sim_tfidf, norm_texts, seq_thresh, tfidf_thresh):
    n = len(norm_texts)
    adj = [[] for _ in range(n)]
    for i in range(n):
        for j in range(i+1, n):
            if sim_tfidf[i, j] >= tfidf_thresh:
                seq_sim = SequenceMatcher(None, norm_texts[i], norm_texts[j]).ratio()
                if seq_sim >= seq_thresh:
                    adj[i].append(j)
                    adj[j].append(i)
    visited = [False]*n
    clusters = []
    for i in range(n):
        if visited[i]:
            continue
        stack = [i]
        comp = []
        while stack:
            v = stack.pop()
            if visited[v]:
                continue
            visited[v] = True
            comp.append(v)
            for nb in adj[v]:
                if not visited[nb]:
                    stack.append(nb)
        clusters.append(sorted(comp))
    return clusters

def choose_representatives(clusters, sim_tfidf):
    reps = []
    for comp in clusters:
        if len(comp) == 1:
            reps.append((comp[0], comp))
            continue
        best_idx = comp[0]
        best_avg = -1.0
        for i in comp:
            ssum = sum(sim_tfidf[i,j] for j in comp if j!=i)
            avg = ssum / (len(comp)-1) if len(comp)>1 else 0.0
            if avg > best_avg:
                best_avg = avg
                best_idx = i
        reps.append((best_idx, comp))
    return reps

def main():
    if not INPUT_FILE.exists():
        print(f"입력파일 {INPUT_FILE}이 없습니다. 동일폴더에 input.txt를 넣고 재실행하세요.")
        return

    lines_info = read_input(INPUT_FILE)
    texts = [li["core"] for li in lines_info]

    # 1) 유사도 계산
    sim_tfidf, norm_texts = compute_similarities(texts)

    # 2) 군집화
    clusters = build_clusters(sim_tfidf, norm_texts, SEQ_THRESH, TFIDF_THRESH)

    # 3) 대표 선정
    dedup_entries_idx = choose_representatives(clusters, sim_tfidf)

    # 4) 결과 콘솔 출력
    print("=== 대표 문장 및 군집 결과 (Step 5) ===")
    for rep_idx, comp in dedup_entries_idx:
        rep_text = lines_info[rep_idx]["core"]
        marks = [lines_info[m]["mark"] for m in comp]
        print(f"\n[대표문장] {rep_text}")
        print(f"  - 군집 크기: {len(comp)}")
        print(f"  - 멤버 인덱스: {comp}")
        print(f"  - 표기 분포: O={marks.count('O')}, X={marks.count('X')}, 없음={marks.count('')}")

    print("\n총 입력:", len(texts))
    print("대표 문장 개수:", len(dedup_entries_idx))
    print("제거된 항목 수:", len(texts) - len(dedup_entries_idx))

if __name__ == "__main__":
    main()


=== 대표 문장 및 군집 결과 (Step 5) ===

[대표문장] "부당해고 금지 (X - 헌법에 부당해고 금지에 대한 명문의 규정은 없다.)
  - 군집 크기: 1
  - 멤버 인덱스: [0]
  - 표기 분포: O=0, X=0, 없음=1

[대표문장] "국가의 고용증진 의무
  - 군집 크기: 2
  - 멤버 인덱스: [1, 36]
  - 표기 분포: O=2, X=0, 없음=0

[대표문장] "상이군경의 유가족의 근로기회우선보장 (X - 헌법 제 32조 ⑥ 국가유공자·상이군경 및 전몰군경의 유가족은 법률이 정하는 바에 의하여 우선적으로 근로의 기회를 부여받는다.\r\n{@※ 해당 지문에 대한 판례(헌법재판소 2006. 2. 23. 2004헌마675)에서는 해당 지문의 대상을 ‘국가유공자’, ‘상이군경’, ‘전몰군경의 유가족’으로 해석하였기 때문에 해당 문제는 복수정답 처리되었습니다.})
  - 군집 크기: 1
  - 멤버 인덱스: [2]
  - 표기 분포: O=0, X=0, 없음=1

[대표문장] "최저임금제 시행
  - 군집 크기: 2
  - 멤버 인덱스: [3, 39]
  - 표기 분포: O=2, X=0, 없음=0

[대표문장] "{@헌법재판관이} 중대한 심신상의 장해로 직무를 수행할 수 없을 때에는 법률이 정하는 바에 의하여 퇴직하게 할 수 있다.
  - 군집 크기: 1
  - 멤버 인덱스: [4]
  - 표기 분포: O=1, X=0, 없음=0

[대표문장] "누구든지 체포 또는 구속을 당한 때에는 즉시 변호인의 조력을 받을 권리를 가진다. 다만, {@형사피의자}가 스스로 변호인을 구할 수 없을 때에는 법률이 정하는 바에 의하여 국가가 변호인을 붙인다.
  - 군집 크기: 1
  - 멤버 인덱스: [5]
  - 표기 분포: O=1, X=0, 없음=0

[대표문장] "국정의 중요한 사항에 관한 대통령의 자문에 응하기 위하여 국가원로로 구성되는 국가원로자문회의를 {@둔다.}
  - 군집 크기: 1
  - 멤버 인덱스: [6]
  - 표기 분포: O=1