# Checkpoint A Test Notebook

**Checkpoint A validates:**
- Stage 0: Run + Config Snapshot (boot)
- Stage 1: Source Planning (plan_sources)
- Stage 2: Collection + Raw Snapshot Store (collect)

This notebook tests each stage independently and then runs the full checkpoint.

In [20]:
# Setup path for imports
import sys
from pathlib import Path

# Add project root to path
project_root = Path.cwd().parent if Path.cwd().name == 'notebooks' else Path.cwd()
sys.path.insert(0, str(project_root))

print(f"Project root: {project_root}")

Project root: /Users/cm/Desktop/PYTHON/repos/job_agent


## Stage 0: Config & Boot

In [2]:
from core.config import Settings, load_config, SourceConfig, SourcesConfig

# Load from YAML config
sources_path = project_root / "config" / "sources.yaml"
settings, sources = load_config(sources_path)

print(f"Loaded {len(sources.sources)} sources:")
for src in sources.sources:
    status = "enabled" if src.enabled else "disabled"
    print(f"  - {src.source_id}: {src.source_type} [{status}]")

Loaded 2 sources:
  - anthropic_careers: careers_page [enabled]
  - openai_careers: careers_page [enabled]


In [3]:
settings

Settings(database_url='sqlite:///./data/job_agent.db', snapshots_dir='./data/snapshots', config_snapshots_dir='./data/config_snapshots', dry_run=False, log_level='INFO', default_timeout=30, default_rate_limit=1.0)

In [4]:
from core.context import RunContext

# Boot a run context
ctx = RunContext.boot(settings=settings, sources=sources)

print(f"Run ID: {ctx.run_id}")
print(f"Started: {ctx.started_at}")
print(f"Status: {ctx.status}")
print(f"Config snapshot: {ctx.config_snapshot_path}")

Run ID: 20260205_202549_6c7b2e25
Started: 2026-02-05 20:25:49.397058+00:00
Status: RunStatus.RUNNING
Config snapshot: data/config_snapshots/20260205_202549_6c7b2e25_config.json
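
The run ID printed above looks like a UTC timestamp followed by eight hex characters. A minimal sketch of one way to build an ID of that shape, assuming the suffix is random; `make_run_id` is a hypothetical helper and not necessarily how `RunContext.boot` does it:

```python
import secrets
from datetime import datetime, timezone


def make_run_id(now=None):
    """Hypothetical helper: UTC timestamp plus 8 random hex characters."""
    now = now or datetime.now(timezone.utc)
    # token_hex(4) yields 4 random bytes rendered as 8 hex characters.
    return f"{now:%Y%m%d_%H%M%S}_{secrets.token_hex(4)}"


# Fixed timestamp so the prefix is reproducible; the suffix changes every call.
run_id = make_run_id(datetime(2026, 2, 5, 20, 25, 49, tzinfo=timezone.utc))
print(run_id)
```

The timestamp prefix keeps run directories sortable by start time, and the random suffix avoids collisions when two runs start in the same second.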


## Stage 1: Source Planning

In [5]:
from collectors.planner import plan_sources, FetchTask

# Create fresh context for isolated test
ctx = RunContext.boot(settings, sources)

# Plan fetch tasks
tasks = plan_sources(ctx)

print(f"\nPlanned {len(tasks)} fetch tasks:")
for task in tasks:
    print(f"  - {task.source_id}")
    print(f"    URL: {task.url}")
    print(f"    Type: {task.source_type}")
    print(f"    Rate limit: {task.fetch_policy.rate_limit_rps} rps")
    print()


Planned 2 fetch tasks:
  - anthropic_careers
    URL: https://www.anthropic.com/careers
    Type: careers_page
    Rate limit: 0.5 rps

  - openai_careers
    URL: https://openai.com/careers
    Type: careers_page
    Rate limit: 0.5 rps



In [6]:
# Check stage log
print("Stage logs:")
for log in ctx.stage_logs:
    print(f"  {log.stage}: {log.items_in} -> {log.items_out} ({log.status})")

print(f"\nMetrics: {ctx.metrics.num_fetch_tasks} fetch tasks planned")

Stage logs:
  plan_sources: 2 -> 2 (completed)

Metrics: 2 fetch tasks planned


In [8]:
tasks

[FetchTask(url='https://www.anthropic.com/careers', source_id='anthropic_careers', source_type='careers_page', fetch_policy=FetchPolicy(rate_limit_rps=0.5, timeout_seconds=30, max_retries=3, follow_links=False), original_url='https://www.anthropic.com/careers', metadata={'company': 'Anthropic', 'priority': 'high', 'notes': 'AI safety company, key target'}),
 FetchTask(url='https://openai.com/careers', source_id='openai_careers', source_type='careers_page', fetch_policy=FetchPolicy(rate_limit_rps=0.5, timeout_seconds=30, max_retries=3, follow_links=False), original_url='https://openai.com/careers', metadata={'company': 'OpenAI', 'priority': 'high'})]

## Stage 2: Collection

This stage fetches each planned URL and stores the raw response as a snapshot. Failed responses (for example, HTTP 403) are stored as well.

In [9]:
from evidence.snapshot import FileSnapshotStore
from collectors.collector import collect

# Create snapshot store
store = FileSnapshotStore(settings.snapshots_dir)

# Run collection (Jupyter/IPython supports top-level await, so no asyncio.run is needed)
snapshots = await collect(tasks, ctx, store)

print(f"\nCollected {len(snapshots)} snapshots:")
for snap in snapshots:
    status = "OK" if snap.success else f"FAIL: {snap.error}"
    print(f"  - {snap.source_id}: {status}")
    if snap.success:
        print(f"    Content: {snap.content_length} bytes ({snap.content_type})")
        print(f"    Hash: {snap.content_hash}")


Collected 2 snapshots:
  - anthropic_careers: OK
    Content: 156637 bytes (text/html; charset=utf-8)
    Hash: bd677a9799370c10
  - openai_careers: FAIL: None
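
The `FAIL: None` line suggests that `snap.error` is empty when the failure is an HTTP status rather than an exception, while the stage log records the cause as `HTTP 403`. A small sketch of a more informative status line, assuming the snapshot also exposes a `status_code`; `describe_result` is a hypothetical helper, not part of the project:

```python
from typing import Optional


def describe_result(success: bool, error: Optional[str], status_code: Optional[int]) -> str:
    """Hypothetical helper: build a status line that never prints 'None'."""
    if success:
        return "OK"
    if error:
        return f"FAIL: {error}"
    if status_code is not None:
        # Fall back to the HTTP status when no error message was recorded.
        return f"FAIL: HTTP {status_code}"
    return "FAIL: unknown error"


print(describe_result(True, None, 200))   # OK
print(describe_result(False, None, 403))  # FAIL: HTTP 403
```

In the loop above this would be called as `describe_result(snap.success, snap.error, snap.status_code)`, provided those attributes exist on the snapshot object.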


In [10]:
# Check metrics after collection
print("Collection metrics:")
print(f"  Successful: {ctx.metrics.num_snapshots_success}")
print(f"  Failed: {ctx.metrics.num_snapshots_failed}")

print("\nStage logs:")
for log in ctx.stage_logs:
    print(f"  {log.stage}: {log.items_in} -> {log.items_out} ({log.status})")
    if log.duration_seconds:
        print(f"    Duration: {log.duration_seconds:.2f}s")
    if log.errors:
        print(f"    Errors: {log.errors[:3]}...")  # Show first 3

Collection metrics:
  Successful: 1
  Failed: 1

Stage logs:
  plan_sources: 2 -> 2 (completed)
    Duration: 0.00s
  collect: 2 -> 2 (completed)
    Duration: 1.71s
    Errors: ['openai_careers: HTTP 403']...


## Inspect Stored Snapshots

In [11]:
# List snapshots for this run
stored_snapshots = store.list_by_run(ctx.run_id)

print(f"Snapshots stored for run {ctx.run_id}:")
for snap in stored_snapshots:
    print(f"\n  ID: {snap.snapshot_id}")
    print(f"  Source: {snap.source_id}")
    print(f"  URL: {snap.canonical_url}")
    print(f"  Status: {snap.status_code}")
    print(f"  Content path: {snap.content_path}")

Snapshots stored for run 20260205_203226_e30e9651:

  ID: 1817939c6653
  Source: anthropic_careers
  URL: https://www.anthropic.com/careers
  Status: 200
  Content path: data/snapshots/20260205_203226_e30e9651/1817939c6653.content

  ID: e0ec10090cd5
  Source: openai_careers
  URL: https://openai.com/careers
  Status: 403
  Content path: data/snapshots/20260205_203226_e30e9651/e0ec10090cd5.content
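
The content paths above follow a `<snapshots_dir>/<run_id>/<snapshot_id>.content` layout. A minimal sketch of a file store with that layout, for illustration only: `TinySnapshotStore` is a hypothetical stand-in for `FileSnapshotStore`, and deriving the 12-character ID from a hash of run ID and URL is an assumption.

```python
import hashlib
import tempfile
from pathlib import Path


class TinySnapshotStore:
    """Hypothetical stand-in for FileSnapshotStore; not the project's code."""

    def __init__(self, root):
        self.root = Path(root)

    def save(self, run_id: str, url: str, content: bytes) -> Path:
        # Assumption: snapshot ID = first 12 hex chars of SHA-256(run_id:url).
        snapshot_id = hashlib.sha256(f"{run_id}:{url}".encode()).hexdigest()[:12]
        run_dir = self.root / run_id
        run_dir.mkdir(parents=True, exist_ok=True)
        path = run_dir / f"{snapshot_id}.content"
        path.write_bytes(content)
        return path

    def list_by_run(self, run_id: str) -> list[Path]:
        # One directory per run makes listing a run a simple glob.
        return sorted((self.root / run_id).glob("*.content"))


with tempfile.TemporaryDirectory() as tmp:
    demo_store = TinySnapshotStore(tmp)
    demo_run = "20260205_203226_e30e9651"
    saved = demo_store.save(demo_run, "https://example.com/careers", b"<html></html>")
    listed = demo_store.list_by_run(demo_run)
    relative = saved.relative_to(tmp).as_posix()
    print(relative)
```

Grouping files by run ID keeps every run's raw evidence isolated, so an old run can be inspected or deleted without touching the others.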


In [12]:
# Preview content from first successful snapshot
for snap in stored_snapshots:
    if snap.success and snap.content_path:
        content = store.get_content_by_path(snap.content_path)
        if content:
            preview = content[:500].decode('utf-8', errors='replace')
            print(f"Preview of {snap.source_id}:")
            print("-" * 50)
            print(preview)
            print("...")
            break

Preview of anthropic_careers:
--------------------------------------------------
<!DOCTYPE html><html lang="en" class="anthropicsans_eac0b31f-module__tjnuGq__variable anthropicserif_87b6fa7d-module__quIBbW__variable anthropicmono_fae19af3-module__c5XAsG__variable copernicus_4da799c5-module__dijTSq__variable styrenea_f8492ab1-module__HimLXW__variable styreneb_278af5c6-module__wkOAdG__variable tiempostext_4eff4b4c-module__mpviCW__variable jetbrainsmono_7d7bdbc6-module__j_XgJq__variable"><head><meta charSet="utf-8"/><meta name="viewport" content="width=device-width, initial-sca
...


## Full Checkpoint A Run

Now run the complete checkpoint using the orchestration runner.

In [13]:
from orchestration.runner import run_checkpoint_a_async, get_checkpoint_a_results

# Run complete checkpoint A
ctx = await run_checkpoint_a_async(sources_path=sources_path)

print("Checkpoint A complete!")
print(f"Run ID: {ctx.run_id}")
print(f"Status: {ctx.status}")

Checkpoint A complete!
Run ID: 20260205_204019_32a997a3
Status: RunStatus.COMPLETED


In [14]:
# Get detailed results
results = get_checkpoint_a_results(ctx)

print("\n=== Run Summary ===")
summary = results['summary']
print(f"Run ID: {summary['run_id']}")
print(f"Status: {summary['status']}")
print(f"Started: {summary['started_at']}")
print(f"Completed: {summary['completed_at']}")

print("\n=== Metrics ===")
for key, val in summary['metrics'].items():
    print(f"  {key}: {val}")

print("\n=== Stages ===")
for stage in summary['stages']:
    print(f"  {stage['stage']}: {stage['items_in']} -> {stage['items_out']} ({stage['status']})")


=== Run Summary ===
Run ID: 20260205_204019_32a997a3
Status: completed
Started: 2026-02-05T20:40:19.220360+00:00
Completed: 2026-02-05T20:40:21.309239+00:00

=== Metrics ===
  num_fetch_tasks: 2
  num_snapshots_success: 1
  num_snapshots_failed: 1
  num_parse_success: 0
  num_parse_failed: 0
  num_role_leads_upserted: 0
  num_candidates: 0

=== Stages ===
  plan_sources: 2 -> 2 (completed)
  collect: 2 -> 2 (completed)


In [15]:
# Show snapshot details
print("\n=== Snapshots ===")
for snap in results['snapshots']:
    status = "OK" if snap['success'] else f"FAIL: {snap['error']}"
    print(f"  {snap['source_id']}: {status}")
    if snap['success']:
        print(f"    {snap['content_length']} bytes, {snap['content_type']}")


=== Snapshots ===
  anthropic_careers: OK
    156637 bytes, text/html; charset=utf-8
  openai_careers: FAIL: None


## Test with Custom Sources

Create sources programmatically instead of loading them from `sources.yaml`, then run the checkpoint against them.

In [16]:
# Create a minimal test source config
test_sources = SourcesConfig(sources=[
    SourceConfig(
        source_id="test_httpbin",
        source_type="test",
        url="https://httpbin.org/html",
        enabled=True,
        rate_limit_rps=1.0,
        metadata={"test": True}
    )
])

test_settings = Settings()

# Run with test config
test_ctx = await run_checkpoint_a_async(
    settings=test_settings,
    sources=test_sources
)

print(f"Test run: {test_ctx.run_id}")
print(f"Status: {test_ctx.status}")
print(f"Snapshots: {test_ctx.metrics.num_snapshots_success} success, {test_ctx.metrics.num_snapshots_failed} failed")

Test run: 20260205_204204_00a02f29
Status: RunStatus.COMPLETED
Snapshots: 1 success, 0 failed


In [17]:
# Create a source config for a single real careers page
test_sources = SourcesConfig(sources=[
    SourceConfig(
        source_id="norm_ai_careers",
        source_type="careers_page",
        url="https://www.norm.ai/careers/",
        enabled=True,
        rate_limit_rps=1.0,
        metadata={"company": "Norm Ai", "priority": "high", "notes": "Legal AI company"}
    )
])


test_settings = Settings()

# Run with test config
test_ctx = await run_checkpoint_a_async(
    settings=test_settings,
    sources=test_sources
)

print(f"Test run: {test_ctx.run_id}")
print(f"Status: {test_ctx.status}")
print(f"Snapshots: {test_ctx.metrics.num_snapshots_success} success, {test_ctx.metrics.num_snapshots_failed} failed")

Test run: 20260205_204326_f05a2cad
Status: RunStatus.COMPLETED
Snapshots: 1 success, 0 failed


## Utility Functions

In [18]:
from core.ids import normalize_url, content_hash, slugify

# Test URL normalization
test_urls = [
    "https://Example.com/Jobs/?utm_source=google&id=123",
    "https://example.com/jobs?id=123",
    "HTTPS://EXAMPLE.COM/Jobs?id=123&utm_campaign=test",
]

print("URL Normalization:")
for url in test_urls:
    normalized = normalize_url(url)
    print(f"  {url}")
    print(f"  -> {normalized}")
    print()

URL Normalization:
  https://Example.com/Jobs/?utm_source=google&id=123
  -> https://example.com/Jobs?id=123

  https://example.com/jobs?id=123
  -> https://example.com/jobs?id=123

  HTTPS://EXAMPLE.COM/Jobs?id=123&utm_campaign=test
  -> https://example.com/Jobs?id=123
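
The outputs above show three behaviours: scheme and host are lowercased, `utm_*` query parameters are dropped, and a trailing slash on the path is removed, while the path's case is preserved. A sketch that reproduces these three outputs with the standard library; `normalize_url_sketch` is a hypothetical name, and the real `core.ids.normalize_url` may apply further rules:

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def normalize_url_sketch(url: str) -> str:
    """Hypothetical re-implementation of the behaviour seen in the output."""
    parts = urlsplit(url)
    # Drop utm_* tracking parameters, keep the others in their original order.
    query = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if not key.lower().startswith("utm_")
    ]
    # Remove a trailing slash but leave the path's case untouched.
    path = parts.path.rstrip("/")
    return urlunsplit(
        (parts.scheme.lower(), parts.netloc.lower(), path, urlencode(query), parts.fragment)
    )


print(normalize_url_sketch("https://Example.com/Jobs/?utm_source=google&id=123"))
# https://example.com/Jobs?id=123
```

Because the path keeps its case, `/Jobs` and `/jobs` still normalize to different URLs, as in the first two outputs above.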



In [19]:
# Test content hashing
test_content = b"Hello, World!"
print(f"Content hash: {content_hash(test_content)}")

# Test slugify
test_names = ["Anthropic, Inc.", "OpenAI LP", "Google DeepMind"]
print("\nSlugify:")
for name in test_names:
    print(f"  {name} -> {slugify(name)}")

Content hash: dffd6021bb2bd5b0

Slugify:
  Anthropic, Inc. -> anthropic-inc
  OpenAI LP -> openai-lp
  Google DeepMind -> google-deepmind
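
The hash printed for `b"Hello, World!"` matches the first 16 hex characters of its SHA-256 digest, and the slugs look like lowercased names with runs of non-alphanumeric characters replaced by hyphens. A sketch that reproduces the outputs above under those assumptions; both function names are hypothetical, and the real helpers in `core.ids` may differ for other inputs:

```python
import hashlib
import re


def content_hash_sketch(data: bytes, length: int = 16) -> str:
    """Hypothetical: truncated SHA-256 hex digest."""
    return hashlib.sha256(data).hexdigest()[:length]


def slugify_sketch(name: str) -> str:
    """Hypothetical: lowercase, collapse non-alphanumerics to '-', trim."""
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")


print(content_hash_sketch(b"Hello, World!"))  # dffd6021bb2bd5b0
print(slugify_sketch("Anthropic, Inc."))      # anthropic-inc
```

A 16-character prefix is short enough to read in logs; whether it is long enough to rule out collisions depends on how many snapshots are compared.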


---

## Checkpoint A Validation Checklist

- [ ] Config loads from YAML
- [ ] RunContext boots with unique run_id
- [ ] Config snapshot is saved to disk
- [ ] Sources are planned into FetchTasks
- [ ] HTTP collection respects rate limits
- [ ] Raw snapshots are stored with content
- [ ] Snapshots can be retrieved by run_id
- [ ] Stage logs capture metrics and timing
- [ ] Errors are captured without crashing
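
The "respects rate limits" item is the hardest one to confirm from the cell outputs alone. One way to reason about it is to time consecutive calls through a limiter and check the gaps. The sketch below uses a hypothetical `MinIntervalLimiter`, not the collector's actual limiter, and a high rate so it runs quickly:

```python
import asyncio
import time


class MinIntervalLimiter:
    """Hypothetical limiter: spaces calls roughly 1/rps seconds apart."""

    def __init__(self, rps: float):
        self.interval = 1.0 / rps
        self._next_allowed = 0.0
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        async with self._lock:
            now = time.monotonic()
            delay = self._next_allowed - now
            if delay > 0:
                await asyncio.sleep(delay)
            # Schedule the next slot one interval after this one.
            self._next_allowed = max(now, self._next_allowed) + self.interval


async def timed_calls(rps: float, n: int) -> list[float]:
    limiter = MinIntervalLimiter(rps)
    stamps = []
    for _ in range(n):
        await limiter.wait()
        stamps.append(time.monotonic())
    return stamps


# 20 rps means roughly 0.05 s between calls.
stamps = asyncio.run(timed_calls(rps=20.0, n=3))
gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
print([round(gap, 3) for gap in gaps])
```

Inside a notebook cell, replace `asyncio.run(...)` with `await timed_calls(rps=20.0, n=3)`, since the kernel already runs an event loop. At the 0.5 rps used in this notebook, the same check would expect gaps of about 2 seconds between requests that share a limiter.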