### Setup

In [1]:
import getpass
import os

import nest_asyncio
from llama_index.core import Settings, SimpleDirectoryReader, StorageContext, SummaryIndex, VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI

# Patch asyncio so that event loops can be nested inside the notebook kernel.
nest_asyncio.apply()

# SECURITY: never hard-code API keys in a notebook. The key is taken from the
# OPENAI_API_KEY environment variable; if it is not set, it is requested
# interactively (the input is not echoed and is not stored in the notebook).
# NOTE(review): the key that used to be written here in plain text must be
# treated as leaked and rotated.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API key: ")

# Global defaults used by every llama_index component created below.
Settings.llm = OpenAI(model="gpt-3.5-turbo-1106", temperature=0.2)
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")  # dimensions: 1536

### Load Data

Load the wiki information and the datalineage information separately (both are `.md` files).
So far, we simply load the databases using the Markdown parser that is already included in `SimpleDirectoryReader`.
In the future, we may consider loading images and tables differently.

In [2]:
# Chunking configuration: the node parser taken from Settings below is asked
# for chunks of this size.
Settings.chunk_size = 1024

# Read every file found in the two source folders.
documents_wiki = SimpleDirectoryReader("database_wiki").load_data()
documents_datalineage = SimpleDirectoryReader("database_datalineage").load_data()

# Split both document collections into nodes with the shared node parser.
nodes_wiki, nodes_datalineage = (
    Settings.node_parser.get_nodes_from_documents(docs)
    for docs in (documents_wiki, documents_datalineage)
)


### Create Pinecone Index

In [3]:
from pinecone import Pinecone, ServerlessSpec

# Initialize Pinecone.
# SECURITY: the API key is read from the PINECONE_API_KEY environment variable
# and requested interactively when it is missing; it must never be written
# into the notebook.
# NOTE(review): the key that used to be written here in plain text must be
# treated as leaked and rotated.
if not os.environ.get("PINECONE_API_KEY"):
    os.environ["PINECONE_API_KEY"] = getpass.getpass("Pinecone API key: ")
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])

# Names of the two Pinecone indexes used throughout the notebook.
wiki_index_name = "internalrag-mini-wiki"
datalineage_index_name = "internalrag-mini-datalineage"

# One-off index (re)creation, intentionally disabled. Uncomment to rebuild the
# datalineage index (the original snippet referenced an undefined `index_name`).
#
# # Delete Index
# if datalineage_index_name in [idx.name for idx in pc.list_indexes()]:
#     pc.delete_index(datalineage_index_name)
#
# # Create Index
# pc.create_index(
#     name=datalineage_index_name,
#     dimension=1536, # Replace with your model dimensions
#     metric="cosine", # Replace with your model metric
#     spec=ServerlessSpec(
#         cloud="aws",
#         region="us-east-1" # Note: free plan does not support indexes in the eu-west-1 region of aws
#     )
# )

# Connect to the existing datalineage index and show its statistics as the
# cell output.
index = pc.Index(datalineage_index_name)
index.describe_index_stats()

  from tqdm.autonotebook import tqdm


{'dimension': 1536,
 'index_fullness': 0.0,
 'namespaces': {'': {'vector_count': 3}},
 'total_vector_count': 3}

In [4]:
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.vector_stores.pinecone import PineconeVectorStore
from IPython.display import Markdown, display

# Build one storage context per knowledge source. Each context is backed by
# its own Pinecone index for vectors; the docstore keeps the default
# (by default it's in-memory) and is filled with the parsed nodes.

# --- Wiki ---
wiki_index = pc.Index(wiki_index_name)
vector_store_wiki = PineconeVectorStore(pinecone_index=wiki_index)
storage_context_wiki = StorageContext.from_defaults(vector_store=vector_store_wiki)
storage_context_wiki.docstore.add_documents(nodes_wiki)

# --- Datalineage ---
datalineage_index = pc.Index(datalineage_index_name)
vector_store_datalineage = PineconeVectorStore(pinecone_index=datalineage_index)
storage_context_datalineage = StorageContext.from_defaults(vector_store=vector_store_datalineage)
storage_context_datalineage.docstore.add_documents(nodes_datalineage)

