In [44]:
import io
import zipfile
import requests
import frontmatter


In [45]:
url = 'https://codeload.github.com/DataTalksClub/faq/zip/refs/heads/main'
resp = requests.get(url)


In [46]:
repository_data = []

# Create a ZipFile object from the downloaded content
zf = zipfile.ZipFile(io.BytesIO(resp.content))

for file_info in zf.infolist():
    filename = file_info.filename.lower()

    # Only process markdown files
    if not filename.endswith('.md'):
        continue

    # Read and parse each file
    with zf.open(file_info) as f_in:
        content = f_in.read()
        post = frontmatter.loads(content)
        data = post.to_dict()
        data['filename'] = filename
        repository_data.append(data)

zf.close()


In [47]:
print(repository_data[1])


{'id': '9e508f2212', 'question': 'Course: When does the course start?', 'sort_order': 1, 'content': "The next cohort starts January 13th, 2025. More info at [DTC](https://datatalks.club/blog/guide-to-free-online-courses-at-datatalks-club.html).\n\n- Register before the course starts using this [link](https://airtable.com/shr6oVXeQvSI5HuWD).\n- Join the [course Telegram channel with announcements](https://t.me/dezoomcamp).\n- Don’t forget to register in DataTalks.Club's Slack and join the channel.", 'filename': 'faq-main/_questions/data-engineering-zoomcamp/general/001_9e508f2212_course-when-does-the-course-start.md'}


In [48]:
import io
import zipfile
import requests
import frontmatter

def read_repo_data(repo_owner, repo_name):
    """
    Download and parse all markdown files from a GitHub repository.
    
    Args:
        repo_owner: GitHub username or organization
        repo_name: Repository name
    
    Returns:
        List of dictionaries containing file content and metadata
    """
    prefix = 'https://codeload.github.com' 
    url = f'{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/main'
    resp = requests.get(url)
    
    if resp.status_code != 200:
        raise Exception(f"Failed to download repository: {resp.status_code}")

    repository_data = []
    zf = zipfile.ZipFile(io.BytesIO(resp.content))
    
    for file_info in zf.infolist():
        filename = file_info.filename
        filename_lower = filename.lower()

        if not (filename_lower.endswith('.md') 
            or filename_lower.endswith('.mdx')):
            continue
    
        try:
            with zf.open(file_info) as f_in:
                content = f_in.read().decode('utf-8', errors='ignore')
                post = frontmatter.loads(content)
                data = post.to_dict()
                data['filename'] = filename
                repository_data.append(data)
        except Exception as e:
            print(f"Error processing {filename}: {e}")
            continue
    
    zf.close()
    return repository_data


In [49]:
dtc_faq = read_repo_data('DataTalksClub', 'faq')
evidently_docs = read_repo_data('evidentlyai', 'docs')

print(f"FAQ documents: {len(dtc_faq)}")
print(f"Evidently documents: {len(evidently_docs)}")


FAQ documents: 1219
Evidently documents: 95


# Day 2 

## Step 1. Sliding Window Chunking


In [50]:
def sliding_window(seq, size, step):
    """
    Split a sequence (string) into overlapping chunks.
    Example: size=2000, step=1000
    """
    if size <= 0 or step <= 0:
        raise ValueError("size and step must be positive")

    n = len(seq)
    result = []
    for i in range(0, n, step):
        chunk = seq[i:i+size]
        result.append({'start': i, 'chunk': chunk})
        if i + size >= n:
            break
    return result


In [51]:
# Example: pick a long doc from evidently_docs
doc = evidently_docs[45]   # change index if needed
chunks = sliding_window(doc['content'], size=2000, step=1000)

print(f"Got {len(chunks)} chunks")
print(chunks[0]['chunk'][:500])  # preview first 500 chars


Got 21 chunks
In this tutorial, you will learn how to perform regression testing for LLM outputs.

You can compare new and old responses after changing a prompt, model, or anything else in your system. By re-running the same inputs with new parameters, you can spot any significant changes. This helps you push updates with confidence or identify issues to fix.

<Info>
  **This example uses Evidently Cloud.** You'll run evals in Python and upload them. You can also skip the upload and view Reports locally. For 


## ✅ Step 3. Process All Documents

In [52]:
evidently_chunks = []

for doc in evidently_docs:
    doc_copy = doc.copy()
    doc_content = doc_copy.pop('content')
    chunks = sliding_window(doc_content, size=2000, step=1000)
    for chunk in chunks:
        chunk.update(doc_copy)   # keep metadata
    evidently_chunks.extend(chunks)

print(f"Total chunks: {len(evidently_chunks)}")


Total chunks: 575


## ✅ Step 4. Paragraph Splitting

