In [3]:
from elasticsearch import Elasticsearch, exceptions

In [2]:
# Shared Elasticsearch client pointed at localhost:9200; the search and
# document-fetch helpers in this notebook read this module-level `es`.
es = Elasticsearch(hosts="http://localhost:9200")

In [82]:
def search_videos(query: str, index_name: str = "podcasts", size: int = 5) -> list[dict]:
    """
    Performs a full-text search across video titles and subtitles using Elasticsearch.

    This function utilizes a 'multi_match' query with a 'best_fields' type. It prioritizes
    matches found in the title over subtitles (title is boosted x3) and applies the
    'my_english_analyzer' analyzer to the query text.

    Args:
        query (str): The search terms provided by the user.
        index_name (str, optional): The Elasticsearch index to query.
            Defaults to "podcasts".
        size (int, optional): The maximum number of search results to return.
            Defaults to 5.

    Returns:
        list: One dictionary per hit (up to `size`), each containing the
            highlighted snippets for the fields that matched plus the
            associated 'video_id'. Returns an empty list if nothing matched
            or if an error occurs.

    Example:
        >>> results = search_videos("machine learning", size=1)
        >>> print(results)
        [
            {
                'title': ['Intro to *Machine Learning*'],
                'subtitles': ['In this video, we discuss *machine learning* basics...'],
                'video_id': 'vid_001'
            }
        ]

    Note:
        No exception propagates to the caller: failures are printed and an
        empty list is returned instead.
    """

    body = {
        "size": size,
        "query": {
            "multi_match": {
                "query": query,
                "fields": ["title^3", "subtitles"],
                "type": "best_fields",
                "analyzer": "my_english_analyzer"
            }
        },
        "highlight": {
            # Matched terms are wrapped in asterisks in the returned snippets.
            "pre_tags": ["*"],
            "post_tags": ["*"],
            "fields": {
                "title": {"fragment_size": 150, "number_of_fragments": 1},
                "subtitles": {
                    "fragment_size": 150,
                    "number_of_fragments": 3,
                    "order": "score"
                }
            }
        }
    }

    try:
        response = es.search(index=index_name, body=body)
        hits = response.body['hits']['hits']

        # Build the full list before returning. The previous version returned
        # from inside the loop, so only the first hit was ever reported.
        # A hit without a 'highlight' section still yields its video_id.
        return [
            {**hit.get('highlight', {}), 'video_id': hit['_id']}
            for hit in hits
        ]

    except exceptions.NotFoundError:
        print(f"Error: Index '{index_name}' not found.")
    except exceptions.ConnectionError:
        print("Error: Could not connect to Elasticsearch.")
    except exceptions.RequestError as e:
        print(f"Error: Invalid search request. {e}")
    except Exception as e:
        # Deliberate best-effort boundary: this function is exposed as an agent
        # tool, so any failure degrades to "no results" rather than a crash.
        print(f"An unexpected error occurred: {e}")

    return []

In [73]:
# Manual check of the search helper using the default index and result size.
search_videos("how to fight trauma")

