# 🧬 Multi-Agent Genomics Pipeline: Graph Patterns and Edge Conditions

This notebook demonstrates advanced Strands Agents concepts by building a complete **Multi-Agent Genomics Pipeline** using the Graph pattern with conditional execution and quality gates. The pipeline takes a sample ID for a cancer sample (a tumor/normal pair) as input and processes it through multiple steps to assess the significance of any genomic mutations found in that sample.

## 🎯 What You'll Learn

1. **Multi-Agent Patterns** - Graph, Swarm, and Workflow architectures
2. **Graph Construction** - Building deterministic agent workflows
3. **Edge Conditions** - Conditional execution based on agent results
4. **Quality Gates** - Data quality assessment and flow control
5. **Agent Orchestration** - Coordinating specialized agents
6. **Production Deployment** - AgentCore deployment for multi-agent systems

## 🏗️ What We'll Build

A complete **Genomics Analysis Pipeline** with four specialized agents:

1. **🔍 Data Discovery Agent** - Find genomics data files
2. **🔬 Quality Control Agent** - Assess data quality (with quality gate)
3. **⚙️ Workflow Orchestrator Agent** - Run variant calling workflows
4. **📊 Interpretation Agent** - Generate clinical reports

## 🕸️ Multi-Agent Graph Architecture

The agents are connected in a **directed graph** with **conditional edges** that create quality gates:

![image.png](./image.png)

### 🔗 **Graph Components:**

#### **Nodes (Agents):**
- **Entry Point**: 🔍 Data Discovery Agent
- **Always Execute**: 🔍 Data Discovery → 🔬 Quality Control
- **Conditional Execute**: ⚙️ Workflow Orchestrator (quality gate)
- **Conditional Execute**: 📊 Interpretation Agent (completion gate)

#### **Edges (Connections):**
1. **🔍 → 🔬** (Unconditional): Always proceed to quality control
2. **🔬 → ⚙️** (Conditional): `quality_passed_condition()`
3. **⚙️ → 📊** (Conditional): `workflow_completed_condition()`

#### **Edge Conditions (Quality Gates):**
- **Quality Gate**: Analyzes QC results for phrases like:
  - ✅ "Data quality status: SUFFICIENT for variant calling"
  - ❌ "Data quality status: INSUFFICIENT for variant calling"
- **Completion Gate**: Checks workflow results for:
  - ✅ "Workflow execution completed successfully"
  - ❌ "Workflow execution failed"

### 🎯 **Execution Flow:**
1. **Always**: Data Discovery finds genomics files
2. **Always**: Quality Control assesses data quality
3. **If quality passes**: Workflow Orchestrator runs variant calling
4. **If workflow succeeds**: Interpretation Agent generates clinical reports
5. **If quality fails**: Pipeline stops after QC with quality assessment
6. **If workflow fails**: Pipeline stops after orchestrator with failure diagnosis

This architecture ensures **data-driven decision making** and **resource efficiency** by only running expensive computations when data quality is sufficient!
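The execution flow above can be sketched in plain Python without any agent framework. This is only an illustrative model: `run_gated_pipeline`, the lambda "agents", and the example S3 URI are hypothetical stand-ins, and the real graph built later in this notebook uses Strands `GraphBuilder` instead.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical stand-ins: a node maps the previous output to a new output,
# and a condition inspects all results so far to decide whether to continue.
Node = Callable[[str], str]
Condition = Callable[[Dict[str, str]], bool]


def run_gated_pipeline(
    steps: List[Tuple[str, Node, Optional[Condition]]], prompt: str
) -> List[str]:
    """Run nodes in order; stop at the first node whose incoming condition fails."""
    results: Dict[str, str] = {}
    executed: List[str] = []
    current_input = prompt
    for name, node, condition in steps:
        # A condition of None models an unconditional edge.
        if condition is not None and not condition(results):
            break
        results[name] = node(current_input)
        executed.append(name)
        current_input = results[name]
    return executed


def qc_passed(results: Dict[str, str]) -> bool:
    return "data quality status: sufficient" in results["qc_agent"].lower()


def workflow_ok(results: Dict[str, str]) -> bool:
    text = results["workflow_orchestrator"].lower()
    return "workflow execution completed successfully" in text


def make_steps(qc_message: str) -> List[Tuple[str, Node, Optional[Condition]]]:
    """Build the four-step pipeline with a canned QC verdict (illustrative only)."""
    return [
        ("data_discovery", lambda _: "s3://example-bucket/sample_R1.fastq.gz", None),
        ("qc_agent", lambda _: qc_message, None),
        ("workflow_orchestrator",
         lambda _: "Workflow execution completed successfully", qc_passed),
        ("interpretation_agent", lambda _: "clinical report", workflow_ok),
    ]


good_run = run_gated_pipeline(
    make_steps("Data quality status: SUFFICIENT for variant calling"), "sample P001")
bad_run = run_gated_pipeline(
    make_steps("Data quality status: INSUFFICIENT for variant calling"), "sample P001")
print(good_run)
print(bad_run)
```

With a failing QC verdict, only the first two nodes run, which mirrors the "pipeline stops after QC" branch in the list above.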

---

## 📚 Part 1: Multi-Agent Patterns Overview

### Why Use Multiple Agents

1. Artificial General Intelligence (AGI) isn't here yet, so a single agent probably can't do everything
2. Individual agents can be specialized with crisp system prompts
3. Agents with fewer tools use fewer tokens to 'understand' those tools
4. Agents with simpler tasks can use simpler (and more cost-effective) models or more specialized models

### Strands Multi-Agent Patterns

Strands Agents supports three primary multi-agent patterns:

#### 1. **Graph Pattern** (What we'll build)
- **Deterministic flow** with defined dependencies
- **Conditional execution** based on agent results
- **Quality gates** and validation checkpoints
- **Best for**: Structured workflows with clear dependencies

#### 2. **Swarm Pattern**
- **Collaborative agents** working together
- **Dynamic task distribution**
- **Shared context and memory**
- **Best for**: Complex problem-solving requiring multiple perspectives

#### 3. **Workflow Pattern**
- **Sequential execution** with handoffs
- **State management** between agents
- **Error recovery** and retry logic
- **Best for**: Linear processes with clear stages

### Why Graph Pattern for Genomics?

Genomics analysis requires:
- **Quality validation** before expensive computations
- **Conditional execution** based on data quality
- **Deterministic flow** for reproducible results
- **Specialized agents** for different analysis stages

## 🛠️ Step 1: Environment Setup and Dependencies

Let's set up our environment for multi-agent development.

In [None]:
# Install required dependencies for multi-agent systems
!pip install strands-agents boto3 pandas --quiet

## 🧱 Step 2: The Individual Agents

The genomics multi-agent system is composed of four specialized agents. These have been created as Python files and all follow patterns introduced in [notebook 01](01-strands-agents-introduction.ipynb). Feel free to review the individual agent definitions as you read through the descriptions here.

### 🕵️‍♀️ Data Discovery Agent

The data discovery agent is responsible for identifying genomics files in S3 that meet the requirements of the user. It uses tools provided by an MCP server to achieve this goal.

[data_discovery_agent.py](data_discovery_agent.py)

### 🔬 Quality Control Agent

Next up is the quality control agent, which is responsible for assessing the quality of the genomics data files. This agent runs after the data discovery agent and recommends whether or not to proceed with variant calling. It has built-in `@tool` functions that let it locate, download, unpack, and review FASTQ quality reports. It then summarizes the quality of the data and recommends proceeding or stopping.

[qc_agent.py](qc_agent.py)

### 👷 Workflow Orchestrator Agent

The workflow orchestrator is essentially the same as what we built in [notebook 01](01-strands-agents-introduction.ipynb). It runs the
workflow required to analyze the samples.

[workflow_orchestrator_agent.py](workflow_orchestrator_agent.py)

### üßë‚Äçüíª Interpretation and Reporting Agent

The interpretation and reporting agent features a number of `@tool`s that give the agent the ability to load
MAF files as well as various CIViC data files. These tools are used to load the MAF file generated by the tumor/ normal variant caller and perform the analysis. Using the data the tool generates a report with example clinical recommendations.

[interpretation_and_reporting_agent.py](interpretation_and_reporting_agent.py)

## 🔗 Step 3: Understanding Edge Conditions

Edges connect the nodes (agents) in the graph. Edge conditions determine when an edge should be traversed, and therefore when the downstream agent should execute, based on the results of previously executed nodes. An edge condition is any function that takes the graph state and returns a `bool`.

Here, we make an edge condition function that analyzes the output of the QC agent to determine whether the data quality is sufficient to continue with variant calling. This implementation is simple phrase and keyword matching, which is suitable for simple cases (and this demonstration). A more sophisticated and robust edge condition could use vector embeddings and vector distance to achieve the same result.

In [None]:
%%writefile edge_functions.py

import logging
from strands.multiagent.base import NodeResult, Status
from typing import Any, Optional

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def quality_passed_condition(state: Any) -> bool:
    """Check if QC results indicate acceptable data quality for variant calling.
    
    This condition function analyzes the QC agent's results to determine if the
    sequencing data quality is sufficient to proceed with variant calling.
    
    Args:
        state: GraphState containing results from executed nodes
        
    Returns:
        bool: True if quality is acceptable, False otherwise
    """
    # Get QC results from the graph state
    qc_results: Optional[NodeResult] = state.results.get('qc_agent')
    if not qc_results:
        logger.warning("No QC results available for quality assessment")
        return False
    
    # Analyze QC results to determine if quality is acceptable
    qc_result_text: str = str(qc_results.result).lower()
    
    # Look for definitive quality assessment phrases
    definitive_good_phrases = [
        'sufficient for somatic variant calling',
        'sufficient for variant calling', 
        'data quality status: sufficient',
        'suitable for somatic variant calling',
        'proceed with confidence'
    ]
    
    definitive_bad_phrases = [
        'insufficient for variant calling',
        'insufficient for somatic variant calling',
        'data quality status: insufficient',
        'poor data quality',
        'not suitable for variant calling',
        'not suitable for somatic variant calling',
        'quality too low',
        'do not proceed'
    ]
    
    # Check for definitive assessment phrases. Negative phrases are checked first
    # because some positive phrases are substrings of negative ones, e.g.
    # 'sufficient for variant calling' occurs inside 'insufficient for variant calling'.
    for phrase in definitive_bad_phrases:
        if phrase in qc_result_text:
            logger.info("Quality gate: FAILED - Found definitive negative quality assessment")
            return False
            
    for phrase in definitive_good_phrases:
        if phrase in qc_result_text:
            logger.info("Quality gate: PASSED - Found definitive positive quality assessment")
            return True
    
    # Fallback to indicator counting if no definitive phrases found
    quality_indicators = {
        'good': ['✅ pass', 'good quality', 'acceptable', 'high quality', 'suitable', 'meets', 'adequate', 'excellent'],
        'bad': ['poor quality', 'failed', 'low quality', 'unacceptable', 'contamination detected', 'below threshold', 'insufficient']
    }
    
    good_score = sum(1 for indicator in quality_indicators['good'] if indicator in qc_result_text)
    bad_score = sum(1 for indicator in quality_indicators['bad'] if indicator in qc_result_text)
    
    # Quality passes if we have more good indicators than bad ones
    quality_passed = good_score > bad_score
    
    if quality_passed:
        logger.info(f"Quality gate: PASSED - Data suitable for variant calling (good: {good_score}, bad: {bad_score})")
    else:
        logger.info(f"Quality gate: FAILED - Data quality insufficient (good: {good_score}, bad: {bad_score})")
    
    return quality_passed
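
The embedding-based alternative mentioned in this step could look roughly like the sketch below. It is an assumption-heavy illustration: `embed` is a toy bag-of-words stand-in for a real embedding model, the two reference sentences are made up for this example, and `quality_passed_by_similarity` is a hypothetical name that takes plain text rather than the graph state.

```python
import math
import re
from collections import Counter
from typing import Dict


def embed(text: str) -> Dict[str, float]:
    """Toy stand-in for an embedding model: a bag-of-words count vector."""
    return dict(Counter(re.findall(r"[a-z]+", text.lower())))


def cosine_similarity(a: Dict[str, float], b: Dict[str, float]) -> float:
    """Cosine similarity between two sparse vectors stored as dicts."""
    dot = sum(value * b.get(key, 0.0) for key, value in a.items())
    norm_a = math.sqrt(sum(value * value for value in a.values()))
    norm_b = math.sqrt(sum(value * value for value in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Hypothetical reference sentences describing each verdict.
PASS_REFERENCE = embed("data quality status sufficient for variant calling proceed")
FAIL_REFERENCE = embed(
    "data quality status insufficient for variant calling do not proceed")


def quality_passed_by_similarity(qc_text: str) -> bool:
    """Pass only if the text is closer to the passing reference than the failing one."""
    vector = embed(qc_text)
    return cosine_similarity(vector, PASS_REFERENCE) > cosine_similarity(
        vector, FAIL_REFERENCE)


print(quality_passed_by_similarity(
    "Data quality status: SUFFICIENT for variant calling"))
print(quality_passed_by_similarity(
    "Data quality status: INSUFFICIENT for variant calling"))
```

Because ties and empty text compare as not greater, this sketch fails closed. A real implementation would swap `embed` for an actual embedding model and would probably require a minimum similarity margin before passing the gate.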


If the variant calling workflow fails, it doesn't make sense to proceed with the analysis, so we construct another edge condition that only proceeds to the interpretation agent if the workflow completed successfully.

In [None]:
%%writefile -a edge_functions.py

def workflow_completed_condition(state: Any) -> bool:
    """Check if workflow orchestrator completed successfully.
    
    Args:
        state: GraphState containing results from executed nodes
        
    Returns:
        bool: True if workflow completed successfully, False otherwise
    """
    # Get workflow results from the graph state
    workflow_results: Optional[NodeResult] = state.results.get('workflow_orchestrator')
    if not workflow_results:
        logger.warning("No workflow orchestrator results available")
        return False
    
    # Check if workflow completed successfully
    workflow_result_text: str = str(workflow_results.result).lower()
    
    # Look for success indicators
    success_indicators = [
        'workflow execution completed successfully',
        'workflow completed successfully',
        'run completed',
        'succeeded',
        'status: completed',
        '✅ completed',
        'completed successfully',
        'vcf file',
        'filtered.vcf',
        'final output',
        'workflow finished'
    ]
    
    failure_indicators = [
        'workflow execution failed',
        'workflow failed',
        'run failed',
        'error',
        'failed to complete',
        'timeout',
        'cancelled'
    ]
    
    # Check for definitive success/failure indicators
    for indicator in success_indicators:
        if indicator in workflow_result_text:
            logger.info(f"Workflow condition: PASSED - Found success indicator: {indicator}")
            return True
            
    for indicator in failure_indicators:
        if indicator in workflow_result_text:
            logger.info("Workflow condition: FAILED - Workflow did not complete successfully")
            return False
    
    # If no clear indicators, check status
    if workflow_results.status == Status.COMPLETED:
        logger.info("Workflow condition: PASSED - Node status indicates completion")
        return True
    else:
        logger.info("Workflow condition: FAILED - Node status indicates failure")
        return False
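
Phrase-based conditions like these are easy to unit test without running any agents, and doing so exposes a substring pitfall: "insufficient for variant calling" contains "sufficient for variant calling". The sketch below is a self-contained illustration. `classify_by_phrases` and `make_state` are hypothetical helpers, and the stub built with `SimpleNamespace` only mimics the `state.results[node_id].result` access used by the edge functions above.

```python
from types import SimpleNamespace
from typing import Optional, Sequence


def classify_by_phrases(
    text: str, good: Sequence[str], bad: Sequence[str]
) -> Optional[bool]:
    """Return False on a negative phrase, True on a positive one, None otherwise."""
    lowered = text.lower()
    # Negative phrases are checked first so that 'insufficient ...' is not
    # mistaken for 'sufficient ...' by plain substring matching.
    if any(phrase in lowered for phrase in bad):
        return False
    if any(phrase in lowered for phrase in good):
        return True
    return None


def make_state(node_id: str, text: str) -> SimpleNamespace:
    """Build a hypothetical stub exposing state.results[node_id].result."""
    return SimpleNamespace(results={node_id: SimpleNamespace(result=text)})


GOOD = ["sufficient for variant calling"]
BAD = ["insufficient for variant calling"]

failing_state = make_state(
    "qc_agent", "Data quality status: INSUFFICIENT for variant calling")
passing_state = make_state(
    "qc_agent", "Data quality status: SUFFICIENT for variant calling")

failing_verdict = classify_by_phrases(
    str(failing_state.results["qc_agent"].result), GOOD, BAD)
passing_verdict = classify_by_phrases(
    str(passing_state.results["qc_agent"].result), GOOD, BAD)
print(failing_verdict, passing_verdict)
```

Returning `None` when nothing matches keeps the "no clear verdict" case explicit, so the caller can decide on a fallback such as indicator counting or the node status.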


## 🏗️ Step 4: Build the Graph Agent

With the individual agents and edge conditions defined, it is time to build a function that will assemble the Graph agent. This is achieved by adding the agents as nodes and connecting the nodes with edges, some of which are conditional. 


In [None]:
%%writefile graph_agent.py

import logging
import boto3

from mcp_clients import setup_mcp_clients
from data_discovery_agent import create_data_discovery_agent
from qc_agent import create_qc_agent
from workflow_orchestrator_agent import create_healthomics_agent
from interpretation_and_reporting_agent import create_cancer_analysis_agent
from edge_functions import quality_passed_condition, workflow_completed_condition

from strands import Agent
from strands.multiagent import GraphBuilder
from strands.multiagent.graph import Graph

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def build_genomics_graph(healthomics_client, aws_api_client) -> Graph:
    """Build the complete genomics multi-agent graph"""

    # Get tools from MCP clients
    healthomics_tools = healthomics_client.list_tools_sync()
    aws_tools = aws_api_client.list_tools_sync()

    # Create agents with appropriate tools
    data_discovery_agent = create_data_discovery_agent(healthomics_tools)
    data_discovery_agent.name = "data_discovery"
    data_discovery_agent.description = "Discovers genomics data files for analysis"
    data_discovery_agent.system_prompt = """
    You are responsible for identifying FASTQ files that the user is looking for.
    1. Find files in S3 that seem like the best match
    2. Report the S3 URIs for use by the next agent

    Provide terse and very focussed responses.
    Perform only these tasks, when you have completed these tasks handoff to the next agent.
    """

    qc_agent = create_qc_agent(aws_tools)
    qc_agent.name = "qc_agent"
    qc_agent.description = "Analyzes sequencing data quality and provides recommendations"
    qc_agent.system_prompt = """
    You are responsible for analyzing sequencing data quality and reporting on its quality. You will be given information including a URI
    for one or more files. 
    1. Find the FASTQC reports for these files. These QC reports might be in a sibling directory relative to
    the location of the FASTQ files. 
    2. Determine if the quality is acceptable for somatic variant calling which will be performed by 
    another agent along with reasons why or why not.
    3. Ensure you include the full URIs of any files you are reporting on for use by the next agent

    Provide terse and very focussed responses.
    Perform only these tasks, when you have completed these tasks handoff to the next agent.
    """

    workflow_orchestrator = create_healthomics_agent(healthomics_tools + aws_tools)
    workflow_orchestrator.name = "workflow_orchestrator"
    workflow_orchestrator.description = "Orchestrates genomics workflows when data quality is sufficient"
    workflow_orchestrator.system_prompt = """
    You are responsible for orchestrating genomics workflows. You will be given URIs for FASTQ pairs from tumor/ normal
    samples. Your job is to:

    1. Find the aligned BAM files associated with these reads
    2. Run the most recent version of the genomics-ai-workshop-mutect2 workflow on the data 
        - When running the workflow you are responsible for finding the IAM OmicsServiceRole
        - The workflow outputs bucket is in my account and begins with s3://genomics-ai-workshop-results
        - Run the workflow in 'cooking show' mode and set vcf2maf_output to s3://aws-genomics-static-us-east-1/omics-data/tumor-normal/maf/test_civic.maf
    3. After starting the workflow run, use your built-in wait_for_workflow tool with the run ID to monitor the run status until completion.
    4. If the run fails, then diagnose the problem and report on potential solutions.
    5. If the run succeeds report the S3 URIs of all output files generated by the workflow. These will be used by the next agent.

    Provide terse and very focussed responses.
    Perform only these tasks, when you have completed these tasks handoff to the next agent.
    """

    interpretation_agent = create_cancer_analysis_agent(aws_tools)
    interpretation_agent.name = "interpretation_agent"
    interpretation_agent.description = "Interprets cancer variants and generates clinical reports"
    interpretation_agent.system_prompt = """
    You are responsible for interpreting cancer variants and generating clinical reports. You will be given the S3 URI of a MAF file
    containing somatic variants from the workflow orchestrator. Your job is to:

    1. Load the MAF file from the provided S3 URI
    2. Load CIViC annotation files for clinical evidence matching
    3. Match variants with CIViC database to find clinical evidence and therapeutic recommendations
    4. Generate a comprehensive clinical report with evidence-based therapeutic recommendations
    5. Upload the clinical report to S3 for sharing and archival (use generate_and_upload_clinical_report)
    6. Provide appropriate guidance for both matched variants (with clinical evidence) and unmatched variants (rare/novel)
    7. Write the final report to the bucket in my account that begins with s3://genomics-ai-workshop-results

    Always provide clinically appropriate, evidence-based recommendations while being clear about the limitations of available data.
    When uploading reports to S3, use a meaningful filename and path structure (e.g., s3://bucket/reports/YYYY-MM-DD/sample_clinical_report.md).
    """

    logger.info("🤖 All agents created with proper configurations")

    # Build the multi-agent graph
    builder = GraphBuilder()

    # Add nodes (four specialized agents) - all should be agent instances
    builder.add_node(data_discovery_agent, "data_discovery")
    builder.add_node(qc_agent, "qc_agent")
    builder.add_node(workflow_orchestrator, "workflow_orchestrator")
    builder.add_node(interpretation_agent, "interpretation_agent")

    logger.info("📊 Graph nodes added")

    # Add edges with conditional logic
    builder.add_edge("data_discovery", "qc_agent")
    builder.add_edge("qc_agent", "workflow_orchestrator", condition=quality_passed_condition)
    builder.add_edge("workflow_orchestrator", "interpretation_agent", condition=workflow_completed_condition)

    # Set entry point
    builder.set_entry_point("data_discovery")

    # Configure execution limits
    builder.set_execution_timeout(3600)   # 1 hour
    builder.set_node_timeout(1800)        # 30 minutes per node

    # Build the graph
    graph = builder.build()

    logger.info("✅ Genomics multi-agent graph built successfully!")
    return graph

# Global variables for clients and graph
_healthomics_client = None
_aws_api_client = None
_genomics_graph = None
_clients_initialized = False

def get_or_create_graph():
    """Get the genomics graph, keeping MCP clients active"""
    global _healthomics_client, _aws_api_client, _genomics_graph, _clients_initialized

    if _genomics_graph is None:
        # Initialize clients once and keep them active
        _healthomics_client, _aws_api_client = setup_mcp_clients()

        # Enter the context managers but don't exit them
        _healthomics_client.__enter__()
        _aws_api_client.__enter__()
        _clients_initialized = True

        # Build graph with active clients
        _genomics_graph = build_genomics_graph(_healthomics_client, _aws_api_client)

    return _genomics_graph


def run_graph_locally(prompt: str):
    """Run the graph locally with a prompt (for testing)"""
    import asyncio
    
    async def _run():
        graph = get_or_create_graph()
        print(f"\n🚀 Running graph with prompt: {prompt}\n")
        print("="*80)
        
        # Execute the graph
        result = await graph.invoke_async(prompt)
        
        # Display results
        print(f"\nStatus: {result.status}")
        print(f"Completed Nodes: {result.completed_nodes}/{result.total_nodes}")
        print(f"Execution Time: {result.execution_time}ms")
        
        print("\n" + "-"*80)
        for node_id, node_result in result.results.items():
            print(f"\n🔹 {node_id}: {node_result.status}")
            if node_result.result:
                result_text = str(node_result.result)
                if len(result_text) > 300:
                    print(f"   {result_text[:300]}...")
                else:
                    print(f"   {result_text}")
        
        print("="*80)
        print("\n✅ Graph execution completed\n")
    
    asyncio.run(_run())


if __name__ == "__main__":
    # Example usage when running directly
    import sys
    if len(sys.argv) > 1:
        prompt = " ".join(sys.argv[1:])
        run_graph_locally(prompt)
    else:
        print("Usage: python graph_agent.py \"Your prompt here\"")
        print("\nExample:")
        print('  python graph_agent.py "Analyze genomics data for sample P001"')
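
`get_or_create_graph` enters the MCP client context managers by hand and never exits them. One possible way to keep the clients open while still being able to close them cleanly is `contextlib.ExitStack`. The sketch below uses `fake_client` as a hypothetical stand-in for the real MCP clients, so it illustrates the lifecycle pattern only and is not a change to `graph_agent.py`.

```python
from contextlib import ExitStack, contextmanager
from typing import Iterator, List, Tuple

events: List[str] = []


@contextmanager
def fake_client(name: str) -> Iterator[str]:
    """Hypothetical stand-in for an MCP client used as a context manager."""
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


# The stack owns every entered context until close() is called.
_stack = ExitStack()


def open_clients() -> Tuple[str, str]:
    """Enter both client contexts and keep them open for later use."""
    healthomics = _stack.enter_context(fake_client("healthomics"))
    aws_api = _stack.enter_context(fake_client("aws_api"))
    return healthomics, aws_api


def close_clients() -> None:
    """Exit all entered contexts in reverse order of entry."""
    _stack.close()


clients = open_clients()
close_clients()
print(events)
```

In a long-running process, `close_clients` could be registered with `atexit.register` so the clients are released at interpreter shutdown.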


## 🚀 Step 5: Running the Graph Agent

Now that we've built the complete multi-agent graph, let's test it! We've created a standalone script that allows you to run the graph agent from the command line.

### Running from Command Line

Use the `run_graph_agent.py` script to interact with the graph agent:

```bash
python run_graph_agent.py "Find and analyze genomics data for sample P001"
```

The script will:
1. Initialize the MCP clients
2. Build the multi-agent graph
3. Stream the response as each agent executes
4. Display results from each node in the graph


In [None]:
!python run_graph_agent.py "Find and analyze genomics data for sample P001"

### Understanding the Output

As the graph executes, you'll see a lot of output:
- **Node transitions**: Which agent is currently executing
- **Tool use**: Which tools are being used by each agent
- **MCP logs**: Detailed logs from the MCP clients
- **Agent outputs**: Results from each specialized agent
- **Workflow monitoring**: Monitoring of the workflow orchestration agent when the workflow is running
- **Quality gates**: Whether conditions passed or failed
- **Final results**: Complete analysis or early termination based on quality

Because none of the agents have explicit instructions on exactly how to perform their actions, you will also see them reasoning
about their task, making mistakes, and trying different approaches. For example, the workflow agent needs to find BAM files associated
with a sample, locate a reference genome, and identify an IAM role suitable for the workflow. It may make one or more guesses
about these, but when it gets feedback on its mistakes it will have to go looking for the correct values to succeed.

The graph will automatically:
- Stop after QC if data quality is insufficient
- Stop after workflow orchestration if the workflow fails
- Complete all agents if quality passes and workflow succeeds
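
To make the stopping point explicit when reading a run, a small helper can map the set of executed node IDs to one of the outcomes listed above. This is a hypothetical sketch: `describe_stop_point` is not part of Strands, and it assumes that only nodes that actually ran appear in the collection passed in (for example, the keys of `result.results`).

```python
from typing import Iterable


def describe_stop_point(executed_nodes: Iterable[str]) -> str:
    """Summarize how far the pipeline got, based on which node IDs executed."""
    executed = set(executed_nodes)
    if "interpretation_agent" in executed:
        return "All four agents ran"
    if "workflow_orchestrator" in executed:
        return "Stopped after workflow orchestration: the completion gate did not pass"
    if "qc_agent" in executed:
        return "Stopped after QC: the quality gate did not pass"
    return "Stopped before QC"


print(describe_stop_point(["data_discovery", "qc_agent"]))
print(describe_stop_point(
    ["data_discovery", "qc_agent", "workflow_orchestrator", "interpretation_agent"]))
```

The node IDs match those registered with `builder.add_node` in `graph_agent.py`.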

## 🎓 Summary: Multi-Agent Genomics Pipeline

Congratulations! You've successfully built a sophisticated **Multi-Agent Genomics Pipeline** using advanced Strands Agents patterns. Here's what we accomplished:

### 🏗️ **Multi-Agent Architecture**
1. **Graph Pattern** - Deterministic workflow with conditional execution
2. **Four Specialized Agents** - Each with specific genomics expertise
3. **Quality Gates** - Data-driven decision making with edge conditions
4. **MCP Integration** - Multiple tool servers for comprehensive capabilities

### 🔗 **Key Multi-Agent Concepts**
- **GraphBuilder**: Constructing deterministic agent workflows
- **Edge Conditions**: Conditional execution based on agent results
- **Quality Gates**: Automated quality assessment and flow control
- **Agent Orchestration**: Coordinating specialized agents for complex tasks

### 📚 **What You've Learned**
1. **Multi-Agent Patterns** - When and how to use Graph, Swarm, and Workflow patterns
2. **Conditional Execution** - Building intelligent decision points in agent workflows
3. **Quality Gates** - Implementing data-driven flow control
4. **Agent Specialization** - Creating focused agents for specific tasks

You now have the skills to build sophisticated multi-agent systems that can handle complex, real-world genomics workflows with intelligent decision making and robust error handling! 🧬🤖✨