# DynamoDB Query Notebook
Query processed documents, extraction results, and pipeline errors.

**Table structure:**
- `DOC#{doc_id}` / `META` → document metadata + status
- `DOC#{doc_id}` / `EXTRACTION_COMPARISON` → entity extraction results
- `DOC#{doc_id}` / `ERROR#{step}#{timestamp}` → pipeline errors
- **GSI1**: query all docs processed on a given date
- **GSI2**: query all failed documents

In [None]:
import boto3
import pandas as pd
from decimal import Decimal
from boto3.dynamodb.conditions import Key

# ── Config ────────────────────────────────────────────────────────────────────
AWS_REGION     = 'us-east-2'
DYNAMODB_TABLE = 'wp-phoenix-statement-reporting-table'

dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION)
table    = dynamodb.Table(DYNAMODB_TABLE)

# Helper: convert Decimal back to float for display
def _clean(obj):
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, dict):
        return {k: _clean(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_clean(i) for i in obj]
    return obj

print('Connected to:', DYNAMODB_TABLE)

---
## 1. Get all doc IDs processed today (or any date)

In [None]:
# Change this date to query a different day
query_date = '2026-02-15'   # format: YYYY-MM-DD

resp = table.query(
    IndexName='GSI1',
    KeyConditionExpression=Key('GSI1PK').eq(f'DATE#{query_date}')
)

docs_today = [_clean(item) for item in resp.get('Items', [])]

if docs_today:
    df = pd.DataFrame([{
        'doc_id':          d.get('doc_id'),
        'doc_name':        d.get('doc_name'),
        'status':          d.get('status'),
        'created_at':      d.get('created_at'),
        'merchant_group':  d.get('merchant_group'),
        'volume_tier_tsg': d.get('volume_tier_tsg'),
    } for d in docs_today])
    print(f'Found {len(df)} document(s) processed on {query_date}')
    display(df)
else:
    print(f'No documents found for {query_date}')

---
## 2. Get full metadata for a specific doc_id

In [None]:
# Paste any doc_id from the list above
DOC_ID = 'PASTE_DOC_ID_HERE'

resp = table.get_item(
    Key={'PK': f'DOC#{DOC_ID}', 'SK': 'META'}
)

meta = _clean(resp.get('Item', {}))

if meta:
    print(f"Document : {meta.get('doc_name')}")
    print(f"Status   : {meta.get('status')}")
    print(f"Created  : {meta.get('created_at')}")
    print(f"Completed: {meta.get('completed_at')}")
    print(f"S3 text  : {meta.get('s3_text_key')}")
    print(f"OS index : {meta.get('opensearch_index_name')}")
    print(f"Chars    : {meta.get('char_count'):,}" if meta.get('char_count') else "Chars: —")
    print(f"Tables   : {meta.get('table_count')}")
    print(f"Industry : {meta.get('merchant_group')}")
    print(f"Tier     : {meta.get('volume_tier_tsg')}")
else:
    print(f'No metadata found for doc_id: {DOC_ID}')

---
## 3. Get extraction results for a doc_id

In [None]:
# Uses DOC_ID from cell above — or paste a different one
# DOC_ID = 'PASTE_DOC_ID_HERE'

resp = table.get_item(
    Key={'PK': f'DOC#{DOC_ID}', 'SK': 'EXTRACTION_COMPARISON'}
)

ext = _clean(resp.get('Item', {}))

if ext:
    print('── Single Prompt Results ──────────────────────────────')
    sp = ext.get('single_prompt', {})
    print(f"  total_amount             : {sp.get('total_amount')}")
    print(f"  total_transactions_count : {sp.get('total_transactions_count')}")
    print(f"  total_fees               : {sp.get('total_fees')}")

    print('\n── Separate Prompts Results ───────────────────────────')
    sep = ext.get('separate_prompts', {})
    print(f"  total_amount             : {sep.get('total_amount')}")
    print(f"  total_transactions_count : {sep.get('total_transactions_count')}")
    print(f"  total_fees               : {sep.get('total_fees')}")

    print('\n── Match Flags ────────────────────────────────────────')
    for k, v in (ext.get('match_flags') or {}).items():
        print(f"  {k}: {'✓ match' if v else '✗ mismatch'}")

    print('\n── Judge Verdicts (separate prompts) ──────────────────')
    for k, v in (ext.get('judge_verdicts') or {}).items():
        print(f"  {k}: {v}")

    print('\n── Effective Rate + Benchmark ─────────────────────────')
    print(f"  effective_rate_raw     : {ext.get('effective_rate_raw')}")
    print(f"  benchmark_raw          : {ext.get('benchmark_raw')}")
    print(f"  delta_vs_benchmark_raw : {ext.get('delta_vs_benchmark_raw')}")
    print(f"  merchant_group         : {ext.get('merchant_group')}")
    print(f"  volume_tier_tsg        : {ext.get('volume_tier_tsg')}")
else:
    print(f'No extraction results found for doc_id: {DOC_ID}')

---
## 4. Get all pipeline errors for a doc_id

In [None]:
# Uses DOC_ID from above — or paste a different one
# DOC_ID = 'PASTE_DOC_ID_HERE'

resp = table.query(
    KeyConditionExpression=
        Key('PK').eq(f'DOC#{DOC_ID}') &
        Key('SK').begins_with('ERROR#')
)

errors = [_clean(item) for item in resp.get('Items', [])]

if errors:
    print(f'Found {len(errors)} error(s) for doc_id: {DOC_ID}\n')
    for e in errors:
        print(f"Step       : {e.get('step')}")
        print(f"Time       : {e.get('created_at')}")
        print(f"Error      : {e.get('error_message')}")
        print(f"Traceback  :\n{e.get('traceback')}")
        print('─' * 60)
else:
    print(f'No errors recorded for doc_id: {DOC_ID}')

---
## 5. Get ALL failed documents across the system (any doc, any date)

In [None]:
resp = table.query(
    IndexName='GSI2',
    KeyConditionExpression=Key('GSI2PK').eq('STATUS#ERROR')
)

all_errors = [_clean(item) for item in resp.get('Items', [])]

if all_errors:
    df_errors = pd.DataFrame([{
        'doc_id':        e.get('doc_id'),
        'step':          e.get('step'),
        'error_message': e.get('error_message'),
        'created_at':    e.get('created_at'),
    } for e in all_errors]).sort_values('created_at', ascending=False)
    print(f'Total errors across all documents: {len(df_errors)}')
    display(df_errors)
else:
    print('No pipeline errors found.')

---
## 6. Compare extraction methods across all documents (single vs separate)

In [None]:
# Get doc_ids from today (reuse from cell 1) or build your own list
# doc_ids = ['abc123', 'def456', ...]   # manual list
doc_ids = [d.get('doc_id') for d in docs_today if d.get('doc_id')]

rows = []
for doc_id in doc_ids:
    resp = table.get_item(
        Key={'PK': f'DOC#{doc_id}', 'SK': 'EXTRACTION_COMPARISON'}
    )
    ext = _clean(resp.get('Item', {}))
    if not ext:
        continue

    sp  = ext.get('single_prompt',   {})
    sep = ext.get('separate_prompts', {})
    mf  = ext.get('match_flags',      {})

    rows.append({
        'doc_id':                     doc_id,
        'single_amount':              sp.get('total_amount'),
        'separate_amount':            sep.get('total_amount'),
        'single_fees':                sp.get('total_fees'),
        'separate_fees':              sep.get('total_fees'),
        'single_txn_count':           sp.get('total_transactions_count'),
        'separate_txn_count':         sep.get('total_transactions_count'),
        'match_amount':               mf.get('match_total_amount'),
        'match_fees':                 mf.get('match_total_fees'),
        'match_txn':                  mf.get('match_total_transactions_count'),
        'effective_rate_raw':         ext.get('effective_rate_raw'),
        'benchmark_raw':              ext.get('benchmark_raw'),
        'delta_vs_benchmark_raw':     ext.get('delta_vs_benchmark_raw'),
    })

if rows:
    df_compare = pd.DataFrame(rows)
    print(f'Extraction comparison across {len(df_compare)} document(s):')
    display(df_compare)

    # Agreement rate
    for col in ['match_amount', 'match_fees', 'match_txn']:
        rate = df_compare[col].mean() * 100 if col in df_compare else 0
        print(f'{col}: {rate:.0f}% agreement between methods')
else:
    print('No extraction comparison data found.')

---
## 7. Get all records for a doc_id (full audit trail)

In [None]:
# Returns every record stored for this document
# DOC_ID = 'PASTE_DOC_ID_HERE'

resp = table.query(
    KeyConditionExpression=Key('PK').eq(f'DOC#{DOC_ID}')
)

all_records = [_clean(item) for item in resp.get('Items', [])]

print(f'All records for DOC#{DOC_ID}:')
for r in all_records:
    print(f"  SK = {r.get('SK')}")

print(f'\nTotal: {len(all_records)} record(s)')