# Import

In [1]:
import minsearch
import requests
import random
import os
import json
from mistralai import Mistral
from openai import OpenAI

# Part 0: Basic RAG

## Search

In [2]:
docs_url = 'https://github.com/alexeygrigorev/llm-rag-workshop/raw/main/notebooks/documents.json'
docs_response = requests.get(docs_url)
documents_raw = docs_response.json()

documents = []

for course in documents_raw:
    course_name = course['course']

    for doc in course['documents']:
        doc['course'] = course_name
        documents.append(doc)

In [3]:
from minsearch import AppendableIndex

index = AppendableIndex(
    text_fields=["question", "text", "section"],
    keyword_fields=["course"]
)

index.fit(documents)

<minsearch.append.AppendableIndex at 0x7825c023e480>

In [4]:
def search(query):
    boost = {'question': 3.0, 'section': 0.5}

    results = index.search(
        query=query,
        filter_dict={'course': 'data-engineering-zoomcamp'},
        boost_dict=boost,
        num_results=5,
        output_ids=True
    )

    return results

## Prompt

In [5]:
prompt_template = """
You're a course teaching assistant. Answer the QUESTION based on the CONTEXT from the FAQ database.
Use only the facts from the CONTEXT when answering the QUESTION.

<QUESTION>
{question}
</QUESTION>

<CONTEXT>
{context}
</CONTEXT>
""".strip()

def build_prompt(query, search_results):
    context = ""

    for doc in search_results:
        context = context + f"section: {doc['section']}\nquestion: {doc['question']}\nanswer: {doc['text']}\n\n"
    
    prompt = prompt_template.format(question=query, context=context).strip()
    return prompt

## The RAG flow

In [6]:
api_key = os.environ["OPENAI_API_KEY"]
client = OpenAI(api_key=api_key)

def llm(prompt):
    response = client.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

## Mistral
# api_key = os.environ["MISTRAL_API_KEY"]
# client = Mistral(api_key=api_key)

# def llm(prompt):
#     response = client.chat.complete(
#         model= "mistral-large-latest",
#         messages = [{"role": "user", "content": prompt}]
#     )    
#     return response.choices[0].message.content

def rag(query):
    search_results = search(query)
    prompt = build_prompt(query, search_results)
    answer = llm(prompt)
    return answer

# Part 1: Agentic RAG

In [27]:
# Making RAG more agentic

## Update Prompt

In [8]:
prompt_template = """
You're a course teaching assistant.

You're given a QUESTION from a course student and that you need to answer with your own knowledge and provided CONTEXT.
At the beginning the context is EMPTY.

<QUESTION>
{question}
</QUESTION>

<CONTEXT> 
{context}
</CONTEXT>

If CONTEXT is EMPTY, you can use our FAQ database.
In this case, use the following output template:

{{
"action": "SEARCH",
"reasoning": "<add your reasoning here>"
}}

If you can answer the QUESTION using CONTEXT, use this template:

{{
"action": "ANSWER",
"answer": "<your answer>",
"source": "CONTEXT"
}}

If the context doesn't contain the answer, use your own knowledge to answer the question

{{
"action": "ANSWER",
"answer": "<your answer>",
"source": "OWN_KNOWLEDGE"
}}
""".strip()

## Check

In [9]:
question = "how do I run docker on gentoo?"
context = "EMPTY"

prompt = prompt_template.format(question=question, context=context)
print(prompt)

answer = llm(prompt)
print(answer)

You're a course teaching assistant.

You're given a QUESTION from a course student and that you need to answer with your own knowledge and provided CONTEXT.
At the beginning the context is EMPTY.

<QUESTION>
how do I run docker on gentoo?
</QUESTION>

<CONTEXT> 
EMPTY
</CONTEXT>

If CONTEXT is EMPTY, you can use our FAQ database.
In this case, use the following output template:

{
"action": "SEARCH",
"reasoning": "<add your reasoning here>"
}

If you can answer the QUESTION using CONTEXT, use this template:

{
"action": "ANSWER",
"answer": "<your answer>",
"source": "CONTEXT"
}

If the context doesn't contain the answer, use your own knowledge to answer the question

