## 0. Graph DB연결하기
* 영화 데이터 셋 활용

In [1]:
from neo4j import GraphDatabase, basic_auth


driver = GraphDatabase.driver(
  "neo4j://54.197.76.205:7687",
  auth=basic_auth("neo4j", "decorations-option-independence"))

In [2]:
# 로컬에 있는 .env 를 통해 API key 불러옴 
from dotenv import load_dotenv
load_dotenv()

True

In [3]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

### DB 스키마 생성하기
- Text2Cyper 활용할 것임
- DB 스키마를 LLM이 잘 알고 있어야함

In [4]:
from collections import defaultdict

def get_schema():
    schema = ""
    with driver.session() as session:
        # 모든 노드 라벨과 속성 추출
        node_schema = session.run("""
        CALL db.schema.nodeTypeProperties() YIELD nodeType, propertyName, propertyTypes
        RETURN nodeType, propertyName, propertyTypes
        """)

        nodes = defaultdict(dict)
        for record in node_schema:
            label = record["nodeType"].replace(":", "")
            prop = record["propertyName"]
            types = record["propertyTypes"]
            nodes[label][prop] = types[0] if types else "UNKNOWN"

        # 모든 관계 타입과 속성 추출
        rel_schema = session.run("""
        CALL db.schema.relTypeProperties() YIELD relType, propertyName, propertyTypes
        RETURN relType, propertyName, propertyTypes
        """)

        relationships = defaultdict(dict)
        for record in rel_schema:
            rel = record["relType"]
            prop = record["propertyName"]
            types = record["propertyTypes"]
            relationships[rel][prop] = types[0] if types else "UNKNOWN"

        # 관계 방향 및 타입 추출
        rel_types = session.run("""
        MATCH (a)-[r]->(b)
        RETURN DISTINCT labels(a) AS from_labels, type(r) AS rel_type, labels(b) AS to_labels
        """)

        rel_directions = set()
        for record in rel_types:
            from_label = f":{record['from_labels'][0]}"
            to_label = f":{record['to_labels'][0]}"
            rel_type = record['rel_type']
            rel_directions.add(f"({from_label})-[:{rel_type}]->({to_label})")

    # 출력
    schema += "\nNode properties:\n"
    for label, props in nodes.items():
        prop_str = ", ".join(f"{k}: {v}" for k, v in props.items())
        schema += f"{label} {{{prop_str}}}\n"

    schema += "\nRelationship properties:\n"
    for rel, props in relationships.items():
        prop_str = ", ".join(f"{k}: {v}" for k, v in props.items())
        schema += f"{rel} {{{prop_str}}}\n"

    schema += "\nThe relationships:\n"
    for rel in sorted(rel_directions):
        schema += f"{rel}\n"
    return schema

schema = get_schema()



In [5]:
print(schema)


Node properties:
`Genre` {name: String}
`User` {name: String, userId: String}
`Director``Person` {name: String, imdbId: String, tmdbId: String, poster: String, born: Date, died: Date, bornIn: String, bio: String, url: String}
`Actor``Person` {name: String, imdbId: String, tmdbId: String, poster: String, born: Date, died: Date, bornIn: String, bio: String, url: String}
`Actor``Director``Person` {name: String, imdbId: String, tmdbId: String, poster: String, born: Date, died: Date, bornIn: String, bio: String, url: String}
`Movie` {movieId: String, imdbId: String, title: String, tmdbId: String, year: Long, countries: StringArray, languages: StringArray, plot: String, imdbRating: Double, imdbVotes: Long, released: String, runtime: Long, poster: String, revenue: Long, budget: Long, url: String}

Relationship properties:
:`IN_GENRE` {None: UNKNOWN}
:`RATED` {rating: Double, timestamp: Long}
:`ACTED_IN` {role: String}
:`DIRECTED` {role: String}

