# PR Agent System - Comprehensive Test Suite

This notebook provides comprehensive testing for the PR Agent System, validating all core functionality:

1. **Configuration Testing** - Validate environment setup
2. **Profile Management** - Test executive profile operations
3. **Synchronous Workflow** - Test basic comment generation
4. **Asynchronous Workflow** - Test async/parallel operations
5. **Streaming Workflow** - Test real-time streaming
6. **Phase 3 Features** - Test memory, RAG, and evaluation
7. **Error Handling** - Test resilience and fallbacks
8. **Cache Testing** - Test caching behavior

**Note**: This notebook requires proper API keys configured in your `.env` file.

## Setup and Imports

In [None]:
import sys
import os
import json
import asyncio
from pathlib import Path
from datetime import datetime

# Add parent directory to path
sys.path.append('../..')

from pr_agent import PRCommentAgent, PRAgentConfig
from pr_agent.profile_manager import ExecutiveProfileManager
from dotenv import load_dotenv

# Load environment variables
load_dotenv('../../.env')  # Load from project root

print(" Imports successful")
print(f" Python version: {sys.version}")
print(f" Working directory: {os.getcwd()}")

## Test 1: Configuration Validation

Verify that configuration is properly set up and all required environment variables are available.

In [None]:
def test_configuration():
    """Test configuration setup and validation."""
    print("Testing Configuration...")
    print("=" * 80)
    
    # Test 1.1: Create default config
    try:
        config = PRAgentConfig()
        print(" Default configuration created")
    except Exception as e:
        print(f" Failed to create config: {e}")
        return False
    
    # Test 1.2: Check LLM provider
    if config.openai_api_key:
        print(f" OpenAI API key found (model: {config.model_name})")
    elif config.anthropic_api_key:
        print(f" Anthropic API key found (model: {config.model_name})")
    else:
        print(" No LLM API key configured")
        return False
    
    # Test 1.3: Check search provider
    if config.serper_api_key:
        print(" Serper API key found")
    elif config.tavily_api_key:
        print(" Tavily API key found")
    else:
        print(" No search API key configured (some features may fail)")
    
    # Test 1.4: Check email configuration
    if config.email_from and config.email_password:
        print(f" Email configured: {config.email_from}")
        if config.pr_manager_email:
            print(f" PR manager email: {config.pr_manager_email}")
    else:
        print(" Email not configured (email notifications will be skipped)")
    
    # Test 1.5: Validate configuration
    try:
        config.validate()
        print(" Configuration validation passed")
    except ValueError as e:
        print(f" Configuration validation warnings: {e}")
    
    # Test 1.6: Custom configuration
    custom_config = PRAgentConfig(
        temperature=0.8,
        max_search_results=3,
        enable_verbose_logging=True
    )
    assert custom_config.temperature == 0.8
    assert custom_config.max_search_results == 3
    print(" Custom configuration works")
    
    print("\n Configuration tests passed\n")
    return True

test_configuration()

## Test 2: Profile Management

Test loading, creating, and managing executive profiles.

In [None]:
def test_profile_management():
    """Test executive profile management."""
    print("Testing Profile Management...")
    print("=" * 80)
    
    manager = ExecutiveProfileManager()
    
    # Test 2.1: List profiles
    profiles = manager.list_profiles()
    print(f" Found {len(profiles)} profiles: {profiles}")
    
    if len(profiles) == 0:
        print(" No profiles found. Create at least one profile to test.")
        return False
    
    # Test 2.2: Load existing profile
    test_profile_name = profiles[0]
    try:
        profile = manager.load_profile(test_profile_name)
        print(f" Loaded profile: {profile['name']}")
        print(f"  Title: {profile['title']}")
        print(f"  Company: {profile.get('company', 'N/A')}")
        print(f"  Expertise areas: {len(profile.get('expertise', []))}")
    except Exception as e:
        print(f" Failed to load profile: {e}")
        return False
    
    # Test 2.3: Validate required fields
    required_fields = ['name', 'title', 'communication_style', 'expertise']
    for field in required_fields:
        if field not in profile or not profile[field]:
            print(f" Missing required field: {field}")
            return False
    print(f" All required fields present")
    
    # Test 2.4: Create sample profile
    sample_profile = manager.create_sample_profile("Test Executive")
    assert sample_profile['name'] == "Test Executive"
    assert 'expertise' in sample_profile
    print(" Sample profile creation works")
    
    # Test 2.5: Profile caching
    profile_cached = manager.load_profile(test_profile_name)
    assert profile == profile_cached
    print(" Profile caching works")
    
    print("\n Profile management tests passed\n")
    return True