{
"action": "ANSWER",
"answer": "<your answer>",
"source": "OWN_KNOWLEDGE"
}
{
"action": "ANSWER",
"answer": "To run Docker on Gentoo, you need to follow these general steps:\n\n1. **Install Docker**: You can install Docker using the Portage package manager. First, ensure your system is up-to-date:\n```\nsudo emerge --sync

In [10]:
question = "how do I join the course?"
context = "EMPTY"

prompt = prompt_template.format(question=question, context=context)
answer = llm(prompt)
print(answer)

{
"action": "SEARCH",
"reasoning": "The CONTEXT is EMPTY, so I need to search the FAQ database to find the information on how to join the course."
}


## Implement make the search

In [None]:
# Here, build_context is a helper function from the previous code

In [9]:
def build_context(search_results):
    context = ""

    for doc in search_results:
        context = context + f"section: {doc['section']}\nquestion: {doc['question']}\nanswer: {doc['text']}\n\n"

    return context.strip()

In [12]:
search_results = search(question)
context = build_context(search_results)
prompt = prompt_template.format(question=question, context=context)
print(prompt)

In [16]:
# Check once again
answer = llm(prompt)
print(answer)

{
"action": "ANSWER",
"answer": "To join the course, you need to register before it starts using this link. Additionally, join the course Telegram channel for announcements and register in DataTalks.Club's Slack to join the relevant channel.",
"source": "CONTEXT"
}


## Put this together

In [17]:
# First attempt to answer it with our know knowledge
# If needed, do the lookup and then answer
def agentic_rag_v1(question):
    context = "EMPTY"
    prompt = prompt_template.format(question=question, context=context)
    answer_json = llm(prompt)
    answer = json.loads(answer_json)
    print(answer)

    if answer['action'] == 'SEARCH':
        print('need to perform search...')
        search_results = search(question)
        context = build_context(search_results)
        
        prompt = prompt_template.format(question=question, context=context)
        answer_json = llm(prompt)
        answer = json.loads(answer_json)
        print(answer)

    return answer

## Test

In [26]:
# agentic_rag_v1('how do I join the course?')

In [25]:
# agentic_rag_v1('how patch KDE under FreeBSD?')

# Part 2: Agentic search

In [None]:
# We can let our "agent" formulate one or more search queries - and do it for a few iterations until we found an answer

## Update Prompt

In [11]:
prompt_template = """
You're a course teaching assistant.

You're given a QUESTION from a course student and that you need to answer with your own knowledge and provided CONTEXT.

The CONTEXT is build with the documents from our FAQ database.
SEARCH_QUERIES contains the queries that were used to retrieve the documents
from FAQ to and add them to the context.
PREVIOUS_ACTIONS contains the actions you already performed.

At the beginning the CONTEXT is empty.

You can perform the following actions:

- Search in the FAQ database to get more data for the CONTEXT
- Answer the question using the CONTEXT
- Answer the question using your own knowledge

For the SEARCH action, build search requests based on the CONTEXT and the QUESTION.
Carefully analyze the CONTEXT and generate the requests to deeply explore the topic. 

Don't use search queries used at the previous iterations.

Don't repeat previously performed actions.

Don't perform more than {max_iterations} iterations for a given student question.
The current iteration number: {iteration_number}. If we exceed the allowed number 
of iterations, give the best possible answer with the provided information.

Output templates:

If you want to perform search, use this template:

{{
"action": "SEARCH",
"reasoning": "<add your reasoning here>",
"keywords": ["search query 1", "search query 2", ...]
}}

If you can answer the QUESTION using CONTEXT, use this template:

{{
"action": "ANSWER_CONTEXT",
"answer": "<your answer>",
"source": "CONTEXT"
}}

If the context doesn't contain the answer, use your own knowledge to answer the question

{{
"action": "ANSWER",
"answer": "<your answer>",
"source": "OWN_KNOWLEDGE"
}}

<QUESTION>
{question}
</QUESTION>

<SEARCH_QUERIES>
{search_queries}
</SEARCH_QUERIES>

<CONTEXT> 
{context}
</CONTEXT>

<PREVIOUS_ACTIONS>
{previous_actions}
</PREVIOUS_ACTIONS>
""".strip()

## First iteration

In [None]:
# For the first iteration, we have this
question = "how do I join the course?"

search_queries = []
search_results = []
previous_actions = []
context = build_context(search_results)

prompt = prompt_template.format(
    question=question,
    context=context,
    search_queries="\n".join(search_queries),
    previous_actions='\n'.join([json.dumps(a) for a in previous_actions]),
    max_iterations=3,
    iteration_number=1
)
print(prompt)

In [None]:
answer_json = llm(prompt)
answer = json.loads(answer_json)
print(json.dumps(answer, indent=2))

# {
#   "action": "SEARCH",
#   "reasoning": "I need to find specific information on how to join the course, as this information is not present in the current CONTEXT.",
#   "keywords": [
#     "how to join the course",
#     "course enrollment process",
#     "register for the course"
#   ]
# }

## Save history

In [None]:
# We need to save the actions
previous_actions.append(answer)

In [None]:
# Save the search queries
keywords = answer['keywords']
search_queries.extend(keywords)

# And the search results
for k in keywords:
    res = search(k)
    search_results.extend(res)

In [None]:
# Some of the search results will be duplicates, so we need to remove them
def dedup(seq):
    seen = set()
    result = []
    for el in seq:
        _id = el['_id']
        if _id in seen:
            continue
        seen.add(_id)
        result.append(el)
    return result

search_results = dedup(search_results)

## Another iteration

In [None]:
# Now let's make another iteration - use the same code as previously, 
# but remove variable initialization and increase the iteration number

In [None]:
# question = "how do I join the course?"

# search_queries = []
# search_results = []
# previous_actions = []

context = build_context(search_results)

prompt = prompt_template.format(
    question=question,
    context=context,
    search_queries="\n".join(search_queries),
    previous_actions='\n'.join([json.dumps(a) for a in previous_actions]),
    max_iterations=3,
    iteration_number=2
)
print(prompt)

answer_json = llm(prompt)
answer = json.loads(answer_json)
print(json.dumps(answer, indent=2))

## Put everything together

In [13]:
def agentic_search(question):
    search_queries = []
    search_results = []
    previous_actions = []

    iteration = 0
    
    while True:
        print(f'ITERATION #{iteration}...')
    
        context = build_context(search_results)
        prompt = prompt_template.format(
            question=question,
            context=context,
            search_queries="\n".join(search_queries),
            previous_actions='\n'.join([json.dumps(a) for a in previous_actions]),
            max_iterations=3,
            iteration_number=iteration
        )
    
        print(prompt)
    
        answer_json = llm(prompt)
        answer = json.loads(answer_json)
        print(json.dumps(answer, indent=2))

        previous_actions.append(answer)
    
        action = answer['action']
        if action != 'SEARCH':
            break
    
        keywords = answer['keywords']
        search_queries = list(set(search_queries) | set(keywords))

        for k in keywords:
            res = search(k)
            search_results.extend(res)
    
        search_results = dedup(search_results)
        
        iteration = iteration + 1
        if iteration >= 4:
            break
    
        print()

    return answer

## Test

In [None]:
agentic_search('how do I prepare for the course?')

# Part 3: Function calling

In [None]:
# We put all this logic inside our prompt.
# But OpenAI and other providers provide a convenient API for adding extra functionality like search.
# https://platform.openai.com/docs/guides/function-calling
# It's called "function calling" - you define functions that the model can call, and if it decides to make a call, it returns structured output for that.

## Example

In [None]:
# For example, let's take our search function

In [14]:
def search(query):
    boost = {'question': 3.0, 'section': 0.5}

    results = index.search(
        query=query,
        filter_dict={'course': 'data-engineering-zoomcamp'},
        boost_dict=boost,
        num_results=5,
        output_ids=True
    )

    return results

## OpenAI

In [15]:
# We describe it like that for OpenAi
search_tool = {
    "type": "function",
    "name": "search",
    "description": "Search the FAQ database",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query text to look up in the course FAQ."
            }
        },
        "required": ["query"],
        "additionalProperties": False
    }
}