The relationships:
(:Actor)-[:ACTED_IN]->(:Mov

## 1. Text2Cyper기반 GraphRAG Agent 만들기

### 1) Graph State 설정하기

In [6]:
from operator import add
from typing import Annotated, List

from typing_extensions import TypedDict

class InputState(TypedDict):
    question: str

class OverallState(TypedDict):
    question: str # 사용자 질문을 받아옴
    next_action: str # 다음단계
    cypher_statement: str # Cypher 쿼리
    cypher_errors: List[str] # Cypher 쿼리 오류
    database_records: List[dict] # DB 실행 결과 
    steps: Annotated[List[str], add] # 그래프 실행 단계

class OutputState(TypedDict):
    answer: str
    steps: List[str]
    cypher_statement: str

### 2) 가드레일(일반질문 vs 영화질문) 노드 추가 

In [7]:
from typing import Literal

from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

guardrails_system = """
As an intelligent assistant, your primary objective is to decide whether a given question is related to movies or not. 
If the question is related to movies, output "movie". Otherwise, output "end".
To make this decision, assess the content of the question and determine if it refers to any movie, actor, director, film industry, 
or related topics. Provide only the specified output: "movie" or "end".
"""
guardrails_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            guardrails_system,
        ),
        (
            "human",
            ("{question}"),
        ),
    ]
)


class GuardrailsOutput(BaseModel):
    decision: Literal["movie", "end"] = Field(
        description="Decision on whether the question is related to movies"
    )


guardrails_chain = guardrails_prompt | llm.with_structured_output(GuardrailsOutput)

In [8]:
def guardrails(state: InputState) -> OverallState: # 타입 힌트 
    """
    Decides if the question is related to movie or not.
    """
    print("-- GUARDRAILS --")
    question = state["question"]
    guardrails_output = guardrails_chain.invoke({"question": question}) # 랭체인 실행
    print("Guardrails output:", guardrails_output)

    database_records = None
    if guardrails_output.decision =="end":
        database_records = "This questions is not about moveis of their cast. Therefore I cannot answer this question"
    return {
        "next_action": guardrails_output.decision, 
        "database_records": database_records,
        "steps": ["guardrail"] # 그래프의 흐름을 추적
    } # OverallState의 일부만 반환, LangGraph가 기존 State에 병합

### 3) Text2Cypher 쿼리 생성 노드 추가 

In [9]:
examples = [
    {
        "question": "How many movies has Tom Hanks acted in?",
        "query": "MATCH (a:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie) RETURN count(m)",
    },
    {
        "question": "List all the genres of the movie Schindler's List",
        "query": "MATCH (m:Movie {title: 'Schindler's List'})-[:IN_GENRE]->(g:Genre) RETURN g.name",
    },
    {
        "question": "Which actors have worked in movies from both the comedy and action genres?",
        "query": "MATCH (a:Person)-[:ACTED_IN]->(:Movie)-[:IN_GENRE]->(g1:Genre), (a)-[:ACTED_IN]->(:Movie)-[:IN_GENRE]->(g2:Genre) WHERE g1.name = 'Comedy' AND g2.name = 'Action' RETURN DISTINCT a.name",
    },
    {
        "question": "Find the actor with the highest number of movies in the database.",
        "query": "MATCH (a:Actor)-[:ACTED_IN]->(m:Movie) RETURN a.name, COUNT(m) AS movieCount ORDER BY movieCount DESC LIMIT 1",
    },
]

In [10]:
# 프롬프트를 통해 싸이퍼쿼리문 생성
# 시스템 프롬프트
# 휴먼 프롬프트: 3가지 파라미터 입력 (스키마 정보, 퓨샷 예제, 유저 질문)


from langchain_core.output_parsers import StrOutputParser

text2cypher_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            (
                "Given an input question, convert it to a Cypher query. No pre-amble."
                "Do not wrap the response in any backticks or anything else. Respond with a Cypher statement only!"
            ),
        ),
        (
            "human",
            (
                """You are a Neo4j expert. Given an input question, create a syntactically correct Cypher query to run.
                    Do not wrap the response in any backticks or anything else. Respond with a Cypher statement only!
                    Here is the schema information
                    {schema}

                    Below are a number of examples of questions and their corresponding Cypher queries.

                    {fewshot_examples}

                    User input: {question}
                    Cypher query:
                """
            ),
        ),
    ]
)


text2cypher_chain = text2cypher_prompt | llm | StrOutputParser() # Str 형태로 만드는 체인 