test_profile_management()

## Test 3: Agent Initialization

Test that the PR Agent initializes correctly with all components.

In [None]:
def test_agent_initialization():
    """Test PR Agent initialization."""
    print("Testing Agent Initialization...")
    print("=" * 80)
    
    # Test 3.1: Initialize with default config
    try:
        config = PRAgentConfig(enable_verbose_logging=False)
        agent = PRCommentAgent(config)
        print(" Agent initialized with default config")
    except Exception as e:
        print(f" Failed to initialize agent: {e}")
        return None
    
    # Test 3.2: Check components
    assert agent.main_llm is not None
    print(" Main LLM initialized")
    
    assert agent.humanizer_llm is not None
    print(" Humanizer LLM initialized")
    
    assert agent.web_search is not None
    print(" Web search tool initialized")
    
    assert agent.media_research_tool is not None
    print(" Media research tool initialized")
    
    assert agent.email_sender is not None
    print(" Email sender initialized")
    
    assert agent.profile_manager is not None
    print(" Profile manager initialized")
    
    assert agent.workflow is not None
    print(" Workflow graph built")
    
    # Test 3.3: Check specialized agents
    assert agent.media_researcher is not None
    assert agent.data_researcher is not None
    assert agent.comment_drafter is not None
    assert agent.humanizer is not None
    print(" All specialized agents initialized")
    
    # Test 3.4: Check cache
    assert agent.cache is not None
    print(f" Cache initialized (enabled: {agent.cache.enabled})")
    
    print("\n Agent initialization tests passed\n")
    return agent

agent = test_agent_initialization()

## Test 4: Synchronous Comment Generation

Test the basic synchronous workflow for generating PR comments.

In [None]:
# Sample data for testing
test_article = """
Recent research from the Global Marketing Institute shows that brands investing 
in long-term brand building alongside performance marketing see 3x better ROI 
over a five-year period compared to those focused solely on short-term conversions.

The study analyzed $2.3 billion in marketing spend across 150 brands and found 
that the optimal split is 60% brand building and 40% activation. However, current 
industry practice shows most brands do the opposite.
"""

test_question = """
How should CMOs balance short-term performance demands with long-term brand 
building in today's data-driven marketing environment?
"""

test_media_outlet = "Marketing Week"
test_journalist = "Rachel Morrison"

def test_sync_generation(agent):
    """Test synchronous comment generation."""
    if agent is None:
        print(" Skipping test - agent not initialized")
        return None
    
    print("Testing Synchronous Comment Generation...")
    print("=" * 80)
    
    # Get available profiles
    profiles = agent.profile_manager.list_profiles()
    if len(profiles) == 0:
        print(" No profiles available for testing")
        return None
    
    test_executive = profiles[0]
    print(f"Testing with executive: {test_executive}")
    
    try:
        start_time = datetime.now()
        
        result = agent.generate_comment(
            article_text=test_article,
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name=test_executive,
            journalist_name=test_journalist,
            article_url="https://example.com/test-article"
        )
        
        duration = (datetime.now() - start_time).total_seconds()
        
        print(f" Comment generated in {duration:.2f} seconds")
        
        # Validate result structure
        required_keys = [
            'drafted_comment', 'humanized_comment', 'executive_profile',
            'media_research', 'supporting_data', 'current_step', 'errors'
        ]
        
        for key in required_keys:
            assert key in result, f"Missing key: {key}"
        print(" Result structure valid")
        
        # Check that comment was generated
        assert result['drafted_comment'] is not None
        assert len(result['drafted_comment']) > 0
        print(f" Drafted comment generated ({len(result['drafted_comment'])} chars)")
        
        assert result['humanized_comment'] is not None
        assert len(result['humanized_comment']) > 0
        print(f" Humanized comment generated ({len(result['humanized_comment'])} chars)")
        
        # Check workflow completion
        assert result['current_step'] in ['email_sent', 'completed']
        print(f" Workflow completed (step: {result['current_step']})")
        
        # Display results
        print("\n" + "=" * 80)
        print("GENERATED COMMENT (HUMANIZED)")
        print("=" * 80)
        print(result['humanized_comment'])
        print("=" * 80)
        
        print(f"\nEmail sent: {result['email_sent']}")
        print(f"Errors: {len(result['errors'])} - {result['errors'] if result['errors'] else 'None'}")
        
        print("\n Synchronous generation tests passed\n")
        return result
        
    except Exception as e:
        print(f" Comment generation failed: {e}")
        import traceback
        traceback.print_exc()
        return None