In [None]:
# Here we have:

# name: search
# description: when to use it
# parameters: all the arguments that the function can take and their description

In [18]:
# In order to use function calling, we'll use a newer API - the "responses" API (not "chat completions" as previously) for OpenAI
question = "How do I do well in module 1?"

developer_prompt = """
You're a course teaching assistant. 
You're given a question from a course student and your task is to answer it.
""".strip()

tools = [search_tool]

chat_messages = [
    {"role": "developer", "content": developer_prompt},
    {"role": "user", "content": question}
]

response = client.responses.create(
    model='gpt-4o-mini',
    input=chat_messages,
    tools=tools
)
response.output

In [None]:
# [ResponseFunctionToolCall(arguments='{"query":"How to do well in module 1"}',
#                           call_id='call_AwYwOak5Ljeidh4HbE3RxMZJ', 
#                           name='search', 
#                           type='function_call', 
#                           id='fc_6848604db67881a298ec38121c1555ef0dee5fa0cdb59912', 
#                           status='completed')]

## Mistral

In [34]:
# # We describe it like that for Mistral
# search_tool_mistral = {
#     "type": "function",
#     "function": {
#         "name": "search",
#         "description": "Search the FAQ database",
#         "parameters": {
#             "type": "object",
#             "properties": {
#                 "query": {
#                     "type": "string",
#                     "description": "Search query text to look up in the course FAQ."
#                 }
#             },
#             "required": ["query"],
#             "additionalProperties": False
#         }
#     }
# }