In [5]:
# Print every loaded wiki document: its running number, the file it was read
# from, and its full text.
for chunk_no, doc in enumerate(documents_wiki):
    print(f"Doc chunk: {chunk_no}")
    print(f"Doc id:  {doc.metadata['file_name']}")
    print(doc.text)


Doc chunk: 0
Doc id:  DataCatalog.md


Curated Zone

Schema Name: emea_cz
Doc chunk: 1
Doc id:  DataCatalog.md


Dimensions

Last update: 2024.05.09
| Table Name | Associated Dimensions | Description |
|:--|:--|:--|
|dbid_billing_document|  |Contains descriptive information about billing documents, payment status payment terms, and detailing transaction dates. |
|dacc_account|  |Holds account information including account status, type and structure.|
|dcus_dm_80_customer_v|  |Contains customer information like demographies, geograhpies, industires and company wide classifications. |
|dmat_material_v|  |Contains material master information containing details on materials, such as descriptions, categories and weight information.|
|dpor_purchase_order|  |Contains Purchase Order information like receiving, preparation and shipping dates, vendor, shipper and receiver details as well as references to the content of the Purchase.|

Last update: 2024.05.10
Doc chunk: 2
Doc id:  DataCatalog.md


### Create Vector Indices

Create two separate vector indices for Wiki and Datalineage.

In [27]:
# Disabled: the index-building code below is wrapped in a string literal, so
# nothing in it is executed when this cell runs.
# NOTE(review): presumably constructing the indices embeds the nodes and
# upserts them into Pinecone (the recorded output shows "Upserted vectors")
# - confirm before re-enabling, to avoid writing duplicate vectors.
'''
from llama_index.core import VectorStoreIndex

vector_index_wiki = VectorStoreIndex(nodes_wiki, storage_context=storage_context_wiki)
vector_index_datalineage = VectorStoreIndex(nodes_datalineage, storage_context=storage_context_datalineage)
'''

Upserted vectors: 100%|██████████| 33/33 [00:02<00:00, 16.41it/s]
Upserted vectors: 100%|██████████| 3/3 [00:01<00:00,  2.92it/s]


In [6]:
# Initialize Pinecone.
# SECURITY: the API key comes from the PINECONE_API_KEY environment variable
# (requested interactively when missing) instead of being hard-coded here.
# NOTE(review): the key that used to be written here in plain text must be
# treated as leaked and rotated.
if not os.environ.get("PINECONE_API_KEY"):
    os.environ["PINECONE_API_KEY"] = getpass.getpass("Pinecone API key: ")
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])

# Connect to the two already-populated indexes.
wiki_index = pc.Index("internalrag-mini-wiki")
datalineage_index = pc.Index("internalrag-mini-datalineage")

# Wrap each Pinecone index in a llama_index vector store.
vector_store_wiki = PineconeVectorStore(pinecone_index=wiki_index)
vector_store_datalineage = PineconeVectorStore(pinecone_index=datalineage_index)

# Instantiate the VectorStoreIndex objects directly from the vector stores,
# i.e. from what is already stored in Pinecone, without passing nodes again.
vector_index_wiki = VectorStoreIndex.from_vector_store(vector_store=vector_store_wiki)
vector_index_datalineage = VectorStoreIndex.from_vector_store(vector_store=vector_store_datalineage)

### ReAct Pipeline

In [7]:
# Current Version

from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, StorageContext, load_index_from_storage, PromptTemplate
from llama_index.core.tools import QueryEngineTool, BaseTool, FunctionTool, ToolMetadata
from llama_index.core.query_engine import RouterQueryEngine
from llama_index.core.selectors import LLMSingleSelector, LLMMultiSelector
from llama_index.core.agent import ReActAgent
from llama_index.llms.openai import OpenAI
from llama_index.core.llms import ChatMessage
from llama_index.core.selectors import PydanticMultiSelector, PydanticSingleSelector
import snowflake.connector as snowflake
from snowflake.connector import DictCursor
import pandas as pd
from dotenv import load_dotenv
from utils import make_sql_query
from llama_index.core.agent import AgentRunner
from llama_index.agent.openai import OpenAIAgentWorker, OpenAIAgent

