In [9]:
import sys
import os
from pathlib import Path
import json
import yaml
from IPython.display import display, Markdown, JSON

# Locate the MRP source directory, i.e. the folder that contains compile.py.
# Candidates are tried in order and the first one holding compile.py wins.
# A machine-specific checkout is supplied through the MRP_SRC_DIR environment
# variable rather than an absolute path baked into the notebook, so the cell
# behaves the same on every machine.
_mrp_env_dir = os.environ.get('MRP_SRC_DIR')
_mrp_candidates = []
if _mrp_env_dir:
    _mrp_candidates.append(Path(_mrp_env_dir).expanduser())  # Explicit override
_mrp_candidates += [
    Path('../src/mrp').resolve(),     # Local development (from notebooks/)
    Path('./src/mrp').resolve(),      # If running from root
    Path('../../src/mrp').resolve(),  # If in nested folder
    Path.cwd() / 'src' / 'mrp',       # Current working directory
    Path.cwd() / 'mrp',               # MRP files in current directory
]
# Several candidates can name the same directory; keep the first occurrence
# only so the diagnostic listing below does not repeat itself.
possible_mrp_paths = list(dict.fromkeys(_mrp_candidates))

mrp_path = None
for path in possible_mrp_paths:
    if path.exists() and (path / 'compile.py').exists():
        mrp_path = path
        print(f"‚úÖ Found MRP source at: {mrp_path}")
        break

if mrp_path is None:
    print("‚ùå Error: Could not find MRP source directory!")
    print("üìÅ Searched paths:")
    for path in possible_mrp_paths:
        # The mark only reports whether the directory exists; a directory
        # that exists but lacks compile.py is still rejected by the search.
        exists = "‚úÖ" if path.exists() else "‚ùå"
        print(f"   {exists} {path}")
    
    print("\nüí° Solutions:")
    print("   1. Run this notebook from the notebooks/ directory")
    print("   2. Copy compile.py and run_job.py to your current directory")
    print("   3. Use the fallback demo below")
    
    # No sources found: point mrp_path at the working directory and leave
    # sys.path untouched.
    print("\nüîß Creating fallback demo setup...")
    mrp_path = Path.cwd()
else:
    # Add the MRP source directory to Python path; the membership test keeps
    # re-running this cell from stacking duplicate entries.
    if str(mrp_path) not in sys.path:
        sys.path.insert(0, str(mrp_path))

print(f"\nüìÅ Working with path: {mrp_path}")
print(f"üìÅ Current directory: {Path.cwd()}")

‚úÖ Found MRP source at: /Users/hamidbagheri/GitHub/a10i/bcb_demo/src/mrp

üìÅ Working with path: /Users/hamidbagheri/GitHub/a10i/bcb_demo/src/mrp
üìÅ Current directory: /Users/hamidbagheri/GitHub/a10i/bcb_demo/notebooks


In [10]:
# Prefer the real MRP modules; when they cannot be imported, define a small
# inline stand-in (templates, one agent, one reducer, two producers) so the
# rest of the demo still has something to run against.
try:
    from compile import TEMPLATES, main as compile_main
    from run_job import run, validate_spec, AGENT_REGISTRY, REDUCE_REGISTRY, PRODUCE_REGISTRY
