# Neo4j와 LangChain을 활용한 테슬라 10-K 보고서 RAG 시스템

---

## 1. Neo4J Desktop 환경 설정

- **Neo4j Desktop 소개**:
    - Neo4j 작업을 위한 클라이언트 애플리케이션
    - 로컬 환경에서 Neo4j를 학습하고 실험하는 데 필요한 모든 것을 포함함
    - 사용자의 컴퓨터 리소스가 허용하는 한 **여러 로컬 데이터베이스**를 생성할 수 있음
    - **Enterprise Edition 라이센스**: 단, 개발자 개인에 대해서는 1개 계정을 테스트 목적으로 지원

- **다운로드 및 설치**: https://neo4j.com/deployment-center/?desktop-gdb
    - Neo4J 5.24.0 선택
    - 새 프로젝트 생성 및 DBMS 추가

- **APOC 플러그인 설정**: 
    - APOC 플러그인을 설치하려는 데이터베이스가 있는 프로젝트(Graph DBMS)를 선택
    - Graph DBMS 메뉴 클릭하고, APOC 플러그인(Plugin) 설치

- **설정 파일 수정**: 데이터베이스를 중지한 상태에서 데이터베이스 카드의 오른쪽에 있는 `...` (메뉴) 버튼을 클릭

    - 메뉴에서 **Settings** 선택하고 다음 내용을 추가 (`neo4j.conf` 파일)
        ```
        dbms.security.procedures.unrestricted=apoc.meta.*,apoc.*
        ```

In [None]:
import os
from dotenv import load_dotenv

# 환경 변수 로드
load_dotenv()

In [None]:
from langchain_neo4j import Neo4jGraph

# Neo4j Desktop 연결 설정
graph = Neo4jGraph(
    url=os.getenv("NEO4J_URI"),
    username=os.getenv("NEO4J_USERNAME"),
    password=os.getenv("NEO4J_PASSWORD"),
    database=os.getenv("NEO4J_DATABASE"),
    enhanced_schema=True
)

In [None]:
# 테스트 쿼리 실행 
cypher_query = """
CREATE (n:Test {name: "Hello Neo4J DB"}) 
RETURN n
"""

graph.query(cypher_query)

In [None]:
def reset_database(graph):
    """
    APOC 없이 데이터베이스 초기화하기
    """
    # 모든 노드와 관계 삭제
    graph.query("MATCH (n) DETACH DELETE n")
    
    # 모든 제약조건 삭제
    constraints = graph.query("SHOW CONSTRAINTS")
    for constraint in constraints:
        constraint_name = constraint.get("name")
        if constraint_name:
            graph.query(f"DROP CONSTRAINT {constraint_name}")
    
    # 모든 인덱스 삭제
    indexes = graph.query("SHOW INDEXES")
    for index in indexes:
        index_name = index.get("name")
        index_type = index.get("type")
        if index_name and index_type != "CONSTRAINT":
            graph.query(f"DROP INDEX {index_name}")
    
    print("데이터베이스가 초기화되었습니다.")

# 데이터베이스 초기화
reset_database(graph)

In [None]:
graph.refresh_schema()
graph.schema

## 2. 지식그래프 스키마 설계

* **주요 엔티티 (노드)**:

   1. **Document (문서)**: 
      - `id`: STRING - 문서의 고유 식별자 (예: '10k_data/tsla-20241231-gen.pdf')
      - `source`: STRING - 문서의 출처 (예: '10k_data/tsla-20241231-gen.pdf')

   2. **Section (섹션)**:
      - `name`: STRING - 섹션 이름 (예: "Business")
      - `document_id`: STRING - 소속 문서 ID (예: '10k_data/tsla-20241231-gen.pdf')

   3. **Chunk (청크)**:
      - `document_id`: STRING - 소속 문서 ID (예: '10k_data/tsla-20241231-gen.pdf')
      - `chunk_id`: STRING - 청크 고유 ID (예: "71ffd0db-fe12-45cb-8f33-81d89982ad60")
      - `section_start_page`: INTEGER - 섹션 시작 페이지 (범위: 5-98)
      - `section_name`: STRING - 소속 섹션 이름 (예: "Business")
      - `parent_id`: STRING - 부모 요소 ID (예: "736e213533bb2f95baf5ac36cd9fb0be")
      - `element_id`: STRING - 요소 ID (예: "bccf193d025e02740a6628e721ac2f73")
      - `content`: STRING - 실제 텍스트 내용 (예: "ITEM 1. BUSINESS  Overview  We design, develop, ma")
      - `order`: INTEGER - 순서 (범위: 1-112)
      - `embedding`: VECTOR - 벡터 임베딩 (차원: 1536)