In [53]:
import re

def split_by_paragraphs(text):
    return re.split(r"\n\s*\n", text.strip())

# Example
paragraphs = split_by_paragraphs(doc['content'])
print(f"Paragraphs: {len(paragraphs)}")
print(paragraphs[:2])


Paragraphs: 13
['When working on an AI system, you need test data to run automated evaluations for quality and safety. A test dataset is a structured set of test cases. It can contain:', '* Just the inputs, or\n* Both inputs and expected outputs (ground truth).']


## ✅ Step 5. Section Splitting (Markdown Headings)

In [54]:
def split_markdown_by_level(text, level=2):
    header_pattern = r'^(#{' + str(level) + r'} )(.+)$'
    pattern = re.compile(header_pattern, re.MULTILINE)

    parts = pattern.split(text)
    sections = []
    for i in range(1, len(parts), 3):
        header = parts[i] + parts[i+1]
        header = header.strip()
        content = ""
        if i+2 < len(parts):
            content = parts[i+2].strip()
        if content:
            section = f"{header}\n\n{content}"
        else:
            section = header
        sections.append(section)
    return sections

# Example
sections = split_markdown_by_level(doc['content'], level=2)
print(f"Sections: {len(sections)}")
print(sections[:1])


Sections: 0
[]


## ✅ Step 6. Apply to All Docs

In [55]:
evidently_sections = []

for doc in evidently_docs:
    doc_copy = doc.copy()
    doc_content = doc_copy.pop('content')
    sections = split_markdown_by_level(doc_content, level=2)
    for section in sections:
        section_doc = doc_copy.copy()
        section_doc['section'] = section
        evidently_sections.append(section_doc)

print(f"Total section chunks: {len(evidently_sections)}")


Total section chunks: 262


## AI SPLITS 

In [56]:
# intelligent_chunking.py  (notebook-friendly)
import os
import time
import hashlib
from typing import List
from tqdm.auto import tqdm
from openai import OpenAI

# create client from env var (OpenAI python client)
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# --- Prompt template (you can tune this) ---
PROMPT_TEMPLATE = """
Split the provided document into logical, self-contained sections suitable for a Q&A/FAQ system.
Each section must be self-contained and focused on one topic. Return the output in this EXACT format,
separating sections with a line that contains three dashes (---) on its own.

For each section, include a short section title line that starts with "##".

<DOCUMENT>
{document}
</DOCUMENT>

Required output format example:

## Section Title
Section text with all relevant details.

---

## Another Section
Another section text.

---
""".strip()

# --- Helper: call LLM (Responses API) ---
def llm_split_text(text: str, model: str = "gpt-4o-mini", timeout: int = 60) -> str:
    """
    Call the OpenAI Responses API and return the text output.
    """
    # input may be a string; using input= with instructions + input is supported
    resp = client.responses.create(
        model=model,
        input=[{"role": "user", "content": PROMPT_TEMPLATE.format(document=text)}],
        # optional: max_tokens, temperature, etc.
    )
    # responses.create returns an object with output_text (high-level convenience)
    return getattr(resp, "output_text", None) or "".join([c.get("text","") for c in resp.output[0].content])

# --- Helper: parse the LLM output into section strings ---
def parse_sections_from_output(output_text: str) -> List[str]:
    parts = [p.strip() for p in output_text.split("---")]
    # keep parts that look non-empty and start with a heading or text
    sections = [p for p in parts if p]
    return sections

# --- Slice a long doc into large pre-slices (to keep LLM inputs bounded) ---
def big_slices(text: str, slice_size: int = 12000, overlap: int = 1000):
    if slice_size <= 0 or overlap < 0 or overlap >= slice_size:
        raise ValueError("slice_size must be > overlap >= 0")
    n = len(text)
    i = 0
    while i < n:
        yield text[i:i + slice_size]
        if i + slice_size >= n:
            break
        i += (slice_size - overlap)

# --- Top-level intelligent chunking function ---
def intelligent_chunking(text: str,
                         model: str = "gpt-4o-mini",
                         slice_size: int = 12000,
                         slice_overlap: int = 1000,
                         max_slices: int = 20,
                         sleep_between_calls: float = 0.6):
    """
    Use an LLM to split text into semantically-coherent sections.
    For very long documents we slice into large pre-slices to avoid one huge LLM call.
    """
    sections = []
    seen_hashes = set()

    # If text is small enough, do a single call
    if len(text) <= slice_size:
        out = llm_split_text(text, model=model)
        parsed = parse_sections_from_output(out)
        for s in parsed:
            h = hashlib.sha256(s.encode("utf-8")).hexdigest()
            if h not in seen_hashes:
                seen_hashes.add(h)
                sections.append(s)
        return sections

    # If text is long: process by big slices
    for idx, sl in enumerate(big_slices(text, slice_size=slice_size, overlap=slice_overlap)):
        if idx >= max_slices:
            # avoid infinite cost — stop after max_slices
            break
        out = llm_split_text(sl, model=model)
        parsed = parse_sections_from_output(out)
        for s in parsed:
            h = hashlib.sha256(s.encode("utf-8")).hexdigest()
            if h not in seen_hashes:
                seen_hashes.add(h)
                sections.append(s)

        # small sleep to avoid hitting rate limits; tune as needed
        time.sleep(sleep_between_calls)

    return sections


  pseudomatch = _compile(PseudoToken).match(line, pos)


