## 필수 패키지 설치

In [1]:
!pip -q install transformers accelerate torch openai python-dotenv pandas tqdm requests numpy

## 하이퍼파라미터

In [22]:
# ---------- 사용자 설정 (여기를 바꿔 쓰세요) ----------
THRESHOLD = None     # 코사인 유사도 임계값
TOP_K = 8            # 상위 몇 개의 관련 조항을 정보로 넣을지
EMBED_MODEL = "text-embedding-3-large"

## 설정 & 유틸

In [3]:
import os
import re
import json
import time
import math
import numpy as np
import pandas as pd
import requests
from typing import List, Dict, Any, Tuple

from tqdm import tqdm
from dotenv import load_dotenv

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM


# 로컬 저장 옵션
SAVE_OUTPUT_CSV = True
OUTPUT_CSV_PATH = "gdpr_violation_predictions.csv"

# LLM 모델 (Qwen 권장)
MODEL_ID = "Qwen/Qwen2-7B-Instruct"  # 대안: "google/gemma-7b-it", "meta-llama/Llama-3.1-8B-Instruct"

# 생성 파라미터
GEN_ARGS = dict(
    max_new_tokens=8,
    do_sample=False,
    temperature=0.0,
)

# ---------- 환경 변수 ----------
#load_dotenv()
#OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_API_KEY = ""
if not OPENAI_API_KEY:
    raise RuntimeError("환경변수 OPENAI_API_KEY가 설정되지 않았습니다. .env 파일을 확인하세요.")

# ---------- OpenAI Embeddings ----------
from openai import OpenAI
openai_client = OpenAI(api_key=OPENAI_API_KEY)


def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    denom = (np.linalg.norm(a) * np.linalg.norm(b))
    if denom == 0:
        return 0.0
    return float(np.dot(a, b) / denom)

def get_embedding(text: str, model: str = EMBED_MODEL, max_retries: int = 5) -> List[float]:
    text = (text or "").strip()
    last_err = None
    for attempt in range(1, max_retries + 1):
        try:
            resp = openai_client.embeddings.create(model=model, input=text)
            return resp.data[0].embedding
        except Exception as e:
            last_err = e
            sleep_s = (1.8 ** attempt) + 0.1 * attempt
            print(f"[임베딩 재시도 {attempt}/{max_retries}] {e} → {sleep_s:.1f}s 대기")
            time.sleep(sleep_s)
    raise RuntimeError(f"임베딩 생성 실패: {last_err}")

def extract_first_binary_digit(text: str) -> str:
    """
    모델 출력에서 가장 먼저 등장하는 0 또는 1 한 글자만 추출.
    없으면 '0'으로 보수적 판정.
    """
    m = re.search(r"[01]", text)
    return m.group(0) if m else "0"


## 데이터 로드

In [None]:
# =========================================================
# Clone repo via git + git-lfs, then load target files
# =========================================================
import os, json, subprocess, shlex, tempfile, sys
from pathlib import Path
import pandas as pd

# ---------- 사용자 설정 ----------
# 1) GitHub 토큰: 비공개 리포/LFS 권한이 필요하면 넣으세요. 공개라면 빈 문자열 가능
GITHUB_TOKEN = ""  # 예: "ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

# 2) 읽을 대상들 (URL에서 구조만 추출하여 명시)
REPO_OWNER  = "beefed-up-geek"
REPO_NAME   = "compliance_checking"
REPO_REF    = "main"  # branch or tag or commit
# 이벤트 JSON (LFS 파일)
EVENTIC_REL_PATH = "environments/GDPR-13/eventic_with_embedding.json"
# TSV (원래 raw URL이지만, 리포 경로와 동일하므로 로컬에서 읽음)
TSV_REL_PATH     = "environments/GDPR-13/data/original/sample.tsv"

