# Mappr

> Scale up evaluation report mapping against evaluation frameworks using agentic workflows


::: {.callout-warning}
This notebook is a work in progress.
:::

Manually mapping evaluation reports against IOM's [Strategic Results Framework (SRF)](https://srf.iom.int) is time-consuming and resource-intensive with ~150 outputs to analyze. Additionally, the mapping process needs transparent and human-readable traces of LLM decision flows that both reflect natural reasoning patterns and allow human evaluators to audit the mapping logic.

A three-stage async pipeline leveraging [Global Compact for Migration (GCM) UN General Assembly Resolution](https://www.un.org/en/development/desa/population/migration/generalassembly/docs/globalcompact/A_RES_73_195.pdf) objectives as SRF Outputs pruning mechanism:



**Stage 1**: SRF Enablers & Cross-cutting Analysis

- **Async parallel analysis** of Enablers (7 categories) and Cross-cutting Priorities (4 categories) using shared semaphore for rate limiting
- **Purpose**: Identify if report is primarily meta-evaluation/transversal in nature
- **Fast processing**: ~11 items total with concurrent execution, provides context for subsequent stages

**Stage 2**: Informed GCM Analysis

- **Rate-limited parallel processing** of GCM Objectives (23 items) informed by Stage 1 results
- **Condensed representations**: UN General Assembly Resolution formulation simplified for retrieval efficiency
- **Concurrent theme analysis** with API quota management

**Stage 3**: Targeted SRF Analysis

- **SRF Filtering**: Use GCM results + `gcm_srf_lut` lookup table to prune ~150 SRF outputs to ~20-50 relevant ones
- **Deep parallel analysis**: Full hierarchy context (objective → outcome → output → indicators)
- **Async batch processing**: Final targeted analysis of pruned SRF outputs with retry logic and error handling

::: {.column-body}
![Three-stage Pipeline Overview](img/three-stage-pipeline-overview.png){fig-align="center" width="800px"}
:::

In [1]:
#| default_exp mappr

In [2]:
#| exports
from pathlib import Path
from functools import reduce
from toolslm.md_hier import *
from rich import print
import json
from fastcore.all import *
from enum import Enum
import logging
import uuid
from datetime import datetime
from typing import List, Callable
import dspy
from asyncio import Semaphore, gather, sleep
import time
from collections import defaultdict
import copy

from pydantic import BaseModel, Field
from typing import List

from evaluatr.frameworks import (EvalData, 
                                 IOMEvalData, 
                                 FrameworkInfo, 
                                 Framework,
                                 FrameworkCat,
                                 find_srf_output_by_id)

#from evaluatr.db_traces import TraceDB, Trace
from fastlite import database

from lisette import Chat, AsyncChat
import json

In [3]:
#| exports
from dotenv import load_dotenv
import os

load_dotenv()
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')

In [4]:
#| exports
cfg = AttrDict({
    'lm': 'gemini/gemini-2.0-flash',
    'api_key': GEMINI_API_KEY,
    'max_tokens': 8192,
    'track_usage': False,
    'call_delay': 0.1, # in seconds
    'semaphore': 30,
    'dirs': AttrDict({
        'data': '.evaluatr',
        'trace': 'traces'
    }),
    'verbosity': 1,
    'cache': AttrDict({
        'is_active': False,
        'delay': 0.05 # threshold in seconds below which we consider the response is cached
    }),
    'max_iter': 10
})

In [5]:
#| exports
traces_dir = Path.home() / cfg.dirs.data / cfg.dirs.trace
traces_dir.mkdir(parents=True, exist_ok=True)

In [6]:
#| eval: false
# class Trace:
#     id: int
#     timestamp: str
#     event: str
#     report_id: str
#     stage: str
#     framework: str
#     framework_category: str
#     framework_theme_id: str
#     data: dict

In [7]:
#| eval: false
# class TraceDB:
#     def __init__(self, db_path=None):
#         if db_path is None:
#             # db_path = Path.home() / cfg.dirs.data / "traces.db"
#             db_path = Path.home() / ".evaluatr/data/traces.db"
            
#         self.db = database(db_path)
#         self.traces = self.db.create(Trace, pk='id', transform=True)

In [8]:
#| eval: false
# db_traces = TraceDB()

In [9]:
#| exports
lm = dspy.LM(cfg.lm, api_key=cfg.api_key, cache=cfg.cache.is_active)
dspy.configure(lm=lm)

In [10]:
#| eval: false
doc = Path("../_data/md_library/49d2fba781b6a7c0d94577479636ee6f/abridged_evaluation_report_final_olta_ndoja_pdf/enriched")
pages = doc.ls(file_exts=".md").sorted(key=lambda p: int(p.stem.split('_')[1]))
report = '\n\n---\n\n'.join(page.read_text() for page in pages)
print(report[:1000])

## Hierarchical report navigation

Thanks to `toolslm.md_hier` and a clean markdown structure of a `report` markdown, we can create a nested dictionary of section, subsection, ... as follows:

In [11]:
#| eval: false
hdgs = create_heading_dict(report); hdgs

{'PPMi .... page 1': {},
 'CONTENTS .... page 3': {},
 '1. Introduction .... page 4': {},
 '2. Background of the JI-HoA .... page 5': {'2.1. Context and design of the JI-HoA .... page 5': {},
  '2.2. External factors affecting the implementation of the JI .... page 7': {}},
 '3. Methodology .... page 8': {},
 '4. Findings .... page 10': {'4.1. Relevance .... page 10': {'4.1.1. Relevance of programme activities for migrants, returnees, and communities .... page 10': {}},
  'Overall performance score for relevance: $3.9 / 5$ <br> Robustness score for the evidence: $4.5 / 5$': {'4.1.1.1 Needs of migrants .... page 10': {},
   '4.1.1.2 Needs of returnees .... page 10': {},
   '4.1.1.3 Needs of community members .... page 12': {},
   "4.1.2. Programme's relevance to the needs of stakeholders .... page 12": {'4.1.2.1 Needs of governments .... page 12': {},
    '4.1.2.2 Needs of other stakeholders .... page 13': {}},
   '4.2. Coherence .... page 13': {"4.2.1. The JI-HoA's alignment with the o

In [12]:
#| exports
def find_section_path(
    hdgs: dict, # The nested dictionary structure
    target_section: str # The section name to find
) -> list: # The nested key path for the given section name
    "Find the nested key path for a given section name."
    def search_recursive(current_dict, path=[]):
        for key, value in current_dict.items():
            current_path = path + [key]
            if key == target_section:
                return current_path
            if isinstance(value, dict):
                result = search_recursive(value, current_path)
                if result:
                    return result
        return None
    
    return search_recursive(hdgs)

Then we can retrieve the subsection path (list of nested headings to reach this specific section) in this nested `hdgs` dict :

In [13]:
#| eval: false
path = find_section_path(hdgs, "4.1.1.1 Needs of migrants .... page 10"); path

['4. Findings .... page 10',
 'Overall performance score for relevance: $3.9 / 5$ <br> Robustness score for the evidence: $4.5 / 5$',
 '4.1.1.1 Needs of migrants .... page 10']

Then retrieve the specific subsection content:

In [14]:
#| exports
def get_content_tool(
    hdgs: dict, # The nested dictionary structure
    keys_list: list, # The list of keys to navigate through
    ) -> str: # The content of the section
    "Navigate through nested levels using the exact key strings."
    return reduce(lambda current, key: current[key], keys_list, hdgs).text

In [15]:
#| eval: false
content = get_content_tool(hdgs, path)
print(content[:500])

In [16]:
#| exports
def flatten_sections(hdgs, path=[]):
    """Extract flat list of (key, full_path) tuples from nested hdgs"""
    sections = []
    for key, value in hdgs.items():
        current_path = path + [key]
        sections.append((key, current_path))
        if isinstance(value, dict):
            sections.extend(flatten_sections(value, current_path))
    return sections

In [17]:
#| exports
def extract_content(section_key: str, sections_lookup: dict, hdgs: dict) -> str:
    path = sections_lookup[section_key]
    return get_content_tool(hdgs, path)

## Formatters

We define here a set of function formatting both evaluation frameworks themes to analyze (SRF enablers, objectives, GCM objectives, ...) and traces.

In [18]:
#| exports
def format_enabler_theme(
    theme: EvalData # The theme object
    ) -> str: # The formatted theme string
    "Format SRF enabler into structured text for LM processing."
    parts = [
        f'## Enabler {theme.id}: {theme.title}',
        '### Description', 
        theme.description
    ]
    return '\n'.join(parts)

For instance: 

In [19]:
#| eval: false
eval_data = IOMEvalData()
data_evidence = eval_data.srf_enablers[3]  # "Data and evidence" is at index 3
print(format_enabler_theme(data_evidence))

In [20]:
#| exports
def format_crosscutting_theme(
    theme: EvalData # The theme object
    ) -> str: # The formatted theme string
    "Format SRF cross-cutting into structured text for LM processing."
    parts = [
        f'## Cross-cutting {theme.id}: {theme.title}',
        '### Description', 
        theme.description
    ]
    return '\n'.join(parts)

For instance:

In [21]:
eval_data = IOMEvalData()
env_sustainability = eval_data.srf_crosscutting_priorities[3]  # "Data and evidence" is at index 3
print(format_crosscutting_theme(env_sustainability))

In [22]:
#| exports
def format_gcm_theme(
    theme: dict # The GCM theme object from gcm_small
    ) -> str: # The formatted theme string
    "Format GCM objective into structured text for LM processing."
    parts = [
        f'## GCM Objective {theme["id"]}: {theme["title"]}',
        '### Core Theme', 
        theme["core_theme"]
    ]
    
    if theme.get("key_principles"):
        parts.extend(['### Key Principles', ', '.join(theme["key_principles"])])
    
    if theme.get("target_groups"):
        parts.extend(['### Target Groups', ', '.join(theme["target_groups"])])
        
    if theme.get("main_activities"):
        parts.extend(['### Main Activities', ', '.join(theme["main_activities"])])
    
    return '\n'.join(parts)

For instance:

In [23]:
#| eval: false
gcm_small = eval_data.gcm_objectives_small
print(format_gcm_theme(gcm_small[0]))

In [24]:
#| exports
def format_srf_output(output_context: dict) -> str:
    "Format SRF output with full hierarchical context for LM processing."
    parts = [
        f'## SRF Output {output_context["output"]["id"]}: {output_context["output"]["title"]}',
        '### Strategic Context',
        f'**Objective {output_context["objective"]["id"]}**: {output_context["objective"]["title"]}',
        f'**Long    -term Outcome {output_context["long_outcome"]["id"]}**: {output_context["long_outcome"]["title"]}',
        f'**Short-term Outcome {output_context["short_outcome"]["id"]}**: {output_context["short_outcome"]["title"]}'
    ]
    
    return '\n'.join(parts)

For instance:

In [25]:
#| eval: false
test_output_id = '1a11'
output_context = find_srf_output_by_id(eval_data, test_output_id)
if output_context:
    formatted = format_srf_output(output_context)
    print(formatted)

## Pydantic models

In [26]:
# class IdentifyAspectsOutput(BaseModel):
#     "Identify the key aspects of a theme"
#     key_aspects: List[str]
#     rationale: str

In [27]:
#| exports
class SelectSectionOutput(BaseModel):
    "Select the next most relevant section based on current evidence summary and gaps"
    section_index: int
    reasoning: str

In [28]:
# class IdentifyAspectsOutput(BaseModel):
#     "Identify the key aspects of a theme"
#     key_aspects: List[str]
#     rationale: str

In [29]:
#| exports
class SelectSectionOutput(BaseModel):
    "Select the next most relevant section based on current evidence summary and gaps"
    section_index: int
    reasoning: str

In [30]:
#| exports
class SummarizeContentOutput(BaseModel):
    "Summarize the content of a section and identify the key findings"
    summary: str
    key_findings: List[str]

In [31]:
#| exports
class EvaluateEvidenceOutput(BaseModel):
    theme_covered: bool
    coverage_reasoning: str
    gaps_identified: str
    should_continue: bool

In [32]:
#| exports
class State(BaseModel):
    theme: str
    prior_coverage_context: str = ""
    section_summaries: List[dict] = []  # Renamed from evidences
    explored_sections: List[str] = []
    available_sections: List[str]
    evaluation_history: List[dict] = []  # Track reasoning evolution
    iterations_completed: int = 0
    theme_covered: bool = False
    coverage_reasoning: str = ""
    gaps_identified: str = ""
    stop_reason: str = ""

## System prompts

In [33]:
# identify_aspects_sp = """You are analyzing an evaluation report theme to identify the CORE aspects that should be covered.

# Given a theme description, identify 3-5 essential aspects that capture the core of what an evaluation report must address.

# Be concise and focus on the quintessence - these aspects should be:
# - More concise than the theme itself
# - Specific enough to be measurable
# - Broad enough to capture meaningful coverage

# Output JSON with:
# - key_aspects: list of 3-5 core aspects (concise and essential)
# - rationale: brief explanation of why these aspects are critical
# """


In [34]:
#| exports
select_section_sp = """You are analyzing an evaluation report to determine if it covers a specific theme.

Given:
- The theme being analyzed
- Current evaluation state (summaries collected, evaluation reasoning so far)
- Available sections (as JSON array of [index, section_name] pairs)

Your task: Select the next section most likely to contain relevant evidence, considering:
- What gaps were identified in previous evaluations
- Which sections haven't been explored yet
- Where you're most likely to find missing evidence

Output JSON with:
- section_index: the index number from the pair (integer)
- reasoning: why you chose this section based on current gaps and needs
"""

In [35]:
#| exports
summarize_sp = """You are summarizing content from an evaluation report section.

Your task: Extract and condense the key points relevant to the theme being analyzed.

This summary will be used to:
- Maintain context across iterations without inflating the prompt
- Provide the evaluation step with essential information from this section

Keep it concise but capture:
- Main findings or claims related to the theme
- Supporting evidence (data, quotes, examples)
- Methodological details if relevant

Output JSON with:
- summary: concise summary of the content
- key_findings: list of specific findings relevant to the theme
"""

In [36]:
#| exports
evaluate_evidence_sp = """You are evaluating evidence collected from an evaluation report.

Given:
- The theme being analyzed
- Previous evidence summaries from explored sections
- Your own previous evaluation reasoning (if any) showing how your understanding evolved
- New content just extracted from the latest section
- Exploration progress (how many sections explored vs. available)

Your task: Assess whether the theme is adequately covered in the report based on all accumulated evidence.

Build on your previous reasoning - consider what gaps you identified before and whether new evidence addresses them.

A theme is considered "adequately covered" when:
- The report explicitly discusses the theme's core concepts with substantive detail
- There is concrete, verifiable evidence supporting claims (e.g., quantitative data, qualitative findings, interview quotes, case studies, triangulated sources)
- Claims are backed by methodology and data sources, not just assertions
- The depth and quality of evidence is sufficient to draw meaningful conclusions
- Superficial mentions or anecdotal references without supporting evidence are NOT sufficient

Be critical: distinguish between robust evidence-based coverage versus mere mentions or weak claims.

Before concluding a theme is adequately covered:
- Explore multiple sections to ensure comprehensive coverage
- Verify evidence from different parts of the report (not just one section)
- Be especially cautious on first 1-2 iterations - continue exploring unless coverage is exceptionally strong

Output JSON with:
- theme_covered: boolean, true if the theme is adequately addressed with strong evidence
- coverage_reasoning: detailed explanation citing specific evidence types and their strength/weakness
- gaps_identified: what aspects, evidence types, or depth are still missing
- should_continue: boolean, true if more exploration would be beneficial
"""


In [37]:
#| exports
def parse_response(result):
    "Extract JSON from Lisette response"
    return json.loads(result.choices[0].message.content)

In [38]:
# async def identify_key_aspects(
#     theme: str,  # The theme to analyze
#     model: str = 'gemini/gemini-2.0-flash'  # The model to use
# ) -> dict:  # The parsed JSON response
#     "Identify the key aspects of a theme"
#     chat = AsyncChat(model=model, sp=identify_aspects_sp, temp=0)
    
#     prompt = f"Theme to analyze:\n{theme}"
    
#     result = await chat(prompt, response_format=IdentifyAspectsOutput)
#     return parse_response(result)

In [39]:
# #| eval: false
# theme = """## Data and Evidence
# Organizations need robust systems for collecting, analyzing, and using data to inform decision-making. This includes establishing data governance frameworks, ensuring data quality and accuracy, building staff capacity for data analysis, and creating mechanisms for evidence-based policy development."""

# result = await identify_key_aspects(theme)
# print(f"Key aspects: {result['key_aspects']}")
# print(f"Rationale: {result['rationale']}")

In [40]:
#| exports
def format_sections_for_selection(available_sections: List[str]) -> str:
    "Format available sections as indexed JSON array"
    return json.dumps([
        [i+1, s] for i, s in enumerate(available_sections)
    ], indent=2)

In [41]:
#| exports
async def select_section(
    state: State,
    model: str = 'gemini/gemini-2.0-flash'
) -> dict:
    "Select next section to explore based on current state"
    chat = AsyncChat(model=model, sp=select_section_sp, temp=0)
    
    sections_json = format_sections_for_selection(state.available_sections)
    
    # Format evaluation history for context
    eval_summary = "\n".join([
        f"Iteration {ev['iteration']}: Theme covered={ev['theme_covered']}, Gaps: {ev['gaps_identified']}"
        for ev in state.evaluation_history
    ]) if state.evaluation_history else "No evaluations yet - initial exploration"
    
    parts = [
        state.prior_coverage_context,
        f"Theme being analyzed:\n{state.theme}",
        f"Evaluation history:\n{eval_summary}",
        f"Available sections:\n{sections_json}",
        f"Explored sections: {state.explored_sections}"
    ]
    prompt = "\n\n".join(p for p in parts if p)
    
    result = await chat(prompt, response_format=SelectSectionOutput)
    parsed = parse_response(result)
    
    section_key = state.available_sections[parsed['section_index'] - 1]
    return {'section_key': section_key, 'reasoning': parsed['reasoning']}

To give an example, we first create a state with some sections:

In [42]:
#| eval: false
# state = State(
#     theme="## Data and Evidence\nOrganizations need robust data systems...",
#     key_aspects=["Data governance", "Data quality", "Analytical capacity"],
#     available_sections=["4.1. Relevance", "4.2. Coherence", "4.3.1. Data availability"],
#     explored_sections=[],
#     confidence_score=0.0
# )

Then we select the next section to explore:

In [43]:
#| eval: false
# result = await select_section(state)
# print(f"Selected: {result['section_key']}")
# print(f"Reasoning: {result['reasoning']}")

To prevent context bloat across iterations, we'll store only summaries and key findings rather than full section contents in the state. The following function handles this summarization:

In [44]:
#| exports
async def summarize_content(
    state: State, # The current state of the analysis
    section_key: str, # The key of the section to summarize
    content: str, # The content of the section to summarize
    model: str = 'gemini/gemini-2.0-flash' # The model to use
) -> dict:
    "Summarize section content relevant to the theme"
    chat = AsyncChat(model=model, sp=summarize_sp, temp=0)
    
    parts = [
        state.prior_coverage_context,
        f"Theme: {state.theme}",
        f"Section: {section_key}",
        f"Content to summarize:\n{content}"
    ]
    prompt = "\n\n".join(p for p in parts if p)
    
    result = await chat(prompt, response_format=SummarizeContentOutput)
    return parse_response(result)

Assuming we have a state and selected section:

In [45]:
#| eval: false
# state = State(
#     theme="## Data and Evidence\nOrganizations need robust data systems...",
#     key_aspects=["Data governance", "Data quality", "Analytical capacity"],
#     available_sections=["4.1. Relevance", "4.3.1. Data availability"],
#     explored_sections=[]
# )

We can summarize the content of the section: 

In [46]:
#| eval: false
# section_key = "4.3.1. Data availability"
# content = """The JI-HoA faced significant gaps in migration data. 
# The Regional Data Hub produced 20 research outputs and engaged with National Statistical Offices.
# Data collection methodologies were harmonized across countries."""

# result = await summarize_content(state, section_key, content)
# print(f"Summary: {result['summary']}")
# print(f"Key findings: {result['key_findings']}")


In [47]:
#| exports
async def evaluate_evidence(
    state: State,
    new_content: str,
    model: str = 'gemini/gemini-2.0-flash'
) -> dict:
    "Evaluate evidence collected and determine if more exploration needed"
    chat = AsyncChat(model=model, sp=evaluate_evidence_sp, temp=0)
    
    # Format previous summaries
    prev_summaries = "\n\n".join([
        f"Section: {state.explored_sections[i]}\n"
        f"Summary: {state.section_summaries[i]['summary']}\n"
        f"Key findings: {', '.join(state.section_summaries[i]['key_findings'])}"
        for i in range(len(state.section_summaries))
    ]) if state.section_summaries else "None yet"
    
    # Format evaluation history
    prev_evaluations = "\n\n".join([
        f"Iteration {ev['iteration']}:\n"
        f"Theme covered: {ev['theme_covered']}\n"
        f"Reasoning: {ev['coverage_reasoning']}\n"
        f"Gaps: {ev['gaps_identified']}"
        for ev in state.evaluation_history
    ]) if state.evaluation_history else "First evaluation - no previous assessments"
    
    parts = [
        state.prior_coverage_context,
        f"Theme being analyzed:\n{state.theme}",
        f"Previous evidence summaries:\n{prev_summaries}",
        f"Previous evaluation reasoning:\n{prev_evaluations}",
        f"Exploration progress: {len(state.explored_sections)} sections explored out of {len(state.explored_sections) + len(state.available_sections)} total available",
        f"New content to evaluate:\n{new_content}"
    ]
    prompt = "\n\n".join(p for p in parts if p)
    
    result = await chat(prompt, response_format=EvaluateEvidenceOutput)
    return parse_response(result)

For example, let's consider a state after exploring one section:

In [48]:
# #| eval: false
# state = State(
#     theme="## Data and Evidence\nOrganizations need robust data systems...",
#     key_aspects=["Data governance", "Data quality", "Analytical capacity"],
#     explored_sections=["4.3.1. Data availability"],
#     evidences=[{
#         'summary': 'Regional Data Hub produced 20 research outputs',
#         'key_findings': ['Data collection harmonized', 'NSO engagement']
#     }],
#     available_sections=["4.1. Relevance"],
#     confidence_score=0.33
# )

In [49]:
#| eval: false
# new_content = """The programme established data governance frameworks in Ethiopia, Somalia, and Sudan. 
# National stakeholders received training on data management protocols and reporting standards. 
# However, political instability and staff turnover undermined capacity building efforts. 
# Survey results showed 97% of stakeholders reported increased knowledge on return and reintegration issues."""


# result = await evaluate_evidence(state, new_content)
# print(f"Aspects covered: {result['aspects_covered']}")
# print(f"Should continue: {result['should_continue']}")
# print(f"Gaps: {result['gaps_identified']}")

In [50]:
#| exports
async def limit(semaphore, coro, delay=None):
    "Execute coroutine with semaphore concurrency control"
    async with semaphore:
        result = await coro
        if delay: await sleep(delay)
        return result

For instance, let's create

- a semaphore limiting to 3 concurrent calls:

In [51]:
#| eval: false
sem = Semaphore(3)

- 5 different themes to analyze:

In [52]:
#| eval: false
themes = [
    "## Data and Evidence\nOrganizations need robust data systems...",
    "## Workforce\nStaff capacity and skills development...",
    "## Partnerships\nCollaboration with stakeholders...",
    "## Gender Equality\nGender mainstreaming in programs...",
    "## Innovation\nAdopting new technologies and approaches..."
]

Then, we can run all 5 in parallel, but max 3 concurrent LLM calls:

In [53]:
#| eval: false
# results = await gather(*[
#     limit(sem, identify_key_aspects(theme)) 
#     for theme in themes
# ])

# for i, result in enumerate(results):
#     print(f"Theme {i+1}: {result['key_aspects']}")

## Mapping a theme

In [54]:
#| exports
class Stage(Enum):
    "Pipeline stage number."
    STAGE1 = "stage1"
    STAGE2 = "stage2"
    STAGE3 = "stage3"
    def __str__(self): return self.value

In [55]:
#| exports
class TraceContext(AttrDict):
    "Context for tracing the mapping process"
    def __init__(self, 
                 report_id:str,  # Report identifier
                 stage:Stage,  # Pipeline stage number
                 framework:FrameworkInfo,  # Framework info (name, category, theme_id)
                 ): 
        # self.run_id = str(uuid.uuid4())[:8]  # Short unique ID
        store_attr()

    def __repr__(self):
        return f"TraceContext(report_id={self.report_id}, stage={self.stage}, framework={self.framework})"


In [56]:
#| eval: false
tr_ctx = TraceContext(
    report_id='49d2fba781b6a7c0d94577479636ee6f', 
    stage=Stage.STAGE1, 
    framework=FrameworkInfo(Framework.SRF, FrameworkCat.ENABLERS, "4")
    )

tr_ctx

```json
TraceContext(report_id=49d2fba781b6a7c0d94577479636ee6f, stage=stage1, framework={'category': 'Enablers', 'theme_id': '4', 'name': 'SRF'})
```

In [57]:
#| exports
def setup_logger(name, handler, level=logging.INFO, **kwargs):
    "Helper function to setup a logger with common configuration"
    logger = logging.getLogger(name)
    logger.handlers.clear()
    logger.addHandler(handler)
    logger.setLevel(level)
    for k,v in kwargs.items(): setattr(logger, k, v)
    return logger

In [58]:
#| exports
def setup_trace_logging(report_id, verbosity=cfg.verbosity):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f'{report_id}_{timestamp}.jsonl'
    file_handler = logging.FileHandler(traces_dir / filename, mode='w')
    setup_logger('trace.file', file_handler)    
    console_handler = logging.StreamHandler()
    setup_logger('trace.console', console_handler, verbosity=verbosity)

In [59]:
#| exports
def log_analysis_event(event: str, trace_ctx: TraceContext, **extra_data):
    """Log an analysis event to file and console with different verbosity levels"""
    file_logger = logging.getLogger('trace.file')
    console_logger = logging.getLogger('trace.console')
    
    base_data = {
        "timestamp": datetime.now().isoformat(),
        "event": event,
        "report_id": trace_ctx.report_id,
        "stage": str(trace_ctx.stage),
        "framework": str(trace_ctx.framework.name),
        "framework_category": str(trace_ctx.framework.category),
        "framework_theme_id": str(trace_ctx.framework.theme_id),
    }
    base_data.update(extra_data)
    
    # File logger - always full JSON
    file_logger.info(json.dumps(base_data, indent=2))
    
    # Console logger - verbosity-based formatting
    if hasattr(console_logger, 'verbosity'):
        if console_logger.verbosity == 1:
            console_msg = f"{base_data['report_id']} - {base_data['stage']}"
        elif console_logger.verbosity == 2:
            console_msg = f"{base_data['report_id']} - {base_data['stage']} - {base_data['event']}"
        else:  # verbosity == 3
            console_msg = json.dumps(base_data, indent=2)
        
        console_logger.info(console_msg)

In [60]:
async def analyze_theme(
    theme: str,
    sections_lookup: dict,
    hdgs: dict,
    semaphore: Semaphore,
    select_fn: Callable = select_section,
    summarize_fn: Callable = summarize_content,
    evaluate_fn: Callable = evaluate_evidence,
    log_fn: Callable = None,
    prior_coverage_context: str = "",
    max_iterations: int = 5,
    model: str = 'gemini/gemini-2.0-flash'
):
    "Analyze if a theme is covered in the evaluation report"
    log = log_fn or (lambda event, **kw: None)
    
    log("Starting Analysis", theme=theme)
    
    # Initialize state
    state = State(
        theme=theme,
        prior_coverage_context=prior_coverage_context,
        available_sections=list(sections_lookup.keys())
    )
    
    # Iterative exploration
    for i in range(max_iterations):
        log("Iteration Start", iteration=i+1)
        
        # Select section
        selected = await limit(semaphore, select_fn(state, model))
        log("Section Selected", 
            iteration=i+1,
            section=selected['section_key'], 
            reasoning=selected['reasoning'])
                
        # Extract content
        path = sections_lookup.get(selected['section_key'])
        if not path:
            log("Section Not Found", 
                iteration=i+1,
                section=selected['section_key'])
            continue
        content = get_content_tool(hdgs, path)
        
        # Summarize
        summary = await limit(semaphore, summarize_fn(state, selected['section_key'], content, model))
        log("Content Summarized", 
            iteration=i+1,
            section=selected['section_key'],
            summary=summary['summary'],
            key_findings=summary['key_findings'])
        
        # Update state with summary and section BEFORE evaluation
        state.section_summaries.append(summary)
        state.explored_sections.append(selected['section_key'])
        state.available_sections.remove(selected['section_key'])
        
        # Evaluate
        evaluation = await limit(semaphore, evaluate_fn(state, content, model))
        log("Evidence Evaluated",
            iteration=i+1,
            theme_covered=evaluation['theme_covered'],
            coverage_reasoning=evaluation['coverage_reasoning'],
            gaps_identified=evaluation['gaps_identified'],
            should_continue=evaluation['should_continue'])
        
        if len(state.evaluation_history) > 1:
            prev = state.evaluation_history[-2]
            log("Understanding Progression",
                iteration=i+1,
                previous_covered=prev['theme_covered'],
                current_covered=evaluation['theme_covered'],
                previous_gaps=prev['gaps_identified'],
                current_gaps=evaluation['gaps_identified'])
            
        # Update state with evaluation results
        state.evaluation_history.append({
            'iteration': i + 1,
            'theme_covered': evaluation['theme_covered'],
            'coverage_reasoning': evaluation['coverage_reasoning'],
            'gaps_identified': evaluation['gaps_identified']
        })
        
        state.theme_covered = evaluation['theme_covered']
        state.coverage_reasoning = evaluation['coverage_reasoning']
        state.gaps_identified = evaluation['gaps_identified']
            
        log("State Updated",
            iteration=i+1,
            explored_sections=state.explored_sections,
            evidence_count=len(state.section_summaries),
            remaining_sections=len(state.available_sections),
            theme_covered=state.theme_covered,
            current_gaps=state.gaps_identified)
        
        # Check stopping
        if not evaluation['should_continue']:
            state.stop_reason = "sufficient_evidence"
            log("Analysis Complete", 
                iteration=i+1,
                reason=state.stop_reason)
            break
    else:
        state.stop_reason = "max_iterations"
        log("Analysis Complete", 
            reason=state.stop_reason,
            iterations_completed=state.iterations_completed,
            theme_covered=state.theme_covered,
            final_reasoning=state.coverage_reasoning,
            sections_explored_count=len(state.explored_sections),
            total_sections=len(sections_lookup))

    
    return state


In [61]:
# 1. Setup trace logging
setup_trace_logging(report_id="49d2fba781b6a7c0d94577479636ee6f", verbosity=3)

In [62]:
# 2. Prepare document structure
hdgs = create_heading_dict(report)
sections_lookup = {key: path for key, path in flatten_sections(hdgs)}

In [63]:
# 3. Setup trace context
trace_ctx = TraceContext(
    report_id="49d2fba781b6a7c0d94577479636ee6f",
    stage=Stage.STAGE1,
    framework=FrameworkInfo(Framework.SRF, FrameworkCat.ENABLERS, "4")
)

In [64]:
# 4. Create log function
log_fn = lambda event, **kw: log_analysis_event(event, trace_ctx, **kw)

In [65]:
# 5. Prepare theme
theme = format_enabler_theme(eval_data.srf_enablers[3])  # Data and evidence
print(theme)

In [66]:
# 6. Run analysis
sem = Semaphore(30)
result = await analyze_theme(
    theme=theme,
    sections_lookup=sections_lookup,
    hdgs=hdgs,
    semaphore=sem,
    log_fn=log_fn
)

{
  "timestamp": "2025-10-04T14:42:57.484963",
  "event": "Starting Analysis",
  "report_id": "49d2fba781b6a7c0d94577479636ee6f",
  "stage": "stage1",
  "framework": "SRF",
  "framework_category": "Enablers",
  "framework_theme_id": "4",
  "theme": "## Enabler 4: Data and evidence\n### Description\nIOM will be the pre-eminent source of migration and displacement data for action, which help save lives and deliver solutions; data for insight, which help facilitate regular migration pathways; and data for foresight, which help drive anticipatory action. IOM will have the systems and data fluency to collect, safely store, analyze, share and apply disaggregated data and evidence across the mobility spectrum. Our extensive data and research repositories will underpin evidence-based policies and practices. Data will be central to the internal decision-making and management of the Organization."
}
{
  "timestamp": "2025-10-04T14:42:57.486124",
  "event": "Iteration Start",
  "report_id": "49d2

In [None]:
# 7. Review results
print(f"\nFinal Results:")
print(f"Confidence: {result.confidence_score:.2f}")
print(f"Explored sections: {result.explored_sections}")

## Signatures

A [DSPy signature](https://dspy.ai/learn/programming/signatures) is a declarative specification of input/output behavior of a DSPy module. Signatures allow you to tell the LM (Language Model) what it needs to do, rather than specify how we should ask the LM to do it.

In [None]:
#| exports
class SectionSelection(dspy.Signature):
    "Choose the next most relevant section based on current evidence summary and gaps."
    theme: str = dspy.InputField(desc="Theme being analyzed")
    evidence_summary: str = dspy.InputField(desc="Current summary of key evidence", default="No evidence collected yet - beginning analysis")
    gaps_identified: str = dspy.InputField(desc="Knowledge gaps to address", default="No gaps identified yet - initial exploration")
    all_headings: str = dspy.InputField(desc="Complete document structure")
    sections_explored: str = dspy.InputField(desc="Sections already explored", default="")
    next_section: str = dspy.OutputField(desc="Next section key to explore - must be an exact key from all_headings and NOT in sections_explored, or 'DONE'")
    reasoning: str = dspy.OutputField(desc="Why this section was chosen")

In [None]:
# hdgs

In [None]:
#| eval: false
# Test the signature
# selector = dspy.ChainOfThought(SectionSelection)
# result = selector(
#    theme = format_enabler_theme(env_sustainability),
#    all_headings=str(hdgs)
# )
# print(f"Selected: {result.next_section}")
# print(f"Reasoning: {result.reasoning}")

In [None]:
#| exports
class Assessment(dspy.Signature):
    "Assess evidence sufficiency and update running summary by incorporating new evidence. Calculate confidence as coverage completeness percentage."
    theme: str = dspy.InputField(desc="Theme being analyzed with key aspects to cover")
    evidence_summary: str = dspy.InputField(desc="Current summary of key evidence", default="No evidence collected yet - beginning analysis")
    gaps_identified: str = dspy.InputField(desc="Knowledge gaps from previous assessment", default="No gaps identified yet - initial exploration")
    new_evidence: str = dspy.InputField(desc="New evidence just collected from the latest section")
    sections_explored: str = dspy.InputField(desc="Sections already checked", default="")
    sufficient: bool = dspy.OutputField(desc="Is evidence sufficient?")
    confidence_score: float = dspy.OutputField(desc="Coverage completeness: 0.0-1.0 representing what percentage of theme's key aspects have been addressed")
    updated_evidence_summary: str = dspy.OutputField(desc="Updated summary incorporating the new evidence")
    updated_gaps: str = dspy.OutputField(desc="Updated knowledge gaps after reviewing new evidence")
    reasoning: str = dspy.OutputField(desc="Assessment reasoning including which key aspects are covered/missing")



We treat observability and LLM evaluation as core requirements for our mapping pipeline. While DSPy's built-in `dspy.inspect_history()` provides valuable reasoning chains, we enhance it with structured metadata (`report_id`, `phase`, `framework`) to create comprehensive audit trails. This enriched tracing enables systematic evaluation of mapping accuracy, supports human evaluator annotation workflows, and provides the detailed context necessary for debugging and improving our LLM-based document analysis system. 

We define below enum and configuration classes for pipeline tracing and validation. These provide structured metadata for audit trails and evaluation.

In [None]:
#| exports
class Stage(Enum):
    "Pipeline stage number."
    STAGE1 = "stage1"
    STAGE2 = "stage2"
    STAGE3 = "stage3"
    def __str__(self): return self.value
#| exports
class TraceContext(AttrDict):
    "Context for tracing the mapping process"
    def __init__(self, 
                 report_id:str,  # Report identifier
                 stage:Stage,  # Pipeline stage number
                 framework:FrameworkInfo,  # Framework info (name, category, theme_id)
                 ): 
        # self.run_id = str(uuid.uuid4())[:8]  # Short unique ID
        store_attr()

    def __repr__(self):
        return f"TraceContext(report_id={self.report_id}, stage={self.stage}, framework={self.framework})"
#| eval: false
tr_ctx = TraceContext(
    report_id='49d2fba781b6a7c0d94577479636ee6f', 
    stage=Stage.STAGE1, 
    framework=FrameworkInfo(Framework.SRF, FrameworkCat.ENABLERS, "4")
    )

tr_ctx

In [None]:
#| exports
class Synthesis(dspy.Signature):
    "Provide detailed rationale and synthesis of theme analysis."
    trace_ctx: str = dspy.InputField(desc="Trace context")
    theme: str = dspy.InputField(desc="Theme being analyzed")
    evidence_summary: str = dspy.InputField(desc="Final summary of key evidence")
    gaps_identified: str = dspy.InputField(desc="Final knowledge gaps")
    sections_explored: str = dspy.InputField(desc="List of sections explored")
    theme_covered: bool = dspy.OutputField(desc="Final decision on theme coverage")
    confidence_explanation: str = dspy.OutputField(desc="Detailed explanation of confidence score")
    evidence_summary: str = dspy.OutputField(desc="Key evidence supporting the conclusion")
    gaps_identified: str = dspy.OutputField(desc="Any gaps or missing aspects")

## Reasoning & Acting (ReAct)

**Why We Built a Custom Iterative Analyzer Instead of Using DSPy ReAct?**

We could have leveraged DSPy's built-in [`ReAct` module](https://dspy.ai/api/modules/ReAct), which provides an agent-based approach where the LLM automatically decides when and how to use exploration tools. The "ReAct" concept has been introduced in [this paper](https://arxiv.org/pdf/2210.03629). However, we chose to implement our own iterative analyzer from scratch for several critical reasons:

- **Open-ended vs. Structured Nature**: DSPy's ReAct is designed for open-ended problem solving where the agent explores freely using available tools. Our use case requires a more structured, methodical approach to document analysis with predictable exploration patterns.

- **Document-Specific Control**: Our approach is tailored specifically for structured document exploration with hierarchical headings, allowing us to implement domain-specific logic for section navigation and content retrieval.

- **Evaluator Requirements**: Since traces will be reviewed by human evaluators for error analysis, we needed explicit, step-by-step decision logging rather than the more implicit reasoning chains that ReAct provides.

In [None]:
#| exports
def setup_logger(name, handler, level=logging.INFO, **kwargs):
    "Helper function to setup a logger with common configuration"
    logger = logging.getLogger(name)
    logger.handlers.clear()
    logger.addHandler(handler)
    logger.setLevel(level)
    for k,v in kwargs.items(): setattr(logger, k, v)
    return logger

In [None]:
#| exports
def setup_trace_logging(report_id, verbosity=cfg.verbosity):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f'{report_id}_{timestamp}.jsonl'
    file_handler = logging.FileHandler(traces_dir / filename, mode='w')
    setup_logger('trace.file', file_handler)    
    console_handler = logging.StreamHandler()
    setup_logger('trace.console', console_handler, verbosity=verbosity)

In [None]:
#| exports
class ThemeAnalyzer(dspy.Module):
    """
    Analyzes a theme across a document by iteratively exploring sections, collecting evidence, and synthesizing findings. 
    Uses a structured pipeline of section selection -> assessment -> synthesis.
    """
    def __init__(self, 
                 section_selection_sig: dspy.Signature,
                 assessment_sig: dspy.Signature, 
                 synthesis_sig: dspy.Signature, 
                 trace_ctx: TraceContext,
                 confidence_threshold: float = 0.8,
                 max_iter: int = cfg.max_iter,
                 semaphore = None):
        self.section_selector = dspy.ChainOfThought(section_selection_sig)
        self.assess = dspy.ChainOfThought(assessment_sig)
        self.synthesize = dspy.ChainOfThought(synthesis_sig)
        self.max_iter = max_iter
        self.trace_ctx = trace_ctx
        self.confidence_threshold = confidence_threshold
        self.semaphore = semaphore


In [None]:
#| exports
@patch
async def aforward(
    self:ThemeAnalyzer, 
    theme: str, # The formatted theme to analyze
    hdgs: dict, # The headings TOC of the document
    get_content_fn: Callable = get_content_tool, # The function to get the content of a section using `hdgs[keys_list].text` for instance
    prior_coverage_context: str = "" # The themes already covered in this report, indicating its scope and analytical focus
) -> Synthesis:
    "Executes a structured analysis process."
    self._log_trace(event="Starting Analysis", theme=theme)
    
    # Main iterative exploration
    evidence = await self.explore_iteratively(theme, hdgs, get_content_fn, prior_coverage_context)
    
    # Final synthesis with summary and gaps from last assessment
    return await self.synthesize_findings(
        theme, 
        evidence["final_summary"], 
        evidence["final_gaps"], 
        evidence["sections"], 
        prior_coverage_context
    )

In [None]:
#| exports
@patch
async def explore_iteratively(
    self:ThemeAnalyzer, 
    theme: str,
    hdgs: dict,
    get_content_fn: Callable,
    prior_coverage_context: str = ""
) -> dict:
    "Iteratively explore sections to collect evidence."
    evidence_collected = []
    sections_explored = []
    evidence_summary = "No evidence collected yet - beginning analysis"
    gaps = "No gaps identified yet - initial exploration"
    
    for i in range(self.max_iter):
        # 1. Select next section
        decision = await self.select_next_section(
            theme, evidence_summary, gaps, str(hdgs), sections_explored, prior_coverage_context)
        
        if decision.next_section == 'DONE':
            self._log_trace(event="Iterative Exploration", iteration_nb=i+1, decision="Done")
            break
            
        # # 2. Process section
        # evidence_collected, sections_explored = self.process_section(
        #     decision, hdgs, get_content_fn, evidence_collected, sections_explored, [])
        
        # # 3. Assess and update summary/gaps
        # assessment = await self.assess_evidence(
        #     theme, evidence_summary, gaps, sections_explored, prior_coverage_context)
        # 2. Process section
        old_evidence_count = len(evidence_collected)
        evidence_collected, sections_explored = self.process_section(decision, hdgs, get_content_fn, evidence_collected, sections_explored, [])

        # Extract new evidence
        new_evidence = evidence_collected[old_evidence_count:] if len(evidence_collected) > old_evidence_count else ""
        new_evidence_text = "\n".join(new_evidence) if new_evidence else "No new evidence found"

        # 3. Assess and update summary/gaps  
        assessment = await self.assess_evidence(
            theme, evidence_summary, gaps, new_evidence_text, sections_explored, prior_coverage_context)
        evidence_summary = assessment.updated_evidence_summary
        gaps = assessment.updated_gaps
        
        if assessment.sufficient and assessment.confidence_score > self.confidence_threshold:
            break
    
    return {
        "evidence": evidence_collected,
        "sections": sections_explored,
        "final_summary": evidence_summary,
        "final_gaps": gaps
    }


In [None]:
#| exports
@patch
async def assess_evidence(
    self:ThemeAnalyzer, 
    theme: str,
    evidence_summary: str,
    gaps: str,
    new_evidence: str,
    sections_explored: list,
    prior_coverage_context: str = ""
):
    assessment = await self._rate_limited_fn(
        self.assess,
        theme=theme,
        evidence_summary=evidence_summary,
        gaps_identified=gaps,
        new_evidence=new_evidence,
        sections_explored=str(sections_explored),
        prior_coverage_context=prior_coverage_context
    )
    
    # Log the assessment
    self._log_trace(
        event="Evidence Assessment",
        sufficient=assessment.sufficient,
        confidence=assessment.confidence_score,
        updated_evidence_summary=assessment.updated_evidence_summary,
        updated_gaps=assessment.updated_gaps,
        sections_explored=sections_explored, 
        reasoning=assessment.reasoning
    )
    
    return assessment


In [None]:
#| exports
@patch
def process_section(
    self:ThemeAnalyzer, 
    decision:SectionSelection, # The next section to explore
    hdgs: dict, # The headings TOC of the document
    get_content_fn: Callable, # The function to get the content of a section using `hdgs[keys_list].text` for instance
    evidence_collected: list, # The evidence collected so far
    sections_explored: list, # The sections explored so far
    available_sections: list # Not used anymore but kept for compatibility
):
    evidence_collected = evidence_collected.copy()
    sections_explored = sections_explored.copy()
    
    path = find_section_path(hdgs, decision.next_section)
    if path:
        content = get_content_fn(hdgs, path)
        evidence_collected.append(f"# Section: {decision.next_section}\n## Content\n{content}")
        sections_explored.append(decision.next_section)
        self._log_trace(
            event="Section Found", 
            section=decision.next_section
        )
    else:
        self._log_trace(
            event="Section Not Found", 
            section=decision.next_section, 
            warning="No path found for section"
        )
    
    return evidence_collected, sections_explored

In [None]:
#| exports
@patch
async def select_next_section(
    self:ThemeAnalyzer, 
    theme: str, # The formatted theme to analyze
    evidence_summary: str, # The summary of the evidence collected so far
    gaps: str, # The gaps identified so far
    hdgs: dict, # The headings TOC of the document
    sections_explored: list, # The sections explored so far
    prior_coverage_context: str = "" # The themes already covered in this report, indicating its scope and analytical focus
):
    decision = await self._rate_limited_fn(
        self.section_selector,
        theme=theme,
        evidence_summary=evidence_summary,
        gaps_identified=gaps,
        all_headings=str(hdgs),
        sections_explored=str(sections_explored),
        prior_coverage_context=prior_coverage_context
    )
    
    # Log the section selection
    self._log_trace(
        event="Section Selection",
        selected_section=decision.next_section,
        reasoning=decision.reasoning
    )
    
    return decision

In [None]:
#| exports
@patch
async def synthesize_findings(
    self:ThemeAnalyzer, 
    theme: str,
    evidence_summary: str,
    gaps: str,
    sections_explored: list,
    prior_coverage_context: str = ""
):
    synthesis = await self._rate_limited_fn(
        self.synthesize,
        trace_ctx=str(self.trace_ctx),
        theme=theme,
        evidence_summary=evidence_summary,
        gaps_identified=gaps,
        sections_explored=str(sections_explored),
        prior_coverage_context=prior_coverage_context
    )
    
    # Log synthesis results
    self._log_trace(
        event="Synthesis",
        theme_covered=synthesis.theme_covered,
        confidence_explanation=synthesis.confidence_explanation,
        evidence_summary=synthesis.evidence_summary,
        gaps_identified=synthesis.gaps_identified
    )
    
    # Add framework metadata
    synthesis.framework_name = self.trace_ctx.framework.name
    synthesis.framework_category = self.trace_ctx.framework.category  
    synthesis.framework_theme_id = self.trace_ctx.framework.theme_id
    return synthesis


In [None]:
#| exports
@patch
def _log_trace(self:ThemeAnalyzer, event, **extra_data):
    file_logger = logging.getLogger('trace.file')
    console_logger = logging.getLogger('trace.console')
    
    base_data = {
        "timestamp": datetime.now().isoformat(),
        "event": event,
        "report_id": self.trace_ctx.report_id,
        "stage": str(self.trace_ctx.stage),
        "framework": str(self.trace_ctx.framework.name),
        "framework_category": str(self.trace_ctx.framework.category),
        "framework_theme_id": str(self.trace_ctx.framework.theme_id),
    }
    base_data.update(extra_data)
    
    # File logger - always full JSON
    file_logger.info(json.dumps(base_data, indent=2))
    
    # Console logger - verbosity-based formatting
    if hasattr(console_logger, 'verbosity'):
        if console_logger.verbosity == 1:
            console_msg = f"{base_data['report_id']} - {base_data['stage']}"
        elif console_logger.verbosity == 2:
            console_msg = f"{base_data['report_id']} - {base_data['stage']} - {base_data['framework']} - {base_data['framework_category']} - {base_data['framework_theme_id']} - {base_data['event']}"
        else:  # verbosity == 3
            console_msg = json.dumps(base_data, indent=2)
        
        console_logger.info(console_msg)

In [None]:
#| exports
@patch    
async def _rate_limited_fn(self:ThemeAnalyzer, mod, **kwargs):
    async with self.semaphore:
        start = time.time()
        result = await mod.acall(**kwargs)
        
        # Check if cached (fast response + no usage)
        elapsed = time.time() - start
        if elapsed > cfg.cache.delay: await sleep(cfg.call_delay)
        return result

To use it:

#### Single theme

Setup the trace logging (verbosity and report_id):

In [None]:
#| eval: false
setup_trace_logging(report_id="49d2fba781b6a7c0d94577479636ee6f", verbosity=3)

In [None]:
#| eval: false
# Number of concurrent requests to the LM to avoid rate limiting
stage1_semaphore = Semaphore(cfg.semaphore)

Create the analyzer:

In [None]:
#| eval: false
print(f"Trace Context:\n{tr_ctx}")
theme = format_enabler_theme(eval_data.srf_enablers[3])  # "Data and evidence"
print(f"Test theme:\n{theme}")
analyzer = ThemeAnalyzer(SectionSelection, Assessment, Synthesis, tr_ctx, semaphore=stage1_semaphore)
# analyzer = ThemeAnalyzer(Overview, Exploration, Assessment, Synthesis, tr_ctx, semaphore=stage1_semaphore)

Then analyze the framework's theme of choice:

In [None]:
#| eval: false
result = await analyzer.acall(theme, hdgs, get_content_tool)

{
  "timestamp": "2025-09-29T23:38:28.253809",
  "event": "Starting Analysis",
  "report_id": "49d2fba781b6a7c0d94577479636ee6f",
  "stage": "stage1",
  "framework": "SRF",
  "framework_category": "Enablers",
  "framework_theme_id": "4",
  "theme": "## Enabler 4: Data and evidence\n### Description\nIOM will be the pre-eminent source of migration and displacement data for action, which help save lives and deliver solutions; data for insight, which help facilitate regular migration pathways; and data for foresight, which help drive anticipatory action. IOM will have the systems and data fluency to collect, safely store, analyze, share and apply disaggregated data and evidence across the mobility spectrum. Our extensive data and research repositories will underpin evidence-based policies and practices. Data will be central to the internal decision-making and management of the Organization."
}
{
  "timestamp": "2025-09-29T23:38:29.174350",
  "event": "Section Selection",
  "report_id": "49

In [None]:
#| eval: false
path = find_section_path(hdgs, "Data availability .... page 16")
print(f"Path found: {path}")

In [2]:
#| eval: false
result

NameError: name 'result' is not defined

#### Multiple themes in parallel

In [None]:
#| eval: false
setup_trace_logging(report_id="49d2fba781b6a7c0d94577479636ee6f", verbosity=2)

Let's use two SRF enablers:

In [None]:
#| eval: false
tr_ctx1 = TraceContext(
    report_id='49d2fba781b6a7c0d94577479636ee6f', 
    phase=Phase.STAGE1, 
    framework=FrameworkInfo(Framework.SRF, FrameworkCat.ENABLERS, "1")
)
tr_ctx2 = TraceContext(
    report_id='49d2fba781b6a7c0d94577479636ee6f', 
    phase=Phase.STAGE1, 
    framework=FrameworkInfo(Framework.SRF, FrameworkCat.ENABLERS, "4")
    )   
print(tr_ctx1, tr_ctx2)

Create analyzers with shared semaphore:

In [None]:
#| eval: false
stage_semaphore = Semaphore(cfg.semaphore)
analyzer1 = ThemeAnalyzer(Overview, Exploration, Assessment, Synthesis, tr_ctx1,semaphore=stage_semaphore)
analyzer2 = ThemeAnalyzer(Overview, Exploration, Assessment, Synthesis, tr_ctx2,semaphore=stage_semaphore)

In [None]:
#| eval: false
theme1 = format_enabler_theme(eval_data.srf_enablers[0]) # Workforce
theme2 = format_enabler_theme(eval_data.srf_enablers[3]) # Data and evidence
print(f"Theme 1:\n{theme1}\n\nTheme 2:\n{theme2}")

In [None]:
#| eval: false
results = await gather(
    analyzer1.acall(theme1, hdgs, get_content_tool),
    analyzer2.acall(theme2, hdgs, get_content_tool)
)

49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 4 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 4 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 4 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 4 - Should stop exploring
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 4 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Ex

In [None]:
#| eval: false
results

[Prediction(
     reasoning="The theme focuses on IOM's workforce as a valuable asset and outlines several key areas for improvement, including workforce planning, people management, professional development, workplace experience, flexible systems, staff safety and security, inclusivity, and leadership.\n\nSince no evidence has been provided yet, it's impossible to determine the extent to which this theme is covered in the available data. Therefore, I must mark it as not covered.",
     theme_covered=False,
     confidence_explanation='Confidence is very low (10%) because there is absolutely no evidence provided to assess the coverage of this theme. Without any data, it is impossible to determine if the report addresses any of the aspects mentioned in the theme description.',
     evidence_summary='No evidence was provided.',
     gaps_identified='The primary gap is the complete absence of evidence related to the "Workforce" theme. This includes any information on workforce planning, p

## Pipeline Orchestrator

In [None]:
#| exports
class PipelineResults(dict):
    def __init__(self):
        super().__init__()
        self[Phase.STAGE1] = defaultdict(lambda: defaultdict(dict))
        self[Phase.STAGE2] = defaultdict(lambda: defaultdict(dict))
        self[Phase.STAGE3] = defaultdict(lambda: defaultdict(dict))

In [None]:
#| exports
@patch
def __call__(self:PipelineResults, stage=Phase.STAGE1, filter_type="all"):
    themes = []
    for frameworks in self[stage].values():
        for categories in frameworks.values():
            for theme in categories.values():
                if filter_type == "all" or \
                   (filter_type == "covered" and theme.theme_covered) or \
                   (filter_type == "uncovered" and not theme.theme_covered):
                    themes.append(theme)
    return themes

In [None]:
#| exports
class PipelineOrchestrator:
    "Orchestrator for the IOM evaluation report mapping pipeline"
    def __init__(self, 
                 report_id:str, # Report identifier
                 headings:dict, # Report headings
                 get_content_fn:Callable, # Function to get the content of a section
                 eval_data:EvalData, # Evaluation data
                 verbosity:int=2, # Verbosity level
                 ):
        store_attr()
        setup_trace_logging(report_id, verbosity)
        self.results = PipelineResults()

In [None]:
#| exports
@patch
async def run_stage1(self:PipelineOrchestrator, semaphore):
    "Run stage 1 of the pipeline"
    setup_trace_logging(self.report_id, self.verbosity)
    analyzers = []
    
    collections = [
        (self.eval_data.srf_enablers, FrameworkCat.ENABLERS, format_enabler_theme),
        (self.eval_data.srf_crosscutting_priorities, FrameworkCat.CROSSCUT, format_crosscutting_theme)
    ]

    for items, framework_cat, format_fn in collections:
        for item in items:
            trace_ctx = TraceContext(self.report_id, Phase.STAGE1, FrameworkInfo(Framework.SRF, framework_cat, item.id))
            theme = format_fn(item)
            analyzer = ThemeAnalyzer(Overview, Exploration, Assessment, Synthesis, trace_ctx, semaphore=semaphore)
            analyzers.append((analyzer, theme))

    results = await gather(*[analyzer.acall(theme, self.headings, self.get_content_fn) 
                             for analyzer, theme in analyzers])
    for result in results: 
        self.results[Phase.STAGE1][result.framework_name][result.framework_category][result.framework_theme_id] = result

In [None]:
#| eval: false
report_id = "49d2fba781b6a7c0d94577479636ee6f"
hdgs = create_heading_dict(report)
get_content_fn = get_content_tool
eval_data = IOMEvalData()

orchestrator = PipelineOrchestrator(report_id, hdgs, get_content_fn, eval_data)

In [None]:
#| eval: false
await orchestrator.run_stage1(Semaphore(1))

49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 1 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers -

49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 2 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 3 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 4 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 5 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 6 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 7 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Crosscutting Priorities - 1 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Crosscutting Priorities - 2 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Crosscutting Priorities - 3 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Crosscutting Priorities - 4 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 2 - Iterative Exploration
49d2fba781b6a7c0d94577479636ee6f - stage1 - SRF - Enablers - 3 - Iterative Exploration
49d2fba781b6a7c0d945774796

In [None]:
#| eval: false
print(orchestrator.results(Phase.STAGE1, filter_type="covered"))

In [None]:
#| exports
def get_stage1_covered_context(results: PipelineResults, eval_data: EvalData) -> str:
    "Get and format covered themes in Stage 1."
    covered_themes = results(Phase.STAGE1, filter_type="covered")
    if not covered_themes: return ""
    
    context_parts = []
    for theme in covered_themes:
        if theme.framework_category == str(FrameworkCat.ENABLERS):
            theme_data = next(t for t in eval_data.srf_enablers if t.id == theme.framework_theme_id)
        elif theme.framework_category == str(FrameworkCat.CROSSCUT):
            theme_data = next(t for t in eval_data.srf_crosscutting_priorities if t.id == theme.framework_theme_id)
        
        context_parts.append(f"- **{theme.framework_category} {theme_data.id}**: {theme_data.title}")
    
    return f"### Report Preliminary Context\nThis evaluation report covers the following Strategic Results Framework themes:\n" + "\n".join(context_parts)


For instance:

In [None]:
#| eval: false
print(get_stage1_covered_context(orchestrator.results, eval_data))

In [None]:
#| exports
@patch
async def run_stage2(self:PipelineOrchestrator, semaphore):
    "Run stage 2 of the pipeline - GCM objectives analysis"
    setup_trace_logging(self.report_id, self.verbosity)
    stage1_context = get_stage1_covered_context(self.results, self.eval_data)
    analyzers = []
    
    for gcm_obj in gcm_small:
        trace_ctx = TraceContext(self.report_id, Phase.STAGE2, FrameworkInfo(Framework.GCM, FrameworkCat.OBJS, gcm_obj["id"]))
        theme = format_gcm_theme(gcm_obj)
        analyzer = ThemeAnalyzer(Overview, Exploration, Assessment, Synthesis, trace_ctx, semaphore=semaphore)
        analyzers.append((analyzer, theme, stage1_context))

    results = await gather(*[analyzer.acall(theme, self.headings, self.get_content_fn, context) 
                             for analyzer, theme, context in analyzers])
    
    for result in results: 
        self.results[Phase.STAGE2][result.framework_name][result.framework_category][result.framework_theme_id] = result

In [None]:
#| eval: false
await orchestrator.run_stage2(Semaphore(1))

49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 1 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 2 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 3 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 4 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 5 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 6 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 7 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 8 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 9 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 10 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 11 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 

49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 1 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 2 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 3 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 4 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 5 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 6 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 7 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 8 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 9 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 10 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 11 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 12 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage2 - GCM - Objectives - 13 - Overview
49d2fba7

In [None]:
#| eval: false
print(orchestrator.results(Phase.STAGE2, filter_type="covered"))

In [None]:
#| exports
def get_filtered_srf_output_ids(
    results: PipelineResults, # PipelineResults
    eval_data: EvalData # EvalData
    ) -> list: # list of SRF output IDs
    "Get filtered SRF output IDs based on covered GCM themes."
    covered_gcm = results(Phase.STAGE2, filter_type="covered")
    srf_output_ids = set()
    
    for gcm_theme in covered_gcm:
        gcm_id = gcm_theme.framework_theme_id
        if gcm_id in eval_data.gcm_srf_lut:
            srf_output_ids.update(eval_data.gcm_srf_lut[gcm_id])
    
    return list(srf_output_ids)

For instance:

In [None]:
#| eval: false
get_filtered_srf_output_ids(orchestrator.results, eval_data)[:5]

['2b11', '3d22', '3c11', '3d31', '3c13']

In [None]:
#| export
def get_combined_context(
    results: PipelineResults, # PipelineResults
    eval_data: EvalData, # EvalData
    ) -> str: # combined context
    "Get combined context from previous stages (1 and 2)."
    stage1_context = get_stage1_covered_context(results, eval_data)
    covered_gcm = results(Phase.STAGE2, filter_type="covered")
    
    if not covered_gcm: return stage1_context
    
    gcm_context = "\n".join([f"- **GCM {theme.framework_theme_id}**: {eval_data.gcm_objectives_small[int(theme.framework_theme_id)-1]['title']}" 
                            for theme in covered_gcm])
    
    return f"{stage1_context}\n\n### Covered GCM Objectives\n{gcm_context}"


For instance:

In [None]:
#| eval: false
combined_context = get_combined_context(orchestrator.results, eval_data)
print(combined_context)

In [None]:
#| exports
@patch
async def run_stage3(self:PipelineOrchestrator, semaphore):
    "Run stage 3 of the pipeline - Targeted SRF outputs analysis"
    setup_trace_logging(self.report_id, self.verbosity)
    
    combined_context = get_combined_context(self.results, self.eval_data)
    filtered_output_ids = get_filtered_srf_output_ids(self.results, self.eval_data)
    analyzers = []
    
    for output_id in filtered_output_ids:
        output_context = find_srf_output_by_id(self.eval_data, output_id)
        if output_context:
            trace_ctx = TraceContext(self.report_id, Phase.STAGE3, FrameworkInfo(Framework.SRF, FrameworkCat.OUTPUTS, output_id))
            theme = format_srf_output(output_context)
            analyzer = ThemeAnalyzer(Overview, Exploration, Assessment, Synthesis, trace_ctx, semaphore=semaphore)
            analyzers.append((analyzer, theme, combined_context))

    results = await gather(*[analyzer.acall(theme, self.headings, self.get_content_fn, context) 
                             for analyzer, theme, context in analyzers])
    
    for result in results: 
        self.results[Phase.STAGE3][result.framework_name][result.framework_category][result.framework_theme_id] = result

In [None]:
#| eval: false
await orchestrator.run_stage3(Semaphore(3))

49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 2b11 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 2b11 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 3c11 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 3c11 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 3d31 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 3d31 - Overview
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 3c13 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 1b11 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 2b21 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 3a43 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 2c12 - Starting Analysis
49d2fba781b6a7c0d94577479636ee6f - stage3 - SRF - Outputs - 3a51 - Starting Analysis
49d2fba

In [None]:
#| eval: false
n_outputs = len(orchestrator.results(Phase.STAGE3, filter_type="covered"))
print(f"Number of outputs: {n_outputs}")

In [None]:
#| eval: false
print(orchestrator.results(Phase.STAGE3, filter_type="covered"))