# Loading Environment variables:
# NOTE(review): presumably KEYs.env holds the credentials needed by
# make_sql_query - confirm; the file must not be committed.
dotenv_path = 'KEYs.env'
_ = load_dotenv(dotenv_path)

# Query Engines: one per vector index.
vector_query_engine_wiki = vector_index_wiki.as_query_engine()
vector_query_engine_datalineage = vector_index_datalineage.as_query_engine()

# Create Query Engine Tools. The descriptions are what the router's selector
# reads to decide which engine answers a question, so they must be complete,
# well-formed sentences.
vector_tool_wiki = QueryEngineTool.from_defaults(
    query_engine=vector_query_engine_wiki,
    description=(
        "Useful for retrieving specific context from wiki entries and naming conventions"
    ),
)

vector_tool_datalineage = QueryEngineTool.from_defaults(
    query_engine=vector_query_engine_datalineage,
    description=(
        # Adjacent string literals are concatenated without a separator, so the
        # first part has to end with an explicit space.
        "Useful only to answer questions regarding the source systems for specific tables (e.g. What are the source systems for table x, where does the information for table x come from, etc.). "
        "The output of this query engine are NOT the final source systems, but only the correct Table Name(s) (i.e. LVL0_TargetName from the data catalogue). The correct LVL0_TargetName(s) are identified by finding the most fitting table descriptions, based on the table name or description given in the user query."
    ),
)

# Router: a single-choice selector picks exactly one of the two tools for each
# question. The position of a tool in this list is the index the selector
# reports (0 = wiki, 1 = datalineage).
router_tools = [vector_tool_wiki, vector_tool_datalineage]
query_engine = RouterQueryEngine(
    selector=PydanticSingleSelector.from_defaults(),
    query_engine_tools=router_tools,
    verbose=True,
)

# SET USER QUERY (alternative example questions are kept commented out):
#user_query = "Explain the Micro Batch Loading Framework for the Curated Zone"
#user_query = "What does RZ_SAPOE_MM and RZ_SAPOW_SD mean?"
user_query = "What are the source systems flowing into the dashbaord of the Purchase Order"

# Run the routed query and show the textual answer.
response = query_engine.query(user_query)
print(response)

# Position of the datalineage tool in the router's tool list.
DATALINEAGE_TOOL_INDEX = 1

# If the router chose the datalineage engine, its answer only contains table
# names; always follow up with the SQL function call to resolve the actual
# source systems.
if response.metadata['selector_result'].selections[0].index == DATALINEAGE_TOOL_INDEX:

    # Expose the SQL helper as a tool the agent can call.
    sql_query_tool = FunctionTool.from_defaults(fn=make_sql_query)

    # gpt-3.5-turbo-instruct does not support the function calling API;
    # gpt-3.5-turbo-0613 is another candidate worth trying.
    llm = OpenAI(model="gpt-4-1106-preview", temperature=0)

    # Alternative: OpenAIAgent.from_tools([sql_query_tool], llm=llm, verbose=True)
    agent = ReActAgent.from_tools([sql_query_tool], llm=llm, verbose=True)

    # The names quoted in the instruction sentence must match the labels used
    # below ("Initial Question", "Previous Response"), otherwise the model is
    # pointed at sections that do not exist in the prompt.
    final_response_template = f"""Given these preliminary table names identified in the "Previous Response", perform make_sql_query and answer the question given as "Initial Question": 
                                ---
                                Initial Question: {user_query}
                                Previous Response: {response}
                                ---
                                SQL TABLE LEGEND: 
                                LvL0_TargetName: Name of the target table in the Data Mart and Curated Zone
                                DV_Physical_Schema: Name of the Schema for the Vault and Standardized Zone
                                DV_Table_Name: Name of the Table or Vault object in the Vault and Standardized Zone
                                SRC_Physical_Schema: Name of the Schema for the Raw Zone table, containing the source systems
                                SRC_Table_Name: Name of the Table in the Raw Zone, containing the table name in the respective source systems

                                NOTE: If the Initial question is out of scope or has nothing to do with data lineage, output: "Question is out of chatbot scope".
                                """
    response_final = agent.chat(final_response_template)

    # Show the agent's final answer explicitly instead of relying on the
    # verbose trace only.
    print(str(response_final))