# ---------- 유틸 ----------
def _run(cmd: str, cwd: str | None = None, check: bool = True) -> subprocess.CompletedProcess:
    """Run shell command, raising with readable message on failure."""
    p = subprocess.run(
        cmd if isinstance(cmd, list) else shlex.split(cmd),
        cwd=cwd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if check and p.returncode != 0:
        msg = (
            f"[CMD FAIL] {cmd}\n"
            f"[STDOUT]\n{p.stdout}\n"
            f"[STDERR]\n{p.stderr}\n"
        )
        raise RuntimeError(msg)
    return p

def _ensure_git_installed():
    try:
        _run("git --version", check=True)
    except Exception as e:
        raise RuntimeError("git이 필요합니다. 환경에 git을 설치해 주세요.") from e

def _ensure_git_lfs_installed(auto_install: bool = True):
    try:
        _run("git lfs version", check=True)
        return
    except Exception:
        if not auto_install:
            raise RuntimeError("git-lfs가 필요합니다. 설치 후 다시 시도하세요.")
        # 시도: Debian/Ubuntu 계열
        print("[INFO] git-lfs가 없어 설치를 시도합니다 (Debian/Ubuntu 계열).")
        try:
            _run("sudo apt-get update", check=True)
            _run("sudo apt-get install -y git-lfs", check=True)
            _run("git lfs install", check=True)
            print("[INFO] git-lfs 설치 완료.")
            return
        except Exception as e:
            # Colab 등 sudo 없는 환경에서 대안: 패키지 미지원 시 수동 설치가 필요할 수 있음
            print("[WARN] 자동 설치 실패. 환경에 맞게 git-lfs를 수동 설치해야 할 수 있습니다.")
            # 그래도 한 번 더 시도
            try:
                _run("git lfs install", check=True)
                return
            except Exception:
                raise RuntimeError("git-lfs 설치가 필요합니다. 수동 설치 후 다시 실행하세요.") from e

def _build_auth_repo_url(owner: str, repo: str, token: str | None) -> str:
    """
    인증이 필요한 경우 토큰을 URL에 내장해서 https 클론.
    Github는 'x-access-token:<token>' 또는 '<token>' 단독도 동작.
    """
    if token:
        # 토큰이 로그에 찍히지 않도록 주의! print 금지.
        return f"https://x-access-token:{token}@github.com/{owner}/{repo}.git"
    else:
        return f"https://github.com/{owner}/{repo}.git"

def clone_repo_and_read_files(owner: str, repo: str, ref: str, rel_paths: list[str], token: str | None = None) -> dict[str, str]:
    """
    - git lfs install
    - git clone --depth=1 --branch <ref>
    - git lfs pull
    - files read as text
    Returns: {rel_path: file_text}
    """
    _ensure_git_installed()
    _ensure_git_lfs_installed(auto_install=True)

    results: dict[str, str] = {}
    auth_url = _build_auth_repo_url(owner, repo, token)
    # 임시 디렉터리에 클론
    with tempfile.TemporaryDirectory() as tmpdir:
        repo_dir = Path(tmpdir) / "repo"

        # clone
        print("[INFO] Cloning repository...")
        _run(f"git clone --depth=1 --branch {shlex.quote(ref)} {auth_url} repo", cwd=tmpdir, check=True)

        # LFS 활성화(관용적으로 1회)
        try:
            _run("git lfs install", cwd=str(repo_dir), check=False)
        except Exception:
            pass

        # lfs pull (필요 오브젝트 받기)
        print("[INFO] Pulling LFS objects (this may take a while)...")
        # 특정 경로만 당기려면: git lfs pull --include="path1,path2"
        _run("git lfs pull", cwd=str(repo_dir), check=False)

        # 파일 읽기
        for rel in rel_paths:
            target = repo_dir / rel
            if not target.exists():
                # lfs checkout 시도 (일부 환경에서 필요)
                _run("git lfs checkout", cwd=str(repo_dir), check=False)
            if not target.exists():
                raise FileNotFoundError(f"파일을 찾을 수 없습니다: {rel} (ref={ref})")
            # 바이너리일 수도 있으므로 우선 바이너리로 열어보고 디코딩
            data = target.read_bytes()
            try:
                text = data.decode("utf-8")
            except UnicodeDecodeError:
                # UTF-8이 아니면 'replace'로라도 텍스트화
                text = data.decode("utf-8", errors="replace")
            results[rel] = text

        print("[INFO] Clone + LFS + Read OK.")
    return results

# ---------- 실행 ----------
token = (GITHUB_TOKEN or "").strip() or None
files_text = clone_repo_and_read_files(
    REPO_OWNER, REPO_NAME, REPO_REF,
    [EVENTIC_REL_PATH, TSV_REL_PATH],
    token=token
)

# JSON 로드
try:
    eventic_text = files_text[EVENTIC_REL_PATH]
    eventic_data = json.loads(eventic_text)
    print(f"[OK] eventic 항목 수: {len(eventic_data)}")
    print(type(eventic_data[0]), list(eventic_data[0].keys())[:6])
except json.JSONDecodeError as e:
    # 혹시 포인터 문서나 바이너리였을 경우 확인을 돕기 위한 헤더 일부 출력
    print("[ERROR] JSON 파싱 실패. 파일 앞부분 미리보기:")
    print(eventic_text[:400])
    raise

# TSV 로드
tsv_text = files_text[TSV_REL_PATH]
# BOM 제거
if tsv_text.startswith("\ufeff"):
    tsv_text = tsv_text.lstrip("\ufeff")

from io import StringIO
df = pd.read_csv(StringIO(tsv_text), sep="\t", dtype=str).fillna("")
print(df.head())
print(f"TSV 샘플 수: {len(df)}")


[INFO] Cloning repository...
[INFO] Pulling LFS objects (this may take a while)...
[INFO] Clone + LFS + Read OK.
[OK] eventic 항목 수: 14
<class 'dict'> ['Agent', 'Deontic', 'Action', 'Information', 'Paragraph', 'Condition']
  label                                           sentence        filename
0     0                              Frans Erenstraat 14A   www.idates.com
1     0  Support requests via compliance @ as well as e...  www.idates.com
2     0  Questions Related to Data Protection and Exerc...  www.idates.com
3     4  If you have any questions related to data prot...  www.idates.com
4     0      In which cases will we process personal data   www.idates.com
TSV 샘플 수: 361


## Qwen 모델 로드 (GPU가 있다면 자동 할당)

In [7]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    torch_dtype="auto",
    device_map="auto",
    trust_remote_code=True,
)

