# User Story Breakdown Model Development Documentation

**Project:** Automated User Story Task Breakdown with Dependency Analysis and Skill Extraction

## Table of Contents

1. [Project Overview](#1-project-overview)
2. [Input Form Strategy](#2-input-form-strategy)
3. [Treatment Architecture & Implementation](#3-treatment-architecture--implementation)
4. [Output Form & Results](#4-output-form--results)
5. [Evaluation & Performance Analysis](#5-evaluation--performance-analysis)

## 1. Project Overview

### 1.1 Problem Statement
The goal of this project is to develop a model that can automatically break down user stories into tasks, identify dependencies between tasks, and extract the required skills needed for each task. 

### 1.2 Key Objectives
- **Task Decomposition**: Break user stories into granular, actionable tasks
- **Dependency Identification**: Detect relationships and dependencies between tasks
- **Skill Extraction**: Identify required technical and non-technical skills
- **Effort Estimation**: Provide realistic time estimates for each task

### 1.3 Success Metrics
- Task breakdown accuracy
- Dependency detection precision
- Skill extraction completeness
- Processing time and cost efficiency

## 2. Input Form Strategy

### 2.1 Input Structure Design

The input strategy focuses on handling various formats and quantities of user stories to optimize processing efficiency and accuracy.


#### 2.1.1 Strategy 1: Individual Processing

**Approach**: Process each user story separately



In [43]:
import sys
import os

# Add the scripts directory to Python path
scripts_dir = os.path.join(os.getcwd(), 'scripts')
sys.path.append(scripts_dir)

In [44]:
SAMPLE_USER_STORIES = [
    "As a user, I want to click on the address so that it takes me to a new tab with Google Maps.",
    "As a user, I want to be able to anonymously view public information so that I know about recycling centers near me before creating an account.",
    "As a user, I want to create an account so that I can save my favorite recycling centers.",
    "As a user, I want to search for recycling centers by material type so that I can find where to recycle specific items.",
    "As a user, I want to rate and review recycling centers so that other users can benefit from my experience.",
    "As a user, I want to receive notifications about new recycling centers in my area so that I stay informed.",
    "As an admin, I want to manage recycling center information so that the database stays up-to-date.",
    "As a user, I want to filter search results by distance so that I can find the closest recycling centers.",
    "As a user, I want to view operating hours for each recycling center so that I know when to visit.",
    "As a user, I want to get directions to a recycling center so that I can navigate there easily.",
    "As a user, I want to upload photos of recycling centers so that others can see what the facilities look like.",
    "As a user, I want to report incorrect information about recycling centers so that the database stays accurate.",
]

In [33]:
# Add this to your existing code to show detailed results immediately

class EnhancedIndividualProcessor:
    """Enhanced version that captures and displays detailed results"""
    
    def __init__(self):
        # Use your existing components
        self.decomposer = TaskDecomposerAgent()
        self.consolidator = TaskConsolidatorAgent()
        self.dependency_analyzer = DependencyAnalyzerAgent()
        self.skill_mapper = SkillMapperAgent()
    
    async def process_stories_with_details(self, user_stories: List[str]):
        """Process stories and immediately display detailed results"""
        print(f"🏢 TRADITIONALISTS: Processing {len(user_stories)} stories individually...")
        start_time = time.time()
        
        try:
            # Use the existing process_multiple_user_stories function
            result = await process_multiple_user_stories(user_stories)
            
            if "error" in result:
                raise Exception(result["error"])
            
            # Calculate metrics
            api_calls = len(user_stories) + 1 + len(result["tasks"])
            total_tasks = sum(len(tasks) for tasks in result.get("task_origins", {}).values())
            duplicate_reduction = 1 - (len(result["tasks"]) / total_tasks) if total_tasks > 0 else 0
            dependencies_found = sum(len(deps) for deps in result["dependencies"].values())
            total_skills = sum(len(skills) for skills in result["required_skills"].values())
            
            print(f"✅ SUCCESS: Processed {len(user_stories)} stories with {api_calls} API calls")
            
            # IMMEDIATELY DISPLAY DETAILED RESULTS
            self.display_detailed_results(result, time.time() - start_time, api_calls)
            
            return result
            
        except Exception as e:
            print(f"❌ ERROR: {str(e)}")
            return {"error": str(e)}
    
    def display_detailed_results(self, result, execution_time, api_calls):
        """Display comprehensive detailed results immediately"""
        
        print("\n" + "=" * 80)
        print("🏢 PARTY A: TRADITIONALISTS - COMPLETE DETAILED RESULTS")
        print("=" * 80)
        
        # SUMMARY METRICS
        print("\n📊 PERFORMANCE SUMMARY:")
        print("-" * 50)
        print(f"⏱️  Execution Time: {execution_time:.2f} seconds")
        print(f"🔌 API Calls: {api_calls}")
        print(f"📋 Total Tasks Generated: {len(result['tasks'])}")
        print(f"🔗 Dependencies Found: {sum(len(deps) for deps in result['dependencies'].values())}")
        print(f"🎯 Skills Identified: {sum(len(skills) for skills in result['required_skills'].values())}")
        print(f"⚡ Efficiency: {len(result['tasks']) / api_calls:.2f} tasks per API call")
        
        # DETAILED TASKS
        print(f"\n📋 ALL {len(result['tasks'])} DECOMPOSED TASKS:")
        print("-" * 50)
        for i, task in enumerate(result['tasks'], 1):
            print(f"{i:2d}. {task}")
            
            # Show which story(ies) generated this task
            origins = result['task_origins'].get(task, [])
            if origins:
                if len(origins) == 1:
                    print(f"    └─ From: {origins[0][:60]}...")
                else:
                    print(f"    └─ From {len(origins)} stories (duplicate found):")
                    for origin in origins:
                        print(f"       • {origin[:55]}...")
            print()
        
        # DEPENDENCIES
        print(f"\n🔗 TASK DEPENDENCIES:")
        print("-" * 50)
        
        tasks_with_deps = {task: deps for task, deps in result['dependencies'].items() if deps}
        
        if tasks_with_deps:
            print(f"Found {len(tasks_with_deps)} tasks with dependencies:\n")
            
            for i, (task, deps) in enumerate(tasks_with_deps.items(), 1):
                print(f"{i}. 📋 TASK: {task}")
                print(f"   ⬇️  DEPENDS ON:")
                for j, dep in enumerate(deps, 1):
                    print(f"      {j}. {dep}")
                print()
        else:
            print("ℹ️  No dependencies detected between tasks")
        
        # SKILLS MAPPING
        print(f"\n🎯 SKILLS REQUIRED FOR EACH TASK:")
        print("-" * 50)
        
        for i, (task, skills) in enumerate(result['required_skills'].items(), 1):
            if skills:  # Only show tasks with skills
                print(f"{i:2d}. 📋 {task}")
                print(f"     🛠️  Skills: {', '.join(skills)}")
                print()
        
        # ANALYSIS
        print(f"\n📈 DETAILED ANALYSIS:")
        print("-" * 50)
        
        # Task distribution per story
        story_counts = {}
        for task, origins in result['task_origins'].items():
            for origin in origins:
                short_origin = origin[:40] + "..." if len(origin) > 40 else origin
                story_counts[short_origin] = story_counts.get(short_origin, 0) + 1
        
        print("📊 Tasks Generated per Story:")
        for story, count in story_counts.items():
            print(f"   • {story}: {count} tasks")
        
        # Dependency analysis
        dep_counts = [len(deps) for deps in result['dependencies'].values()]
        if dep_counts:
            max_deps = max(dep_counts)
            avg_deps = sum(dep_counts) / len(dep_counts)
            tasks_with_deps_count = sum(1 for d in dep_counts if d > 0)
            
            print(f"\n🔗 Dependency Statistics:")
            print(f"   • Tasks with dependencies: {tasks_with_deps_count}")
            print(f"   • Maximum dependencies per task: {max_deps}")
            print(f"   • Average dependencies per task: {avg_deps:.1f}")
        
        # Skills analysis
        all_skills = set()
        for skills in result['required_skills'].values():
            all_skills.update(skills)
        
        print(f"\n🎯 Skills Analysis:")
        print(f"   • Unique skills identified: {len(all_skills)}")
        print(f"   • Most common skills: {', '.join(list(all_skills)[:5])}")

# SIMPLE FUNCTION TO RUN AND DISPLAY EVERYTHING
async def run_with_full_details():
    """Run the processor and display all detailed results immediately"""
    
    print("🏢 PARTY A: INDIVIDUAL PROCESSING WITH FULL DETAILS")
    print("=" * 60)
    print("Philosophy: 'One story at a time, done right'")
    print("")
    
    # Use sample stories
    user_stories = SAMPLE_USER_STORIES
    print(f"Processing {len(user_stories)} sample user stories...")
    
    # Create processor and run with detailed display
    processor = EnhancedIndividualProcessor()
    result = await processor.process_stories_with_details(user_stories)
    
    return result

result = await run_with_full_details()

🏢 PARTY A: INDIVIDUAL PROCESSING WITH FULL DETAILS
Philosophy: 'One story at a time, done right'

Processing 12 sample user stories...
🏢 TRADITIONALISTS: Processing 12 stories individually...
[TASK_DECOMPOSITION] Tokens - Input: 270, Output: 25, Total: 295
[TASK_DECOMPOSITION] Tokens - Input: 274, Output: 81, Total: 355
[TASK_DECOMPOSITION] Tokens - Input: 266, Output: 104, Total: 370
[TASK_DECOMPOSITION] Tokens - Input: 271, Output: 84, Total: 355
[TASK_DECOMPOSITION] Tokens - Input: 268, Output: 99, Total: 367
[TASK_DECOMPOSITION] Tokens - Input: 268, Output: 83, Total: 351
[TASK_DECOMPOSITION] Tokens - Input: 266, Output: 123, Total: 389
[TASK_DECOMPOSITION] Tokens - Input: 268, Output: 80, Total: 348
[TASK_DECOMPOSITION] Tokens - Input: 268, Output: 85, Total: 353
[TASK_DECOMPOSITION] Tokens - Input: 267, Output: 101, Total: 368
[TASK_DECOMPOSITION] Tokens - Input: 269, Output: 115, Total: 384
[TASK_DECOMPOSITION] Tokens - Input: 266, Output: 128, Total: 394
[DEPENDENCY_ANALYSIS] T

**Individual Processing Performance Analysis**

- Processing Time
> **Performance:** ⭐⭐⭐ (Slowest)

- Characteristics :

> Sequential processing of each story

> No parallelization benefits

> Processing time scales linearly: `O(n)` where n = number of stories

> Overhead from multiple API round-trips

- Expected Range
> **15-30 seconds for 10 stories**

- Bottlenecks
> **Network latency multiplied by story count**

---

- 🔤 Token Usage
> **Efficiency:** ⭐⭐ (Highest Usage)


- Formula
> **(Context + Examples + Story) × Number of Stories + Dependencies + Skills**

- Estimated Tokens
> **~800-1200 per story + analysis overhead**

---

- API Costs
> **Cost Rating:** ⭐ (Most Expensive)

- Cost Structure
> **High number of API calls: Stories + 1 (dependencies) + Unique_Tasks (skills)**

- Typical Example
> **10 stories = 10 + 1 + 25 = 36 API calls**

- Cost Factors
  
> Maximum API call count

> Redundant context transmission

> Individual skill mapping calls

---

- Dependency Detection Accuracy
> **Accuracy:** ⭐⭐⭐⭐⭐ (Highest)

- Advantages
  
> Each story gets individual attention

> Detailed task breakdown per story

> Comprehensive dependency analysis on final consolidated tasks

> No context dilution

---

- 📊 Summary

- Best Performance Aspects

> **✅ Highest reliability and accuracy**

> **✅ Battle-tested methodology**

> **✅ Error isolation per story**

> **✅ Comprehensive individual analysis**

- Trade-offs
  
> **⚠️ Higher API usage**

> **⚠️ Slower execution time**

> **⚠️ Less efficient for large batches**


#### 2.1.2 Strategy 2: Batch All Processing

**Approach**: Process all user stories in a single batch

**Trade-offs**:
- ✅ Better cross-story dependency detection
- ✅ Lower API costs with single call
- ✅ Maintains global context awareness
- ❌ Token limit constraints for large batches
- ❌ Harder to recover from errors


In [40]:

@dataclass
class ProcessingResults:
    method: str
    execution_time: float
    api_calls: int
    total_tasks: int
    unique_tasks: int
    duplicate_reduction: float
    dependencies_found: int
    total_skills: int
    stories_processed: int
    errors: List[str]

class GroupedBatchDecomposer:
    """Batch decomposer optimized for group processing"""
    
    def __init__(self):
        self.few_shot_examples = """
User Stories:
1. As a user, I want to click on the address so that it takes me to a new tab with Google Maps.
2. As a user, I want to be able to anonymously view public information so that I know about recycling centers near me before creating an account.
3. As a user, I want to create an account so that I can save my favorite recycling centers.

Tasks for Story 1:
1. Make address text clickable
2. Implement click handler to format address for Google Maps URL
3. Open Google Maps in new tab/window
4. Add proper URL encoding for address parameters

Tasks for Story 2:
1. Design public landing page layout
2. Create anonymous user session handling
3. Implement facility search without authentication
4. Display basic facility information publicly 
5. Design facility component
6. Detect user's location via browser API or IP
7. Show recycling centers within a radius of the user
8. Design facility list display component

Tasks for Story 3:
1. Design user registration form
2. Implement user authentication system
3. Create user profile management
4. Add favorites functionality to UI
5. Implement save/unsave facility feature
6. Design favorites list component
"""
    
    async def decompose_group(self, user_stories: List[str]) -> Dict[str, List[str]]:
        """Decompose a group of user stories (typically 3-5 stories)"""
        stories_text = "\n".join([f"{i+1}. {story}" for i, story in enumerate(user_stories)])
        
        prompt = f"""
You are a task decomposition expert. Break down EACH of the following user stories into specific, actionable technical tasks.
Each task should be simple and focused on a single responsibility.

For each user story, provide tasks in the format:
Tasks for Story X:
1. Task description
2. Task description
...

IMPORTANT: Return tasks for ALL user stories. Do NOT skip any stories.

Examples:
{self.few_shot_examples}

User Stories:
{stories_text}

Tasks:
"""
        
        response = client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model="llama3-70b-8192",
            temperature=0.3
        )
        
        return self._parse_response(response.choices[0].message.content.strip(), user_stories)
    
    def _parse_response(self, content: str, user_stories: List[str]) -> Dict[str, List[str]]:
        """Parse the LLM response into structured task data"""
        result = {}
        lines = content.split('\n')
        current_story = None
        current_tasks = []
        
        for line in lines:
            line = line.strip()
            if not line:
                continue
            
            if line.lower().startswith('tasks for story'):
                if current_story is not None and current_tasks:
                    result[current_story] = current_tasks
                
                story_match = re.search(r'story\s+(\d+)', line.lower())
                if story_match:
                    story_num = int(story_match.group(1)) - 1
                    if 0 <= story_num < len(user_stories):
                        current_story = user_stories[story_num]
                        current_tasks = []
            
            elif line and any(line.startswith(str(i) + '.') for i in range(1, 21)):
                task = re.sub(r'^\d+\.\s*', '', line).strip()
                if task and len(task) > 10:
                    current_tasks.append(task)
        
        if current_story is not None and current_tasks:
            result[current_story] = current_tasks
        
        return result

class EnhancedGroupedProcessor:
    """
    Enhanced Grouped Processor that captures and displays detailed results
    """
    
    def __init__(self, group_size: int = 3):
        self.group_size = group_size
        self.batch_decomposer = GroupedBatchDecomposer()
        self.consolidator = TaskConsolidatorAgent()
        self.dependency_analyzer = DependencyAnalyzerAgent()
        self.skill_mapper = SkillMapperAgent()
    
    async def process_stories_with_details(self, user_stories: List[str]):
        """Process stories and immediately display detailed results"""
        print(f"🚀 OPTIMIZERS: Processing {len(user_stories)} stories in groups of {self.group_size}...")
        start_time = time.time()
        errors = []
        api_calls = 0
        
        try:
            # Step 1: Group stories and decompose each group
            groups = [user_stories[i:i + self.group_size] for i in range(0, len(user_stories), self.group_size)]
            all_user_stories_tasks = {}
            
            print(f"📦 Created {len(groups)} groups for processing")
            
            for i, group in enumerate(groups, 1):
                try:
                    print(f"   Processing group {i}/{len(groups)} ({len(group)} stories)...")
                    group_tasks = await self.batch_decomposer.decompose_group(group)
                    api_calls += 1
                    all_user_stories_tasks.update(group_tasks)
                    print(f"   ✅ Group {i} completed")
                except Exception as e:
                    error_msg = f"Failed to process group {i}: {str(e)}"
                    errors.append(error_msg)
                    print(f"   ❌ {error_msg}")
            
            # Step 2: Consolidate tasks from all groups
            print("🔄 Consolidating tasks across all groups...")
            unique_tasks, task_origins = self.consolidator.consolidate_tasks(all_user_stories_tasks)
            
            # Step 3: Analyze dependencies
            print("🔗 Analyzing task dependencies...")
            dependencies = await self.dependency_analyzer.analyze(unique_tasks)
            api_calls += 1
            
            # Step 4: Map skills for each unique task
            print("🎯 Mapping required skills...")
            skill_map = {}
            for task in unique_tasks:
                try:
                    skills = await self.skill_mapper.map_skills(task)
                    skill_map[task] = skills
                    api_calls += 1
                except Exception as e:
                    error_msg = f"Failed to map skills for task: {str(e)}"
                    errors.append(error_msg)
            
            # Calculate metrics
            total_tasks = sum(len(tasks) for tasks in all_user_stories_tasks.values())
            duplicate_reduction = (total_tasks - len(unique_tasks)) / total_tasks if total_tasks > 0 else 0
            dependencies_found = sum(len(deps) for deps in dependencies.values())
            total_skills = sum(len(skills) for skills in skill_map.values())
            
            print(f"✅ SUCCESS: Processed {len(all_user_stories_tasks)} stories with {api_calls} API calls")
            
            # IMMEDIATELY DISPLAY DETAILED RESULTS
            self.display_detailed_results(
                unique_tasks, task_origins, dependencies, skill_map, 
                time.time() - start_time, api_calls, total_tasks, errors
            )
            
            return {
                "tasks": unique_tasks,
                "task_origins": task_origins,
                "dependencies": dependencies,
                "required_skills": skill_map,
                "api_calls": api_calls,
                "execution_time": time.time() - start_time,
                "errors": errors
            }
            
        except Exception as e:
            print(f"❌ CRITICAL ERROR: {str(e)}")
            return {"error": str(e)}
    
    def display_detailed_results(self, unique_tasks, task_origins, dependencies, skill_map, execution_time, api_calls, total_tasks, errors):
        """Display comprehensive detailed results immediately"""
        
        print("\n" + "=" * 80)
        print("🚀 PARTY B: OPTIMIZERS - COMPLETE DETAILED RESULTS")
        print("=" * 80)
        
        # SUMMARY METRICS
        print(f"\n📊 PERFORMANCE SUMMARY:")
        print("-" * 50)
        print(f"⏱️  Execution Time: {execution_time:.2f} seconds")
        print(f"🔌 API Calls: {api_calls}")
        print(f"📦 Group Size: {self.group_size}")
        print(f"📋 Total Tasks Generated: {total_tasks}")
        print(f"🎯 Unique Tasks After Consolidation: {len(unique_tasks)}")
        print(f"🔄 Duplicate Reduction: {((total_tasks - len(unique_tasks)) / total_tasks * 100) if total_tasks > 0 else 0:.1f}%")
        print(f"🔗 Dependencies Found: {sum(len(deps) for deps in dependencies.values())}")
        print(f"🛠️  Skills Identified: {sum(len(skills) for skills in skill_map.values())}")
        print(f"⚡ Efficiency: {len(unique_tasks) / api_calls:.2f} tasks per API call")
        
        if errors:
            print(f"⚠️  Errors: {len(errors)}")
            for error in errors:
                print(f"   • {error}")
        
        # DETAILED TASKS
        print(f"\n📋 ALL {len(unique_tasks)} CONSOLIDATED TASKS:")
        print("-" * 50)
        for i, task in enumerate(unique_tasks, 1):
            print(f"{i:2d}. {task}")
            
            # Show which story(ies) generated this task
            origins = task_origins.get(task, [])
            if origins:
                if len(origins) == 1:
                    print(f"    └─ From: {origins[0][:60]}...")
                else:
                    print(f"    └─ From {len(origins)} stories (DUPLICATE CONSOLIDATED):")
                    for origin in origins:
                        print(f"       • {origin[:55]}...")
            print()
        
        # DEPENDENCIES
        print(f"\n🔗 TASK DEPENDENCIES:")
        print("-" * 50)
        
        tasks_with_deps = {task: deps for task, deps in dependencies.items() if deps}
        
        if tasks_with_deps:
            print(f"Found {len(tasks_with_deps)} tasks with dependencies:\n")
            
            for i, (task, deps) in enumerate(tasks_with_deps.items(), 1):
                print(f"{i}. 📋 TASK: {task}")
                print(f"   ⬇️  DEPENDS ON:")
                for j, dep in enumerate(deps, 1):
                    print(f"      {j}. {dep}")
                print()
        else:
            print("ℹ️  No dependencies detected between tasks")
        
        # SKILLS MAPPING
        print(f"\n🎯 SKILLS REQUIRED FOR EACH TASK:")
        print("-" * 50)
        
        for i, (task, skills) in enumerate(skill_map.items(), 1):
            if skills:  # Only show tasks with skills
                print(f"{i:2d}. 📋 {task}")
                print(f"     🛠️  Skills: {', '.join(skills)}")
                print()
        
        # GROUPING ANALYSIS
        print(f"\n📦 GROUPING ANALYSIS:")
        print("-" * 50)
        
        # Analyze how tasks were distributed from groups
        story_counts = {}
        for task, origins in task_origins.items():
            for origin in origins:
                short_origin = origin[:40] + "..." if len(origin) > 40 else origin
                story_counts[short_origin] = story_counts.get(short_origin, 0) + 1
        
        print("📊 Tasks Generated per Story:")
        for story, count in story_counts.items():
            print(f"   • {story}: {count} tasks")
        
        print(f"\n🎯 Grouping Efficiency:")
        print(f"   • Group size used: {self.group_size}")
        print(f"   • Number of groups: {api_calls - 1 - len(unique_tasks)}") # Subtract dependency and skill calls
        print(f"   • Average tasks per group call: {total_tasks / max(1, api_calls - 1 - len(unique_tasks)):.1f}")
        
        # Skills analysis
        all_skills = set()
        for skills in skill_map.values():
            all_skills.update(skills)
        
        print(f"\n🛠️  Skills Analysis:")
        print(f"   • Unique skills identified: {len(all_skills)}")
        if all_skills:
            print(f"   • Most common skills: {', '.join(list(all_skills)[:5])}")

# ============================================================================
# JUPYTER-COMPATIBLE FUNCTION WITH DETAILED DISPLAY
# ============================================================================

async def run_grouped_with_full_details(group_size: int = 3):
    """Run the grouped processor and display all detailed results immediately"""
    
    print("🚀 PARTY B: GROUPED PROCESSING WITH FULL DETAILS")
    print("=" * 60)
    print("Philosophy: 'Smart batching for balanced efficiency'")
    print(f"Group size: {group_size}")
    print("")
    
    # Use sample stories
    user_stories = SAMPLE_USER_STORIES
    print(f"Processing {len(user_stories)} sample user stories...")
    
    # Create processor and run with detailed display
    processor = EnhancedGroupedProcessor(group_size=group_size)
    result = await processor.process_stories_with_details(user_stories)
    
    return result

result = await run_grouped_with_full_details(group_size=2)

🚀 PARTY B: GROUPED PROCESSING WITH FULL DETAILS
Philosophy: 'Smart batching for balanced efficiency'
Group size: 2

Processing 5 sample user stories...
🚀 OPTIMIZERS: Processing 5 stories in groups of 2...
📦 Created 3 groups for processing
   Processing group 1/3 (2 stories)...
   ✅ Group 1 completed
   Processing group 2/3 (2 stories)...
   ✅ Group 2 completed
   Processing group 3/3 (1 stories)...
   ✅ Group 3 completed
🔄 Consolidating tasks across all groups...
🔗 Analyzing task dependencies...
🎯 Mapping required skills...
✅ SUCCESS: Processed 0 stories with 4 API calls

🚀 PARTY B: OPTIMIZERS - COMPLETE DETAILED RESULTS

📊 PERFORMANCE SUMMARY:
--------------------------------------------------
⏱️  Execution Time: 5.02 seconds
🔌 API Calls: 4
📦 Group Size: 2
📋 Total Tasks Generated: 0
🎯 Unique Tasks After Consolidation: 0
🔄 Duplicate Reduction: 0.0%
🔗 Dependencies Found: 0
🛠️  Skills Identified: 0
⚡ Efficiency: 0.00 tasks per API call

📋 ALL 0 CONSOLIDATED TASKS:
-----------------------

**Grouped Processing Performance Analysis**
- Processing Time
> **Performance:** ⭐⭐⭐⭐ (Balanced)
- Characteristics :
  
> Moderate parallelization through grouping

> Processing time: `O(n/g)` where g = group size

> Reduced API round-trips

> Configurable group sizes for optimization
- Expected Range
> **8-15 seconds for 10 stories (groups of 3)**
- Bottlenecks
> **Sweet spot between speed and reliability**
---
- 🔤 Token Usage
> **Efficiency:** ⭐⭐⭐⭐ (Balanced)
- Formula
> **(Context + Examples + Group_Stories) × Number_of_Groups + Dependencies + Skills**
- Estimated Tokens
> **~400-600 per story + analysis overhead**
---
- API Costs
> **Cost Rating:** ⭐⭐⭐⭐ (Moderate)
---
- Dependency Detection Accuracy
> **Accuracy:** ⭐⭐⭐⭐ (High)
- Advantages
  
> Good task breakdown within groups

> Effective cross-group dependency detection

> Minimal context dilution

> Balanced detail vs. efficiency
- Trade-offs
  
> Slightly less granular than individual processing

> Good inter-group dependency mapping

> Maintains quality while improving speed
---
- Error Recovery Capability
> **Resilience:** ⭐⭐⭐⭐ (Very Good)
- Recovery Features
  
> **Group Isolation:** Failed group doesn't affect others

> **Partial Processing:** Can continue with successful groups

> **Adaptive Grouping:** Can adjust group sizes based on failure patterns

> **Moderate Debugging:** Can isolate issues to specific groups
- Failure Scenarios
> **Group failure rate ~5-10%**
---
- 📊 Summary
- Best Performance Aspects
  
> **✅ Balanced efficiency vs. reliability**

> **✅ Scalable to large story sets**

> **✅ Error resilience per group**

> **✅ Configurable group sizes**
- Trade-offs
  
> **⚠️ More complex than batch-all**

> **⚠️ Coordination overhead**

> **⚠️ Not maximally efficient**

#### 2.1.3 Strategy 3: Grouped Processing

**Approach**: Process user stories in logical groups

**Trade-offs**:
- ✅ Balanced approach between individual and batch
- ✅ Some dependency detection within groups
- ✅ Manageable token usage
- ❌ May miss cross-group dependencies
- ❌ Requires post-processing to merge results

In [46]:
@dataclass
class ProcessingResults:
    method: str
    execution_time: float
    api_calls: int
    total_tasks: int
    unique_tasks: int
    duplicate_reduction: float
    dependencies_found: int
    total_skills: int
    stories_processed: int
    errors: List[str]

class FixedBatchAllTaskDecomposer:
    """Ultra-efficient batch decomposer with robust parsing"""
    
    def __init__(self):
        self.few_shot_examples = """
User Stories:
1. As a user, I want to click on the address so that it takes me to a new tab with Google Maps.
2. As a user, I want to be able to anonymously view public information so that I know about recycling centers near me before creating an account.
3. As a user, I want to create an account so that I can save my favorite recycling centers.

Tasks for Story 1:
1. Make address text clickable
2. Implement click handler to format address for Google Maps URL
3. Open Google Maps in new tab/window
4. Add proper URL encoding for address parameters

Tasks for Story 2:
1. Design public landing page layout
2. Create anonymous user session handling
3. Implement facility search without authentication
4. Display basic facility information publicly 
5. Design facility component
6. Detect user's location via browser API or IP
7. Show recycling centers within a radius of the user

Tasks for Story 3:
1. Design user registration form
2. Implement user authentication system
3. Create user profile management
4. Add favorites functionality to UI
5. Implement save/unsave facility feature
"""
    
    async def decompose_all_stories(self, user_stories: List[str]) -> Dict[str, List[str]]:
        """
        Decompose ALL user stories in a single, powerful API call.
        """
        stories_text = "\n".join([f"{i+1}. {story}" for i, story in enumerate(user_stories)])
        
        prompt = f"""
You are an expert task decomposition system. Break down ALL of the following user stories into specific, actionable technical tasks.

For each user story, provide tasks in the format:
Tasks for Story X:
1. Task description
2. Task description
...

CRITICAL: Return tasks for EVERY SINGLE user story provided. Do NOT skip any stories.

Examples:
{self.few_shot_examples}

User Stories to Process:
{stories_text}

Tasks:
"""
        
        try:
            print(f"🔍 DEBUG: Sending {len(user_stories)} stories to LLM...")
            
            response = client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model="llama3-70b-8192",
                temperature=0.3,
                max_tokens=4000
            )
            
            raw_response = response.choices[0].message.content.strip()
            print(f"🔍 DEBUG: Received response length: {len(raw_response)} characters")
            print(f"🔍 DEBUG: First 200 chars: {raw_response[:200]}...")
            
            parsed_result = self._parse_massive_response(raw_response, user_stories)
            print(f"🔍 DEBUG: Parsed {len(parsed_result)} stories from response")
            
            return parsed_result
            
        except Exception as e:
            print(f"🔍 DEBUG: Exception in decompose_all_stories: {str(e)}")
            raise Exception(f"Batch decomposition failed: {str(e)}")
    
    def _parse_massive_response(self, content: str, user_stories: List[str]) -> Dict[str, List[str]]:
        """
        Enhanced parsing with better error handling and debugging
        """
        print(f"🔍 DEBUG: Starting to parse response for {len(user_stories)} stories")
        
        result = {}
        lines = content.split('\n')
        current_story = None
        current_tasks = []
        
        print(f"🔍 DEBUG: Processing {len(lines)} lines...")
        
        for i, line in enumerate(lines):
            line = line.strip()
            if not line:
                continue
            
            # Look for story headers with multiple patterns
            if (line.lower().startswith('tasks for story') or 
                line.lower().startswith('story') or
                re.match(r'^\d+\.\s*as\s+a', line.lower())):
                
                # Save previous story if exists
                if current_story is not None and current_tasks:
                    print(f"🔍 DEBUG: Saved {len(current_tasks)} tasks for story: {current_story[:50]}...")
                    result[current_story] = current_tasks
                
                # Extract story number
                story_match = re.search(r'story\s+(\d+)', line.lower())
                if story_match:
                    story_num = int(story_match.group(1)) - 1
                    if 0 <= story_num < len(user_stories):
                        current_story = user_stories[story_num]
                        current_tasks = []
                        print(f"🔍 DEBUG: Starting story {story_num + 1}: {current_story[:50]}...")
                    else:
                        print(f"🔍 DEBUG: Invalid story number {story_num + 1}")
                else:
                    # Try to match the story directly if no number found
                    for story in user_stories:
                        if story.lower() in line.lower():
                            current_story = story
                            current_tasks = []
                            print(f"🔍 DEBUG: Matched story by content: {story[:50]}...")
                            break
            
            # Look for task items with flexible patterns
            elif line and (any(line.startswith(str(i) + '.') for i in range(1, 51)) or 
                          line.startswith('-') or 
                          line.startswith('•')):
                
                # Extract task text
                task = line
                if line.startswith(('-', '•')):
                    task = line[1:].strip()
                else:
                    task = re.sub(r'^\d+\.\s*', '', line).strip()
                
                if task and len(task) > 5:  # More lenient minimum length
                    current_tasks.append(task)
                    if len(current_tasks) <= 3:  # Only log first few tasks
                        print(f"🔍 DEBUG: Added task: {task[:60]}...")
        
        # Don't forget the last story
        if current_story is not None and current_tasks:
            print(f"🔍 DEBUG: Saved final {len(current_tasks)} tasks for story: {current_story[:50]}...")
            result[current_story] = current_tasks
        
        print(f"🔍 DEBUG: Final result: {len(result)} stories with tasks")
        for story, tasks in result.items():
            print(f"🔍 DEBUG: {story[:40]}... -> {len(tasks)} tasks")
        
        # If no results, try alternative parsing
        if not result:
            print("🔍 DEBUG: No results from primary parsing, trying fallback...")
            result = self._fallback_parsing(content, user_stories)
        
        return result
    
    def _fallback_parsing(self, content: str, user_stories: List[str]) -> Dict[str, List[str]]:
        """Fallback parsing method"""
        print("🔍 DEBUG: Using fallback parsing...")
        
        result = {}
        lines = content.split('\n')
        
        # Simple approach: assume tasks are listed after story mentions
        current_story_index = 0
        current_tasks = []
        
        for line in lines:
            line = line.strip()
            if not line:
                continue
            
            # Look for numbered tasks
            if re.match(r'^\d+\.', line):
                task = re.sub(r'^\d+\.\s*', '', line).strip()
                if task and len(task) > 5:
                    current_tasks.append(task)
            
            # If we have tasks and see a new story indicator, save current and move to next
            elif current_tasks and (line.lower().startswith('story') or 
                                   line.lower().startswith('tasks') or
                                   re.match(r'^\d+\.\s*as\s+a', line.lower())):
                
                if current_story_index < len(user_stories):
                    result[user_stories[current_story_index]] = current_tasks
                    print(f"🔍 DEBUG: Fallback saved {len(current_tasks)} tasks for story {current_story_index + 1}")
                    current_story_index += 1
                    current_tasks = []
        
        # Save final tasks
        if current_tasks and current_story_index < len(user_stories):
            result[user_stories[current_story_index]] = current_tasks
            print(f"🔍 DEBUG: Fallback saved final {len(current_tasks)} tasks for story {current_story_index + 1}")
        
        return result

class FixedBatchAllProcessor:
    """
    Fixed Batch All Processor with robust error handling
    """
    
    def __init__(self):
        self.batch_decomposer = FixedBatchAllTaskDecomposer()
        self.consolidator = TaskConsolidatorAgent()
        self.dependency_analyzer = DependencyAnalyzerAgent()
        self.skill_mapper = SkillMapperAgent()
    
    def check_context_limits(self, user_stories: List[str]) -> Dict[str, any]:
        """Analyze if the batch size is appropriate for the LLM context window"""
        total_chars = sum(len(story) for story in user_stories)
        estimated_tokens = total_chars // 4  # Rough estimate
        
        # Llama3-70b has ~8k context window
        max_safe_tokens = 6000  # Leave room for response
        
        analysis = {
            "total_stories": len(user_stories),
            "total_characters": total_chars,
            "estimated_tokens": estimated_tokens,
            "max_safe_tokens": max_safe_tokens,
            "within_limits": estimated_tokens < max_safe_tokens,
            "risk_level": "LOW" if estimated_tokens < max_safe_tokens * 0.5 else 
                        "MEDIUM" if estimated_tokens < max_safe_tokens * 0.8 else "HIGH"
        }
        
        return analysis
    
    async def process_stories_with_details(self, user_stories: List[str]):
        """Process stories with enhanced error handling and debugging"""
        print(f"⚡ REVOLUTIONARIES: Processing ALL {len(user_stories)} stories in one mega-batch...")
        start_time = time.time()
        errors = []
        api_calls = 0
        
        # Context analysis
        context_analysis = self.check_context_limits(user_stories)
        print(f"\n🔍 CONTEXT ANALYSIS:")
        print(f"   Stories: {context_analysis['total_stories']}")
        print(f"   Estimated tokens: {context_analysis['estimated_tokens']}")
        print(f"   Risk level: {context_analysis['risk_level']}")
        
        if not context_analysis['within_limits']:
            print("⚠️  WARNING: Large batch size may hit context limits!")
            print("   Proceeding with mega-batch approach anyway...")
        
        try:
            # Step 1: Decompose ALL stories in one massive call
            print("\n🚀 Launching mega-batch decomposition...")
            user_stories_tasks = await self.batch_decomposer.decompose_all_stories(user_stories)
            api_calls += 1
            
            if not user_stories_tasks:
                raise Exception("Batch decomposition returned no results after parsing")
            
            print(f"✅ Successfully decomposed {len(user_stories_tasks)} stories!")
            
            # Debug: Show what we got
            for story, tasks in user_stories_tasks.items():
                print(f"   📋 {story[:50]}... -> {len(tasks)} tasks")
            
            # Step 2: Consolidate tasks across all stories
            print("\n🔄 Consolidating tasks across entire batch...")
            unique_tasks, task_origins = self.consolidator.consolidate_tasks(user_stories_tasks)
            total_tasks_before_consolidation = sum(len(tasks) for tasks in user_stories_tasks.values())
            print(f"📊 Consolidated {total_tasks_before_consolidation} tasks into {len(unique_tasks)} unique tasks")
            
            # Step 3: Analyze dependencies in one call
            print("🔗 Analyzing dependencies across all tasks...")
            dependencies = await self.dependency_analyzer.analyze(unique_tasks)
            api_calls += 1
            
            # Step 4: Map skills for each unique task (with progress tracking)
            print("🎯 Mapping skills for all unique tasks...")
            skill_map = {}
            successful_mappings = 0
            
            for i, task in enumerate(unique_tasks, 1):
                try:
                    skills = await self.skill_mapper.map_skills(task)
                    skill_map[task] = skills
                    successful_mappings += 1
                    api_calls += 1
                    
                    # Progress indicator
                    if i % 5 == 0 or i == len(unique_tasks):
                        print(f"   Progress: {i}/{len(unique_tasks)} tasks processed")
                        
                except Exception as e:
                    error_msg = f"Failed to map skills for task '{task[:30]}...': {str(e)}"
                    errors.append(error_msg)
            
            print(f"✅ Successfully mapped skills for {successful_mappings}/{len(unique_tasks)} tasks")
            
            # Calculate metrics
            duplicate_reduction = (total_tasks_before_consolidation - len(unique_tasks)) / total_tasks_before_consolidation if total_tasks_before_consolidation > 0 else 0
            dependencies_found = sum(len(deps) for deps in dependencies.values())
            total_skills = sum(len(skills) for skills in skill_map.values())
            
            print(f"🏆 MEGA SUCCESS: Processed {len(user_stories_tasks)} stories with only {api_calls} API calls!")
            print(f"⚡ Efficiency: {len(unique_tasks) / api_calls:.2f} tasks per API call")
            
            # IMMEDIATELY DISPLAY DETAILED RESULTS
            self.display_detailed_results(
                unique_tasks, task_origins, dependencies, skill_map, 
                time.time() - start_time, api_calls, total_tasks_before_consolidation, 
                errors, context_analysis
            )
            
            return {
                "tasks": unique_tasks,
                "task_origins": task_origins,
                "dependencies": dependencies,
                "required_skills": skill_map,
                "api_calls": api_calls,
                "execution_time": time.time() - start_time,
                "errors": errors,
                "context_analysis": context_analysis
            }
            
        except Exception as e:
            print(f"💥 CRITICAL FAILURE: {str(e)}")
            print(f"🔍 DEBUG: Full error details: {repr(e)}")
            return {"error": str(e)}
    
    def display_detailed_results(self, unique_tasks, task_origins, dependencies, skill_map, execution_time, api_calls, total_tasks, errors, context_analysis):
        """Display comprehensive detailed results immediately"""
        
        print("\n" + "=" * 80)
        print("⚡ PARTY C: REVOLUTIONARIES - COMPLETE DETAILED RESULTS")
        print("=" * 80)
        
        # SUMMARY METRICS
        print(f"\n📊 REVOLUTIONARY PERFORMANCE SUMMARY:")
        print("-" * 50)
        print(f"⏱️  Execution Time: {execution_time:.2f} seconds")
        print(f"🔌 API Calls: {api_calls} (ULTRA-EFFICIENT!)")
        print(f"📋 Total Tasks Generated: {total_tasks}")
        print(f"🎯 Unique Tasks After Consolidation: {len(unique_tasks)}")
        print(f"🔄 Duplicate Reduction: {((total_tasks - len(unique_tasks)) / total_tasks * 100) if total_tasks > 0 else 0:.1f}%")
        print(f"🔗 Dependencies Found: {sum(len(deps) for deps in dependencies.values())}")
        print(f"🛠️  Skills Identified: {sum(len(skills) for skills in skill_map.values())}")
        print(f"⚡ EFFICIENCY SCORE: {len(unique_tasks) / api_calls:.2f} tasks per API call 🚀")
        
        # Context analysis results
        print(f"\n🔍 CONTEXT WINDOW ANALYSIS:")
        print(f"   Stories processed: {context_analysis['total_stories']}")
        print(f"   Estimated tokens used: {context_analysis['estimated_tokens']}")
        print(f"   Risk level: {context_analysis['risk_level']}")
        print(f"   Within safe limits: {'✅ YES' if context_analysis['within_limits'] else '⚠️  NO'}")
        
        if errors:
            print(f"\n⚠️  Errors Encountered: {len(errors)}")
            for error in errors[:3]:
                print(f"   • {error}")
            if len(errors) > 3:
                print(f"   ... and {len(errors) - 3} more errors")
        
        # DETAILED TASKS
        print(f"\n📋 ALL {len(unique_tasks)} MEGA-BATCH PROCESSED TASKS:")
        print("-" * 50)
        for i, task in enumerate(unique_tasks, 1):
            print(f"{i:2d}. {task}")
            
            # Show which story(ies) generated this task
            origins = task_origins.get(task, [])
            if origins:
                if len(origins) == 1:
                    print(f"    └─ From: {origins[0][:60]}...")
                else:
                    print(f"    └─ From {len(origins)} stories (MEGA-BATCH CONSOLIDATION):")
                    for origin in origins:
                        print(f"       • {origin[:55]}...")
            print()
        
        # DEPENDENCIES
        print(f"\n🔗 TASK DEPENDENCIES:")
        print("-" * 50)
        
        tasks_with_deps = {task: deps for task, deps in dependencies.items() if deps}
        
        if tasks_with_deps:
            print(f"Found {len(tasks_with_deps)} tasks with dependencies:\n")
            
            for i, (task, deps) in enumerate(tasks_with_deps.items(), 1):
                print(f"{i}. 📋 TASK: {task}")
                print(f"   ⬇️  DEPENDS ON:")
                for j, dep in enumerate(deps, 1):
                    print(f"      {j}. {dep}")
                print()
        else:
            print("ℹ️  No dependencies detected between tasks")
        
        # SKILLS MAPPING
        print(f"\n🎯 SKILLS REQUIRED FOR EACH TASK:")
        print("-" * 50)
        
        skills_shown = 0
        for i, (task, skills) in enumerate(skill_map.items(), 1):
            if skills and skills_shown < 10:  # Limit display to first 10 for brevity
                print(f"{i:2d}. 📋 {task}")
                print(f"     🛠️  Skills: {', '.join(skills)}")
                print()
                skills_shown += 1
        
        if len(skill_map) > skills_shown:
            print(f"   ... and skills for {len(skill_map) - skills_shown} more tasks")
        
        # REVOLUTIONARY VERDICT
        efficiency_score = len(unique_tasks) / api_calls
        print(f"\n🎯 REVOLUTIONARY VERDICT:")
        if efficiency_score > 2.0:
            print("   🏆 REVOLUTIONARY SUCCESS! Ultra-high efficiency achieved!")
        elif efficiency_score > 1.0:
            print("   📊 SOLID REVOLUTIONARY PERFORMANCE! Good efficiency, acceptable risk")
        else:
            print("   ⚠️  EFFICIENCY CONCERNS - Consider optimizing or using grouped approach")
        
        print(f"\n💥 RISK vs REWARD SUMMARY:")
        print(f"   ✅ REWARDS: Minimal API calls ({api_calls}), Maximum speed, Lowest cost")
        print(f"   ⚠️  RISKS: Single point of failure, Context limits, Harder debugging")



# ============================================================================
# FIXED JUPYTER-COMPATIBLE FUNCTION
# ============================================================================

async def run_fixed_batch_all_with_full_details():
    """Run the FIXED batch all processor with enhanced debugging"""
    
    print("⚡ PARTY C: FIXED BATCH ALL PROCESSING WITH FULL DETAILS")
    print("=" * 60)
    print("Philosophy: 'All or nothing - maximum efficiency'")
    print("Enhanced with robust parsing and debugging")
    print("")
    
    # Use sample stories
    user_stories = SAMPLE_USER_STORIES
    print(f"Processing {len(user_stories)} sample user stories in ONE MEGA-BATCH...")
    
    # Create processor and run with detailed display
    processor = FixedBatchAllProcessor()
    result = await processor.process_stories_with_details(user_stories)
    
    return result

result = await run_fixed_batch_all_with_full_details()

⚡ PARTY C: FIXED BATCH ALL PROCESSING WITH FULL DETAILS
Philosophy: 'All or nothing - maximum efficiency'
Enhanced with robust parsing and debugging

Processing 12 sample user stories in ONE MEGA-BATCH...
⚡ REVOLUTIONARIES: Processing ALL 12 stories in one mega-batch...

🔍 CONTEXT ANALYSIS:
   Stories: 12
   Estimated tokens: 315
   Risk level: LOW

🚀 Launching mega-batch decomposition...
🔍 DEBUG: Sending 12 stories to LLM...
🔍 DEBUG: Received response length: 3259 characters
🔍 DEBUG: First 200 chars: Here are the tasks for each user story:

**Tasks for Story 1:**
1. Make address text clickable
2. Implement click handler to format address for Google Maps URL
3. Open Google Maps in new tab/window
4....
🔍 DEBUG: Starting to parse response for 12 stories
🔍 DEBUG: Processing 85 lines...
🔍 DEBUG: Added task: Make address text clickable...
🔍 DEBUG: Added task: Implement click handler to format address for Google Maps UR...
🔍 DEBUG: Added task: Open Google Maps in new tab/window...
🔍 DEBUG: F

**Batch All Processing Performance Analysis**
- Processing Time
> **Performance:** ⭐⭐⭐⭐⭐ (Fastest)
- Characteristics :
  
> Single mega-batch processing

> Maximum parallelization within LLM

> Minimal API round-trips

> Processing time: `O(1)` for decomposition step
- Expected Range
> **5-10 seconds for 10 stories**
- Bottlenecks
> **Single decomposition call + analysis**
---
- 🔤 Token Usage
> **Efficiency:** ⭐⭐⭐⭐⭐ (Most Efficient)
- Formula
> **Context + Examples + All_Stories + Dependencies + Skills**
- Estimated Tokens
> **~200-300 per story + analysis overhead**
---
- API Costs
> **Cost Rating:** ⭐⭐⭐⭐⭐ (Cheapest)
- Cost Structure
> **Minimum API calls: 1 (decomposition) + 1 (dependencies) + Unique_Tasks (skills)**
- Typical Example
> **10 stories = 1 + 1 + 25 = 27 API calls**
- Cost Factors
  
> ~40-50% cost reduction vs. Individual

> Maximum cost optimization

> Revolutionary efficiency approach
---
- Dependency Detection Accuracy
> **Accuracy:** ⭐⭐⭐ (Variable)
- Challenges
  
> Context dilution with large batches

> Potential task detail reduction

> LLM attention distribution across many stories

> Risk of missing subtle dependencies
- Mitigation
> **Works best with <15 stories**
---
- Error Recovery Capability
> **Resilience:** ⭐⭐ (Risky)
- Vulnerabilities
  
> **Single Point of Failure:** One failed call affects entire batch

> **All-or-Nothing:** No partial processing capability

> **Difficult Debugging:** Hard to isolate specific story issues

> **Context Limits:** May hit token limits with large batches
- Failure Scenarios
> **Batch failure rate ~10-20% for large sets**
---
- 📊 Summary
- Best Performance Aspects
  
> **✅ MAXIMUM efficiency - fewest API calls possible**

> **✅ FASTEST processing - single decomposition step**

> **✅ LOWEST cost - minimal API usage**

> **✅ ELEGANT simplicity - clean approach**
- Trade-offs
  
> **⚠️ High-risk, high-reward approach**

> **⚠️ Single point of failure**

> **⚠️ Context window limitations**

> **⚠️ Harder to debug issues**

## 3. Treatment Architecture & Implementation

### 3.1 Architecture Decision: Multi-Agent vs Single Agent

#### 3.1.1 Single Agent Approach
> **One Agent handles all steps: decomposition, dependency analysis, and skill mapping**

- **Advantages:**
  - Simpler implementation
  - No coordination overhead
  - Single point of control

- **Disadvantages:**
  - Limited specialization
  - Potential context overload
  - Reduced modularity


#### 3.1.2 Multi-Agent System 
> **A collaborative system of specialized LLM-based agents, each optimized for a distinct responsibility**

- **Advantages:**
  - **Modularity:** Each agent specializes in one task
  - **Parallelism:** Agents can work simultaneously 
  - **Token Optimization:** Focused context per agent
  - **Scalability:** Easy to add/modify individual agents
    
---
#### 3.1.3 **Decision Rationale:**
> **We chose the multi-agent system for modularity, parallelism and token optimization**

---

### 3.2 Multi-Agent Workflow Architecture

### 3.2.1 Agent Composition

- **Task Decomposer Agent**
  
> **Input:** User stories
  
> **utput:** Raw tasks
  
> **Responsibility:** Break down user stories into actionable tasks

- **Task Consolidator Agent**

> **Input:** Raw tasks

> **Output:** Refined tasks
 
> **Responsibility:** Remove duplicates and merge similar tasks

- **Skill Mapper Agent**

> **Input:** Refined tasks
 
> **Output:** Required skills
 
> **Responsibility:** Identify technical skills needed for each task

- **Dependency Analyzer Agent**

> **Input:** Refined tasks
  
> **Output:** Dependencies

> **Responsibility:** Determine task dependencies and execution order

---

### 3.2.2 Workflow Process

- **Data Flow**
```
user stories → Task Decomposer Agent → raw tasks
                                          ↓
                                   Task Consolidator Agent → refined tasks
                                          ↓                        ↓
                                   Skill Mapper Agent        Dependency Analyzer Agent
                                          ↓                        ↓
                                   required skills            dependencies
                                          ↓                        ↓
                                              OUTPUT
```

1. **Input Processing:** User stories fed to Task Decomposer Agent
2. **Task Generation:** Raw tasks created from user stories
3. **Task Refinement:** Consolidator Agent removes duplicates
4. **Parallel Analysis:** 
   - Skill Mapper identifies required skills
   - Dependency Analyzer determines task relationships
5. **Output Synthesis:** All results combined into final output

---

### 3.3 Model Selection & Deployment Strategy

#### 3.3.1 Local Models
> **Run on-premises but resource heavy**

- **Advantages:**
  - Full data control and privacy
  - No API rate limits
  - One-time setup cost
  - Offline capability

- **Disadvantages:**
  - High computational requirements
  - Significant hardware investment
  - Model maintenance overhead
  - Limited to available local resources

#### 3.3.2 API-Based Models  
> **Served by third-party providers but the free plan is constrained**

- **Advantages:**
  - No infrastructure setup required
  - Access to state-of-the-art models
  - Automatic updates and maintenance
  - Scalable on-demand

- **Disadvantages:**
  - API rate limits on free tiers
  - Data privacy considerations
  - Ongoing API costs
  - Internet dependency

---


> **We chose Llama3-70B on Groq for both performance and prompt length advantages**

> Why Llama3-70B?

- **High Performance:** 70B parameters provide excellent reasoning
- **Long Context Window:** Supports complex multi-story processing
- **Open Source:** Flexible deployment options
- **Proven Track Record:** Well-tested for text generation tasks









### Alternative Options
- **Local deployment** for data-sensitive applications
- **Other API providers** for comparison and redundancy
- **Model fine-tuning** for domain-specific improvements

### configuration

In [None]:
from groq import Groq
client = Groq(api_key=os.getenv('GROQ_API_KEY'))

response = client.chat.completions.create(
    model="llama3-70b-8192",
    messages=[{"role": "user", "content": prompt}],
    temperature=0.3,
    max_tokens=4000
)

### Model Parameters
- **Model:** llama3-70b-8192
- **Temperature:** 0.3 (balanced creativity/consistency)
- **Max Tokens:** 4000 (supports large responses)
- **Context Window:** 8192 tokens (sufficient for multi-story processing)
- **Rate limit:** 30 reauest per minute 

##  Technical Configuration

### 3.3 Prompt Engineering Strategies

#### 3.3.1 Strategy 1: Zero-Shot Prompting

#### 3.3.2 Strategy 2: Few-Shot Prompting


#### 3.3.3 Strategy 3: Chain of Thought


#### 3.3.4 Strategy 4: fewshots-cot Prompting

#### 3.3.5 Strategy 5: Meta Prompting


#### 3.3.6 Strategy 6: Thought-of-tree Prompting

#### 3.3.7 Strategy 7: Self-Consistency

#### 3.3.8 Strategy 8: Reflexion Prompting

### 3.4 Prompt Strategy Comparison

### 3.5 Fine-tuning with LoRA

#### 3.5.1 Data Structure Challenges

## 4. Output Form & Results

### 4.1 Output Structure Design

#### 4.1.1 Task Output Format

#### 4.1.2 Dependency Visualization

#### 4.1.3 Skills Extraction Results

## 5. Evaluation & Performance Analysis

### 5.1 Evaluation Metrics

#### 5.1.1 Task Breakdown Accuracy