[1;3;38;5;200mSelecting query engine 1: The question specifically asks for the source systems for a specific table, and the second choice is focused on answering questions about the source systems for specific tables..
[0mThe source systems flowing into the dashboard of the Purchase Order include the tables "dpor_purchase_order" and "fpor_dm_80_billed_purchase_order_all_v."
[1;3;38;5;200mThought: The current language of the user is English. I need to use a tool to help me answer the question about the source systems for the tables "dpor_purchase_order" and "fpor_dm_80_billed_purchase_order_all_v."
Action: make_sql_query
Action Input: {'table_names': ['dpor_purchase_order', 'fpor_dm_80_billed_purchase_order_all_v']}
[0m[1;3;34mObservation: {'dpor_purchase_order':         LVL0_TARGETNAME DV_PHYSICAL_SCHEMA  \
0   dpor_purchase_order    emea_obis_dv_fl   
1   dpor_purchase_order   emea_obis_ref_fl   
2   dpor_purchase_order    emea_obis_dv_fl   
3   dpor_purchase_order    emea_obis_d

### Router Query Engine Options
**Option 1: Pydantic Selector**

The Pydantic selectors (currently only supported by gpt-4-0613 and gpt-3.5-turbo-0613 (the default)) use the OpenAI Function Call API to produce pydantic selection objects, rather than parsing raw JSON.

In [15]:
from llama_index.core.query_engine import RouterQueryEngine
from llama_index.core.selectors import LLMSingleSelector, LLMMultiSelector
from llama_index.core.selectors import (
    PydanticMultiSelector,
    PydanticSingleSelector,
)

# Router variant 1: the selector is a PydanticSingleSelector, which chooses
# exactly one of the two tools per question.
pydantic_selector = PydanticSingleSelector.from_defaults()
query_engine = RouterQueryEngine(
    selector=pydantic_selector,
    query_engine_tools=[vector_tool_wiki, vector_tool_datalineage],
    verbose=True,
)

In [16]:

# Wiki-style question sent through the router.
wiki_question = "Explain the Micro Batch Loading Framework for the Curated Zone"
response = query_engine.query(wiki_question)
print(response)

[1;3;38;5;200mSelecting query engine 0: The question requires specific context from wiki entries and naming conventions, which aligns with the purpose of this choice..
[0mThe Micro Batch Loading Framework for the Curated Zone involves constantly adding new transactions to the Standardized Zone with a unique identifier and timestamp. The framework utilizes SQL queries to extract only the newer transactions based on the ascending timestamp. Replicated data may be slightly out-of-date compared to the Standardized Zone due to newly added transactions. Small batches are used to incrementally add transactions to the replicated table, resulting in a large amount of accumulated data over time. However, individual data loads only need to handle a small number of rows, making the incremental data extraction fast and enabling microbatching to closely approach real-time updates.


In [17]:

# Source-system question sent through the router.
lineage_question = "What are the source systems flowing into the dashbaord of the Purchase Order"
response = query_engine.query(lineage_question)
print(response)

[1;3;38;5;200mSelecting query engine 1: The question specifically asks for the source systems for a specific table, which aligns with the functionality described in choice (2)..
[0mThe source systems flowing into the dashboard of the Purchase Order are the tables "dpor_purchase_order" and "fpor_dm_80_billed_purchase_order_all_v."


**Option 2: LLMSingleSelector**

The LLM selectors use the LLM to output a JSON that is parsed, and the corresponding indexes are queried.

In [88]:
# Router variant 2: same tools, but the selector is an LLMSingleSelector.
llm_selector = LLMSingleSelector.from_defaults()
query_engine = RouterQueryEngine(
    selector=llm_selector,
    query_engine_tools=[vector_tool_wiki, vector_tool_datalineage],
    verbose=True,
)

In [89]:
# Wiki-style question sent through the LLM-selector router.
wiki_question = "Explain the Micro Batch Loading Framework for the Curated Zone"
response = query_engine.query(wiki_question)
print(response)

[1;3;38;5;200mSelecting query engine 0: Useful for retrieving specific context from wiki entries and naming conventions.
[0mThe Micro Batch Loading Framework for the Curated Zone involves constantly adding new transactions to the Standardized Zone with a unique identifier and timestamp. By leveraging the ascending nature of the transaction timestamps, it becomes easy to select only the newer transactions. This technique forms the basis of the SQL query used to extract new records from the Standardized Zone. The replicated data may be slightly out-of-date compared to the Standardized Zone due to the addition of new transactions, but small batches are continuously added to the replicated table, resulting in a small increment of transactions. This incremental approach allows for fast data extraction and enables microbatching to closely align with real-time updates.


In [90]:
# Source-system question sent through the LLM-selector router.
lineage_question = "What are the source systems flowing into the dashbaord of the Purchase Order"
response = query_engine.query(lineage_question)
print(response)

[1;3;38;5;200mSelecting query engine 1: This tool should return the specific Table Name(s) by finding the most fitting table descriptions, based on the table name given in the user query..
[0mThe source systems flowing into the dashboard of the Purchase Order are the tables "dpor_purchase_order" and "fpor_dm_80_billed_purchase_order_all_v."


In [91]:
# [optional] Inspect which engine the selector picked and the reason it gave.
selector_result = response.metadata["selector_result"]
print(selector_result)

selections=[SingleSelection(index=1, reason='This tool should return the specific Table Name(s) by finding the most fitting table descriptions, based on the table name given in the user query.')]


In [92]:
# Show how many nodes were retrieved and print the first two of them.
# Slicing instead of indexing [0] / [1] avoids an IndexError when the
# retriever returns fewer than two nodes.
retrieved_nodes = response.source_nodes
print(len(retrieved_nodes))
for source_node in retrieved_nodes[:2]:
    print(source_node)

2
Node ID: e8887394-77bc-450a-ab33-84524b3c8f32
Text: Dimensions  Last update: 2024.05.09 | Table Name | Associated
Dimensions | Description | |:--|:--|:--| |dbid_billing_document|
|Contains descriptive information about billing documents, payment
status payment terms, and detailing transaction dates. |
|dacc_account|  |Holds account information including account status,
type and structure.| |dcus...
Score:  0.395

Node ID: 0a9cf475-06a9-4781-aef1-7371b76ccf3d
Text: Fact | Table Name | Associated Dimensions | Description |
|:--|:--|:--| |fcur_currency_rate_hyperion|dcur_currency|Stores
currency exchange rates, and links the reference date, scenario, and
rate type.| |ffco_cooo_cooc|  |Contains details of cost center
operations, including cost allocations and related financials and
links the relevant customer...
Score:  0.373



### ReAct Agents - Alternative System Architecture Idea (doesn't work yet)

In [27]:
# Call the SQL helper directly for a fixed set of tables; the result is
# indexed by table name.
list_of_table_names = [
    "dacc_account",
    "dbid_billing_document",
]
sql_result_dict = make_sql_query(list_of_table_names)

# Display one of the results as the cell output (swap the key to inspect the other).
#sql_result_dict["dacc_account"]
sql_result_dict["dbid_billing_document"]

Unnamed: 0,LVL0_TARGETNAME,SRC_TABLE_NAME
0,dbid_billing_document,
1,dbid_billing_document,vbfa
2,dbid_billing_document,zcrm_doc_flow_h
3,dbid_billing_document,t052u
4,dbid_billing_document,dd07t
5,dbid_billing_document,2lis_13_vditm
6,dbid_billing_document,0fi_gl_14
7,dbid_billing_document,2lis_13_vditm
8,dbid_billing_document,2lis_13_vdhdr
9,dbid_billing_document,vbfa
