# Test Single Flow Execution

Test that the scrape flow now executes only once and creates pipeline states without duplicates.

**Goal:** Verify that the duplicate flow execution issue has been fixed.


In [None]:
## Setup and Imports


In [None]:
# Notebook setup: make the project importable, then pull in the flow under
# test and the pipeline state manager used to inspect its results.
import sys
from pathlib import Path
import json  # NOTE(review): not used in the cells visible here — confirm before removing

# Add project root to path
# The current working directory is treated as the project root.
# NOTE(review): assumes the notebook is launched from the repository root — confirm.
project_root = Path.cwd()
sys.path.insert(0, str(project_root))

# These project imports must stay below the sys.path.insert above; they are
# resolved through the path entry that was just added.
from flows.scrape_flow import scrape_flow
from pipeline.pipeline_state import PipelineStateManager

print("✓ Imports successful")
print(f"✓ Project root: {project_root}")
print("="*60)


## Step 1: Check Current Pipeline State


In [None]:
# Baseline snapshot taken before the flow runs. `states` is read again by the
# result-checking cell further down, which compares against its length.
# NOTE(review): `_read_all_states` is a private method of PipelineStateManager;
# presumably it returns a list of dict-like records — confirm.
manager = PipelineStateManager()
states = manager._read_all_states()

print(f"Current pipeline states: {len(states)}")
print("Recent test states:")

# Look only at the five most recent entries and list those whose URL carries
# the test prefix. The printed number is the entry's position inside that
# five-entry window, so numbering can skip when a non-test entry is filtered.
for position, entry in enumerate(states[-5:], start=1):
    url = entry.get('source_url', '')
    if not url.startswith('https://example.com/test/'):
        continue
    short_id = entry.get('id', 'N/A')[:8]
    print(f"  {position}. ID: {short_id}... | URL: {url}")

print("="*60)


## Step 2: Run Scrape Flow


In [None]:
# Execute the scrape flow once. Any exception is reported with a full
# traceback instead of aborting the notebook, so the following cells can
# still inspect whatever state was written.
import traceback

print("🚀 Running scrape_flow...")
print("="*60)

flow_params = {
    "speaker": "donald_trump",
    "start_date": "2025-01-01",
    "end_date": "2025-12-31",
}

try:
    scrape_result = scrape_flow(**flow_params)
    print(f"✓ Scrape flow completed: {scrape_result}")
except Exception as exc:
    print(f"✗ Error running scrape flow: {exc}")
    traceback.print_exc()

print("="*60)


## Step 3: Check Results


In [None]:
# Re-read the pipeline state after the flow run and compare it with the
# baseline `states` captured before the run.
manager = PipelineStateManager()
states_after = manager._read_all_states()

print(f"Pipeline states after: {len(states_after)}")
print(f"New states created: {len(states_after) - len(states)}")

# Everything past the baseline length is treated as newly created.
# NOTE(review): this assumes new states are appended at the end — confirm.
print("\nNew test states:")
new_states = states_after[len(states):]
for position, entry in enumerate(new_states, start=1):
    url = entry.get('source_url', '')
    if url.startswith('https://example.com/test/'):
        short_id = entry.get('id', 'N/A')[:8]
        print(f"  {position}. ID: {short_id}... | URL: {url}")

# Duplicate detection covers every test URL in the full post-run state list
# (not only the newly created entries): a URL that appears more than once
# counts as a duplicate.
all_urls = (entry.get('source_url', '') for entry in states_after)
test_urls = [url for url in all_urls if url.startswith('https://example.com/test/')]
unique_urls = set(test_urls)
duplicates = len(test_urls) - len(unique_urls)

print("\n📊 Results:")
print(f"  • Total test URLs: {len(test_urls)}")
print(f"  • Unique test URLs: {len(unique_urls)}")
print(f"  • Duplicates: {duplicates}")

if duplicates:
    print("❌ ISSUE: Duplicate pipeline states detected")
else:
    print("✅ SUCCESS: No duplicate pipeline states created!")

print("="*60)


## Summary

The fix involved:

1. **Renamed** `tests/mock_scrape_flow.py` → `tests/test_transcript_generator.py`
2. **Removed** all Prefect flow/task decorators from the generator
3. **Updated** `flows/scrape_flow.py` to import the utility function
4. **Added** a Prefect task wrapper in `scrape_flow.py` only

Now only `scrape_flow.py` handles Prefect flow orchestration, while `test_transcript_generator.py` is just a utility for creating test data.


## Step 4: Test the Fix

Let's run the flow again and verify that a single execution creates exactly three new pipeline states, i.e. that the duplicates are eliminated.


In [None]:
# Run the flow again to test the fix.
#
# The cell records the number of pipeline states before and after one more
# flow execution and passes only if (a) exactly EXPECTED_NEW_STATES states
# were added and (b) no source URL occurs twice among the added states.
print("🔧 Testing the duplicate fix...")
print("="*60)

# Number of pipeline states a single flow execution is expected to create.
EXPECTED_NEW_STATES = 3

# Build a fresh manager here, as the other state-checking cells do, so this
# cell does not depend on a `manager` left over from an earlier cell.
manager = PipelineStateManager()

# Check states before
states_before = manager._read_all_states()
print(f"Pipeline states before: {len(states_before)}")

try:
    scrape_result = scrape_flow(
        speaker="donald_trump",
        start_date="2025-01-01",
        end_date="2025-12-31",
    )
    print(f"✓ Scrape flow completed: {scrape_result}")
except Exception as e:
    # Report the failure but keep going so the state counts below still show
    # what (if anything) was written before the error.
    print(f"✗ Error running scrape flow: {e}")
    import traceback
    traceback.print_exc()

# Check states after
states_after = manager._read_all_states()
new_states = len(states_after) - len(states_before)

# Source URLs of the states added by this run; entries without a URL are
# ignored. NOTE(review): slicing past the previous length assumes new states
# are appended at the end of the list — confirm.
created_urls = [
    s.get('source_url', '') for s in states_after[len(states_before):]
]
created_urls = [url for url in created_urls if url]
duplicate_urls = len(created_urls) - len(set(created_urls))

print("\n📊 Results:")
print(f"  • States before: {len(states_before)}")
print(f"  • States after: {len(states_after)}")
print(f"  • New states created: {new_states}")
print(f"  • Duplicate URLs among new states: {duplicate_urls}")

# The count alone cannot prove the absence of duplicates, so both conditions
# must hold before success is reported.
if new_states != EXPECTED_NEW_STATES:
    print(f"❌ ISSUE: Expected {EXPECTED_NEW_STATES} new states, got {new_states}")
elif duplicate_urls:
    print(f"❌ ISSUE: {duplicate_urls} duplicate source URL(s) among the new states")
else:
    print(f"✅ SUCCESS: Exactly {EXPECTED_NEW_STATES} new pipeline states created (no duplicates)!")

print("="*60)