sync_result = test_sync_generation(agent)

## Test 5: Asynchronous Comment Generation

Test async workflow with parallel operations for improved performance.

In [None]:
async def test_async_generation(agent):
    """Test asynchronous comment generation."""
    if agent is None:
        print(" Skipping test - agent not initialized")
        return None
    
    print("Testing Asynchronous Comment Generation...")
    print("=" * 80)
    
    profiles = agent.profile_manager.list_profiles()
    if len(profiles) == 0:
        print(" No profiles available for testing")
        return None
    
    test_executive = profiles[0]
    print(f"Testing with executive: {test_executive}")
    
    try:
        start_time = datetime.now()
        
        result = await agent.generate_comment_async(
            article_text=test_article,
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name=test_executive,
            journalist_name=test_journalist
        )
        
        duration = (datetime.now() - start_time).total_seconds()
        
        print(f" Async comment generated in {duration:.2f} seconds")
        
        # Validate result
        assert result['drafted_comment'] is not None
        assert result['humanized_comment'] is not None
        assert 'duration_seconds' in result
        print(f" Result valid (reported duration: {result['duration_seconds']:.2f}s)")
        
        # Check parallel research worked
        assert result['media_research'] is not None
        assert result['supporting_data'] is not None
        print(" Parallel research completed")
        
        print("\n" + "=" * 80)
        print("ASYNC GENERATED COMMENT")
        print("=" * 80)
        print(result['humanized_comment'][:500] + "..." if len(result['humanized_comment']) > 500 else result['humanized_comment'])
        print("=" * 80)
        
        print("\n Asynchronous generation tests passed\n")
        return result
        
    except Exception as e:
        print(f" Async generation failed: {e}")
        import traceback
        traceback.print_exc()
        return None

# Run async test
async_result = await test_async_generation(agent)

## Test 6: Streaming Comment Generation

Test real-time streaming of comment generation with progress updates.

