# DevOps Analysis Workflow - Testing & Debugging Notebook

This notebook provides comprehensive testing and debugging capabilities for the DevOps Analysis workflow system.

## Overview
- **Tools Testing**: Test vector database query tools
- **Nodes Testing**: Test individual workflow nodes (agents)
- **State Management**: Test workflow state operations
- **Graph Execution**: Test complete workflow execution
- **Error Handling**: Debug common issues and errors

## Components Tested
- `WorkflowState` class and state management
- Analysis tools: `list_directories`, `get_metadata_by_id`, `get_content_by_id`, `search_vector_database`
- Workflow nodes: `vulnerability_agent`, `quality_agent`, `redundancy_agent`, `recommendation_agent`
- Processing nodes: `analysis_aggregator`, `report_generator`, `teams_notifier`
- Complete StateGraph workflow execution

## Setup and Imports

In [1]:
# Standard library imports
import sys
import os
import json
import traceback
from datetime import datetime
from typing import Dict, List, Any
from langgraph.prebuilt import create_react_agent

# Add project paths
project_root = os.path.abspath('..')
src_path = os.path.join(project_root, 'src')
workflow_path = os.path.join(project_root, 'workflow')

sys.path.insert(0, project_root)
sys.path.insert(0, src_path)
sys.path.insert(0, workflow_path)

print(f"Project root: {project_root}")
print(f"Python path updated with: {[project_root, src_path, workflow_path]}")

Project root: c:\Users\mohamed mowina\Desktop\Areeb\DevOps-Analysis
Python path updated with: ['c:\\Users\\mohamed mowina\\Desktop\\Areeb\\DevOps-Analysis', 'c:\\Users\\mohamed mowina\\Desktop\\Areeb\\DevOps-Analysis\\src', 'c:\\Users\\mohamed mowina\\Desktop\\Areeb\\DevOps-Analysis\\workflow']


In [None]:
# Workflow imports
try:
    from workflow.state import WorkflowState
    from workflow.tools import (
        list_directories, 
        get_metadata_by_id, 
        get_content_by_id, 
        search_vector_database,
        create_tool_registry,
        get_available_tools
    )
    from workflow.nodes import (
        vulnerability_agent,
        quality_agent,
        redundancy_agent,
        recommendation_agent,
        analysis_aggregator,
        report_generator,
        teams_notifier
    )
    from workflow.graph import (
        create_analysis_workflow,
        create_parallel_analysis_workflow,
        compile_workflow,
        run_analysis_workflow
    )
    print("✅ Successfully imported all workflow components")
except ImportError as e:
    print(f"❌ Import error: {e}")
    print("Make sure all workflow modules are properly configured")
    traceback.print_exc()

❌ Import error: cannot import name 'Finding' from 'workflow.state' (c:\Users\mohamed mowina\Desktop\Areeb\DevOps-Analysis\workflow\state.py)
Make sure all workflow modules are properly configured


Traceback (most recent call last):
  File "C:\Users\mohamed mowina\AppData\Local\Temp\ipykernel_37388\3904928290.py", line 3, in <module>
    from workflow.state import WorkflowState, Finding, Recommendation
ImportError: cannot import name 'Finding' from 'workflow.state' (c:\Users\mohamed mowina\Desktop\Areeb\DevOps-Analysis\workflow\state.py)


In [8]:
# Vector store imports (optional - may not be available in test environment)
try:
    from utils.vector_utils import StructuredVectorStore
    print("✅ Vector store utilities imported")
    vector_store_available = True
except ImportError as e:
    print(f"⚠️  Vector store not available: {e}")
    print("Some tests will use mock data instead")
    vector_store_available = False

✅ Vector store utilities imported


## Helper Functions for Testing

In [20]:
def create_sample_findings() -> List[Finding]:
    """Create sample findings for testing."""
    return [
        Finding(
            id="vuln_001",
            type="vulnerability",
            severity="critical",
            file_path="auth.py",
            line_number=15,
            description="SQL injection vulnerability in user authentication",
            recommendation="Use parameterized queries to prevent SQL injection",
            confidence=0.95
        ),
        Finding(
            id="qual_001",
            type="quality_issue",
            severity="high",
            file_path="data_processor.py",
            line_number=25,
            description="Method too long with high cyclomatic complexity",
            recommendation="Break down method into smaller, focused functions",
            confidence=0.85
        ),
        Finding(
            id="dup_001",
            type="duplicate",
            severity="medium",
            file_path="utils.py",
            line_number=10,
            description="Duplicate validation logic found in multiple functions",
            recommendation="Extract common validation logic into shared utility",
            confidence=0.78
        )
    ]

def create_sample_recommendations() -> List[Recommendation]:
    """Create sample recommendations for testing."""
    return [
        Recommendation(
            id="rec_001",
            finding_id="vuln_001",
            title="Implement Parameterized Queries",
            description="Replace string formatting with parameterized queries",
            code_example="cursor.execute('SELECT * FROM users WHERE username = ? AND password = ?', (username, password))",
            priority="critical",
            effort_estimate="low"
        ),
        Recommendation(
            id="rec_002",
            finding_id="qual_001",
            title="Refactor Long Method",
            description="Break down the long method into smaller, focused functions",
            priority="high",
            effort_estimate="medium"
        )
    ]

def create_test_state(collection_name: str = "test_repo") -> WorkflowState:
    """Create a test workflow state with sample data."""
    state = WorkflowState(collection_name=collection_name)
    
    # Add sample findings
    for finding in create_sample_findings():
        state.add_finding(finding)
    
    # Add sample recommendations
    for rec in create_sample_recommendations():
        state.add_recommendation(rec)
    
    return state

print("Helper functions defined successfully")

Helper functions defined successfully


## 1. Test Workflow State Management

In [19]:
print("=== Testing Workflow State Management ===")

# Test basic state creation
try:
    state = WorkflowState(collection_name="test_collection")
    print(f"✅ Created basic state: {state.collection_name}")
    print(f"   Start time: {state.start_time}")
    print(f"   Current step: {state.current_step}")
    print(f"   Completed nodes: {state.completed_nodes}")
except Exception as e:
    print(f"❌ Failed to create basic state: {e}")
    traceback.print_exc()

print("\n" + "-" * 50)

=== Testing Workflow State Management ===
✅ Created basic state: test_collection
   Start time: 2025-09-30 15:08:35.272455
   Current step: start
   Completed nodes: []

--------------------------------------------------


In [21]:
# Test state with sample data
try:
    test_state = create_test_state("comprehensive_test")
    print(f"✅ Created test state with sample data")
    
    # Test findings management
    all_findings = test_state.get_all_findings()
    print(f"   Total findings: {len(all_findings)}")
    
    # Test summary statistics
    stats = test_state.get_summary_stats()
    print(f"   Summary stats: {stats}")
    
    # Test node completion tracking
    test_state.mark_node_complete("vulnerability_agent")
    test_state.mark_node_complete("quality_agent")
    print(f"   Completed nodes: {test_state.completed_nodes}")
    print(f"   Is vulnerability_agent complete: {test_state.is_node_complete('vulnerability_agent')}")
    
    # Test findings by severity
    critical_findings = test_state.get_findings_by_severity("critical")
    print(f"   Critical findings: {len(critical_findings)}")
    
except Exception as e:
    print(f"❌ Failed state management tests: {e}")
    traceback.print_exc()

print("\n" + "-" * 50)

✅ Created test state with sample data
   Total findings: 3
   Summary stats: {'total_findings': 3, 'critical': 1, 'high': 1, 'medium': 1, 'low': 0, 'vulnerabilities': 1, 'quality_issues': 1, 'redundancy_issues': 1, 'recommendations': 2}
   Completed nodes: ['vulnerability_agent', 'quality_agent']
   Is vulnerability_agent complete: True
   Critical findings: 1

--------------------------------------------------


## 2. Test Analysis Tools

### 2.1 - List Directory Tool

In [22]:
print("=== Testing Analysis Tools ===")

# Test tool registry
try:
    available_tools = get_available_tools()
    print(f"✅ Available tools: {available_tools}")
    
    tool_registry = create_tool_registry()
    print(f"✅ Tool registry created with {len(tool_registry)} tools")
    print(f"   Tools: {list(tool_registry.keys())}")
    
except Exception as e:
    print(f"❌ Failed to create tool registry: {e}")
    traceback.print_exc()

print("\n" + "-" * 50)

