In [1]:
# =============================================================================
# Metadata Agent - Orchestrator Test Notebook
# =============================================================================
# This notebook demonstrates the multi-agent metadata extraction system.
#
# Features:
# - Plan Generation: LLM generates step-by-step extraction plan
# - Parallel Execution: Multiple players work on each step
# - Debate: Players critique and revise each other's work
# - Synthesis: Results are consolidated into final output
#
# NOTE: keep the statement order of this cell. The sys.path entry must be
# added before any `src.*` import, otherwise those imports cannot resolve.
# =============================================================================

import logging
import os
import sys
from pprint import pprint

# Put the parent of the current working directory at the front of sys.path so
# the `src` package imported below resolves (assumes the notebook is run from
# a direct subdirectory of the project root — TODO confirm).
sys.path.insert(0, os.path.abspath('..'))

# Show INFO-level log records with a timestamp and level prefix
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load environment variables from a .env file
# (presumably where the LLM API key read by src.config lives — verify)
from dotenv import load_dotenv
load_dotenv()

# Print the configuration summary as a sanity check before anything else runs
from src.config import get_config_summary
print(get_config_summary())

# Import orchestrator components and the registries used by later cells
from src.orchestrator import Orchestrator
from src.orchestrator.schemas import Plan, PlanStep
from src.standards import METADATA_STANDARDS
from src.topology import EXECUTION_TOPOLOGIES
from src.players import PLAYER_CONFIGS

print("✅ All imports successful")


Configuration Summary:
----------------------
LLM Provider: surf (Custom OpenAI-compatible endpoint (e.g., vLLM, TGI))
LLM Model: Qwen 2.5 Coder 32B Instruct AWQ
Planning Temperature: 0.0
Player Temperature: 0.3
Default Topology: default
Default Metadata Standard: basic
API Key (SURF_API_KEY): Set

✅ All imports successful


## 1. Configuration

Set up the parameters for plan generation and execution.

In [2]:
# --- Run parameters (edit these to change what the notebook processes) ---
DATASET_PATH = "../data/biota.csv"
FILE_TYPE = "CSV"
METADATA_STANDARD_NAME = "basic"  # Options: 'basic', 'dublin_core'
TOPOLOGY_NAME = "fast"  # Options: 'default', 'fast', 'thorough', 'single'

# Echo the chosen settings under a banner so the run is self-describing
separator = "=" * 50
summary_lines = [
    separator,
    "Configuration",
    separator,
    f"Dataset Path: {DATASET_PATH}",
    f"File Type: {FILE_TYPE}",
    f"Metadata Standard: {METADATA_STANDARD_NAME}",
    f"Topology: {TOPOLOGY_NAME}",
]
print("\n".join(summary_lines))

# Look up the selected topology and list the settings it carries
topology = EXECUTION_TOPOLOGIES[TOPOLOGY_NAME]
print("\nTopology Details:")
print(f"  - Players per step: {topology['players_per_step']}")
print(f"  - Debate rounds: {topology['debate_rounds']}")
print(f"  - Player pool: {topology['player_pool']}")

Configuration
Dataset Path: ../data/biota.csv
File Type: CSV
Metadata Standard: basic
Topology: fast

Topology Details:
  - Players per step: 2
  - Debate rounds: 1
  - Player pool: ['data_analyst', 'schema_expert']


## 2. Explore Available Configurations

View available topologies and player configurations.

In [3]:
# List every registered execution topology with its description and settings
print("Available Execution Topologies:")
print("=" * 50)
for topology_name in EXECUTION_TOPOLOGIES:
    topology_cfg = EXECUTION_TOPOLOGIES[topology_name]
    print(f"\n{topology_name}:")
    print(f"  Description: {topology_cfg['description']}")
    print(f"  Players/step: {topology_cfg['players_per_step']}")
    print(f"  Debate rounds: {topology_cfg['debate_rounds']}")

Available Execution Topologies:

default:
  Description: Standard execution with 3 parallel players per step, 2 debate rounds, and comprehensive player pool.
  Players/step: 3
  Debate rounds: 2

fast:
  Description: Quick execution with 2 parallel players and minimal debate.
  Players/step: 2
  Debate rounds: 1

thorough:
  Description: Thorough execution with more players and extended debate.
  Players/step: 4
  Debate rounds: 3

single:
  Description: Single player execution with no debate. Fastest but least robust.
  Players/step: 1
  Debate rounds: 0


In [4]:
# Show all available player roles: a short preview of each role prompt and
# the names of the tools attached to the role.
ROLE_PREVIEW_CHARS = 80  # maximum number of role-prompt characters to display