* **관계**:

   1. **HAS_SECTION**: 문서가 섹션을 포함하는 관계
      - Document → Section

   2. **CONTAINS**: 섹션이 청크를 포함하는 관계
      - Section → Chunk

   3. **NEXT**: 청크 간의 순서 관계
      - Chunk → Chunk

* **제약조건**:

   1. Document의 id는 고유해야 함
   2. Section은 (name, document_id) 조합으로 고유하게 식별
   3. Chunk의 chunk_id는 고유해야 함

In [None]:
# Document 노드 레이블 및 속성 정의 (제약조건 설정)

cypher_query = """
CREATE CONSTRAINT IF NOT EXISTS    // 제약조건 생성
FOR (d:Document)   // Document 레이블을 가진 노드에 대해
REQUIRE d.id IS UNIQUE;  // id 속성이 유일해야 함
"""

graph.query(cypher_query)

In [None]:
# Section 노드 레이블 및 속성 정의 (제약조건 설정)

cypher_query = """
CREATE CONSTRAINT IF NOT EXISTS  // 제약조건 생성
FOR (s:Section)  // Section 레이블을 가진 노드에 대해
REQUIRE (s.name, s.document_id) IS NODE KEY; // name과 document_id 속성이 유일해야 함 (복합키)
"""

graph.query(cypher_query)

In [None]:
# Chunk 노드 레이블 및 속성 정의 (제약조건 설정)

cypher_query = """
CREATE CONSTRAINT IF NOT EXISTS   // 제약조건 생성
FOR (c:Chunk)  // Chunk 레이블을 가진 노드에 대해
REQUIRE c.chunk_id IS UNIQUE;  // chunk_id 속성이 유일해야 함
"""

graph.query(cypher_query)

In [None]:
# 벡터 인덱스 생성
cypher_query = """
CREATE VECTOR INDEX chunk_content_index IF NOT EXISTS
FOR (c:Chunk) 
ON (c.embedding)
OPTIONS {
  indexConfig: {
    `vector.dimensions`: 1536,
    `vector.similarity_function`: 'cosine'
  }
}
"""

graph.query(cypher_query)

## 3. 데이터 구조화 및 저장

* **노드 구조**:
   - **Document 노드**: 전체 10-K 문서 표현
   - **Section 노드**: 문서의 각 섹션 표현 (Business, Risk Factors 등)
   - **Chunk 노드**: 분할된 텍스트 청크

* **관계 구조**:
   - `(:Document)-[:HAS_SECTION]->(:Section)`
   - `(:Section)-[:CONTAINS]->(:Chunk)`
   - `(:Chunk)-[:NEXT]->(:Chunk)` 


In [None]:
import pickle

# 저장된 섹션 데이터 로드
with open("10k_data/tesla_10k_sections_split.pkl", "rb") as f:
    section_docs_split = pickle.load(f)

# 섹션 데이터 확인
print(f"Number of sections: {len(section_docs_split)}")

In [None]:
# 섹션 데이터 확인
section_docs_split.keys()

In [None]:
# Business 섹션 데이터 확인 (첫 번째 청크)
section_docs_split["Business"][0].to_json()

In [None]:
# Business 섹션 데이터 확인 (마지막 청크)
section_docs_split["Business"][-1].to_json()

In [None]:
import os

# 첫 번째 문서의 메타데이터에서 Source와 문서 ID 추출 
first_doc = next(iter(section_docs_split.values()))[0]
source = first_doc.metadata['source']
doc_id = os.path.split(source)[-1].split(".")[0]

