In [16]:
import os
from dotenv import load_dotenv, find_dotenv
import sys
sys.path.append('../')
sys.path.append('../weaviate_utils')


def load_env():
    _ = load_dotenv(find_dotenv())


def get_openai_api_key():
    load_env()
    openai_api_key = os.getenv("OPENAI_API_KEY")
    return openai_api_key

def get_langchain_api_key():
    load_env()
    langchain_api_key = os.getenv("LANGCHAIN_API_KEY")
    return langchain_api_key

In [17]:
import os
from uuid import uuid4

os.environ['OPENAI_API_KEY'] = get_openai_api_key()
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = f"AIE1 - LangGraph - {uuid4().hex[0:8]}"
os.environ["LANGCHAIN_API_KEY"] = get_langchain_api_key()

In [18]:
import weaviate

client = weaviate.Client(
    url="http://localhost:8080",
    timeout_config=(5, 300),
    additional_headers={
        "X-OpenAI-Api-Key": os.environ["OPENAI_API_KEY"]
    }
)

if client.is_ready():
    print("Weaviate is ready")
else:
    print("Weaviate is not ready")


            your code to use Python client v4 `weaviate.WeaviateClient` connections and methods.

            For Python Client v4 usage, see: https://weaviate.io/developers/weaviate/client-libraries/python
            For code migration, see: https://weaviate.io/developers/weaviate/client-libraries/python/v3_v4_migration
            


Weaviate is ready


In [19]:
from weaviate_interface import WeaviateInterface

weaviate_url = "http://localhost:8080"
openai_key = os.getenv("OPENAI_API_KEY")
schema_file = '../weaviate_utils/schema.json'

weaviate_interface = WeaviateInterface(weaviate_url, openai_key, schema_file)

In [20]:
async def initialize_weaviate():
    await weaviate_interface.async_init()
    print("Weaviate schema initialization complete")

await initialize_weaviate()

INFO:httpx:HTTP Request: GET http://localhost:8080/v1/schema "HTTP/1.1 200 OK"


Weaviate schema initialization complete


In [21]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import UnstructuredFileLoader, CSVLoader

def load_documents(file_path: str, file_type: str = None):
    if file_type == 'csv' and not file_path.endswith('.csv'):
        raise ValueError(f"Expected a CSV file, but got {file_path}")
    if file_path.endswith('.csv'):
        loader = CSVLoader(file_path)
    else:
        if file_type is None:
            loader = UnstructuredFileLoader(file_path)
        else:
            raise ValueError(f"Unsupported file type for {file_path}")
    return loader.load()

# Example file paths
file_paths = ["../files/Tenacious Talent - GenAI Upskilling.pdf"]
all_documents = []
for file_path in file_paths:
    documents = load_documents(file_path)
    all_documents.extend(documents)

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=1024, chunk_overlap=50)
chunked_documents = text_splitter.split_documents(all_documents)
print(f"Number of chunks: {len(chunked_documents)}")


Number of chunks: 2


In [22]:
print(f"Number of chunks: {len(chunked_documents)}")

Number of chunks: 2


In [23]:
async def add_documents_to_weaviate(client, documents, class_name="DocumentChunk", batch_size=70):
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i+batch_size]
        objects = [{"text": doc.page_content, "doc_name": "Example Document"} for doc in batch]
        await client.batch_create_objects(objects, class_name)
    print("Documents added to Weaviate")

await add_documents_to_weaviate(weaviate_interface.client, chunked_documents)

INFO:httpx:HTTP Request: POST http://localhost:8080/v1/batch/objects "HTTP/1.1 200 OK"


Documents added to Weaviate


In [24]:
# Use vectorstore similarity search for retrieval
from langchain.vectorstores import Weaviate
from langchain.schema import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.schema.runnable import RunnablePassthrough
from operator import itemgetter

vectorstore = Weaviate(client, "DocumentChunk", "text", attributes=["doc_name"])

