## 1. GDPR 법률 데이터와 사건 데이터 불러오기

In [7]:
import pandas as pd
import json

# 파일 경로
laws_path = "../HF_cache/KBs/GDPR/data-00000-of-00001.jsonl"
cases_path = "../HF_cache/cases/GDPR/data-00000-of-00001.jsonl"

# JSONL 파일 로드 함수
def load_jsonl(path):
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f]

# 데이터 불러오기
laws_data = load_jsonl(laws_path)
cases_data = load_jsonl(cases_path)

# DataFrame 변환
df_laws = pd.DataFrame(laws_data, columns=[
    "reference", "norm_type", "sender", "sender_role", "recipient", "recipient_role",
    "subject", "subject_role", "information_type", "consent_form", "purpose",
    "sender_is_subject", "recipient_is_subject", "regulation_id", "regulation_content"
])

df_cases = pd.DataFrame(cases_data, columns=[
    "norm_type", "sender", "sender_role", "recipient", "recipient_role",
    "subject", "subject_role", "information_type", "consent_form", "purpose",
    "followed_articles", "violated_articles", "case_content"
])

# 확인
print("=== GDPR 법률 데이터 ===")
display(df_laws.head())

print("\n=== GDPR 사건 데이터 ===")
display(df_cases.head())


=== GDPR 법률 데이터 ===