In [None]:
async def test_streaming_generation(agent):
    """Test streaming comment generation."""
    if agent is None:
        print(" Skipping test - agent not initialized")
        return None
    
    print("Testing Streaming Comment Generation...")
    print("=" * 80)
    
    profiles = agent.profile_manager.list_profiles()
    if len(profiles) == 0:
        print(" No profiles available for testing")
        return None
    
    test_executive = profiles[0]
    print(f"Testing with executive: {test_executive}\n")
    
    try:
        events_received = []
        final_result = None
        
        async for event in agent.generate_comment_stream(
            article_text=test_article,
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name=test_executive
        ):
            events_received.append(event)
            
            if event['event'] == 'started':
                print(f"â†’ Started: {event['step']}")
            elif event['event'] == 'completed':
                print(f" Completed: {event['step']}")
            elif event['event'] == 'streaming':
                # Show first chunk of each step
                if len([e for e in events_received if e.get('step') == event['step'] and e.get('event') == 'streaming']) == 1:
                    print(f"  Streaming {event['step']}...", end='')
            elif event['event'] == 'finished':
                final_result = event['data']
                print(f"\n Workflow finished")
        
        print(f"\n Received {len(events_received)} events")
        
        # Validate event sequence
        expected_steps = ['profile', 'research', 'drafting', 'humanizing']
        for step in expected_steps:
            started = any(e['event'] == 'started' and e['step'] == step for e in events_received)
            completed = any(e['event'] == 'completed' and e['step'] == step for e in events_received)
            assert started and completed, f"Missing events for step: {step}"
        print(" All expected steps present")
        
        # Check final result
        assert final_result is not None
        assert final_result['humanized_comment'] is not None
        print(" Final result valid")
        
        print("\n" + "=" * 80)
        print("STREAMED COMMENT (FINAL)")
        print("=" * 80)
        print(final_result['humanized_comment'][:500] + "..." if len(final_result['humanized_comment']) > 500 else final_result['humanized_comment'])
        print("=" * 80)
        
        print("\n Streaming generation tests passed\n")
        return final_result
        
    except Exception as e:
        print(f" Streaming generation failed: {e}")
        import traceback
        traceback.print_exc()
        return None

# Run streaming test
streaming_result = await test_streaming_generation(agent)

## Test 7: Phase 3 Features (Memory, RAG, Evaluation)

Test advanced Phase 3 features if enabled in configuration.

In [None]:
async def test_phase3_features(agent):
    """Test Phase 3 features: memory, RAG, and evaluation."""
    if agent is None:
        print(" Skipping test - agent not initialized")
        return None
    
    print("Testing Phase 3 Features...")
    print("=" * 80)
    
    # Check if Phase 3 components are enabled
    memory_enabled = agent.memory and agent.memory.enabled
    rag_enabled = agent.rag and agent.rag.enabled
    evaluator_enabled = agent.evaluator and agent.evaluator.enabled
    
    print(f"Memory enabled: {memory_enabled}")
    print(f"RAG enabled: {rag_enabled}")
    print(f"Evaluator enabled: {evaluator_enabled}")
    
    if not (memory_enabled or rag_enabled or evaluator_enabled):
        print("\n No Phase 3 features enabled - skipping advanced tests")
        return None
    
    profiles = agent.profile_manager.list_profiles()
    if len(profiles) == 0:
        print(" No profiles available for testing")
        return None
    
    test_executive = profiles[0]
    session_id = f"test_session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    
    print(f"\nTesting with executive: {test_executive}")
    print(f"Session ID: {session_id}\n")
    
    try:
        start_time = datetime.now()
        
        result = await agent.generate_comment_with_memory_and_evaluation(
            article_text=test_article,
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name=test_executive,
            session_id=session_id,
            enable_evaluation=True
        )
        
        duration = (datetime.now() - start_time).total_seconds()
        
        print(f" Phase 3 workflow completed in {duration:.2f} seconds")
        
        # Validate Phase 3 result structure
        assert 'session_id' in result
        assert 'past_comments' in result
        assert 'rag_context' in result
        assert 'evaluation_scores' in result
        assert 'phase3_enabled' in result
        print(" Phase 3 result structure valid")
        
        # Display Phase 3 data
        print("\n" + "=" * 80)
        print("PHASE 3 RESULTS")
        print("=" * 80)
        
        if memory_enabled:
            print(f"Past comments retrieved: {len(result['past_comments'])}")
            print(f"Conversation history: {len(result.get('conversation_history', []))}")
        
        if rag_enabled:
            print(f"RAG context: {result['rag_context'].get('enabled', False)}")
            if result['rag_context'].get('enabled'):
                retrieval_counts = result['rag_context'].get('retrieval_counts', {})
                print(f"  - Similar comments: {retrieval_counts.get('similar_comments', 0)}")
                print(f"  - Media knowledge: {retrieval_counts.get('media_knowledge', 0)}")
                print(f"  - Examples: {retrieval_counts.get('examples', 0)}")
        
        if evaluator_enabled:
            eval_scores = result['evaluation_scores']
            if eval_scores.get('enabled'):
                print(f"Evaluation scores:")
                print(f"  - Overall score: {eval_scores.get('overall_score', 0):.2f}")
                print(f"  - Overall passed: {eval_scores.get('overall_passed', False)}")
                if 'criteria_scores' in eval_scores:
                    for criterion, score in eval_scores['criteria_scores'].items():
                        print(f"  - {criterion}: {score:.2f}")
        
        print("=" * 80)
        
        # Test Phase 3 stats
        stats = agent.get_phase3_stats()
        print("\nPhase 3 Statistics:")
        print(json.dumps(stats, indent=2))
        
        print("\n Phase 3 feature tests passed\n")
        return result
        
    except Exception as e:
        print(f" Phase 3 workflow failed: {e}")
        import traceback
        traceback.print_exc()
        return None