RAG_PROMPT = """
Use the following context to answer the user's query. If you cannot answer the question, please respond with 'I don't know'. and Please highly consider the context when answering the question.

Question:
{question}

Context:
{context}
"""

rag_prompt = ChatPromptTemplate.from_template(RAG_PROMPT)
openai_chat_model = ChatOpenAI(model="gpt-3.5-turbo")

retrieval_augmented_generation_chain = (
    {"context": itemgetter("question"), "retrieved_docs": lambda d: vectorstore.similarity_search(d["question"], k=5), "question": itemgetter("question")}
    | RunnablePassthrough.assign(context=itemgetter("context"))
    | {"response": rag_prompt | openai_chat_model, "context": itemgetter("context"), "question": itemgetter("question")}
)

In [25]:
import logging
import asyncio
from tenacity import retry, wait_fixed, stop_after_attempt, wait_exponential
from langchain.schema import Document

# Set up logging
logging.basicConfig(level=logging.INFO)

# Enhanced error handling for similarity search
# Error handling for similarity search
@retry(wait=wait_exponential(min=1, max=10), stop=stop_after_attempt(5))
async def fetch_similar_docs(question: str):
    try:
        result = vectorstore.similarity_search(question, k=5)
        return result
    except ValueError as e:
        error_message = str(e)
        if "rate_limit_exceeded" in error_message:
            logging.error("Weaviate rate limit exceeded. Please try again later.")
            raise
        else:
            logging.error(f"Error during similarity search: {error_message}")
            raise

# Error handling for answer generation
@retry(wait=wait_fixed(30), stop=stop_after_attempt(10))
async def generate_answer(question: str, retrieved_docs: list):
    context = "\n\n".join(doc.page_content for doc in retrieved_docs)
    input_data = {
        "question": question,
        "context": context
    }
    try:
        result = await retrieval_augmented_generation_chain.ainvoke(input_data)
        return result
    except Exception as e:
        logging.error(f"Error during answer generation: {e}")
        raise

async def example_usage(question: str):
    try:
        logging.info("Retrieving documents...")
        retrieved_docs = await fetch_similar_docs(question)
        if not retrieved_docs or all(doc.page_content.strip() == "" for doc in retrieved_docs):
            logging.info("No sufficient context found in Weaviate, using DuckDuckGo search.")
            return None  # Indicate that no sufficient context was found
        logging.info("Retrieved documents:")
        for doc in retrieved_docs:
            logging.info(f"Document name: {doc.metadata['doc_name']}, Content snippet: {doc.page_content[:200]}")
        logging.info("Generating answer...")
        result = await generate_answer(question, retrieved_docs)
        logging.info(f"Generated response: {result}")
        return result
    except Exception as e:
        logging.error("An error occurred:", exc_info=True)
        return "An unexpected error occurred."

In [26]:
async def ask_question(question: str):
    answer = await example_usage(question)
    return answer

In [27]:
question1 = "What is Tenacious Talent - GenAI UpSkilling GEN-AI Challenge?"
answer1 = await ask_question(question1)
print(answer1)

INFO:root:Retrieving documents...
INFO:root:Retrieved documents:
INFO:root:Document name: Example Document, Content snippet: Tenacious Talent - GenAI

UpSkilling

GEN-AI Challenge

TEAM-MATE:

Personalized Agent for Optimising Trainee Time and Focus

Business objective

TeamMate (not a real company, this is a scenario) is a
INFO:root:Document name: Example Document, Content snippet: Tenacious Talent - GenAI

UpSkilling

GEN-AI Challenge

TEAM-MATE:

Personalized Agent for Optimising Trainee Time and Focus

Business objective

TeamMate (not a real company, this is a scenario) is a
INFO:root:Document name: Example Document, Content snippet: Tenacious Talent - GenAI

UpSkilling

GEN-AI Challenge

TEAM-MATE:

