# Live Stream Companion Agent

This module demonstrates how to build agent application leveraging the live vidoe analysis from previous modules: 

- Module 20 (Visual Understanding) for scene change detection with visual analysis
- Module 30 (Audio Understanding) for speech-to-text transcription
- Module 40 (Modality Fused Understanding) for chapter and topic boundary detection


You will build this agent using [Strands Agents](https://strandsagents.com/latest/documentation/docs/user-guide/concepts/model-providers/amazon-bedrock/) and [Amazon Bedrock AgentCore](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/what-is-bedrock-agentcore.html). It will processes live events from previous modules in real-time, serving as a viewing companion that provides instant context, generates ongoing summaries, and offers quick access to key moments—all designed to enhance viewer engagement and experience.

<img src="images/architecture.png" alt="Visual Understanding Architecture" width="800">

This module incrementally builds the agent's capabilities toward a final cloud-deployed version. You will progress through the following steps:

1. Introduction to Strands Agent
2. Building live agent memory for video understanding
3. Building agent tools for show summarization and key event search
4. Deploying the agent to production

## Import Modules

In [None]:
from strands import Agent, tool
from strands.models import BedrockModel
from IPython.display import Markdown, display, HTML, JSON
from contextlib import redirect_stdout
from bedrock_agentcore.memory import MemoryClient
import json_repair


%store -r AUDIOVISUAL_MODEL_ID
%store -r AWS_REGION
%store -r kb_id
%store -r ds_id
%store -r data_bucket_name
%store -r chapter_mem_id
%store -r chapter_session_id
%store -r actor_id

## Step 1: Introduction to Strands Agents

Strands Agent is an open-source SDK developed by AWS to help developers simplify agent development. At its core, Strands adopts a model-driven approach, allowing agents to leverage language models' natural capabilities. The agent building process is streamlined to just three components: a language model, a prompt that defines its behavior, and a set of tools it can use. 

Here is a quick example to illustrate and get you started on a simple agent that can answer questions about the current workshop.


In [None]:
# 1) Tools
@tool
def get_session_name() -> str:
    """Get the title of the current workshop session"""
    return "Build Live Video Understanding fSolutions Using Ageentic AI On AWS (IND319)"

@tool 
def get_session_detail() -> str:
    """Get the session detail and key learning objectives"""
    return """
    This workshop shows how to enhance live video processing using AI agents on AWS. Using Strands Agent SDK and Amazon Bedrock AgentCore, you'll learn to build intelligent systems that analyze live broadcast content in real-time, generate automated metadata, and enable improved content discovery. The hands-on exercises focus on implementing practical agent-driven solutions for media operations and content monetization.
    """

@tool
def get_session_location() -> str:
    """Get the location of the current workshop session"""
    return "Where: Mandalay Bay Ballroom K"

@tool
def get_speakers() -> str:
    """Get the location of the current workshop session"""
    return "Speakers: Maheshwaran G, Principal Solution Architect and James Wu, Principle AI/ML Specialist SA"

# 1) Bedrock model
bedrock_model = BedrockModel(
  model_id=AUDIOVISUAL_MODEL_ID,
  temperature=0.3,
  streaming=True,
)

# 3) prompt
system_prompt = """You're a helpful workshop assistant. Answer questions about the current session.
    Guideline: 
    1. answer user query in clear, concise, and polite manner.
    3. never ask questions back to the user.
    2. DO NOT answer query outside the context of this workshop session.
    """

# Initialize Agent
agent = Agent(
    model=bedrock_model,
    system_prompt=system_prompt,
    callback_handler=None,
    tools=[get_session_name, get_session_location, get_speakers, get_session_detail]
)

Here are some questions you can ask the agent:

- Who are the speakers for this workshop?
- What is the current location?
- What is the name of this session?
- What is this workshop all about?

In [None]:

result = agent(prompt="What is this workshop all about?")

# Parse result
try:
    response_text = result.get('output', result) if isinstance(result, dict) else str(result)
    display(Markdown(response_text))
except Exception as e:
    print(f"Error parsing response: {e}")
    print(f"Raw response: {str(result)}")

## Step 2: Building live agent memory for video understanding

You saw how easy it is to build an agent using Strands. Now, you will tackle a more challenging problem: how can we enable our agent to understand live streaming video in real-time?

While we could build a tool that queries live video events from our previous lab, we're going to take a more elegant approach using agent memory. This method allows our agent to maintain immediate awareness of streaming content with minimal latency.

We'll leverage [Amazon Bedrock AgentCore Memory](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory.html), a fully managed AWS service designed for maintaining both short-term and long-term context in AI agents. By integrating this memory system, we can continuously ingest live stream events, allowing our agent to maintain up-to-the-moment awareness of what's happening in the video stream.

<img src="images/local_agent_memory.png" alt="Visual Understanding Architecture" width="800">

### Important Memory Parameters

**chapter_memory_id:** Unique identifier for a memory store, referencing the specific memory instance being used.

**actor_id:** Identifies the entity (in this case the user) that the memory belongs to, ensuring correct association with individuals or systems.

**session_id:** Unique identifier for a particular conversation or interaction (in this case one viewing session on the streaming platform), grouping all related messages from that session.

**namespaces:** Structured, hierarchical path used to logically group and organize memories, enabling separation by actor, session, strategy, or other criteria for clarity and access control.


In [None]:
from agentcore_helper import create_memory_execution_role
from datetime import datetime, timezone
import random
import string
import boto3
from botocore.client import Config

memory_client = MemoryClient(region_name=AWS_REGION)
agentcore_client = boto3.client('bedrock-agentcore', region_name=AWS_REGION)

## create a dummy user_id and session_id representing a unique user and their viewing session
rolling_summary_namespace = f"summary/{actor_id}/{chapter_session_id}"
semantic_namespace = f"semantic/{actor_id}"

print(f"User: '{actor_id}', started a new viewing session: '{chapter_session_id}'")

### Step memory hook

There are multiple ways to attach memory to your Strands agent. You can use [AgentCoreMemorySessionManager](https://github.com/aws/bedrock-agentcore-sdk-python/blob/main/src/bedrock_agentcore/memory/integrations/strands/README.md), create a Memory Hook to insert context into the system prompt, or attach memory as a tool for the agent to use. We will show both Memory Hook and Memory as a tool in this module.

Let's start with the memory hook. Strands Agents provide a hook feature, which is a callback mechanism that intercepts key agent lifecycle events (such as message addition or after tool invocation).

With this feature, we can create a memory hook to manage and synchronize short-term during specific agent state to ensure the agent maintains correct context as new events are streaming in.

In [None]:
from strands.hooks import AgentInitializedEvent, HookProvider, HookRegistry, MessageAddedEvent

class MemoryHook(HookProvider):
    def __init__(self, agentcore_client, memory_id, actor_id, session_id):
        self.agentcore_client = agentcore_client
        self.memory_id = memory_id
        self.actor_id = actor_id
        self.session_id = session_id

    def on_agent_initialized(self, event: AgentInitializedEvent):
        """Load recent events and metadata from short-term memory when agent starts"""
        try:
            # Get the most recent event with metadata
            events_resp = self.agentcore_client.list_events(
                memoryId=self.memory_id,
                actorId=self.actor_id,
                sessionId=self.session_id,
                maxResults=1,
                includePayloads=True
            )
            
            if events_resp.get('events'):
                latest_event = events_resp['events'][0]
                
                # Extract event content
                events = []
                for payload in latest_event.get('payload', []):
                    if 'conversational' in payload:
                        conv = payload['conversational']
                        events.append(f"{conv.get('role')}: {conv.get('content', {}).get('text', '')}")
                
                # Extract metadata
                metadata = latest_event.get('metadata', {})
                metadata_info = []
                if 'title' in metadata:
                    metadata_info.append(f"Show: {metadata['title'].get('stringValue', '')}")
                if 'genre' in metadata:
                    metadata_info.append(f"Genre: {metadata['genre'].get('stringValue', '')}")
                if 'start_ms' in metadata and 'end_ms' in metadata:
                    start = metadata['start_ms'].get('stringValue', '0')
                    end = metadata['end_ms'].get('stringValue', '0')
                    metadata_info.append(f"Timestamp: {start}ms - {end}ms")
                
                # Build context with metadata
                context_parts = []
                if metadata_info:
                    context_parts.append("Show Information:\n" + "\n".join(metadata_info))
                if events:
                    context_parts.append("\nMost Recent Event:\n" + "\n".join(events))
                
                if context_parts:
                    event.agent.system_prompt += "\n\n" + "\n\n".join(context_parts)
                
        except Exception as e:
            print(f"Memory load error: {e}")
    
    def register_hooks(self, registry: HookRegistry):
        registry.add_callback(AgentInitializedEvent, self.on_agent_initialized)  

Let's attached the Memory hook to the agent

In [None]:
system_prompt = """
You are a companion agent watching a live streaming show with the viewer. you receive events when topic changes in the show.
Guideline:

- Only respond to the user question about the show
- If provided, closely follow the output format user asked for. No extra explanations
- Don't ask questions back
- Respond in clear and professional manner
- Summarize response in one complete paragraphs, long response need ot follow chronological order
- DO NOT respond to questions outside of the shows you are watching.
"""

agent = Agent(
    model=bedrock_model,
    system_prompt=system_prompt,
    callback_handler=None,
    hooks=[MemoryHook(agentcore_client, chapter_mem_id, actor_id, chapter_session_id)],
)

Invoke agent and see what happens when our agent memory is empty

In [None]:
result = agent(prompt="what are we currently watching?")

# Parse result
try:
    response_text = result.get('output', result) if isinstance(result, dict) else str(result)
    display(Markdown(response_text))
except Exception as e:
    print(f"Error parsing response: {e}")
    print(f"Raw response: {str(result)}")

Let's load the live stream events from the previous lab. We will load data every 5 seconds, simulating live stream video insights incrementally arriving in real-time.

In [None]:
import re
import json
import time

def sanitize_metadata_value(value: str) -> str:
    r"""Sanitize metadata value to match pattern: [a-zA-Z0-9\s._:/=+@-]*"""
    # Replace parentheses and other invalid chars with spaces or remove them
    value = value.replace('(', ' ').replace(')', ' ')
    # Keep only allowed characters
    return re.sub(r'[^a-zA-Z0-9\s._:/=+@-]', '', value).strip()

"""Load topics into memory with metadata"""
with open('chapters.jsonl', 'r') as f:
    chapters = [json.loads(line) for line in f]

chapters = chapters[:10]

for i, chapter in enumerate(chapters[:10]):
    # Prepare metadata (all values must be strings and match the pattern)
    metadata = {
        'title': {'stringValue': sanitize_metadata_value(chapter.get('show_title', ''))},
        'genre': {'stringValue': sanitize_metadata_value(chapter.get('genre', ''))},
        'start_ms': {'stringValue': str(chapter.get('start_ms', 0))},
        'end_ms': {'stringValue': str(chapter.get('end_ms', 0))}
    }
    
    # Build payload
    payload = [
        {'conversational': {'content': {'text': f"Chapter {i}"}, 'role': 'USER'}},
        {'conversational': {'content': {'text': chapter['topic_summary']}, 'role': 'ASSISTANT'}}
    ]
    
    # Use bedrock client directly to add metadata
    agentcore_client.create_event(
        memoryId=chapter_mem_id,
        actorId=actor_id,
        sessionId=chapter_session_id,
        eventTimestamp=datetime.now(timezone.utc),
        payload=payload,
        metadata=metadata
    )
    
    # Rate limit: 0.25 TPS per actor/session = 1 request every 4 seconds
    print(f"  Loaded chapter {i+1} ({i+1}/{len(chapters)}) - {chapter.get('start_ms', 0)}ms to {chapter.get('end_ms', 0)}ms")
    time.sleep(5)

Assuming the livestream is happening and we are currently playing Chapter 10, watch this part of the video. Play a few seconds to verify if the agent has the correct content of what's currently playing.

In [None]:
def render_video_segment(video_path, start_time, end_time=10000, width=640, height=360):
    randint = random.randint(0, 100000)
    html = f"""
    <video id="myvideo-{randint}" width="{width}" height="{height}" controls>
        <source src="{video_path}" type="video/mp4">
    </video>
    <script>
    (function() {{
        const video = document.getElementById("myvideo-{randint}");
        
        // Wait for metadata to load
        video.addEventListener('loadedmetadata', function() {{
            video.currentTime = {start_time};
        }});
        
        // Fallback: also try on canplay event
        video.addEventListener('canplay', function() {{
            if (video.currentTime === 0) {{
                video.currentTime = {start_time};
            }}
        }});
        
        // Add timeupdate listener to check for end time
        video.addEventListener('timeupdate', function() {{
            if (video.currentTime >= {end_time}) {{
                video.pause();
                video.currentTime = {end_time};
            }}
        }});
        
        // Force load
        video.load();
    }})();
    </script>
    """
    return HTML(html)

# Usage
video_path = '../sample_videos/Netflix_Open_Content_Meridian.mp4'
current = chapters[len(chapters)-1]
start_time = current['start_ms'] / 1000
end_time = current['end_ms'] / 1000
render_video_segment(video_path, start_time, end_time)

Now that the events are ingested, our agent has immediate context of what's happening.

Here are some example queries you can try:

What genre is this show?
What show am I watching?
What is happening right now?"


In [None]:
agent = Agent(
    model=bedrock_model,
    system_prompt=system_prompt,
    callback_handler=None,
    hooks=[MemoryHook(agentcore_client, chapter_mem_id, actor_id, chapter_session_id)],
)

result = agent(prompt="what show am I watching?")

# Parse result
try:
    response_text = result.get('output', result) if isinstance(result, dict) else str(result)
    display(Markdown(response_text))
except Exception as e:
    print(f"Error parsing response: {e}")
    print(f"Raw response: {str(result)}")

## Step 3: Building agent tools for show summarization and key event search

Now we are going to add some tools to improve our agent to support couple new use cases. Using the `@tool` decorator in Strands makes it extremely easy to build tools by simply annotating regular Python functions, which automatically converts them into agent-callable tools with metadata and input validation generated from the function’s docstring and type hints.

We are going to add 2 tools:

1. summarize show up to now
2. find key moments

<img src="images/local_agent_tools.png" alt="Visual Understanding Architecture" width="800">


### Long-term memory strategy

Here we want to introduce the concept of long-term memory strategy in AgentCore Memory. Unlike short-term memory that just stores transactional memory data, long-term memory strategy determines how interactions are extracted, structured, and stored persistently across sessions, turning insights into vector-searchable knowledge.

Built-in strategies include semantic memory (facts), user preference memory, and session summary memory. Custom memory strategies allow full control over extraction and consolidation logic by defining your own algorithms, prompts, models, and data schemas. You can have multiple memory strategies attached to a memory store focusing on different purposes, and they are logically separated by namespaces.

In this example, we have pre-generated a long-term memory that is asynchronously extracting rolling summaries of the show. Below is how that memory was defined in Python.

``` python
custom_strategy = {
    "customMemoryStrategy": {
        "name": "rolling_summary",
        "namespaces": ["summary/{actorId}/{sessionId}"],
        "configuration": {
            "summaryOverride": {
                "consolidation": {
                    "appendToPrompt": "Merge new info with existing summary. Keep previous info and add new. This is cumulative - grow, don't replace.",
                    "modelId": AUDIOVISUAL_MODEL_ID
                }
            }
        }
    }
}

```

In [None]:
@tool
def get_show_summary():
    """
    Retrieve a comprehensive summary of what has happened in the current live video show so far.
    Combines rolling summary information with the latest events from the video understanding system.
    Use this when users ask about show content, current topics, or what they've missed.
    
    Returns:
        String containing the show summary and latest updates, or error message if unavailable
    """
    try:
        # Get latest summary from memory records
        summary_resp = agentcore_client.list_memory_records(
            memoryId=chapter_mem_id,
            namespace=rolling_summary_namespace,
            maxResults=1
        )
        summary = summary_resp['memoryRecordSummaries'][0]['content']['text'] if summary_resp['memoryRecordSummaries'] else ""
        
        # Get latest event
        events_resp = agentcore_client.list_events(
            memoryId=chapter_mem_id,
            actorId=actor_id,
            sessionId=chapter_session_id,
            maxResults=1,
            includePayloads=True
        )
        
        latest_event = ""
        if events_resp.get('events'):
            for payload in events_resp['events'][0].get('payload', []):
                if 'conversational' in payload:
                    latest_event = payload['conversational']['content']['text']
        
        # Combine summary and latest event
        context = []
        if summary:
            context.append(f"Show Summary: {summary}")
        if latest_event:
            context.append(f"Latest Update: {latest_event}")
            
        return "\n\n".join(context) if context else "No show information available yet."
        
    except Exception as e:
        return f"Unable to retrieve show summary: {str(e)}"

Maker Sure Long Term Strategy Extraction is complete

In [None]:
timeout = 180
start = time.time()
while time.time() - start < timeout:
    try:
        response = agentcore_client.list_memory_records(
            memoryId=chapter_mem_id,
            namespace=rolling_summary_namespace,
            maxResults=1
        )
        if response['memoryRecordSummaries']:
            break
    except:
        pass
    time.sleep(15)

Adding the `get_show_summary` tool to our agent.

In [None]:
agent = Agent(
    model=bedrock_model,
    system_prompt=system_prompt,
    tools=[get_show_summary],
    hooks=[MemoryHook(agentcore_client, chapter_mem_id, actor_id, chapter_session_id)],
)

result = agent(prompt="can you give me a summary of the show so far?")

# Parse result
try:
    response_text = result.get('output', result) if isinstance(result, dict) else str(result)
    display(Markdown(response_text))
except Exception as e:
    print(f"Error parsing response: {e}")
    print(f"Raw response: {str(result)}")

### Implement Key Event Search

At the same time, we will ingest the events to Bedrock Knowledge Base for key moment search

create a few helper function to help us upload the events to S3 and dynamically ingest them into Bedrock knowledge base

In [None]:
bedrock_config = Config(
    connect_timeout=120,
    read_timeout=120,
    retries={'max_attempts': 0},
    region_name=AWS_REGION
)

# Initialize clients
s3_client = boto3.client('s3')
bedrock_agent_client = boto3.client('bedrock-agent', config=bedrock_config)
bedrock_agent_runtime = boto3.client('bedrock-agent-runtime', region_name=AWS_REGION)

def upload_chapter_to_s3(s3_client, bucket_name, chapter, idx):
    """
    Upload chapter content to S3 and return the S3 URI.
    
    Args:
        s3_client: Boto3 S3 client
        bucket_name: S3 bucket name
        chapter: Chapter dictionary with metadata
        idx: Chapter index
        
    Returns:
        S3 URI of the uploaded file
    """
    start_sec = chapter['start_ms'] // 1000
    end_sec = chapter['end_ms'] // 1000
    filename = f"chapters/chapter_{idx:03d}_{start_sec}_{end_sec}.txt"
    
    content = chapter["topic_summary"]
    
    s3_client.put_object(
        Bucket=bucket_name,
        Key=filename,
        Body=content.encode('utf-8'),
        ContentType='text/plain'
    )
    
    return f"s3://{bucket_name}/{filename}"


With Document Level API (DLA), customers can now efficiently and cost-effectively ingest, update, or delete data directly from Amazon Bedrock Knowledge Bases using a single API call, without the need to perform a full sync with the data source periodically or after every change.

To read more about DLA, see the [documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/kb-direct-ingestion-add.html)

In [None]:
successful = 0
failed = 0

for idx, chapter in enumerate(chapters):
    try:
        # Upload to S3 and get URI
        s3_uri = upload_chapter_to_s3(s3_client, data_bucket_name, chapter, idx)
        
        # Prepare metadata
        start_sec = chapter['start_ms'] // 1000
        end_sec = chapter['end_ms'] // 1000
        metadata_attributes = [
            {'key': 'show_title', 'value': {'stringValue': chapter['show_title'], 'type': 'STRING'}},
            {'key': 'genre', 'value': {'stringValue': chapter['genre'], 'type': 'STRING'}},
            {'key': 'start_ms', 'value': {'numberValue': chapter['start_ms'], 'type': 'NUMBER'}},
            {'key': 'end_ms', 'value': {'numberValue': chapter['end_ms'], 'type': 'NUMBER'}},
        ]
        
        # Ingest to Knowledge Base
        document_config = {
            'content': {
                'dataSourceType': 'CUSTOM',
                'custom': {
                    'customDocumentIdentifier': {'id': f"chapter_{idx:03d}"},
                    'sourceType': 'S3_LOCATION',
                    's3Location': {'uri': s3_uri}
                }
            },
            'metadata': {
                'type': 'IN_LINE_ATTRIBUTE',
                'inlineAttributes': metadata_attributes
            }
        }
        
        bedrock_agent_client.ingest_knowledge_base_documents(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id,
            documents=[document_config]
        )
        
        successful += 1
        if (idx + 1) % 5 == 0:
            print(f"Processed {idx + 1}/{len(chapters)}")
            time.sleep(1)
            
    except Exception as e:
        failed += 1
        print(f"Failed chapter {idx}: {e}")

In [None]:
from typing import List, Dict
@tool
def search_key_moments(query: str, max_results: int = 2) -> List[Dict]:
    """
    Search key moments of the video from Knowledge Base.
    
    Args:
        query: Search query text describing what you're looking for
        max_results: Maximum number of results to return (default: 2)
        
    Returns:
        List of chapters with content, start_ms, and end_ms
    """

    
    response = bedrock_agent_runtime.retrieve(
        knowledgeBaseId=kb_id,
        retrievalQuery={'text': query},
        retrievalConfiguration={
            'vectorSearchConfiguration': {
                'numberOfResults': max_results
            }
        }
    )
    
    results = []
    for result in response['retrievalResults']:
        metadata = result.get('metadata', {})
        results.append({
            'content': result['content']['text'],
            'start_ms': metadata.get('start_ms', 0),
            'end_ms': metadata.get('end_ms', 0)
        })
    
    return results

function to help us parse the reponse into valide dictionary.

In [None]:
def parse_json_from_text(text: str) -> list:
    """Parse JSON and convert string numbers to integers."""
    text = re.sub(r'```json\s*', '', text)
    text = re.sub(r'```\s*$', '', text)
    text = text.replace('\\"', '"')
    text = text.replace('\\n', '\n')
    text = text.replace('\\:', ':')
    text = text.strip()
    
    text = text.replace(" _ms", "_ms") #fix spaces

    data =  json_repair.loads(text)
    print(data)
    
    # Convert string numbers to integers
    for item in data:
        if 'start_ms' in item:
            item['start_ms'] = float(item['start_ms'])
        if 'end_ms' in item:
            item['end_ms'] = float(item['end_ms'])
    
    return data

In [None]:
prompt = """

I want to find a moment where vintage car driving in the winding road

output list of JSON object only, strickly follow the schema below. Every JSON object MUST HAVE attributes: "content", "start_ms", "end_ms"
[
    {"content": "...", "start_ms": "...", "end_ms": "..."},
    {"content": "...", "start_ms": "...", "end_ms": "..."},
    ...
]
"""

agent = Agent(
    model=bedrock_model,
    system_prompt=system_prompt,
    tools=[get_show_summary, search_key_moments],
    hooks=[MemoryHook(agentcore_client, chapter_mem_id, actor_id, chapter_session_id)],
)

result = agent(prompt=prompt)

# Parse result
try:
    response_text = result.get('output', result) if isinstance(result, dict) else str(result)

    results = parse_json_from_text(response_text)

    for r in results:
        display(Markdown(f"Chapter summary: {r['content']}"))
        start = r['start_ms']/1000
        end = r['end_ms']/1000
        display(render_video_segment(video_path, start, end))

except Exception as e:
    print(f"Error parsing response: {e}")
    print(f"Raw response: {str(result)}")

## Step 4: 4. Deploying the agent to production

<img src="images/agentcore_runtime.png" alt="Visual Understanding Architecture" width="800">

Now we will deploy the agent to Amazon Bedrock AgentCore runtime. before that, we need to store all configuration parameters in AWS Systems Manager Parameter Store so the runtime can access them.

**Why SSM Parameter Store?**
- The AgentCore Runtime needs configuration to connect to memory
- SSM provides secure, centralized configuration management
- The runtime's IAM role will have permission to read these parameters
- Easy to update configuration without redeploying the agent

In [None]:
# Store configuration parameters in SSM for runtime access
ssm_client = boto3.client('ssm', region_name=AWS_REGION)

# Define all parameters to store
parameters = {
    '/viewing-companion/model_id': AUDIOVISUAL_MODEL_ID,
    '/viewing-companion/memory_id': chapter_mem_id,
    '/viewing-companion/actor_id': actor_id,
    '/viewing-companion/session_id': chapter_session_id,
    '/viewing-companion/rolling_summary_namespace': rolling_summary_namespace,
    '/viewing-companion/kb_id': kb_id
}

print("Storing configuration in SSM Parameter Store...\n")

for param_name, param_value in parameters.items():
    ssm_client.put_parameter(
        Name=param_name,
        Value=param_value,
        Type='String',
        Overwrite=True
    )
    print(f"✓ Stored: {param_name}")
    print(f"  Value: {param_value}")

print("\n✅ All configuration stored in SSM Parameter Store")

We'll combine the code into a clean script. To ensure compatibility with AgentCore runtime, you need to make the following changes to your script:

- Import the Runtime App with `from bedrock_agentcore.runtime import BedrockAgentCoreApp`
- Initialize the App in your code with `app = BedrockAgentCoreApp()`
- Decorate the invocation function with the `@app.entrypoint decorator`
- Let AgentCoreRuntime control the running of the agent with `app.run()`

This setup remains the same regardless of which agent framework you use.

In [None]:
with open("script/viewing_companion_agent.py", "r") as f:
    code = f.read()

display(Markdown(f"""```python
{code}
```"""))

### Configure AgentCore Runtime

Configure the runtime deployment with execution permissions.

**Configuration Parameters:**

1. **Entrypoint** - `script/viewing_companion_agent.py`
   - Python file with `BedrockAgentCoreApp` and `@app.entrypoint`
   - Entry point for request handling

2. **Execution Role** - IAM role for runtime permissions
   - Grants access to SSM Parameter Store
   - Allows CloudWatch Logs writing
   - Enables Bedrock model invocation
   - Allows AgentCore Memory access

3. **Auto-create ECR** - Automatically creates ECR repository
   - Stores Docker container images
   - Manages image versioning

4. **Requirements File** - `script/requirements.txt`
   - Lists Python dependencies
   - Installed during container build

In [None]:
# Import helper
from agentcore_helper import AgentCoreHelper

# Initialize helper
agentcore_helper = AgentCoreHelper(region_name=AWS_REGION)

# Agent configuration
agent_name = "live_streaming_companion_agent"
execution_role_name = f"{agent_name}_role"
execution_policy_name = f"{agent_name}_policy"

print(f"Agent name: {agent_name}")
print(f"Execution role: {execution_role_name}")

Remove left over configuration scripts

In [None]:
!rm -rf .bedrock_agentcore.yaml

In [None]:
from bedrock_agentcore_starter_toolkit import Runtime

# Create execution role with necessary permissions
execution_role_arn = agentcore_helper.create_agentcore_runtime_execution_role(
    role_name=execution_role_name,
    policy_name=execution_policy_name
)

print(f"✓ Execution role created: {execution_role_arn}")
time.sleep(40) # add wait for role policy to propogate

# Initialize runtime toolkit
agentcore_runtime = Runtime()

# Configure the deployment
response = agentcore_runtime.configure(
    entrypoint="script/viewing_companion_agent.py",
    execution_role=execution_role_arn,
    auto_create_ecr=True,
    requirements_file="script/requirements.txt",
    region=AWS_REGION,
    agent_name=agent_name,
)

print("\n✅ Runtime configuration completed")
print(f"Agent name: {agent_name}")
print(f"Entrypoint: script/viewing_companion_agent.py")

### Launch Runtime

Deploy the agent to AgentCore Runtime. This will:
1. Build a Docker container with your agent code
2. Push the container to Amazon ECR
3. Create an AgentCore Runtime with the container
4. Configure auto-scaling and monitoring

**Note:** This step takes 5-10 minutes as it builds and deploys the container.

If the agent already exists, use `auto_update_on_conflict=True` to update it.

In [None]:
# Launch the runtime (builds and deploys the container)
print("Launching runtime... This may take 5-10 minutes.\n")

launch_result = agentcore_runtime.launch(
    auto_update_on_conflict=True  # Update if agent already exists
)

print("\n✅ Runtime launched successfully!")
print(f"Agent ARN: {launch_result.agent_arn}")
print(f"Agent ID: {launch_result.agent_id}")

### Wait for Deployment

Monitor the runtime deployment until it's ready to accept requests.

**Runtime States:**
- `CREATING` - Container is being deployed
- `UPDATING` - Runtime is being updated
- `READY` - Runtime is ready to accept requests
- `CREATE_FAILED` - Deployment failed
- `UPDATE_FAILED` - Update failed

In [None]:
# Wait for the agent to be ready
status_response = agentcore_runtime.status()
status = status_response.endpoint["status"]

end_status = ["READY", "CREATE_FAILED", "DELETE_FAILED", "UPDATE_FAILED"]
while status not in end_status:
    print(f"Waiting for deployment... Current status: {status}")
    time.sleep(10)
    status_response = agentcore_runtime.status()
    status = status_response.endpoint["status"]

if status == 'READY':
    print(f"Runtime is ready! Status: {status}")
else:
    print(f"Final status: {status}")

### Test the Deployed Agent

Let's try the same use cases to test our agent now running in Bedrock AgentCore

In [None]:
agent_arn = launch_result.agent_arn

print(f"Ready to invoke agent: {agent_arn}") 

Use case 1: What I am currently watching?

In [None]:
payload = {"prompt":"What show am I watching?"}

boto3_response = agentcore_client.invoke_agent_runtime(
    agentRuntimeArn=agent_arn,
    qualifier="DEFAULT",
    payload=json.dumps(payload)
)

# Handle streaming response (SSE format)
if "text/event-stream" in boto3_response.get("contentType", ""):
    print("Processing streaming response...")
    content = []
    for line in boto3_response["response"].iter_lines(chunk_size=1):
        if line:
            line = line.decode("utf-8")
            if line.startswith("data: "):
                data = line[6:].replace('"', '')
                content.append(data)
    
    full_response = " ".join(content)
    display(Markdown(full_response))
else:
    print("Non-streaming response received")
    print(boto3_response)

Use case 2: summarize live show up to this point

In [None]:
payload = {"prompt":"can you give me a summary of the show so far?"}

boto3_response = agentcore_client.invoke_agent_runtime(
    agentRuntimeArn=agent_arn,
    qualifier="DEFAULT",
    payload=json.dumps(payload)
)

# Handle streaming response (SSE format)
if "text/event-stream" in boto3_response.get("contentType", ""):
    print("Processing streaming response...")
    content = []
    for line in boto3_response["response"].iter_lines(chunk_size=1):
        if line:
            line = line.decode("utf-8")
            if line.startswith("data: "):
                data = line[6:].replace('"', '')
                content.append(data)
    
    full_response = " ".join(content)
    display(Markdown(full_response))
else:
    print("Non-streaming response received")
    print(boto3_response)

Use case 3: search for key moments

In [None]:
payload = {"prompt":"""

I want to find a moment where vintage car driving in the winding road

output list of JSON object only, strickly follow the schema below. Every JSON object MUST HAVE attributes: "content", "start_ms", "end_ms"
[
    {"content": "...", "start_ms": "...", "end_ms": "..."},
    {"content": "...", "start_ms": "...", "end_ms": "..."},
    ...
]
"""}

boto3_response = agentcore_client.invoke_agent_runtime(
    agentRuntimeArn=agent_arn,
    qualifier="DEFAULT",
    payload=json.dumps(payload)
)

# Handle streaming response (SSE format)
if "text/event-stream" in boto3_response.get("contentType", ""):
    print("Processing streaming response...")
    content = []
    for line in boto3_response["response"].iter_lines(chunk_size=1):
        if line:
            line = line.decode("utf-8")
            if line.startswith("data: "):
                data = line[6:].replace('"', '')
                content.append(data)
    
    full_response = " ".join(content)
    display(Markdown(full_response))
else:
    print("Non-streaming response received")
    print(boto3_response)

In [None]:
# Parse result
try:
    results = parse_json_from_text(full_response)

    for r in results:
        display(Markdown(f"Chapter summary: {r['content']}"))
        start = r['start_ms']/1000
        end = r['end_ms']/1000

        display(render_video_segment(video_path, start, end))

except Exception as e:
    print(f"Error parsing response: {e}")
    print(f"Raw response: {str(full_response)}")

## Clean Up

In [None]:
# memory_client.delete_memory_and_wait(
#     memory_id=memory_id,
#     max_wait=300,
#     poll_interval=10
# )

In [None]:
# # Delete SSM parameters
# ssm_parameters = [
#     '/viewing-companion/model_id',
#     '/viewing-companion/memory_id',
#     '/viewing-companion/actor_id',
#     '/viewing-companion/session_id',
#     '/viewing-companion/rolling_summary_namespace',
#     '/viewing-companion/kb_id'
# ]

# print("\nDeleting SSM parameters...")
# for param in ssm_parameters:
#     try:
#         ssm_client.delete_parameter(Name=param)
#         print(f"✓ Deleted: {param}")
#     except ssm_client.exceptions.ParameterNotFound:
#         print(f"  (already deleted: {param})")
#     except Exception as e:
#         print(f"  Note: {e}")

# print("\n✅ All SSM parameters deleted")


In [None]:
# # Delete AgentCore Runtime
# agentcore_helper.runtime_resource_cleanup(runtime_arn=launch_result.agent_arn)
# print("✅ AgentCore Runtime deleted")

# # Delete IAM role and policy
# agentcore_helper.delete_agentcore_runtime_execution_role(
#     role_name=execution_role_name,
#     policy_name=execution_policy_name
# )
# print("✅ IAM role and policy deleted")
