# Map-Reduce Pattern with Workflow Visualization

This notebook demonstrates a complete **map-reduce workflow** implementation with **file-backed intermediate results** and **workflow visualization** capabilities.

## Key Concepts

### Map-Reduce Architecture
- **Split**: Divide input data into chunks for parallel processing
- **Map**: Process each chunk independently and emit key-value pairs
- **Shuffle**: Group map outputs by key and partition across reducers
- **Reduce**: Aggregate values for each key in parallel
- **Completion**: Collect final results from all reducers
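
Before adding files and executors, the phases above can be sketched entirely in memory. This is a minimal illustration, not the notebook's implementation: the function names (`split`, `map_chunk`, `shuffle`, `reduce_counts`) are hypothetical, and chunking here is by word list rather than by character offset.

```python
from collections import defaultdict


def split(text: str, n_chunks: int) -> list[list[str]]:
    """Split: divide the words of `text` into n_chunks roughly equal lists."""
    words = text.split()
    size = -(-len(words) // n_chunks)  # ceiling division
    return [words[i * size:(i + 1) * size] for i in range(n_chunks)]


def map_chunk(words: list[str]) -> list[tuple[str, int]]:
    """Map: emit one (word, 1) pair per word occurrence."""
    return [(w.lower(), 1) for w in words]


def shuffle(pairs: list[tuple[str, int]]) -> dict[str, list[int]]:
    """Shuffle: group emitted values by key."""
    grouped: dict[str, list[int]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def reduce_counts(grouped: dict[str, list[int]]) -> dict[str, int]:
    """Reduce: sum the values collected for each key."""
    return {key: sum(values) for key, values in grouped.items()}


chunks = split("the cat saw the dog and the bird", n_chunks=3)
pairs = [pair for chunk in chunks for pair in map_chunk(chunk)]
totals = reduce_counts(shuffle(pairs))
print(totals)
```

The rest of the notebook spreads these same steps across executors and replaces the in-memory lists with files.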

### File-Backed Intermediate Results
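- **Memory Efficiency**: Pass file paths between phases instead of the intermediate data itself
- **Scalability**: A step toward handling datasets larger than available RAM (in this example, Split and Shuffle still load their full input into memory)
- **Persistence**: Intermediate files remain on disk if the process fails, although this example does not resume from them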
- **Async I/O**: Non-blocking file operations with aiofiles

### Shared State Coordination
- **Context Sharing**: Executors communicate via `ctx.set_shared_state()`
- **Chunk Distribution**: Split executor assigns chunks to mappers
- **Partition Routing**: Shuffle executor assigns keys to reducers
- **Typed State**: Shared state values are annotated with type hints where they are read

### Workflow Visualization
- **WorkflowViz**: Generate visual representations of workflow graphs
- **Mermaid Diagrams**: Text-based graph rendering
- **DiGraph**: NetworkX graph objects for programmatic analysis
- **SVG Export**: High-quality vector graphics for documentation

## Workflow Architecture

```
                  Split (divide text into chunks)
                    ↓
         ┌──────────┼──────────┐
      Map_0      Map_1      Map_2   (parallel word counting)
         └──────────┼──────────┘
                 Shuffle (group by key, partition)
                    ↓
         ┌──────────┼──────────┬──────────┐
     Reduce_0  Reduce_1  Reduce_2  Reduce_3  (parallel aggregation)
         └──────────┼──────────┴──────────┘
                Completion (collect results)
```

## What This Example Shows

1. **Scalable Parallel Processing**: Map-reduce for word counting
2. **File-Backed Storage**: Intermediate results persisted to disk
3. **Shared State Coordination**: Dynamic chunk and partition assignment
4. **Workflow Visualization**: Generate Mermaid diagrams and SVG exports
5. **Marker-Based Phases**: Dataclasses signal workflow progression
6. **Configurable Parallelism**: Adjust mapper and reducer counts

## Setup

Import required modules and configure the workflow environment.

In [None]:
import asyncio
import os
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path

import aiofiles
from dotenv import load_dotenv

from agent_framework.workflows import Executor, ExecutorContext, Workflow
from agent_framework.workflows.viz import WorkflowViz

# Load environment variables from ../../.env
load_dotenv('../../.env')

# Configuration
NUM_MAPPERS = 3  # Number of parallel map executors
NUM_REDUCERS = 4  # Number of parallel reduce executors
TMP_DIR = Path("tmp")  # Directory for intermediate files
RESOURCES_DIR = Path("../resources")  # Input data directory

# Ensure tmp directory exists
TMP_DIR.mkdir(exist_ok=True)

print(f"✅ Configuration:")
print(f"   - Mappers: {NUM_MAPPERS}")
print(f"   - Reducers: {NUM_REDUCERS}")
print(f"   - Temp directory: {TMP_DIR}")
print(f"   - Resources directory: {RESOURCES_DIR}")

## Define Message Types and Marker Classes

We use dataclasses to represent workflow state and phase transitions.

### Marker Classes (Phase Signals):
- **`SplitCompleted`**: Signals split phase finished, carries the number of chunks
- **`MapCompleted`**: Signals map phase finished, stores map result file paths
- **`ShuffleCompleted`**: Signals shuffle phase finished, stores shuffle file paths
- **`ReduceCompleted`**: Signals reduce phase finished, stores final result files

### Shared State:
- **`chunk_assignments`**: Maps mapper IDs to text chunks
- **`partition_assignments`**: Maps reducer IDs to shuffle output file paths
- Both stored in workflow context for cross-executor communication

In [None]:
@dataclass
class SplitCompleted:
    """Marker indicating split phase completed."""
    chunk_count: int


@dataclass
class MapCompleted:
    """Marker indicating map phase completed."""
    file_paths: list[Path]


@dataclass
class ShuffleCompleted:
    """Marker indicating shuffle phase completed."""
    file_paths: list[Path]


@dataclass
class ReduceCompleted:
    """Marker indicating reduce phase completed."""
    file_paths: list[Path]


print("✅ Marker classes defined for workflow phase signaling")

## Split Executor

The **SplitExecutor** reads the input text file and divides it into chunks for parallel processing.

### Key Features:
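- Reads input text from `../resources/long_text.txt` (`RESOURCES_DIR / "long_text.txt"`)
- Splits text into `NUM_MAPPERS` chunks of roughly equal character length; a word that straddles a chunk boundary is counted as two fragments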
- Stores chunk assignments in shared state
- Each mapper gets a unique chunk via `chunk_assignments` dictionary

### Shared State Storage:
```python
ctx.set_shared_state(
    "chunk_assignments",
    {"mapper_0": chunk0, "mapper_1": chunk1, ...}
)
```

### Why Shared State?
- Mappers need to know which chunk to process
- Keeps chunk text out of the messages passed between executors
- Enables dynamic chunk assignment at runtime
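
Splitting by character offset can cut a word in half at a chunk boundary, which skews the counts slightly. One possible refinement, sketched here under the assumption that whitespace separates words, is to move each cut forward to the next whitespace character. The helper name `split_on_whitespace` is hypothetical and not part of the notebook.

```python
def split_on_whitespace(text: str, n_chunks: int) -> list[str]:
    """Split text into n_chunks pieces, cutting only at whitespace."""
    target = max(1, len(text) // n_chunks)
    chunks: list[str] = []
    start = 0
    for _ in range(n_chunks - 1):
        end = min(start + target, len(text))
        # Advance the cut until it lands on whitespace (or the end of the text).
        while end < len(text) and not text[end].isspace():
            end += 1
        chunks.append(text[start:end])
        start = end
    chunks.append(text[start:])  # the last chunk takes the remainder
    return chunks


sample = "alpha beta gamma delta epsilon zeta"
pieces = split_on_whitespace(sample, 3)
words_from_pieces = [w for piece in pieces for w in piece.split()]
print(pieces)
```

Chunks produced this way can be uneven when words are long relative to the target size, so this trades balance for correct word boundaries.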

In [None]:
class SplitExecutor(Executor[None, SplitCompleted]):
    """Splits input text into chunks for parallel map processing."""

    async def execute(self, ctx: ExecutorContext[None]) -> SplitCompleted:
        print("\n" + "="*60)
        print("PHASE 1: SPLIT")
        print("="*60)

        # Read the input text file
        input_file = RESOURCES_DIR / "long_text.txt"
        async with aiofiles.open(input_file, "r", encoding="utf-8") as f:
            text = await f.read()

        print(f"📄 Read {len(text)} characters from {input_file}")

        # Split text into chunks
        chunk_size = len(text) // NUM_MAPPERS
        chunks = [
            text[i * chunk_size : (i + 1) * chunk_size]
            for i in range(NUM_MAPPERS - 1)
        ]
        # Last chunk gets the remainder
        chunks.append(text[(NUM_MAPPERS - 1) * chunk_size :])

        # Store chunk assignments in shared state
        chunk_assignments = {
            f"mapper_{i}": chunks[i] for i in range(NUM_MAPPERS)
        }
        ctx.set_shared_state("chunk_assignments", chunk_assignments)

        print(f"✅ Split text into {NUM_MAPPERS} chunks")
        for i, chunk in enumerate(chunks):
            print(f"   - Chunk {i}: {len(chunk)} characters")

        return SplitCompleted(chunk_count=NUM_MAPPERS)

## Map Executor

The **MapExecutor** processes assigned chunks and emits word-count pairs.

### Key Features:
- Each mapper instance processes its assigned chunk
- Retrieves chunk from shared state using executor ID
- Splits the chunk into lowercase words and strips non-alphanumeric characters
- Writes `(word, 1)` pairs to file for each word occurrence
- Uses async file I/O for non-blocking writes

### Output Format:
```
word1 1
word2 1
word1 1
...
```

### File-Backed Storage:
- Results written to `tmp/map_results_{mapper_id}.txt`
- Emitted pairs are written to disk one line at a time instead of being accumulated in a list
- The chunk itself is still held in memory, so mapper memory grows with chunk size

### Why Multiple Instances?
- Each mapper runs in parallel on its own chunk
- Executor ID determines which chunk to process
- Mapper instances are created explicitly when the workflow is built; the fan-out edge delivers the split result to each of them
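
The mapper in the next cell cleans each token by dropping every non-alphanumeric character, so `don't` becomes `dont` and `well-known` becomes `wellknown`. If that matters for your data, a regex-based tokenizer is one alternative. The pattern and the `emit_pairs` helper below are illustrative assumptions, not the notebook's code.

```python
import re
from collections.abc import Iterator

# Hypothetical pattern: runs of letters/digits, optionally joined by an
# apostrophe or hyphen (for example "don't" or "well-known").
WORD_RE = re.compile(r"[^\W_]+(?:['-][^\W_]+)*")


def emit_pairs(chunk: str) -> Iterator[tuple[str, int]]:
    """Yield one (word, 1) pair per token found in the chunk."""
    for match in WORD_RE.finditer(chunk.lower()):
        yield match.group(0), 1


pairs = list(emit_pairs("Don't panic: a well-known, well-known rule!"))
print(pairs)
```

Tokens produced this way contain no whitespace, so they remain compatible with the space-separated `word 1` file format.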

In [None]:
class MapExecutor(Executor[SplitCompleted, MapCompleted]):
    """Maps words in assigned chunk to (word, 1) pairs."""

    async def execute(self, ctx: ExecutorContext[SplitCompleted]) -> MapCompleted:
        mapper_id = self.id
        print(f"\n🗺️  MAP [{mapper_id}]: Processing chunk...")

        # Retrieve assigned chunk from shared state
        chunk_assignments: dict[str, str] = ctx.get_shared_state("chunk_assignments")
        chunk = chunk_assignments[mapper_id]

        # Tokenize on whitespace (punctuation is stripped when writing below)
        words = chunk.lower().split()
        print(f"   Found {len(words)} whitespace-separated tokens in chunk")

        # Write (word, 1) pairs to file
        output_file = TMP_DIR / f"map_results_{mapper_id}.txt"
        async with aiofiles.open(output_file, "w", encoding="utf-8") as f:
            for word in words:
                # Clean word (remove punctuation)
                word = "".join(c for c in word if c.isalnum())
                if word:
                    await f.write(f"{word} 1\n")

        print(f"   ✅ Wrote results to {output_file}")
        return MapCompleted(file_paths=[output_file])

## Shuffle Executor

The **ShuffleExecutor** groups map outputs by key and partitions them across reducers.

### Key Features:
- Reads all map result files
- Groups `(word, 1)` pairs by word (key)
- Partitions keys across `NUM_REDUCERS` using hash function
- Writes grouped data to reducer-specific files
- Stores partition assignments in shared state

### Partitioning Strategy:
```python
reducer_id = hash(word) % NUM_REDUCERS
```
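- Sends every occurrence of a word to the same reducer within a single run
- Python randomizes `hash()` for strings per process (unless `PYTHONHASHSEED` is set), so the word-to-reducer mapping can differ between runs
- Spreads keys roughly evenly across reducers, which enables the parallel reduce phase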
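
If partition files need to be reproducible across runs or shared between processes, a deterministic hash is one option. The sketch below uses `zlib.crc32` from the standard library. The function name `stable_partition` is hypothetical, and this is an alternative to the notebook's `hash(word)` approach, not a description of it.

```python
import zlib


def stable_partition(word: str, num_reducers: int) -> int:
    """Map a word to a reducer index that is identical in every process."""
    return zlib.crc32(word.encode("utf-8")) % num_reducers


index = stable_partition("workflow", 4)
print(index)
```

CRC32 is not a cryptographic hash, but for spreading keys across a handful of reducers it is usually adequate.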

### Output Format (per reducer):
```
word1 1 1 1
word2 1 1
word3 1
...
```

### Shared State Storage:
```python
ctx.set_shared_state(
    "partition_assignments",
    {"reducer_0": file0, "reducer_1": file1, ...}
)
```
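
The executor in the next cell groups every pair in memory before writing any partition file. A lower-memory variant, sketched below under several assumptions, appends each line to its partition file as soon as it is read. It uses synchronous standard-library I/O instead of `aiofiles` to stay self-contained, a CRC32-based partition function, and a hypothetical helper name `stream_shuffle`. Each output file then contains ungrouped `word 1` lines, so a reducer consuming it must accumulate counts across repeated lines.

```python
import tempfile
import zlib
from pathlib import Path


def stream_shuffle(map_files: list[Path], out_dir: Path, num_reducers: int) -> list[Path]:
    """Append each (word, count) line to its partition file as it is read."""
    out_paths = [out_dir / f"shuffle_results_{i}.txt" for i in range(num_reducers)]
    handles = [path.open("w", encoding="utf-8") for path in out_paths]
    try:
        for map_file in map_files:
            with map_file.open("r", encoding="utf-8") as src:
                for line in src:
                    if not line.strip():
                        continue  # skip blank lines
                    word = line.split()[0]
                    # crc32 gives the same partition in every process
                    idx = zlib.crc32(word.encode("utf-8")) % num_reducers
                    handles[idx].write(line if line.endswith("\n") else line + "\n")
    finally:
        for handle in handles:
            handle.close()
    return out_paths


with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)
    map_file = tmp_dir / "map_results_mapper_0.txt"
    map_file.write_text("the 1\ncat 1\nthe 1\n", encoding="utf-8")
    parts = stream_shuffle([map_file], tmp_dir, num_reducers=2)
    per_file = [p.read_text(encoding="utf-8").splitlines() for p in parts]

print(per_file)
```

Only one input line is held in memory at a time, at the cost of keeping `num_reducers` file handles open.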

In [None]:
class ShuffleExecutor(Executor[list[MapCompleted], ShuffleCompleted]):
    """Shuffles map outputs by grouping by key and partitioning across reducers."""

    async def execute(
        self, ctx: ExecutorContext[list[MapCompleted]]
    ) -> ShuffleCompleted:
        print("\n" + "="*60)
        print("PHASE 3: SHUFFLE")
        print("="*60)

        map_results = ctx.get_input_data()

        # Collect all map result files
        all_map_files = []
        for result in map_results:
            all_map_files.extend(result.file_paths)

        print(f"📥 Reading {len(all_map_files)} map result files...")

        # Group by key (word) and partition to reducers
        partitions: dict[int, dict[str, list[int]]] = {
            i: defaultdict(list) for i in range(NUM_REDUCERS)
        }

        for map_file in all_map_files:
            async with aiofiles.open(map_file, "r", encoding="utf-8") as f:
                async for line in f:
                    word, count = line.strip().split()
                    # Partition by hash of word
                    reducer_id = hash(word) % NUM_REDUCERS
                    partitions[reducer_id][word].append(int(count))

        # Write partitioned data to reducer-specific files
        shuffle_files = []
        partition_assignments = {}

        for reducer_id, partition in partitions.items():
            output_file = TMP_DIR / f"shuffle_results_{reducer_id}.txt"
            shuffle_files.append(output_file)
            partition_assignments[f"reducer_{reducer_id}"] = output_file

            async with aiofiles.open(output_file, "w", encoding="utf-8") as f:
                for word, counts in partition.items():
                    # Write word and all its counts
                    counts_str = " ".join(str(c) for c in counts)
                    await f.write(f"{word} {counts_str}\n")

            print(f"   ✅ Partition {reducer_id}: {len(partition)} unique words → {output_file}")

        # Store partition assignments in shared state
        ctx.set_shared_state("partition_assignments", partition_assignments)

        print(f"\n✅ Shuffle complete: {len(shuffle_files)} partitions created")
        return ShuffleCompleted(file_paths=shuffle_files)

## Reduce Executor

The **ReduceExecutor** aggregates counts for each word in its assigned partition.

### Key Features:
- Each reducer processes its assigned partition file
- Retrieves partition file from shared state using executor ID
- Sums counts for each word
- Writes final `(word, total_count)` pairs to file
- Multiple reducers run in parallel

### Output Format:
```
word1 42
word2 17
word3 8
...
```

### File-Backed Storage:
- Results written to `tmp/reduced_results_{reducer_id}.txt`
- Each reducer's output is independent
- Final word counts distributed across reducer files

### Why Multiple Instances?
- Each reducer processes different key partitions
- Parallel reduce improves performance
- No coordination needed between reducers
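
The reduce step itself is a small parsing and summing routine. The sketch below isolates it from the executor and file handling. The helper names `reduce_line` and `reduce_lines` are hypothetical, and unlike the cell that follows it accumulates totals, so a word that appears on more than one line is still counted correctly.

```python
from collections.abc import Iterable


def reduce_line(line: str) -> tuple[str, int]:
    """Parse 'word c1 c2 ...' and return (word, sum of counts)."""
    word, *counts = line.split()
    return word, sum(int(c) for c in counts)


def reduce_lines(lines: Iterable[str]) -> dict[str, int]:
    """Sum counts per word across all lines of one partition."""
    totals: dict[str, int] = {}
    for line in lines:
        if not line.strip():
            continue  # ignore blank lines
        word, subtotal = reduce_line(line)
        totals[word] = totals.get(word, 0) + subtotal
    return totals


result = reduce_lines(["the 1 1 1", "cat 1", "", "the 1"])
print(result)
```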

In [None]:
class ReduceExecutor(Executor[ShuffleCompleted, ReduceCompleted]):
    """Reduces word counts by summing all occurrences."""

    async def execute(self, ctx: ExecutorContext[ShuffleCompleted]) -> ReduceCompleted:
        reducer_id = self.id
        print(f"\n🔄 REDUCE [{reducer_id}]: Aggregating word counts...")

        # Retrieve assigned partition from shared state
        partition_assignments: dict[str, Path] = ctx.get_shared_state(
            "partition_assignments"
        )
        partition_file = partition_assignments[reducer_id]

        # Aggregate counts
        word_counts: dict[str, int] = {}
        async with aiofiles.open(partition_file, "r", encoding="utf-8") as f:
            async for line in f:
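                parts = line.split()
                if not parts:
                    continue  # skip blank lines
                word = parts[0]
                counts = [int(c) for c in parts[1:]]
                # Accumulate in case a word appears on more than one line
                word_counts[word] = word_counts.get(word, 0) + sum(counts)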

        print(f"   Aggregated {len(word_counts)} unique words")

        # Write final results
        output_file = TMP_DIR / f"reduced_results_{reducer_id}.txt"
        async with aiofiles.open(output_file, "w", encoding="utf-8") as f:
            for word, count in sorted(word_counts.items()):
                await f.write(f"{word} {count}\n")

        print(f"   ✅ Wrote results to {output_file}")
        return ReduceCompleted(file_paths=[output_file])

## Completion Executor

The **CompletionExecutor** collects final results from all reducers.

### Key Features:
- Receives list of all reducer output files
- Displays the number of output files and the number of unique words in each
- Shows sample results from each reducer
- Confirms successful workflow completion

### Final Output:
- List of file paths containing word counts
- Each file has sorted `(word, count)` pairs
- Results distributed across `NUM_REDUCERS` files

### Post-Processing Options:
- Merge all reducer files for global word counts
- Sort by count to find most frequent words
- Generate word cloud visualization
- Export to database or analytics platform

In [None]:
class CompletionExecutor(Executor[list[ReduceCompleted], ReduceCompleted]):
    """Collects final results from all reducers."""

    async def execute(
        self, ctx: ExecutorContext[list[ReduceCompleted]]
    ) -> ReduceCompleted:
        print("\n" + "="*60)
        print("PHASE 5: COMPLETION")
        print("="*60)

        reduce_results = ctx.get_input_data()

        # Collect all reducer output files
        all_result_files = []
        for result in reduce_results:
            all_result_files.extend(result.file_paths)

        print(f"\n📊 Final Results:")
        print(f"   - Total reducer output files: {len(all_result_files)}")

        # Display sample results from each file
        for result_file in all_result_files:
            async with aiofiles.open(result_file, "r", encoding="utf-8") as f:
                lines = await f.readlines()
                print(f"\n   📄 {result_file.name}: {len(lines)} unique words")
                # Show first 3 words
                for line in lines[:3]:
                    word, count = line.strip().split()
                    print(f"      - {word}: {count}")
                if len(lines) > 3:
                    print(f"      ... and {len(lines) - 3} more words")

        print("\n" + "="*60)
        print("✅ MAP-REDUCE WORKFLOW COMPLETE")
        print("="*60)

        return ReduceCompleted(file_paths=all_result_files)

## Build the Workflow

Construct the map-reduce workflow with proper phase connections.

### Workflow Construction Steps:

1. **Create executor instances** with unique IDs
2. **Connect phases** with appropriate edges:
   - Split → Map (fan-out to all mappers)
   - Map → Shuffle (fan-in from all mappers)
   - Shuffle → Reduce (fan-out to all reducers)
   - Reduce → Completion (fan-in from all reducers)
3. **Set entry point** to Split

### Graph Structure:
```
        Split
          ↓ (fan-out)
    Map_0, Map_1, Map_2
          ↓ (fan-in)
       Shuffle
          ↓ (fan-out)
  Reduce_0, Reduce_1, Reduce_2, Reduce_3
          ↓ (fan-in)
      Completion
```
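
To make the graph shape concrete, the same structure can be written as a plain edge list. The `fan_out` and `fan_in` helpers below are hypothetical stand-ins that only mimic the shape of the workflow's fan-out and fan-in edges. They assume one node per executor and one edge per connection, which may not match how the framework represents the graph internally.

```python
def fan_out(source: str, targets: list[str]) -> list[tuple[str, str]]:
    """One edge from the source to every target."""
    return [(source, target) for target in targets]


def fan_in(sources: list[str], target: str) -> list[tuple[str, str]]:
    """One edge from every source to the target."""
    return [(source, target) for source in sources]


mapper_ids = [f"mapper_{i}" for i in range(3)]
reducer_ids = [f"reducer_{i}" for i in range(4)]

edges = (
    fan_out("split", mapper_ids)
    + fan_in(mapper_ids, "shuffle")
    + fan_out("shuffle", reducer_ids)
    + fan_in(reducer_ids, "completion")
)
nodes = {name for edge in edges for name in edge}
print(len(nodes), len(edges))
```

With 3 mappers and 4 reducers this gives 10 nodes and 14 edges under those assumptions.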

In [None]:
# Create executor instances
split = SplitExecutor()

mappers = [MapExecutor(id=f"mapper_{i}") for i in range(NUM_MAPPERS)]

shuffle = ShuffleExecutor()

reducers = [ReduceExecutor(id=f"reducer_{i}") for i in range(NUM_REDUCERS)]

completion = CompletionExecutor()

# Build the workflow
workflow = Workflow()

# Phase 1 → Phase 2: Split → Map (fan-out)
workflow.add_fan_out_edges(split, mappers)

# Phase 2 → Phase 3: Map → Shuffle (fan-in)
workflow.add_fan_in_edges(mappers, shuffle)

# Phase 3 → Phase 4: Shuffle → Reduce (fan-out)
workflow.add_fan_out_edges(shuffle, reducers)

# Phase 4 → Phase 5: Reduce → Completion (fan-in)
workflow.add_fan_in_edges(reducers, completion)

# Set entry point
workflow.set_entry_point(split)

print("✅ Map-reduce workflow constructed successfully!")
print(f"\nWorkflow phases:")
print(f"   1. Split (1 executor)")
print(f"   2. Map ({NUM_MAPPERS} executors in parallel)")
print(f"   3. Shuffle (1 executor)")
print(f"   4. Reduce ({NUM_REDUCERS} executors in parallel)")
print(f"   5. Completion (1 executor)")

## Workflow Visualization

Use **WorkflowViz** to generate visual representations of the workflow graph.

### Visualization Options:

1. **Mermaid Diagram** (Text-based)
   - Renders in documentation and markdown files
   - Compatible with GitHub, GitLab, and many IDEs
   - Easy to version control

2. **DiGraph** (NetworkX)
   - Programmatic graph analysis
   - Compute graph properties (degree, centrality, etc.)
   - Apply graph algorithms

3. **SVG Export** (Vector Graphics)
   - High-quality documentation images
   - Scalable without quality loss
   - Embeddable in web pages and PDFs

### WorkflowViz API:
```python
viz = WorkflowViz(workflow)
mermaid_code = viz.to_mermaid()  # Generate Mermaid diagram
digraph = viz.to_digraph()       # Generate NetworkX DiGraph
viz.to_svg("workflow.svg")       # Export as SVG file
```
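
For readers unfamiliar with Mermaid, the text format is simple enough to generate by hand. The sketch below turns an edge list into a `flowchart TD` diagram. The `edges_to_mermaid` helper is hypothetical, and the actual output of `viz.to_mermaid()` may use different node labels or styling.

```python
def edges_to_mermaid(edges: list[tuple[str, str]]) -> str:
    """Render directed edges as a top-down Mermaid flowchart."""
    lines = ["flowchart TD"]
    lines += [f"    {src} --> {dst}" for src, dst in edges]
    return "\n".join(lines)


diagram = edges_to_mermaid([
    ("split", "mapper_0"),
    ("split", "mapper_1"),
    ("mapper_0", "shuffle"),
    ("mapper_1", "shuffle"),
])
print(diagram)
```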

In [None]:
# Create WorkflowViz instance
viz = WorkflowViz(workflow)

# Generate Mermaid diagram
mermaid_code = viz.to_mermaid()
print("\n" + "="*60)
print("MERMAID DIAGRAM")
print("="*60)
print(mermaid_code)
print("\n💡 Copy the above code to https://mermaid.live to visualize")

# Generate NetworkX DiGraph
digraph = viz.to_digraph()
print(f"\n📊 DiGraph Statistics:")
print(f"   - Nodes: {digraph.number_of_nodes()}")
print(f"   - Edges: {digraph.number_of_edges()}")
print(f"   - Node names: {list(digraph.nodes())}")

# Export as SVG (optional - requires graphviz)
try:
    svg_file = TMP_DIR / "workflow_graph.svg"
    viz.to_svg(str(svg_file))
    print(f"\n✅ Workflow graph exported to {svg_file}")
except Exception as e:
    print(f"\n⚠️  SVG export failed: {e}")
    print("   Install graphviz to enable SVG export")

## Run the Workflow

Execute the map-reduce workflow and observe each phase.

### Expected Behavior:
1. **Split**: Divides text into 3 chunks, stores in shared state
2. **Map**: 3 mappers process chunks in parallel, write word pairs to files
3. **Shuffle**: Groups words by key, partitions across 4 reducers, writes to files
4. **Reduce**: 4 reducers aggregate counts in parallel, write final counts to files
5. **Completion**: Displays final statistics and sample results

### Performance Notes:
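- Map executors run concurrently with each other, as do Reduce executors
- File I/O is asynchronous (non-blocking)
- Memory usage is not constant: Split reads the whole input and Shuffle groups all pairs in memory before writing
- Intermediate files persist on disk, which is a prerequisite for fault recovery (not implemented in this example)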
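
Persisted intermediate files only help recovery if a rerun can skip work that already finished. A minimal sketch of that idea follows, assuming each phase writes a single output file. The `run_once` helper is hypothetical and not part of the workflow framework.

```python
import tempfile
from pathlib import Path
from typing import Callable


def run_once(output: Path, produce: Callable[[], str]) -> bool:
    """Run produce() only if `output` is missing; return True if it ran."""
    if output.exists():
        return False  # a previous run already completed this step
    partial = output.with_suffix(output.suffix + ".partial")
    partial.write_text(produce(), encoding="utf-8")
    # Rename last so a crash mid-write never leaves a truncated final file.
    partial.replace(output)
    return True


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "reduced_results_reducer_0.txt"
    first_run = run_once(target, lambda: "the 3\n")
    second_run = run_once(target, lambda: "the 3\n")
    content = target.read_text(encoding="utf-8")

print(first_run, second_run)
```

Writing to a `.partial` file and renaming it means an output file that exists can be treated as complete.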

In [None]:
# Run the workflow
result = await workflow.run()

print(f"\n🎉 Workflow execution complete!")
print(f"   Final result files: {result.file_paths}")

## Analyze Results

Let's examine the final word count results across all reducer files.

In [None]:
# Aggregate all word counts
all_word_counts: dict[str, int] = {}

for result_file in result.file_paths:
    async with aiofiles.open(result_file, "r", encoding="utf-8") as f:
        async for line in f:
            word, count = line.strip().split()
            all_word_counts[word] = int(count)

# Sort by count (descending)
sorted_words = sorted(all_word_counts.items(), key=lambda x: x[1], reverse=True)

print("\n" + "="*60)
print("TOP 20 MOST FREQUENT WORDS")
print("="*60)
for i, (word, count) in enumerate(sorted_words[:20], 1):
    print(f"{i:2}. {word:20} → {count:4} occurrences")

print(f"\n📊 Total Statistics:")
print(f"   - Unique words: {len(all_word_counts)}")
print(f"   - Total word occurrences: {sum(all_word_counts.values())}")
print(f"   - Average count per word: {sum(all_word_counts.values()) / len(all_word_counts):.2f}")

## Key Takeaways

### Map-Reduce Architecture
- ✅ **Scalable Pattern**: Structures work into independent chunks, the same shape used by distributed systems (this example runs on a single machine)
- ✅ **Five Phases**: Split → Map → Shuffle → Reduce → Completion
- ✅ **Parallel Execution**: Map and Reduce phases run concurrently
- ✅ **Data Locality**: Each executor processes independent data chunks

### File-Backed Intermediate Results
- ✅ **Memory Efficiency**: Phases exchange file paths, and mappers and reducers write results line by line
- ✅ **Persistence**: Intermediate files stay on disk and could support recovery, which this example does not implement
- ✅ **Async I/O**: Non-blocking file operations with aiofiles
- ✅ **Scalability**: Handling datasets larger than RAM would also require streaming in Split and Shuffle

### Shared State Coordination
- ✅ **Typed Communication**: Shared state values are annotated with type hints where they are read
- ✅ **Dynamic Assignment**: Runtime chunk and partition allocation
- ✅ **Context Isolation**: Each executor has independent context
- ✅ **Flexible Coordination**: Supports complex multi-executor patterns

### Workflow Visualization
- ✅ **Mermaid Diagrams**: Text-based graph rendering for docs
- ✅ **DiGraph Objects**: Programmatic graph analysis with NetworkX
- ✅ **SVG Export**: High-quality vector graphics for documentation
- ✅ **Graph Analytics**: Compute properties and apply algorithms

### Marker-Based Phases
- ✅ **Type Safety**: Dataclasses ensure type-correct phase transitions
- ✅ **Clear Semantics**: Marker classes document workflow stages
- ✅ **Metadata Passing**: File paths and counts flow through pipeline
- ✅ **Debugging Aid**: Markers make execution flow explicit

### Configurable Parallelism
- ✅ **Tunable Performance**: Adjust mapper and reducer counts
- ✅ **Resource Management**: Balance CPU and I/O utilization
- ✅ **Scalability Testing**: Experiment with different configurations
- ✅ **Production Optimization**: Profile and tune for workload

### When to Use This Pattern
- ✅ Processing large text files (logs, documents, web pages)
- ✅ Distributed aggregation (analytics, reporting, statistics)
- ✅ Data-intensive batch processing
- ✅ ETL pipelines with transform and reduce steps
- ✅ Any workload with embarrassingly parallel map phase

### Best Practices
- 🎯 **Partition Wisely**: Balance data distribution across reducers
- 🎯 **Use Async I/O**: Prevent blocking on file operations
- 🎯 **Clean Intermediate Files**: Remove tmp files after completion
- 🎯 **Monitor Progress**: Add logging for phase transitions
- 🎯 **Handle Errors**: Implement retry logic for file operations
- 🎯 **Tune Parallelism**: Profile to find optimal executor counts
- 🎯 **Visualize Workflows**: Generate diagrams for documentation
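
For the cleanup practice above, one simple approach is to scope intermediate files to a temporary directory that is removed when the run ends. The sketch below uses `tempfile.TemporaryDirectory` from the standard library. It is an alternative to the notebook's fixed `tmp` directory, and the file name is only an example.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory(prefix="mapreduce_") as tmp:
    tmp_dir = Path(tmp)
    # Stand-in for an intermediate file written by a mapper.
    (tmp_dir / "map_results_mapper_0.txt").write_text("the 1\n", encoding="utf-8")
    existed_during_run = tmp_dir.exists()

# The directory and everything in it is deleted when the block exits.
removed_after_run = not tmp_dir.exists()
print(existed_during_run, removed_after_run)
```

Keep a persistent directory instead when intermediate files are needed for debugging or for resuming a failed run.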

### Performance Optimization
- 🚀 Increase mapper count for faster parallel processing
- 🚀 Increase reducer count for better aggregation parallelism
- 🚀 Use larger chunks for fewer but longer-running map tasks
- 🚀 Optimize hash function for better key distribution
- 🚀 Use SSDs for faster intermediate file I/O
- 🚀 Profile async I/O to identify bottlenecks

### Next Steps
- Process different file types (CSV, JSON, Parquet)
- Implement custom aggregation functions (beyond word count)
- Add compression for intermediate files
- Integrate with distributed file systems (S3, HDFS)
- Build monitoring dashboard for workflow execution
- Add checkpointing for long-running workflows
- Experiment with different partitioning strategies
- Create reusable map-reduce workflow templates