Personalized Agent for Optimising Trainee Time and Focus

Business objective

TeamMate (not a real company, this is a scenario) is a
INFO:root:Document name: Example Document, Content snippet: Tenacious Talent - GenAI

UpSkilling

GEN-AI Challenge

TEAM-MATE:

Personalized Agent for O

{'response': AIMessage(content='The Tenacious Talent - GenAI UpSkilling GEN-AI Challenge is a competition or program focused on upskilling individuals in artificial intelligence (AI) through various challenges and activities. It likely involves developing AI solutions, enhancing skills in AI technologies, and promoting innovation in the field.', response_metadata={'token_usage': {'completion_tokens': 57, 'prompt_tokens': 82, 'total_tokens': 139}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-cf7d0155-24a4-4efa-82c9-ca9ac4b18629-0', usage_metadata={'input_tokens': 82, 'output_tokens': 57, 'total_tokens': 139}), 'context': 'What is Tenacious Talent - GenAI UpSkilling GEN-AI Challenge?', 'question': 'What is Tenacious Talent - GenAI UpSkilling GEN-AI Challenge?'}


In [36]:
import json
import logging
from typing import TypedDict, Annotated, Sequence
from langchain_community.tools.ddg_search import DuckDuckGoSearchRun
from langgraph.prebuilt import ToolExecutor
from langchain_core.utils.function_calling import convert_to_openai_function
from langchain_core.messages import BaseMessage, HumanMessage, FunctionMessage, AIMessage
from langgraph.prebuilt import ToolInvocation
from langgraph.graph import StateGraph, END
import operator

logging.basicConfig(level=logging.INFO)

tool_belt = [DuckDuckGoSearchRun()]
tool_executor = ToolExecutor(tool_belt)
model = ChatOpenAI(temperature=0)
functions = [convert_to_openai_function(t) for t in tool_belt]
model = model.bind_functions(functions)

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]

def call_model(state):
    messages = state["messages"]
    logging.info(f"Calling model with messages: {messages}")
    response = model.invoke(messages)
    logging.info(f"Model response: {response}")
    if not response.content.strip():
        response.content = messages[-1].content
    return {"messages": [response]}

def call_tool(state):
    last_message = state["messages"][-1]
    logging.info(f"Invoking tool with message: {last_message}")
    action = ToolInvocation(
        tool=last_message.additional_kwargs["function_call"]["name"],
        tool_input=json.loads(
            last_message.additional_kwargs["function_call"]["arguments"]
        )
    )
    logging.info(f"Tool action: {action}")
    response = tool_executor.invoke(action)
    logging.info(f"Tool response: {response}")
    function_message = FunctionMessage(content=str(response), name=action.tool)
    return {"messages": [function_message]}

async def fetch_and_generate_answer(question: str):
    # Mock implementation of RAG functions
    retrieved_docs = await fetch_similar_docs(question)  # Replace with actual function
    if not retrieved_docs or all(doc.page_content.strip() == "" for doc in retrieved_docs):
        return None
    result = await generate_answer(question, retrieved_docs)  # Replace with actual function
    if result == "I don't know.":
        return None
    return result

async def call_rag(state):
    last_message = state["messages"][-1]
    question = last_message.content
    logging.info(f"Calling RAG with question: {question}")
    result = await fetch_and_generate_answer(question)
    if result is None:
        logging.info(f"Performing DuckDuckGo search for question: {question}")
        action = ToolInvocation(tool="DuckDuckGoSearchRun", tool_input={"query": question})
        response = tool_executor.invoke(action)
        function_message = FunctionMessage(content=str(response), name=action.tool)
        return {"messages": [function_message]}
    elif result == "I don't know.":
        return {"messages": [HumanMessage(content=result, type='human')], "end": END}
    else:
        logging.info(f"RAG result: {result}")
        content = result['response'].content
        return {"messages": [HumanMessage(content=content, type='human')]}

workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("action", call_tool)
workflow.add_node("rag", call_rag)
workflow.set_entry_point("agent")

def should_use_tool_or_rag(state):
    last_message = state["messages"][-1]
    logging.info(f"Last message: {last_message.content}")
    if isinstance(last_message, AIMessage) and last_message.content.lower() == "i don't know.":
        logging.info("Returning 'tool'")
        return "tool"
    if isinstance(last_message, FunctionMessage) and last_message.name == "DuckDuckGoSearchRun":
        logging.info("Returning 'rag'")
        return "rag"
    return "rag"

workflow.add_conditional_edges(
    "agent",
    should_use_tool_or_rag,
    {
        "tool": "action",
        "rag": "rag",
        "end": END
    }
)

workflow.add_edge("action", "agent")
workflow.add_edge("rag", "agent")

app = workflow.compile()

In [37]:
async def ask_query(question):
    logging.info(f"Question: {question}")
    inputs = {"messages": [HumanMessage(content=question, type='human')]}
    while True:
        response = await app.ainvoke(inputs)
        if 'messages' not in response or not response['messages']:
            logging.error("No valid response received")
            break  # Break the loop if there is no valid response
        result_message = response['messages'][0]
        result_content = result_message.content
        if isinstance(result_content, str) and result_content.strip() and result_content != "I don't know.":
            return {"messages": [HumanMessage(content=result_content, type='human')]}
        elif 'end' in response and response['end'] == END:
            logging.info("Reached END state")
            return {"messages": [HumanMessage(content=result_content, type='human')]}
        elif isinstance(result_message, AIMessage) and result_content == "I don't know.":
            logging.info(f"Performing DuckDuckGo search for question: {question}")
            action = ToolInvocation(tool="DuckDuckGoSearchRun", tool_input={"query": question})
            response = tool_executor.invoke(action)
            function_message = FunctionMessage(content=str(response), name=action.tool)
            return {"messages": [function_message]}
        else:
            logging.info(f"Unexpected result type or no valid response: {type(result_content)}")
            break  # Break the loop once a response is generated


In [38]:
question1 = "What is Tenacious Talent - GenAI UpSkilling GEN-AI Challenge?"
answer1 = await ask_query(question1)
logging.info(f"Answer: {answer1}")

INFO:root:Question: What is Tenacious Talent - GenAI UpSkilling GEN-AI Challenge?
INFO:root:Calling model with messages: [HumanMessage(content='What is Tenacious Talent - GenAI UpSkilling GEN-AI Challenge?')]


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:Model response: content='' additional_kwargs={'function_call': {'arguments': '{"query":"Tenacious Talent - GenAI UpSkilling GEN-AI Challenge"}', 'name': 'duckduckgo_search'}} response_metadata={'token_usage': {'completion_tokens': 29, 'prompt_tokens': 92, 'total_tokens': 121}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'function_call', 'logprobs': None} id='run-cb9c67ad-f912-45ce-9c0a-109a62e8b11e-0' usage_metadata={'input_tokens': 92, 'output_tokens': 29, 'total_tokens': 121}
INFO:root:Last message: What is Tenacious Talent - GenAI UpSkilling GEN-AI Challenge?
INFO:root:Calling RAG with question: What is Tenacious Talent - GenAI UpSkilling GEN-AI Challenge?
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:root:RAG result: {'response': AIMessage(content='Tenacious Talent - GenAI UpSkilling GEN-AI Challenge i

CancelledError: 

In [None]:
question2 = "Can you tell me about Langsmith?"
answer2 = await ask_query(question2)
logging.info(f"Answer: {answer2}")

In [None]:
from langchain_community.tools.ddg_search import DuckDuckGoSearchRun

# Create an instance of the tool
tool = DuckDuckGoSearchRun()

# Use the tool to perform a search
result = tool._run("can you tell me about langsmith?")

# Print the result
print(result)