Unnamed: 0,reference,norm_type,sender,sender_role,recipient,recipient_role,subject,subject_role,information_type,consent_form,purpose,sender_is_subject,recipient_is_subject,regulation_id,regulation_content
0,{},"""General Definition""",[],[],[],[],[],[],[],,[],"""Not Sure""","""Not Sure""",Article 1,"""Subject-matter and objectives"""
1,{},"""General Definition""",[],[],[],[],[],[],[],,[],"""Not Sure""","""Not Sure""",Article 1(1),"""This Regulation lays down rules relating to t..."
2,{},"""General Definition""",[],[],[],[],[],[],[],,[],"""Not Sure""","""Not Sure""",Article 1(2),"""This Regulation protects fundamental rights a..."
3,{},"""Permit""",[],[],[],[],[],[],[],,[],"""Not Sure""","""Not Sure""",Article 1(3),"""The free movement of personal data within the..."
4,{},"""General Definition""",[],[],[],[],[],[],[],,[],"""Not Sure""","""Not Sure""",Article 2,"""Material scope"""



=== GDPR 사건 데이터 ===


Unnamed: 0,norm_type,sender,sender_role,recipient,recipient_role,subject,subject_role,information_type,consent_form,purpose,followed_articles,violated_articles,case_content
0,prohibit,[Meta Platforms Ireland Limited],[Data Controller],[Internal Systems],[Data Storage],[Users],[Data Subjects],[Passwords],,Data Storage,[],"[Article 32, Article 33, Article 34]",Meta Platforms Ireland Limited (MPIL) stored u...
1,prohibit,"[LOCAL VERTICALS, S.L.]",[Data Controller],[Third-party company],[Data Processor],[Individual],[Data Subject],[Privacy Policy Information],,Providing information about data processing,[],"[Article 12, Article 13]",An individual attempted to register on the web...
2,prohibit,"[SANTANDER CONSUMER FINANCE, S.A.]",[Company],[Individual],[Customer],[Individual],[Customer],[Personal Data],Authorization,Managing Credit Card,[],"[Article 6 - Lawfulness of processing, Article...","An individual, who is a customer of SANTANDER ..."
3,prohibit,[University of Agder],[Data Controller],[Employees with no business need],[Unauthorized Accessors],"[Employees, Students, External Individuals]",[Data Subjects],[Personal Data],,,[],"[Article 5(1)(f), Article 32]",The University of Agder (UiA) stored documents...
4,prohibit,[mBank employee],[Bank Employee],[Wrong recipient],[Unintended Recipient],[mBank customers],[Bank Customer],"[Names, Account numbers, Dates of birth, ID ca...",Authorization,Banking operations,[],"[Article 33, Article 34]",An employee of mBank mistakenly sent documents...


## 2. Role, Attribute Graph 불러오기

In [6]:
# !pip install -q networkx lxml pandas

import networkx as nx
import pandas as pd
from pathlib import Path
from itertools import islice

# 파일 경로
ATTR_GRAPH_PATH = "/Users/taeyoonkwack/Documents/PrivaCI-Bench/updated_kgs/attribute_kg_88k.graphml"
ROLE_GRAPH_PATH = "/Users/taeyoonkwack/Documents/PrivaCI-Bench/updated_kgs/role_kg_45k.graphml"

# -------- 그래프 로드 --------
def load_graph(path: str) -> nx.DiGraph:
    G = nx.read_graphml(Path(path))
    if isinstance(G, (nx.MultiDiGraph, nx.MultiGraph)):
        H = nx.DiGraph()
        H.add_nodes_from(G.nodes(data=True))
        for u, v, data in G.edges(data=True):
            if not H.has_edge(u, v):
                H.add_edge(u, v, **data)
        G = H
    return G

# -------- 노드 DataFrame 변환 --------
def nodes_to_df(G: nx.DiGraph, limit: int | None = None) -> pd.DataFrame:
    rows = []
    iterator = G.nodes(data=True)
    if limit is not None:
        iterator = islice(iterator, limit)
    for n, attrs in iterator:
        row = {"node_id": n}
        row.update(attrs or {})
        rows.append(row)
    return pd.DataFrame(rows)

# -------- 엣지 DataFrame 변환 --------
def edges_to_df(G: nx.DiGraph, limit: int | None = None) -> pd.DataFrame:
    rows = []
    iterator = G.edges(data=True)
    if limit is not None:
        iterator = islice(iterator, limit)
    for u, v, data in iterator:
        d = dict(data or {})
        # label 또는 relation 키로 subsume / is subsumed by 들어있음
        relation = d.get("label") or d.get("relation") or ""
        # edge_source도 데이터에 들어있을 수 있음
        edge_source = d.get("source") or d.get("edge_source") or ""
        rows.append({
            "src_node": u,
            "dst_node": v,
            "edge_source": edge_source,
            "relation": relation
        })
    return pd.DataFrame(rows)

# -------- 그래프 불러오기 --------
G_attr = load_graph(ATTR_GRAPH_PATH)
G_role = load_graph(ROLE_GRAPH_PATH)

# -------- 노드/엣지 DataFrame 만들기 --------
df_attr_nodes = nodes_to_df(G_attr)
df_attr_edges = edges_to_df(G_attr)

df_role_nodes = nodes_to_df(G_role)
df_role_edges = edges_to_df(G_role)

# -------- 확인 --------
print("=== Attribute Graph: Nodes sample ===")
display(df_attr_nodes.head(5))

print("=== Attribute Graph: Edges sample ===")
display(df_attr_edges.head(5))

print("=== Role Graph: Nodes sample ===")
display(df_role_nodes.head(5))

print("=== Role Graph: Edges sample ===")
display(df_role_edges.head(5))

# relation 값 분포 확인
print("\nRelation values (Attribute Graph):", df_attr_edges["relation"].unique())
print("Relation values (Role Graph):", df_role_edges["relation"].unique())


=== Attribute Graph: Nodes sample ===


Unnamed: 0,node_id,domain
0,Financial Account,<https://w3id.org/dpv/dpv-owl/dpv-pd#Financial...
1,Account Identifier,<https://w3id.org/dpv/dpv-owl/dpv-pd#AccountId...
2,Behavioral,<https://w3id.org/dpv/dpv-owl/dpv-pd#Behavioral>
3,Vehicle Usage,<https://w3id.org/dpv/dpv-owl/dpv-pd#VehicleUs...
4,Identifying,<https://w3id.org/dpv/dpv-owl/dpv-pd#Identifying>


=== Attribute Graph: Edges sample ===


Unnamed: 0,src_node,dst_node,edge_source,relation
0,Financial Account,Account Identifier,GPT-4o,subsume
1,Financial Account,Payment Card,origin,subsume
2,Financial Account,Bank Account,GPT-4o,subsume
3,Financial Account,Financial,origin,is subsumed by
4,Financial Account,Savings Account,GPT-4o,subsume


=== Role Graph: Nodes sample ===


Unnamed: 0,node_id
0,person
1,inhabitant
2,female sibling
3,sister
4,evaluator


=== Role Graph: Edges sample ===


Unnamed: 0,src_node,dst_node,edge_source,relation
0,person,inhabitant,GPT-4o,is subsumed by
1,person,bad person,GPT-4o,subsume
2,person,female,GPT-4o,subsume
3,person,relative,GPT-4o,subsume
4,person,contestant,GPT-4o,subsume



Relation values (Attribute Graph): ['subsume' 'is subsumed by']
Relation values (Role Graph): ['is subsumed by' 'subsume']


## 3. 사건 데이터에 적용 가능한 법률 가져와보기 
### 3.1. adjacency list 구현 / 관련 함수 구현

In [11]:
import pandas as pd
from collections import defaultdict, deque

def build_child_adj(df_edges: pd.DataFrame) -> dict[str, set[str]]:
    """
    subsume / is subsumed by 두 타입만 존재한다고 가정.
    부모→자식(child) 인접리스트를 구축.
    """
    adj = defaultdict(set)
    for _, r in df_edges.iterrows():
        rel = str(r.get("relation") or "").strip()
        u = str(r.get("src_node"))
        v = str(r.get("dst_node"))
        if not u or not v:
            continue

        if rel == "subsume":
            parent, child = u, v
        elif rel == "is subsumed by":
            parent, child = v, u
        else:
            # 알 수 없는 relation 은 스킵
            continue
        adj[parent].add(child)
    return adj

def descendants_or_self(anchor: str, child_adj: dict[str, set[str]]) -> set[str]:
    """
    anchor 포함하여, anchor에서 하위로 내려가며 도달 가능한 모든 노드(자손)를 반환.
    """
    if not anchor:
        return set()
    seen = {anchor}
    q = deque([anchor])
    while q:
        cur = q.popleft()
        for nxt in child_adj.get(cur, ()):
            if nxt not in seen:
                seen.add(nxt)
                q.append(nxt)
    return seen  # 자신 포함

# 인접리스트 구성 (역할/속성 그래프 각각)
role_child_adj = build_child_adj(df_role_edges)
attr_child_adj = build_child_adj(df_attr_edges)

def to_list(x):
    """
    - 리스트면 그대로
    - 문자열이면 가능한 경우 JSON 디코드 (예: "[]", "\"foo\"" 등) 후 리스트화
    - None / 'null' / 빈문자열 -> []
    - 스칼라 문자열 -> [문자열]
    """
    if x is None:
        return []
    if isinstance(x, list):
        return [str(t).strip() for t in x if str(t).strip()]
    if isinstance(x, str):
        s = x.strip()
        if not s or s.lower() == "null":
            return []
        # JSON-like이면 파싱 시도
        if (s.startswith("[") and s.endswith("]")) or (s.startswith("{") and s.endswith("}")) or (s.startswith('"') and s.endswith('"')):
            try:
                j = json.loads(s)
                if isinstance(j, list):
                    return [str(t).strip() for t in j if str(t).strip()]
                if isinstance(j, str):
                    return [j.strip()] if j.strip() else []
            except Exception:
                pass
        # 일반 문자열
        return [s]
    # 기타 스칼라
    return [str(x).strip()]

def all_law_terms_covered_by_case(law_terms: list[str], case_terms: list[str], child_adj: dict[str, set[str]]) -> bool:
    """
    '법조항의 각 용어'가 '사건의 적어도 하나의 용어'의 하위(또는 동일)인지 검사.
    law_terms가 비어있으면 True.
    """
    if not law_terms:
        return True
    if not case_terms:
        return False

    # 사건의 각 용어에 대해 '하위 포함 집합(자기 자신 포함)'을 미리 계산
    cover_sets = [descendants_or_self(ct, child_adj) for ct in case_terms]

    for lt in law_terms:
        # lt가 사건 용어 중 하나의 하위/동일이면 OK
        if any(lt in cov for cov in cover_sets):
            continue
        return False
    return True

def get_applicable_laws_for_case(
    case_row: pd.Series,
    df_laws: pd.DataFrame,
    role_child_adj: dict[str, set[str]],
    attr_child_adj: dict[str, set[str]],
    return_debug_cols: bool = True,
    only_permit: bool = False
) -> pd.DataFrame:

    case_sender    = to_list(case_row.get("sender"))
    case_recipient = to_list(case_row.get("recipient"))
    case_subject   = to_list(case_row.get("subject"))
    case_info_type = to_list(case_row.get("information_type"))

    rows = []
    for _, law in df_laws.iterrows():
        law_sender    = to_list(law.get("sender"))
        law_recipient = to_list(law.get("recipient"))
        law_subject   = to_list(law.get("subject"))
        law_info_type = to_list(law.get("information_type"))

        cond1 = all_law_terms_covered_by_case(law_sender,    case_sender,    role_child_adj)
        cond2 = all_law_terms_covered_by_case(law_subject,   case_subject,   attr_child_adj)
        cond3 = all_law_terms_covered_by_case(law_recipient, case_recipient, role_child_adj)
        cond4 = all_law_terms_covered_by_case(law_info_type, case_info_type, attr_child_adj)

        if cond1 and cond2 and cond3 and cond4:
            row = {
                "regulation_id": law.get("regulation_id"),
                "norm_type": law.get("norm_type"),
                "sender": law_sender,
                "recipient": law_recipient,
                "subject": law_subject,
                "information_type": law_info_type,
                "regulation_content": law.get("regulation_content"),
            }
            if return_debug_cols:
                row.update({
                    "_cond_sender(role)": cond1,
                    "_cond_subject(attr)": cond2,
                    "_cond_recipient(role)": cond3,
                    "_cond_info_type(attr)": cond4,
                })
            rows.append(row)

    df = pd.DataFrame(rows)

    if only_permit and not df.empty:
        df = df[df["norm_type"].str.strip('"') == "Permit"]

    return df


### 3.2. 사건 <- 적용가능 법규 가져오기

In [18]:
case_idx = 34  # 필요시 바꾸세요
case_row = df_cases.iloc[case_idx]

print("=== 선택한 사건 요약 ===")
display(pd.DataFrame({
    "norm_type": [case_row.get("norm_type")],
    "sender": [to_list(case_row.get("sender"))],
    "recipient": [to_list(case_row.get("recipient"))],
    "subject": [to_list(case_row.get("subject"))],
    "information_type": [to_list(case_row.get("information_type"))],
    "followed_articles": [case_row.get("followed_articles")],
    "violated_articles": [case_row.get("violated_articles")],
    "case_content": [case_row.get("case_content")[:300] + ("..." if case_row.get("case_content") and len(case_row.get("case_content")) > 300 else "")]
}))

# -------------------------------------------------------
# 사건의 followed_articles, violated_articles에 대응되는 법조항 찾기
# -------------------------------------------------------
followed_articles = to_list(case_row.get("followed_articles"))
violated_articles = to_list(case_row.get("violated_articles"))

related_articles = set(followed_articles + violated_articles)

df_related_laws = df_laws[df_laws["regulation_id"].isin(related_articles)]

print("=== 사건에 명시적으로 연관된 법조항 (followed_articles + violated_articles) ===")
if not df_related_laws.empty:
    display(df_related_laws[[
        "regulation_id", "norm_type", "sender", "recipient", "subject", "information_type", "regulation_content"
    ]])
else:
    print("사건의 followed_articles, violated_articles에 해당하는 법조항이 없습니다.")

# -------------------------------------------------------
# 사건에 적용 가능한 법조항 찾기 (필터 기반)
# -------------------------------------------------------
df_applicable = get_applicable_laws_for_case(
    case_row=case_row,
    df_laws=df_laws,
    role_child_adj=role_child_adj,
    attr_child_adj=attr_child_adj,
    return_debug_cols=True,
    only_permit=True   # 여기서 False로 두면 전체, True로 두면 "permit"만
)

print(f"=== 사건 #{case_idx} 에 적용 가능한 GDPR 법조항 (총 {len(df_applicable)}건) ===")
if not df_applicable.empty:
    display(df_applicable[[
        "regulation_id", "norm_type", "sender", "recipient", "subject", "information_type", "regulation_content",
    ]])
else:
    print("조건을 모두 만족하는 법조항이 없습니다.")


=== 선택한 사건 요약 ===


Unnamed: 0,norm_type,sender,recipient,subject,information_type,followed_articles,violated_articles,case_content
0,prohibit,[AFIANZA ASESORES S.L.],[Spanish Data Protection Agency (DPA)],[Individuals involved in court proceedings],"[Personal data, including data relating to cou...",[],[Article 32 - Security of processing],"AFIANZA ASESORES S.L., a data controller, expe..."


=== 사건에 명시적으로 연관된 법조항 (followed_articles + violated_articles) ===
사건의 followed_articles, violated_articles에 해당하는 법조항이 없습니다.
=== 사건 #34 에 적용 가능한 GDPR 법조항 (총 58건) ===


Unnamed: 0,regulation_id,norm_type,sender,recipient,subject,information_type,regulation_content
3,Article 1(3),"""Permit""",[],[],[],[],"""The free movement of personal data within the..."
7,Article 2(4),"""Permit""",[],[],[],[],"""This Regulation shall be without prejudice to..."
38,Article 6(2),"""Permit""",[],[],[],[],"""Member States may maintain or introduce more ..."
45,Article 9(2),"""Permit""",[],[],[],[],"""Paragraph 1 shall not apply if one of the fol..."
49,Article 11(1),"""Permit""",[],[],[],[],"""If the purposes for which a controller proces..."
51,Article 12(2),"""Permit""",[],[],[],[],"""The controller shall facilitate the exercise ..."
52,Article 12(7),"""Permit""",[],[],[],[],"""The information to be provided to data subjec..."
62,Article 23(1),"""Permit""",[],[],[],[],"""Union or Member State law to which the data c..."
69,Article 25(3),"""Permit""",[],[],[],[],"""An approved certification mechanism pursuant ..."
73,Article 27(2),"""Permit""",[],[],[],[],"""The obligation laid down in paragraph 1 of th..."