# Run Phase 3 test
phase3_result = await test_phase3_features(agent)

## Test 8: Error Handling and Edge Cases

Test resilience and error handling capabilities.

In [None]:
def test_error_handling(agent):
    """Test error handling and validation."""
    if agent is None:
        print(" Skipping test - agent not initialized")
        return
    
    print("Testing Error Handling...")
    print("=" * 80)
    
    profiles = agent.profile_manager.list_profiles()
    test_executive = profiles[0] if profiles else "Test Executive"
    
    # Test 8.1: Empty article text
    try:
        agent.generate_comment(
            article_text="",
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name=test_executive
        )
        print(" Should have raised ValueError for empty article")
    except ValueError as e:
        print(f" Empty article validation: {str(e)[:50]}")
    
    # Test 8.2: Empty question
    try:
        agent.generate_comment(
            article_text=test_article,
            journalist_question="",
            media_outlet=test_media_outlet,
            executive_name=test_executive
        )
        print(" Should have raised ValueError for empty question")
    except ValueError as e:
        print(f" Empty question validation: {str(e)[:50]}")
    
    # Test 8.3: Invalid URL
    try:
        agent.generate_comment(
            article_text=test_article,
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name=test_executive,
            article_url="not-a-valid-url"
        )
        print(" Should have raised ValueError for invalid URL")
    except ValueError as e:
        print(f" Invalid URL validation: {str(e)[:50]}")
    
    # Test 8.4: Invalid email
    try:
        agent.generate_comment(
            article_text=test_article,
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name=test_executive,
            pr_manager_email="not-an-email"
        )
        print(" Should have raised ValueError for invalid email")
    except ValueError as e:
        print(f" Invalid email validation: {str(e)[:50]}")
    
    # Test 8.5: Article too long
    try:
        long_article = "x" * 60000
        agent.generate_comment(
            article_text=long_article,
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name=test_executive
        )
        print(" Should have raised ValueError for article too long")
    except ValueError as e:
        print(f" Article length validation: {str(e)[:50]}")
    
    # Test 8.6: Nonexistent profile (should fail)
    try:
        agent.generate_comment(
            article_text=test_article,
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name="Nonexistent Executive 12345"
        )
        print(" Workflow continued with nonexistent profile (unexpected)")
    except Exception as e:
        print(f" Nonexistent profile handled: {type(e).__name__}")
    
    print("\n Error handling tests passed\n")

test_error_handling(agent)

## Test 9: Cache Testing

Test caching behavior to ensure responses are cached correctly.