def generate_cypher(state: OverallState) -> OverallState:
    """
    Generates a cypher statement based on the provided schema and user input
    """
    print("-- GENERATE CYPHER --")

    question = state["question"]
    fewshot_examples = str(examples)
    schema = get_schema()

    generated_cypher = text2cypher_chain.invoke(
        {
            "question": question,
            "fewshot_examples": fewshot_examples,
            "schema": schema,
        }
    )
    print("Generated Cypher:", generated_cypher)
    return {"cypher_statement": generated_cypher, "steps": ["generate_cypher"]}

### 4) 쿼리 문법 검사 노드 추가 

In [11]:
from typing import List, Optional

validate_cypher_system = """
You are a Cypher expert reviewing a statement written by a junior developer.
"""

validate_cypher_user = """You must check the following:
* Are there any syntax errors in the Cypher statement?
* Are there any missing or undefined variables in the Cypher statement?
* Are any node labels missing from the schema?
* Are any relationship types missing from the schema?
* Are any of the properties not included in the schema?
* Does the Cypher statement include enough information to answer the question?

Examples of good errors:
* Label (:Foo) does not exist, did you mean (:Bar)?
* Property bar does not exist for label Foo, did you mean baz?
* Relationship FOO does not exist, did you mean FOO_BAR?

Schema:
{schema}

The question is:
{question}

The Cypher statement is:
{cypher}

Make sure you don't make any mistakes!"""

# ChatPromptTemplate.from_messages(): 메시지 리스트로 프롬프트 생성
# 구조: [("role", "content"), ("role", "content"), ...]
# - role: "system", "human", "ai" 중 하나
# - content: 문자열 또는 변수

validate_cypher_prompt = ChatPromptTemplate.from_messages(
    [
        # 튜플 1: 시스템 메시지 (LLM의 역할/지침 정의)
        (
            "system",
            validate_cypher_system,  # 시스템 프롬프트 변수
        ),
        # 튜플 2: 사용자 메시지 (실제 입력)
        (
            "human",
            validate_cypher_user,    # (validate_cypher_user)에서 불필요한 괄호 제거
        ),
    ]
)
class ValidateCypherOutput(BaseModel):
    """
    Represents the validation result of a Cypher query's output, including any errors.
    """

    errors: Optional[List[str]] = Field(
        description="A list of syntax or semantical errors in the Cypher statement. Always explain the discrepancy between schema and Cypher statement"
    )


validate_cypher_chain = validate_cypher_prompt | llm.with_structured_output(ValidateCypherOutput)

In [12]:
from neo4j.exceptions import CypherSyntaxError

def validate_cypher(state:OverallState) -> OverallState:
    """
    Validates the Cypher statements and maps any property values to the database.
    """
    print("-- VALIDATE CYPHER --")
    errors = []

    cypher = state["cypher_statement"] # 이전 노드 (generate_cypher)에서 생성한 Cypher 쿼리를 State에서 꺼냄
    try:
        # try-except구문: "예외"가 발생하면 잡아라라는 의미
        # 시도해봐
        driver.execute_query(f"EXPLAIN {cypher}") # EXPAIN 명령어: Cypher 쿼리를 실제 실행하지 않고 문법만 검증 , 문법 오류 있으면 CypherSyntaxError 예외 발생        
    except CypherSyntaxError as e: # 문법 오류 발생시 오류 메시지를 errors 리스트에 추가 
        # 예외가 발생하면 여기서 처리해 
        errors.append(e.messages)
        print("syntax error:", e.message)

    question = state["question"]
    llm_output = validate_cypher_chain.invoke(
        {
            "question": question,
            "schema": schema,
            "cypher": cypher
        }
    )

    if llm_output.errors: # LLM이 발견한 오류가 있으면 errors 리스트에 추가
        errors.extend(llm_output.errors) # extend(): 리스트에 여러 항목을 한번에 추가
    print("LLM output errors:", llm_output.errors)

    if errors:
        next_action = "correct_cypher"
    else:
        next_action = "execute_cypher"

    return {
        "next_action": next_action,         
        "cypher_statement": cypher,
        "cypher_errors": errors,
        "steps": ["validate_cypher"]
    }

### 5) 쿼리 수정 노드 추가 