# Source 확인
print(f"Source: {source}")

# 문서 ID 확인
print(f"Document ID: {doc_id}")

`(1) Document 노드 생성`

In [None]:
import uuid
from langchain_openai import OpenAIEmbeddings

# OpenAI Embeddings 객체 생성
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Document 노드 생성 함수
def create_document_node(
        graph: Neo4jGraph,   # Neo4j 그래프 객체
        doc_id: str,         # 문서 ID
        source: str          # 문서 소스
    ):
    """
    문서 노드 생성 함수
    """
    query = """
    MERGE (d:Document {id: $doc_id})
    SET d.source = $source
    RETURN d
    """
    return graph.query(
        query,
        params={
            "doc_id": doc_id,
            "source": source
        }
    )

# Document 노드 생성
create_document_node(graph, doc_id, source)

In [None]:
# Section 노드 생성 함수
def create_section_node(
        graph: Neo4jGraph,   # Neo4j 그래프 객체
        section_name: str,   # 섹션 이름
        doc_id: str          # 문서 ID
    ):
    """
    섹션 노드 생성 함수
    """
    query = """
    MATCH (d:Document {id: $doc_id})
    MERGE (s:Section {name: $section_name, document_id: $doc_id})
    MERGE (d)-[:HAS_SECTION]->(s)
    RETURN s
    """
    return graph.query(query, params={"section_name": section_name, "doc_id": doc_id})

# Chunk 노드 생성 
import numpy as np
def create_chunk_node(
        graph: Neo4jGraph,   # Neo4j 그래프 객체
        section_name: str,   # 섹션 이름
        doc_id: str,         # 문서 ID
        chunk_id: str,       # 청크 ID
        content: str,        # 청크 내용
        embedding: np.ndarray | list, # 임베딩 벡터
        metadata: dict       # 청크 메타데이터
    ):
    """
    청크 노드 생성 함수
    """
    # 임베딩 벡터가 numpy 배열인 경우 리스트로 변환
    if hasattr(embedding, "tolist"):  
        embedding = embedding.tolist()
    
    # 벡터 임베딩 생성 (db.create.setNodeVectorProperty 사용)
    query = """
    MATCH (s:Section {name: $section_name, document_id: $doc_id})
    MERGE (c:Chunk {
        chunk_id: $chunk_id,   // 청크 ID
        content: $content,   // 청크 내용
        order: $order,      // 청크 순서
        element_id: $element_id, // 요소 ID
        parent_id: $parent_id, // 부모 ID
        document_id: $doc_id, // 문서 ID
        section_name: $section_name, // 섹션 이름
        section_start_page: $page_number // 섹션 시작 페이지
    })
    WITH c, s
    CALL db.create.setNodeVectorProperty(c, 'embedding', $embedding)
    MERGE (s)-[:CONTAINS]->(c)
    RETURN c
    """
    return graph.query(
        query,
        params={
            "chunk_id": chunk_id,
            "content": content,
            "embedding": embedding,
            "order": metadata['order'],
            "element_id": metadata['element_id'],
            "parent_id": metadata['parent_id'],
            "page_number": metadata['page_number'],
            "section_name": section_name,
            "doc_id": doc_id
        }
    )


# 청크 간 순서 관계 생성 함수
def create_next_relationship(
        graph: Neo4jGraph,   # Neo4j 그래프 객체
        prev_chunk_id: str,  # 이전 청크 ID
        next_chunk_id: str   # 다음 청크 ID
    ):
    """
    청크 간 순서 관계 생성 함수
    """
    query = """
    MATCH (prev:Chunk {chunk_id: $prev_chunk_id})
    MATCH (next:Chunk {chunk_id: $next_chunk_id})
    MERGE (prev)-[:NEXT]->(next)
    """
    return graph.query(
        query,
        params={
            "prev_chunk_id": prev_chunk_id,
            "next_chunk_id": next_chunk_id
        }
    )

In [None]:
section_docs_split