=== Testing Analysis Tools ===
✅ Available tools: ['list_directories', 'get_metadata_by_id', 'get_content_by_id', 'search_vector_database']
✅ Tool registry created with 4 tools
   Tools: ['list_directories', 'get_metadata_by_id', 'get_content_by_id', 'search_vector_database']

--------------------------------------------------


In [23]:
# Test individual tools (with mock data if vector store not available)
test_collection = "Todo"

print(f"Testing tools with collection: {test_collection}")

# Test list_directories
try:
    directories = list_directories(test_collection)
    print(f"✅ list_directories returned: {len(directories) if isinstance(directories, list) else 'error'}")
    if isinstance(directories, list) and directories:
        print(f"   Sample directories: {directories[:3]}")
    else:
        print(f"   Result: {directories}")
except Exception as e:
    print(f"❌ list_directories failed: {e}")

print()

Testing tools with collection: Todo




✅ list_directories returned: 10
   Sample directories: ['README.md', 'Todo/AppDelegate.swift', 'Todo/Common.swift']



In [24]:
list_directories.invoke("Internship-ai")



['README.md#chunk_0',
 'README.md#chunk_1',
 'app.py#chunk_0',
 'app.py#chunk_1',
 'main.py',
 'notebooks/notebooks.md',
 'requirements.txt',
 'studio/graph.py',
 'studio/langgraph.json',
 'utils/__init__.py',
 'utils/config.py',
 'workflow/__init__.py',
 'workflow/graph.py',
 'workflow/nodes.py#chunk_0',
 'workflow/nodes.py#chunk_1',
 'workflow/nodes.py#chunk_2',
 'workflow/state.py',
 'workflow/tools.py']

In [48]:
list_directories.invoke("Todo")



['README.md',
 'Todo/AppDelegate.swift',
 'Todo/Common.swift',
 'Todo/DetailViewController.swift',
 'Todo/MyPlayground.playground/section-1.swift',
 'Todo/TodoModel.swift',
 'Todo/ViewController.swift#chunk_0',
 'Todo/ViewController.swift#chunk_1',
 'Todo/ViewController.swift#chunk_2',
 'TodoTests/TodoTests.swift']

### 2.2 Search VectorStore Tool

In [25]:
def search_vector_database(collection_name: str, query: str, limit: int = 10) -> List[Dict[str, Any]]:
    """
    Search the vector database and return top matches with their IDs and metadata.
    
    Args:
        collection_name: Name of the repository collection
        query: Search query
        limit: Maximum number of results to return
        
    Returns:
        List of matching chunks with IDs and metadata (no content)
    """
    try:
        vector_store = StructuredVectorStore()
        results = vector_store.search_structured_repo(collection_name, query, limit)
        
        # Return results with metadata but without content for efficiency
        search_results = []
        for result in results:
            search_result = {
                'id': result.get('id'),
                'repo_id': result.get('repo_id'),
                'chunk_id': result.get('chunk_id'),
                'symbols': result.get('symbols', {}),
                'imports': result.get('imports', []),
                'metadata': result.get('metadata', {}),
                'similarity_score': result.get('similarity_score', 0),
                'tool_used': 'search_vector_database',
                'search_query': query
            }
            search_results.append(search_result)
        
        return search_results
        
    except Exception as e:
        return [{'error': f'Search failed: {str(e)}', 'tool_used': 'search_vector_database'}]

In [18]:
search_vector_database("Internship-ai", "Workflow", 3)



