# Day 5: Evaluation

In [35]:
import os
from dotenv import load_dotenv
load_dotenv()
# print(os.environ.get("GROQ_API_KEY"))

True

In [36]:
import io
import zipfile
import requests
import frontmatter

def read_repo_data(repo_owner, repo_name):
    """
    Download and parse all markdown files from a GitHub repository.
    
    Args:
        repo_owner: GitHub username or organization
        repo_name: Repository name
    
    Returns:
        List of dictionaries containing file content and metadata
    """
    prefix = 'https://codeload.github.com' 
    url = f'{prefix}/{repo_owner}/{repo_name}/zip/refs/heads/main'
    resp = requests.get(url)
    
    if resp.status_code != 200:
        raise Exception(f"Failed to download repository: {resp.status_code}")

    repository_data = []
    zf = zipfile.ZipFile(io.BytesIO(resp.content))
    
    for file_info in zf.infolist():
        filename = file_info.filename
        filename_lower = filename.lower()

        if not (filename_lower.endswith('.md') 
            or filename_lower.endswith('.mdx')):
            continue
    
        try:
            with zf.open(file_info) as f_in:
                content = f_in.read().decode('utf-8', errors='ignore')
                post = frontmatter.loads(content)
                data = post.to_dict()
                data['filename'] = filename
                repository_data.append(data)
        except Exception as e:
            print(f"Error processing {filename}: {e}")
            continue
    
    zf.close()
    return repository_data




from minsearch import Index

dtc_faq = read_repo_data('DataTalksClub', 'faq')

de_dtc_faq = [d for d in dtc_faq if 'data-engineering' in d['filename']]

faq_index = Index(
    text_fields=["question", "content"],
    keyword_fields=[]
)


def text_search(query):
    return faq_index.search(query, num_results=5)


In [37]:
from typing import List, Any
from pydantic_ai import Agent


def text_search(query: str) -> List[Any]:
    """
    Perform a text-based search on the FAQ index.

    Args:
        query (str): The search query string.

    Returns:
        List[Any]: A list of up to 5 search results returned by the FAQ index.
    """
    return faq_index.search(query, num_results=5)


system_prompt = """
You are a helpful assistant for a  course. 

Use the search tool to find relevant information from the course materials before answering questions.

If you can find specific information through search, use it to provide accurate answers.
If the search doesn't return relevant results, let the user know and provide general guidance.
"""

from pydantic_ai import Agent

agent = Agent(
    name="faq_agent",
    instructions=system_prompt,
    tools=[text_search],
    model='groq:llama-3.1-8b-instant'
)


In [38]:
question = "how do I install Kafka in Python?"
result = await agent.run(user_prompt=question)


In [39]:
from pydantic_ai.messages import ModelMessagesTypeAdapter


def log_entry(agent, messages, source="user"):
    tools = []

    for ts in agent.toolsets:
        tools.extend(ts.tools.keys())

    dict_messages = ModelMessagesTypeAdapter.dump_python(messages)

    return {
        "agent_name": agent.name,
        "system_prompt": agent._instructions,
        "provider": agent.model.system,
        "model": agent.model.model_name,
        "tools": tools,
        "messages": dict_messages,
        "source": source
    }


In [40]:
import json
import secrets
from pathlib import Path
from datetime import datetime


LOG_DIR = Path('logs')
LOG_DIR.mkdir(exist_ok=True)