print("Available Player Roles:")
print("=" * 50)
for name, config in PLAYER_CONFIGS.items():
    print(f"\n{name}:")

    # Append the ellipsis only when the prompt was actually cut, so a short
    # prompt is not shown as if it had been truncated.
    role_prompt = config['role_prompt']
    if len(role_prompt) > ROLE_PREVIEW_CHARS:
        role_preview = role_prompt[:ROLE_PREVIEW_CHARS] + "..."
    else:
        role_preview = role_prompt
    print(f"  Role: {role_preview}")

    # `or []` also covers a 'tools' key that is present but set to None
    tools = [t.name for t in (config.get('tools') or [])]
    print(f"  Tools: {tools if tools else 'None'}")

Available Player Roles:

data_analyst:
  Role: You are an expert data analyst. Your job is to perform statistical analysis on d...
  Tools: ['get_file_info', 'get_row_count', 'get_column_names', 'get_column_statistics', 'get_missing_values']

schema_expert:
  Role: You are a database schema expert. Your job is to describe the structure of datas...
  Tools: ['get_column_names', 'get_data_types', 'get_sample_rows']

metadata_specialist:
  Role: You are a metadata specialist familiar with standards like Dublin Core, DCAT, an...
  Tools: None

critic:
  Role: You are a meticulous quality assurance critic. Your job is to review analyses fr...
  Tools: None

synthesizer:
  Role: You are a metadata synthesizer. Your job is to consolidate multiple analyses int...
  Tools: None


## 3. Plan Generation

Generate a step-by-step plan for metadata extraction using the Orchestrator.

In [5]:
# Build the orchestrator for the topology chosen in the configuration cell
orchestrator = Orchestrator(topology_name=TOPOLOGY_NAME)

# Fetch the content of the selected metadata standard and display it
metadata_standard = METADATA_STANDARDS[METADATA_STANDARD_NAME]
print("Metadata Standard:", metadata_standard, sep="\n")

2026-01-15 14:15:08,917 - INFO - PlanExecutor initialized with topology: fast
2026-01-15 14:15:08,918 - INFO -   Players per step: 2
2026-01-15 14:15:08,919 - INFO -   Debate rounds: 1
2026-01-15 14:15:08,920 - INFO -   Player pool: ['data_analyst', 'schema_expert']
2026-01-15 14:15:08,920 - INFO - Orchestrator initialized with topology: fast


Metadata Standard:

{
    "title": "...",
    "description": "...",
    "schema": {
        "fields": [
            {
                "name": "...",
                "type": "...",
                "description": "..."
            }
        ]
    }
}



In [6]:
# Ask the orchestrator for an extraction plan for the configured file type
print("Generating plan...")
print("=" * 50)

plan = orchestrator.generate_plan(
    file_type=FILE_TYPE,
    metadata_standard=metadata_standard,
)

# A falsy plan is reported as a generation failure
status_message = (
    "\n✅ Plan generated successfully!" if plan else "\n❌ Plan generation failed"
)
print(status_message)

2026-01-15 14:15:12,876 - INFO - GENERATING PLAN
2026-01-15 14:15:12,877 - INFO - File type: CSV
2026-01-15 14:15:12,878 - INFO - Available players manifest:
2026-01-15 14:15:12,878 - INFO - Player: data_analyst
  Description: You are an expert data analyst. Your job is to perform statistical analysis on datasets, identify patterns, and extract meaningful insights. Focus on numerical summaries, distributions, and data quality.
  Tools:
    - get_file_info: Returns basic file information including size and estimated row count.
Useful as a first step to understand the dataset scale.
The input must be a valid file path.
    - get_row_count: Returns the total number of rows in a structured data file (e.g., CSV).
This is useful for getting a basic sense of the dataset's size.
Optimized: counts rows without loading entire file into memory.
The input must be a valid file path.
    - get_column_names: Returns a list of column names from a structured data file (e.g., CSV).
This is the first ste

Generating plan...