In [57]:
# assume evidently_docs is a list of dicts with 'content' and metadata
from tqdm.auto import tqdm
all_ai_sections = []

for doc in tqdm(evidently_docs):
    doc_meta = {k:v for k,v in doc.items() if k != "content"}
    text = doc["content"]
    try:
        sections = intelligent_chunking(text, model="gpt-4o-mini",
                                        slice_size=12000, slice_overlap=1000,
                                        max_slices=30, sleep_between_calls=0.6)
    except Exception as e:
        print("LLM error on doc:", doc_meta.get("filename","<no filename>"), e)
        sections = []

    for sec in sections:
        entry = doc_meta.copy()
        entry["ai_section"] = sec
        all_ai_sections.append(entry)

print("AI sections created:", len(all_ai_sections))

# Save to a file for later ingestion
import json
with open("evidently_ai_sections.json", "w", encoding="utf-8") as f:
    json.dump(all_ai_sections, f, ensure_ascii=False, indent=2)


  0%|          | 0/95 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [58]:
from minsearch import Index

index = Index(
    text_fields=["chunk", "title", "description", "filename"],
    keyword_fields=[]
)
index.fit(evidently_chunks)

# Try a query
query = "What should be in a test dataset for AI evaluation?"
results = index.search(query, num_results=5)
print(results)