In [1]:
# # In order to use function calling, we'll use a newer API - the "responses" API (not "chat completions" as previously) for Mistral
# question = "How do I do well in module 1?"

# developer_prompt = """
# You're a course teaching assistant. 
# You're given a question from a course student and your task is to answer it.
# """.strip()

# tools = [search_tool]

# chat_messages = [
#     {"role": "developer", "content": developer_prompt},
#     {"role": "user", "content": question}
# ]

# response = client.chat.complete(
#     model = model,
#     messages = chat_messages,
#     tools = search_tool_mistral,
#     tool_choice = "any",
#     parallel_tool_calls = False,
# )
# response.output

## Call to search

In [None]:
calls = response.output
call = calls[0]
call

call_id = call.call_id
call_id

f_name = call.name
f_name

arguments = json.loads(call.arguments)
arguments

In [None]:
# Using f_name we can find the function we need
f = globals()[f_name]

In [None]:
# And invoke it with the arguments
results = f(**arguments)

In [None]:
# Now let's save the results as json
search_results = json.dumps(results, indent=2)
print(search_results)

In [None]:
# And save both the response and the result of the function call
chat_messages.append(call)

chat_messages.append({
    "type": "function_call_output",
    "call_id": call.call_id,
    "output": search_results,
})

In [None]:
# Now chat_messages contains both the call description (so it keeps track of history) and the results
# Let's make another call to the model
response = client.responses.create(
    model='gpt-4o-mini',
    input=chat_messages,
    tools=tools
)

In [None]:
# This time it should be the response (but also can be another call)
r = response.output[0]
print(r.content[0].text)

## Multiple calls

In [None]:
# What if we want to make multiple calls? Change the developer prompt a little
developer_prompt = """
You're a course teaching assistant. 
You're given a question from a course student and your task is to answer it.
If you look up something in FAQ, convert the student question into multiple queries.
""".strip()

chat_messages = [
    {"role": "developer", "content": developer_prompt},
    {"role": "user", "content": question}
]

response = client.responses.create(
    model='gpt-4o-mini',
    input=chat_messages,
    tools=tools
)

In [None]:
# First, create a function do_call

def do_call(tool_call_response):
    function_name = tool_call_response.name
    arguments = json.loads(tool_call_response.arguments)

    f = globals()[function_name]
    result = f(**arguments)

    return {
        "type": "function_call_output",
        "call_id": tool_call_response.call_id,
        "output": json.dumps(result, indent=2),
    }

In [None]:
# Now iterate over responses

for entry in response.output:
    chat_messages.append(entry)
    print(entry.type)

    if entry.type == 'function_call':      
        result = do_call(entry)
        chat_messages.append(result)
    elif entry.type == 'message':
        print(entry.text)

In [None]:
# First call will probably be function call, so let's do another one

response = client.responses.create(
    model='gpt-4o-mini',
    input=chat_messages,
    tools=tools
)

for entry in response.output:
    chat_messages.append(entry)
    print(entry.type)
    print()

    if entry.type == 'function_call':      
        result = do_call(entry)
        chat_messages.append(result)
    elif entry.type == 'message':
        print(entry.content[0].text) 
# This one is a text response

## Putting everything together

In [None]:
# But what if it's not?

# Let's make two loops:
# First is the main Q&A loop - ask question, get back the answer
# Second is the request loop - send requests until there's a message reply from the API

developer_prompt = """
You're a course teaching assistant. 
You're given a question from a course student and your task is to answer it.

Use FAQ if your own knowledge is not sufficient to answer the question.
When using FAQ, perform deep topic exploration: make one request to FAQ,
and then based on the results, make more requests.

At the end of each response, ask the user a follow up question based on your answer.
""".strip()

chat_messages = [
    {"role": "developer", "content": developer_prompt},
]

In [None]:
while True: # main Q&A loop
    question = input() # How do I do my best for module 1?
    if question == 'stop':
        break

    message = {"role": "user", "content": question}
    chat_messages.append(message)

    while True: # request-response loop - query API till get a message
        response = client.responses.create(
            model='gpt-4o-mini',
            input=chat_messages,
            tools=tools
        )

        has_messages = False
        
        for entry in response.output:
            chat_messages.append(entry)
        
            if entry.type == 'function_call':      
                print('function_call:', entry)
                print()
                result = do_call(entry)
                chat_messages.append(result)
            elif entry.type == 'message':
                print(entry.content[0].text)
                print()
                has_messages = True

        if has_messages:
            break