[{'subtitles': ['*How* do we all become better at\n27:39 managing our *trauma*? What does that look\n27:41 like in our day-to-day?',
   "So this is *how*\n12:37 *trauma* is passed on from one generation\n12:40 to the next not intentionally by the\n12:43 most loving parents cuz we can't help it",
   'The word *trauma*\n5:50 comes from a Greek word for wound or\n5:51 wounding. So *trauma* is a wound.'],
  'title': ['*How* to understand & heal your *trauma*: Gabor Maté, M.D. | mbg Podcast'],
  'video_id': 'C-1Ukfaf7co'},
 {'subtitles': ['Mate has completely\n2:24 transformed *how* the world sees, talks\n2:28 about, and understands *trauma*.',
   '*How* does unresolved *trauma* impact\n44:42 the way that you deal with stress as an\n44:44 adult?',
   'So\n44:18 *how* do you like *how* does this sort of\n44:21 unresolved *trauma* from childhood that I\n44:25 would imagine you know a lot of us learn\n44:27 about'],
  'title': ['Gabor Maté on *Trauma* & *How* to Heal'],
  'video_id': 'tool-R8V

In [201]:
def get_subtitles_by_id(video_id: str) -> dict:
    """Fetch the stored document for a single video from the "podcasts" index.

    Args:
        video_id (str): The YouTube video id; it is used as the Elasticsearch
            document id for the lookup.

    Returns:
        dict: The `_source` of the matching document (video id, title and
            subtitles of the video).
    """
    # Only the stored source is of interest; the rest of the response is dropped.
    return es.get(index="podcasts", id=video_id)["_source"]

def timestamp_to_seconds(timestamp: str) -> int:
    """
    Convert a colon-separated timestamp ('HH:MM:SS', 'MM:SS' or 'S') to seconds.

    The result is the value to put in the '?t=seconds' parameter of a YouTube URL.

    Examples:
    '01:02:03' -> 3723
    '05:10'    -> 310
    '00:45'    -> 45

    'https://youtu.be/abc (at 01:02:03)' -> https://youtu.be/abc?t=3723
    'https://youtu.be/abc (at 05:10)'    -> https://youtu.be/abc?t=310
    'https://youtu.be/abc (at 00:45)'    -> https://youtu.be/abc?t=45
    """
    # Only the three right-most fields (hours, minutes, seconds) contribute;
    # anything further to the left is ignored.
    fields = timestamp.split(':')[-3:]

    total_seconds = 0
    # Walk right-to-left so position 0 is seconds, 1 is minutes, 2 is hours.
    for position, field in enumerate(reversed(fields)):
        total_seconds += int(field) * 60 ** position
    return total_seconds

In [146]:
from pydantic_ai.agent import Agent
from pydantic_ai.messages import ModelRequest, ModelResponse, ToolReturnPart, UserPromptPart, ToolCallPart, TextPart
from pydantic import BaseModel

In [210]:
# System prompt for the research agent (passed as `instructions=` when the
# agent is built). The text is sent to the model verbatim.
# NOTE(review): step 2 of "Operation Mode" names the `get_subtitles_by_id`
# tool; the agent is later rebuilt with `summarize` in its place -- confirm the
# prompt still matches the registered tools.
# NOTE(review): the URL template in step 5 opens a backtick that is never
# closed -- confirm whether that is intended.
research_instructions = """
## Role
You are a Video Transcript Researcher. Your goal is to find precise information within a library of YouTube video transcripts stored in Elasticsearch and provide answers with verifiable citations.

## Research process:

Stage 1: Initial Exploration  
- Using your own knowledge of the topic, perform 3-5 broad search queries to understand the main topic
  and identify related areas. Only use search function.
- After the initial search exploration, summarize key concepts, definitions, and major themes.
- You MUST inspect the full transcript to be able to provide a better write up for the user.

Stage 2: Deep Investigation 
- Perform 5-6 refined queries focusing on depth.
- Inspect relevant documents for specific mechanisms, case studies, and technical details.
- Gather diverse viewpoints and data to strengthen depth and accuracy.

## Operation Mode (ReAct)
For every request, you must follow these steps:
1. **THOUGHT**: Analyze the user's request. What are the key search terms? Do I need to search for a specific video or across all transcripts? 
2. **ACTION**: Call the `search_videos` tool with a refined query, and leverage `get_subtitles_by_id` tool for subtitles.
3. **OBSERVATION**: Review the snippets, timestamps, and metadata returned from Elasticsearch.
4. **THOUGHT**: Does the data answer the question? If not, refine the search. If yes, synthesize the final answer.
5. **FINAL ANSWER**: Combine the final answer with all the reference videos. Make sure the video url has a combined video ID and the resulting seconds as a link: `https://youtu.be/[ID]?t=[SECONDS].

## Citation Rules
- Every claim MUST be followed by a youtube link and a timestamp as per Action stage.
- Always provide the link as returned by the tool.
- Display the human-readable time (e.g., 05:20) in parentheses next to the youtube link for the user's convenience.

## Error Handling
- If no results are found, state that you couldn't find information in the transcripts.
- Do not hallucinate timestamps; only use what is returned in the `Observation` phase.
- Do not make incorrect urls, only what is returned at the Action stage
"""

In [211]:
# System prompt for the summarization agent (passed as `instructions=` when the
# agent is built). Surrounding whitespace is stripped before use.
summarization_instructions = """
Your task is to summarize the provided YouTube transcript for a specific topic.

Select the parts of the transcripts that are relevant for the topic and search queries.

Format: 
paragraph with discussion (timestamp)
""".strip()

In [212]:
# Research agent: can search the index and pull a full stored document by id.
research_agent = Agent(
    name="research",
    instructions=research_instructions,
    model='openai:gpt-4o-mini',
    tools=[search_videos, get_subtitles_by_id]
)

# Summarization agent: no tools, it only works on the prompt text it is given.
summarization_agent = Agent(
    name='summarization',
    instructions=summarization_instructions,
    model='openai:gpt-4o-mini'
)

In [213]:
# User question used for the first end-to-end run of the research agent.
query = "How to manage trauma?"

In [214]:
# Top-level `await`: this relies on the notebook's running event loop.
results = await research_agent.run(query)

In [215]:
# Show the agent's final text answer.
print(results.output)

Managing trauma is a complex process that involves understanding both the impact of traumatic experiences and the biological and emotional mechanisms that underlie them. Here’s a synthesis of key concepts, strategies, and insights from a relevant interview with Dr. Gabor Maté on trauma and healing.

### Key Concepts of Trauma
1. **Definition of Trauma**:
   - Trauma is not merely an event (e.g., abuse, war, loss) but is characterized by the internal emotional and psychological wounds that one carries as a result. It is defined as a psychological wound that persists and influences one’s mental and physical health (05:51, [source](https://youtu.be/C-1Ukfaf7co?t=351)).

2. **Big T vs. Little t Trauma**:
   - **Big T Traumas** (e.g., war, sexual abuse) are typically recognized as significant traumatic events.
   - **Little t Traumas** refer to the everyday neglects and emotional injuries that occur when essential needs of children (like attachment, being held, and emotional expressiveness)

In [149]:
# Trace the run: for each message, report what its first part represents.
for message in results.new_messages():
    if isinstance(message, ModelRequest):
        # Requests carry either the user's prompt or a tool's return value.
        first_part = message.parts[0]
        if isinstance(first_part, UserPromptPart):
            line = f"User Prompt: {first_part.content}"
        elif isinstance(first_part, ToolReturnPart):
            line = f"Tool Called: {first_part.tool_name}"
        else:
            line = "Nothing else important"
        print(line)
    elif isinstance(message, ModelResponse):
        # Responses carry either a tool call or the final text.
        first_part = message.parts[0]
        if isinstance(first_part, ToolCallPart):
            line = f"Tool called as per response: {first_part.tool_name} for query {first_part.args}"
        elif isinstance(first_part, TextPart):
            line = f"Answer finalised: {first_part.content}"
        else:
            line = "Nothing else important"
        print(line)

User Prompt: How to manage trauma?
Tool called as per response: search_videos for query {"query":"manage trauma"}
Tool Called: search_videos
Tool called as per response: get_subtitles_by_id for query {"video_id":"C-1Ukfaf7co"}
Tool Called: get_subtitles_by_id
Answer finalised: Managing trauma requires a multifaceted approach that involves understanding the nature of trauma and taking concrete steps to heal from it. Here are some key insights and methods discussed in a conversation with Dr. Gabor Maté about managing and healing trauma:

1. **Understanding Trauma**: Trauma is not solely about the events that happen to us (big T traumas), like abuse or war; it is also about the emotional responses and internal experiences (small t traumas) that persist and leave a lasting impact on our psyche. Healing involves recognizing these wounds and their effects on our emotional and physical health. 

   - *Citation*: Gabor Maté explains, “Trauma is not what happened to you, it's what happened insi

In [193]:
import asyncio

In [194]:
from pydantic_ai.messages import FunctionToolCallEvent

class NamedCallback:
    """Event-stream handler that prints each tool call, tagged with the agent's name."""

    def __init__(self, agent: Agent):
        # Only the name is kept; it prefixes every printed line.
        self.agent_name = agent.name

    async def __call__(self, ctx, event) -> None:
        # Entry point invoked with the run context and an event (or event stream).
        await self._print_function_calls(ctx, event)

    async def _print_function_calls(self, ctx, event) -> None:
        # A plain event: report it if it is a tool call, otherwise ignore it.
        if not hasattr(event, "__aiter__"):
            if isinstance(event, FunctionToolCallEvent):
                call = event.part
                print(f"TOOL CALL ({self.agent_name}): {call.tool_name}({call.args})")
            return

        # An async-iterable: recurse into every event it yields.
        async for nested_event in event:
            await self._print_function_calls(ctx, nested_event)

In [195]:
# Handler instance that labels printed tool calls with the research agent's name.
research_callback = NamedCallback(research_agent)

In [197]:
# Same kind of run as before, but with the handler attached so tool calls are
# printed while the agent works.
result = await research_agent.run("What is trauma?", event_stream_handler=research_callback)

TOOL CALL (research): search_videos({"query":"What is trauma"})
TOOL CALL (research): get_subtitles_by_id({"video_id":"C-1Ukfaf7co"})


In [179]:
# Hand-built input for the summarization agent: one user question, a few
# example search queries, and the stored subtitles of a single video.
user_query = 'What is trauma and how do we address it in life?'
search_queries = [
    "What is a trauma response",
    "Managing Trauma",
    "Improving relationships with others by understanding trauma"
]

# Only the 'subtitles' field of the stored document is used here.
subtitles = get_subtitles_by_id('C-1Ukfaf7co')['subtitles']

# The three sections are laid out one after another; search queries go one per
# line. Leading/trailing whitespace of the whole prompt is stripped.
prompt = f"""
user query:
{user_query}

search engine queries: 
{'\n'.join(search_queries)}

subtitles:
{subtitles}
""".strip()

In [180]:
# Run the summarization agent on the hand-built prompt and show its text output.
summary_result = await summarization_agent.run(prompt)
print(summary_result.output)

Trauma can be defined not just by the events that occur, but by the psychological wounds these events leave on individuals, shaping their functioning in life (6:38). The concept of trauma encompasses both "big T" traumas, such as war and abuse, and "little t" traumas, which are everyday emotional neglects and unmet needs. For instance, telling parents not to pick up a crying child can inflict psychological wounds by ignoring the child's fundamental need for connection (7:06). Resolving trauma requires recognizing how these experiences of emotional neglect and large traumatic events affect one's health and relationships in adulthood, impacting one’s emotional and physical well-being profoundly (5:23).

Managing trauma involves understanding its nature and recognizing its manifestations in our emotional responses. As discussed, trauma is not just what happens to an individual but what happens within them, leading to unhealed wounds that manifest in various forms (16:00). Healing involves

In [183]:
# Messages produced by the earlier `results` run, kept for inspection below.
messages = results.new_messages()

In [189]:
# Flat dump of the run, keyed on each part's `part_kind` string.
for m in messages:
    for p in m.parts:
        kind = p.part_kind
        if kind == 'tool-call':
            print('TOOL CALL:', p.tool_name, p.args)
        elif kind == 'user-prompt':
            # Prompt text followed by one blank line.
            print('USER:', p.content, end='\n\n')
        elif kind == 'text':
            # Assistant text followed by one blank line.
            print('ASSISTANT:', p.content, end='\n\n')

USER: How to manage trauma?

TOOL CALL: search_videos {"query":"manage trauma"}
TOOL CALL: get_subtitles_by_id {"video_id":"C-1Ukfaf7co"}
ASSISTANT: Managing trauma requires a multifaceted approach that involves understanding the nature of trauma and taking concrete steps to heal from it. Here are some key insights and methods discussed in a conversation with Dr. Gabor Maté about managing and healing trauma:

1. **Understanding Trauma**: Trauma is not solely about the events that happen to us (big T traumas), like abuse or war; it is also about the emotional responses and internal experiences (small t traumas) that persist and leave a lasting impact on our psyche. Healing involves recognizing these wounds and their effects on our emotional and physical health. 

   - *Citation*: Gabor Maté explains, “Trauma is not what happened to you, it's what happened inside you” (00:21:26) [https://youtu.be/C-1Ukfaf7co?t=1286].

2. **Expressing Emotions**: Individuals who suppress their emotions, e

In [203]:
def fix_youtube_links(text: str) -> str:
    """
    Rewrite YouTube links whose '?t=' value is a clock timestamp into seconds.

    Every whitespace-delimited token that contains 'youtube.com' or 'youtu.be'
    together with '?t=' is split at the first '?t='. The timestamp part is
    converted with `timestamp_to_seconds`, and the link is rebuilt with '&t='
    when the base already contains 'watch?v=' and with '?t=' otherwise.
    Trailing '.', ',', ')' or ']' after the timestamp is preserved. Tokens
    whose timestamp cannot be converted are left unchanged.

    All whitespace in `text` (newlines, indentation, repeated spaces) is
    preserved exactly, so markdown structure in the input survives.

    Args:
        text (str): Free text, typically the agent's final answer.

    Returns:
        str: The text with matching links rewritten and everything else intact.

    Example:
        'https://youtu.be/abc?t=1:02:03.' -> 'https://youtu.be/abc?t=3723.'
    """
    import re  # local import keeps this notebook cell self-contained

    def _fix_token(match):
        word = match.group(0)

        # Only YouTube links that carry a '?t=' suffix are touched.
        is_youtube = "youtube.com" in word or "youtu.be" in word
        if not (is_youtube and "?t=" in word):
            return word

        # Example: "https://youtu.be/abc?t=1:02" -> ("...abc", "1:02")
        base_url, timestamp_raw = word.split("?t=", 1)

        # Sentence punctuation or a closing bracket may be glued to the link;
        # set it aside and re-attach it after the conversion.
        timestamp_clean = timestamp_raw.rstrip('.,)]')
        trailing_char = timestamp_raw[len(timestamp_clean):]

        try:
            seconds = timestamp_to_seconds(timestamp_clean)
        except (ValueError, IndexError):
            # Timestamp isn't valid: keep the original token untouched.
            return word

        # Use &t= for long URLs that already have ?v=, otherwise ?t=
        sep = "&t=" if "watch?v=" in base_url else "?t="
        return f"{base_url}{sep}{seconds}{trailing_char}"

    # Substituting on runs of non-whitespace leaves the whitespace between them
    # untouched; the earlier split()/" ".join() round trip collapsed newlines.
    return re.sub(r"\S+", _fix_token, text)

# --- Test ---
# The first link carries an H:MM:SS timestamp followed by a full stop; the
# second puts '?t=' after 'watch?v=...', which exercises the '&t=' branch.
sample = "Check the answer at https://youtu.be/dQw4w9WgXcQ?t=1:02:03. Also see https://youtube.com/watch?v=H63RbTuHTy0?t=520"
print(fix_youtube_links(sample))

Check the answer at https://youtu.be/dQw4w9WgXcQ?t=3723. Also see https://youtube.com/watch?v=H63RbTuHTy0&t=520


In [None]:
from pydantic_ai import RunContext
import json

In [218]:
async def summarize(ctx: RunContext, video_id: str) -> str:
    """
    Generate a summary for a video based on the conversation history,
    search queries, and the video's subtitles.
    """
    user_queries = []
    search_queries = []

    for m in ctx.messages:
        for p in m.parts:
            kind = p.part_kind
            if kind == 'user-prompt':
                user_queries.append(p.content)
            elif kind == 'tool-call':
                if p.tool_name == 'search_videos':
                    args = json.loads(p.args)
                    query = args['query']
                    search_queries.append(query)
    subtitles = get_subtitles_by_id(video_id=video_id)['subtitles']
    prompt = f"""
        user query:
        {'\n'.join(user_queries)}
        
        search engine queries: 
        {'\n'.join(search_queries)}
        
        subtitles:
        {subtitles}
        """
    summary_result = await summarization_agent.run(prompt)
    return summary_result.output

In [219]:
# Rebuild the research agent with `summarize` in place of `get_subtitles_by_id`,
# so the agent receives a summary rather than the raw stored document.
# NOTE(review): `research_instructions` still tells the model to use
# `get_subtitles_by_id`, which is not registered here -- confirm the prompt
# should be updated to mention `summarize`.
research_agent = Agent(
    name="research",
    instructions=research_instructions,
    model='openai:gpt-4o-mini',
    tools=[search_videos, summarize]
)

In [222]:
async def fetch_results(query: str) -> str:
    """Run the research agent on `query` and return its answer after link clean-up.

    Tool calls are printed while the agent runs (via `research_callback`); the
    final text is passed through `fix_youtube_links` before being returned.
    """
    agent_run = await research_agent.run(query, event_stream_handler=research_callback)
    return fix_youtube_links(agent_run.output)

In [224]:
# End-to-end run: research agent plus link clean-up, then print the answer.
final_result = await fetch_results("Please explain to me what is trauma?")
print(final_result)

TOOL CALL (research): search_videos({"query":"trauma"})
TOOL CALL (research): search_videos({"query":"definition of trauma"})
TOOL CALL (research): search_videos({"query":"types of trauma"})
TOOL CALL (research): search_videos({"query":"effects of trauma"})
TOOL CALL (research): search_videos({"query":"trauma recovery"})
TOOL CALL (research): summarize({"video_id": "C-1Ukfaf7co"})
TOOL CALL (research): summarize({"video_id": "hhhTWYDPAXI"})
Trauma is defined as a psychological wound that deeply impacts an individual's emotional and psychological functioning, rather than just the distressing events themselves. The term "trauma" originates from a Greek word for "wound" (5:50). It can be categorized as "big T" trauma—major events such as war, abuse, or the death of a loved one—and "little t" trauma, which refers to more subtle experiences that can still create significant emotional wounds, like childhood neglect (10:12). The effects of trauma are profound and can extend into both mental a