# pad 토큰 설정(경고 방지)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

print("Loaded:", MODEL_ID)
print("Model dtype:", model.dtype)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Loaded: Qwen/Qwen2-7B-Instruct
Model dtype: torch.bfloat16


## 프롬프트 템플릿

In [19]:
FEW_SHOT_TEMPLATE = """You are a classifier. Your task is to decide whether a given sentence from a website privacy policy is related to GDPR Article 13 information obligations.

Output format:
- Output ONLY one number: 1 or 0.
- 1 = The sentence is related to GDPR Article 13 obligations (information that must be provided).
- 0 = The sentence is not related to GDPR Article 13 obligations (irrelevant, generic, or other).

Information about GDPR:
<information>
---

Few-shot Examples:

Example 1
Input: "Frans Erenstraat 14A"
Output: 0

Example 2
Input: "Questions Related to Data Protection and Exercising your Rights"
Output: 0

Example 3
Input: "We will process personal data which you transfer to us actively by your entries"
Output: 0

Example 4
Input: "Owner contact email : -Email-"
Output: 1

Example 5
Input: "Full name of legal entity : Jagex Limited"
Output: 1

Example 6
Input: "You can edit or delete your data at any time"
Output: 1

Example 7
Input: "Right to Restrict the Processing"
Output: 1

Example 8
Input: "Data retention period"
Output: 1

---

Now classify the following sentence:
{sentence}
"""


## Eventic 임베딩 행렬 준비 (코사인 유사도용)

In [9]:
import numpy as np

