<a href="https://colab.research.google.com/github/RCGom/RCGom/blob/main/BusinessFlows_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%pip install openai neo4j python-dotenv gradio

Collecting neo4j
  Downloading neo4j-5.28.2-py3-none-any.whl.metadata (5.9 kB)
Downloading neo4j-5.28.2-py3-none-any.whl (313 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 313.2/313.2 kB 5.4 MB/s eta 0:00:00
Installing collected packages: neo4j
Successfully installed neo4j-5.28.2


In [None]:
# main.py: a RAG assistant based on scenario retrieval; each query also returns all related processes
# pip install openai neo4j python-dotenv


import json
import os
import sys
import time
import logging
from pathlib import Path
from typing import Any, Dict, List

from neo4j import GraphDatabase
from openai import OpenAI
import subprocess

import requests
import zipfile
import gradio as gr
from google.colab import userdata

In [None]:


OPENAI_API_KEY   = userdata.get("OPENAI_API_KEY")
NEO4J_URI        = userdata.get("NEO4J_URI")
NEO4J_USER       = userdata.get("NEO4J_USER")
NEO4J_PASSWORD   = userdata.get("NEO4J_PASSWORD")
NEO4J_DATABASE   = userdata.get("NEO4J_DATABASE")
NEO4J_INDEX      = userdata.get("NEO4J_INDEX")
CHAT_MODEL = "gpt-4o-mini"                  # reasoning + tool use
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")


In [None]:
# SYSTEM_PROMPT = (
#     "You are a process assistant helping users find relevant process definitions. "
#     "When the user asks a process-related question, call get_processes with a short description of the process"
#     "the function uses a deep search, not a similarity search."
#     "Perform several function calls, if you need different processes to answer the question."
#     "Use the returned results to answer and cite the process names you used. "
#     "If results are poor, say so and ask for clarification."
# )
SYSTEM_PROMPT = """

You are a process assistant. You answer questions based on a process repository
which contains End-to-end Scenarios and Business Processes.
These can be queried by calling functions
- get_scenarios()
- get_processes()
The function calls provide name, description and related objects of suitable entries.

Scenarios have an n:m relation to Processes.

1. When the user asks a question, first interpret the intent
   and map it to one or more formal names or descriptions.
   - Example: User says "indirect processing"
     → Rewrite as "Indirect Procurement Process"
   - Example: User says "goods receipt"
     → Rewrite as "Goods Receipt in Inventory Management"

2. Always call the functions - get_scenarios(), get_processes()
with the rewritten name(s), not the original vague query.

3. If no processes are found (empty results):
   - Retry with a broader or parent term (e.g., "procurement" if "indirect
     procurement" fails).
   - If still nothing is found, return the most general process in that
     domain (e.g., "Procurement Process") and tell the user results may not
     be exact.

4. Use multiple function calls if several processes are needed.

5. Use only the returned results to answer and always cite the process names.

6. If results are poor, ask the user to clarify, in particular whether a scenario, a process or an SAP Solution
would best answer the question.

"""


In [None]:
# -----------------------------
# Logging setup
# -----------------------------
logger = logging.getLogger("proc-assistant")
handler = logging.StreamHandler(sys.stdout)
fmt = logging.Formatter(
    "%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
handler.setFormatter(fmt)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
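
In a notebook, re-running a logging setup cell attaches another handler each time, and records can also propagate to handlers on the root logger, so a single message may be printed more than once. A minimal sketch of an idempotent setup follows; `get_notebook_logger` and the logger name `proc-assistant-demo` are hypothetical and not part of the notebook.

```python
import logging
import sys


def get_notebook_logger(name: str) -> logging.Logger:
    """Return a logger with exactly one stdout handler; safe to call on every cell re-run."""
    log = logging.getLogger(name)
    if not log.handlers:  # attach the handler only on the first call
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s | %(levelname)s | %(name)s | %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        ))
        log.addHandler(handler)
    log.setLevel(logging.INFO)
    log.propagate = False  # do not hand records on to the root logger's handlers
    return log


demo_logger = get_notebook_logger("proc-assistant-demo")
demo_logger = get_notebook_logger("proc-assistant-demo")  # second call adds no handler
demo_logger.info("logged once")
```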

In [None]:
# -----------------------------
# Clients
# -----------------------------
client = OpenAI(api_key=OPENAI_API_KEY)
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))


# -----------------------------
# Startup checks
# -----------------------------
def check_vector_index_exists(index_name: str) -> None:
    cypher = """
    SHOW INDEXES YIELD name, type, entityType, labelsOrTypes, properties
    WHERE type = 'VECTOR' AND name = $name
    RETURN name, labelsOrTypes, properties
    """
    with driver.session(database=NEO4J_DATABASE) as sess:
        rec = sess.run(cypher, name=index_name).single()
        if not rec:
            logger.warning(
                "Vector index '%s' not found. "
                "Create it or set NEO4J_INDEX correctly.", index_name
            )
        else:
            logger.info("Using vector index '%s' on %s %s",
                        rec["name"], rec["labelsOrTypes"], rec["properties"])

check_vector_index_exists(NEO4J_INDEX)

2025-09-16 12:03:04 | INFO | proc-assistant | Using vector index 'ProcessDescriptions' on ['Process'] ['embedding']


INFO:proc-assistant:Using vector index 'ProcessDescriptions' on ['Process'] ['embedding']


In [None]:
# -----------------------------
# Embedding + search helpers
# -----------------------------
# was: EMBEDDING_MODEL = "text-embedding-3-large"  # 3072 dims
# EMBEDDING_MODEL = "text-embedding-3-small"         # 1536 dims


def embed(text: str) -> list[float]:
    logger.info("Embedding model in use: %s", EMBEDDING_MODEL)
    e = client.embeddings.create(model=EMBEDDING_MODEL, input=[text])
    vec = e.data[0].embedding
    logger.info("Embedding length: %d", len(vec))
    return vec


In [None]:
def deep_search_processes_with_scenarios(search_text: str, limit_processes: int = 6) -> list[dict]:
    """
    Text search over :Process (name, Description) and collect related :Scenario nodes.
    Returns:
    [
      { "process": {"name": str, "description": str},
        "scenarios": [{"name": str, "description": str}, ...] },
      ...
    ]
    """
    cypher = """
CALL db.index.fulltext.queryNodes('process_ft', $q) YIELD node AS p, score
WITH p, score
CALL (p) {
  MATCH (p)-[:PROCESS_OF]->(s:Scenario)
  RETURN collect({
    name: s.name,
    description: coalesce(s.Description, "")
  }) AS scenarios
}
RETURN {
  process:  { name: p.name, description: coalesce(p.Description, "") },
  scenarios: scenarios
} AS result
ORDER BY score DESC, p.name
LIMIT $limit
"""
    t0 = time.perf_counter()
    with driver.session(database=NEO4J_DATABASE) as sess:
        rs = sess.run(cypher, q=search_text, limit=limit_processes)
        rows = [r["result"] for r in rs]
    logger.info("Deep search returned %d row(s) in %.1f ms", len(rows), (time.perf_counter()-t0)*1000)
    for r in rows:
        logger.info("Process=%s | Scenarios=%d", r["process"]["name"], len(r["scenarios"]))
    return rows
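
Neo4j full-text indexes parse the query string with Lucene query syntax, so characters such as `(`, `:` or `/` in user text act as operators and can cause a parse error. A minimal sketch of escaping the text before passing it as `$q` follows. `escape_lucene` is a hypothetical helper, not part of the notebook, and it does not handle the operator words `AND`, `OR` and `NOT`.

```python
import re

# Characters with special meaning in the Lucene classic query syntax.
_LUCENE_SPECIAL = re.compile(r'([+\-!(){}\[\]^"~*?:\\/&|])')


def escape_lucene(text: str) -> str:
    """Backslash-escape Lucene operator characters so the text is matched literally."""
    return _LUCENE_SPECIAL.sub(r"\\\1", text)


print(escape_lucene("Order-to-Cash (O2C)"))
```

A call such as `sess.run(cypher, q=escape_lucene(search_text), limit=limit_processes)` would then send the escaped text. The trade-off is that users can no longer use wildcards or field syntax on purpose.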

In [None]:
def deep_search_scenarios_with_processes(search_text: str, limit_processes: int = 5) -> list[dict]:
    """
    Text search over :Scenario (name, Description) and collect related :Process nodes.
    Returns:
    [
      { "scenario": {"name": str, "description": str},
        "processes": [{"name": str, "description": str}, ...] },
      ...
    ]
    """
    cypher = """
CALL db.index.fulltext.queryNodes('scenario_ft', $q) YIELD node AS s, score
WITH s, score
CALL (s) {
  MATCH (p)-[:PROCESS_OF]->(s:Scenario)
  RETURN collect({
    name: p.name,
    description: coalesce(p.Description, "")
  }) AS processes
}
RETURN {
  scenario:  { name: s.name, description: coalesce(s.Description, "") },
  processes: processes
} AS result
ORDER BY score DESC, s.name
LIMIT $limit
"""
    t0 = time.perf_counter()
    with driver.session(database=NEO4J_DATABASE) as sess:
        rs = sess.run(cypher, q=search_text, limit=limit_processes)
        rows = [r["result"] for r in rs]
    logger.info("Deep search returned %d row(s) in %.1f ms", len(rows), (time.perf_counter()-t0)*1000)
    for r in rows:
        logger.info("Scenario=%s | Processes=%d", r["scenario"]["name"], len(r["processes"]))
    return rows

In [None]:

# -----------------------------
# Tool implementation / processes
# -----------------------------
def get_processes(query_text: str, k: int = 5) -> str:
    """
    Takes a natural-language query, finds up to k matching Processes via
    deep text search, and for each returns all connected Scenarios.

    Returns a JSON string:
    [
      {
        "process":  {"name": str, "description": str},
        "scenarios": [{"name": str, "description": str}, ...]
      },
      ...
    ]
    """
    logger.info("get_processes (Deep text search) query=%r | k=%d", query_text, k)

    # Run deep text search instead of vector similarity
    hits = deep_search_processes_with_scenarios(query_text, limit_processes=k)

    # Log concise summary for debugging
    logger.info("Matched processes: %s", [h["process"]["name"] for h in hits])

    # Return JSON for chatbot to use
    return json.dumps(hits, ensure_ascii=False)
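
The JSON string returned here is what the model receives as tool output. To show how that shape can be consumed outside the chat loop, here is a minimal sketch. The payload is hypothetical sample data in the documented shape, not real repository content, and `summarize_hits` is an illustrative helper.

```python
import json

# Hypothetical payload in the shape documented by get_processes.
sample_payload = json.dumps([
    {
        "process": {"name": "Example Process A", "description": "Illustrative entry."},
        "scenarios": [
            {"name": "Example Scenario X", "description": ""},
            {"name": "Example Scenario Y", "description": ""},
        ],
    },
    {"process": {"name": "Example Process B", "description": ""}, "scenarios": []},
])


def summarize_hits(payload: str) -> list:
    """Turn the JSON string into one readable line per process."""
    lines = []
    for hit in json.loads(payload):
        scenario_names = [s["name"] for s in hit["scenarios"]]
        linked = ", ".join(scenario_names) if scenario_names else "no linked scenarios"
        lines.append(f'{hit["process"]["name"]}: {linked}')
    return lines


for line in summarize_hits(sample_payload):
    print(line)
```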


In [None]:

# -----------------------------
# Tool implementation / scenarios
# -----------------------------
def get_scenarios(query_text: str, k: int = 5) -> str:
    """
    Takes a natural-language query, finds up to k matching Scenarios via
    deep text search, and for each returns all connected Processes.

    Returns a JSON string:
    [
      {
        "scenario":  {"name": str, "description": str},
        "processes": [{"name": str, "description": str}, ...]
      },
      ...
    ]
    """
    logger.info("get_scenarios (Deep text search) query=%r | k=%d", query_text, k)

    # Run deep text search instead of vector similarity
    hits = deep_search_scenarios_with_processes(query_text, limit_processes=k)

    # Log concise summary for debugging
    logger.info("Matched scenarios: %s", [h["scenario"]["name"] for h in hits])

    # Return JSON for chatbot to use
    return json.dumps(hits, ensure_ascii=False)

In [None]:
TOOLS = [{
    "type": "function",
    "function": {
        "name": "get_processes",
        "description": "Return the most similar process definitions by name and description, based on a text query.",
        "parameters": {
            "type": "object",
            "properties": {
                "query_text": {
                    "type": "string",
                    "description": "User's description of the process they need."
                }
            },
            "required": ["query_text"],
            "additionalProperties": False
        }
    }
},
         {
    "type": "function",
    "function": {
        "name": "get_scenarios",
        "description": "Return the most similar scenario definitions by name and description, based on a text query.",
        "parameters": {
            "type": "object",
            "properties": {
                "query_text": {
                    "type": "string",
                    "description": "User's description of the scenario they need."
                }
            },
            "required": ["query_text"],
            "additionalProperties": False
        }
    }
}]
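
The two tool entries differ only in their name and description strings. As a minimal sketch, a small builder can remove the duplication. `make_search_tool` and `TOOLS_SKETCH` are hypothetical names, and the description strings below are illustrative.

```python
def make_search_tool(name: str, description: str, query_description: str) -> dict:
    """Build one function-tool entry that takes a single required string argument."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {
                    "query_text": {"type": "string", "description": query_description}
                },
                "required": ["query_text"],
                "additionalProperties": False,
            },
        },
    }


TOOLS_SKETCH = [
    make_search_tool(
        "get_processes",
        "Return matching process definitions with their related scenarios.",
        "Description of the process to look up.",
    ),
    make_search_tool(
        "get_scenarios",
        "Return matching scenario definitions with their related processes.",
        "Description of the scenario to look up.",
    ),
]

print([tool["function"]["name"] for tool in TOOLS_SKETCH])
```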

In [None]:
# -----------------------------
# Chatbot assistant function
# -----------------------------
def assistant_reply(user_query: str, history: list[dict] | None = None) -> str:
    """
    Provides a conversational response to the user query using the OpenAI Chat API,
    optionally calling tools to get relevant information from Neo4j.

    Args:
        user_query: The user's question or request.
        history: Optional list of previous message dictionaries in the conversation,
                 formatted as [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...].

    Returns:
        The assistant's reply as a string.
    """
    history = history or []  # avoid a shared mutable default argument
    logger.info("assistant_reply received user_query: %r", user_query)
    # Log history carefully to avoid excessive output with large histories
    if len(history) < 20: # Log full history if relatively small
        logger.info("assistant_reply received history: %r", history)
    else: # Log summary if history is longer
        logger.info("assistant_reply received history length: %d", len(history))
        logger.info("assistant_reply first 10 history items: %r", history[:10])
        logger.info("assistant_reply last 10 history items: %r", history[-10:])


    # Construct the initial messages list including system prompt and the provided history
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    # Extend messages with the received history (already in the correct role/content dict format from chatbot_interface)
    messages.extend(history)
    # Append the current user query as the latest user message dictionary
    messages.append({"role": "user", "content": user_query})

    logger.info("Messages prepared for OpenAI API (first call): %r", messages)


    try:
        # First API call: Get assistant's response or tool call
        logger.info("Calling OpenAI chat completion (first call) with model: %s", CHAT_MODEL)
        response = client.chat.completions.create(
            model=CHAT_MODEL,
            messages=messages,
            tools=TOOLS,
            tool_choice="auto",
        )
        response_message = response.choices[0].message
        logger.info("OpenAI response (first call) message object: %s", response_message) # Log the full message object


        # Handle tool calls
        if response_message.tool_calls:
            logger.info("Tool calls detected: %s", response_message.tool_calls)
            available_functions = {
                "get_processes": get_processes,
                "get_scenarios": get_scenarios,
            }

            # Append the assistant's message with tool_calls to the messages list
            # This message object returned by client.chat.completions.create is usually directly compatible
            # with the 'messages' format for the subsequent API call in a tool-using turn.
            messages.append(response_message)
            logger.info("Messages after appending assistant tool_calls message: %r", messages)

            # Execute tool calls and append results
            for tool_call in response_message.tool_calls:
                function_name = tool_call.function.name
                # Ensure the function name exists in available_functions
                function_to_call = available_functions.get(function_name)
                if function_to_call:
                    try:
                        # Parse arguments from the tool call message
                        function_args = json.loads(tool_call.function.arguments)
                        logger.info("Parsed function arguments: %r", function_args)

                        # Call the tool function
                        logger.info("Calling tool function: %s with args: %r", function_name, function_args)
                        # Ensure tool functions return a string or JSON string as content
                        function_response_content = function_to_call(
                            query_text=function_args.get("query_text") # Assuming tool functions take query_text
                        )
                        logger.info("Tool function response content (first 500 chars): %s", function_response_content[:500])

                        # Append the tool output message to the messages list:
                        # role is 'tool', content is the string output, and tool_call_id ties it to the model's call
                        messages.append(
                            {
                                "tool_call_id": tool_call.id, # Links this result to the tool call it answers
                                "role": "tool",
                                "content": function_response_content, # Content must be a string
                                "name": function_name # Informational; the call is identified by tool_call_id
                            }
                        )
                        logger.info("Appended tool output message: %r", messages[-1])

                    except json.JSONDecodeError:
                        logger.error("Error decoding function arguments JSON: %s", tool_call.function.arguments)
                        # Append an error message as tool output if args are invalid
                        messages.append(
                            {
                                "tool_call_id": tool_call.id,
                                "role": "tool",
                                "content": f"Error: Invalid JSON arguments for tool '{function_name}'.",
                                "name": function_name
                            }
                        )
                        logger.info("Appended tool invalid args error message: %r", messages[-1])

                    except Exception as e:
                        logger.error("Error executing tool '%s': %s", function_name, e)
                        logger.exception("Tool execution traceback:")
                        # Append an error message as tool output
                        messages.append(
                            {
                                "tool_call_id": tool_call.id,
                                "role": "tool",
                                "content": f"Error executing tool '{function_name}': {e}", # Content as string
                                "name": function_name
                            }
                        )
                        logger.info("Appended tool execution error message: %r", messages[-1])

                else:
                    logger.warning("Function '%s' called by model not found in available_functions", function_name)
                    # If the model hallucinates a tool call, append a tool message indicating it wasn't found.
                    messages.append(
                         {
                             "tool_call_id": tool_call.id,
                             "role": "tool",
                             "content": f"Error: Tool '{function_name}' not found.",
                             "name": function_name # Still include the name from the model's call
                         }
                    )
                    logger.info("Appended 'tool not found' message: %r", messages[-1])


            # Second API call: Get final response after tool execution
            logger.info("Calling OpenAI chat completion (second call after tools) with model: %s", CHAT_MODEL)
            logger.info("Messages prepared for OpenAI API (second call): %r", messages)

            second_response = client.chat.completions.create(
                model=CHAT_MODEL,
                messages=messages, # Send the updated messages list including assistant tool_calls and tool outputs
            )
            final_response_message = second_response.choices[0].message
            logger.info("OpenAI response (second call) message object: %s", final_response_message) # Log the full message object

            # The content of the second response is the final assistant reply
            # Ensure the final response content is a string
            final_response = final_response_message.content if final_response_message.content is not None else ""
            logger.info("Final assistant response content (after tool): %s", final_response)
            return final_response

        else:
            # No function call needed, return the initial response content
            logger.info("No tool calls detected. Returning initial response content.")
            # Ensure the initial response content is a string
            initial_response_content = response_message.content if response_message.content is not None else ""
            logger.info("Initial assistant response content: %s", initial_response_content)
            return initial_response_content

    except Exception as e:
        logger.exception("An error occurred during OpenAI chat completion:") # Log the full exception traceback
        # Return a user-friendly error message
        return f"An error occurred while processing your request: {e}" # Include error message for debugging
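
The function above performs a single tool round: the second request is sent without `tools`, so the model cannot issue a follow-up search such as the "retry with a broader term" step from the system prompt. A minimal sketch of a bounded multi-round loop follows. `run_tool_loop` is a hypothetical helper, and `fake_complete` and `fake_get_processes` are stand-ins for the chat API and the Neo4j tool so the example runs offline.

```python
import json
from types import SimpleNamespace
from typing import Callable, Dict, List


def run_tool_loop(complete: Callable, messages: List, tools_by_name: Dict[str, Callable],
                  max_rounds: int = 3) -> str:
    """Call the model until it answers without requesting tools, or max_rounds is reached."""
    for _ in range(max_rounds):
        message = complete(messages)
        if not message.tool_calls:
            return message.content or ""
        messages.append(message)  # keep the assistant's tool request in the transcript
        for call in message.tool_calls:
            fn = tools_by_name.get(call.function.name)
            if fn is None:
                result = f"Error: Tool '{call.function.name}' not found."
            else:
                result = fn(**json.loads(call.function.arguments))
            messages.append({"role": "tool", "tool_call_id": call.id, "content": result})
    return "Stopped after reaching the tool-call limit."


def fake_complete(messages):
    """Stand-in for the chat API: keeps requesting a search until a tool result is non-empty."""
    tool_results = [m for m in messages if isinstance(m, dict) and m.get("role") == "tool"]
    if not tool_results:
        query = "indirect procurement"
    elif tool_results[-1]["content"] == "[]":
        query = "procurement"  # broaden the term after an empty result
    else:
        return SimpleNamespace(content="Found: " + tool_results[-1]["content"], tool_calls=None)
    call = SimpleNamespace(
        id=f"call_{len(tool_results)}",
        function=SimpleNamespace(name="get_processes",
                                 arguments=json.dumps({"query_text": query})),
    )
    return SimpleNamespace(content=None, tool_calls=[call])


def fake_get_processes(query_text: str) -> str:
    return json.dumps(["Procurement Process"] if query_text == "procurement" else [])


transcript = [{"role": "user", "content": "How does indirect processing work?"}]
answer = run_tool_loop(fake_complete, transcript, {"get_processes": fake_get_processes})
print(answer)
```

With the real client, `complete` could be a function that calls `client.chat.completions.create(model=CHAT_MODEL, messages=msgs, tools=TOOLS)` and returns `.choices[0].message`. The `max_rounds` bound keeps a model that never stops requesting tools from looping indefinitely.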

In [None]:


logger = logging.getLogger("proc-assistant") # Get the logger

def chatbot_interface(user_query: str, history: list[dict]) -> tuple[list[dict], list[dict]]:
    """
    Wrapper function for assistant_reply to be used with Gradio.
    Manages conversation history. Accepts and returns history in OpenAI messages format.
    """
    logger.info(f"chatbot_interface received history (dict format): {history}")
    logger.info(f"chatbot_interface received user_query: {user_query}")

    # The history is already in the correct format (list[dict]) due to gr.Chatbot type='messages'

    # Get the response from the assistant
    try:
        # Pass the history directly to assistant_reply (it expects list[dict])
        assistant_response_content = assistant_reply(user_query, history)
        logger.info(f"chatbot_interface received assistant_response_content: {assistant_response_content}")
        assistant_message = {"role": "assistant", "content": assistant_response_content}

    except Exception as e:
        logger.exception("Error calling assistant_reply from chatbot_interface:")
        assistant_response_content = "An error occurred while getting the assistant's response. Please check the logs for details." # Provide a fallback message
        assistant_message = {"role": "assistant", "content": assistant_response_content}


    # Append the new interaction (user query and assistant response) to the history
    # History is already in list[dict] format, so we append the new messages
    updated_history = history + [{"role": "user", "content": user_query}, assistant_message]

    logger.info(f"chatbot_interface updated history (dict format): {updated_history}")


    # Return the updated history for the Chatbot component and the state
    # Both need the history in list[dict] format now because gr.Chatbot has type='messages'
    return updated_history, updated_history

# Create the Gradio interface with history components
with gr.Blocks() as iface:
    # Use gr.Chatbot with type='messages' to handle history in OpenAI message format
    chatbot = gr.Chatbot(label="Process Assistant Chatbot", type='messages')
    msg = gr.Textbox(label="Ask a question about business processes.")
    clear = gr.ClearButton([msg, chatbot])
    # gr.State is used to maintain the history between interactions
    # Initialize state as an empty list for history in dict format
    state = gr.State([])

    # The submit method connects the input (msg, state) to the function (chatbot_interface)
    # and updates the chatbot and state with the returned history (now in dict format).
    msg.submit(chatbot_interface, inputs=[msg, state], outputs=[chatbot, state])


# Note: the interface is not launched in this cell; the next cell calls iface.launch().

In [None]:
# Launch the Gradio interface
iface.launch()

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://96e723069c8c876022.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