except ImportError as exc:
    print(f"‚ö†Ô∏è  Import failed: {exc}")
    print("üîß Using fallback inline demo code...")

    IMPORT_SUCCESS = False

    # Both fallback templates describe the same job and differ only in their
    # trailing `produce:` section, so the common head is written once.
    _BIO_JOB_HEAD = '''
job: bio_hello
description: "Demo: uppercase + filter length>=5, then aggregate stats"
map:
  agent: process_bio
  params:
    min_len: 5
  data:
    - ["ATCG","GCTA","TGCA","CGAT","AATG"]
    - ["PROTEIN_A","ENZYME_B","RECEPTOR_C","KINASE_D"]
    - ["ATP","NADH","GLUCOSE","GLYCOGEN","LACTATE"]
    - ["HEMOGLOBIN","INSULIN","COLLAGEN","KERATIN"]
reduce:
  op: stats
'''
    TEMPLATES = {
        "run the biological data processing given the data":
            _BIO_JOB_HEAD + "produce:\n  op: print\n",
        "save biological data processing to json":
            _BIO_JOB_HEAD + "produce:\n  op: save_json\n  path: bio_results.json\n",
    }

    def agent_process_bio(chunk, params):
        """Uppercase each item of `chunk`; keep those at least `min_len` long.

        `min_len` is read from `params` and defaults to 0. Returns a dict with
        the shard's input count, kept count and the kept (uppercased) items.
        """
        threshold = params.get('min_len', 0)
        survivors = []
        for item in chunk:
            shouted = item.upper()
            if len(shouted) >= threshold:
                survivors.append(shouted)
        return {
            "input_size": len(chunk),
            "kept_size": len(survivors),
            "kept": survivors,
        }

    def reduce_stats(results):
        """Combine per-shard dicts into totals plus a flat list of kept items.

        The per-shard dicts themselves are passed through under "shards".
        """
        summary = {
            "total_input": sum(shard["input_size"] for shard in results),
            "total_kept": sum(shard["kept_size"] for shard in results),
        }
        summary["kept"] = [item for shard in results for item in shard["kept"]]
        summary["shards"] = results
        return summary

    AGENT_REGISTRY = dict(process_bio=agent_process_bio)
    REDUCE_REGISTRY = dict(stats=reduce_stats)
    # The save_json stand-in only announces the save; it writes nothing.
    PRODUCE_REGISTRY = {"print": print, "save_json": lambda x, s: print("[Simulated] Saved to JSON")}

    print("‚úÖ Fallback demo setup complete")
else:
    print("‚úÖ MRP modules imported successfully")
    IMPORT_SUCCESS = True

# Whichever branch ran, report what the registries now contain.
print(f"üîß Available agents: {list(AGENT_REGISTRY.keys())}")
print(f"üìä Available reducers: {list(REDUCE_REGISTRY.keys())}")
print(f"üì§ Available producers: {list(PRODUCE_REGISTRY.keys())}")

‚úÖ MRP modules imported successfully
üîß Available agents: ['process_bio']
üìä Available reducers: ['stats']
üì§ Available producers: ['print', 'save_json', 'save_markdown']


In [11]:
# Function to generate YAML from natural language
def generate_yaml(query, output_file="temp_job.yml", templates=None):
    """Look up the YAML specification for a natural language query.

    Args:
        query: Exact query string used as the template key.
        output_file: Path the YAML text is written to (UTF-8).
        templates: Optional mapping of query -> YAML text. When omitted, the
            notebook-level ``TEMPLATES`` mapping is used.

    Returns:
        A ``(yaml_content, output_path)`` tuple in every case, so callers can
        always unpack the result:
        - ``(None, None)`` when no template matches ``query``;
        - ``(yaml_content, None)`` when the template exists but the file
          could not be written;
        - ``(yaml_content, Path(output_file))`` on success.
    """
    if templates is None:
        templates = TEMPLATES

    if query not in templates:
        print(f"‚ùå Error: No template for query: {query!r}")
        # Same shape as the success path; a bare None here would make
        # `content, path = generate_yaml(...)` fail with a TypeError.
        return None, None
    
    yaml_content = templates[query]
    
    # Writing is best-effort: the content is still returned when it fails.
    try:
        output_path = Path(output_file)  # Relative names land in the current directory
        output_path.write_text(yaml_content, encoding="utf-8")
        print(f"‚úÖ Generated YAML: {output_file}")
    except OSError as e:
        # OSError covers PermissionError and FileNotFoundError as well as
        # other write failures (target is a directory, read-only filesystem).
        print(f"‚ö†Ô∏è  Could not write file ({e}), showing content only")
        output_path = None
    
    return yaml_content, output_path

# Build the spec for the console-output variant of the demo job and keep
# both the YAML text and the path it was written to for the cells below.
console_query = "run the biological data processing given the data"
yaml_content, yaml_path = generate_yaml(query=console_query, output_file="demo_console.yml")

# Heading, rule and YAML body go out as three newline-separated lines.
print(
    "\nüìÑ Generated YAML Specification:",
    "=" * 40,
    yaml_content,
    sep="\n",
)

‚úÖ Generated YAML: demo_console.yml

üìÑ Generated YAML Specification:
job: bio_hello
description: "Demo: uppercase + filter length>=5, then aggregate stats"
map:
  agent: process_bio
  params:
    min_len: 5
  data:
    - ["ATCG","GCTA","TGCA","CGAT","AATG"]
    - ["PROTEIN_A","ENZYME_B","RECEPTOR_C","KINASE_D"]
    - ["ATP","NADH","GLUCOSE","GLYCOGEN","LACTATE"]
    - ["HEMOGLOBIN","INSULIN","COLLAGEN","KERATIN"]