[{'start': 0, 'chunk': 'Retrieval-Augmented Generation (RAG) systems rely on retrieving answers from a knowledge base before generating responses. To evaluate them effectively, you need a test dataset that reflects what the system *should* know.\n\nInstead of manually creating test cases, you can generate them directly from your knowledge source, ensuring accurate and relevant ground truth data.\n\n## Create a RAG test dataset\n\nYou can generate ground truth RAG dataset from your data source.\n\n### 1. Create a Project\n\nIn the Evidently UI, start a new Project or open an existing one.\n\n* Navigate to “Datasets” in the left menu.\n* Click “Generate” and select the “RAG” option.\n\n![](/images/synthetic/synthetic_data_select_method.png)\n\n### 2. Upload your knowledge base\n\nSelect a file containing the information your AI system retrieves from. Supported formats: Markdown (.md), CSV, TXT, PDFs. Choose how many inputs to generate.\n\n![](/images/synthetic/synthetic_data_inputs_examp

In [59]:
from sentence_transformers import SentenceTransformer
from minsearch import VectorSearch
import numpy as np
from tqdm.auto import tqdm

embedding_model = SentenceTransformer("multi-qa-distilbert-cos-v1")

# Encode all chunks
evidently_embeddings = []
for d in tqdm(evidently_chunks):
    v = embedding_model.encode(d["chunk"])
    evidently_embeddings.append(v)
evidently_embeddings = np.array(evidently_embeddings)

# Build the vector index
evidently_vindex = VectorSearch()
evidently_vindex.fit(evidently_embeddings, evidently_chunks)

# Try a query
q = embedding_model.encode("How do I evaluate an AI model?")
vector_results = evidently_vindex.search(q, num_results=5)
print(vector_results)


  0%|          | 0/575 [00:00<?, ?it/s]

[{'start': 0, 'chunk': "You may need evaluations at different stages of your AI product development:\n\n* **Ad hoc analysis.** Spot-check the quality of your data or AI outputs.\n\n* **Experiments**. Test different parameters, models, or prompts and compare outcomes.\n\n* **Safety and adversarial testing.** Evaluate how your system handles edge cases and adversarial inputs, including on synthetic data.\n\n* **Regression testing.** Ensure the performance does not degrade after updates or fixes.\n\n* **Monitoring**. Track the response quality for production systems.\n\nEvidently supports all these workflows. You can run evals locally or directly on the platform.\n\n## Evaluations via API\n\n<Check>\n  Supported in: `Evidently OSS`, `Evidently Cloud` and `Evidently Enterprise`.\n</Check>\n\nThis is perfect for experiments, CI/CD workflows, or custom evaluation pipelines.\n\n![](/images/evals_flow_python.png)\n\n**How it works**:\n\n* Run Python-based evaluations on your AI outputs by gene

In [60]:
def text_search(query):
    return index.search(query, num_results=5)

def vector_search(query):
    q = embedding_model.encode(query)
    return evidently_vindex.search(q, num_results=5)

def hybrid_search(query):
    text_results = text_search(query)
    vector_results = vector_search(query)
    
    seen = set()
    final = []
    for r in text_results + vector_results:
        fid = r.get("filename")
        if fid not in seen:
            seen.add(fid)
            final.append(r)
    return final

print(hybrid_search("How can I evaluate my dataset?"))


[{'start': 0, 'chunk': 'Retrieval-Augmented Generation (RAG) systems rely on retrieving answers from a knowledge base before generating responses. To evaluate them effectively, you need a test dataset that reflects what the system *should* know.\n\nInstead of manually creating test cases, you can generate them directly from your knowledge source, ensuring accurate and relevant ground truth data.\n\n## Create a RAG test dataset\n\nYou can generate ground truth RAG dataset from your data source.\n\n### 1. Create a Project\n\nIn the Evidently UI, start a new Project or open an existing one.\n\n* Navigate to “Datasets” in the left menu.\n* Click “Generate” and select the “RAG” option.\n\n![](/images/synthetic/synthetic_data_select_method.png)\n\n### 2. Upload your knowledge base\n\nSelect a file containing the information your AI system retrieves from. Supported formats: Markdown (.md), CSV, TXT, PDFs. Choose how many inputs to generate.\n\n![](/images/synthetic/synthetic_data_inputs_examp

In [61]:
def text_search(query: str):
    return faq_index.search(query, num_results=5)


In [62]:
from typing import List, Any

def text_search(query: str) -> List[Any]:
    """
    Perform a text-based search on the FAQ index.

    Args:
        query (str): The search query string.

    Returns:
        List[Any]: A list of up to 5 search results.
    """
    return faq_index.search(query, num_results=5)


In [63]:
from pydantic_ai import Agent

system_prompt = """
You are a helpful assistant for a course.

Use the search tool to find relevant information from the course materials before answering questions.

If search gives results, base your answer on them.  
If search doesn’t help, give general guidance.
"""

agent = Agent(
    name="faq_agent",
    instructions=system_prompt,
    tools=[text_search],
    model="gpt-4o-mini"
)


In [64]:
question = "I just discovered the course, can I join now?"

result = await agent.run(user_prompt=question)


NameError: name 'faq_index' is not defined

In [67]:
zip_file = zipfile.ZipFile(io.BytesIO(resp.content))

# Extract to data/faq
extract_path = "data/faq"
os.makedirs(extract_path, exist_ok=True)
zip_file.extractall(extract_path)

In [71]:
from llama_index.core import VectorStoreIndex, StorageContext, SimpleDirectoryReader

# Load documents
docs = SimpleDirectoryReader("data/faq/faq-main").load_data()

# Build index
index = VectorStoreIndex.from_documents(docs)

# Persist
index.storage_context.persist("storage/faq_index")


In [72]:
from typing import List, Any

def text_search(query: str) -> List[Any]:
    """
    Perform a text-based search on the FAQ index.

    Args:
        query (str): The search query string.

    Returns:
        List[Any]: A list of up to 5 search results returned by the FAQ index.
    """
    return faq_index.search(query, num_results=5)


In [79]:
from llama_index.core import VectorStoreIndex, StorageContext, load_index_from_storage, SimpleDirectoryReader

# 1. Load documents from your docs folder
docs = SimpleDirectoryReader("data/docs").load_data()

# 2. Build index
docs_index = VectorStoreIndex.from_documents(docs)

# 3. (Optional) Persist index
docs_index.storage_context.persist(persist_dir="storage/docs_index")

# 4. Reload later if already saved
storage_context = StorageContext.from_defaults(persist_dir="storage/docs_index")
docs_index = load_index_from_storage(storage_context)


ValueError: Directory data/docs does not exist.

In [76]:
def doc_search(query: str) -> List[Any]:
    """
    Search through the documentation index.

    Args:
        query (str): The user query.

    Returns:
        List[Any]: Up to 5 relevant results from the documentation index.
    """
    return docs_index.search(query, num_results=5)


In [74]:
from pydantic_ai import Agent

system_prompt = """
You are a helpful assistant that answers user questions
based on the documentation provided. Use the doc_search tool when needed.
"""

agent = Agent(
    name="docs_agent",
    instructions=system_prompt,
    tools=[doc_search],   # tools you defined
    model="gpt-4o-mini"   # or "gpt-4o"
)


In [80]:
import os
import requests
import zipfile
from io import BytesIO
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader

# Where to store docs
docs_dir = "data/docs"

# Download FAQ repo if not already there
if not os.path.exists(docs_dir):
    os.makedirs("data", exist_ok=True)
    url = "https://codeload.github.com/DataTalksClub/faq/zip/refs/heads/main"
    resp = requests.get(url)
    resp.raise_for_status()

    # Unzip into data/docs
    with zipfile.ZipFile(BytesIO(resp.content)) as zf:
        zf.extractall("data")
    
    # Move extracted content into data/docs
    extracted_folder = "data/faq-main"   # GitHub repo unzips as faq-main
    if os.path.exists(extracted_folder):
        os.rename(extracted_folder, docs_dir)

# Load documents
docs = SimpleDirectoryReader(docs_dir).load_data()

# Build index
index = VectorStoreIndex.from_documents(docs)

print(f"Loaded {len(docs)} documents")


Loaded 10 documents


In [81]:
question = "How do I set up the environment for this project?"

result = await agent.run(user_prompt=question)
print(result)


NameError: name 'docs_index' is not defined

## Day 4 

In [88]:
import sys
print(sys.executable)


C:\Users\Ghada\aihero\course\.venv\Scripts\python.exe


In [89]:
from typing import List, Any

# Create a query engine once from your index
query_engine = index.as_query_engine()

def text_search(query: str) -> List[Any]:
    """
    Perform a text-based search on the documentation index.

    Args:
        query (str): The search query string.

    Returns:
        List[Any]: A list of search results from the documentation index.
    """
    response = query_engine.query(query)
    return [str(response)]


In [90]:
# Example: run a search
results = text_search("How do I join the bootcamp?")
print(results)


['You can join the bootcamp by visiting the DataTalks.Club website and enrolling in the specific course you are interested in, such as Data Engineering Zoomcamp, Machine Learning Zoomcamp, or MLOps Zoomcamp.']


In [92]:
from pydantic_ai import Agent

system_prompt = """
You are a helpful assistant that answers questions about the DataTalks.Club documentation.
Use the text_search tool when you need to look up information.
"""

agent = Agent(
    name="docs_agent",
    instructions=system_prompt,
    tools=[text_search],   # 👈 give the agent your tool
    model="gpt-4o-mini"    # or another model you prefer
)


In [97]:
question = "I just discovered the course, can I join now?"

# If you're in Jupyter/Notebook, don't use asyncio.run()
result = await agent.run(user_prompt=question)

# In v1.0.11, the final text is available at .output
print("Final Answer:", result.output)

# To debug the internal flow:
print("\n--- Messages ---")
for msg in result.new_messages():
    print(msg)


Final Answer: Yes, you can join the course by enrolling in the available courses listed on the DataTalks.Club FAQ page.

--- Messages ---
ModelRequest(parts=[UserPromptPart(content='I just discovered the course, can I join now?', timestamp=datetime.datetime(2025, 9, 30, 15, 12, 19, 561129, tzinfo=datetime.timezone.utc))], instructions='You are a helpful assistant that answers questions about the DataTalks.Club documentation.\nUse the text_search tool when you need to look up information.')
ModelResponse(parts=[ToolCallPart(tool_name='text_search', args='{"query":"joining the course"}', tool_call_id='call_HFI3AJg5dNEWaKr2cz8LMoOZ')], usage=RequestUsage(input_tokens=127, output_tokens=16, details={'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}), model_name='gpt-4o-mini-2024-07-18', timestamp=datetime.datetime(2025, 9, 30, 15, 12, 20, tzinfo=TzInfo(UTC)), provider_name='openai', provider_details={'finish_reason': 'tool_calls'}, 

In [98]:
question = "When are the deadlines for the homework?"
result = await agent.run(user_prompt=question)
print("Final Answer:", result.output)


Final Answer: I couldn't find any specific information about the homework deadlines in the documentation. It might be helpful to check the course materials or announcements directly related to your class.


## Day 5 

In [106]:
import json
from datetime import datetime
from pathlib import Path
import secrets

LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)

def serializer(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")

def log_interaction_to_file(agent, messages, source="user"):
    # Convert Pydantic AI messages to plain dicts
    serialized_messages = []
    for m in messages:
        if hasattr(m, "dict"):  # Pydantic model
            serialized_messages.append(m.dict())
        else:
            serialized_messages.append(str(m))  # fallback to string

    # Extract tools
    tools = []
    for ts in agent.toolsets:
        tools.extend(ts.tools.keys())

    entry = {
        "agent_name": agent.name,
        "system_prompt": getattr(agent, "_instructions", ""),
        "model_name": getattr(agent.model, "model_name", str(agent.model)),
        "tools": tools,
        "messages": serialized_messages,
        "source": source,
    }

    ts_str = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    rand_hex = secrets.token_hex(3)
    filename = f"{agent.name}_{ts_str}_{rand_hex}.json"
    filepath = LOG_DIR / filename

    with filepath.open("w", encoding="utf-8") as f_out:
        json.dump(entry, f_out, indent=2, ensure_ascii=False, default=serializer)

    print(f"✅ Logged to {filepath}")
    return filepath


In [107]:
question = "When are the deadlines for the homework?"
result = await agent.run(user_prompt=question)
print("Final Answer:", result.output)

log_interaction_to_file(agent, result.new_messages())


Final Answer: The documentation does not provide specific dates for homework deadlines. It suggests using a Makefile to manage and verify your code before submission. If you need exact deadlines, you may want to check the course schedule or announcements directly related to the homework.
✅ Logged to logs\docs_agent_20250930_152313_495ae2.json


WindowsPath('logs/docs_agent_20250930_152313_495ae2.json')

In [109]:
def simplify_log_messages(messages):
    log_simplified = []
    for m in messages:
        parts = []
        for original_part in m['parts']:
            part = original_part.copy()
            kind = part['part_kind']
            if kind == 'user-prompt': del part['timestamp']
            if kind == 'tool-call': del part['tool_call_id']
            if kind == 'tool-return':
                del part['tool_call_id']; del part['metadata']; del part['timestamp']
                part['content'] = 'RETURN_RESULTS_REDACTED'
            if kind == 'text': del part['id']
            parts.append(part)
        message = {'kind': m['kind'], 'parts': parts}
        log_simplified.append(message)
    return log_simplified


In [110]:
from pydantic import BaseModel
from pydantic_ai import Agent

evaluation_prompt = """
Use this checklist to evaluate the quality of an AI agent's answer (<ANSWER>) to a user question (<QUESTION>).
We also include the entire log (<LOG>) for analysis.

Checklist:
- instructions_follow: Did the agent follow the user's instructions?
- instructions_avoid: Did the agent avoid forbidden actions?
- answer_relevant: Does the answer address the question?
- answer_clear: Is it clear and correct?
- answer_citations: Are references included?
- completeness: Does it cover all key aspects?
- tool_call_search: Was the search tool used?

Output true/false for each check and provide a short justification.
""".strip()

class EvaluationCheck(BaseModel):
    check_name: str
    justification: str
    check_pass: bool

class EvaluationChecklist(BaseModel):
    checklist: list[EvaluationCheck]
    summary: str

eval_agent = Agent(
    name='eval_agent',
    model='gpt-5-nano',
    instructions=evaluation_prompt,
    output_type=EvaluationChecklist
)

user_prompt_format = """
<INSTRUCTIONS>{instructions}</INSTRUCTIONS>
<QUESTION>{question}</QUESTION>
<ANSWER>{answer}</ANSWER>
<LOG>{log}</LOG>
""".strip()


In [111]:
def load_log_file(log_file):
    with open(log_file, 'r') as f_in:
        log_data = json.load(f_in)
        log_data['log_file'] = log_file
        return log_data

async def evaluate_log_record(eval_agent, log_record):
    messages = log_record['messages']
    instructions = log_record['system_prompt']
    question = messages[0]['parts'][0]['content']
    answer = messages[-1]['parts'][0]['content']
    log_simplified = simplify_log_messages(messages)
    log = json.dumps(log_simplified)
    user_prompt = user_prompt_format.format(
        instructions=instructions,
        question=question,
        answer=answer,
        log=log
    )
    result = await eval_agent.run(user_prompt, output_type=EvaluationChecklist)
    return result.output


In [113]:
from pathlib import Path

LOG_DIR = Path('./logs')
for f in LOG_DIR.glob('*.json'):
    print(f.name)


20250930_152048.json
20250930_152136.json
20250930_152226.json
docs_agent_20250930_152313_495ae2.json


In [114]:
log_files = sorted(LOG_DIR.glob('*.json'), reverse=True)  # latest first
log_record = load_log_file(log_files[0])


In [115]:
question = "How do I install Kafka in Python?"
result = await agent.run(user_prompt=question)
log_path = log_interaction_to_file(agent, result.new_messages())
print("Logged to:", log_path)


✅ Logged to logs\docs_agent_20250930_152635_777980.json
Logged to: logs\docs_agent_20250930_152635_777980.json


In [116]:
log_file = './logs/docs_agent_20250930_152635_777980.json'
log_record = load_log_file(log_file)


In [118]:
import json

log_file = './logs/docs_agent_20250930_152635_777980.json'
with open(log_file, 'r', encoding='utf-8') as f:
    log_record = json.load(f)

# Check the first message
print(type(log_record['messages']))
print(log_record['messages'][0])


<class 'list'>
ModelRequest(parts=[UserPromptPart(content='How do I install Kafka in Python?', timestamp=datetime.datetime(2025, 9, 30, 15, 26, 29, 812108, tzinfo=datetime.timezone.utc))], instructions='You are a helpful assistant that answers questions about the DataTalks.Club documentation.\nUse the text_search tool when you need to look up information.')


In [129]:
import json
from datetime import datetime

# Convert datetime objects to ISO format
def serializer(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")

messages_dict = ModelMessagesTypeAdapter.dump_python(result.new_messages())

with open("log.json", "w", encoding="utf-8") as f:
    json.dump(messages_dict, f, indent=2, ensure_ascii=False, default=serializer)


In [130]:
with open("log.json", "r", encoding="utf-8") as f:
    log_record = json.load(f)


In [131]:
messages = log_record

# Extract question and answer
question = messages[0]['parts'][0]['content']
answer = messages[-1]['parts'][0]['content']


In [133]:
# Use your agent object directly, not result.agent
user_prompt = user_prompt_format.format(
    instructions=agent._instructions,  # system prompt from the agent
    question=question,
    answer=answer,
    log=json.dumps(messages)  # or a simplified version
)


In [135]:
# Run the evaluation agent
eval_result = await eval_agent.run(user_prompt, output_type=EvaluationChecklist)

# Access the Pydantic output
checklist = eval_result.output

# Print summary and checks
print(checklist.summary)

for check in checklist.checklist:
    print(check)


Tool-assisted evaluation suggested the answer is correct and concise, but it lacks citations and broader guidance (e.g., usage examples, alternatives like kafka-python).
check_name='instructions_follow' justification="We attempted to adhere to the user's instruction to answer about DataTalks.Club documentation and used the text_search lookup as part of preparing a precise answer." check_pass=True
check_name='instructions_avoid' justification='No disallowed actions detected; content is technical guidance.' check_pass=True
check_name='answer_relevant' justification='The answer provides information on installing Kafka in Python and mentions confluent-kafka as the library.' check_pass=True
check_name='answer_clear' justification='The answer is clear and actionable (pip install confluent-kafka).' check_pass=True
check_name='answer_citations' justification='No external citations are included in the answer. If DataTalks.Club docs require citations, they are not included in this short answer.'

In [138]:
# Access structured output
checklist = eval_result.output

# Print overall summary
print("Evaluation Summary:", checklist.summary)

# Print each individual check
for check in checklist.checklist:
    print(f"{check.check_name}: {check.check_pass} - {check.justification}")


Evaluation Summary: Tool-assisted evaluation suggested the answer is correct and concise, but it lacks citations and broader guidance (e.g., usage examples, alternatives like kafka-python).
instructions_follow: True - We attempted to adhere to the user's instruction to answer about DataTalks.Club documentation and used the text_search lookup as part of preparing a precise answer.
instructions_avoid: True - No disallowed actions detected; content is technical guidance.
answer_relevant: True - The answer provides information on installing Kafka in Python and mentions confluent-kafka as the library.
answer_clear: True - The answer is clear and actionable (pip install confluent-kafka).
answer_citations: False - No external citations are included in the answer. If DataTalks.Club docs require citations, they are not included in this short answer.
completeness: True - Provides the basic installation command but could mention usage and alternatives.
tool_call_search: True - A text_search was u

In [139]:
eval_results = []

for log_file in LOG_DIR.glob("*.json"):
    log_record = load_log_file(log_file)
    user_prompt = create_user_prompt_from_log(log_record)  # use your formatting function
    result = await eval_agent.run(user_prompt, output_type=EvaluationChecklist)
    eval_results.append((log_file.name, result.output))


JSONDecodeError: Expecting value: line 3 column 20 (char 66)

In [144]:
import pandas as pd

rows = []
for log_name, checklist in eval_results:
    row = {"file": log_name}
    for check in checklist.checklist:
        row[check.check_name] = check.check_pass
    rows.append(row)

df = pd.DataFrame(rows)
print(df)
print(df.mean(numeric_only=True))  # average pass rate for each check


Empty DataFrame
Columns: []
Index: []
Series([], dtype: float64)


{
  "timestamp": "2025-09-30T15:20:48.857088",
  "system_prompt": 


In [142]:
messages_dict = ModelMessagesTypeAdapter.dump_python(result.new_messages())
log_data = {
    "messages": messages_dict,
    "system_prompt": result.agent.instructions if hasattr(result, "agent") else "No instructions"
}

with open("log.json", "w", encoding="utf-8") as f:
    json.dump(log_data, f, indent=2, ensure_ascii=False, default=serializer)


In [143]:
with open(log_file, 'r', encoding='utf-8') as f:
    content = f.read()
print(content[:500])  # preview first 500 chars


{
  "timestamp": "2025-09-30T15:20:48.857088",
  "system_prompt": 


In [146]:
log_record = load_log_file("logs/docs_agent_20250930_152635_777980.json")
messages = log_record['messages']
print(type(messages))       # should be list
print(messages[0])          # see what the first item actually is


<class 'list'>
ModelRequest(parts=[UserPromptPart(content='How do I install Kafka in Python?', timestamp=datetime.datetime(2025, 9, 30, 15, 26, 29, 812108, tzinfo=datetime.timezone.utc))], instructions='You are a helpful assistant that answers questions about the DataTalks.Club documentation.\nUse the text_search tool when you need to look up information.')


In [147]:
# Convert string messages to proper format
fixed_messages = []
for m in messages:
    if isinstance(m, str):
        fixed_messages.append({
            "source": "user",
            "parts": [{"content": m, "timestamp": None}]
        })
    else:
        fixed_messages.append(m)

messages = fixed_messages


In [148]:
question = messages[0]['parts'][0]['content']
answer = messages[-1]['parts'][0]['content']


In [149]:
log_record = load_log_file("logs/docs_agent_20250930_152635_777980.json")
messages = log_record['messages']

# Fix messages if they are strings
fixed_messages = []
for m in messages:
    if isinstance(m, str):
        fixed_messages.append({
            "source": "user",
            "parts": [{"content": m, "timestamp": None}]
        })
    else:
        fixed_messages.append(m)

messages = fixed_messages

# Now safely get question and answer
question = messages[0]['parts'][0]['content']
answer = messages[-1]['parts'][0]['content']

print("Question:", question)
print("Answer:", answer)


Question: ModelRequest(parts=[UserPromptPart(content='How do I install Kafka in Python?', timestamp=datetime.datetime(2025, 9, 30, 15, 26, 29, 812108, tzinfo=datetime.timezone.utc))], instructions='You are a helpful assistant that answers questions about the DataTalks.Club documentation.\nUse the text_search tool when you need to look up information.')
Answer: ModelResponse(parts=[TextPart(content='To install Kafka in Python, you can use the `confluent-kafka` library. You can install it using pip with the following command:\n\n```bash\npip install confluent-kafka\n``` \n\nThis library provides a Python client for Apache Kafka.')], usage=RequestUsage(input_tokens=168, output_tokens=53, details={'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}), model_name='gpt-4o-mini-2024-07-18', timestamp=datetime.datetime(2025, 9, 30, 15, 26, 34, tzinfo=TzInfo(UTC)), provider_name='openai', provider_details={'finish_reason': 'stop'}, provider

In [150]:
user_prompt = f"""
Instructions: {log_record['system_prompt']}

Question: {question}
Answer: {answer}

Log: {json.dumps(messages, indent=2, ensure_ascii=False)}
"""


In [151]:
eval_result = await eval_agent.run(user_prompt, output_type=EvaluationChecklist)


In [153]:
# Just print its attributes
print(dir(eval_result))
# Or a nicer view
import pprint
pprint.pprint(eval_result.__dict__)


['__annotations__', '__class__', '__class_getitem__', '__dataclass_fields__', '__dataclass_params__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__match_args__', '__module__', '__ne__', '__new__', '__orig_bases__', '__parameters__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__slots__', '__str__', '__subclasshook__', '__weakref__', '_is_protocol', '_new_message_index', '_output_tool_name', '_set_output_tool_return', '_state', '_traceparent', '_traceparent_value', 'all_messages', 'all_messages_json', 'new_messages', 'new_messages_json', 'output', 'timestamp', 'usage']
{'_new_message_index': 0,
 '_output_tool_name': 'final_result',
 '_state': GraphAgentState(message_history=[ModelRequest(parts=[UserPromptPart(content='\nInstructions: You are a helpful assistant that answers questions about the DataTalks.Club 

In [154]:
# Access the checklist and summary properly
evaluation = eval_result.output  # This is an EvaluationChecklist

# Print summary
print("Summary:")
print(evaluation.summary)

# Print each check
print("\nChecklist:")
for check in evaluation.checklist:
    print(f"- {check.check_name}: {check.check_pass} ({check.justification})")


Summary:
Triggered a text search as required by the developer instruction to fetch DataTalks.Club docs on Kafka installation for Python. Awaiting tool results.

Checklist:
- instructions_follow: True (The assistant will call a search tool as required by the developer instruction; after tool results, it will provide an answer. The final content is prepared to answer the user's question.)
