# Tokenflow: Running the RAI GRS Pipeline on Agents Data

### Pipeline Execution Methods
After installing the native app in your Snowflake account, you have three options to run the pipeline for your input documents:

1. **SQL Worksheet** – Copy the code from the file `unstructured-graphrag-SQL-workflow.sql` into a new SQL worksheet in Snowsight and execute the pipeline steps from there.
2. **Streamlit User Interface** – Run the pipeline through the application's UI: open the **RAI GRS** app under `Data Products` → `Apps` (slides on using the UI are available [here](https://drive.google.com/drive/u/1/folders/1RCCpP5XSGh8LOi4x-MzpnOmfMBwQ-V8R)).
3. **Notebook-Based Pipeline Execution** – Use this notebook, which leverages the Python Snowflake Connector, to execute the SQL queries one by one.


#### Notebook-Based Pipeline Execution
This notebook demonstrates running the pipeline by wrapping SQL statements in Python Snowflake Connector calls. It covers:
- Configuration: Collect the project settings (project ID, data source, LLM family, and models) and rely on the app's default configuration.
- Pipeline Invocation: Execute the corresponding pipeline steps using direct SQL calls through the Python Snowflake Connector.
- Visualization: Generate and display the Knowledge Graph visualization.


**Note:** In this notebook, we demonstrate only the basic pipeline execution. However, the application also supports fine-tuning one of the available Cortex LLMs on your own documents. For the purposes of this demo, we use the default LLMs for both the extraction and question answering steps.

**Alternative Workflow:** If you have already run the pipeline through the UI, you can use this notebook by skipping the initial sections related to pipeline execution. Instead, go directly to the *'Load extracted graph data from Snowflake'* section. This allows you to proceed straight to the visualization phase.

In [None]:
import pandas as pd
import yaml
import json
from datetime import datetime, timezone
import gravis as gv
import networkx as nx
from snowflake.snowpark.context import get_active_session
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

In [None]:
session = get_active_session()
conn = snowflake.connector.connect(session=session)
cursor = conn.cursor()

## Create your project

In [None]:
project_id = input("Please give your project_id: ")  # e.g. tokenflow
project_name = project_id
project_comments = f'Project deployment for {project_id}'  # You could add custom comments

print(f'Project with id "{project_id}" has comments "{project_comments}".')

In [None]:
# If this is a new project, insert it as a new record into the PROJECTS table.
# With Snowflake's default AUTOCOMMIT setting, the change is committed immediately.
# The NOT EXISTS check turns the insert into a no-op when the project already exists, so this cell is safe to re-run.
extraction_call = f"""
    INSERT INTO RAI_GRS.DATA.PROJECTS (ID, NAME, CONFIG, COMMENTS)
    SELECT %s, %s, %s, %s
    WHERE NOT EXISTS (
        SELECT 1 
        FROM RAI_GRS.DATA.PROJECTS 
        WHERE ID = %s
    )
"""

# Execute the SQL call, providing the values for the placeholders.
cursor.execute(extraction_call, (
    project_id,
    project_name,
    None, # We will use the default config and provide parameters in some of the functions.
    project_comments,
    project_id  # The ID is also used in the WHERE clause to check for existence.
))

# Print the INSERT result (Snowflake reports the number of rows inserted: 1 for a new project, 0 if it already existed).
results = cursor.fetchall() 
for row in results:
    print(row)
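Why the `NOT EXISTS` guard makes this cell safe to re-run can be checked locally. The sketch below uses Python's built-in `sqlite3` as a stand-in for Snowflake, purely as an illustrative assumption: SQLite uses `?` placeholders where the Snowflake connector uses `%s`, and the `projects` table here is a hypothetical miniature of `RAI_GRS.DATA.PROJECTS`. Running the same insert twice leaves exactly one row.

```python
import sqlite3

# Hypothetical in-memory stand-in for RAI_GRS.DATA.PROJECTS (not Snowflake).
conn_demo = sqlite3.connect(":memory:")
conn_demo.execute("CREATE TABLE projects (id TEXT, name TEXT, config TEXT, comments TEXT)")

# Same shape as the notebook's INSERT ... SELECT ... WHERE NOT EXISTS,
# but with SQLite's "?" placeholders instead of "%s".
insert_if_absent = """
    INSERT INTO projects (id, name, config, comments)
    SELECT ?, ?, ?, ?
    WHERE NOT EXISTS (SELECT 1 FROM projects WHERE id = ?)
"""

def register_project(conn, project_id, comments):
    """Insert the project once; return how many rows were inserted (0 or 1)."""
    cur = conn.execute(insert_if_absent, (project_id, project_id, None, comments, project_id))
    conn.commit()
    return cur.rowcount

first = register_project(conn_demo, "tokenflow", "demo")
second = register_project(conn_demo, "tokenflow", "demo")  # simulates re-running the cell
total = conn_demo.execute("SELECT COUNT(*) FROM projects").fetchone()[0]
print(first, second, total)  # 1 0 1
```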

In [None]:
# Type 'corpus' if you run the pipeline for the Tokenflow use case, or 'pdf' if you have uploaded files to the FILES stage.
data_source = input("Are the documents coming from PDF files in the stage, or will you upload them directly into the corpus table? Type 'pdf' or 'corpus': ")
data_source = data_source.strip().lower()

In [None]:
llm_family = input("Do you want to use an LLM inside Snowflake or a model from the OpenAI API? Type 'openai' or 'snowflake': ")
llm_family = llm_family.strip().lower()
if llm_family == 'openai':
    openai_api_key = input("Please input OpenAI API key: ")
else:
    openai_api_key = None
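Note that typos in these answers are silently accepted: any value other than `'openai'` falls into the Snowflake branch, and in Step 0 anything other than `'pdf'` is treated as `'corpus'`. A small validation helper could re-prompt instead. `ask_choice` below is a hypothetical helper, not part of the app; its `read` parameter defaults to the built-in `input` so it can also be exercised without a keyboard.

```python
def ask_choice(prompt, allowed, read=input):
    """Prompt until the normalized (stripped, lowercased) answer is in `allowed`."""
    allowed = {a.lower() for a in allowed}
    while True:
        answer = read(prompt).strip().lower()
        if answer in allowed:
            return answer
        print(f"Please type one of: {', '.join(sorted(allowed))}")

# Hypothetical drop-in replacements for the input() calls above:
# data_source = ask_choice("Type 'pdf' or 'corpus': ", {"pdf", "corpus"})
# llm_family = ask_choice("Type 'openai' or 'snowflake': ", {"openai", "snowflake"})
```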

In [None]:
# Examples for OpenAI: "gpt-4.1" and "gpt-4o".
# Examples for Snowflake Cortex: "claude-3-5-sonnet" and "llama3.3-70b".
completion_model = input("Select the LLM: ")

In [None]:
# Examples for OpenAI: "text-embedding-3-small" and "text-embedding-3-large".
# Examples for Snowflake Cortex: "snowflake-arctic-embed-l-v2.0" and "e5-base-v2".
embeddings_model = input("Select the embeddings: ")

In [None]:
# Here we provide default values for additional parameters, which can be customized based on the experiment.
is_fine_tuned_completion_model = False
summarization_context="chunk"

## Run the pipeline

#### 0. Corpus conversion
The very first step is to upload your documents to the `FILES` stage (under the schema `RAI_GRS.DATA`) in a new folder named after the `project_id`. Once uploaded, you can run the corpus conversion, which extracts text content from your documents.

There are two options available: standard conversion and visual parsing. Visual parsing leverages an LLM with vision capabilities to interpret and extract meaning from visual content in your documents, such as images and diagrams.

For this example, we already have a table with textual data, so there are no PDF documents to process through the corpus conversion endpoint. Instead, we take the relevant text columns from the input table and format them to match the structure of the `CORPUS` table, as it would appear after running the conversion on PDF documents (i.e., with the same columns and structure). 

Once formatted, we upload this data to Snowflake so it is ready for the first step of our pipeline.

In [None]:
# Step 0
# -- Corpus conversion (e.g. PDF to MarkDown).


if data_source == 'pdf':
    # Form the SQL call for the execute_convert_corpus using the default parameters.
    extraction_call = f"""CALL RAI_GRS.app.execute_convert_corpus('{project_id}');"""
    
    # Or if the documents contain visual content, we could run this process (note that it may be costly due to extensive LLM calls): 
    # extraction_call = f"""CALL RAI_GRS.app.execute_llm_convert_corpus('{project_id}');"""
    
    # Execute the SQL call.
    cursor.execute(extraction_call)
    
    # See the results of the call.
    results = cursor.fetchall() 
    for row in results:
        print(row)
else:
    # Corpus conversion is not needed here: the raw text is already in a Snowflake table, so we build the CORPUS table ourselves.
    # (If we had uploaded PDF documents to the FILES stage instead, the 'pdf' branch above would run the conversion.)
    # First, read the table that holds the textual data.
    db_name = "TF_DB" 
    schema_name = "TF_SCHEMA" 
    data_table_name = "VIRTUALS_AGENTS"
    sql = f"SELECT * FROM {db_name}.{schema_name}.{data_table_name};"
    cursor.execute(sql)
    
    # Fetch the result set from the cursor and deliver it as the pandas DataFrame.
    virtual_agents_raw = cursor.fetch_pandas_all()
    # In this experiment, we extract the name, symbol, and description of the agents, and combine them into a single string.
    # This aggregated text will serve as the input document (one for each agent) for our algorithm.
    corpus = virtual_agents_raw[["NAME", "SYMBOL", "DESCRIPTION"]].copy()
    
    # Create the necessary columns to match the schema of the Snowflake corpus table.
    corpus["PROJECT_ID"] = project_id
    corpus["CHUNK_ID"] = corpus["NAME"].apply(lambda x: f"{project_id}/{x}")
    
    # We could attach custom metadata to each document, or let the LLM generate some (e.g. a short title), but for now
    # we use the same metadata for all records.
    now_utc = datetime.now(timezone.utc)
    formatted_time = now_utc.strftime("D:%Y%m%d%H%M%SZ")
    metadata_for_all_entries = {
      "creationDate": formatted_time,
      "subject": "Text with descriptions of the Tokenflow agents.",
      "source": data_table_name
    }
    corpus["METADATA"] = corpus.apply(lambda _: metadata_for_all_entries, axis=1)
    corpus["CONTENT"] = corpus.apply(lambda row: f"Agent with name {row['NAME']} has symbol: {row['SYMBOL']} and description: {row['DESCRIPTION']}.", axis=1)
    
    # The final DataFrame matches the CORPUS table schema in Snowflake.
    final_corpus_df = corpus[["PROJECT_ID", "CHUNK_ID", "CONTENT", "METADATA"]]
    # With the text data shaped like the CORPUS table, we upload it to Snowflake instead of running the corpus conversion step.
    # Then we can run the GraphRAG Native App pipeline.
    
    # Upload the corpus table to Snowflake using write_pandas from the Snowflake Python connector.
    success, nchunks, nrows, _ = write_pandas(conn=conn,
                                              df=final_corpus_df,
                                              database='RAI_GRS',
                                              schema='DATA',
                                              table_name='CORPUS')
    print(f"Upload success: {success}, rows written: {nrows}")
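The same transformation can be written with plain Python dictionaries, which makes the target row shape explicit and adds one check the DataFrame code skips: `CHUNK_ID` is derived from the agent `NAME`, so two agents with the same name would produce colliding chunk IDs. The sketch below assumes input records with `NAME`, `SYMBOL`, and `DESCRIPTION` keys, as in the `VIRTUALS_AGENTS` table; `build_corpus_rows` is a hypothetical helper. The `D:%Y%m%d%H%M%SZ` timestamp follows the PDF date-string format, presumably so the metadata resembles what the PDF conversion would produce. With pandas, `corpus["CHUNK_ID"].is_unique` gives a quick equivalent uniqueness check.

```python
from datetime import datetime, timezone

def build_corpus_rows(agents, project_id, source_table, now=None):
    """Turn agent records (dicts with NAME, SYMBOL, DESCRIPTION) into CORPUS-shaped rows.

    Mirrors the DataFrame cell above with plain dicts, and raises if two
    agents share a NAME, because their CHUNK_IDs would then collide.
    """
    now = now or datetime.now(timezone.utc)
    metadata = {
        "creationDate": now.strftime("D:%Y%m%d%H%M%SZ"),  # PDF-style date string
        "subject": "Text with descriptions of the Tokenflow agents.",
        "source": source_table,
    }
    rows, seen = [], set()
    for agent in agents:
        chunk_id = f"{project_id}/{agent['NAME']}"
        if chunk_id in seen:
            raise ValueError(f"duplicate CHUNK_ID: {chunk_id}")
        seen.add(chunk_id)
        rows.append({
            "PROJECT_ID": project_id,
            "CHUNK_ID": chunk_id,
            "CONTENT": (f"Agent with name {agent['NAME']} has symbol: {agent['SYMBOL']} "
                        f"and description: {agent['DESCRIPTION']}."),
            "METADATA": dict(metadata),  # a separate copy per row
        })
    return rows
```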

#### 1. Entities and relations extraction 

After preparing the text documents, we can start the first actual step of knowledge graph (KG) construction. 
We use the default prompt that ships with the app, which is written to perform well across a variety of document types and domains. This step may take some time, depending on the size of the dataset.

In [None]:
# Step 1
# -- Entities / relations extraction.

# Form the SQL call.
extraction_call = f"""CALL RAI_GRS.app.execute_get_entities_relations(
    '{project_id}',
    '{llm_family}',
    '{completion_model}',
    '{is_fine_tuned_completion_model}',
    '{openai_api_key}'
);"""

# Execute the SQL call.
cursor.execute(extraction_call)

# See the results of the call.
results = cursor.fetchall() 
for row in results:
    print(row)
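The cell above builds its `CALL` statement with an f-string that wraps each argument in single quotes, so a value that itself contains a quote (for example a project ID like `o'brien`) would break the statement. A minimal escaping sketch follows; `sql_string_literal` and `build_call` are hypothetical names. It doubles backslashes and single quotes, since Snowflake single-quoted strings treat `''` as a literal quote and also process backslash escape sequences. Like the f-strings, it turns every value into a string, so `False` becomes `'False'` and `None` becomes `'None'`.

```python
def sql_string_literal(value):
    """Render a value as a single-quoted SQL string literal.

    Mirrors the f-strings above by converting with str(); backslashes and
    single quotes are doubled so they are taken literally.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return "'" + escaped + "'"

def build_call(procedure, *args):
    """Build a CALL statement whose arguments are all string literals."""
    return f"CALL {procedure}({', '.join(sql_string_literal(a) for a in args)});"
```

For example, `cursor.execute(build_call('RAI_GRS.app.execute_get_entities_relations', project_id, llm_family, completion_model, is_fine_tuned_completion_model, openai_api_key))` sends an equivalent statement for ordinary values. Alternatively, binding values with `%s` placeholders, as the project-insert cell does, lets the connector handle the escaping.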

#### 2. Community detection

After extracting entities and relations, the next step is community detection. Several algorithms are available for this task, with configurable parameters. For example, you can set a maximum community size to prevent the formation of overly large communities with too many nodes. 

In [None]:
# Step 2
# -- Community detection.

# Form the SQL call with some parameters.
params = {
    "algorithm": "leiden",
    "initial_membership": None,
    "weights": None,
    "n_iterations": 2,
    "max_comm_size": 0,
    "seed": None
}
append_flag = False
# Serialize the parameters dict to a JSON string so it can be passed to PARSE_JSON.
params_json = json.dumps(params)
extraction_call = f"""CALL RAI_GRS.app.execute_get_communities(
    '{project_id}',
    PARSE_JSON('{params_json}'),
    {str(append_flag).upper()}
);"""

# Execute the SQL call.
cursor.execute(extraction_call)

# See the results of the call.
results = cursor.fetchall() 
for row in results:
    print(row)
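For intuition about what this step optimizes: Leiden, like its predecessor Louvain, moves nodes between communities to improve a partition quality score. Implementations can target modularity or other objectives such as the Constant Potts Model, and this notebook does not say which one the app uses, so the sketch below is background rather than the app's method. It computes standard Newman modularity, $Q = \sum_c \left[ L_c/m - \left(d_c/2m\right)^2 \right]$, for a given partition of an undirected, unweighted toy graph; `modularity` is a hypothetical helper.

```python
from collections import defaultdict

def modularity(edges, community_of):
    """Newman modularity Q of a partition of an undirected, unweighted graph.

    edges: iterable of (u, v) pairs, each undirected edge listed once.
    community_of: dict mapping every node to a community label.
    Q = sum over communities c of [L_c / m - (d_c / (2m))**2], where L_c is
    the number of edges inside c and d_c is the total degree of c's nodes.
    """
    edges = list(edges)
    m = len(edges)
    if m == 0:
        return 0.0
    internal = defaultdict(int)    # L_c
    degree_sum = defaultdict(int)  # d_c
    for u, v in edges:
        cu, cv = community_of[u], community_of[v]
        degree_sum[cu] += 1
        degree_sum[cv] += 1
        if cu == cv:
            internal[cu] += 1
    return sum(internal[c] / m - (degree_sum[c] / (2 * m)) ** 2 for c in degree_sum)
```

On two triangles joined by a single edge, `modularity([(0, 1), (1, 2), (0, 2), (3, 4), (4, 5), (3, 5), (2, 3)], {0: "a", 1: "a", 2: "a", 3: "b", 4: "b", 5: "b"})` gives 5/14 ≈ 0.357, while putting all six nodes in one community gives 0.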

#### 3. Graph indexing: summarization and embeddings

This step performs the following operations:

- Summarization of each community with an LLM, capturing the context of the nodes it contains
- Embedding generation for the `CORPUS` table, node and edge properties, and the community summaries

As with any LLM task in your pipeline, you can adjust the summarization prompt to guide the LLM on the desired level of abstraction and which details to include in the summaries. In this case, we use the default prompt. Note that this step is time-consuming, as it involves two separate processes with multiple AI model calls.

In [None]:
# Step 3
# -- Graph indexing: summarization and embeddings.

# Form the SQL call.
extraction_call = f"""CALL RAI_GRS.app.execute_get_embeddings(
    '{project_id}',
    '{llm_family}',
    '{completion_model}',
    '{embeddings_model}',
    '{summarization_context}',
    '{is_fine_tuned_completion_model}',
    '{openai_api_key}'
);"""


# Execute the SQL call.
cursor.execute(extraction_call)

# See the results of the call.
results = cursor.fetchall() 
for row in results:
    print(row)

#### 4. Question Answering (QA)

After indexing completes, we are ready to use our app for question answering!

Note that indexing happens in two places: during embedding generation with your selected embedding model, and within the Cortex Search service.

This provides two retrieval options:
- **Vector search**, which uses the embeddings generated in the previous step (step #3)  
- **Cortex Search**, the managed service provided by Snowflake Cortex, which performs hybrid retrieval and reranking behind the scenes

You can also select which sources to include in the retrieved results, since there are multiple content types to search: corpus items, community summaries, and verbalized properties of both nodes and edges. For now, we use the default behavior, which retrieves the most relevant items across all available sources.
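Conceptually, the vector-search option ranks stored items by how similar their embeddings are to the embedding of the question. The sketch below illustrates that idea with cosine similarity over toy two-dimensional vectors plus an optional filter on content type. The source labels (`"corpus"`, `"community"`, `"node"`, `"edge"`) and the `top_k` helper are hypothetical, and the app's actual similarity measure and ranking are not described here; Cortex Search additionally performs hybrid retrieval and reranking, which this sketch does not attempt.

```python
import math

def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def top_k(query_vec, items, k=3, sources=None):
    """Rank (source, item_id, vector) triples by cosine similarity to the query.

    sources: optional set of source labels to search; None searches all of them.
    """
    candidates = [it for it in items if sources is None or it[0] in sources]
    ranked = sorted(candidates, key=lambda it: cosine(query_vec, it[2]), reverse=True)
    return [(src, item_id, round(cosine(query_vec, vec), 3)) for src, item_id, vec in ranked[:k]]
```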

In [None]:
# Step 4

# Form the SQL call.
# question = "What is the meaning of the context?"
question = "What do you know about WAI Combinator?"

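# Bind the project ID and question as parameters, so a quote in the question (e.g. "What's ...?") doesn't break the SQL.
extraction_call = "CALL RAI_GRS.app.execute_get_answer(%s, %s);"

# Execute the SQL call.
cursor.execute(extraction_call, (project_id, question))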

# See the results of the call.
result = cursor.fetchone() 
answer = result[1]
context = json.loads(result[2])
print(f"Question: {question}")
print()
print(f"Answer: {answer}")
print()
print("-----------------------------------")
print(f"Context has {len(context)} items.")

## Visualization

#### Load extracted graph data from Snowflake

The pipeline run produces graph data such as nodes, edges, and their properties, and stores them in Snowflake tables. After running the pipeline, we can download this graph data for use in other applications or purposes. In this case, we get back the nodes and edges to create a simple visualization.

In [None]:
# We use the provided method from Snowflake Python connector to get the graph data:
# https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#fetch_pandas_all

cursor.execute(f"SELECT * FROM RAI_GRS.DATA.NODES WHERE PROJECT_ID='{project_id}'")
# Fetch the result set from the cursor and deliver it as the pandas DataFrame.
nodes = cursor.fetch_pandas_all()
nodes.shape

In [None]:
cursor.execute(f"SELECT * FROM RAI_GRS.DATA.EDGES WHERE PROJECT_ID='{project_id}'")
edges = cursor.fetch_pandas_all()
edges.shape

In [None]:
cursor.close()
conn.close()

In [None]:
def get_node_style(node_type):
    """Get appropriate icon and color based on node type with grouped synonyms."""
    
    grouped_styles = {
        # Every key in grouped_styles must be a tuple, even if it has only one item. In Python, that means
        # including a trailing comma, e.g. ('employer',): ('👤💼', '#FFB6C1').
        ('ai_agent', 'agent', 'ai', 'ai_technology', 'ai_framework',): ('🤖', '#FFB6C1'),
        ('person', 'user', 'family_member',): ('🧑', '#DAA06D'), 
        ('platform', 'software', 'technology', 'feature',): ('🖥️', '#98FB98'),
        ('blockchain', 'cryptocurrency', 'trading_platform', 'token', 'blockchain_paradise', 'meme_coin',): ('₿', '#4682B4'),
        ('ai_agent_role',): ('🦾', '#FFD700'),
        ('financial_product', 'currency',): ('💰', '#FFA500'),
        ('company', 'organization',): ('🏢', '#90EE90'),
        ('document',): ('📄', '#ADD8E6'),
        ('legal_code', 'law', 'legislation', 'legal_document', 'legal_act'): ('⚖️', '#B0C4DE'), 
        ('country', 'place', 'ecosystem', 'location', 'territory',): ('🌍', '#DDA0DD'),
        ('certification',): ('🎖️', '#DC143C'),
        ('regulation', 'legal', 'protocol', 'algorithm',): ('📜', '#DDA0DD'),
        ('default',): ('📌', '#F0F0F0')
    }

    # Flatten into a usable dictionary
    styles = {
        synonym.lower(): style
        for keys, style in grouped_styles.items()
        for synonym in keys
    }

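    # Fall back to the default style for missing (e.g. NaN) or non-string node types.
    if not isinstance(node_type, str):
        return styles['default']
    return styles.get(node_type.lower(), styles['default'])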
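The dictionary comprehension above has two silent failure modes: a synonym listed in two groups keeps whichever style comes last, and a key written as `('employer')` without the trailing comma is a plain string, so it gets flattened character by character. A stricter variant, sketched below as a hypothetical `flatten_styles` helper, raises an error in both cases.

```python
def flatten_styles(grouped_styles):
    """Flatten {(synonym, ...): style} into {synonym: style}, rejecting mistakes.

    Raises TypeError if a key is a bare string (a missing trailing comma),
    and ValueError if one synonym is mapped to two different styles.
    """
    styles = {}
    for synonyms, style in grouped_styles.items():
        if isinstance(synonyms, str):
            raise TypeError(f"key {synonyms!r} is a string; did you forget a trailing comma?")
        for synonym in synonyms:
            key = synonym.lower()
            if key in styles and styles[key] != style:
                raise ValueError(f"{key!r} is mapped to two different styles")
            styles[key] = style
    return styles
```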

In [None]:
# Filter the nodes DataFrame to include only those nodes that are part of any edge (either as a source or destination in the edges DataFrame).

# Step 1: Get all node IDs involved in edges (both source and destination)
edge_node_ids = set(edges['SRC_NODE_ID']).union(set(edges['DST_NODE_ID']))

# Step 2: Filter the nodes DataFrame to keep only rows where ID is in edge_node_ids
filtered_nodes = nodes[nodes['ID'].isin(edge_node_ids)].copy()
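This filter only removes nodes without edges; it does not check the reverse. If an edge references an ID that is missing from `NODES`, `nx.DiGraph.add_edge` in the next cell silently creates that node without the `label`, `color`, and `size` attributes set for the other nodes. A hypothetical `split_edges` helper, sketched below, separates such dangling edges so they can be inspected or dropped first.

```python
def split_edges(node_ids, edge_pairs):
    """Separate edges whose endpoints both exist in the node table from dangling ones.

    node_ids: any iterable of node IDs (e.g. a pandas Series).
    edge_pairs: iterable of (src, dst) tuples.
    Returns (kept, dangling) as two lists of (src, dst) tuples.
    """
    known = set(node_ids)
    kept, dangling = [], []
    for src, dst in edge_pairs:
        target = kept if src in known and dst in known else dangling
        target.append((src, dst))
    return kept, dangling
```

With the DataFrames above, this could be called as `kept, dangling = split_edges(nodes['ID'], zip(edges['SRC_NODE_ID'], edges['DST_NODE_ID']))`.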

In [None]:
G = nx.DiGraph()

for _, row in filtered_nodes.iterrows():
    node_id = row['ID']
    node_type = row['TYPE']
    entity_emoji, entity_color = get_node_style(node_type=node_type)
    entity_label = f"{entity_emoji} {node_id}"
    G.add_node(node_id,
               label=entity_label,
               type=node_type,
               color=entity_color,
               size=20)


for _, row in edges.iterrows():
    G.add_edge(row['SRC_NODE_ID'], row['DST_NODE_ID'], label=row['TYPE'], size=1, length=500)

graph = gv.d3(
    G,
    node_label_data_source='label', 
    show_edge_label=True, 
    edge_label_data_source='label',  
    graph_height=500,   
    links_force_distance=120,     
    many_body_force_strength=-50,    
    edge_label_size_factor=1.2,
)

# Display the interactive graph
graph.display(inline=True)