2026-01-15 14:15:33,156 - INFO - HTTP Request: POST https://willma.surf.nl/api/v0/chat/completions "HTTP/1.1 200 OK"
2026-01-15 14:15:33,355 - INFO - Plan generated successfully!
2026-01-15 14:15:33,356 - INFO - Number of steps: 7
2026-01-15 14:15:33,357 - INFO -   Step 1: get_file_info (player: data_analyst)
2026-01-15 14:15:33,359 - INFO -   Step 2: get_row_count (player: data_analyst)
2026-01-15 14:15:33,359 - INFO -   Step 3: get_column_names (player: schema_expert)
2026-01-15 14:15:33,360 - INFO -   Step 4: get_data_types (player: schema_expert)
2026-01-15 14:15:33,360 - INFO -   Step 5: get_column_statistics (player: data_analyst)
2026-01-15 14:15:33,361 - INFO -   Step 6: get_missing_values (player: data_analyst)
2026-01-15 14:15:33,361 - INFO -   Step 7: format_final_metadata (player: metadata_specialist)



✅ Plan generated successfully!


In [7]:
# Walk through the generated plan one step at a time
if not plan:
    print("No plan to inspect. Run the plan generation cell first.")
else:
    print("Generated Plan:")
    print("=" * 50)

    # Steps are displayed with 1-based numbering
    for step_number, step in enumerate(plan.steps, start=1):
        print(f"\nStep {step_number}: {step.task}")
        print(f"  Player: {step.player}")
        print(f"  Rationale: {step.rationale}")
        print(f"  Inputs: {step.inputs}")
        print(f"  Outputs: {step.outputs}")

Generated Plan:

Step 1: get_file_info
  Player: data_analyst
  Rationale: First, gather basic file information such as size and estimated row count to understand the dataset scale.
  Inputs: {}
  Outputs: ['file_info']

Step 2: get_row_count
  Player: data_analyst
  Rationale: Get the exact number of rows in the dataset to ensure we have a precise count for our metadata.
  Inputs: {'file_path': 'file_info'}
  Outputs: ['row_count']

Step 3: get_column_names
  Player: schema_expert
  Rationale: Identify the column names to understand the schema of the dataset.
  Inputs: {'file_path': 'file_info'}
  Outputs: ['column_names']

Step 4: get_data_types
  Player: schema_expert
  Rationale: Determine the data types of each column to provide accurate schema information.
  Inputs: {'file_path': 'file_info'}
  Outputs: ['data_types']

Step 5: get_column_statistics
  Player: data_analyst
  Rationale: Extract basic statistics for all columns to provide detailed insights into the dataset's numerica

In [8]:
# Pretty-print the whole plan as a dictionary (nothing is skipped)
if plan:
    print("Plan as Dictionary:", "=" * 50, sep="\n")
    plan_as_dict = plan.model_dump()
    pprint(plan_as_dict)