def _l2_normalize(mat: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    norms = np.linalg.norm(mat, axis=1, keepdims=True)
    norms = np.maximum(norms, eps)
    return mat / norms

# eventic_data에서 embedding 벡터만 추출
eventic_items = eventic_data  # 앞 셀에서 로드되어 있다고 가정
vecs = []
kept_items = []
for it in eventic_items:
    emb = it.get("embedding")
    if isinstance(emb, list) and len(emb) > 0:
        vecs.append(emb)
        kept_items.append(it)

if not vecs:
    raise RuntimeError("eventic_data에 'embedding' 벡터가 없습니다. eventic_with_embedding.json을 확인하세요.")

E = np.array(vecs, dtype=np.float32)
E = _l2_normalize(E)
print(f"[OK] Eventic 임베딩 행렬: shape={E.shape}, 사용 항목 수={len(kept_items)}")


[OK] Eventic 임베딩 행렬: shape=(14, 3072), 사용 항목 수=14


## 임베딩 & 매칭 함수 (Top‑K / THRESHOLD None 처리 로직 포함)

In [10]:
import time
from typing import List, Dict, Any, Tuple

def get_embedding(text: str, model: str = EMBED_MODEL, max_retries: int = 5) -> List[float]:
    text = (text or "").strip()
    last_err = None
    for attempt in range(1, max_retries + 1):
        try:
            resp = openai_client.embeddings.create(model=model, input=text)
            return resp.data[0].embedding
        except Exception as e:
            last_err = e
            time.sleep((1.6 ** attempt) + 0.1 * attempt)
    raise RuntimeError(f"임베딩 생성 실패: {last_err}")

def search_relevant_clauses(
    query_text: str,
    top_k: int | None = TOP_K,
    threshold: float | None = THRESHOLD
) -> List[Tuple[float, Dict[str, Any]]]:
    """
    반환: [(similarity, eventic_item), ...]
    규칙:
      - top_k is None & threshold is None  -> [] (아무 데이터도 추가하지 않음)
      - top_k is None & threshold is not None -> 임계값 이상 모두 반환 (유사도 내림차순)
      - top_k is not None & threshold is None -> 상위 top_k만 반환 (유사도 내림차순)
      - 둘 다 지정 -> 임계값 이상에서 상위 top_k만 반환
    """
    if top_k is None and threshold is None:
        return []

    q = np.array(get_embedding(query_text), dtype=np.float32)
    q = q / (np.linalg.norm(q) + 1e-12)
    sims = E @ q  # 코사인 유사도

    # 공통: 유사도 높은 순 정렬 인덱스
    order = np.argsort(-sims)

    matches = []
    if top_k is None and threshold is not None:
        # 임계값 이상 모두
        for idx in order:
            s = float(sims[idx])
            if s < threshold:
                break
            matches.append((s, kept_items[idx]))
        return matches

    if top_k is not None and threshold is None:
        # 상위 top_k
        k = max(0, int(top_k))
        for idx in order[:k]:
            matches.append((float(sims[idx]), kept_items[idx]))
        return matches

    # 둘 다 지정된 경우: 임계값 이상 중 상위 top_k
    k = max(0, int(top_k))
    for idx in order:
        s = float(sims[idx])
        if s < threshold:
            break
        matches.append((s, kept_items[idx]))
        if len(matches) >= k:
            break
    return matches


## \<information\> 블록 생성

In [15]:
def build_information_block(items: list[dict]) -> str:
    """
    items: [item_dict, ...]
    """
    if not items:
        return ""
    lines = []
    for r in items:
        lines.append(
            f"paragraph [{r.get('Paragraph','')}] "
            f"agent {r.get('Agent','')} "
            f"deontic {r.get('Deontic','')} "
            f"action {r.get('Action','')} "
            f"information {r.get('Information','')} "
            f"condition {r.get('Condition','')}"
        )
    return "\n".join(lines)



## 분류 프롬프트 & Qwen 추론 (이미 로드되어 있으면 재사용)

In [12]:
import re
import torch

# 이미 tokenizer/model을 로드했다면 재사용
try:
    tokenizer, model
except NameError:
    from transformers import AutoTokenizer, AutoModelForCausalLM
    MODEL_ID = "Qwen/Qwen2-7B-Instruct"
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True, trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        torch_dtype="auto",
        device_map="auto",
        trust_remote_code=True,
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    print("Loaded:", MODEL_ID, "| dtype:", model.dtype)

GEN_ARGS = dict(max_new_tokens=8, do_sample=False, temperature=0.0)

def _extract_first_binary_digit(text: str) -> str:
    m = re.search(r"[01]", text)
    return m.group(0) if m else "0"

def classify_sentence_with_qwen(sentence: str, information_block: str) -> tuple[str, str]:
    prompt = FEW_SHOT_TEMPLATE.format(sentence=sentence)
    prompt = prompt.replace("<information>", (information_block if information_block else "N/A"))

    try:
        # Qwen 채팅 템플릿
        chat = [{"role": "user", "content": prompt}]
        text = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
    except Exception:
        text = prompt

    inputs = tokenizer(text, return_tensors="pt").to(model.device)
    with torch.no_grad():
        out = model.generate(**inputs, **GEN_ARGS)
    gen = tokenizer.decode(out[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
    return _extract_first_binary_digit(gen), gen


## 하나만 처리해보기

In [23]:
sample_row = df.iloc[0]
sample_sentence = str(sample_row["sentence"])

sample_matches = search_relevant_clauses(sample_sentence, top_k=TOP_K, threshold=THRESHOLD)
sample_only_items = [it for _, it in sample_matches]
sample_info_block = build_information_block(sample_only_items)

pred, raw = classify_sentence_with_qwen(sample_sentence, sample_info_block)

# 라벨 이진화
try:
    rli = int(str(sample_row["label"]).strip())
except Exception:
    rli = 0
gold_bin = 0 if rli == 0 else 1

print("Sentence:", sample_sentence)
print("Gold (bin):", gold_bin, "| Pred:", pred)
print("\n--- information block ---\n", sample_info_block if sample_info_block else "(첨부 없음)")
print("\n--- raw model output ---\n", raw)


The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Sentence: Frans Erenstraat 14A 
Gold (bin): 0 | Pred: 0

--- information block ---
 paragraph [2(f)] agent Controller deontic shall action provide information existence of automated decision-making including profiling, meaningful information about the logic involved, and the significance and envisaged consequences condition only if automated decision-making including profiling under Article 22(1) and (4) exists, at the time obtained
paragraph [1(d)] agent Controller deontic shall action provide information legitimate interests pursued where processing is based on Article 6(1)(f) condition only if legal basis is Article 6(1)(f), at the time obtained
paragraph [1(f)] agent Controller deontic shall action provide information information on transfers to a third country or international organisation, including adequacy decision or safeguards and how to obtain them condition where applicable, if such transfer is intended, at the time obtained
paragraph [1(e)] agent Controller deontic shall a

## 전체 분류 루프 + Accuracy / MCC (저장 없음)

In [21]:
from sklearn.metrics import accuracy_score, matthews_corrcoef
from tqdm import tqdm

# df에는 최소 'sentence'와 'label' 컬럼이 있어야 합니다.
if "sentence" not in df.columns:
    raise ValueError("df에 'sentence' 컬럼이 필요합니다.")
if "label" not in df.columns:
    raise ValueError("df에 'label' 컬럼이 필요합니다.")

pred_labels: list[int] = []
gold_labels: list[int] = []
info_blocks: list[str] = []
raw_outputs: list[str] = []

for _, row in tqdm(df.iterrows(), total=len(df), desc="Classifying"):
    sent = str(row["sentence"])

    # 관련 조항 매칭 (TOP_K / THRESHOLD None 로직 반영된 함수)
    matches = search_relevant_clauses(sent, top_k=TOP_K, threshold=THRESHOLD)
    # build_information_block는 Dict 리스트만 받도록 구현했다면 item만 추출
    only_items = [it for _, it in matches]
    info_block = build_information_block(only_items)
    info_blocks.append(info_block)

    # Qwen 분류
    pred, raw = classify_sentence_with_qwen(sent, info_block)
    pred_labels.append(int(pred))
    raw_outputs.append(raw)

    # --- 라벨 이진화 규칙: 0이면 0, 아니면 1 ---
    raw_label = row["label"]
    try:
        rli = int(str(raw_label).strip())
    except Exception:
        # 숫자가 아니면 연관 있음(=1)으로 취급할지, 0으로 둘지 정책 필요
        # 여기서는 안전하게 '연관 없음'을 0으로 기본 처리
        rli = 0
    gold_bin = 0 if rli == 0 else 1
    gold_labels.append(gold_bin)

# 지표
acc = accuracy_score(gold_labels, pred_labels)
mcc = matthews_corrcoef(gold_labels, pred_labels)
print(f"Accuracy: {acc:.4f}")
print(f"MCC: {mcc:.4f}")


Classifying:   0%|          | 0/361 [00:00<?, ?it/s]The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
Classifying:   0%|          | 1/361 [00:00<05:06,  1.17it/s]The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
Classifying:   1%|          | 2/361 [00:01<05:07,  1.17it/s]The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
Classifying:   1%|          | 3/361 [00:02<05:20,  1.12it/s]The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
Classifying:   1%|          | 4/361 [00:03<06:12,  1.04s/it]The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. 

Accuracy: 0.7812
MCC: 0.3742



