# 지식 그래프 구축


In [1]:
import dotenv

dotenv.load_dotenv()

True

In [8]:
from neo4j import GraphDatabase
import pandas as pd
import time
import os

GRAPHRAG_FOLDER = "./working_directory/output"

NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_AUTH = (os.getenv("NEO4J_USERNAME"), os.getenv("NEO4J_PASSWORD"))
NEO4J_DATABASE = "neo4j"
driver = GraphDatabase.driver(NEO4J_URI, auth=NEO4J_AUTH)


- 데이터를 효율적으로 임포트하기 위한 배치 임포트 함수와 제약조건을 설정합니다.
- 제약 조건은 DB 내 데이터 중복을 방지하기 위함입니다. Cypher문이 사용됩니다.


In [26]:
def batched_import(statement, df, batch_size=1000):
    """
    Import a dataframe into Neo4j using batched approach.

    Parameters: statement is the Cypher query to execute,
        df is the dataframe to import,
        and batch_size is the number of rows to import in each batch.
    """
    total = len(df)
    start_s = time.time()
    for start in range(0, total, batch_size):
        batch = df.iloc[start: min(start + batch_size, total)]
        result = driver.execute_query(
            "UNWIND $rows AS value " + statement,
            rows=batch.to_dict('records'),
            database_= NEO4J_DATABASE
        )
        print(result.summary.counters)
    print(f"{total} rows in {time.time() - start_s} s.")
    return total

In [22]:
# 제약 조건 설정
statements = [
    "\ncreate constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique",
    "\ncreate constraint document_id if not exists for (d:__Document__) require d.id is unique",
    "\ncreate constraint community_id if not exists for (c:__Community__) require c.community is unique",
    "\ncreate constraint entity_id if not exists for (e:__Entity__) require e.id is unique",
    "\ncreate constraint entity_title if not exists for (e:__Entity__) require e.name is unique",
    "\ncreate constraint covariate_title if not exists for (c:__Covariate__) require c.title is unique",
    "\ncreate constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique",
    "\n",
]

for statement in statements:
    if len((statement or "").strip()) > 0:
        print(statement)
        driver.execute_query(statement)


create constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique

create constraint document_id if not exists for (d:__Document__) require d.id is unique

create constraint community_id if not exists for (c:__Community__) require c.community is unique

create constraint entity_id if not exists for (e:__Entity__) require e.id is unique

create constraint entity_title if not exists for (e:__Entity__) require e.name is unique

create constraint covariate_title if not exists for (c:__Covariate__) require c.title is unique

create constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique


### Cypher란?

- Cypher는 Neo4j 그래프 데이터베이스에서 사용되는 쿼리 언어로, 그래프 데이터(노드, 관계, 속성)를 조작하고 질의하는 데 특화되어 있습니다.
- SQL과 비슷한 선언적 구조를 가지며, MATCH(패턴 검색), CREATE(노드/관계생성), MERGE(생성 또는 업데이트), SET(속성 설정) 같은 키워드로 매칭을 통해 노드와 관계 간의 연결을 더욱 직관적으로 표현하고 탐색할 수 있다는 차이가 있습니다.
- Neo4j Cypher 튜토리얼: https://neo4j.com/docs/getting-started/cypher/


- 모든 준비 과정이 끝났으니, GraphRAG의 결과물을 Neo4j Aura에 연동시키는 작업을 진행하겠습니다.
- 최종 문서를 DB에 업로드 합니다. documents.parquet 파일에서 문서데이터를 가져와 ** Document ** 노드를 생성합니다.


In [28]:
doc_df = pd.read_parquet(
    f"{GRAPHRAG_FOLDER}/documents.parquet", columns=["id", "title"]
)

# 문서 노드 병합
statement = """
MERGE (d: __Document__ {id:value.id})
SET d += value {.title}
"""
batched_import(statement, doc_df)

{'_contains_updates': True, 'properties_set': 1}
1 rows in 0.08096933364868164 s.


1

- 다음은 텍스트 유닛(청크)을 DB에 업로드합니다. text_units.parquet
- ** Chunk ** 노드를 생성하고 문서와 연결합니다.


In [31]:
# 텍스트 유닛(청크) 임포트
text_df = pd.read_parquet(f"{GRAPHRAG_FOLDER}/text_units.parquet", columns=["id", "text", "n_tokens", "document_ids"])

statement = """
MERGE (c: __Chunk__ {id:value.id})
SET c += value {.text, .n_tokens}
WITH c, value
UNWIND value.document_ids AS document
MATCH (d: __DOCUMENT__ {id:document})
MERGE (c)-[:PART_OF]->(d)
"""

batched_import(statement, text_df)

{'_contains_updates': True, 'properties_set': 312}
156 rows in 0.04469633102416992 s.


156

- 엔티티를 DB에 업로드합니다. - entities.parquet ** Entity ** 노드를 생성합니다.


In [None]:
# 엔티티 임포트
entity_df = pd.read_parquet(
    f"{GRAPHRAG_FOLDER}/entities.parquet",
    columns=["title", "type", "description", "human_readable_id", "id", "text_unit_ids"]
)

