In [1]:
# Install into the running kernel's environment (%pip rather than !pip).
# llama-index-embeddings-openai is needed by the `llama_index.embeddings.openai`
# import used for the few-shot retriever.
%pip install --quiet llama-index-core llama-index-utils-workflow llama-index-llms-openai llama-index-embeddings-openai llama-index-graph-stores-neo4j

In [1]:
import getpass
import os
from typing import List, Literal, Union, Optional

from llama_index.core import ChatPromptTemplate
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
from llama_index.llms.openai import OpenAI
from neo4j.exceptions import CypherSyntaxError
from pydantic import BaseModel, Field

  from pandas.core import (


In [2]:
from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore

# Property-graph store that every Cypher statement in this notebook runs against.
# NOTE(review): the host name suggests this is the public Neo4j Labs demo instance
# (hence the credentials in clear text) -- confirm before pointing at other data.
graph_store = Neo4jPropertyGraphStore(
    username="recommendations",
    password="recommendations",
    database="recommendations",
    url="neo4j+s://demo.neo4jlabs.com:7687",
    enhanced_schema=True,
    create_indexes=False
)

In [8]:
# Never clobber a key that is already configured and never store one in the
# notebook: ask for it interactively only when the environment has none.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API key: ")
# Chat model shared by every LLM call in this notebook.
llm = OpenAI(model="gpt-4o", temperature=0)

In [59]:
# Structured-output schema for the guardrail call. The docstring and the Field
# description are part of the schema handed to the LLM, so they are runtime text.
class Guardrail(BaseModel):
    """Guardrail"""

    # Exactly one of the two literals; guardrails_step short-circuits on "end".
    decision: Literal["movie", "end"] = Field(
        description="Decision on whether the question is related to movies"
        
    )
# System prompt for the guardrail: classify the question as "movie" or "end".
guardrails_system_prompt = """As an intelligent assistant, your primary objective is to decide whether a given question is related to movies or not.
If the question is related to movies, output "movie". Otherwise, output "end".
To make this decision, assess the content of the question and determine if it refers to any movie, actor, director, film industry,
or related topics. Provide only the specified output: "movie" or "end"."""
# Guardrail chat messages: the system prompt plus the user turn carrying {question}.
chat_refine_msgs = [
    (
        "system",
        guardrails_system_prompt,
    ),
    ("user", "The question is: {question}"),
]
# Template formatted with `question=...` by guardrails_step.
guardrails_template = ChatPromptTemplate.from_messages(chat_refine_msgs)

In [60]:
# Structured-output schema for the query planner. The docstring and Field
# description are sent to the LLM as part of the schema, so they are runtime text.
class SubqueriesOutput(BaseModel):
    """Defines the output format for transforming a question into parallel-optimized retrieval steps."""
    
    # Outer list = sequential groups; inner list = queries of one group.
    plan: List[List[str]] = Field(description=("""A list of query groups where:
        - Each group (inner list) contains queries that can be executed in parallel
        - Groups are ordered by dependency (earlier groups must be executed before later ones)
        - Each query must be a specific information retrieval request
        - No reasoning or comparison tasks, only data fetching queries"""))

# System prompt for the planner: decompose a question into ordered groups of
# retrieval-only subqueries, with two worked examples.
subqueries_system = """You are a query planning optimizer. Your task is to break down complex questions into efficient, parallel-optimized retrieval steps. Focus ONLY on information retrieval queries, not analysis or reasoning steps.

Key Requirements:
- Group queries that can be executed in parallel into the same list
- Order groups based on data dependencies
- Include ONLY specific information retrieval queries
- Exclude reasoning tasks, comparisons, or analysis steps
- Prioritize queries that can be executed first and in parallel

For simple, directly answerable questions, return a single query in a single group.

Example 1:
User: "What was the impact of the 2008 financial crisis on Bank of America's stock price and employee count?"
Assistant: [
    # Group 1 - These can be fetched in parallel
    [
        "What was Bank of America's stock price history from 2007 to 2009?",
        "What was Bank of America's total employee count in 2007?",
        "What was Bank of America's total employee count in 2009?"
    ]
]

Example 2:
User: "Compare the performance of Tesla's Model 3 with BMW's competing models in terms of range and acceleration."
Assistant: [
    # Group 1 - Basic specs can be fetched in parallel
    [
        "What is the EPA range of the Tesla Model 3?",
        "What is the 0-60 mph acceleration time of the Tesla Model 3?",
        "What BMW models compete directly with the Tesla Model 3?"
    ],
    # Group 2 - Depends on knowing competing models from group 1
    [
        "What is the EPA range of each identified BMW competitor model?",
        "What is the 0-60 mph acceleration time of each identified BMW competitor model?"
    ]
]

Remember:
- Focus on data retrieval only
- Maximize parallel execution opportunities
- Maintain necessary sequential ordering
- Keep queries specific and self-contained
- Prioritize independent queries first"""

# The user turn is the raw question; the only template variable is {question}.
query_decompose_msgs = [
    ("system", subqueries_system),
    ("user", "{question}")
]

# Template formatted with `question=...` by guardrails_step.
subquery_template = ChatPromptTemplate.from_messages(query_decompose_msgs)

In [61]:
def guardrails_step(question):
    """Run the movie guardrail on a question and, if it passes, plan its subqueries.

    Args:
        question: The user's natural-language question.

    Returns:
        dict: ``{"next_event": ..., "arguments": {...}}``. When the guardrail
        answers ``"end"``, ``next_event`` is ``"generate_final_answer"`` and the
        arguments carry ``context`` and ``question``. Otherwise ``next_event`` is
        ``"generate_cypher"`` and the arguments carry ``plan`` (the list of query
        groups produced by the planner) and ``question``.
    """
    guardrails_output = (
        llm.as_structured_llm(Guardrail)
        .complete(guardrails_template.format(question=question))
        .raw
    ).decision
    if guardrails_output == 'end':
        # Off-topic question: skip planning and hand a refusal context to the answer step.
        context = "The question is not about movies or their cast, so I cannot answer this question"
        return {"next_event": "generate_final_answer", "arguments": {"context": context, "question": question}}
    # TODO: move the planning call into its own step.
    queries_output = (
        llm.as_structured_llm(SubqueriesOutput)
        .complete(subquery_template.format(question=question))
        .raw
    ).plan
    return {"next_event": "generate_cypher", "arguments": {"plan": queries_output, "question": question}}


In [62]:
# Smoke test: a multi-hop question should pass the guardrail and yield a multi-group plan.
guardrails_step("Who has appeared in more movies: Leonardo DiCaprio or the actor who has co-starred most frequently with the director of Tom Hanks' most critically acclaimed movie??")

{'next_event': 'generate_cypher',
 'arguments': {'plan': [["What is Tom Hanks' most critically acclaimed movie?"],
   ["Who directed Tom Hanks' most critically acclaimed movie?"],
   ['Which actor has co-starred most frequently with the director identified?'],
   ['How many movies has Leonardo DiCaprio appeared in?',
    'How many movies has the actor identified in the previous step appeared in?']],
  'question': "Who has appeared in more movies: Leonardo DiCaprio or the actor who has co-starred most frequently with the director of Tom Hanks' most critically acclaimed movie??"}}

In [63]:
from llama_index.core.schema import TextNode
from llama_index.core import VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding

# Embedding model handed to the VectorStoreIndex that holds the few-shot examples.
embed_model = OpenAIEmbedding(model="text-embedding-3-small")


# Few-shot question -> Cypher pairs shown to the text2cypher model.
# Every query must itself be valid Cypher and use only labels present in the
# schema string given to the model (which excludes Actor/Director), otherwise
# the examples teach the model statements that the validator will reject.
examples = [
    {
        "question": "How many artists are there?",
        "query": "MATCH (a:Person)-[:ACTED_IN]->(:Movie) RETURN count(DISTINCT a)",
    },
    {
        "question": "Which actors played in the movie Casino?",
        "query": "MATCH (m:Movie {title: 'Casino'})<-[:ACTED_IN]-(a) RETURN a.name",
    },
    {
        "question": "How many movies has Tom Hanks acted in?",
        "query": "MATCH (a:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie) RETURN count(m)",
    },
    {
        "question": "List all the genres of the movie Schindler's List",
        # Double-quoted Cypher string: the title contains an apostrophe.
        "query": "MATCH (m:Movie {title: \"Schindler's List\"})-[:IN_GENRE]->(g:Genre) RETURN g.name",
    },
    {
        "question": "Which actors have worked in movies from both the comedy and action genres?",
        "query": "MATCH (a:Person)-[:ACTED_IN]->(:Movie)-[:IN_GENRE]->(g1:Genre), (a)-[:ACTED_IN]->(:Movie)-[:IN_GENRE]->(g2:Genre) WHERE g1.name = 'Comedy' AND g2.name = 'Action' RETURN DISTINCT a.name",
    },
    {
        "question": "Which directors have made movies with at least three different actors named 'John'?",
        "query": "MATCH (d:Person)-[:DIRECTED]->(m:Movie)<-[:ACTED_IN]-(a:Person) WHERE a.name STARTS WITH 'John' WITH d, COUNT(DISTINCT a) AS JohnsCount WHERE JohnsCount >= 3 RETURN d.name",
    },
    {
        "question": "Identify movies where directors also played a role in the film.",
        "query": "MATCH (p:Person)-[:DIRECTED]->(m:Movie), (p)-[:ACTED_IN]->(m) RETURN m.title, p.name",
    },
    {
        "question": "Find the actor with the highest number of movies in the database.",
        "query": "MATCH (a:Person)-[:ACTED_IN]->(m:Movie) RETURN a.name, COUNT(m) AS movieCount ORDER BY movieCount DESC LIMIT 1",
    },
]

# One retrievable node per example. Doubled braces are literal "{" / "}", so each
# node's text renders as {'query':<cypher>, 'question': <question>}.
few_shot_nodes = [
    TextNode(text=f"{{'query':{example['query']}, 'question': {example['question']}}}")
    for example in examples
]

few_shot_index = VectorStoreIndex(few_shot_nodes, embed_model=embed_model)
# Retrieve the five examples most similar to the incoming question.
few_shot_retriever = few_shot_index.as_retriever(similarity_top_k=5)


def get_fewshots(question):
    """Return the text of each few-shot example retrieved for ``question``."""
    retrieved = few_shot_retriever.retrieve(question)
    texts = []
    for hit in retrieved:
        texts.append(hit.text)
    return texts

In [64]:
# System prompt for text2cypher: answer with a bare Cypher statement.
generate_system = """Given an input question, convert it to a Cypher query. No pre-amble.
Do not wrap the response in any backticks or anything else. Respond with a Cypher statement only!"""

# User prompt; template variables: {schema}, {fewshot_examples}, {question}.
generate_user = """You are a Neo4j expert. Given an input question, create a syntactically correct Cypher query to run.
Do not wrap the response in any backticks or anything else. Respond with a Cypher statement only!
Here is the schema information
{schema}

Below are a number of examples of questions and their corresponding Cypher queries.

{fewshot_examples}

User input: {question}
Cypher query:"""

generate_cypher_msgs = [
    (
        "system",
        generate_system,
    ),
    ("user", generate_user),
]

# Template formatted by generate_cypher.
text2cypher_prompt = ChatPromptTemplate.from_messages(generate_cypher_msgs)

In [65]:
# Schema string inserted into the LLM prompts; the Actor and Director types are excluded from it.
schema = graph_store.get_schema_str(exclude_types=["Actor", "Director"])

async def generate_cypher(subquery):
    """Ask the LLM to translate one subquery into a Cypher statement.

    Args:
        subquery: Natural-language retrieval question.

    Returns:
        The content of the model's chat response (expected to be a Cypher statement).
    """
    # Render the retrieved examples one per line; interpolating the list itself
    # would put its Python repr (brackets, escaped quotes) into the prompt.
    fewshot_examples = "\n".join(get_fewshots(subquery))
    messages = text2cypher_prompt.format_messages(
        question=subquery, schema=schema, fewshot_examples=fewshot_examples
    )
    resp = await llm.achat(messages)
    return resp.message.content

In [66]:
# System prompt for the LLM-based Cypher review.
validate_cypher_system = """
You are a Cypher expert reviewing a statement written by a junior developer.
"""

# User prompt; template variables: {schema}, {question}, {cypher}.
validate_cypher_user = """You must check the following:
* Are there any syntax errors in the Cypher statement?
* Are there any missing or undefined variables in the Cypher statement?
* Are any node labels missing from the schema?
* Are any relationship types missing from the schema?
* Are any of the properties not included in the schema?
* Does the Cypher statement include enough information to answer the question?

Examples of good errors:
* Label (:Foo) does not exist, did you mean (:Bar)?
* Property bar does not exist for label Foo, did you mean baz?
* Relationship FOO does not exist, did you mean FOO_BAR?

Schema:
{schema}

The question is:
{question}

The Cypher statement is:
{cypher}

Make sure you don't make any mistakes!"""

validate_cypher_msgs = [
    (
        "system",
        validate_cypher_system,
    ),
    ("user", validate_cypher_user),
]

# Template formatted by validate_cypher.
validate_cypher_prompt = ChatPromptTemplate.from_messages(validate_cypher_msgs)

# Element type of ValidateCypherOutput.filters. The docstring and Field descriptions
# below are part of the schema sent to the LLM, so they are runtime text.
class Property(BaseModel):
    """
    Represents a filter condition based on a specific node property in a graph in a Cypher statement.
    """

    # validate_cypher uses these three fields to look the value up in the database.
    node_label: str = Field(
        description="The label of the node to which this property belongs."
    )
    property_key: str = Field(description="The key of the property being filtered.")
    property_value: str = Field(
        description="The value that the property is being matched against."
    )


# Structured-output schema for the LLM review performed in validate_cypher.
# The docstring and Field descriptions are sent to the LLM, so they are runtime text.
class ValidateCypherOutput(BaseModel):
    """
    Represents the validation result of a Cypher query's output,
    including any errors and applied filters.
    """

    # Both fields are nullable; validate_cypher treats None and [] alike (truthiness checks).
    errors: Optional[List[str]] = Field(
        description="A list of syntax or semantical errors in the Cypher statement. Always explain the discrepancy between schema and Cypher statement"
    )
    filters: Optional[List[Property]] = Field(
        description="A list of property-based filters applied in the Cypher statement."
    )

In [67]:
from llama_index.graph_stores.neo4j import CypherQueryCorrector, Schema

# Cypher query corrector is experimental
# It is given one Schema(start, type, end) triple per relationship in the graph schema.
relationship_defs = graph_store.get_schema().get("relationships")
corrector_schema = [
    Schema(rel["start"], rel["type"], rel["end"]) for rel in relationship_defs
]
cypher_query_corrector = CypherQueryCorrector(corrector_schema)

In [68]:
def validate_cypher(question, cypher):
    """
    Validates the Cypher statements and maps any property values to the database.

    Three checks are run: an ``EXPLAIN`` against the database (syntax), the
    relationship-direction corrector, and an LLM review that also extracts the
    property filters used by the statement. Each filter on a STRING property is
    then looked up (case-insensitively) in the database.

    Args:
        question: The natural-language question the statement should answer.
        cypher: The Cypher statement to validate.

    Returns:
        dict with keys:
            ``next_action``: ``"end"`` if any filter value is missing from the
                database, else ``"correct_cypher"`` if errors were found, else
                ``"execute_cypher"``.
            ``cypher_statement``: output of the query corrector (falsy when the
                statement does not fit the graph schema).
            ``cypher_errors``: list of error messages.
            ``mapping_errors``: list of missing-value messages.
            ``steps``: ``["validate_cypher"]``.
    """
    errors = []
    mapping_errors = []
    # Check for syntax errors
    try:
        graph_store.structured_query(f"EXPLAIN {cypher}")
    except CypherSyntaxError as e:
        errors.append(e.message)
    # Experimental feature for correcting relationship directions
    corrected_cypher = cypher_query_corrector(cypher)
    if not corrected_cypher:
        errors.append("The generated Cypher statement doesn't fit the graph schema")
    # Use LLM to find additional potential errors and get the mapping for values
    llm_output = (
        llm.as_structured_llm(ValidateCypherOutput)
        .complete(validate_cypher_prompt.format(question=question, cypher=cypher, schema=schema))
        .raw
    )
    print(f"LLM:{llm_output}")
    if llm_output.errors:
        errors.extend(llm_output.errors)
    if llm_output.filters:
        # Look the schema up once instead of once per filter.
        node_props = graph_store.get_schema()["node_props"]
        for prop_filter in llm_output.filters:
            # The label/property come from the LLM and may not exist in the schema;
            # in that case there is nothing to map, so skip instead of raising.
            prop_type = next(
                (
                    prop["type"]
                    for prop in node_props.get(prop_filter.node_label, [])
                    if prop["property"] == prop_filter.property_key
                ),
                None,
            )
            # Do mapping only for string values
            if prop_type != "STRING":
                continue
            print(f"Mapping: {prop_filter}")
            mapping = graph_store.structured_query(
                f"MATCH (n:{prop_filter.node_label}) WHERE toLower(n.`{prop_filter.property_key}`) = toLower($value) RETURN 'yes' LIMIT 1",
                {"value": prop_filter.property_value},
            )
            if not mapping:
                missing_message = (
                    f"Missing value mapping for {prop_filter.node_label} on property "
                    f"{prop_filter.property_key} with value {prop_filter.property_value}"
                )
                print(missing_message)
                mapping_errors.append(missing_message)
    # A value that does not exist in the database cannot be fixed by rewriting the query.
    if mapping_errors:
        next_action = "end"
    elif errors:
        next_action = "correct_cypher"
    else:
        next_action = "execute_cypher"

    return {
        "next_action": next_action,
        "cypher_statement": corrected_cypher,
        "cypher_errors": errors,
        "mapping_errors": mapping_errors,
        "steps": ["validate_cypher"],
    }

In [69]:
# System prompt for the correction call: return a fixed Cypher statement and nothing else.
correct_cypher_system = """You are a Cypher expert reviewing a statement written by a junior developer. 
You need to correct the Cypher statement based on the provided errors. No pre-amble.
Do not wrap the response in any backticks or anything else. Respond with a Cypher statement only!"""

# User prompt; template variables: {schema}, {question}, {cypher}, {errors}.
correct_cypher_user = """Check for invalid syntax or semantics and return a corrected Cypher statement.

Schema:
{schema}

Note: Do not include any explanations or apologies in your responses.
Do not wrap the response in any backticks or anything else.
Respond with a Cypher statement only!

Do not respond to any questions that might ask anything else than for you to construct a Cypher statement.

The question is:
{question}

The Cypher statement is:
{cypher}

The errors are:
{errors}

Corrected Cypher statement: """

# Correct cypher
correct_cypher_msgs = [
    (
        "system",
        correct_cypher_system,
    ),
    ("user", correct_cypher_user),
]

# Build the template from the correction messages (not the generation messages),
# so the model actually sees the faulty statement and its errors.
correct_cypher_prompt = ChatPromptTemplate.from_messages(correct_cypher_msgs)

In [70]:
async def correct_cypher(subquery, cypher, errors):
    """Ask the LLM to repair a Cypher statement given the errors found for it.

    Args:
        subquery: The natural-language question the statement should answer.
        cypher: The Cypher statement that failed validation.
        errors: The error messages reported for ``cypher``.

    Returns:
        The content of the model's chat response (expected to be a Cypher statement).
    """
    # The faulty statement must be part of the prompt, otherwise there is nothing to correct.
    messages = correct_cypher_prompt.format_messages(
        question=subquery, schema=schema, cypher=cypher, errors=errors
    )
    resp = await llm.achat(messages)
    return resp.message.content

In [71]:
# System prompt for the information check: judge sufficiency, maintain the
# "dynamic notebook" summary and, if needed, revise the remaining plan.
information_check_system = """
You are an expert assistant that evaluates whether a set of subqueries, their results, any existing condensed information, and the current query plan provide enough details to answer a given question. Your task is to:

1. Analyze if the available information is sufficient
2. Review the remaining steps in the query plan (if any) to determine if they:
   - Should be kept as is
   - Need to be modified
   - Can be skipped (if information is already available)
   - Should be reorganized for better parallel execution

Follow this process:
1. Analyze the original question to understand the required information
2. Review the subqueries, their results, and provided condensed information
3. Check for gaps between the original question and available information
4. Use the condensed information as a **dynamic notebook**:
   - Continuously update it with key details from subquery results and partial answers
   - Refine the summary to integrate newly available information into a coherent and concise format
   - Ensure the condensed summary reflects the most current state of knowledge, including connections between facts
   - Use this notebook as the basis for answering the original question or refining further subqueries
5. If sufficient information exists:
   - Generate a concise answer to the original question based on the updated condensed summary
   - Indicate if remaining plan steps can be skipped
6. If insufficient:
   - Suggest additional subqueries, focusing on information retrieval only
   - Organize suggested queries to maximize parallel execution
   - Only include sequential steps when there's a true data dependency

### Key Guidelines:
- Focus only on information retrieval queries
- Group parallel-executable queries together
- Maintain sequential order only when strictly necessary
- Exclude reasoning/analysis tasks from the query plan
- Treat the condensed information as the **central knowledge base**:
   - Continuously refine it to close gaps between the question and the available data
   - Ensure it is detailed enough to serve as the sole reference for generating a final answer or planning further steps

""" 

# User prompt; template variables: {question}, {subqueries}, {dynamic_notebook}, {plan}.
# Note the placeholder is named `question`, so it must be supplied under that keyword.
information_check_user = """
Original question: {question}  
Subqueries and their results:  
{subqueries}  
Existing dynamic notebook:  
{dynamic_notebook}
Current remaining plan (if any):
{plan}
"""

information_check_msgs = [
    (
        "system",
        information_check_system,
    ),
    ("user", information_check_user),
]

# Template formatted by information_check.
information_check_prompt = ChatPromptTemplate.from_messages(information_check_msgs)

# Structured-output schema for the information check. The docstring and Field
# descriptions are sent to the LLM as part of the schema, so they are runtime text.
class IFOutput(BaseModel):
    """
    Represents the output of an information sufficiency evaluation process. 
    Contains either a condensed summary of the available information or additional subqueries needed to answer the original question.
    """

    # Running summary; information_check returns it under the same key.
    dynamic_notebook: str = Field(
        description="A continuously updated and refined summary integrating subquery results and condensed information. Serves as the central knowledge base to address the original question and guide further subqueries if necessary."
    )
    # Same shape as SubqueriesOutput.plan (list of query groups); nullable.
    modified_plan: Optional[List[List[str]]] = Field(
        description="Modified version of remaining plan steps, if any changes are needed. Each group contains queries that can be executed in parallel. Null if no changes needed or no remaining plan exists."
    )

In [72]:
def format_subqueries_for_prompt(information_checks: list) -> str:
    """
    Render executed subqueries and their results as a bullet list for a prompt.

    Only the first entry of each check's ``database_output`` is shown; a check
    whose output is empty is rendered with "No result available.".

    Args:
        information_checks (List[InformationCheck]): Checks to render; each must
            expose ``subquery`` and ``database_output``.

    Returns:
        str: One "- Subquery: ... / Result: ..." entry per check, newline-separated.
    """

    def _first_result(check):
        # Fall back to the placeholder when the query returned nothing.
        if check.database_output:
            return check.database_output[0]
        return "No result available."

    return "\n".join(
        f"- Subquery: {check.subquery}\n  Result: {_first_result(check)}"
        for check in information_checks
    )

def information_check(subquery_events, original_question, dynamic_notebook, plan):
    """Ask the LLM whether the collected results suffice and update the notebook.

    Args:
        subquery_events: Executed subquery events (each exposing ``subquery`` and
            ``database_output``).
        original_question: The user's original question.
        dynamic_notebook: The running summary from previous rounds.
        plan: The remaining query groups, if any.

    Returns:
        dict: ``{"dynamic_notebook": str, "modified_plan": list | None}`` taken
        from the model's structured output.
    """
    subqueries = format_subqueries_for_prompt(subquery_events)
    print(f"Before: {dynamic_notebook}")
    # The template's placeholder is {question}; passing the value under any other
    # keyword would leave the original question out of the prompt.
    prompt = information_check_prompt.format(
        subqueries=subqueries,
        question=original_question,
        dynamic_notebook=dynamic_notebook,
        plan=plan,
    )
    llm_output = llm.as_structured_llm(IFOutput).complete(prompt).raw
    print(f"After: {llm_output.dynamic_notebook}")
    return {'dynamic_notebook': llm_output.dynamic_notebook, 'modified_plan': llm_output.modified_plan}
        

In [73]:
# System prompt for the final answer: answer strictly from the supplied context.
final_answer_system = """You are a highly intelligent assistant trained to provide concise and accurate answers. You will be given a context and a user question. Your task is to analyze the context and answer the user question based on the information provided in the context. If the context lacks sufficient information to answer the question, inform the user and suggest what additional details are needed.

Focus solely on the context to form your response. Avoid making assumptions or using external knowledge unless explicitly stated in the context.
Ensure the final answer is clear, relevant, and directly addresses the user’s question.
If the question is ambiguous, ask clarifying questions to ensure accuracy before proceeding."""

# User prompt; template variables: {context}, {question}.
final_answer_user = """
Based on this context:
{context}

Answer the following question:
<question>
{question}
</question>

Provide your answer based on the context above explain your reasoning.
If clarification or additional information is needed, explain why and specify what is required.
"""

final_answer_msgs = [
    (
        "system",
        final_answer_system,
    ),
    ("user", final_answer_user),
]

# Template formatted by generate_final_answer.
final_answer_prompt = ChatPromptTemplate.from_messages(final_answer_msgs)

async def generate_final_answer(question, context):
    """Answer ``question`` from ``context`` with the LLM and return the reply text."""
    messages = final_answer_prompt.format_messages(question=question, context=context)
    reply = await llm.achat(messages)
    return reply.message.content
    

In [74]:
class GenerateCypher(Event):
    """Request to translate one subquery into Cypher."""
    subquery: str
    
class ValidateCypher(Event):
    """A generated (or corrected) Cypher statement awaiting validation."""
    subquery: str
    generated_cypher: str

class CorrectCypher(Event):
    """A Cypher statement that failed validation, together with the errors found."""
    cypher: str
    subquery: str
    errors: List[str]

class ExecuteCypher(Event):
    """A validated Cypher statement ready to run against the graph store."""
    validated_cypher: str
    subquery: str

class InformationCheck(Event):
    """Result of one executed subquery, to be assessed by the information check."""
    cypher: str
    subquery: str
    database_output: list
    
class GenerateFinalAnswer(Event):
    """Request to produce the final answer from the given context text."""
    context: str

class ConcurrentFlow(Workflow):
    """Text2Cypher question-answering workflow that runs subqueries of a plan in groups.

    Flow: ``start`` (guardrail + plan) -> ``generate_cypher_step`` ->
    ``validate_cypher_step`` (<-> ``correct_cypher_step``) -> ``execute_cypher_step``
    -> ``information_check_step`` -> next group of subqueries or ``final_answer``.

    Context keys: ``original_question``, ``dynamic_notebook``,
    ``subqueries_cypher_history``, ``count_of_subqueries``, ``plan``.
    """

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> GenerateCypher | GenerateFinalAnswer:
        """Initialise shared state, run the guardrail/planner and emit the first query group."""
        original_question = ev.input
        await ctx.set("original_question", original_question)
        await ctx.set("dynamic_notebook", "")
        await ctx.set("subqueries_cypher_history", {})
        guardrails_output = guardrails_step(original_question)
        if guardrails_output.get("next_event") == "generate_final_answer":
            context = "The question is not about movies or cast, so I cannot answer the question"
            return GenerateFinalAnswer(context=context)

        # Drop empty groups: an empty first group would send no events and leave
        # the workflow waiting until it times out.
        subqueries = [group for group in (guardrails_output["arguments"].get("plan") or []) if group]
        if not subqueries:
            return GenerateFinalAnswer(
                context="No retrieval queries could be planned for the question, so no information was retrieved."
            )
        # store in global context
        await ctx.set("count_of_subqueries", len(subqueries[0])) #we use this in ctx.collect()
        await ctx.set("plan", subqueries[1:]) #we use this in information check
        # Send events
        for subquery in subqueries[0]:
            print(subquery)
            ctx.send_event(GenerateCypher(subquery=subquery))

    @step(num_workers=4)
    async def generate_cypher_step(self, ctx: Context, ev: GenerateCypher) -> ValidateCypher:
        """Translate one subquery into a Cypher statement."""
        print("Running generate_cypher ", ev.subquery)
        generated_cypher = await generate_cypher(ev.subquery)
        return ValidateCypher(subquery=ev.subquery, generated_cypher=generated_cypher)

    @step(num_workers=4)
    async def validate_cypher_step(self, ctx: Context, ev: ValidateCypher) -> GenerateFinalAnswer | ExecuteCypher | CorrectCypher:
        """Validate a statement and route it to execution, correction or the final answer."""
        print("Running validate_cypher ", ev)
        results = validate_cypher(ev.subquery, ev.generated_cypher)
        print(results)
        if results['next_action'] == "end": # DB value mapping
            return GenerateFinalAnswer(context=str(results["mapping_errors"]))
        if results['next_action'] == "execute_cypher":
            # Execute the statement returned by the validator (which may have fixed
            # relationship directions); fall back to the input if it is empty.
            validated_cypher = results["cypher_statement"] or ev.generated_cypher
            return ExecuteCypher(subquery=ev.subquery, validated_cypher=validated_cypher)
        if results['next_action'] == "correct_cypher":
            return CorrectCypher(subquery=ev.subquery, cypher=ev.generated_cypher, errors=results['cypher_errors'])

    @step(num_workers=4)
    async def correct_cypher_step(self, ctx: Context, ev: CorrectCypher) -> ValidateCypher:
        """Ask the LLM to repair a statement, then send it back for validation."""
        print("Running correct_cypher ", ev)
        results = await correct_cypher(ev.subquery, ev.cypher, ev.errors)
        return ValidateCypher(subquery=ev.subquery, generated_cypher=results)

    @step
    async def execute_cypher_step(self, ctx: Context, ev: ExecuteCypher) -> InformationCheck:
        """Run a validated statement against the graph store."""
        print("Running execute_cypher_step ", ev)
        database_output = graph_store.structured_query(ev.validated_cypher)
        return InformationCheck(subquery=ev.subquery, cypher=ev.validated_cypher, database_output=database_output)

    @step
    async def information_check_step(self, ctx: Context, ev: InformationCheck) -> GenerateCypher | GenerateFinalAnswer:
        """Once every subquery of the current group has a result, decide what to do next."""
        print("Running information_check_step", ev)
        # retrieve from context
        number_of_subqueries = await ctx.get("count_of_subqueries")
        # Returns None until all results of the current group have arrived.
        result = ctx.collect_events(ev, [InformationCheck] * number_of_subqueries)
        if result is None:
            return None
        # Add executed cypher statements to global state
        subqueries_cypher_history = await ctx.get("subqueries_cypher_history")
        new_subqueries_cypher = {
            item.subquery: {
                "cypher": item.cypher,
                "database_output": item.database_output
            } for item in result
        }
        await ctx.set("subqueries_cypher_history", {**subqueries_cypher_history, **new_subqueries_cypher})

        original_question = await ctx.get("original_question")
        dynamic_notebook = await ctx.get("dynamic_notebook")
        plan = await ctx.get("plan")

        # Do the information check
        data = information_check(result, original_question, dynamic_notebook, plan)
        await ctx.set("dynamic_notebook", data["dynamic_notebook"])
        # Ignore empty groups so that a group with no queries cannot stall the run.
        modified_plan = [group for group in (data.get("modified_plan") or []) if group]
        # Go fetch additional information if needed
        if modified_plan:
            await ctx.set("count_of_subqueries", len(modified_plan[0])) # this is used for ctx.collect()
            await ctx.set("plan", modified_plan[1:])
            for subquery in modified_plan[0]:
                ctx.send_event(GenerateCypher(subquery=subquery))
        else:
            return GenerateFinalAnswer(context=data['dynamic_notebook'])

    @step
    async def final_answer(self, ctx: Context, ev: GenerateFinalAnswer) -> StopEvent:
        """Generate the final answer and stop, returning it with the executed-query history."""
        original_question = await ctx.get("original_question")
        subqueries_cypher_history = await ctx.get("subqueries_cypher_history")
        print("Running final_answer ", ev)
        resp = await generate_final_answer(original_question, ev.context)
        return StopEvent(result={"text":resp, "subqueries_cypher_history": subqueries_cypher_history})

In [75]:
# Demo 1: a comparison question; verbose=True also logs each workflow step as it runs.
w = ConcurrentFlow(timeout=25, verbose=True)
result = await w.run(input="Who made more movies, Leonardo DiCaprio or Tom Hanks?")
print(result)

Running step start
How many movies has Leonardo DiCaprio acted in?
Step start produced no event
Running step generate_cypher_step
Running generate_cypher  How many movies has Leonardo DiCaprio acted in?
Step generate_cypher_step produced event ValidateCypher
Running step validate_cypher_step
Running validate_cypher  subquery='How many movies has Leonardo DiCaprio acted in?' generated_cypher="MATCH (a:Person {name: 'Leonardo DiCaprio'})-[:ACTED_IN]->(m:Movie) RETURN count(m)"
LLM:errors=None filters=[Property(node_label='Person', property_key='name', property_value='Leonardo DiCaprio')]
Mapping: node_label='Person' property_key='name' property_value='Leonardo DiCaprio'
{'next_action': 'execute_cypher', 'cypher_statement': "MATCH (a:Person {name: 'Leonardo DiCaprio'})-[:ACTED_IN]->(m:Movie) RETURN count(m)", 'cypher_errors': [], 'mapping_errors': [], 'steps': ['validate_cypher']}
Step validate_cypher_step produced event ExecuteCypher
Running step execute_cypher_step
Running execute_cyphe

In [76]:
# Demo 2: one side of the comparison must first be resolved by another query
# (Tom Hanks' most frequent co-actor); only the step functions' own prints are shown.
w = ConcurrentFlow(timeout=30, verbose=False)
result = await w.run(input="Who made more movies, Leonardo di Caprio or Tom Hanks most frequent coactor?")
print(result)

Who is Tom Hanks' most frequent coactor?
Running generate_cypher  Who is Tom Hanks' most frequent coactor?
Running validate_cypher  subquery="Who is Tom Hanks' most frequent coactor?" generated_cypher="MATCH (p:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie)<-[:ACTED_IN]-(coactor:Person)\nRETURN coactor.name, COUNT(m) AS appearances\nORDER BY appearances DESC\nLIMIT 1"
LLM:errors=None filters=[Property(node_label='Person', property_key='name', property_value='Tom Hanks')]
Mapping: node_label='Person' property_key='name' property_value='Tom Hanks'
{'next_action': 'execute_cypher', 'cypher_statement': "MATCH (p:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie)<-[:ACTED_IN]-(coactor:Person)\nRETURN coactor.name, COUNT(m) AS appearances\nORDER BY appearances DESC\nLIMIT 1", 'cypher_errors': [], 'mapping_errors': [], 'steps': ['validate_cypher']}
Running execute_cypher_step  validated_cypher="MATCH (p:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie)<-[:ACTED_IN]-(coactor:Person)\nRETU

In [77]:
# Demo 3: multi-hop question, run with a larger timeout than the previous demos.
w = ConcurrentFlow(timeout=60, verbose=True)
result = await w.run(input="Who has appeared in more movies: Leonardo DiCaprio or the actor who has co-starred most frequently with the director of Tom Hanks' most critically acclaimed movie??")
print(result)

Running step start
What is Tom Hanks' most critically acclaimed movie?
List all movies Leonardo DiCaprio has appeared in.
Step start produced no event
Running step generate_cypher_step
Running generate_cypher  What is Tom Hanks' most critically acclaimed movie?
Running step generate_cypher_step
Running generate_cypher  List all movies Leonardo DiCaprio has appeared in.
Step generate_cypher_step produced event ValidateCypher
Running step validate_cypher_step
Running validate_cypher  subquery="What is Tom Hanks' most critically acclaimed movie?" generated_cypher="MATCH (p:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie)\nRETURN m.title, m.imdbRating\nORDER BY m.imdbRating DESC\nLIMIT 1"
LLM:errors=None filters=[Property(node_label='Person', property_key='name', property_value='Tom Hanks')]
Mapping: node_label='Person' property_key='name' property_value='Tom Hanks'
{'next_action': 'execute_cypher', 'cypher_statement': "MATCH (p:Person {name: 'Tom Hanks'})-[:ACTED_IN]->(m:Movie)\nRETURN 

In [None]:
# Demo 4: an incomplete question. NOTE(review): presumably a probe of the guardrail's
# "end" branch -- this cell has no recorded output, so the behaviour is unconfirmed.
w = ConcurrentFlow(timeout=30, verbose=False)
result = await w.run(input="What")
print(result)

In [29]:
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)

# Write HTML diagrams of the most recent run of `w` and of all possible flows.
# NOTE(review): the file names look like leftovers ("joke_flow", "recenst") -- the second
# file holds the all-possible-flows diagram despite its name; consider renaming both.
draw_most_recent_execution(w, filename="joke_flow_recent.html")
draw_all_possible_flows(w, filename="joke_flow_recenst.html")

joke_flow_recent.html
<class 'NoneType'>
<class '__main__.ValidateCypher'>
<class '__main__.InformationCheck'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.ValidateCypher'>
<class '__main__.GenerateCypher'>
<class '__main__.GenerateFinalAnswer'>
<class '__main__.GenerateCypher'>
<class '__main__.GenerateFinalAnswer'>
<class '__main__.GenerateFinalAnswer'>
<class '__main__.ExecuteCypher'>
<class '__main__.CorrectCypher'>
joke_flow_recenst.html