In [None]:
# Let's make it a bit nicer using HTML:

from IPython.display import display, HTML
import markdown # pip install markdown    

developer_prompt = """
You're a course teaching assistant. 
You're given a question from a course student and your task is to answer it.

Use FAQ if your own knowledge is not sufficient to answer the question.

At the end of each response, ask the user a follow up question based on your answer.
""".strip()

chat_messages = [
    {"role": "developer", "content": developer_prompt},
]

# Chat loop
while True:
    
    if question.strip().lower() == 'stop':
        print("Chat ended.")
        break
    print()

    message = {"role": "user", "content": question}
    chat_messages.append(message)

    while True:  # inner request loop
        response = client.responses.create(
            model='gpt-4o-mini',
            input=chat_messages,
            tools=tools
        )

        has_messages = False

        for entry in response.output:
            chat_messages.append(entry)

            if entry.type == "function_call":
                result = do_call(entry)
                chat_messages.append(result)
                display_function_call(entry, result)

            elif entry.type == "message":
                display_response(entry)
                has_messages = True

        if has_messages:
            break

## Using multiple tools

In [None]:
# What if we also want to use this chat app to add new entries to the FAQ? We'll need another function for it:

def add_entry(question, answer):
    doc = {
        'question': question,
        'text': answer,
        'section': 'user added',
        'course': 'data-engineering-zoomcamp'
    }
    index.append(doc)

In [None]:
# Description
add_entry_description = {
    "type": "function",
    "name": "add_entry",
    "description": "Add an entry to the FAQ database",
    "parameters": {
        "type": "object",
        "properties": {
            "question": {
                "type": "string",
                "description": "The question to be added to the FAQ database",
            },
            "answer": {
                "type": "string",
                "description": "The answer to the question",
            }
        },
        "required": ["question", "answer"],
        "additionalProperties": False
    }
}

In [None]:
# Let's use chat_assistant

import chat_assistant

tools = chat_assistant.Tools()
tools.add_tool(search, search_tool)

tools.get_tools()

developer_prompt = """
You're a course teaching assistant. 
You're given a question from a course student and your task is to answer it.

Use FAQ if your own knowledge is not sufficient to answer the question.

At the end of each response, ask the user a follow up question based on your answer.
""".strip()

chat_interface = chat_assistant.ChatInterface()

chat = chat_assistant.ChatAssistant(
    tools=tools,
    developer_prompt=developer_prompt,
    chat_interface=chat_interface,
    client=client
)

In [None]:
# And run it
chat.run()

In [None]:
# Now let's add the new tool
tools.add_tool(add_entry, add_entry_description)
tools.get_tools()

In [None]:
# And talk with the assistant:

# How do I do well for module 1?
# Add this back to FAQ
# And check that it's in the index:

index.docs[-1]

# Part 4: Using PydanticAI

In [None]:
pip install pydantic-ai

In [None]:
from pydantic_ai import Agent, RunContext

In [None]:
# create an agent
chat_agent = Agent(  
    'openai:gpt-4o-mini',
    system_prompt=developer_prompt
)

In [None]:
# Now we can use it to automate tool description:
from typing import Dict

@chat_agent.tool
def search_tool(ctx: RunContext, query: str) -> Dict[str, str]:
    """
    Search the FAQ for relevant entries matching the query.

    Parameters
    ----------
    query : str
        The search query string provided by the user.

    Returns
    -------
    list
        A list of search results (up to 5), each containing relevance information 
        and associated output IDs.
    """
    print(f"search('{query}')")
    return search(query)


@chat_agent.tool
def add_entry_tool(ctx: RunContext, question: str, answer: str) -> None:
    """
    Add a new question-answer entry to FAQ.

    This function creates a document with the given question and answer, 
    tagging it as user-added content.

    Parameters
    ----------
    question : str
        The question text to be added to the index.

    answer : str
        The answer or explanation corresponding to the question.

    Returns
    -------
    None
    """
    return add_entry(question, answer)

In [None]:
# It reads the functions' docstrings to automatically create function definition, so we don't need to worry about it
# Let's use it:

user_prompt = "I just discovered the course. Can I join now?"
agent_run = await chat_agent.run(user_prompt)
print(agent_run.output)