reduce:
  op: stats
produce:
  op: print



In [13]:
import concurrent.futures
import time

# Parse the YAML and simulate execution: map every shard through the
# configured agent on a thread pool, then reduce the per-shard results.
spec = yaml.safe_load(yaml_content)

print("üöÄ Simulating Parallel MapReduce Job...")
print("=" * 40)
print(f"‚ñ∂ Job: {spec.get('job', 'unnamed')}")
print(f"  - agent: {spec['map']['agent']}")
print(f"  - shards: {len(spec['map']['data'])}")
print(f"  - reduce: {spec['reduce']['op']}")
print(f"  - produce: {spec['produce']['op']}")

# Map phase - process shards in parallel
agent_fn = AGENT_REGISTRY[spec['map']['agent']]
# `params` may be missing, or present but empty (a bare `params:` key loads
# as None). Both cases become an empty dict so agents can call params.get().
params = spec['map'].get('params') or {}
shards = spec['map']['data']

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Submit all shards for processing, remembering each future's shard index
    future_to_shard = {executor.submit(agent_fn, shard, params): i for i, shard in enumerate(shards)}
    
    # Collect results as they complete (completion order, not shard order)
    for future in concurrent.futures.as_completed(future_to_shard):
        shard_idx = future_to_shard[future]
        result = future.result()  # Re-raises any exception raised by the agent
        results.append((shard_idx, result))
        print(f"    ‚úì shard {shard_idx} done")

# Sort results by shard index so the reducer sees shards in input order
results.sort(key=lambda x: x[0])
shard_results = [r[1] for r in results]

# Reduce phase
reducer_fn = REDUCE_REGISTRY[spec['reduce']['op']]
final_result = reducer_fn(shard_results)

# Display results (the spec's `produce` op is only reported above, not run)
print("\nüìä Final Results:")
print(json.dumps(final_result, indent=2))

# Store for later analysis
result_data = final_result

üöÄ Simulating Parallel MapReduce Job...
‚ñ∂ Job: bio_hello
  - agent: process_bio
  - shards: 4
  - reduce: stats
  - produce: print
    ‚úì shard 2 done
    ‚úì shard 0 done
    ‚úì shard 1 done
    ‚úì shard 3 done