In [None]:
# 각 섹션에 대해 처리
for section_name, chunks in section_docs_split.items():

    # 각 섹션의 source와 문서 ID 추출
    source = section_docs_split[section_name][0].metadata['source']
    doc_id = os.path.split(source)[-1].split(".")[0]

    # Document 노드 생성
    create_document_node(graph, doc_id, source)

    # Section 노드 생성
    create_section_node(graph, section_name, doc_id)    

    print(f"Processing section: {section_name}")

    # 청크 ID 추적을 위한 변수
    prev_chunk_id = None
    
    # 각 청크에 대해 처리
    for chunk in chunks:
        # 청크 ID 생성
        chunk_id = str(uuid.uuid4())
        
        # 임베딩 생성
        embedding = embeddings.embed_query(chunk.page_content)
        
        # Chunk 노드 생성
        create_chunk_node(
            graph,           # Neo4j 그래프 객체
            section_name,    # 섹션 이름
            doc_id,          # 문서 ID
            chunk_id,        # 청크 ID
            chunk.page_content,  # 청크 내용
            embedding,       # 임베딩 벡터
            chunk.metadata   # 청크 메타데이터
        ) 
        
        # 이전 청크가 있으면 NEXT 관계 생성
        if prev_chunk_id:
            create_next_relationship(graph, prev_chunk_id, chunk_id)
        
        # 현재 청크 ID를 이전 청크 ID로 설정
        prev_chunk_id = chunk_id

    # 청크 노드 수 확인
    chunk_node_count = graph.query(
        "MATCH (c:Chunk {section_name: $section_name}) RETURN COUNT(c) AS count",
        params={"section_name": section_name}  # section_name 변수 전달
    )[0]['count']

    print(f"Chunk node count: {chunk_node_count}")
    print("-" * 100)

print("모든 청크 노드와 관계가 Neo4j 그래프에 성공적으로 추가되었습니다.")

In [None]:
# 청크 노드 수 확인
chunk_count_query = """
MATCH (c:Chunk)
RETURN COUNT(c) AS chunk_count
"""
# 청크 노드 수 쿼리 실행
graph.query(chunk_count_query)

In [None]:
# 섹션 노드 수 확인
section_count_query = """
MATCH (s:Section)
RETURN COUNT(s) AS section_count
"""
# 섹션 노드 수 쿼리 실행
graph.query(section_count_query)

## 4. 벡터 검색 및 RAG 구현

In [None]:
from langchain_community.vectorstores import Neo4jVector
from langchain_openai import OpenAIEmbeddings

# OpenAI Embeddings 객체 생성
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Neo4j 벡터 스토어 초기화 (기존 인덱스 사용)
vector_store = Neo4jVector.from_existing_index(
    embedding=embeddings,
    url=os.getenv("NEO4J_URI"),
    username=os.getenv("NEO4J_USERNAME"), 
    password=os.getenv("NEO4J_PASSWORD"),
    database=os.getenv("NEO4J_DATABASE"),
    index_name="chunk_content_index",   # 벡터 인덱스 이름
    node_label="Chunk",                # 노드 레이블
    text_node_property="content",      # 텍스트 노드 속성
    embedding_node_property="embedding", # 임베딩 노드 속성

    # 검색 쿼리: 벡터 검색을 통해 청크 노드를 검색하고, 이를 기반으로 섹션 및 이전/다음 청크 매칭
    retrieval_query="""   
    MATCH (c:Chunk {chunk_id: node.chunk_id})   // 청크 ID 기반 매칭
    OPTIONAL MATCH (c)<-[:CONTAINS]-(s:Section)  // 섹션 매칭
    OPTIONAL MATCH window = (prev:Chunk)-[:NEXT*0..1]->(c)-[:NEXT*0..1]->(next:Chunk)  // 이전 청크-> 검색 청크->다음 청크 매칭
    WITH DISTINCT c, s, score, nodes(window) as context_nodes  // 고유 청크 노드 추출
    WITH 
      // 청크 노드의 내용을 결합하여 문맥 텍스트 생성
      apoc.text.join([chunk IN context_nodes | chunk.content], '\n\n') AS context_text,
      s.name AS section_name,  // 섹션 이름
      c.section_start_page AS section_page,  // 섹션 시작 페이지
      c.order AS chunk_order,  // 청크 순서
      score  // 점수
    RETURN DISTINCT context_text AS text, score, {
      section: section_name,  // 섹션 이름
      section_page: section_page,  // 섹션 시작 페이지
      chunk_order: chunk_order  // 청크 순서
    } AS metadata
    """
)