def serializer(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")


def log_interaction_to_file(agent, messages, source='user'):
    entry = log_entry(agent, messages, source)

    ts = entry['messages'][-1]['timestamp']
    ts_str = ts.strftime("%Y%m%d_%H%M%S")
    rand_hex = secrets.token_hex(3)

    filename = f"{agent.name}_{ts_str}_{rand_hex}.json"
    filepath = LOG_DIR / filename

    with filepath.open("w", encoding="utf-8") as f_out:
        json.dump(entry, f_out, indent=2, default=serializer)

    return filepath


In [41]:
"""
Try these questions:

how do I use docker on windows?
can I join late and get a certificate?
what do I need to do for the certificate?

"""

question = input()
result = await agent.run(user_prompt=question)
print(result.output)
log_interaction_to_file(agent, result.new_messages())


Unfortunately, the search results were not very helpful. However, based on general knowledge, I can provide some steps to get you started with using Docker on Windows.

To use Docker on Windows, you will need to:

1. **Install Docker Desktop**: Download and install Docker Desktop from the official Docker website. This will give you the Docker CLI, a GUI dashboard, and a few other tools.
2. **Create a Docker account**: If you don't already have a Docker account, create one on the Docker website. This will give you access to the Docker Hub, where you can pull and push images.
3. **Pull a Docker image**: Use the `docker pull` command to download a Docker image from Docker Hub. For example, to pull the official Ubuntu image, run `docker pull ubuntu`.
4. **Run a Docker container**: Use the `docker run` command to start a new container from the image you pulled. For example, to run a new Ubuntu container, run `docker run -it ubuntu`.
5. **Use the Docker CLI**: The Docker CLI is used to manag

PosixPath('logs/faq_agent_20250930_022336_b7fae0.json')

In [42]:
system_prompt = """
You are a helpful assistant for a course.  

Use the search tool to find relevant information from the course materials before answering questions.  

If you can find specific information through search, use it to provide accurate answers.

Always include references by citing the filename of the source material you used.  
When citing the reference, replace "faq-main" by the full path to the GitHub repository: "https://github.com/DataTalksClub/faq/blob/main/"
Format: [LINK TITLE](FULL_GITHUB_LINK)

If the search doesn't return relevant results, let the user know and provide general guidance.  
""".strip()

# Create another version of agent, let's call it faq_agent_v2
agent = Agent(
    name="faq_agent_v2",
    instructions=system_prompt,
    tools=[text_search],
    model='groq:llama-3.1-8b-instant'
)


In [43]:
evaluation_prompt = """
Use this checklist to evaluate the quality of an AI agent's answer (<ANSWER>) to a user question (<QUESTION>).
We also include the entire log (<LOG>) for analysis.

For each item, check if the condition is met. 

Checklist:

- instructions_follow: The agent followed the user's instructions (in <INSTRUCTIONS>)
- instructions_avoid: The agent avoided doing things it was told not to do  
- answer_relevant: The response directly addresses the user's question  
- answer_clear: The answer is clear and correct  
- answer_citations: The response includes proper citations or sources when required  
- completeness: The response is complete and covers all key aspects of the request
- tool_call_search: Is the search tool invoked? 

Output true/false for each check and provide a short explanation for your judgment.
""".strip()


In [44]:
from pydantic import BaseModel

class EvaluationCheck(BaseModel):
    check_name: str
    justification: str
    check_pass: bool

class EvaluationChecklist(BaseModel):
    checklist: list[EvaluationCheck]
    summary: str


In [45]:
eval_agent = Agent(
    name='eval_agent',
    model='groq:llama-3.1-8b-instant',
    output_type=EvaluationChecklist
)


In [46]:
user_prompt_format = """
<INSTRUCTIONS>{instructions}</INSTRUCTIONS>
<QUESTION>{question}</QUESTION>
<ANSWER>{answer}</ANSWER>
<LOG>{log}</LOG>
""".strip()


In [47]:
def load_log_file(log_file):
    with open(log_file, 'r') as f_in:
        log_data = json.load(f_in)
        log_data['log_file'] = log_file
        return log_data


In [50]:
#log_record = load_log_file('./logs/faq_agent_v2_20250926_072928_467470.json')
log_record = load_log_file('/workspaces/7-Days-AI-Agents-Email-Crash-Course/logs/faq_agent_20250930_022336_b7fae0.json')

instructions = log_record['system_prompt']
question = log_record['messages'][0]['parts'][0]['content']
answer = log_record['messages'][-1]['parts'][0]['content']
log = json.dumps(log_record['messages'])

user_prompt = user_prompt_format.format(
    instructions=instructions,
    question=question,
    answer=answer,
    log=log
)


In [52]:
result = await eval_agent.run(user_prompt, output_type=EvaluationChecklist)

checklist = result.output
print(checklist.summary)

for check in checklist.checklist:
    print(check)


ModelHTTPError: status_code: 400, model_name: llama-3.1-8b-instant, body: {'error': {'message': "tool call validation failed: attempted to call tool 'brave_search' which was not in request.tools", 'type': 'invalid_request_error', 'code': 'tool_use_failed', 'failed_generation': '<function=brave_search>{"query": "using docker on windows"} </function>'}}

In [None]:
def simplify_log_messages(messages):
    log_simplified = []

    for m in messages:
        parts = []
    
        for original_part in m['parts']:
            part = original_part.copy()
            kind = part['part_kind']
    
            if kind == 'user-prompt':
                del part['timestamp']
            if kind == 'tool-call':
                del part['tool_call_id']
            if kind == 'tool-return':
                del part['tool_call_id']
                del part['metadata']
                del part['timestamp']
                # Replace actual search results with placeholder to save tokens
                part['content'] = 'RETURN_RESULTS_REDACTED'
            if kind == 'text':
                del part['id']
    
            parts.append(part)
    
        message = {
            'kind': m['kind'],
            'parts': parts
        }
    
        log_simplified.append(message)
    return log_simplified


In [None]:
async def evaluate_log_record(eval_agent, log_record):
    messages = log_record['messages']

    instructions = log_record['system_prompt']
    question = messages[0]['parts'][0]['content']
    answer = messages[-1]['parts'][0]['content']

    log_simplified = simplify_log_messages(messages)
    log = json.dumps(log_simplified)

    user_prompt = user_prompt_format.format(
        instructions=instructions,
        question=question,
        answer=answer,
        log=log
    )

    result = await eval_agent.run(user_prompt, output_type=EvaluationChecklist)
    return result.output 


#log_record = load_log_file('./logs/faq_agent_v2_20250926_072928_467470.json')
log_record = load_log_file('/workspaces/7-Days-AI-Agents-Email-Crash-Course/logs/faq_agent_20250930_022336_b7fae0.json')

eval1 = await evaluate_log_record(eval_agent, log_record)


In [None]:
question_generation_prompt = """
You are helping to create test questions for an AI agent that answers questions about a data engineering course.

Based on the provided FAQ content, generate realistic questions that students might ask.

The questions should:

- Be natural and varied in style
- Range from simple to complex
- Include both specific technical questions and general course questions

Generate one question for each record.
""".strip()

class QuestionsList(BaseModel):
    questions: list[str]

question_generator = Agent(
    name="question_generator",
    instructions=question_generation_prompt,
    model='groq:llama-3.1-8b-instant',
    output_type=QuestionsList
)


In [None]:
import random

sample = random.sample(de_dtc_faq, 10)
prompt_docs = [d['content'] for d in sample]
prompt = json.dumps(prompt_docs)

result = await question_generator.run(prompt)
questions = result.output.questions


In [None]:
from tqdm.auto import tqdm

for q in tqdm(questions):
    print(q)

    result = await agent.run(user_prompt=q)
    print(result.output)

    log_interaction_to_file(
        agent,
        result.new_messages(),
        source='ai-generated'
    )

    print()


  from .autonotebook import tqdm as notebook_tqdm
  0%|          | 0/10 [00:00<?, ?it/s]

What operating system is ideal for the data engineering course?


 10%|█         | 1/10 [00:00<00:06,  1.39it/s]

No specific information was found regarding the ideal operating system for the data engineering course. However, most data engineering tasks can be performed on any operating system that supports the necessary tools and software, such as Linux, macOS, or Windows. It's generally recommended to use a 64-bit operating system with at least 8 GB of RAM and a multi-core processor for optimal performance. 

For more information, you can refer to the course materials or contact the course instructor.

How do I add Anaconda to my system's PATH?


 20%|██        | 2/10 [00:01<00:07,  1.03it/s]

It appears that the search results did not provide a clear answer to your question. However, I can offer some general guidance.

To add Anaconda to your system's PATH, you typically need to follow these steps:

1. **Find the Anaconda installation directory**: This is usually located in your user directory, such as `~/anaconda3` or `~/opt/anaconda3`.
2. **Locate the Anaconda bin directory**: Within the Anaconda installation directory, there should be a `bin` directory that contains the executable files for Anaconda.
3. **Add the bin directory to your system's PATH environment variable**: The exact steps to do this vary depending on your operating system. On Linux or macOS, you can usually add a line to your shell configuration file (e.g., `~/.bashrc` or `~/.zshrc`) that exports the PATH variable with the Anaconda bin directory included. On Windows, you can right-click on "Computer" or "This PC", select "Properties", then click on "Advanced system settings" and finally click on "Environm

 30%|███       | 3/10 [00:02<00:06,  1.09it/s]

It seems that the search results did not provide the information you are looking for. 

To find the network name in Docker, you can use the command `docker network ls` in your terminal. This will list all the networks available in your Docker environment, along with their names and other details.

If you are looking for a specific network, you can use the `docker network inspect` command followed by the network ID or name to get more detailed information about that network.

For more information, you can refer to the [Docker documentation](https://docs.docker.com/engine/reference/commandline/network_ls/) on Docker networking. 

Please note that the information provided is general guidance and may not be specific to your course materials. If you have any further questions or need more specific information, feel free to ask.

How do I set the environment variable for DLT_DATA_DIR?


 40%|████      | 4/10 [00:03<00:05,  1.08it/s]

Unfortunately, the search results did not provide a specific answer to your question. However, I can offer some general guidance.

The environment variable DLT_DATA_DIR is likely used to specify the directory where data for the course is stored. To set an environment variable, you can typically use the export command in your terminal, followed by the variable name and the desired value. For example:

export DLT_DATA_DIR=/path/to/data/directory

You can replace /path/to/data/directory with the actual path where you want to store the data.

If you are using a Jupyter notebook or another environment, the process for setting environment variables may be different. You may need to consult the documentation for your specific environment or contact the course instructors for further guidance.

For more information, you can refer to the course materials, such as the [README](https://github.com/DataTalksClub/faq/blob/main/README.md) or other relevant files in the course repository.

What is the

 50%|█████     | 5/10 [00:03<00:03,  1.48it/s]

<function=text_search{"query": "UnicodeDecodeError pandas dataframe solution"}</function>

Why is 'PULocationID' not recognized in PostgreSQL?


 60%|██████    | 6/10 [00:04<00:02,  1.92it/s]

<function=text_search{"query": "PULocationID PostgreSQL not recognized"}</function>

How can I use pgcli within a Docker container?


 70%|███████   | 7/10 [00:04<00:01,  2.38it/s]

<function=text_search {"query": "pgcli Docker container"} </function>

How do I convert the difference between two TimestampType values to hours in PySpark?


 80%|████████  | 8/10 [00:04<00:00,  2.78it/s]

<function=text_search {"query": "PySpark TimestampType to hours"} </function>

How do I integrate a DLT pipeline into Apache Airflow?


 90%|█████████ | 9/10 [00:05<00:00,  1.64it/s]

Unfortunately, the search did not return any relevant results. 

To integrate a DLT (Data Loading and Transformation) pipeline into Apache Airflow, you would typically use Airflow's built-in operators and hooks to interact with your DLT system.

Here are some general steps you can follow:
1. Define your DLT pipeline: Determine the data sources, data transformations, and data loading processes that make up your DLT pipeline.
2. Create Airflow DAG: Create a new DAG in Airflow that outlines the tasks and dependencies involved in your DLT pipeline.
3. Use Airflow operators: Use Airflow's built-in operators (such as BashOperator, PythonOperator, etc.) to execute the tasks in your DLT pipeline. You can also create custom operators if needed.
4. Configure hooks: Configure Airflow hooks to interact with your DLT system. For example, you might use a hook to connect to a database or a cloud storage service.
5. Test and deploy: Test your DAG to ensure it runs successfully, then deploy it to your 

100%|██████████| 10/10 [00:06<00:00,  1.50it/s]

It seems that the search results did not provide any relevant information to answer your question. 

However, I can provide some general guidance on how to resolve the issue of pip not working with Anaconda. 

1. Check if you have the latest version of Anaconda and pip installed. You can do this by running `conda update --all` and `pip install --upgrade pip` in your terminal.
2. Make sure that you are using the correct version of pip. Anaconda comes with its own version of pip, which can be different from the system's default pip. You can check which version of pip you are using by running `which pip` in your terminal.
3. Try resetting the package cache by running `conda clean --all` in your terminal.
4. If none of the above steps work, you can try reinstalling Anaconda or seeking help from the Anaconda community or support team.

For more information, you can refer to the [official Anaconda documentation](https://github.com/DataTalksClub/faq/blob/main/README.md) or [troubleshooting gu




In [None]:
eval_set = []

for log_file in LOG_DIR.glob('*.json'):
    if 'faq_agent_v2' not in log_file.name:
        continue

    log_record = load_log_file(log_file)
    if log_record['source'] != 'ai-generated':
        continue

    eval_set.append(log_record)

In [None]:
eval_results = []

for log_record in tqdm(eval_set):
    eval_result = await evaluate_log_record(eval_agent, log_record)
    eval_results.append((log_record, eval_result))


 10%|█         | 1/10 [00:01<00:10,  1.22s/it]


ModelHTTPError: status_code: 400, model_name: llama-3.3-70b-versatile, body: {'error': {'message': "Failed to call a function. Please adjust your prompt. See 'failed_generation' for more details.", 'type': 'invalid_request_error', 'code': 'tool_use_failed', 'failed_generation': '<function=final_result><![CDATA[{\n"checklist": [\n  {\n    "check_name": "Ideal Operating System",\n    "check_pass": false,\n    "justification": "No specific information was found regarding the ideal operating system for the data engineering course. However, most data engineering tasks can be performed on any operating system that supports the necessary tools and software, such as Linux, macOS, or Windows."\n  },\n  {\n    "check_name": "System Recommendations",\n    "check_pass": true,\n    "justification": "It\'s generally recommended to use a 64-bit operating system with at least 8 GB of RAM and a multi-core processor for optimal performance."\n  }\n],\n"summary": "No specific operating system is ideal for the data engineering course, but a 64-bit operating system with at least 8 GB of RAM and a multi-core processor is recommended for optimal performance. For more information, refer to the course materials or contact the course instructor. [Course Materials](https://github.com/DataTalksClub/faq/blob/main/README.md)"\n}]></function>'}}

In [None]:
#!uv add pandas

In [53]:
rows = []

for log_record, eval_result in eval_results:
    messages = log_record['messages']

    row = {
        'file': log_record['log_file'].name,
        'question': messages[0]['parts'][0]['content'],
        'answer': messages[-1]['parts'][0]['content'],
    }

    checks = {c.check_name: c.check_pass for c in eval_result.checklist}
    row.update(checks)

    rows.append(row)


In [54]:
import pandas as pd

df_evals = pd.DataFrame(rows)

In [55]:
df_evals.mean(numeric_only=True)


Anaconda Version    1.0
Pip Version         1.0
Package Cache       1.0
dtype: float64

In [56]:
def evaluate_search_quality(search_function, test_queries):
    results = []
    
    for query, expected_docs in test_queries:
        search_results = search_function(query, num_results=5)
        
        # Calculate hit rate
        relevant_found = any(doc['filename'] in expected_docs for doc in search_results)
        
        # Calculate MRR
        for i, doc in enumerate(search_results):
            if doc['filename'] in expected_docs:
                mrr = 1 / (i + 1)
                break
        else:
            mrr = 0
            
        results.append({
            'query': query,
            'hit': relevant_found,
            'mrr': mrr
        })
    return results