üìä Final Results:
{
  "total_input": 18,
  "total_kept": 11,
  "kept": [
    "PROTEIN_A",
    "ENZYME_B",
    "RECEPTOR_C",
    "KINASE_D",
    "GLUCOSE",
    "GLYCOGEN",
    "LACTATE",
    "HEMOGLOBIN",
    "INSULIN",
    "COLLAGEN",
    "KERATIN"
  ],
  "shards": [
    {
      "input_size": 5,
      "kept_size": 0,
      "kept": []
    },
    {
      "input_size": 4,
      "kept_size": 4,
      "kept": [
        "PROTEIN_A",
        "ENZYME_B",
        "RECEPTOR_C",
        "KINASE_D"
      ]
    },
    {
      "input_size": 5,
      "kept_size": 3,
      "kept": [
        "GLUCOSE",
        "GLYCOGEN",
        "LACTATE"
      ]
    },
    {
      "input_size": 4,
      "kept_size": 4,
      "kept": [
        "HEMOGLOBIN",
        "INSULIN",
        "COLLAGEN",
    

In [14]:
# Analyze the biological data processing results held in `result_data`
# (the reducer output: totals, flat kept list, per-shard dicts).
print("üß¨ Biological Data Processing Analysis:")
print("=" * 40)

total_input = result_data['total_input']
total_kept = result_data['total_kept']
# Share of input items that survived the filter, in percent. A job with no
# input items reports 0.0 instead of dividing by zero.
efficiency = (total_kept / total_input) * 100 if total_input else 0.0

print("üìä Processing Statistics:")
print(f"   ‚Ä¢ Total input items: {total_input}")
print(f"   ‚Ä¢ Items kept (length ‚â• 5): {total_kept}")
print(f"   ‚Ä¢ Filter efficiency: {efficiency:.1f}%")
print(f"   ‚Ä¢ Number of parallel shards: {len(result_data['shards'])}")

print("\nüß™ Filtered Biological Items:")
for i, item in enumerate(result_data['kept'], 1):
    print(f"   {i:2d}. {item}")

print("\n‚ö° Shard Processing Details:")
for i, shard in enumerate(result_data['shards']):
    # Shards that kept nothing are shown as "none" rather than an empty string
    kept_items = ', '.join(shard['kept']) if shard['kept'] else 'none'
    print(f"   Shard {i}: {shard['input_size']} ‚Üí {shard['kept_size']} ({kept_items})")

# Display as structured JSON
print("\nüìã Structured Results:")
display(JSON(result_data, expanded=True))

üß¨ Biological Data Processing Analysis:
üìä Processing Statistics:
   ‚Ä¢ Total input items: 18
   ‚Ä¢ Items kept (length ‚â• 5): 11
   ‚Ä¢ Filter efficiency: 61.1%
   ‚Ä¢ Number of parallel shards: 4

üß™ Filtered Biological Items:
    1. PROTEIN_A
    2. ENZYME_B
    3. RECEPTOR_C
    4. KINASE_D
    5. GLUCOSE
    6. GLYCOGEN
    7. LACTATE
    8. HEMOGLOBIN
    9. INSULIN
   10. COLLAGEN
   11. KERATIN

‚ö° Shard Processing Details:
   Shard 0: 5 ‚Üí 0 (none)
   Shard 1: 4 ‚Üí 4 (PROTEIN_A, ENZYME_B, RECEPTOR_C, KINASE_D)
   Shard 2: 5 ‚Üí 3 (GLUCOSE, GLYCOGEN, LACTATE)
   Shard 3: 4 ‚Üí 4 (HEMOGLOBIN, INSULIN, COLLAGEN, KERATIN)

üìã Structured Results:


<IPython.core.display.JSON object>

In [15]:
# Closing summary: the design points of the demo, the registry contents,
# and the headline numbers computed in the analysis cell.
print("üß¨ What Makes This System 'Agentic':")
print("=" * 40)

agentic_features = [
    "ü§ñ **Agents = Map Workers**: Each shard processed by named agent functions",
    "üîå **Registry Pattern**: Pluggable agents, reducers, producers without changing core runtime",
    "üìã **Spec-Driven**: All behavior declared in YAML, validated before execution",
    "‚ö° **Parallel Execution**: Automatic work distribution across available cores",
    "üß© **Composable**: Easy to add new processing patterns and output formats",
    "üîÑ **Event-Driven**: Clear map ‚Üí reduce ‚Üí produce pipeline",
    "üìä **Multiple Outputs**: Console, JSON, Markdown reports from same specification",
]

# One Markdown display per feature, each rendered as its own bullet
for feature in agentic_features:
    display(Markdown("- " + feature))

# Show registry extensibility: one line per registry, listing its keys
print("\nü§ñ Agent Registry System:")
print("\n".join(
    f"   ‚Ä¢ Available {label}: {list(registry.keys())}"
    for label, registry in (
        ("Map Agents", AGENT_REGISTRY),
        ("Reducers", REDUCE_REGISTRY),
        ("Producers", PRODUCE_REGISTRY),
    )
))

# Show processing summary (values come from the analysis cell)
print(
    "\nüìä Demo Processing Results:",
    f"   ‚Ä¢ Total input items: {total_input}",
    f"   ‚Ä¢ Items kept (length ‚â• 5): {total_kept}",
    f"   ‚Ä¢ Filter efficiency: {efficiency:.1f}%",
    f"   ‚Ä¢ Parallel shards: {len(result_data['shards'])}",
    sep="\n",
)

üß¨ What Makes This System 'Agentic':


- ü§ñ **Agents = Map Workers**: Each shard processed by named agent functions

- üîå **Registry Pattern**: Pluggable agents, reducers, producers without changing core runtime

- üìã **Spec-Driven**: All behavior declared in YAML, validated before execution

- ‚ö° **Parallel Execution**: Automatic work distribution across available cores

- üß© **Composable**: Easy to add new processing patterns and output formats

- üîÑ **Event-Driven**: Clear map ‚Üí reduce ‚Üí produce pipeline

- üìä **Multiple Outputs**: Console, JSON, Markdown reports from same specification


ü§ñ Agent Registry System:
   ‚Ä¢ Available Map Agents: ['process_bio']
   ‚Ä¢ Available Reducers: ['stats']
   ‚Ä¢ Available Producers: ['print', 'save_json', 'save_markdown']

üìä Demo Processing Results:
   ‚Ä¢ Total input items: 18
   ‚Ä¢ Items kept (length ‚â• 5): 11
   ‚Ä¢ Filter efficiency: 61.1%
   ‚Ä¢ Parallel shards: 4