In [None]:
# 테스트 질문 (보고서 p.14에서 인용)
test_query = "What recognition did Tesla receive in the 2024 American Opportunity Index??"

retrieved_docs = vector_store.similarity_search_with_score(test_query, k=5)
print(f"검색된 문서의 수: {len(retrieved_docs)}")

# 파이썬에서 중복 제거
def remove_duplicates(retrieved_docs):
    unique_results = []
    seen_chunk_orders = set()

    for doc, score in retrieved_docs:
        chunk_order = doc.metadata.get('chunk_order')
        section = doc.metadata.get('section')
        
        # 고유 식별자로 chunk_order와 section 조합 사용
        unique_id = (chunk_order, section)
        
        if unique_id not in seen_chunk_orders:
            seen_chunk_orders.add(unique_id)
            unique_results.append(doc)
    
    return unique_results


unique_docs = remove_duplicates(retrieved_docs)
print(f"중복 제거 후 문서 수: {len(unique_docs)}")
print("=" * 100)

# 검색된 문서 및 점수 확인
for doc in unique_docs:
    print(f"Section: {doc.metadata['section']}")
    print(f"Section Page: {doc.metadata['section_page']}")
    print(f"Chunk Order: {doc.metadata['chunk_order']}")
    print("-" * 100)

In [None]:
from pprint import pprint

pprint(section_docs_split['Business'][12].metadata)
print("-" * 100)
pprint(section_docs_split['Business'][12].page_content)

In [None]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# RAG용 프롬프트 템플릿
template = """
당신은 테슬라 10-K 보고서 전문가입니다. 다음 정보를 바탕으로 질문에 답변해 주세요. 
출처를 포함하여 답변해 주세요. (출처: 섹션 - 페이지 - 청크 순서)
질문과 같은 언어로 답변해 주세요.

[10-K 보고서 내용]
{context}

[질문]
{question}

[답변] 
"""

prompt = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)

# RAG 체인 구성
retriever = vector_store.as_retriever(search_kwargs={"k": 5})
llm = ChatOpenAI(model="gpt-4.1", temperature=0)
    
# 파이썬에서 중복 제거
def remove_duplicates(retrieved_docs):
    unique_results = []
    seen_chunk_orders = set()

    for doc in retrieved_docs:
        chunk_order = doc.metadata.get('chunk_order')
        section = doc.metadata.get('section')
        
        # 고유 식별자로 chunk_order와 section 조합 사용
        unique_id = (chunk_order, section)
        
        if unique_id not in seen_chunk_orders:
            seen_chunk_orders.add(unique_id)
            unique_results.append(doc)
    
    return unique_results

def format_retriever_results(results):
    # 검색된 결과에서 청크 내용을 추출하여 리스트로 반환 (메타데이터 포함)
    return "\n-------------------\n".join([f"출처: {doc.metadata['section']} 섹션(p.{doc.metadata['section_page']}) - 청크 순서 {doc.metadata['chunk_order']}\n\n{doc.page_content}" for doc in results])

# RAG 체인 구성
rag_chain = (
    {"context": retriever | remove_duplicates | format_retriever_results, "question": RunnablePassthrough()}
    | prompt 
    | llm 
    | StrOutputParser()
)

# 질문에 대한 답변 생성
test_query = "What recognition did Tesla receive in the 2024 American Opportunity Index?"
response = rag_chain.invoke(test_query)

print(response)

In [None]:
from pprint import pprint

pprint(section_docs_split['Business'][12].metadata)
print("-" * 100)
pprint(section_docs_split['Business'][12].page_content)

In [None]:
# 질문에 대한 답변 생성
test_query = "테슬라의 2024년 HR 정책에 대해서 설명해 주세요."
response = rag_chain.invoke(test_query)

print(response)