In [1]:
!wget https://raw.githubusercontent.com/alexeygrigorev/workshops/refs/heads/main/guardrails/docs.py


--2026-01-06 16:45:24--  https://raw.githubusercontent.com/alexeygrigorev/workshops/refs/heads/main/guardrails/docs.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.110.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8583 (8.4K) [text/plain]
Saving to: 'docs.py'

     0K ........                                              100%  790K=0.01s

2026-01-06 16:45:24 (790 KB/s) - 'docs.py' saved [8583/8583]



In [2]:
from docs import GithubRepositoryDataReader, parse_data

reader = GithubRepositoryDataReader(
    repo_owner="DataTalksClub",
    repo_name="faq",
    allowed_extensions={"md"},
    filename_filter=lambda fp: "data-engineering" in fp.lower()
)

faq_raw = reader.read()
faq_documents = parse_data(faq_raw)

print(f"Loaded {len(faq_documents)} FAQ entries")

Loaded 449 FAQ entries


In [4]:
faq_documents[4]

{'id': '33fc260cd8',
 'question': 'Course: What can I do before the course starts?',
 'sort_order': 5,
 'content': 'Start by installing and setting up all the dependencies and requirements:\n\n- Google Cloud account\n- Google Cloud SDK\n- Python 3 (installed with Anaconda)\n- Terraform\n- Git\n\nLook over the prerequisites and syllabus to see if you are comfortable with these subjects.',
 'filename': '_questions/data-engineering-zoomcamp/general/005_33fc260cd8_course-what-can-i-do-before-the-course-starts.md'}

In [5]:
from minsearch import Index

# The parsed FAQ entries expose "question" (not "title"), so index that field.
faq_index = Index(text_fields=["question", "content", "filename"])

faq_index.fit(faq_documents)


<minsearch.minsearch.Index at 0x22aff63fb60>

In [10]:
from agents import function_tool
from typing import List, Dict

@function_tool
def search_faq(query: str) -> List[Dict]:
    """Search the DataTalks.Club FAQ for relevant answers.

    Args:
        query: The student's question to search for.

    Returns:
        List of matching FAQ entries with question and content.
    """
    results = faq_index.search(query, num_results=5)
    return results

In [11]:
from agents import Agent

faq_instructions = """
You are a helpful teaching assistant for the Data Engineering Zoomcamp.

Your role is to help students by searching the FAQ database for answers to their questions.

When you find relevant FAQs, present them clearly with the title and answer.
If multiple FAQs match, show all of them.

Be friendly and encouraging in your responses.
""".strip()

faq_agent = Agent(
    name="faq_assistant",
    instructions=faq_instructions,
    tools=[search_faq],
    model="gpt-4o-mini",
)

In [12]:
from agents import Runner

result = await Runner.run(faq_agent, "How do I register for the course?")
print(result.final_output)

Here’s some helpful information on how to register for the course:

1. **When does the course start?**
   - The next cohort starts on **January 13th, 2025**. You can register using this [link](https://airtable.com/shr6oVXeQvSI5HuWD) before the course starts. Also, join the [course Telegram channel](https://t.me/dezoomcamp) for announcements and make sure to register in DataTalks.Club's Slack to participate in discussions.

2. **Can I still join the course after the start date?**
   - Yes, you can still submit homework even if you don't register. Just keep in mind there will be deadlines for homework and final projects, so try to stay on top of things!

If you have any more questions or need further assistance, feel free to ask! 😊


In [14]:
result = await Runner.run(faq_agent, "How do I cook pizza?")
print(result.final_output)

It looks like I couldn't find any specific answers about cooking pizza in the FAQ database. However, I can provide you with a basic pizza recipe if that would help!

### Basic Pizza Recipe

#### Ingredients:
- **Dough:**
  - 2 cups all-purpose flour
  - 1 packet (2 1/4 tsp) yeast
  - 1 tsp sugar
  - 1 tsp salt
  - 3/4 cup warm water
  - 1 tbsp olive oil

- **Toppings:**
  - 1 cup pizza sauce
  - 1-2 cups mozzarella cheese (shredded)
  - Various toppings (pepperoni, veggies, etc.)

#### Instructions:

1. **Make the Dough:**
   - In a bowl, mix warm water, sugar, and yeast. Let it sit for about 5-10 minutes until frothy.
   - In a separate bowl, mix flour and salt. Add the yeast mixture and olive oil. Stir until a dough forms.
   - Knead the dough for about 5 minutes on a floured surface until smooth. Let it rise in a warm place for about 1 hour.

2. **Preheat the Oven:**
   - Preheat your oven to 475°F (245°C).

3. **Shape the Pizza:**
   - Roll out the dough to your desired thickness

In [15]:
from pydantic import BaseModel

class TopicGuardrailOutput(BaseModel):
    reasoning: str
    fail: bool

In [16]:
topic_guardrail_instructions = """
You are a topic guardrail for a data engineering course FAQ assistant.

Your job is to check if the user's question is related to:
- The course (content, schedule, requirements)
- Data engineering topics
- Technical setup and installation
- Homework and assignments
- Certificates and completion

If the question is about these topics, set fail=False.
If it's about something unrelated (like cooking, sports, celebrity gossip, medical advice, etc.), set fail=True.

Keep your reasoning under 15 words.
""".strip()

topic_guardrail_agent = Agent(
    name="topic_guardrail",
    instructions=topic_guardrail_instructions,
    model="gpt-4o-mini",
    output_type=TopicGuardrailOutput,
)

In [17]:
result = await Runner.run(topic_guardrail_agent, "How do I cook pizza?")
print(result.final_output)

reasoning='Relates to cooking, not data engineering or course topics.' fail=True


In [18]:
from agents import input_guardrail, GuardrailFunctionOutput
from agents.exceptions import InputGuardrailTripwireTriggered

@input_guardrail
async def topic_guardrail(ctx, agent, input):
    """Check if the user's question is about the course."""
    result = await Runner.run(topic_guardrail_agent, input)
    output = result.final_output

    return GuardrailFunctionOutput(
        output_info=output.reasoning,
        tripwire_triggered=output.fail,
    )

In [19]:
guarded_faq_agent = Agent(
    name="guarded_faq_assistant",
    instructions=faq_instructions,
    tools=[search_faq],
    model="gpt-4o-mini",
    input_guardrails=[topic_guardrail],
)

In [21]:
try:
    result = await Runner.run(guarded_faq_agent, "How do I cook pizza?")
    print(result.final_output)
except InputGuardrailTripwireTriggered as e:
    print(f"[BLOCKED] {e.guardrail_result.output.output_info}")

[BLOCKED] The question is about cooking, not data engineering.


In [22]:
async def run_with_input_guardrail(agent, user_input):
    """Run an agent with input guardrail handling."""
    try:
        result = await Runner.run(agent, user_input)
        return result.final_output
    except InputGuardrailTripwireTriggered as e:
        return f"[BLOCKED] {e.guardrail_result.output.output_info}"

In [23]:
await run_with_input_guardrail(guarded_faq_agent, "How do I cook pizza?")

'[BLOCKED] The question is about cooking, not data engineering.'

In [25]:
class SafetyGuardrailOutput(BaseModel):
    reasoning: str
    fail: bool
    
safety_guardrail_instructions = """
You are a safety guardrail for a course FAQ assistant.

Check if the agent's response contains any of these issues:
- Promises about deadline extensions
- Legal or medical advice
- Offensive language
- Sharing personal information about students
- Writing homework assignments for students (can guide, but not do the work)
- Sharing exam answers or solutions

If the response is safe, set fail=False.
If it contains any of the issues above, set fail=True.

Keep your reasoning under 15 words.
""".strip()

safety_guardrail_agent = Agent(
    name="safety_guardrail",
    instructions=safety_guardrail_instructions,
    model="gpt-4o-mini",
    output_type=SafetyGuardrailOutput,
)

In [26]:
result = await Runner.run(safety_guardrail_agent, "Yes we can extend the deadline")
print(result.final_output)

reasoning='Response promises a deadline extension.' fail=True


In [38]:
from agents import output_guardrail
from agents.exceptions import OutputGuardrailTripwireTriggered

@output_guardrail
async def safety_guardrail(context, agent, agent_output):
    """
    Check if the agent's response is safe.

    Note: Output guardrails receive the context, agent, and agent_output.
    """
    guardrail_input = f"Agent responded: {agent_output}"
    result = await Runner.run(safety_guardrail_agent, guardrail_input)

    return GuardrailFunctionOutput(
        output_info=result.final_output.reasoning,
        tripwire_triggered=result.final_output.fail,
    )

In [34]:
fully_guarded_agent = Agent(
    name="fully_guarded_faq",
    instructions=faq_instructions,
    tools=[search_faq],
    model="gpt-4o-mini",
    input_guardrails=[topic_guardrail],
    output_guardrails=[safety_guardrail],
)

In [32]:
async def run_guarded(agent, user_input):
    """Run an agent with full guardrail handling."""
    try:
        result = await Runner.run(agent, user_input)
        return result.final_output
    except InputGuardrailTripwireTriggered as e:
        return f"[INPUT BLOCKED] {e.guardrail_result.output.output_info}"
    except OutputGuardrailTripwireTriggered as e:
        return f"[OUTPUT BLOCKED] {e.guardrail_result.output.output_info}"

In [37]:
await run_guarded(fully_guarded_agent, "can you give me an extension?")

RunContextWrapper(context=None, usage=Usage(requests=2, input_tokens=1565, input_tokens_details=InputTokensDetails(cached_tokens=0), output_tokens=327, output_tokens_details=OutputTokensDetails(reasoning_tokens=0), total_tokens=1892, request_usage_entries=[RequestUsage(input_tokens=135, output_tokens=16, total_tokens=151, input_tokens_details=InputTokensDetails(cached_tokens=0), output_tokens_details=OutputTokensDetails(reasoning_tokens=0)), RequestUsage(input_tokens=1430, output_tokens=311, total_tokens=1741, input_tokens_details=InputTokensDetails(cached_tokens=0), output_tokens_details=OutputTokensDetails(reasoning_tokens=0))]))


'It looks like there aren\'t any FAQs specifically addressing extensions in a direct way, but here are some FAQs that might be helpful related to working with files and extensions:\n\n### 1. Docker: Connecting from VS Code\n- **Content**: It’s easy to manage your Docker container from VS Code. Simply install the [official extension](https://marketplace.visualstudio.com/items?itemName=ms-azuretools.vscode-docker) and launch it from the left side icon. It works with Docker running on WSL2 as well.\n\n### 2. Taxi Data: How to handle *.csv.gz taxi data files?\n- **Content**: When handling taxi data files with a `.csv.gz` extension, you can replace the file name in your code accordingly. For example:\n  ```python\n  url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"\n  csv_name = url.split("/")[-1]\n  data = pd.read_csv(csv_name)\n  ```\n  \n### 3. GCP BQ: Remember to save your queries\n- **Content**: It’s important to save your

In [46]:
@function_tool
async def check_topic(query: str) -> TopicGuardrailOutput:
    """Check if the query is appropriate for this course.

    Args:
        query: The user's question to check.

    Returns:
        TopicGuardrailOutput with fail flag and reasoning.
    """
    result = await Runner.run(topic_guardrail_agent, query)
    return result.final_output

In [47]:
guarded_faq_instructions = faq_instructions + """

IMPORTANT: Before answering any question, use the check_topic tool first.
- If check_topic returns fail=True, respond with the reasoning and stop.
- If check_topic returns fail=False, proceed to search the FAQ and answer.
"""

faq_agent_with_guardrail = Agent(
    name="faq_assistant",
    instructions=guarded_faq_instructions,
    tools=[check_topic, search_faq],
    model="gpt-4o-mini",
)

In [49]:
result = await Runner.run(faq_agent_with_guardrail, "How can I cook pizza?")
print(result.final_output)

It looks like your question about cooking pizza isn't related to data engineering or our course content. If you have any questions about data engineering topics or the Zoomcamp, feel free to ask!


In [54]:
import asyncio

async def mock_agent(input: str) -> str:
    """Simulates an agent that takes time to process."""
    print(f"[Agent] Starting work on: {input}")
    await asyncio.sleep(2)  # Simulate API call
    print("[Agent] Done!")
    return f"Response to: {input}"

# Run it
result = await mock_agent("hello")
print(result)

[Agent] Starting work on: hello
[Agent] Done!
Response to: hello


In [55]:
async def mock_guardrail(input: str) -> str:
    """Simulates a guardrail that takes time to process."""
    print(f"[Guardrail] Starting work on: {input}")
    await asyncio.sleep(1)  # Simulate API call
    print("[Guardrail] Good!")
    return f"Response to: {input}"

In [56]:
result_g = await mock_guardrail("hello")
print(result_g)

[Guardrail] Starting work on: hello
[Guardrail] Good!
Response to: hello


In [57]:
guardrail_result = await mock_guardrail("hello")
agent_result = await mock_agent("hello")

[Guardrail] Starting work on: hello
[Guardrail] Good!
[Agent] Starting work on: hello
[Agent] Done!


In [60]:
results = await asyncio.gather(
    mock_guardrail("hello"),
    mock_agent("hello"),
)

[Guardrail] Starting work on: hello
[Agent] Starting work on: hello
[Guardrail] Good!
[Agent] Done!
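
The interleaved output above suggests the guardrail and the agent overlap when run through `asyncio.gather`. A minimal timing sketch of the same idea follows. The names `fake_guardrail`, `fake_agent`, `timed_sequential`, and `timed_concurrent` are hypothetical, and the sleeps are scaled down from 1 s and 2 s to 0.1 s and 0.2 s. It uses `asyncio.run` so that it works as a standalone script; inside a notebook cell, where an event loop is already running, `await timed_sequential()` would be used instead.

```python
import asyncio
import time


async def fake_guardrail() -> str:
    await asyncio.sleep(0.1)  # stands in for the 1 s guardrail call
    return "guardrail ok"


async def fake_agent() -> str:
    await asyncio.sleep(0.2)  # stands in for the 2 s agent call
    return "agent answer"


async def timed_sequential() -> float:
    """Await the guardrail, then the agent; total is roughly the sum."""
    start = time.perf_counter()
    await fake_guardrail()
    await fake_agent()
    return time.perf_counter() - start


async def timed_concurrent() -> float:
    """Run both under gather(); total is roughly the slower of the two."""
    start = time.perf_counter()
    await asyncio.gather(fake_guardrail(), fake_agent())
    return time.perf_counter() - start


sequential_seconds = asyncio.run(timed_sequential())
concurrent_seconds = asyncio.run(timed_concurrent())
print(f"sequential: {sequential_seconds:.2f}s, concurrent: {concurrent_seconds:.2f}s")
```

The concurrent run should take about as long as the slower coroutine, which is why running the guardrail alongside the agent adds little latency when the guardrail passes.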


In [71]:
from dataclasses import dataclass

@dataclass
class GuardrailResult:
    """Result from a guardrail check."""
    reasoning: str
    triggered: bool

class GuardrailException(Exception):
    """Raised when a guardrail trips."""
    def __init__(self, result: GuardrailResult):
        self.result = result
        super().__init__(result.reasoning)


async def failing_guardrail(input: str) -> None:
    """Simulates a guardrail that trips after a short delay."""
    print(f"[Failing Guardrail] Starting work on: {input}")
    await asyncio.sleep(1.5)  # Simulate API call
    # Always trips: raise instead of returning a value
    raise GuardrailException(GuardrailResult(
        reasoning="Content is not appropriate",
        triggered=True
    ))


In [72]:
results = await asyncio.gather(
    failing_guardrail("hello"),
    mock_agent("hello"),
)

[Failing Guardrail] Starting work on: hello
[Agent] Starting work on: hello


GuardrailException: Content is not appropriate

[Agent] Done!
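
Note that `[Agent] Done!` is printed after the exception: with the default `return_exceptions=False`, `asyncio.gather` propagates the first exception to the caller but does not cancel the other awaitables. The sketch below checks this directly. `Tripped`, `tripping_guardrail`, and `slow_agent` are hypothetical stand-ins with shortened sleeps; in a notebook, replace `asyncio.run(main())` with `await main()`.

```python
import asyncio


class Tripped(Exception):
    """Hypothetical stand-in for GuardrailException."""


async def tripping_guardrail() -> None:
    await asyncio.sleep(0.05)
    raise Tripped("blocked")


async def slow_agent() -> str:
    await asyncio.sleep(0.2)
    return "answer nobody will read"


async def main() -> dict:
    agent_task = asyncio.create_task(slow_agent())
    observed = {}
    try:
        await asyncio.gather(tripping_guardrail(), agent_task)
    except Tripped:
        # gather() has re-raised, but the sibling task is still alive.
        observed["agent_done_when_gather_raised"] = agent_task.done()
    # Left alone, the agent runs to completion in the background.
    observed["agent_result"] = await agent_task
    return observed


observed = asyncio.run(main())
print(observed)
```

With a real agent, that background work still costs tokens, so the agent task has to be cancelled explicitly once a guardrail trips.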


In [73]:
guard_task = asyncio.create_task(failing_guardrail("hello"))
agent_task = asyncio.create_task(mock_agent("hello"))

try:
    await asyncio.gather(agent_task, guard_task)
except GuardrailException:
    # Guardrail tripped - cancel the agent
    agent_task.cancel()
    try:
        await agent_task
    except asyncio.CancelledError:
        print("Agent was cancelled - saved tokens!")

[Failing Guardrail] Starting work on: hello
[Agent] Starting work on: hello
Agent was cancelled - saved tokens!


In [77]:
async def run_with_guardrails(agent_coro, guardrails):
    """
    Run an agent with guardrails.

    Args:
        agent_coro: The agent coroutine to run (already called, not yet awaited)
        guardrails: List of guardrail coroutines (already called, not yet awaited)

    Returns:
        The agent's result if all guardrails pass

    Raises:
        GuardrailException: If any guardrail trips
    """
    # Create tasks
    agent_task = asyncio.create_task(agent_coro)
    guardrail_tasks = [asyncio.create_task(g) for g in guardrails]

    try:
        # Wait until every task finishes, or until the first exception propagates
        await asyncio.gather(agent_task, *guardrail_tasks)
        return agent_task.result()

    except GuardrailException as e:
        print(f"[Guardrail tripped] {e.result.reasoning}")

        # Cancel the agent immediately
        agent_task.cancel()
        try:
            await agent_task
        except asyncio.CancelledError:
            print("[Agent cancelled - saved tokens]")

        # Cancel remaining guardrails
        for t in guardrail_tasks:
            t.cancel()
        await asyncio.gather(*guardrail_tasks, return_exceptions=True)

        # Re-raise so the caller can tell a blocked run from a normal result
        raise

In [78]:
await run_with_guardrails(mock_agent("hello"), [
    mock_guardrail('hello'),
    failing_guardrail('hello')
])

[Agent] Starting work on: hello
[Guardrail] Starting work on: hello
[Failing Guardrail] Starting work on: hello
[Guardrail] Good!
[Guardrail tripped] Content is not appropriate
[Agent cancelled - saved tokens]


GuardrailException: Content is not appropriate
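
One possible variation on the helper above, sketched under assumptions rather than taken from the notebook, uses `asyncio.wait` with `return_when=asyncio.FIRST_EXCEPTION`. It returns the finished and pending task sets directly, so the cleanup does not depend on catching one specific exception type. `Tripped`, `passing_check`, `tripping_check`, `slow_agent`, and `run_checked` are hypothetical names; in a notebook, replace `asyncio.run(main())` with `await main()`.

```python
import asyncio


class Tripped(Exception):
    """Hypothetical stand-in for GuardrailException."""


async def passing_check() -> None:
    await asyncio.sleep(0.01)


async def tripping_check() -> None:
    await asyncio.sleep(0.03)
    raise Tripped("Content is not appropriate")


async def slow_agent() -> str:
    await asyncio.sleep(0.5)
    return "answer"


async def run_checked(agent_coro, check_coros):
    agent_task = asyncio.create_task(agent_coro)
    check_tasks = [asyncio.create_task(c) for c in check_coros]

    # Returns as soon as one task raises, or once every task has finished.
    done, pending = await asyncio.wait(
        [agent_task, *check_tasks], return_when=asyncio.FIRST_EXCEPTION
    )

    failed = [t for t in done if not t.cancelled() and t.exception() is not None]
    if failed:
        for task in pending:
            task.cancel()
        # Let the cancellations settle before re-raising.
        await asyncio.gather(*pending, return_exceptions=True)
        raise failed[0].exception()

    return agent_task.result()


async def main():
    ok = await run_checked(slow_agent(), [passing_check()])
    try:
        await run_checked(slow_agent(), [passing_check(), tripping_check()])
        blocked = None
    except Tripped as e:
        blocked = str(e)
    return ok, blocked


ok, blocked = asyncio.run(main())
print(ok, "|", blocked)
```

When every check passes, the call waits for the agent and returns its result; when a check raises, the still-pending agent task is cancelled before the exception reaches the caller.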

In [82]:

# Plain-asyncio version for run_with_guardrails. Note: this rebinds the name
# topic_guardrail, shadowing the @input_guardrail version defined earlier.
async def topic_guardrail(input):
    """Check if the user's question is about the course."""
    result = await Runner.run(topic_guardrail_agent, input)
    output = result.final_output

    # Raise instead of returning a GuardrailFunctionOutput so that
    # run_with_guardrails can cancel the agent when the check fails.
    if output.fail:
        raise GuardrailException(GuardrailResult(
            reasoning=output.reasoning,
            triggered=True
        ))

In [84]:
# Off-topic alternative for testing the guardrail: 'How can I cook pizza?'
prompt = 'I just discovered the course, can I still join?'
await run_with_guardrails(
    Runner.run(faq_agent, prompt),
    [
        topic_guardrail(prompt),
    ]
)

RunResult(input='I just discovered the course, can I still join?', new_items=[ToolCallItem(agent=Agent(name='faq_assistant', handoff_description=None, tools=[FunctionTool(name='search_faq', description='Search the DataTalks.Club FAQ for relevant answers.', params_json_schema={'properties': {'query': {'description': "The student's question to search for.", 'title': 'Query', 'type': 'string'}}, 'required': ['query'], 'title': 'search_faq_args', 'type': 'object', 'additionalProperties': False}, on_invoke_tool=<function function_tool.<locals>._create_function_tool.<locals>._on_invoke_tool at 0x0000022A914AB060>, strict_json_schema=True, is_enabled=True, tool_input_guardrails=None, tool_output_guardrails=None)], mcp_servers=[], mcp_config={}, instructions='You are a helpful teaching assistant for the Data Engineering Zoomcamp.\n\nYour role is to help students by searching the FAQ database for answers to their questions.\n\nWhen you find relevant FAQs, present them clearly with the title and