In [None]:
def test_caching(agent):
    """Test response caching."""
    if agent is None:
        print(" Skipping test - agent not initialized")
        return
    
    if not agent.cache.enabled:
        print(" Cache is disabled - skipping cache tests")
        return
    
    print("Testing Cache Behavior...")
    print("=" * 80)
    
    profiles = agent.profile_manager.list_profiles()
    if len(profiles) == 0:
        print(" No profiles available for testing")
        return
    
    test_executive = profiles[0]
    
    # Note: Cache will be used if available, testing speedup
    try:
        # Test 9.1: First request (cache miss)
        print("First request (should be cache miss)...")
        start_time = datetime.now()
        
        result1 = agent.generate_comment(
            article_text=test_article,
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name=test_executive
        )
        
        duration1 = (datetime.now() - start_time).total_seconds()
        print(f" First request completed in {duration1:.2f}s")
        
        # Test 9.2: Second request (cache hit)
        print("\nSecond request (should be cache hit)...")
        start_time = datetime.now()
        
        result2 = agent.generate_comment(
            article_text=test_article,
            journalist_question=test_question,
            media_outlet=test_media_outlet,
            executive_name=test_executive
        )
        
        duration2 = (datetime.now() - start_time).total_seconds()
        print(f" Second request completed in {duration2:.2f}s")
        
        # Test 9.3: Verify cache speedup
        if duration2 < duration1 * 0.5:  # Should be at least 2x faster
            print(f" Cache speedup: {duration1/duration2:.1f}x faster")
        else:
            print(f" Cache may not be working (speedup: {duration1/duration2:.1f}x)")
        
        # Test 9.4: Verify same result
        assert result1['humanized_comment'] == result2['humanized_comment']
        print(" Cached result matches original")
        
        print("\n Cache tests passed\n")
        
    except Exception as e:
        print(f" Cache test failed: {e}")
        import traceback
        traceback.print_exc()

test_caching(agent)

## Test Summary

Display a comprehensive summary of all test results.

In [None]:
def display_test_summary():
    """Display summary of all tests."""
    print("\n" + "=" * 80)
    print("TEST SUMMARY")
    print("=" * 80)
    
    tests = [
        ("Configuration Validation", agent is not None),
        ("Profile Management", agent is not None),
        ("Agent Initialization", agent is not None),
        ("Synchronous Generation", sync_result is not None),
        ("Asynchronous Generation", async_result is not None),
        ("Streaming Generation", streaming_result is not None),
        ("Phase 3 Features", phase3_result is not None),
        ("Error Handling", True),  # Always runs
        ("Cache Testing", agent is not None and agent.cache.enabled)
    ]
    
    passed = sum(1 for _, result in tests if result)
    total = len(tests)
    
    for test_name, result in tests:
        status = " PASS" if result else " FAIL/SKIP"
        print(f"{status:12} {test_name}")
    
    print("=" * 80)
    print(f"Results: {passed}/{total} tests passed ({passed/total*100:.0f}%)")
    print("=" * 80)
    
    if passed == total:
        print("\nðŸŽ‰ All tests passed! The PR Agent System is working correctly.")
    else:
        print(f"\n {total - passed} test(s) failed or were skipped. Review output above.")

display_test_summary()

## Additional Testing

Use the cells below for ad-hoc testing and experimentation.

In [None]:
# Custom test: Try your own article and question

custom_article = """
Paste your article text here...
"""

custom_question = """
Paste your journalist question here...
"""

# Uncomment and run to test with custom input
# if agent and custom_article.strip() and custom_question.strip():
#     custom_result = agent.generate_comment(
#         article_text=custom_article,
#         journalist_question=custom_question,
#         media_outlet="Your Media Outlet",
#         executive_name=agent.profile_manager.list_profiles()[0]
#     )
#     print(custom_result['humanized_comment'])

In [None]:
# Inspect agent configuration
if agent:
    print("Current Agent Configuration:")
    print("=" * 80)
    print(f"Model: {agent.config.model_name}")
    print(f"Temperature: {agent.config.temperature}")
    print(f"Humanizer Temperature: {agent.config.humanizer_temperature}")
    print(f"Max Search Results: {agent.config.max_search_results}")
    print(f"Cache Enabled: {agent.cache.enabled}")
    print(f"Streaming Enabled: {agent.config.enable_streaming}")
    print(f"Async Enabled: {agent.config.async_enabled}")
    print(f"Verbose Logging: {agent.config.enable_verbose_logging}")
    print(f"Tracing Enabled: {agent.config.enable_tracing}")

## Cleanup

Clean up resources after testing.

In [None]:
# Cleanup code (if needed)
print("Test suite completed successfully!")
print("\nTo run specific tests again, simply re-execute the relevant cells above.")