statement = """
MERGE (e: __Entity__ {id: value.id})
SET e.human_readable_id = value.human_readable_id,
    e.description = value.description,
    e.name = coalesce(replace(value.title, '"', ''), 'Unknown')
WITH e, value
CALL apoc.create.addLabels(e, CASE WHEN coalesce(value.type, "") = "" THEN [] ELSE [apoc.text.upperCamelCase(replace(value.type, '"', ''))] END) YIELD node
UNWIND value.text_unit_ids AS text_unit
MATCH (c: __Chunk__ {id:text_unit})
MERGE (c)-[:HAS_ENTITY]->(e)
"""

batched_import(statement, entity_df)

{'_contains_updates': True, 'labels_added': 251, 'relationships_created': 382, 'nodes_created': 251, 'properties_set': 1004}
251 rows in 0.44866323471069336 s.


251

- 관계를 DB에 업로드 합니다. relationships.parquet 파일에서 관계 데이터를 로드하여 엔티티간 RELATED 관계를 생성합니다.


In [40]:
# 관계 임포트
rel_df = pd.read_parquet(
    f"{GRAPHRAG_FOLDER}/relationships.parquet",
    columns=["source", "target", "id", "combined_degree", "weight", "human_readable_id", "description", "text_unit_ids"]
)

rel_df.rename(columns={'combined_degree': 'rank'})

rel_statment = """
MATCH (source: __Entity__ {name:replace(value.source, '"', '')})
MATCH (target: __Entity__ {name:replace(value.target, '"', '')})
MERGE (source)-[rel:RELATED {id: value.id}]->(target)
SET rel += value {.rank, .weight, .human_readable_id, .description, .text_unit_ids}
RETURN count(*) as createdRels
"""
batched_import(rel_statment, rel_df)


{'_contains_updates': True, 'properties_set': 1348}
337 rows in 0.06938576698303223 s.


337

- 커뮤니티를 DB에 업로드 합니다. communities.parquet 파일에서 커뮤니티 데이터를 로드하여 \_ Community \_\_ 노드를 생성하고, 해당 노드에서 level과 title 속성을 설정합니다.
- 각 커뮤니티와 관련된 텍스트 유닛들을 DB 내에서 연결하고, 커뮤니티에 속하는 엔티티들을 연결합니다.


In [44]:
# 커뮤니티 임포트
community_df = pd.read_parquet(
    f"{GRAPHRAG_FOLDER}/communities.parquet",
    columns=["id", "level", "title", "text_unit_ids", "relationship_ids"]
)

statement = """
MERGE (c: __Community__ {community: value.title})
SET c.title = value.title,
    c.level = value.level
WITH c, value
UNWIND value.text_unit_ids AS text_unit_id
MATCH (t: __Chunk__ {id: text_unit_id})
MERGE (c)-[:HAS_CHUNK]->(t)
WITH distinct c, value
UNWIND value.relationship_ids AS rel_id
MATCH (start: __Entity__)-[:RELATED {id: rel_id}]->(end: __Entity__)
MERGE (start)-[:IN_COMMUNITY]->(c)
MERGE (end)-[:IN_COMMUNITY]->(c)
RETURN count(distinct c) as createdCommunities
"""

batched_import(statement, community_df)

{'_contains_updates': True, 'labels_added': 56, 'relationships_created': 614, 'nodes_created': 56, 'properties_set': 168}
56 rows in 0.27317309379577637 s.


56

- 커뮤니티 보고서를 DB에 반영합니다.
- community_reports.parquet 파일에서 커뮤니티 보고서를 가져와 커뮤니티 노드에 속성을 추가하고, 커뮤니티의 레벨, 콘텐츠 등 속성을 업데이트합니다. 이후 발견 사항들을 개별적인 Finding 노드로 DB에 체계적으로 반영합니다.


In [59]:
# 커뮤니티 보고서 임포트
community_report_df = pd.read_parquet(
    f"{GRAPHRAG_FOLDER}/community_reports.parquet",
    columns=["id", "community", "level", "title", "summary", "findings", "rank", "rating_explanation", "full_content"]
)

# community 값을 "Community " + 숫자 형태로 문자열을 만들어줌
community_report_df['community'] = "Community " + community_report_df['community'].astype(str)

community_statement = """
MERGE (c: __Community__ {community: value.community})
SET c.level = value.level,
    c.name = value.title,
    c.rank = value.rank,
    c.rating_explanation = value.rating_explanation,
    c.full_content = value.full_content,
    c.summary = value.summary
WITH c, value
UNWIND range(0, size(value.findings) - 1) AS finding_idx
WITH c, value, finding_idx, value.findings[finding_idx] AS finding
MERGE (c)-[:HAS_FINDING]->(f:Finding {id: finding_idx})
SET f += finding
"""

batched_import(community_statement, community_report_df)

{'_contains_updates': True, 'properties_set': 944}
56 rows in 0.05147290229797363 s.


56