In [13]:
correct_cypher_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            (
                "You are a Cypher expert reviewing a statement written by a junior developer. "
                "You need to correct the Cypher statement based on the provided errors. No pre-amble."
                "Do not wrap the response in any backticks or anything else. Respond with a Cypher statement only!"
            ),
        ),
        (
            "human",
            (
                """Check for invalid syntax or semantics and return a corrected Cypher statement.

                Schema:
                {schema}

                Note: Do not include any explanations or apologies in your responses.
                Do not wrap the response in any backticks or anything else.
                Respond with a Cypher statement only!

                Do not respond to any questions that might ask anything else than for you to construct a Cypher statement.

                The question is:
                {question}

                The Cypher statement is:
                {cypher}

                The errors are:
                {errors}

                Corrected Cypher statement: """
            ),
        ),
    ]
)
# 입력: 4종 - 스키마, 질문, 생성되었던 싸이퍼 쿼리문, 생성 싸이퍼 쿼리문의 에러
# 출력: 에러 수정된 싸이퍼 쿼리문


correct_cypher_chain = correct_cypher_prompt | llm | StrOutputParser()


def correct_cypher(state: OverallState) -> OverallState:
    """
    Correct the Cypher statement based on the provided errors.
    """
    print("-- CORRECT CYPHER --")
    question = state["question"]
    errors = state["cypher_errors"]
    cypher = state["cypher_statement"]

    corrected_cypher = correct_cypher_chain.invoke(
        {
            "question": question,
            "errors": errors,  
            "cypher": cypher,
            "schema": schema,
        }
    )
    print("Corrected Cypher statement:", corrected_cypher)

    return {
        "next_action": "validate_cypher",
        "cypher_statement": corrected_cypher,
        "steps": ["correct_cypher"],
    }

### 6) DB 검색 노드 (쿼리 실행) 추가

In [None]:
no_results = "I couldn't find any relevant information in the database"

def execute_cypher(state: OverallState) -> OverallState:
    """
    Executes the given Cypher statement.
    """
    print("-- EXECUTE CYPHER --")
    cypher_statement = state["cypher_statement"] #이전 노드에서 생성된 Cypher 쿼리 가져오기 
    try:
        with driver.session(database="neo4j") as session:
            records = session.execute_read(
                lambda tx: tx.run(cypher_statement).data()) # 읽기전용 트랜잭션 실행, .data(): 결과를 Python 딕셔너리 리스트로 변환환
                                                            # 실행 흐름: tx.run(cypher) → Result 객체 → .data() → [{"title": "The Matrix", ...}, ...]

    except Exception as e:
            records = str(e)

    print("Cypher execution results:", records)
    return {
        "database_records": records if records else no_results,  # 다음 노드에서 사용할 DB 조회 결과 (쿼리 결과 또는 no_results 메시지)
        "next_action": "end", # 다음 단계 지시 (종료)
        "steps": ["execute_cypher"] # 실행 이력 기록
    }

### 7) 조회 결과의 관련성 평가 노드

In [None]:
from pydoc import describe


context_relevance_prompt = ChatPromptTemplate.from_messages(
    [
        ("system",
        (
            "You are an expert assistant trained to judge whether a set of database query results "
            "provides sufficient information to answer a user's natural language question."
            "You must make a binary decision: either the results contain enough information to fully answer the question,"
            "or they do not. Your judgement should be based strictly on the content provided in the results."
        )),
        ("human",
        (
            """Given the user question and the query results from a database, 
            determine whether the results contain enough information to answer the question. 

            Return your response with two fields:
            score: 'yes' if sufficient, 'no' if not
                - If the results fully answer the question, respond only with "yes". 
                - If the results do not provide enough context or are incompete, respond only with "no"
            feedback: a brief explanation of why the results are or are not sufficient

            Question:
            {question}

            Results:
            {results}
            """
        ))
    ]
)

class RelevanceScore(BaseModel):
    scores: str = Field(description="Relevance score 'yes' or 'no'")
    feedback: str = Field(
        description="Feedback on the relevance of the results to the question"
    )

context_relevance_chain = context_relevance_prompt | llm.with_structured_output(RelevanceScore)

In [None]:
def relevance(state: OverallState) -> OverallState: