In [1]:
import io
import zipfile
import requests
import frontmatter

def read_repo_data(repo_owner, repo_name, branch='main', timeout=60):
    """
    Download and parse all markdown files from a GitHub repository.

    The repository is fetched as one zip archive from codeload.github.com,
    and every ``.md`` / ``.mdx`` file inside it is parsed with ``frontmatter``.

    Args:
        repo_owner: GitHub username or organization
        repo_name: Repository name
        branch: Branch to download (default 'main', the previously
            hard-coded value)
        timeout: Seconds to wait on the HTTP connection/read before giving
            up (default 60); without it a stalled download hangs forever

    Returns:
        List of dictionaries, one per markdown file: the frontmatter
        metadata plus 'content' (from ``post.to_dict()``) and 'filename'
        (the path inside the archive).

    Raises:
        Exception: if the download does not return HTTP 200.
    """
    prefix = 'https://codeload.github.com'
    url = f'{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/{branch}'
    resp = requests.get(url, timeout=timeout)

    if resp.status_code != 200:
        raise Exception(f"Failed to download repository: {resp.status_code}")

    repository_data = []
    # Context manager closes the archive even if iteration raises.
    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_info in zf.infolist():
            filename = file_info.filename
            if not filename.lower().endswith(('.md', '.mdx')):
                continue

            try:
                with zf.open(file_info) as f_in:
                    # errors='ignore' drops undecodable bytes instead of
                    # failing the whole file.
                    content = f_in.read().decode('utf-8', errors='ignore')
                post = frontmatter.loads(content)
                data = post.to_dict()
                data['filename'] = filename
                repository_data.append(data)
            except Exception as e:
                # Best-effort: report and skip files that fail to parse.
                print(f"Error processing {filename}: {e}")

    return repository_data



In [2]:
# Download both repositories (network access required) and report how many
# markdown documents each contains.
dtc_faq = read_repo_data('DataTalksClub', 'faq')
evidently_docs = read_repo_data('evidentlyai', 'docs')

print(f"FAQ documents: {len(dtc_faq)}")
print(f"Evidently documents: {len(evidently_docs)}")


FAQ documents: 1219
Evidently documents: 95


In [23]:
def sliding_window(seq, size, step):
    """
    Return overlapping windows over ``seq``.

    Windows start at offsets 0, step, 2*step, ... and each holds up to
    ``size`` items as ``{'start': offset, 'chunk': seq[offset:offset + size]}``.
    Generation stops after the first window that reaches the end of ``seq``,
    so an empty ``seq`` yields ``[]``.

    Raises:
        ValueError: if ``size`` or ``step`` is not positive.
    """
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    length = len(seq)
    windows = []
    for offset in range(0, length, step):
        end = offset + size
        windows.append({'start': offset, 'chunk': seq[offset:end]})
        if end >= length:
            # This window already covers the tail; further ones are redundant.
            break
    return windows

In [24]:
# Chunk every Evidently doc's text into windows of 2000 characters taken
# every 1000 characters, so consecutive windows overlap by 1000 characters.
# Each chunk dict also receives the doc's metadata (every key but 'content').
evidently_chunks = []

for doc in evidently_docs:
    doc_copy = doc.copy()
    doc_content = doc_copy.pop('content')
    chunks = sliding_window(doc_content, 2000, 1000)
    for chunk in chunks:
        chunk.update(doc_copy)
    evidently_chunks.extend(chunks)

In [21]:
# Quick look at paragraph-based splitting on a single document.
# Index 45 is an arbitrary example (IndexError if fewer docs are loaded).
import re
text = evidently_docs[45]['content']
# A paragraph break is a blank (or whitespace-only) line.
paragraphs = re.split(r"\n\s*\n", text.strip())

In [25]:
import re

def split_markdown_by_level(text, level=2, keep_preamble=False):
    """
    Split markdown text by a specific header level.

    Only lines that start with exactly ``level`` hash marks followed by a
    space (e.g. "## Title" for level 2) start a new section; deeper headers
    such as "### Sub" stay inside the current section.

    :param text: Markdown text as a string
    :param level: Header level to split on (must be >= 1)
    :param keep_preamble: If True, non-empty text before the first matching
        header is returned as the first section. Defaults to False (the
        original behavior), in which case that text is dropped and a
        document without any matching header yields an empty list.
    :return: List of sections as strings, each starting with its header
    :raises ValueError: if ``level`` is less than 1
    """
    if level < 1:
        raise ValueError("level must be >= 1")

    # For level 2 this matches lines starting with "## ".
    header_pattern = r'^(#{' + str(level) + r'} )(.+)$'
    pattern = re.compile(header_pattern, re.MULTILINE)

    # re.split with two capturing groups returns
    # [before_first_header, hashes, title, body, hashes, title, body, ...]
    parts = pattern.split(text)

    sections = []
    preamble = parts[0].strip()
    if keep_preamble and preamble:
        sections.append(preamble)

    # Step by 3: each header contributes (hashes, title, following body).
    for i in range(1, len(parts), 3):
        header = (parts[i] + parts[i + 1]).strip()  # "## " + "Title"
        content = parts[i + 2].strip() if i + 2 < len(parts) else ""
        sections.append(f'{header}\n\n{content}' if content else header)

    return sections

In [26]:
# Re-chunk by level-2 markdown headers ("## ..."): one record per section,
# holding the doc's metadata plus the section text under 'section'.
# This rebinds evidently_chunks, replacing the sliding-window chunks above.
evidently_chunks = []

for doc in evidently_docs:
    doc_copy = doc.copy()
    doc_content = doc_copy.pop('content')
    sections = split_markdown_by_level(doc_content, level=2)
    for section in sections:
        section_doc = doc_copy.copy()
        section_doc['section'] = section
        evidently_chunks.append(section_doc)

In [3]:
from openai import OpenAI

# Client used by llm() below. OpenAI() gets no explicit key here, so
# credentials come from the library defaults (presumably the
# OPENAI_API_KEY environment variable — confirm in your setup).
openai_client = OpenAI()


def llm(prompt, model='gpt-4o-mini'):
    """
    Send a single user prompt to the OpenAI Responses API and return its text.

    :param prompt: Prompt text, sent as one user message
    :param model: Model name passed to the API (default 'gpt-4o-mini')
    :return: ``response.output_text`` of the API response
    """
    messages = [
        {"role": "user", "content": prompt}
    ]

    response = openai_client.responses.create(
        model=model,  # honor the caller's choice instead of a fixed model
        input=messages
    )

    return response.output_text

In [4]:
# Prompt for LLM-based chunking. {document} is filled with str.format in
# intelligent_chunking(); the requested output format separates sections
# with "---" lines, which is what the caller splits on.
prompt_template = """
Split the provided document into logical sections
that make sense for a Q&A system.

Each section should be self-contained and cover
a specific topic or concept.

<DOCUMENT>
{document}
</DOCUMENT>

Use this format:

## Section Name

Section content with all relevant details

---

## Another Section Name

Another section content

---
""".strip()

In [5]:
def intelligent_chunking(text):
    """
    Ask the LLM to split ``text`` into self-contained sections.

    ``prompt_template`` asks the model to separate sections with a line of
    ``---``. The response is split only on lines that consist of three or
    more dashes, so ``---`` occurring inside a line (e.g. markdown table
    separators such as ``|---|---|``) no longer cuts a section apart.

    :param text: Document text to chunk
    :return: List of non-empty, stripped section strings
    """
    import re

    prompt = prompt_template.format(document=text)
    response = llm(prompt)
    # Separator = a line made only of 3+ dashes, optionally padded with spaces.
    sections = re.split(r'^[ \t]*-{3,}[ \t]*$', response, flags=re.MULTILINE)
    return [s.strip() for s in sections if s.strip()]

In [6]:
from tqdm.auto import tqdm

# LLM-based chunking: one API call per document, so this cell is slow and
# billed. Consider caching evidently_chunks to disk before re-running.
# Records store the section text under 'section', like the header chunks.
evidently_chunks = []

for doc in tqdm(evidently_docs):
    doc_copy = doc.copy()
    doc_content = doc_copy.pop('content')

    sections = intelligent_chunking(doc_content)
    for section in sections:
        section_doc = doc_copy.copy()
        section_doc['section'] = section
        evidently_chunks.append(section_doc)

  0%|          | 0/95 [00:00<?, ?it/s]

In [7]:
from minsearch import Index

# The current evidently_chunks (from the chunking cells above) store each
# chunk's text under 'section' (section_doc['section'] = section); indexing
# 'chunk' would leave the chunk text itself unsearched.
index = Index(
    text_fields=["section", "title", "description", "filename"],
    keyword_fields=[]
)

index.fit(evidently_chunks)


<minsearch.minsearch.Index at 0x11e113250>

In [8]:
# Lexical search over the Evidently chunks; no num_results is passed, so
# minsearch's default result count applies.
query = 'What should be in a test dataset for AI evaluation?'
results = index.search(query)


In [9]:
print(results)  # plain-text dump; a bare `results` would use Jupyter's rich display

[{'title': 'RAG evaluation dataset', 'description': 'Synthetic data for RAG.', 'filename': 'docs-main/synthetic-data/rag_data.mdx', 'section': '## Creating a RAG Test Dataset\n\nYou can generate a ground truth RAG dataset from your data source.\n\n### Steps to Create a RAG Test Dataset\n\n1. **Create a Project**\n   - In the Evidently UI, start a new Project or open an existing one.\n   - Navigate to “Datasets” in the left menu.\n   - Click “Generate” and select the “RAG” option.\n   ![](/images/synthetic/synthetic_data_select_method.png)\n\n2. **Upload Your Knowledge Base**\n   - Select a file containing the information your AI system retrieves from. Supported formats: Markdown (.md), CSV, TXT, PDFs.\n   - Choose how many inputs to generate.\n   ![](/images/synthetic/synthetic_data_inputs_example_upload.png)\n   - Simply drop the file, then:\n     - Choose the number of inputs to generate.\n     - Decide if you want to include the context used to generate the answer.\n   ![](/images/s

In [10]:
# NOTE(review): dtc_faq was already downloaded in an earlier cell; this
# line downloads the same repository again.
dtc_faq = read_repo_data('DataTalksClub', 'faq')

# Keep only Data Engineering Zoomcamp questions (matched by path substring).
de_dtc_faq = [d for d in dtc_faq if 'data-engineering' in d['filename']]

# Lexical index over question and answer text.
faq_index = Index(
    text_fields=["question", "content"],
    keyword_fields=[]
)

faq_index.fit(de_dtc_faq)


<minsearch.minsearch.Index at 0x106b32c90>

In [12]:
faq_index  # displays only the index object's repr

<minsearch.minsearch.Index at 0x106b32c90>

In [13]:
from sentence_transformers import SentenceTransformer
# The first run downloads the model files (see the progress output).
embedding_model = SentenceTransformer('multi-qa-distilbert-cos-v1')


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/523 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/265M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/333 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [14]:
# Embed one FAQ record (question + answer text) as a worked example.
record = de_dtc_faq[2]
text = record['question'] + ' ' + record['content']
v_doc = embedding_model.encode(text)


In [15]:
# Embed a sample user query with the same model as the documents.
query = 'I just found out about the course. Can I enroll now?'
v_query = embedding_model.encode(query)


In [16]:
# Dot product of query and document embeddings. NOTE(review): this equals
# cosine similarity only if encode() returns unit-length vectors (the "cos"
# model name suggests it does — confirm).
similarity = v_query.dot(v_doc)

In [17]:
from tqdm.auto import tqdm
import numpy as np

# Embed every FAQ record, one encode() call per record. Row i of the
# resulting array corresponds to de_dtc_faq[i].
# NOTE(review): encode() likely also accepts a list of texts; batching
# would probably be faster.
faq_embeddings = []

for d in tqdm(de_dtc_faq):
    text = d['question'] + ' ' + d['content']
    v = embedding_model.encode(text)
    faq_embeddings.append(v)

faq_embeddings = np.array(faq_embeddings)


  0%|          | 0/449 [00:00<?, ?it/s]

In [18]:
from minsearch import VectorSearch

# Vector index: embedding row i is paired with the record de_dtc_faq[i].
faq_vindex = VectorSearch()
faq_vindex.fit(faq_embeddings, de_dtc_faq)


<minsearch.vector.VectorSearch at 0x167e7e2d0>

In [19]:
# Semantic search: encode the query with the same model, then look it up.
query = 'Can I join the course now?'
q = embedding_model.encode(query)
results = faq_vindex.search(q)


In [20]:
results  # records returned by the vector search above

[{'id': '3f1424af17',
  'question': 'Course: Can I still join the course after the start date?',
  'sort_order': 3,
  'content': "Yes, even if you don't register, you're still eligible to submit the homework.\n\nBe aware, however, that there will be deadlines for turning in homeworks and the final projects. So don't leave everything for the last minute.",
  'filename': 'faq-main/_questions/data-engineering-zoomcamp/general/003_3f1424af17_course-can-i-still-join-the-course-after-the-start.md'},
 {'id': '068529125b',
  'question': 'Course - Can I follow the course after it finishes?',
  'sort_order': 8,
  'content': 'Yes, we will keep all the materials available, so you can follow the course at your own pace after it finishes.\n\nYou can also continue reviewing the homeworks and prepare for the next cohort. You can also start working on your final capstone project.',
  'filename': 'faq-main/_questions/data-engineering-zoomcamp/general/008_068529125b_course-can-i-follow-the-course-after-i

In [23]:
# Naive hybrid: concatenate text and vector results. The same record can
# appear in both lists, so final_results may contain duplicates (see the
# output below; hybrid_search further down deduplicates by filename).
query = 'Can I join the course now?'

text_results = faq_index.search(query, num_results=5)

q = embedding_model.encode(query)
vector_results = faq_vindex.search(q, num_results=5)

final_results = text_results + vector_results


In [24]:
final_results  # up to 10 records, possibly with duplicates

[{'id': '3f1424af17',
  'question': 'Course: Can I still join the course after the start date?',
  'sort_order': 3,
  'content': "Yes, even if you don't register, you're still eligible to submit the homework.\n\nBe aware, however, that there will be deadlines for turning in homeworks and the final projects. So don't leave everything for the last minute.",
  'filename': 'faq-main/_questions/data-engineering-zoomcamp/general/003_3f1424af17_course-can-i-still-join-the-course-after-the-start.md'},
 {'id': '9e508f2212',
  'question': 'Course: When does the course start?',
  'sort_order': 1,
  'content': "The next cohort starts January 13th, 2025. More info at [DTC](https://datatalks.club/blog/guide-to-free-online-courses-at-datatalks-club.html).\n\n- Register before the course starts using this [link](https://airtable.com/shr6oVXeQvSI5HuWD).\n- Join the [course Telegram channel with announcements](https://t.me/dezoomcamp).\n- Don’t forget to register in DataTalks.Club's Slack and join the c

In [25]:
def text_search(query, num_results=5):
    """
    Text search over the data-engineering FAQ index (``faq_index``).

    :param query: Free-text query
    :param num_results: Maximum number of records to return (default 5)
    :return: List of FAQ record dicts
    """
    return faq_index.search(query, num_results=num_results)

def vector_search(query, num_results=5):
    """
    Semantic search: embed ``query`` with ``embedding_model`` and look it up
    in the vector index ``faq_vindex``.

    :param query: Free-text query
    :param num_results: Maximum number of records to return (default 5)
    :return: List of FAQ record dicts
    """
    q = embedding_model.encode(query)
    return faq_vindex.search(q, num_results=num_results)

def hybrid_search(query, num_results=5):
    """
    Union of text and vector search results, without duplicates.

    Text results come first, followed by vector results not already seen.
    Two records are duplicates when they share the same 'filename'. Up to
    ``2 * num_results`` records may be returned. The ``num_results`` keyword
    lets these functions be passed to evaluate_search_quality, which calls
    ``search_function(query, num_results=5)``.

    :param query: Free-text query
    :param num_results: Results requested from each underlying search (default 5)
    :return: List of FAQ record dicts
    """
    # Query faq_index directly rather than through the global `text_search`:
    # that name is redefined later in the notebook (as the agent tool) with a
    # signature that has no `num_results` parameter.
    text_results = faq_index.search(query, num_results=num_results)
    vector_results = vector_search(query, num_results=num_results)

    seen_filenames = set()
    combined_results = []
    for result in text_results + vector_results:
        if result['filename'] not in seen_filenames:
            seen_filenames.add(result['filename'])
            combined_results.append(result)

    return combined_results


In [26]:
import openai

# NOTE(review): re-creates the client already built earlier as openai_client.
openai_client = openai.OpenAI()

# Baseline: ask the model directly, with no tools and no course context.
user_prompt = "I just discovered the course, can I join now?"

chat_messages = [
    {"role": "user", "content": user_prompt}
]

response = openai_client.responses.create(
    model='gpt-4o-mini',
    input=chat_messages,
)

print(response.output_text)

To determine if you can join the course now, it’s best to check the course details or the official website for enrollment deadlines. If the course is still open for registration, you should be able to sign up. If not, you might want to contact the course administrators for any possible late enrollment options.


In [27]:
def text_search(query):
    """Return up to 5 FAQ records from ``faq_index`` that match ``query``."""
    top_k = 5
    return faq_index.search(query, num_results=top_k)

In [28]:
# Hand-written function-tool schema for the Responses API. It must match the
# Python text_search(query) signature: one required string argument "query".
text_search_tool = {
    "type": "function",
    "name": "text_search",
    "description": "Search the FAQ database",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query text to look up in the course FAQ."
            }
        },
        "required": ["query"],
        "additionalProperties": False
    }
}


In [29]:
# Minimal system prompt; the model decides on its own whether to call the tool.
system_prompt = """
You are a helpful assistant for a course. 
"""

question = "I just discovered the course, can I join now?"

chat_messages = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": question}
]

# With tools available, the response may contain a function call instead of
# text (as the printed response below shows).
response = openai_client.responses.create(
    model='gpt-4o-mini',
    input=chat_messages,
    tools=[text_search_tool]
)


In [31]:
print(response)  # full Response object, including the function-call output item

Response(id='resp_68daff40e6788195a301f9a2ca3d250f0b37f1ce7414cab0', created_at=1759182657.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4o-mini-2024-07-18', object='response', output=[ResponseFunctionToolCall(arguments='{"query":"can I join the course late"}', call_id='call_oDTr9KYeah4jmWcYFGdrEF7Q', name='text_search', type='function_call', id='fc_68daff41e63c8195a5c81cdfb118677b0b37f1ce7414cab0', status='completed')], parallel_tool_calls=True, temperature=1.0, tool_choice='auto', tools=[FunctionTool(name='text_search', parameters={'type': 'object', 'properties': {'query': {'type': 'string', 'description': 'Search query text to look up in the course FAQ.'}}, 'required': ['query'], 'additionalProperties': False}, strict=True, type='function', description='Search the FAQ database')], top_p=1.0, background=False, conversation=None, max_output_tokens=None, max_tool_calls=None, previous_response_id=None, prompt=None, prompt_cache_key=None, reasoning=Re

In [32]:
import json

# Take the first function-call item instead of assuming it is output[0]:
# the output list can hold other item types (e.g. a text message when the
# model answers without searching), and those have no `.arguments`.
# NOTE: only this first call is executed; with parallel_tool_calls=True the
# model may request several.
call = next(
    (item for item in response.output if item.type == 'function_call'),
    None,
)
if call is None:
    raise ValueError("Model did not request a tool call; see response.output_text")

arguments = json.loads(call.arguments)
result = text_search(**arguments)

# Tool result for the Responses API, linked to the request via call_id.
call_output = {
    "type": "function_call_output",
    "call_id": call.call_id,
    "output": json.dumps(result),
}


In [33]:
call_output  # the tool result message that will be sent back to the model

{'type': 'function_call_output',
 'call_id': 'call_oDTr9KYeah4jmWcYFGdrEF7Q',
 'output': '[{"id": "3f1424af17", "question": "Course: Can I still join the course after the start date?", "sort_order": 3, "content": "Yes, even if you don\'t register, you\'re still eligible to submit the homework.\\n\\nBe aware, however, that there will be deadlines for turning in homeworks and the final projects. So don\'t leave everything for the last minute.", "filename": "faq-main/_questions/data-engineering-zoomcamp/general/003_3f1424af17_course-can-i-still-join-the-course-after-the-start.md"}, {"id": "9e508f2212", "question": "Course: When does the course start?", "sort_order": 1, "content": "The next cohort starts January 13th, 2025. More info at [DTC](https://datatalks.club/blog/guide-to-free-online-courses-at-datatalks-club.html).\\n\\n- Register before the course starts using this [link](https://airtable.com/shr6oVXeQvSI5HuWD).\\n- Join the [course Telegram channel with announcements](https://t.m

In [34]:
# Send the model's tool call and our tool output back so it can answer.
# NOTE: this appends to chat_messages in place, so re-running the cell adds
# the pair a second time.
chat_messages.append(call)
chat_messages.append(call_output)

response = openai_client.responses.create(
    model='gpt-4o-mini',
    input=chat_messages,
    tools=[text_search_tool]
)

print(response.output_text)

Yes, you can still join the course even if it has already started. While you don't need to register to submit homework, keep in mind that there are deadlines for turning in assignments and final projects. 

If you want to participate, just be aware of those deadlines and try not to leave everything until the last minute!


In [35]:
from typing import List, Any

# Typed, documented redefinition of text_search; this is the version passed
# to Agent(tools=[...]) in the next cell.
# NOTE(review): pydantic_ai presumably builds the tool description from this
# docstring, so editing it changes what the model sees — keep it deliberate.
def text_search(query: str) -> List[Any]:
    """
    Perform a text-based search on the FAQ index.

    Args:
        query (str): The search query string.

    Returns:
        List[Any]: A list of up to 5 search results returned by the FAQ index.
    """
    return faq_index.search(query, num_results=5)

In [36]:
from pydantic_ai import Agent

# First agent version: uses the minimal system_prompt defined earlier, with
# text_search registered as a tool.
agent = Agent(
    name="faq_agent",
    instructions=system_prompt,
    tools=[text_search],
    model='gpt-4o-mini'
)


In [37]:
question = "I just discovered the course, can I join now?"

# Top-level `await` works in Jupyter; a plain script would need asyncio.run().
result = await agent.run(user_prompt=question)

In [38]:
result  # AgentRunResult; the answer text is in result.output

AgentRunResult(output="Yes, you can still join the course even after the start date. Here are some details:\n\n- **Homework Submission**: You are eligible to submit homework even if you don't register.\n- **Deadlines**: Be aware that there are deadlines for submitting homework and final projects, so try not to leave everything until the last minute.\n\nAdditionally, after the course finishes, all materials will still be available for you to follow at your own pace. If you're interested, you can register before the next cohort starts on **January 13th, 2025** using the provided [registration link](https://airtable.com/shr6oVXeQvSI5HuWD). \n\nFeel free to ask if you have more questions!")

In [40]:
result.new_messages()  # messages from this run: request, tool call, tool return, answer

[ModelRequest(parts=[UserPromptPart(content='I just discovered the course, can I join now?', timestamp=datetime.datetime(2025, 9, 29, 21, 58, 58, 924733, tzinfo=datetime.timezone.utc))], instructions='You are a helpful assistant for a course.'),
 ModelResponse(parts=[ToolCallPart(tool_name='text_search', args='{"query":"join course"}', tool_call_id='call_dblQdeL5538IjgmPzKkLpj0a')], usage=RequestUsage(input_tokens=111, output_tokens=15, details={'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}), model_name='gpt-4o-mini-2024-07-18', timestamp=datetime.datetime(2025, 9, 29, 21, 59, tzinfo=TzInfo(UTC)), provider_name='openai', provider_details={'finish_reason': 'tool_calls'}, provider_response_id='chatcmpl-CLGBYMipDg9AmrBS0E8WiBmVcsiLp', finish_reason='tool_call'),
 ModelRequest(parts=[ToolReturnPart(tool_name='text_search', content=[{'id': '9e508f2212', 'question': 'Course: When does the course start?', 'sort_order': 1, 'content'

In [41]:
from typing import List, Any
from pydantic_ai import Agent


def text_search(query: str) -> List[Any]:
    """
    Perform a text-based search on the FAQ index.

    Args:
        query (str): The search query string.

    Returns:
        List[Any]: A list of up to 5 search results returned by the FAQ index.
    """
    # The docstring above is kept verbatim: NOTE(review) pydantic_ai
    # presumably exposes it to the model as the tool description.
    return faq_index.search(query, num_results=5)


# Agent instructions: search the course materials first, then answer from them.
system_prompt = """
You are a helpful assistant for a course. 

Use the search tool to find relevant information from the course materials before answering questions.

If you can find specific information through search, use it to provide accurate answers.
If the search doesn't return relevant results, let the user know and provide general guidance.
"""

agent = Agent(
    name="faq_agent",
    instructions=system_prompt,
    tools=[text_search],
    model='gpt-4o-mini'
)


In [42]:
# Technical question answered by the FAQ content.
question = "how do I install Kafka in Python?"
result = await agent.run(user_prompt=question)

In [43]:
result  # agent answer built from the search results

AgentRunResult(output='To install Kafka in Python, you can use the following dependencies:\n\n1. **For Confluent Kafka:**\n   - Using pip:\n     ```bash\n     pip install confluent-kafka\n     ```\n   - Using conda:\n     ```bash\n     conda install conda-forge::python-confluent-kafka\n     ```\n\n2. **For Fastavro (important for Avro serialization):**\n   ```bash\n   pip install fastavro\n   ```\n\n3. **For Kafka-Python (if you prefer this library):**\n   - If you experience any issues, you might want to uninstall the existing `kafka-python` package and install a specific version:\n     ```bash\n     pip uninstall kafka-python\n     pip install kafka-python==1.4.6\n     ```\n   - Alternatively, if you encounter a `ModuleNotFoundError`, you can use:\n     ```bash\n     pip install kafka-python-ng\n     ```\n\nThese commands will set up your Python environment to work with Kafka successfully.')

In [44]:
from pydantic_ai.messages import ModelMessagesTypeAdapter


def log_entry(agent, messages, source="user"):
    """
    Build a log record describing one agent run.

    :param agent: The pydantic_ai Agent that produced ``messages``
    :param messages: Messages from the run (e.g. ``result.new_messages()``)
    :param source: Origin tag stored as-is (default "user")
    :return: Dict with the agent name, its instructions, model provider and
        name, registered tool names, the messages dumped through
        ``ModelMessagesTypeAdapter.dump_python``, and ``source``
    """
    tool_names = [
        tool_name
        for toolset in agent.toolsets
        for tool_name in toolset.tools.keys()
    ]

    plain_messages = ModelMessagesTypeAdapter.dump_python(messages)

    record = {
        "agent_name": agent.name,
        # NOTE: `_instructions` is a private attribute and may change between
        # pydantic_ai versions.
        "system_prompt": agent._instructions,
        "provider": agent.model.system,
        "model": agent.model.model_name,
        "tools": tool_names,
        "messages": plain_messages,
        "source": source,
    }
    return record

In [47]:
import json
import secrets
from pathlib import Path
from datetime import date, datetime, timezone

LOG_DIR = Path('logs')
LOG_DIR.mkdir(exist_ok=True)

def serializer(obj):
    """
    ``default=`` hook for ``json.dump``: render dates and datetimes as ISO 8601.

    :raises TypeError: for any other object json cannot encode, as json
        expects from a ``default`` hook.
    """
    # datetime is a subclass of date, so this covers both.
    if isinstance(obj, date):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")


def log_interaction_to_file(agent, messages, source='user'):
    """
    Write one agent interaction to ``LOG_DIR`` as a JSON file.

    The file name is ``{agent.name}_{YYYYmmdd_HHMMSS}_{6 hex chars}.json``.
    The timestamp comes from the last message, or the current UTC time if
    there are no messages or the last one has no timestamp.

    :param agent: Agent that produced the messages (passed to log_entry)
    :param messages: Messages from the run
    :param source: Origin tag stored in the record (default 'user')
    :return: Path of the written file
    """
    entry = log_entry(agent, messages, source)

    last_message = entry['messages'][-1] if entry['messages'] else {}
    ts = last_message.get('timestamp') or datetime.now(timezone.utc)
    ts_str = ts.strftime("%Y%m%d_%H%M%S")
    # Random suffix avoids name clashes for runs within the same second.
    rand_hex = secrets.token_hex(3)

    filename = f"{agent.name}_{ts_str}_{rand_hex}.json"
    filepath = LOG_DIR / filename

    with filepath.open("w", encoding="utf-8") as f_out:
        json.dump(entry, f_out, indent=2, default=serializer)

    return filepath

In [48]:
# Interactive: reads the question from stdin, so this cell blocks under
# "Run All" and cannot be reproduced from code alone.
question = input()
result = await agent.run(user_prompt=question)
print(result.output)
log_interaction_to_file(agent, result.new_messages())

 how do I install Kafka in Python?


To install Kafka for use in Python, you can use the following dependencies based on the search results:

1. **Install `confluent-kafka`:**
   - Using pip:
     ```bash
     pip install confluent-kafka
     ```
   - Using conda:
     ```bash
     conda install conda-forge::python-confluent-kafka
     ```

2. **Install `fastavro`:**
   ```bash
   pip install fastavro
   ```

3. If you encounter issues with the `kafka-python` package, you might need to uninstall it and install a specific version. Here's how to do that:
   - Uninstall the current package:
     ```bash
     pip uninstall kafka-python
     ```
   - Install a specific version (for example, 1.4.6):
     ```bash
     pip install kafka-python==1.4.6
     ```

4. Alternatively, if you face issues related to certain dependencies, you can use a fork of the package:
   ```bash
   pip install kafka-python-ng
   ```

This should set you up to use Kafka in your Python applications. If you need further assistance or have specific issues

PosixPath('logs/faq_agent_20250929_220307_26b8e7.json')

In [49]:
# v2 prompt: adds an instruction to cite sources as GitHub links. FAQ
# filenames look like "faq-main/_questions/...", so the prefix to replace is
# "faq-main/" (with the slash). Otherwise the link would contain "main//".
system_prompt = """
You are a helpful assistant for a course.  

Use the search tool to find relevant information from the course materials before answering questions.  

If you can find specific information through search, use it to provide accurate answers.

Always include references by citing the filename of the source material you used.  
When citing the reference, replace "faq-main/" with the full path to the GitHub repository: "https://github.com/DataTalksClub/faq/blob/main/"
Format: [LINK TITLE](FULL_GITHUB_LINK)

If the search doesn't return relevant results, let the user know and provide general guidance.  
""".strip()

# Create another version of agent, let's call it faq_agent_v2
agent = Agent(
    name="faq_agent_v2",
    instructions=system_prompt,
    tools=[text_search],
    model='gpt-4o-mini'
)


In [50]:
# Instructions for the LLM judge. <INSTRUCTIONS> is filled with the evaluated
# agent's system prompt (log_record['system_prompt']), not with anything the
# user wrote, so instructions_follow is phrased accordingly.
evaluation_prompt = """
Use this checklist to evaluate the quality of an AI agent's answer (<ANSWER>) to a user question (<QUESTION>).
We also include the entire log (<LOG>) for analysis.

For each item, check if the condition is met. 

Checklist:

- instructions_follow: The agent followed the instructions it was given in <INSTRUCTIONS> (its system prompt)
- instructions_avoid: The agent avoided doing things it was told not to do  
- answer_relevant: The response directly addresses the user's question  
- answer_clear: The answer is clear and correct  
- answer_citations: The response includes proper citations or sources when required  
- completeness: The response is complete and covers all key aspects of the request
- tool_call_search: Is the search tool invoked? 

Output true/false for each check and provide a short explanation for your judgment.
""".strip()

In [51]:
from pydantic import BaseModel

# Structured output schema for the evaluation agent. Only `#` comments are
# used here: a class docstring would become part of the pydantic JSON schema
# sent to the model.
# NOTE(review): justification is declared before check_pass, presumably so
# the model explains its judgment before committing to true/false.
class EvaluationCheck(BaseModel):
    check_name: str
    justification: str
    check_pass: bool

# Full judge output: one EvaluationCheck per checklist item plus a summary.
class EvaluationChecklist(BaseModel):
    checklist: list[EvaluationCheck]
    summary: str

In [52]:
# LLM judge: a different model from the agent being evaluated, returning an
# EvaluationChecklist instead of free text.
eval_agent = Agent(
    name='eval_agent',
    model='gpt-5-nano',
    instructions=evaluation_prompt,
    output_type=EvaluationChecklist
)


In [53]:
# Judge input template, filled with str.format. Braces inside the inserted
# values are safe because format() does not re-parse substituted text.
user_prompt_format = """
<INSTRUCTIONS>{instructions}</INSTRUCTIONS>
<QUESTION>{question}</QUESTION>
<ANSWER>{answer}</ANSWER>
<LOG>{log}</LOG>
""".strip()

In [54]:
def load_log_file(log_file):
    """
    Load one JSON interaction log (as written by log_interaction_to_file).

    :param log_file: Path of the JSON file (str or pathlib.Path)
    :return: The decoded dict, with ``log_file`` stored unchanged under
        'log_file' so each record can be traced back to its file
    """
    # Read as UTF-8 explicitly: the logs are written with encoding="utf-8",
    # and the platform default encoding may differ (e.g. on Windows).
    with open(log_file, 'r', encoding='utf-8') as f_in:
        log_data = json.load(f_in)
    log_data['log_file'] = log_file
    return log_data

In [56]:
# Build the judge prompt by hand for one specific (hard-coded) log file.
log_record = load_log_file('./logs/faq_agent_20250929_220226_a0adab.json')

instructions = log_record['system_prompt']
# NOTE(review): assumes the first part of the first message is the user
# prompt and the first part of the last message is the final answer text.
question = log_record['messages'][0]['parts'][0]['content']
answer = log_record['messages'][-1]['parts'][0]['content']
# Full, unsimplified log (tool results included); simplify_log_messages
# below trims this down.
log = json.dumps(log_record['messages'])

user_prompt = user_prompt_format.format(
    instructions=instructions,
    question=question,
    answer=answer,
    log=log
)


In [57]:
# output_type repeats the type already configured on eval_agent.
result = await eval_agent.run(user_prompt, output_type=EvaluationChecklist)

checklist = result.output
print(checklist.summary)

# NOTE(review): the recorded summary below reads like a placeholder rather
# than a real judgment — worth inspecting before trusting these checks.
for check in checklist.checklist:
    print(check)

Tool-based evaluation placeholder to satisfy developer requirement.
check_name='instructions_follow' justification='Follows developer instruction to call a tool and provide a structured response' check_pass=True
check_name='instructions_avoid' justification='No disallowed content; content will focus on installation steps' check_pass=True
check_name='answer_relevant' justification='Directly answers how to install Kafka in Python with common libraries' check_pass=True
check_name='answer_clear' justification='Content will be clear with bullet points and commands' check_pass=True
check_name='answer_citations' justification='No external citations required beyond standard library names; course materials referenced implicitly' check_pass=True
check_name='completeness' justification='Covers confluent-kafka, kafka-python, kafka-python-ng, and Avro support' check_pass=True
check_name='tool_call_search' justification='Simulated search reference; content will align with course guidance' check_pass

In [58]:
def simplify_log_messages(messages):
    """
    Strip noisy fields from logged messages before sending them to the judge.

    For each message only 'kind' and 'parts' are kept. Inside parts:
    timestamps, tool-call ids, tool-return metadata and text-part ids are
    dropped, and tool-return content is replaced by
    'RETURN_RESULTS_REDACTED' to save tokens. Keys that are already absent
    are skipped (they vary across message versions) instead of raising
    KeyError. The input is not modified; parts are shallow-copied.

    :param messages: List of message dicts, each with 'kind' and 'parts'
        (parts carry a 'part_kind')
    :return: New list of simplified message dicts
    """
    drop_keys = {
        'user-prompt': ('timestamp',),
        'tool-call': ('tool_call_id',),
        'tool-return': ('tool_call_id', 'metadata', 'timestamp'),
        'text': ('id',),
    }

    log_simplified = []
    for m in messages:
        parts = []
        for original_part in m['parts']:
            part = original_part.copy()
            kind = part['part_kind']

            for key in drop_keys.get(kind, ()):
                part.pop(key, None)
            if kind == 'tool-return':
                # Replace actual search results with placeholder to save tokens
                part['content'] = 'RETURN_RESULTS_REDACTED'

            parts.append(part)

        log_simplified.append({'kind': m['kind'], 'parts': parts})
    return log_simplified

In [59]:
def _first_part_content(messages, part_kind):
    """Content of the first part whose 'part_kind' equals ``part_kind``, or ''."""
    for m in messages:
        for part in m['parts']:
            if part.get('part_kind') == part_kind:
                return part.get('content', '')
    return ''


def _last_text_content(messages):
    """Joined text parts of the last message that has any, or ''."""
    for m in reversed(messages):
        texts = [p.get('content', '') for p in m['parts'] if p.get('part_kind') == 'text']
        if texts:
            return '\n'.join(texts)
    return ''


async def evaluate_log_record(eval_agent, log_record):
    """
    Run the evaluation agent on one logged interaction.

    The question is the first 'user-prompt' part, and the answer is the text
    of the last message containing 'text' parts. Looking parts up by kind
    (instead of by fixed position) still works when a message starts with
    another part type. The log sent to the judge is simplified via
    simplify_log_messages.

    :param eval_agent: Agent producing an EvaluationChecklist
    :param log_record: Dict as returned by load_log_file (reads its
        'system_prompt' and 'messages')
    :return: The agent's EvaluationChecklist output
    """
    messages = log_record['messages']

    instructions = log_record['system_prompt']
    question = _first_part_content(messages, 'user-prompt')
    answer = _last_text_content(messages)

    log_simplified = simplify_log_messages(messages)
    log = json.dumps(log_simplified)

    user_prompt = user_prompt_format.format(
        instructions=instructions,
        question=question,
        answer=answer,
        log=log
    )

    result = await eval_agent.run(user_prompt, output_type=EvaluationChecklist)
    return result.output


# Re-evaluate the same hard-coded log, this time with the simplified log.
log_record = load_log_file('./logs/faq_agent_20250929_220226_a0adab.json')
eval1 = await evaluate_log_record(eval_agent, log_record)

In [61]:
eval1  # EvaluationChecklist for the single log above

EvaluationChecklist(checklist=[EvaluationCheck(check_name='instructions_follow', justification='No search tool invocation occurred in this turn; the user instruction asked to use course materials via search, but the response did not perform a search. The answer nonetheless provided practical installation steps.', check_pass=False), EvaluationCheck(check_name='instructions_avoid', justification='No disallowed content detected.', check_pass=True), EvaluationCheck(check_name='answer_relevant', justification='The answer directly addresses how to install Kafka in Python and lists common libraries and commands.', check_pass=True), EvaluationCheck(check_name='answer_clear', justification='Information is organized clearly with bullet points and commands.', check_pass=True), EvaluationCheck(check_name='answer_citations', justification='No citations are required for this practical how-to; no sources cited.', check_pass=True), EvaluationCheck(check_name='completeness', justification='Covers confl

In [60]:
# Agent that turns FAQ records into synthetic test questions.
question_generation_prompt = """
You are helping to create test questions for an AI agent that answers questions about a data engineering course.

Based on the provided FAQ content, generate realistic questions that students might ask.

The questions should:

- Be natural and varied in style
- Range from simple to complex
- Include both specific technical questions and general course questions

Generate one question for each record.
""".strip()

# Structured output: just the list of generated questions (no docstring, so
# the model-facing schema stays unchanged).
class QuestionsList(BaseModel):
    questions: list[str]

question_generator = Agent(
    name="question_generator",
    instructions=question_generation_prompt,
    model='gpt-4o-mini',
    output_type=QuestionsList
)


In [62]:
import random

sample = random.sample(de_dtc_faq, 10)
prompt_docs = [d['content'] for d in sample]
prompt = json.dumps(prompt_docs)

result = await question_generator.run(prompt)
questions = result.output.questions

In [63]:
result  # AgentRunResult wrapping the QuestionsList

AgentRunResult(output=QuestionsList(questions=['How can I resolve the issue of running out of space after several instances of Prefect?', 'Where can I download Terraform version 1.1.3 for Linux?', "What should I do if I'm using Windows and cannot run shell scripts in the later modules?", 'Could you guide me through accessing the Spark master container logs in Docker?', "What should I do if I see the error message stating that 'wget is not recognized as an internal or external command'?", 'Can you explain what horizontal scaling means in the context of data engineering?', 'How do I correctly save a .csv file when the URL provides it with a .csv.gz extension?', 'Where can I find the rides.csv file specified in the Java example for this course?', 'What are the steps to install Jupyter Notebook and convert a notebook to a Python script?', 'What is the purpose of the staging area in data engineering?']))

In [64]:
questions  # generated questions as a plain list of strings

['How can I resolve the issue of running out of space after several instances of Prefect?',
 'Where can I download Terraform version 1.1.3 for Linux?',
 "What should I do if I'm using Windows and cannot run shell scripts in the later modules?",
 'Could you guide me through accessing the Spark master container logs in Docker?',
 "What should I do if I see the error message stating that 'wget is not recognized as an internal or external command'?",
 'Can you explain what horizontal scaling means in the context of data engineering?',
 'How do I correctly save a .csv file when the URL provides it with a .csv.gz extension?',
 'Where can I find the rides.csv file specified in the Java example for this course?',
 'What are the steps to install Jupyter Notebook and convert a notebook to a Python script?',
 'What is the purpose of the staging area in data engineering?']

In [65]:
from tqdm.auto import tqdm

# Answer each generated question with the current `agent` (faq_agent_v2 at
# this point) and log every run with source='ai-generated'. The eval_set
# cell below selects files on exactly these two properties.
for q in tqdm(questions):
    print(q)

    result = await agent.run(user_prompt=q)
    print(result.output)

    log_interaction_to_file(
        agent,
        result.new_messages(),
        source='ai-generated'
    )

    print()

  0%|          | 0/10 [00:00<?, ?it/s]

How can I resolve the issue of running out of space after several instances of Prefect?
To resolve the issue of running out of space after several instances of Prefect, you can follow these suggestions:

1. **Delete Local Data**: If you've saved any data locally on your VM during your ETLs (Extract, Transform, Load processes), consider deleting it to free up space.

2. **Clear Prefect Logs**: Regularly check the `.prefect/storage` folder and delete logs that are no longer needed. This step is crucial as Prefect can generate a significant amount of log data that consumes disk space.

3. **Identify Large Files**: You can use tools like `ncdu` to identify large files on your VM. Focus on files related to Prefect and remove any unnecessary large files.

4. **Clean Up Processes**: If you've deleted files, make sure to kill any processes that were using those files to reclaim space effectively.

5. **Cleanup Views/State History**: You might want to explore automating the cleanup of old flow 

In [66]:
# Collect logs of faq_agent_v2 answering AI-generated questions.
# sorted() makes the record order (and so the eval_results / df_evals row
# order) deterministic; Path.glob yields files in arbitrary filesystem order.
eval_set = []

for log_file in sorted(LOG_DIR.glob('*.json')):
    if 'faq_agent_v2' not in log_file.name:
        continue

    log_record = load_log_file(log_file)
    if log_record['source'] != 'ai-generated':
        continue

    eval_set.append(log_record)

In [67]:
# Judge each collected log sequentially (one eval-agent call per record),
# keeping the (log_record, checklist) pairs together.
eval_results = []

for log_record in tqdm(eval_set):
    eval_result = await evaluate_log_record(eval_agent, log_record)
    eval_results.append((log_record, eval_result))

  0%|          | 0/10 [00:00<?, ?it/s]

In [68]:
# Flatten each (log, checklist) pair into one row: file, question, answer,
# plus one boolean column per check name.
rows = []

for log_record, eval_result in eval_results:
    messages = log_record['messages']

    row = {
        # .name requires log_file to be a Path (true for files from LOG_DIR.glob).
        'file': log_record['log_file'].name,
        # NOTE(review): positional lookups assume the first part is the user
        # prompt and the last message's first part is the answer text.
        'question': messages[0]['parts'][0]['content'],
        'answer': messages[-1]['parts'][0]['content'],
    }

    checks = {c.check_name: c.check_pass for c in eval_result.checklist}
    row.update(checks)

    rows.append(row)

In [69]:
import pandas as pd

# Build the frame once from the list of row dicts.
df_evals = pd.DataFrame(rows)

In [70]:
df_evals.mean(numeric_only=True)  # mean of each boolean check column = pass rate

instructions_follow    0.8
instructions_avoid     1.0
answer_relevant        1.0
answer_clear           1.0
answer_citations       0.7
completeness           1.0
tool_call_search       0.8
dtype: float64

In [72]:
def evaluate_search_quality(search_function, test_queries, num_results=5):
    """
    Score a search function on labelled queries with hit rate and reciprocal rank.

    :param search_function: Callable ``search_function(query, num_results=...)``
        returning a list of dicts that each have a 'filename'
    :param test_queries: Iterable of ``(query, expected_docs)`` pairs, where
        ``expected_docs`` is a collection of relevant filenames
    :param num_results: Results requested per query (default 5, the
        previously hard-coded value)
    :return: List of ``{'query', 'hit', 'mrr'}`` dicts, one per query.
        'hit' is True if any relevant document was retrieved. 'mrr' is
        1 / rank of the first relevant document (1-based), or 0 if none was
        retrieved; averaging it over queries gives the MRR.
    """
    results = []

    for query, expected_docs in test_queries:
        search_results = search_function(query, num_results=num_results)

        # 1-based rank of the first relevant document, or None if none found.
        first_rank = next(
            (rank for rank, doc in enumerate(search_results, start=1)
             if doc['filename'] in expected_docs),
            None,
        )

        results.append({
            'query': query,
            'hit': first_rank is not None,
            'mrr': 1 / first_rank if first_rank is not None else 0
        })
    return results