[{'id': 'workflow/__init__.py',
  'repo_id': 'Internship-ai',
  'chunk_id': None,
  'symbols': {},
  'imports': ['from .graph import app, create_workflow, start_workflow, continue_workflow'],
  'metadata': {'language': 'python',
   'size': 197,
   'timestamp': '2025-09-24T12:51:47.210971',
   'organization': 'areebgroup',
   'project': 'Internship-Playground'},
  'similarity_score': 0.41509083644367994,
  'tool_used': 'search_vector_database',
  'search_query': 'Workflow'},
 {'id': 'workflow/graph.py',
  'repo_id': 'Internship-ai',
  'chunk_id': None,
  'symbols': {'functions': ['route_feedback',
    'create_workflow',
    'start_workflow',
    'continue_workflow']},
  'imports': ['from langgraph.graph import StateGraph, END',
   'from langgraph.checkpoint.memory import MemorySaver',
   'from workflow.state import WorkflowState',
   'from workflow.nodes import (\r\n    create_post_text,\r\n    generate_image,\r\n    classify_feedback,\r\n    refine_text,\r\n    refine_image,\r\n    sen

In [21]:
from workflow.tools import search_vector_database

# Test search_vector_database
try:
    search_results = search_vector_database.invoke(
        {"collection_name":"Internship-ai", "query":"workflow", "limit":3})
    print(f"✅ search_vector_database returned: {len(search_results) if isinstance(search_results, list) else 'error'}")
    if isinstance(search_results, list) and search_results:
        print(f"   Sample result keys: {list(search_results[0].keys()) if search_results[0] else 'empty'}")
    else:
        print(f"   Result: {search_results}")
except Exception as e:
    print(f"❌ search_vector_database failed: {e}")

print()



✅ search_vector_database returned: 3
   Sample result keys: ['id', 'repo_id', 'chunk_id', 'symbols', 'imports', 'metadata', 'similarity_score', 'tool_used', 'search_query']



In [51]:
search_results

[{'id': 'workflow/__init__.py',
  'repo_id': 'Internship-ai',
  'chunk_id': None,
  'symbols': {},
  'imports': ['from .graph import app, create_workflow, start_workflow, continue_workflow'],
  'metadata': {'language': 'python',
   'size': 197,
   'timestamp': '2025-09-24T12:51:47.210971',
   'organization': 'areebgroup',
   'project': 'Internship-Playground'},
  'similarity_score': 0.387905476675254,
  'tool_used': 'search_vector_database',
  'search_query': 'workflow'},
 {'id': 'workflow/graph.py',
  'repo_id': 'Internship-ai',
  'chunk_id': None,
  'symbols': {'functions': ['route_feedback',
    'create_workflow',
    'start_workflow',
    'continue_workflow']},
  'imports': ['from langgraph.graph import StateGraph, END',
   'from langgraph.checkpoint.memory import MemorySaver',
   'from workflow.state import WorkflowState',
   'from workflow.nodes import (\r\n    create_post_text,\r\n    generate_image,\r\n    classify_feedback,\r\n    refine_text,\r\n    refine_image,\r\n    send_

In [26]:
# Test get_metadata_by_id (with a sample chunk ID)
sample_chunk_id = "src/auth.py"
try:
    metadata = get_metadata_by_id.invoke({
        "collection_name": "Internship-ai", 
        "chunk_id": "workflow/graph.py"
    })
    print(f"✅ get_metadata_by_id returned: {type(metadata)}")
    if isinstance(metadata, dict):
        print(f"   Metadata keys: {list(metadata.keys())}")
        if 'error' in metadata:
            print(f"   Error: {metadata['error']}")
    else:
        print(f"   Result: {metadata}")
except Exception as e:
    print(f"❌ get_metadata_by_id failed: {e}")

print()



✅ get_metadata_by_id returned: <class 'dict'>
   Metadata keys: ['id', 'repo_id', 'chunk_id', 'symbols', 'imports', 'metadata', 'similarity_score', 'tool_used']



In [23]:
metadata

{'id': 'workflow/graph.py',
 'repo_id': 'Internship-ai',
 'chunk_id': None,
 'symbols': {'functions': ['route_feedback',
   'create_workflow',
   'start_workflow',
   'continue_workflow']},
 'imports': ['from langgraph.graph import StateGraph, END',
  'from langgraph.checkpoint.memory import MemorySaver',
  'from workflow.state import WorkflowState',
  'from workflow.nodes import (\r\n    create_post_text,\r\n    generate_image,\r\n    classify_feedback,\r\n    refine_text,\r\n    refine_image,\r\n    send_email,\r\n)'],
 'metadata': {'language': 'python',
  'size': 2736,
  'timestamp': '2025-09-24T12:51:47.210971',
  'organization': 'areebgroup',
  'project': 'Internship-Playground'},
 'similarity_score': 0.51231455509316,
 'tool_used': 'get_metadata_by_id'}

In [24]:
# Test get_content_by_id
try:
    content = content = get_content_by_id.invoke({
        "collection_name": "Internship-ai", 
        "chunk_id": "workflow/graph.py"
    })
    print(f"✅ get_content_by_id returned: {type(content)}")
    if isinstance(content, dict):
        print(f"   Content keys: {list(content.keys())}")
        if 'content' in content and content['content']:
            content_preview = content['content'][:100] + "..." if len(content['content']) > 100 else content['content']
            print(f"   Content preview: {content_preview}")
        if 'error' in content:
            print(f"   Error: {content['error']}")
    else:
        print(f"   Result: {content}")
except Exception as e:
    print(f"❌ get_content_by_id failed: {e}")

print("\n" + "-" * 50)



✅ get_content_by_id returned: <class 'dict'>
   Content keys: ['id', 'content', 'tool_used']
   Content preview: from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

...

--------------------------------------------------


In [50]:
metadata

{'id': 'Todo/AppDelegate.swift',
 'repo_id': 'Todo',
 'chunk_id': None,
 'symbols': {'functions': ['application',
   'applicationWillResignActive',
   'applicationDidEnterBackground',
   'applicationWillEnterForeground',
   'applicationDidBecomeActive',
   'applicationWillTerminate'],
  'classes': ['AppDelegate']},
 'imports': ['import UIKit'],
 'metadata': {'language': 'swift',
  'size': 2137,
  'timestamp': '2025-09-25T12:53:46.666360',
  'source_url': 'https://github.com/JakeLin/Todo'},
 'similarity_score': 0.5836301197785221,
 'tool_used': 'get_metadata_by_id'}

In [35]:
# import display and markdown
from IPython.display import display, Markdown, Code, Image

display(Markdown("Here's the retrived content:"))
display(Markdown(f"```python\n{content['content']}\n```"))

Here's the retrived content:

```python
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

from workflow.state import WorkflowState
from workflow.nodes import (
    create_post_text,
    generate_image,
    classify_feedback,
    refine_text,
    refine_image,
    send_email,
)


def route_feedback(state: WorkflowState) -> str:
    """Route based on feedback classification."""
    classification = state.classification.lower()
    if "approved" in classification:
        return "approved"
    if "refine text" in classification or "refine_text" in classification:
        return "refine_text"
    if "refine image" in classification or "refine_image" in classification:
        return "refine_image"
    if "terminate" in classification:
        return "terminate"
    return "terminate"


def create_workflow():
    """Create the LangGraph workflow with interrupts configured."""
    workflow = StateGraph(WorkflowState)
    workflow.add_node("create_post", create_post_text)
    workflow.add_node("generate_image", generate_image)
    workflow.add_node("classify_feedback", classify_feedback)
    workflow.add_node("refine_text", refine_text)
    workflow.add_node("refine_image", refine_image)
    workflow.add_node("send_email", send_email)
    workflow.set_entry_point("create_post")
    workflow.add_edge("create_post", "generate_image")
    workflow.add_edge("generate_image", "classify_feedback")
    workflow.add_conditional_edges(
        "classify_feedback",
        route_feedback,
        {
            "approved": "send_email",
            "refine_text": "refine_text",
            "refine_image": "refine_image",
            "terminate": END,
        },
    )
    workflow.add_edge("refine_text", "classify_feedback")
    workflow.add_edge("refine_image", "classify_feedback")
    workflow.add_edge("send_email", END)
    memory = MemorySaver()
    app = workflow.compile(
        checkpointer=memory,
        interrupt_after=["generate_image", "refine_text", "refine_image"],
    )
    return app


# Convenience API mirroring notebook helpers
app = create_workflow()


def start_workflow(user_input: str, thread_id: str = "default") -> WorkflowState:
    initial_state = WorkflowState(user_input=user_input)
    config = {"configurable": {"thread_id": thread_id}}
    result = app.invoke(initial_state, config)
    return WorkflowState(**result)


def continue_workflow(user_feedback: str, thread_id: str = "default") -> WorkflowState:
    config = {"configurable": {"thread_id": thread_id}}
    app.update_state(config, {"user_feedback": user_feedback})
    result = app.invoke(None, config)
    return WorkflowState(**result)



```

## 3. Test Individual Workflow Nodes

In [31]:
print("=== Testing Individual Workflow Nodes ===")

# Create a test state for node testing
node_test_state = create_test_state("node_testing")
print(f"Created test state: {node_test_state.collection_name}")
print(f"Initial step: {node_test_state.current_step}")
print(f"Initial completed nodes: {node_test_state.completed_nodes}")

print("\n" + "-" * 30)

=== Testing Individual Workflow Nodes ===
Created test state: node_testing
Initial step: start
Initial completed nodes: []

------------------------------


In [32]:
# Test vulnerability_agent node
print("Testing vulnerability_agent node...")
try:
    # Note: This will likely fail without proper LLM setup, but we can test the structure
    vuln_state = WorkflowState(collection_name="Todo")
    
    # Mock the agent execution (comment out the actual call if LLM not available)
    print("⚠️  Skipping actual LLM call - testing structure only")
    
    # Test state structure changes
    vuln_state.mark_node_complete("vulnerability_agent")
    vuln_state.current_step = "vulnerability_complete"
    
    print(f"✅ Vulnerability agent structure test passed")
    print(f"   Completed nodes: {vuln_state.completed_nodes}")
    print(f"   Current step: {vuln_state.current_step}")
    
    # Uncomment to test actual agent (requires LLM setup)
    # result_state = vulnerability_agent(vuln_state)
    # print(f"   Result state step: {result_state.current_step}")
    
except Exception as e:
    print(f"❌ Vulnerability agent test failed: {e}")
    print("This is expected if LLM is not configured")

print()

Testing vulnerability_agent node...
⚠️  Skipping actual LLM call - testing structure only
✅ Vulnerability agent structure test passed
   Completed nodes: ['vulnerability_agent']
   Current step: vulnerability_complete



In [33]:
# Test processing nodes (these don't require LLM)
print("Testing processing nodes...")

# Test analysis_aggregator
try:
    agg_state = create_test_state("aggregator_test")
    result_state = analysis_aggregator(agg_state)
    
    print(f"✅ analysis_aggregator test passed")
    print(f"   Current step: {result_state.current_step}")
    print(f"   Completed nodes: {result_state.completed_nodes}")
    print(f"   Aggregated results available: {'aggregated_results' in result_state.analysis_metadata}")
    
except Exception as e:
    print(f"❌ analysis_aggregator test failed: {e}")
    traceback.print_exc()

print()

Testing processing nodes...
📋 [Analysis Aggregator] Combining analysis results...
✅ [Analysis Aggregator] Combined 3 findings
✅ analysis_aggregator test passed
   Current step: aggregation_complete
   Completed nodes: ['analysis_aggregator']
   Aggregated results available: True



In [34]:
# Test report_generator
try:
    report_state = create_test_state("report_test")
    result_state = report_generator(report_state)
    
    print(f"✅ report_generator test passed")
    print(f"   Current step: {result_state.current_step}")
    print(f"   Final report available: {'final_report' in result_state.analysis_metadata}")
    
    if 'final_report' in result_state.analysis_metadata:
        report = result_state.analysis_metadata['final_report']
        print(f"   Report keys: {list(report.keys())}")
        print(f"   Findings by category: {report.get('findings_by_category', {})}")
    
except Exception as e:
    print(f"❌ report_generator test failed: {e}")
    traceback.print_exc()

print()

📄 [Report Generator] Generating final report...
✅ [Report Generator] Report generated successfully
✅ report_generator test passed
   Current step: report_complete
   Final report available: True
   Report keys: ['collection_name', 'analysis_summary', 'total_files_analyzed', 'execution_time', 'completed_nodes', 'findings_by_category', 'recommendations_count']
   Findings by category: {'vulnerabilities': 1, 'quality_issues': 1, 'redundancy_issues': 1}



In [None]:
# Test teams_notifier
try:
    teams_state = create_test_state("teams_test")
    result_state = teams_notifier(teams_state)
    
    print(f"✅ teams_notifier test passed")
    print(f"   Current step: {result_state.current_step}")
    print(f"   Teams notification data available: {'teams_notification' in result_state.analysis_metadata}")
    
    if 'teams_notification' in result_state.analysis_metadata:
        notification = result_state.analysis_metadata['teams_notification']
        print(f"   Notification keys: {list(notification.keys())}")
        print(f"   Collection: {notification.get('collection_name')}")
        print(f"   Total findings: {notification.get('total_findings')}")
    
except Exception as e:
    print(f"❌ teams_notifier test failed: {e}")
    traceback.print_exc()

print("\n" + "-" * 50)

## Testing an Agent

In [None]:
mock_state = WorkflowState(collection_name="Todo")

In [44]:
mock_state

WorkflowState(collection_name='Todo', code_chunks=[], vulnerability_findings=[], quality_findings=[], redundancy_findings=[], recommendations=[], current_step='start', completed_nodes=[], analysis_metadata={}, start_time=datetime.datetime(2025, 9, 29, 12, 39, 18, 28377), end_time=None, total_files_analyzed=0)

In [None]:
from langgraph.prebuilt import create_react_agent


def vulnerability_agent(collection):
    """
    AI agent node for vulnerability detection.
    Uses create_react_agent with vulnerability analysis tools.
    """
    print("🔍 [Vulnerability Agent] Starting vulnerability analysis...")
    
    try:
        # Get available tools
        tools = list(create_tool_registry().values())
        
        # Create the vulnerability analysis agent
        agent = create_react_agent(
            model="openai:gpt-4o-mini",
            tools=tools,
            prompt="""You are a security expert specializing in vulnerability detection.
            
Your task is to analyze code for security vulnerabilities including:
- SQL injection vulnerabilities
- Cross-site scripting (XSS) issues
- Authentication and authorization flaws
- Input validation problems
- Cryptographic weaknesses
- Insecure dependencies

Use the available tools to search and analyze code chunks. Focus on identifying
specific security issues with file locations and provide actionable recommendations.""",
             debug=True
        )
        
        # Run vulnerability analysis
        response = agent.invoke({
            "messages": [
                {"role": "user", "content": f"Analyze the codebase '{collection}' for security vulnerabilities. Use the available tools to search for common vulnerability patterns."}
            ]
        })
        
        return response
        
    except Exception as e:
        print(f"❌ [Vulnerability Agent] Error: {e}")
    
    return state

vulnerability_agent("Todo")

In [46]:
last_message = """ Here\'s an analysis of the `Todo` codebase with respect to potential security vulnerabilities:\n\n### 1. **General Issues Identified**\n- **Lack of Input Validation**: There are instances where user inputs appear to be directly used without any validation (e.g., `todoItem.text!` and `searchString!`).\n- **Potential for XSS**: In the `DetailViewController`, where `todoItem.text` is used as a title without being sanitized or escaped.\n- **Data Handling**: There isn’t explicit handling or sanitization of data such as title and image that could lead to issues during data capture or display.\n\n### 2. **Vulnerable Files & Recommendations**\n\n#### a. **`Common.swift`**\n```swift\nfunc dateFromString (dateStr: String) -> NSDate? {\n    let dateFormatter = NSDateFormatter()\n    dateFormatter.dateFormat = "yyyy-MM-dd"\n    let date = dateFormatter.dateFromString(dateStr)\n    return date\n}\n```\n**Issues:**\n- **Input Validation**: There’s no validation on the format of `dateStr`. Invalid inputs can crash the app (force unwrapping can lead to runtime exceptions).\n  \n**Recommendation**: Add validation to ensure that `dateStr` is in the correct format before processing.\n\n#### b. **`DetailViewController.swift`**\n```swift\nif todo == nil {\n    todo = TodoModel(id: uuid, image: image, title: todoItem.text!, date: todoDate.date)\n    todos.append(todo!)\n}\n```\n**Issues:**\n- **Force Unwrapping**: Using `todoItem.text!` can lead to crashes if `textField` is empty.\n- **XSS Possibility**: The title for the todo item is set directly from user input without sanitization.\n\n**Recommendation**:\n- Validate that `todoItem.text` is not nil or empty before proceeding, and consider sanitizing the input.\n  \n#### c. **`ViewController.swift`**\n```swift\nfilteredTodos = todos.filter(){$0.title.rangeOfString(searchString!) != nil}\n```\n**Issues:**\n- **Force Unwrapping**: Using `searchString!` could lead to crashes.\n  \n**Recommendation**: Use optional binding (if let) to safely unwrap `searchString` to prevent crashes.\n\n#### d. **`TodoTests.swift`**\n- **Lack of Testing on Security Aspects**: The tests provided do not verify any security-specific scenarios or edge cases relevant to input validation.\n\n**Recommendation**: Implement unit tests specifically designed to validate user inputs and edge cases to ensure safety against common attack vectors.\n\n### 3. **Crucial Areas to Improve**\n- Implement consistent input validation throughout the application.\n- Use safe unwrapping techniques to prevent runtime exceptions from nil values.\n- Consider sanitizing user input for display (to guard against XSS attacks).\n- Expand test cases to cover security aspects and input validity scenarios.\n\n### Conclusion\nThe `Todo` codebase has several areas that can be improved to enhance security and robustness. Addressing the highlighted issues and implementing the recommendations will significantly mitigate security risks and improve the overall quality of the application. """

In [47]:
display(Markdown(last_message))

 Here's an analysis of the `Todo` codebase with respect to potential security vulnerabilities:

### 1. **General Issues Identified**
- **Lack of Input Validation**: There are instances where user inputs appear to be directly used without any validation (e.g., `todoItem.text!` and `searchString!`).
- **Potential for XSS**: In the `DetailViewController`, where `todoItem.text` is used as a title without being sanitized or escaped.
- **Data Handling**: There isn’t explicit handling or sanitization of data such as title and image that could lead to issues during data capture or display.

### 2. **Vulnerable Files & Recommendations**

#### a. **`Common.swift`**
```swift
func dateFromString (dateStr: String) -> NSDate? {
    let dateFormatter = NSDateFormatter()
    dateFormatter.dateFormat = "yyyy-MM-dd"
    let date = dateFormatter.dateFromString(dateStr)
    return date
}
```
**Issues:**
- **Input Validation**: There’s no validation on the format of `dateStr`. Invalid inputs can crash the app (force unwrapping can lead to runtime exceptions).
  
**Recommendation**: Add validation to ensure that `dateStr` is in the correct format before processing.

#### b. **`DetailViewController.swift`**
```swift
if todo == nil {
    todo = TodoModel(id: uuid, image: image, title: todoItem.text!, date: todoDate.date)
    todos.append(todo!)
}
```
**Issues:**
- **Force Unwrapping**: Using `todoItem.text!` can lead to crashes if `textField` is empty.
- **XSS Possibility**: The title for the todo item is set directly from user input without sanitization.

**Recommendation**:
- Validate that `todoItem.text` is not nil or empty before proceeding, and consider sanitizing the input.
  
#### c. **`ViewController.swift`**
```swift
filteredTodos = todos.filter(){$0.title.rangeOfString(searchString!) != nil}
```
**Issues:**
- **Force Unwrapping**: Using `searchString!` could lead to crashes.
  
**Recommendation**: Use optional binding (if let) to safely unwrap `searchString` to prevent crashes.

#### d. **`TodoTests.swift`**
- **Lack of Testing on Security Aspects**: The tests provided do not verify any security-specific scenarios or edge cases relevant to input validation.

**Recommendation**: Implement unit tests specifically designed to validate user inputs and edge cases to ensure safety against common attack vectors.

### 3. **Crucial Areas to Improve**
- Implement consistent input validation throughout the application.
- Use safe unwrapping techniques to prevent runtime exceptions from nil values.
- Consider sanitizing user input for display (to guard against XSS attacks).
- Expand test cases to cover security aspects and input validity scenarios.

### Conclusion
The `Todo` codebase has several areas that can be improved to enhance security and robustness. Addressing the highlighted issues and implementing the recommendations will significantly mitigate security risks and improve the overall quality of the application. 

## 4. Test Workflow Graph Structure

In [39]:
print("=== Testing Workflow Graph Structure ===")

# Test workflow creation
try:
    workflow = create_analysis_workflow()
    print(f"✅ Basic workflow created successfully")
    print(f"   Workflow type: {type(workflow)}")
    
    # Test workflow compilation
    compiled_workflow = compile_workflow(workflow, use_memory=False)
    print(f"✅ Workflow compiled successfully")
    print(f"   Compiled workflow type: {type(compiled_workflow)}")
    
except Exception as e:
    print(f"❌ Workflow creation/compilation failed: {e}")
    traceback.print_exc()

print()

=== Testing Workflow Graph Structure ===
✅ Basic workflow created successfully
   Workflow type: <class 'langgraph.graph.state.StateGraph'>
✅ Workflow compiled successfully
   Compiled workflow type: <class 'langgraph.graph.state.CompiledStateGraph'>



In [None]:
# Test parallel workflow creation
try:
    parallel_workflow = create_parallel_analysis_workflow()
    print(f"✅ Parallel workflow created successfully")
    
    compiled_parallel = compile_workflow(parallel_workflow, use_memory=False)
    print(f"✅ Parallel workflow compiled successfully")
    
except Exception as e:
    print(f"❌ Parallel workflow creation failed: {e}")
    traceback.print_exc()

print()

In [None]:
# Test workflow structure visualization
try:
    from workflow.graph import visualize_workflow_structure
    
    structure = visualize_workflow_structure()
    print(f"✅ Workflow structure retrieved")
    print(f"   Nodes: {structure['nodes']}")
    print(f"   Edges: {len(structure['edges'])} edges")
    print(f"   Entry point: {structure['entry_point']}")
    print(f"   End points: {structure['end_points']}")
    
except Exception as e:
    print(f"❌ Workflow structure visualization failed: {e}")

print("\n" + "-" * 50)

# Qwen init

In [118]:
from langchain_openai import ChatOpenAI

qwen_model = ChatOpenAI(
    model="Areeb-Coder-FP8",
    openai_api_base="http://69.19.136.55:8000/v1",
    openai_api_key="areeb-coder-demo-@45202526mlxlaq"
)

In [119]:
qwen_model.invoke("Tell me a joke.")

AIMessage(content="Why don't scientists trust atoms?\n\nBecause they make up everything!", additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 13, 'total_tokens': 27, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'Areeb-Coder-FP8', 'system_fingerprint': None, 'id': 'chatcmpl-84c2292e0ffa4153bbe18eb92e6a12b3', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--73a6a957-133d-4c71-b694-2dcb957142c7-0', usage_metadata={'input_tokens': 13, 'output_tokens': 14, 'total_tokens': 27, 'input_token_details': {}, 'output_token_details': {}})

# Full Analysis Agent

In [103]:
sys_prompt = """ 
    You are a Pull Request Analysis Agent.  
    Your job is to review code changes in a pull request (PR) for correctness, style, maintainability, and security risks.  
    Focus your analysis on the files and diffs provided in the PR, but use the available tools to explore the repository if you need more context.

    ### Tools available:
    - `list_directories`: list all file paths in the repository.  
    - `get_metadata_by_id`: inspect metadata for a file without reading its full content.  
    - `get_content_by_id`: fetch the actual code of a file for deeper analysis.  
    - `search_vector_database`: search semantically for related files or concepts in the repo.  

    ### How you should work:
    1. Start with the PR’s title, description, and the diff of changed files.  
    2. For each file:
    - Check whether the change introduces bugs, vulnerabilities, or regressions.  
    - Evaluate code quality (readability, maintainability, adherence to best practices).  
    - Verify that tests are updated or added if functionality changes.  
    3. If needed, use the tools to get additional context (e.g., fetch the full file or search related files).  
    4. Provide your analysis in a structured format.

    ### Output format:
    - **Summary:** High-level impression of the PR (e.g., “Good improvement, but missing test coverage”).  
    - **Detailed Findings:** For each file/diff:
    - File path
    - Observed issues (bugs, vulnerabilities, style issues, missing docs/tests)
    - Suggested fixes or improvements
    - **Security Concerns:** Highlight any vulnerabilities (SQLi, XSS, insecure crypto, etc.).  
    - **Recommendation:** Final verdict (approve, approve with changes, request major changes).  

    You must base your analysis on actual PR content. If more context is needed, fetch it with the tools before making conclusions. Do not invent issues that are not supported by the code.
 """

In [113]:
def Analysis_agent(pr):
    """
    AI agent node for vulnerability detection.
    Uses create_react_agent with vulnerability analysis tools.
    """
    print("🔍 [Full Analysis Agent] Starting vulnerability analysis...")
    
    try:
        # Get available tools
        tools = list(create_tool_registry().values())
        
        # Create the vulnerability analysis agent
        agent = create_react_agent(
            model=qwen_model,
            tools=tools,
            prompt=sys_prompt,
            debug=True,

        )
        
        # Run vulnerability analysis
        response = agent.invoke({
            "messages": [
                {"role": "user", "content": f"Analyze this pr:\n {pr}"}
            ]
        },
        {"recursion_limit": 50},)
        
        return response
        
    except Exception as e:
        print(f"❌ [Full Analysis Agent] Error: {e}")

In [106]:
from pydantic import BaseModel
from typing import List, Optional

class PullRequestInfo(BaseModel):
    id: int
    title: str
    description: str
    status: str
    author: str
    sourceBranch: str
    targetBranch: str
    repository: str

class CommitInfo(BaseModel):
    id: str
    message: str
    author: str

class FileChange(BaseModel):
    filePath: str
    changeType: str
    content: str  # raw diff

class PRPayload(BaseModel):
    pullRequest: PullRequestInfo
    commits: List[CommitInfo]
    filesChanged: List[FileChange]
    changelog: Optional[str] = None


In [5]:
test_payload = {
  "pullRequest": {
    "id": 7,
    "title": "Merge pull request 7 from feature/json-file-output into master",
    "description": "PR submitted from Azure DevOps pipeline",
    "status": "active",
    "author": "Sherif Asker",
    "sourceBranch": "refs/heads/feature/json-file-output",
    "targetBranch": "refs/heads/master",
    "repository": "pr-api"
  },
  "commits": [
    {
      "id": "f9f6b6bac89e7e3def9de1dd3a4f9e8894fe4f9c",
      "message": "Update API endpoint port in azure-pipelines.yml from 3333 to 8050",
      "author": "Sherif Asker"
    },
    {
      "id": "9065d11397027dd42150e522dd1ef4c53ed65974",
      "message": "Update CHANGELOG.md to include a new entry for JSON export feature and additional testing notes.",
      "author": "Sherif Asker"
    },
    {
      "id": "59ea21e7db8268f661b1b6d98656c2e5f519c010",
      "message": "Update API endpoint port in azure-pipelines.yml from 3333 to 8050 and reflect this change in CHANGELOG.md",
      "author": "Sherif Asker"
    },
    {
      "id": "98267a15ea38b5619e8f3349f43f7841dba94c20",
      "message": "Update API endpoint port to 3333, implement JSON export of PR requests, and add aiofiles dependency for asynchronous file handling.",
      "author": "Sherif Asker"
    }
  ],
  "filesChanged": [
    {
      "filePath": "CHANGELOG.md",
      "changeType": "modified",
      "content": "diff --git a/CHANGELOG.md b/CHANGELOG.mdindex c78d719..d4d9d8b 100644--- a/CHANGELOG.md+++ b/CHANGELOG.md@@ -1,5 +1,11 @@ ## CHANGE 1 ## +## [1.2.0/develop/57+1] - 2025-08-14+### Added+Change port to 8050 on API Calling+Add json export in pr_api to store json body requests++ ### ANOTHER ENTRY FOR TESTING #### #### ASKER ###### ### CHANGE ####"
    },
    {
      "filePath": "pr_api.py",
      "changeType": "modified",
      "content": "diff --git a/pr_api.py b/pr_api.pyindex dc4ad5a..9e4126b 100644--- a/pr_api.py+++ b/pr_api.py@@ -1,6 +1,10 @@ from fastapi import FastAPI from pydantic import BaseModel from typing import List+import json+import os+import aiofiles+from datetime import datetime  app = FastAPI() @@ -34,6 +38,22 @@ class PRPayload(BaseModel): # --- Endpoint --- @app.post(\"/pr-review\") async def review_pr(payload: PRPayload):+    # Ensure requests directory exists+    requests_dir = \"requests\"+    os.makedirs(requests_dir, exist_ok=True)+    +    # Create filename with timestamp and PR ID+    timestamp = datetime.now().strftime(\"%Y%m%d_%H%M%S\")+    filename = f\"pr_request_{payload.pullRequest.id}_{timestamp}.json\"+    filepath = os.path.join(requests_dir, filename)+    +    # Convert payload to JSON and save to file+    payload_dict = payload.dict()+    async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:+        await f.write(json.dumps(payload_dict, indent=2, ens"
    },
    {
      "filePath": "requirements.txt",
      "changeType": "modified",
      "content": "diff --git a/requirements.txt b/requirements.txtindex 2c212b4..d50dd14 100644--- a/requirements.txt+++ b/requirements.txt@@ -1,3 +1,4 @@ fastapi==0.117.1 uvicorn==0.37.0 pydantic>=2.11.7+aiofiles>=23.0.0"
    }
  ],
  "changelog": "PR #7: Merge pull request 7 from feature/json-file-output into master"
}


In [110]:
payload = PRPayload(**test_payload)


In [112]:
print(payload.model_dump_json(indent=2))


{
  "pullRequest": {
    "id": 7,
    "title": "Merge pull request 7 from feature/json-file-output into master",
    "description": "PR submitted from Azure DevOps pipeline",
    "status": "active",
    "author": "Sherif Asker",
    "sourceBranch": "refs/heads/feature/json-file-output",
    "targetBranch": "refs/heads/master",
    "repository": "pr-api"
  },
  "commits": [
    {
      "id": "f9f6b6bac89e7e3def9de1dd3a4f9e8894fe4f9c",
      "message": "Update API endpoint port in azure-pipelines.yml from 3333 to 8050",
      "author": "Sherif Asker"
    },
    {
      "id": "9065d11397027dd42150e522dd1ef4c53ed65974",
      "message": "Update CHANGELOG.md to include a new entry for JSON export feature and additional testing notes.",
      "author": "Sherif Asker"
    },
    {
      "id": "59ea21e7db8268f661b1b6d98656c2e5f519c010",
      "message": "Update API endpoint port in azure-pipelines.yml from 3333 to 8050 and reflect this change in CHANGELOG.md",
      "author": "Sherif Asker"
  

In [114]:
response = Analysis_agent(payload.model_dump_json(indent=2))

🔍 [Full Analysis Agent] Starting vulnerability analysis...
[1m[values][0m {'messages': [HumanMessage(content='Analyze this pr:\n {\n  "pullRequest": {\n    "id": 7,\n    "title": "Merge pull request 7 from feature/json-file-output into master",\n    "description": "PR submitted from Azure DevOps pipeline",\n    "status": "active",\n    "author": "Sherif Asker",\n    "sourceBranch": "refs/heads/feature/json-file-output",\n    "targetBranch": "refs/heads/master",\n    "repository": "pr-api"\n  },\n  "commits": [\n    {\n      "id": "f9f6b6bac89e7e3def9de1dd3a4f9e8894fe4f9c",\n      "message": "Update API endpoint port in azure-pipelines.yml from 3333 to 8050",\n      "author": "Sherif Asker"\n    },\n    {\n      "id": "9065d11397027dd42150e522dd1ef4c53ed65974",\n      "message": "Update CHANGELOG.md to include a new entry for JSON export feature and additional testing notes.",\n      "author": "Sherif Asker"\n    },\n    {\n      "id": "59ea21e7db8268f661b1b6d98656c2e5f519c010",\n    



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='["CHANGELOG.md", "README.md", "azure-pipelines.yml", "pr_api.py", "requirements.txt", "send-pr-data-robust.sh", "send-pr-data-simple.sh", "send-pr-data.sh", "test-cloud-endpoint.sh", "test-pr-api.sh", "test-pr-basic.sh"]', name='list_directories', id='c61c561e-0dac-495a-9ce0-63cb6f686eb0', tool_call_id='chatcmpl-tool-7e86b4fdefc341f1a704fb978e44f43a')]}}
[1m[values][0m {'messages': [HumanMessage(content='Analyze this pr:\n {\n  "pullRequest": {\n    "id": 7,\n    "title": "Merge pull request 7 from feature/json-file-output into master",\n    "description": "PR submitted from Azure DevOps pipeline",\n    "status": "active",\n    "author": "Sherif Asker",\n    "sourceBranch": "refs/heads/feature/json-file-output",\n    "targetBranch": "refs/heads/master",\n    "repository": "pr-api"\n  },\n  "commits": [\n    {\n      "id": "f9f6b6bac89e7e3def9de1dd3a4f9e8894fe4f9c",\n      "message": "Update API endpoint port in azure-pipe



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='{"id": "pr_api.py", "repo_id": "pr-api", "chunk_id": null, "symbols": {"functions": ["review_pr"], "classes": ["Commit", "FileChange", "PullRequest", "PRPayload"]}, "imports": ["from fastapi import FastAPI", "from pydantic import BaseModel", "from typing import List"], "metadata": {"language": "python", "size": 2621, "timestamp": "2025-10-01T10:50:35.670036", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.34321858721224796, "tool_used": "get_metadata_by_id"}', name='get_metadata_by_id', id='8ef66fec-9a3e-41f4-955c-2253c55f9ad8', tool_call_id='chatcmpl-tool-743c6702216b41ad9bc07686f6b501a2')]}}
[1m[values][0m {'messages': [HumanMessage(content='Analyze this pr:\n {\n  "pullRequest": {\n    "id": 7,\n    "title": "Merge pull request 7 from feature/json-file-output into master",\n    "description": "PR submitted from Azure DevOps pipeline",\n    "status": "active",\n    "author": "Sherif Asker",\



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='{"id": "pr_api.py", "content": "from fastapi import FastAPI\\nfrom pydantic import BaseModel\\nfrom typing import List\\n\\napp = FastAPI()\\n\\nclass Commit(BaseModel):\\n    id: str\\n    message: str\\n    author: str\\n\\nclass FileChange(BaseModel):\\n    filePath: str\\n    changeType: str\\n    content: str\\n\\nclass PullRequest(BaseModel):\\n    id: int\\n    title: str\\n    description: str\\n    status: str\\n    author: str\\n    sourceBranch: str\\n    targetBranch: str\\n    repository: str\\n\\nclass PRPayload(BaseModel):\\n    pullRequest: PullRequest\\n    commits: List[Commit]\\n    filesChanged: List[FileChange]\\n    changelog: str   # backlog replacement\\n\\n\\n# --- Endpoint ---\\n@app.post(\\"/pr-review\\")\\nasync def review_pr(payload: PRPayload):\\n    # Print the received payload for debugging\\n    print(\\"=== RECEIVED PR DATA ===\\")\\n    print(f\\"PR ID: {payload.pullRequest.id}\\")\\n    p



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='{"id": "requirements.txt", "content": "fastapi==0.117.1\\nuvicorn==0.37.0\\npydantic>=2.11.7\\n", "tool_used": "get_content_by_id"}', name='get_content_by_id', id='cd8e4be5-13cf-437e-8979-8526515e745f', tool_call_id='chatcmpl-tool-3fc13d71f9c6415f894d040ed0b01ab0')]}}
[1m[values][0m {'messages': [HumanMessage(content='Analyze this pr:\n {\n  "pullRequest": {\n    "id": 7,\n    "title": "Merge pull request 7 from feature/json-file-output into master",\n    "description": "PR submitted from Azure DevOps pipeline",\n    "status": "active",\n    "author": "Sherif Asker",\n    "sourceBranch": "refs/heads/feature/json-file-output",\n    "targetBranch": "refs/heads/master",\n    "repository": "pr-api"\n  },\n  "commits": [\n    {\n      "id": "f9f6b6bac89e7e3def9de1dd3a4f9e8894fe4f9c",\n      "message": "Update API endpoint port in azure-pipelines.yml from 3333 to 8050",\n      "author": "Sherif Asker"\n    },\n    {\n      "id"



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='{"id": "CHANGELOG.md", "content": "## CHANGE 1 ##\\n\\n### ANOTHER ENTRY FOR TESTING ####\\n#### ASKER ######\\n### CHANGE ####\\n\\n## CHANGE 2 ##", "tool_used": "get_content_by_id"}', name='get_content_by_id', id='afd977c6-1c96-4b44-8bf6-a37dbe2d83c3', tool_call_id='chatcmpl-tool-a0b8161b80264d7bb0cb8565d24836e1')]}}
[1m[values][0m {'messages': [HumanMessage(content='Analyze this pr:\n {\n  "pullRequest": {\n    "id": 7,\n    "title": "Merge pull request 7 from feature/json-file-output into master",\n    "description": "PR submitted from Azure DevOps pipeline",\n    "status": "active",\n    "author": "Sherif Asker",\n    "sourceBranch": "refs/heads/feature/json-file-output",\n    "targetBranch": "refs/heads/master",\n    "repository": "pr-api"\n  },\n  "commits": [\n    {\n      "id": "f9f6b6bac89e7e3def9de1dd3a4f9e8894fe4f9c",\n      "message": "Update API endpoint port in azure-pipelines.yml from 3333 to 8050",\n     



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='[{"id": "send-pr-data-robust.sh", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 13478, "timestamp": "2025-10-01T10:50:35.672019", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.304520039403427, "tool_used": "search_vector_database", "search_query": "JSON file output implementation"}, {"id": "send-pr-data-simple.sh", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 7020, "timestamp": "2025-10-01T10:50:35.672617", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.253701239383075, "tool_used": "search_vector_database", "search_query": "JSON file output implementation"}, {"id": "send-pr-data.sh", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 7871, "timestamp": "2025-10-01T10:50:3



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='{"id": "README.md", "content": "# PR Review API - Azure DevOps Integration\\n\\n##### ASKER TEST CHANGE DETECTION\\nThis project provides a FastAPI endpoint that receives Pull Request data from Azure DevOps and processes it for code review analysis.\\n\\n## Components\\n\\n- **`pr_api.py`** - FastAPI server that receives PR data\\n- **`send-pr-data.sh`** - Script that extracts PR data from Azure DevOps and sends it to the API\\n- **`azure-pipelines.yml`** - Azure DevOps pipeline configuration for PR automation\\n- **`test-pr-api.sh`** - Local testing script\\n- **`test-cloud-endpoint.sh`** - Cloud endpoint testing script\\n\\n## Setup Instructions\\n\\n### 1. Deploy FastAPI Server\\n\\nDeploy the FastAPI server to your cloud server (appflowy.areeb.me):\\n\\n```bash\\n# Install dependencies\\npip install fastapi uvicorn\\n\\n# Run the server on port 8050\\npython3 -m uvicorn pr_api:app --host 0.0.0.0 --port 8050\\n```\\n\\n#



[1m[updates][0m {'agent': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'chatcmpl-tool-66eb390d0c614e55a2c7562d039dfdc7', 'function': {'arguments': '{"collection_name": "pr-api", "query": "asynchronous file handling in Python"}', 'name': 'search_vector_database'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 39, 'prompt_tokens': 7827, 'total_tokens': 7866, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'Areeb-Coder-FP8', 'system_fingerprint': None, 'id': 'chatcmpl-42bbba9944cd40de8403a743c66d244a', 'service_tier': None, 'finish_reason': 'tool_calls', 'logprobs': None}, id='run--f0c22d5e-2750-4359-95fe-05434f7e7794-0', tool_calls=[{'name': 'search_vector_database', 'args': {'collection_name': 'pr-api', 'query': 'asynchronous file handling in Python'}, 'id': 'chatcmpl-tool-66eb390d0c614e55a2c7562d039dfdc7', 'type': 'tool_call'}], usage_metadata={'input_tokens': 7827, 'out



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='[{"id": "requirements.txt", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 50, "timestamp": "2025-10-01T10:50:35.670036", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.255593130002564, "tool_used": "search_vector_database", "search_query": "asynchronous file handling in Python"}, {"id": "pr_api.py", "repo_id": "pr-api", "chunk_id": null, "symbols": {"functions": ["review_pr"], "classes": ["Commit", "FileChange", "PullRequest", "PRPayload"]}, "imports": ["from fastapi import FastAPI", "from pydantic import BaseModel", "from typing import List"], "metadata": {"language": "python", "size": 2621, "timestamp": "2025-10-01T10:50:35.670036", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.19224987921859504, "tool_used": "search_vector_database", "search_query": "asynchronous file handling in Python"}, {"id": 



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='[{"id": "test-pr-api.sh", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 2736, "timestamp": "2025-10-01T10:50:35.672886", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.37051290273666604, "tool_used": "search_vector_database", "search_query": "FastAPI JSON file saving best practices"}, {"id": "send-pr-data-robust.sh", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 13478, "timestamp": "2025-10-01T10:50:35.672019", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.35511172291953297, "tool_used": "search_vector_database", "search_query": "FastAPI JSON file saving best practices"}, {"id": "test-cloud-endpoint.sh", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 2935, "timestamp": 



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='[{"id": "test-pr-api.sh", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 2736, "timestamp": "2025-10-01T10:50:35.672886", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.362643482921435, "tool_used": "search_vector_database", "search_query": "Python aiofiles usage in FastAPI"}, {"id": "test-cloud-endpoint.sh", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 2935, "timestamp": "2025-10-01T10:50:35.672886", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.361221711672, "tool_used": "search_vector_database", "search_query": "Python aiofiles usage in FastAPI"}, {"id": "requirements.txt", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 50, "timestamp": "2025-10-01T10:50:35.670036", 



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='[{"id": "test-pr-api.sh", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 2736, "timestamp": "2025-10-01T10:50:35.672886", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.396299588239144, "tool_used": "search_vector_database", "search_query": "FastAPI error handling for file operations"}, {"id": "test-cloud-endpoint.sh", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 2935, "timestamp": "2025-10-01T10:50:35.672886", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.36446534779461404, "tool_used": "search_vector_database", "search_query": "FastAPI error handling for file operations"}, {"id": "azure-pipelines.yml", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 6454, "timestamp": 



[1m[updates][0m {'tools': {'messages': [ToolMessage(content='[{"id": "requirements.txt", "repo_id": "pr-api", "chunk_id": null, "symbols": {}, "imports": [], "metadata": {"language": "unknown", "size": 50, "timestamp": "2025-10-01T10:50:35.670036", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.17174360246842202, "tool_used": "search_vector_database", "search_query": "secure file path handling in Python"}, {"id": "pr_api.py", "repo_id": "pr-api", "chunk_id": null, "symbols": {"functions": ["review_pr"], "classes": ["Commit", "FileChange", "PullRequest", "PRPayload"]}, "imports": ["from fastapi import FastAPI", "from pydantic import BaseModel", "from typing import List"], "metadata": {"language": "python", "size": 2621, "timestamp": "2025-10-01T10:50:35.670036", "organization": "areebtest", "project": "Project-B"}, "similarity_score": 0.14797411798660198, "tool_used": "search_vector_database", "search_query": "secure file path handling in Python"}, {"id": 

In [117]:
response

{'messages': [HumanMessage(content='Analyze this pr:\n {\n  "pullRequest": {\n    "id": 7,\n    "title": "Merge pull request 7 from feature/json-file-output into master",\n    "description": "PR submitted from Azure DevOps pipeline",\n    "status": "active",\n    "author": "Sherif Asker",\n    "sourceBranch": "refs/heads/feature/json-file-output",\n    "targetBranch": "refs/heads/master",\n    "repository": "pr-api"\n  },\n  "commits": [\n    {\n      "id": "f9f6b6bac89e7e3def9de1dd3a4f9e8894fe4f9c",\n      "message": "Update API endpoint port in azure-pipelines.yml from 3333 to 8050",\n      "author": "Sherif Asker"\n    },\n    {\n      "id": "9065d11397027dd42150e522dd1ef4c53ed65974",\n      "message": "Update CHANGELOG.md to include a new entry for JSON export feature and additional testing notes.",\n      "author": "Sherif Asker"\n    },\n    {\n      "id": "59ea21e7db8268f661b1b6d98656c2e5f519c010",\n      "message": "Update API endpoint port in azure-pipelines.yml from 3333 to 8

In [116]:
display(Markdown(response['messages'][-1].content))

# Pull Request Analysis

## Summary
This pull request implements a JSON file export feature for Pull Request data in the PR Review API. The changes include adding the `aiofiles` dependency, modifying the API endpoint to save incoming PR payloads as JSON files, and updating documentation to reflect these changes. The implementation appears to be functional but has several areas for improvement regarding error handling, security, and code quality.

## Detailed Findings

### File: pr_api.py
**Issues:**
1. **Incomplete code snippet**: The diff shows only part of the `review_pr` function implementation. The full implementation appears to be cut off after the `await f.write()` line, which means there might be missing error handling or cleanup logic.

2. **Missing error handling**: No try/except blocks around the file I/O operations. If file creation or writing fails (permissions, disk full, etc.), the API will crash instead of returning a meaningful error response.

3. **No validation of filename safety**: The filename is constructed using raw PR ID and timestamp, which could potentially lead to path traversal vulnerabilities if PR IDs contain malicious characters.

4. **Lack of file size or content validation**: There's no validation of the payload size or content before attempting to write to disk, which could lead to resource exhaustion.

5. **Inconsistent logging**: While the original code had comprehensive debug printing, the new implementation removes this useful debugging capability.

**Suggested Improvements:**
- Add proper error handling with try/except blocks around file operations
- Implement filename sanitization to prevent path traversal attacks
- Add input validation for payload size/content
- Consider using a more robust file naming scheme
- Add logging for success/failure cases

### File: requirements.txt
**Issues:**
1. **Dependency added correctly**: The `aiofiles>=23.0.0` dependency was correctly added.

### File: CHANGELOG.md
**Issues:**
1. **Poorly formatted changelog**: The changelog content appears malformed with inconsistent formatting and incomplete entries.
2. **Missing proper versioning**: The version format `[1.2.0/develop/57+1] - 2025-08-14` seems incorrect and non-standard.
3. **Incomplete content**: The changelog has very minimal content and appears to be an incomplete update.

## Security Concerns
1. **Path traversal vulnerability**: The filename construction directly uses `payload.pullRequest.id` without sanitization, which could allow attackers to create files outside the intended directory if the PR ID contains special characters.
2. **Lack of file permission controls**: Files are written without explicit permission management, potentially exposing sensitive data.
3. **No input validation**: The API accepts arbitrary JSON payloads without validation, which could lead to storage of malicious content.

## Recommendation
Request major changes. The implementation needs significant improvements in error handling, security, and code completeness. The PR adds important functionality but lacks proper safeguards and error recovery mechanisms. The changelog also needs substantial cleanup and standardization. The incomplete code snippet in `pr_api.py` suggests that the implementation is not fully complete and tested.

The core functionality of JSON export is valuable, but the current implementation poses security risks and lacks robustness. It should not be merged in its current state without addressing the identified issues.

# Workflow

## State

In [11]:
from workflow.nodes import parse_pr_data

In [8]:
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from datetime import datetime


@dataclass
class WorkflowState:
    """
    Central state object for the PR analysis workflow.
    
    This state is passed to the PR analysis agent and maintains all 
    analysis results and metadata throughout the execution.
    """
    # Input parameters
    collection_name: Optional[str] = None
    
    # PR data for PR analysis workflow
    pr_data: str = ""
    
    # Workflow control
    current_step: str = "start"
    completed_nodes: List[str] = field(default_factory=list)
    
    # Analysis results
    analysis_response: Optional[str] = None
    analysis_error: Optional[str] = None
    
    # Analysis metadata
    analysis_metadata: Dict[str, Any] = field(default_factory=dict)
    
    # Execution tracking
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    
    def __post_init__(self):
        """Initialize start time when state is created"""
        if self.start_time is None:
            self.start_time = datetime.now()
    
    def mark_node_complete(self, node_name: str) -> None:
        """Mark a workflow node as completed"""
        if node_name not in self.completed_nodes:
            self.completed_nodes.append(node_name)
    
    def is_node_complete(self, node_name: str) -> bool:
        """Check if a workflow node has been completed"""
        return node_name in self.completed_nodes
    
    def finalize_analysis(self) -> None:
        """Mark the analysis as complete and set end time"""
        self.end_time = datetime.now()
        self.current_step = "complete"
    
    @property
    def execution_time(self) -> Optional[float]:
        """Get the total execution time in seconds"""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return None
    
    @property
    def is_complete(self) -> bool:
        """Check if the workflow is complete"""
        return self.current_step == "complete"

In [9]:
init_state = WorkflowState(
    pr_data=test_payload
)

In [12]:
parse_pr_data(init_state)

🧩 [Parse PR Data] Extracting collection_name from pr_data...
✅ [Parse PR Data] collection_name set to: pr-api


WorkflowState(collection_name='pr-api', pr_data={'pullRequest': {'id': 7, 'title': 'Merge pull request 7 from feature/json-file-output into master', 'description': 'PR submitted from Azure DevOps pipeline', 'status': 'active', 'author': 'Sherif Asker', 'sourceBranch': 'refs/heads/feature/json-file-output', 'targetBranch': 'refs/heads/master', 'repository': 'pr-api'}, 'commits': [{'id': 'f9f6b6bac89e7e3def9de1dd3a4f9e8894fe4f9c', 'message': 'Update API endpoint port in azure-pipelines.yml from 3333 to 8050', 'author': 'Sherif Asker'}, {'id': '9065d11397027dd42150e522dd1ef4c53ed65974', 'message': 'Update CHANGELOG.md to include a new entry for JSON export feature and additional testing notes.', 'author': 'Sherif Asker'}, {'id': '59ea21e7db8268f661b1b6d98656c2e5f519c010', 'message': 'Update API endpoint port in azure-pipelines.yml from 3333 to 8050 and reflect this change in CHANGELOG.md', 'author': 'Sherif Asker'}, {'id': '98267a15ea38b5619e8f3349f43f7841dba94c20', 'message': 'Update API

In [13]:
init_state

WorkflowState(collection_name='pr-api', pr_data={'pullRequest': {'id': 7, 'title': 'Merge pull request 7 from feature/json-file-output into master', 'description': 'PR submitted from Azure DevOps pipeline', 'status': 'active', 'author': 'Sherif Asker', 'sourceBranch': 'refs/heads/feature/json-file-output', 'targetBranch': 'refs/heads/master', 'repository': 'pr-api'}, 'commits': [{'id': 'f9f6b6bac89e7e3def9de1dd3a4f9e8894fe4f9c', 'message': 'Update API endpoint port in azure-pipelines.yml from 3333 to 8050', 'author': 'Sherif Asker'}, {'id': '9065d11397027dd42150e522dd1ef4c53ed65974', 'message': 'Update CHANGELOG.md to include a new entry for JSON export feature and additional testing notes.', 'author': 'Sherif Asker'}, {'id': '59ea21e7db8268f661b1b6d98656c2e5f519c010', 'message': 'Update API endpoint port in azure-pipelines.yml from 3333 to 8050 and reflect this change in CHANGELOG.md', 'author': 'Sherif Asker'}, {'id': '98267a15ea38b5619e8f3349f43f7841dba94c20', 'message': 'Update API

In [14]:
# Cell 1: Setup imports
import os, sys, json

# Ensure repo root is on path if you're running from notebooks/
repo_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
if repo_root not in sys.path:
    sys.path.insert(0, repo_root)

from workflow.graph import create_default_workflow
from workflow.state import WorkflowState

In [15]:
# Cell 2: Create a sample PR JSON (your Azure DevOps example)
state = WorkflowState(
    pr_data=json.dumps(test_payload)  # can also pass the dict directly; both are supported
)

In [18]:
# Cell 3: Run the workflow (parse_pr_data -> pr_analysis_agent)
app = create_default_workflow()
final_state = app.invoke(state, {"configurable": {"thread_id": 2}})

🧩 [Parse PR Data] Extracting collection_name from pr_data...
✅ [Parse PR Data] collection_name set to: pr-api
🔍 [PR Analysis Agent] Starting PR analysis...





PR ANALYSIS RESULTS
# Summary

This PR implements a new feature to export incoming Pull Request data as JSON files, while also updating the API endpoint port in CI/CD configuration. The changes introduce asynchronous file I/O using `aiofiles` and modify the FastAPI endpoint to save payloads to disk. There are some inconsistencies in the CHANGELOG and potential issues with the file saving logic.

# Detailed Findings

## File: pr_api.py
- **Issue**: Incomplete file write operation - The diff shows only part of the new file-saving logic, which appears to be cut off mid-function. This could cause runtime errors or incomplete data persistence.
- **Issue**: No error handling for file operations - The code writes to files without checking if the write was successful or handling potential exceptions during file I/O.
- **Security**: Directory traversal vulnerability - Using `os.makedirs()` without validation of the input path could allow directory traversal attacks if malicious filenames are p

In [24]:
print(app.get_state({"configurable": {"thread_id": 2}}).values['analysis_response'])

# Summary

This PR implements a new feature to export incoming Pull Request data as JSON files, while also updating the API endpoint port in CI/CD configuration. The changes introduce asynchronous file I/O using `aiofiles` and modify the FastAPI endpoint to save payloads to disk. There are some inconsistencies in the CHANGELOG and potential issues with the file saving logic.

# Detailed Findings

## File: pr_api.py
- **Issue**: Incomplete file write operation - The diff shows only part of the new file-saving logic, which appears to be cut off mid-function. This could cause runtime errors or incomplete data persistence.
- **Issue**: No error handling for file operations - The code writes to files without checking if the write was successful or handling potential exceptions during file I/O.
- **Security**: Directory traversal vulnerability - Using `os.makedirs()` without validation of the input path could allow directory traversal attacks if malicious filenames are passed.
- **Best Pract