Plan as Dictionary:
{'steps': [{'inputs': {},
            'outputs': ['file_info'],
            'player': 'data_analyst',
            'rationale': 'First, gather basic file information such as size '
                         'and estimated row count to understand the dataset '
                         'scale.',
            'task': 'get_file_info'},
           {'inputs': {'file_path': 'file_info'},
            'outputs': ['row_count'],
            'player': 'data_analyst',
            'rationale': 'Get the exact number of rows in the dataset to '
                         'ensure we have a precise count for our metadata.',
            'task': 'get_row_count'},
           {'inputs': {'file_path': 'file_info'},
            'outputs': ['column_names'],
            'player': 'schema_expert',
            'rationale': 'Identify the column names to understand the schema '
                         'of the dataset.',
            'task': 'get_column_names'},
           {'inputs': {'file_path': 'fi

## 4. Plan Validation

Validate the plan's dataflow dependencies to ensure all inputs are available.

In [9]:
# Check the plan's dataflow with the project validator
from src.orchestrator.utils import validate_plan_dataflow

if not plan:
    print("No plan to validate.")
else:
    # The validator is handed the plan converted to a list of dicts
    plan_dicts = plan.to_dict_list()
    is_valid, message = validate_plan_dataflow(plan_dicts)

    # Same message either way; only the leading status icon differs
    status_icon = "✅" if is_valid else "❌"
    print(f"{status_icon} {message}")

✅ Plan dataflow is valid.


## 5. Prepare Dataset

Ensure a dataset exists at `DATASET_PATH` before execution. If the file is missing, a small synthetic sample is written to that path instead.

In [10]:
# Make sure a dataset exists at DATASET_PATH (writing a small sample if it
# does not), then load it and print a short preview.
import pandas as pd

if not os.path.exists(DATASET_PATH):
    print(f"Dataset not found at {DATASET_PATH}")
    print("Creating sample dataset...")

    # os.path.dirname() returns '' for a bare filename (e.g. "biota.csv") and
    # os.makedirs('') raises FileNotFoundError, so only create the parent
    # directory when the path actually has one.
    dataset_dir = os.path.dirname(DATASET_PATH)
    if dataset_dir:
        os.makedirs(dataset_dir, exist_ok=True)

    # Synthetic placeholder rows: enough to exercise the pipeline, not a
    # substitute for the real dataset expected at DATASET_PATH.
    sample_df = pd.DataFrame({
        "id": [1, 2, 3, 4, 5],
        "name": ["Alice", "Bob", "Charlie", "Diana", "Eve"],
        "age": [25, 30, 35, 28, 32],
        "city": ["NYC", "LA", "Chicago", "NYC", "Boston"],
        "salary": [50000, 60000, 75000, 55000, 80000]
    })
    sample_df.to_csv(DATASET_PATH, index=False)
    print(f"✅ Sample dataset created at {DATASET_PATH}")
else:
    print(f"✅ Dataset found at {DATASET_PATH}")

# Preview the dataset: overall shape followed by the first rows
df = pd.read_csv(DATASET_PATH)
print(f"\nDataset Preview ({len(df)} rows, {len(df.columns)} columns):")
print(df.head())

✅ Dataset found at ../data/biota.csv

Dataset Preview (391018 rows, 4 columns):
   sample_id  sibes_id  abundance_m2  afdm_m2
0      33941        16       28.8716   0.4331
1      33941        20      115.4866   0.6986
2      33941        21      115.4866   0.0000
3      33941       289      230.9732   0.0462
4      33942        20      288.7165   0.7391


## 6. Full Execution

Execute the plan generated in Section 3: for each step, players run in parallel, debate, and their results are synthesized.

**Note**: This will make multiple LLM calls per step and may take several minutes; in the run recorded below, the first of seven steps alone took almost three minutes.

In [11]:
# Run the generated plan end to end (parallel players plus debate)
if not plan:
    print("No plan to execute. Run plan generation first.")
else:
    print("Executing plan with parallel players and debate...")
    print("=" * 50)

    result = orchestrator.execute_plan(
        plan=plan,
        dataset_path=DATASET_PATH,
        metadata_standard=metadata_standard,
    )

    # Report the overall outcome, including the error text on failure
    outcome_message = (
        "\n✅ Execution completed successfully!"
        if result.success
        else f"\n❌ Execution failed: {result.error}"
    )
    print(outcome_message)

2026-01-15 14:24:53,721 - INFO - STARTING PLAN EXECUTION
2026-01-15 14:24:53,721 - INFO - Dataset: ../data/biota.csv
2026-01-15 14:24:53,721 - INFO - Steps: 7
2026-01-15 14:24:53,724 - INFO - 
2026-01-15 14:24:53,726 - INFO - Task: get_file_info
2026-01-15 14:24:53,726 - INFO - Player: data_analyst
2026-01-15 14:24:53,727 - INFO - Rationale: First, gather basic file information such as size and estimated row count to understand the dataset scale.
2026-01-15 14:24:53,730 - INFO - --- STEP 0: PARALLEL EXECUTION ---
2026-01-15 14:24:53,731 - INFO - Task: get_file_info
2026-01-15 14:24:53,733 - INFO - Players: 2


Executing plan with parallel players and debate...


2026-01-15 14:27:05,157 - INFO - HTTP Request: POST https://willma.surf.nl/api/v0/chat/completions "HTTP/1.1 200 OK"
2026-01-15 14:27:05,160 - INFO -   Player 'data_analyst_1' completed execution
2026-01-15 14:27:30,232 - INFO - HTTP Request: POST https://willma.surf.nl/api/v0/chat/completions "HTTP/1.1 200 OK"
2026-01-15 14:27:30,234 - INFO -   Player 'schema_expert_2' completed execution
2026-01-15 14:27:30,235 - INFO - Max debate rounds (1) reached, synthesizing
2026-01-15 14:27:30,237 - INFO - --- STEP 0: SYNTHESIS ---
2026-01-15 14:27:35,672 - INFO - HTTP Request: POST https://willma.surf.nl/api/v0/chat/completions "HTTP/1.1 200 OK"
2026-01-15 14:27:35,675 - INFO -   Synthesis complete. Produced artifacts: ['file_info']
2026-01-15 14:27:35,676 - INFO - Step 1 completed successfully
2026-01-15 14:27:35,677 - INFO -   Artifacts produced: ['file_info']
2026-01-15 14:27:35,678 - INFO - 
2026-01-15 14:27:35,680 - INFO - Task: get_row_count
2026-01-15 14:27:35,681 - INFO - Player: data_


✅ Execution completed successfully!


In [None]:
# Summarise the execution outcome, overall and per step
if not ('result' in dir() and result):
    # `result` only exists once the execution cell has run
    print("No results to inspect. Run the execution cell first.")
else:
    print("Execution Results Summary:")
    print("=" * 50)
    print(f"Success: {result.success}")
    print(f"Steps Completed: {result.steps_completed}/{result.plan_steps_count}")

    print("\n--- Step Results ---")
    for step_outcome in result.step_results:
        # step_index is shown with 1-based numbering
        print(f"\nStep {step_outcome.step_index + 1}: {step_outcome.task}")
        print(f"  Player Role: {step_outcome.player_role}")
        print(f"  Success: {step_outcome.success}")
        print(f"  Debate Rounds: {step_outcome.debate_rounds_completed}")
        print(f"  Artifacts Produced: {list(step_outcome.artifacts.keys())}")
        if step_outcome.error:
            print(f"  Error: {step_outcome.error}")

In [None]:
# Show every workspace artifact (shortened) followed by the final metadata
if not ('result' in dir() and result):
    print("No results to display.")
else:
    print("Final Workspace Artifacts:")
    print("=" * 50)
    for artifact_name, artifact in result.final_workspace.items():
        print(f"\n--- {artifact_name} ---")
        # Cap long values at 500 characters and mark the cut with an ellipsis
        artifact_text = str(artifact)
        print(artifact_text[:500] + "..." if len(artifact_text) > 500 else artifact_text)

    print("\n" + "=" * 50)
    print("Final Metadata:")
    print("=" * 50)
    pprint(result.final_metadata)

In [None]:
# Print every artifact stored in the final metadata in full (no truncation).
# Guarded like the other result-inspection cells so that running it before
# the execution cell prints a message instead of raising NameError.
if 'result' in dir() and result:
    final_metadata = result.final_metadata
    # Tolerate missing metadata or a missing 'artifacts' entry
    if final_metadata and 'artifacts' in final_metadata:
        for key, value in final_metadata['artifacts'].items():
            print(f"\n--- {key} ---")
            print(value)
            print("\n" + "=" * 50)
    else:
        print("No artifacts found in the final metadata.")
else:
    print("No results to display.")

## 7. Test Tools Directly

Test the available tools on the dataset without going through the full pipeline.

In [None]:
# Call the dataset tools directly, bypassing the orchestrator
from src.tools import pandas_tools


def _invoke_on_dataset(tool):
    """Invoke ``tool`` with DATASET_PATH as its ``file_path`` argument."""
    return tool.invoke({"file_path": DATASET_PATH})


print("Testing Tools on Dataset:")
print("=" * 50)

# 1) Row count
print("\n1. Row Count:")
row_count = _invoke_on_dataset(pandas_tools.get_row_count)
print(f"   {row_count} rows")

# 2) Column names
print("\n2. Column Names:")
columns = _invoke_on_dataset(pandas_tools.get_column_names)
print(f"   {columns}")

# 3) Data types, one line per column
print("\n3. Data Types:")
dtypes = _invoke_on_dataset(pandas_tools.get_data_types)
for column, column_dtype in dtypes.items():
    print(f"   {column}: {column_dtype}")

# 4) Missing values, one line per column
print("\n4. Missing Values:")
missing = _invoke_on_dataset(pandas_tools.get_missing_values)
for column, missing_count in missing.items():
    print(f"   {column}: {missing_count}")

# 5) Sample rows
print("\n5. Sample Rows:")
sample = _invoke_on_dataset(pandas_tools.get_sample_rows)
print(sample)

## Summary

This notebook demonstrated the full metadata extraction pipeline:

1. **Configuration** - Set dataset path, file type, metadata standard, and execution topology
2. **Exploration** - Viewed available topologies and player roles
3. **Plan Generation** - LLM generated a step-by-step extraction plan
4. **Validation** - Verified plan dataflow dependencies
5. **Dataset Preparation** - Ensured test dataset exists
6. **Full Execution** - Ran parallel players, debate, and synthesis on each step
7. **Tools Testing** - Tested individual tools on the dataset

### Key Components:
- **Orchestrator**: Coordinates planning and execution
- **Players**: Execute tasks and participate in debates  
- **Tools**: Extract actual data from datasets
- **Topology**: Configures parallelism and debate rounds

### Configuration Options:
- Edit `src/config.py` to change the LLM model
- Modify `TOPOLOGY_NAME` to change execution strategy
- Add new player roles in `src/players/configs.py`
- Add new tools in `src/tools/pandas_tools.py`