# Data Processing Pipeline Demo

This notebook demonstrates the end-to-end data processing pipeline for the Macro AI Agent.

## Pipeline Stages

1. **Ingestion**: Fetch raw data from sources
2. **Cleaning**: Parse and structure the data
3. **Enrichment**: Add classifications and metadata
4. **Chunking**: Split into LLM-optimized chunks
5. **Indexing**: Store in vector database
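
One way to picture the five stages is as a chain of functions, each taking a document dict and returning an enriched copy. The sketch below is a hypothetical illustration. `run_pipeline` and the toy stage functions are not part of the project's `src/` modules, and each stage is a one-line stand-in for the real work shown later in this notebook.

```python
from typing import Callable, List

# Hypothetical: each stage takes a document dict and returns a new one
Stage = Callable[[dict], dict]


def run_pipeline(doc: dict, stages: List[Stage]) -> dict:
    """Apply each stage in order, feeding the output of one into the next."""
    for stage in stages:
        doc = stage(doc)
    return doc


# Toy stand-ins for the five stages (illustrative only)
def ingest(doc: dict) -> dict:
    return {**doc, "raw": "  Fed holds rates.  "}

def clean(doc: dict) -> dict:
    return {**doc, "content": doc["raw"].strip()}

def enrich(doc: dict) -> dict:
    return {**doc, "macro_themes": ["monetary_policy"]}

def chunk(doc: dict) -> dict:
    return {**doc, "chunks": [doc["content"]]}

def index(doc: dict) -> dict:
    return {**doc, "indexed": True}


result = run_pipeline({"id": "demo"}, [ingest, clean, enrich, chunk, index])
print(result["content"], result["indexed"])
```

Returning a new dict from each stage (rather than mutating the input) keeps earlier stages' outputs intact, which makes individual stages easier to test in isolation.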

**Updated**: 2025-11-18

In [None]:
# Setup
import os
import sys
from pathlib import Path

# Add src to path
sys.path.append(str(Path.cwd().parent / 'src'))

# Imports
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
from pprint import pprint

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('whitegrid')

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

print("✅ Environment setup complete")

## Stage 1: Ingestion (FRED Example)

Fetch economic data from the FRED API (requires a `FRED_API_KEY` environment variable).
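
`fredapi` wraps FRED's REST API. For reference, here is a minimal standard-library sketch of the underlying `series/observations` request and of parsing its JSON body. The helper names are hypothetical, and the sample payload is made up: it only mirrors the response shape, in which FRED reports missing values as the string `"."`.

```python
from urllib.parse import urlencode

FRED_OBSERVATIONS_URL = "https://api.stlouisfed.org/fred/series/observations"


def build_fred_url(series_id: str, api_key: str, start: str) -> str:
    """Build a FRED series/observations request URL that asks for JSON."""
    params = {
        "series_id": series_id,
        "api_key": api_key,
        "file_type": "json",
        "observation_start": start,
    }
    return f"{FRED_OBSERVATIONS_URL}?{urlencode(params)}"


def parse_observations(payload: dict) -> list:
    """Convert FRED observations into (date, value) pairs.

    FRED marks missing values with the string ".", mapped here to None.
    """
    rows = []
    for obs in payload.get("observations", []):
        value = None if obs["value"] == "." else float(obs["value"])
        rows.append((obs["date"], value))
    return rows


# Illustrative payload shape (values made up for the example)
sample = {"observations": [
    {"date": "2024-01-01", "value": "100.0"},
    {"date": "2024-02-01", "value": "."},
]}
url = build_fred_url("CPIAUCSL", "YOUR_KEY", "2020-01-01")
print(url)
print(parse_observations(sample))
```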

In [None]:
# FRED API ingestion example
from fredapi import Fred

# Initialize FRED client
FRED_API_KEY = os.getenv('FRED_API_KEY')
fred = Fred(api_key=FRED_API_KEY)

# Fetch CPI data
print("Fetching CPI data from FRED...")
cpi_data = fred.get_series('CPIAUCSL', observation_start='2020-01-01')
cpi_info = fred.get_series_info('CPIAUCSL')

print(f"\n📊 Fetched {len(cpi_data)} observations")
print(f"\nSeries Info:")
print(f"  Title: {cpi_info['title']}")
print(f"  Units: {cpi_info['units']}")
print(f"  Frequency: {cpi_info['frequency']}")
print(f"  Last Updated: {cpi_info['last_updated']}")

# Preview data
print(f"\nLatest 5 observations:")
print(cpi_data.tail())

In [None]:
# Visualize CPI data
plt.figure(figsize=(12, 6))
plt.plot(cpi_data.index, cpi_data.values, linewidth=2)
plt.title('Consumer Price Index (CPI) - All Urban Consumers', fontsize=14, fontweight='bold')
plt.xlabel('Date')
plt.ylabel('Index (1982-1984=100)')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# Calculate YoY inflation rate
inflation_rate = cpi_data.pct_change(12) * 100  # 12-month change

plt.figure(figsize=(12, 6))
plt.plot(inflation_rate.index, inflation_rate.values, linewidth=2, color='red')
# Reference line only: the Fed's 2% objective is defined for PCE inflation, not CPI
plt.axhline(y=2.0, color='green', linestyle='--', label='Fed 2% target (PCE-based)')
plt.title('Year-over-Year Inflation Rate', fontsize=14, fontweight='bold')
plt.xlabel('Date')
plt.ylabel('Inflation Rate (%)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print(f"\n📈 Current inflation rate: {inflation_rate.iloc[-1]:.2f}%")
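
The 12-period `pct_change` computes year-over-year inflation as `(CPI_t / CPI_{t-12} - 1) * 100`. It counts rows rather than calendar months, so it assumes one observation per month with no gaps (true for the monthly `CPIAUCSL` series). A quick check on a synthetic series with made-up values:

```python
import pandas as pd

# Synthetic monthly index: flat at 100 for a year, then 103 (made-up numbers)
dates = pd.date_range("2023-01-01", periods=13, freq="MS")
index_values = pd.Series([100.0] * 12 + [103.0], index=dates)

# pct_change(12) compares each row with the row 12 positions earlier
yoy = index_values.pct_change(12) * 100

# The same calculation written out explicitly
manual = (index_values / index_values.shift(12) - 1) * 100

print(round(yoy.iloc[-1], 2), round(manual.iloc[-1], 2))
```

The first 12 values are `NaN` because no observation exists a year earlier, which is why the inflation chart starts a year after the CPI chart.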

## Stage 2: Cleaning (FOMC Statement Example)

Parse and clean a sample FOMC statement.
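
The cleaning step stores the release date as it appears in the text (e.g. `"March 20, 2024"`), while the final document uses ISO dates. A minimal sketch of normalizing it, assuming the `Month D, YYYY` format seen in the sample; `normalize_release_date` is a hypothetical helper.

```python
from datetime import datetime
from typing import Optional


def normalize_release_date(date_str: Optional[str]) -> Optional[str]:
    """Convert a date like 'March 20, 2024' to ISO format ('2024-03-20')."""
    if not date_str:
        return None
    try:
        # %B expects an English month name in the default C locale
        return datetime.strptime(date_str, "%B %d, %Y").date().isoformat()
    except ValueError:
        # Unrecognized format: leave it for manual review
        return None


print(normalize_release_date("March 20, 2024"))
```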

In [None]:
# Sample FOMC statement (for demonstration)
sample_fomc_statement = """
FOR RELEASE AT 2:00 P.M. EDT
March 20, 2024

Recent indicators suggest that economic activity has been expanding at a solid pace. 
Job gains have been strong, and the unemployment rate has remained low. 
Inflation has eased over the past year but remains elevated. 

The Committee seeks to achieve maximum employment and inflation at the rate of 2 percent over the longer run. 
In support of these goals, the Committee decided to maintain the target range for the federal funds rate at 5-1/4 to 5-1/2 percent. 

The Committee will continue to assess additional information and its implications for monetary policy. 
In determining the extent of any additional policy firming that may be appropriate to return inflation to 2 percent over time, 
the Committee will take into account the cumulative tightening of monetary policy, the lags with which monetary policy affects 
economic activity and inflation, and economic and financial developments. 

In addition, the Committee will continue reducing its holdings of Treasury securities and agency debt and agency 
mortgage-backed securities. The Committee is strongly committed to returning inflation to its 2 percent objective.
"""

# Parse and clean (regex only: the sample is plain text, so no HTML parser is needed)
import re

def clean_fomc_statement(text: str) -> dict:
    """Clean and structure FOMC statement."""
    
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    
    # Extract date
    date_match = re.search(r'(\w+ \d+, \d{4})', text)
    date_str = date_match.group(1) if date_match else None
    
    # Remove header
    text = re.sub(r'FOR RELEASE AT.*?\d{4}', '', text).strip()
    
    return {
        "title": "FOMC Statement",
        "content": text,
        "published_at": date_str,
        "source_name": "Federal Reserve",
        "content_type": "policy",
        "macro_themes": ["monetary_policy", "inflation"],
        "geography": ["us"],
        "importance": "high"
    }

cleaned_statement = clean_fomc_statement(sample_fomc_statement)

print("✅ Cleaned FOMC Statement:")
print(json.dumps(cleaned_statement, indent=2))

## Stage 3: Enrichment (Classification & Entity Extraction)

Add classifications and extract entities. This demo uses simple keyword and regex rules; a production pipeline might use spaCy or an LLM instead.
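
The classifier in the next cell tests `phrase in text_lower`, a plain substring check that records each phrase at most once. Substring matching can misfire: `"patient"` also matches `"impatient"`. A possible refinement, sketched here with hypothetical helper names, counts whole-word occurrences and reports a signed net score instead of a three-way label.

```python
import re


def count_phrases(text: str, phrases: list) -> int:
    """Count whole-word occurrences of each phrase (case-insensitive)."""
    total = 0
    for phrase in phrases:
        # \b anchors stop "patient" from matching inside "impatient"
        pattern = r"\b" + re.escape(phrase) + r"\b"
        total += len(re.findall(pattern, text, flags=re.IGNORECASE))
    return total


def net_tone(text: str, hawkish: list, dovish: list) -> float:
    """Score in [-1, 1]: positive leans hawkish, negative leans dovish."""
    h = count_phrases(text, hawkish)
    d = count_phrases(text, dovish)
    return (h - d) / max(h + d, 1)


text = "Markets grew impatient. The Committee remains vigilant and vigilant."
print(count_phrases(text, ["patient"]))
print(net_tone(text, ["vigilant"], ["patient"]))
```

Counting repeats, rather than mere presence, lets a phrase the Committee stresses several times weigh more; whether that is desirable is a modeling choice.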

In [None]:
# Sentiment classification (hawkish vs dovish)
HAWKISH_PHRASES = [
    "persistent inflation", "inflation pressures", "further increases",
    "additional tightening", "restrictive policy", "vigilant",
    "committed to bringing inflation down", "policy firming"
]

DOVISH_PHRASES = [
    "inflation has eased", "inflation has moderated", "disinflation",
    "patient", "data-dependent", "judicious", "monitor",
    "sufficient restrictiveness", "maintain"
]

def classify_fomc_sentiment(text: str) -> dict:
    """Classify FOMC statement sentiment."""
    text_lower = text.lower()
    
    hawkish_count = sum(1 for phrase in HAWKISH_PHRASES if phrase in text_lower)
    dovish_count = sum(1 for phrase in DOVISH_PHRASES if phrase in text_lower)
    
    if hawkish_count > dovish_count + 1:
        sentiment = "hawkish"
    elif dovish_count > hawkish_count + 1:
        sentiment = "dovish"
    else:
        sentiment = "neutral"
    
    return {
        "sentiment": sentiment,
        "hawkish_signals": hawkish_count,
        "dovish_signals": dovish_count,
        "confidence": abs(hawkish_count - dovish_count) / max(hawkish_count + dovish_count, 1)
    }

sentiment_analysis = classify_fomc_sentiment(cleaned_statement['content'])

print("📊 Sentiment Analysis:")
print(json.dumps(sentiment_analysis, indent=2))

# Add to document
cleaned_statement['sentiment'] = sentiment_analysis['sentiment']
cleaned_statement['ai_analysis'] = {
    "sentiment_details": sentiment_analysis,
    "key_takeaways": [
        "Fed maintains rates at 5.25-5.50%",
        "Inflation has eased but remains elevated",
        "Data-dependent approach to future policy"
    ]
}

In [None]:
# Entity extraction (simple version - in production, use spaCy or LLM)
import re

def extract_entities(text: str) -> dict:
    """Extract entities from text."""
    
    # Economic indicators (simple regex matching)
    indicators = []
    indicator_patterns = [
        r"\b(inflation|CPI|unemployment|GDP|jobs?|employment)\b",
        r"\b(federal funds rate|interest rate|rates?)\b",
        r"\b(Treasury|securities|mortgage-backed)\b"
    ]
    
    for pattern in indicator_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        indicators.extend(set(match.lower() for match in matches))
    
    # Organizations
    organizations = []
    if re.search(r"\b(Committee|FOMC|Federal Reserve|Fed)\b", text, re.IGNORECASE):
        organizations.append("Federal Reserve")
        organizations.append("FOMC")
    
    # Policy actions
    actions = []
    if re.search(r"maintain.*target range", text, re.IGNORECASE):
        actions.append("maintain_rates")
    if re.search(r"reduc.*holdings", text, re.IGNORECASE):
        actions.append("quantitative_tightening")
    
    return {
        "indicators": list(set(indicators)),
        "organizations": list(set(organizations)),
        "policy_actions": actions
    }

entities = extract_entities(cleaned_statement['content'])
cleaned_statement['entities'] = entities

print("🏢 Extracted Entities:")
print(json.dumps(entities, indent=2))
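
The `key_takeaways` above state the rate range by hand. A hypothetical helper could instead pull it from the statement text, assuming the Fed's usual `X-a/b to Y-c/d percent` wording. This sketch does not handle a bare fraction such as `1/4` as a bound.

```python
import re
from fractions import Fraction
from typing import Optional, Tuple

# Matches e.g. "5-1/4 to 5-1/2 percent" or "5 to 5-1/4 percent"
RANGE_PATTERN = re.compile(
    r"(\d+)(?:-(\d+)/(\d+))?\s+to\s+(\d+)(?:-(\d+)/(\d+))?\s+percent"
)


def _mixed_to_float(whole: str, num: Optional[str], den: Optional[str]) -> float:
    """Convert a mixed number such as 5-1/4 into 5.25."""
    value = Fraction(int(whole))
    if num and den:
        value += Fraction(int(num), int(den))
    return float(value)


def extract_target_range(text: str) -> Optional[Tuple[float, float]]:
    """Return (lower, upper) bounds of the first 'X to Y percent' range found."""
    match = RANGE_PATTERN.search(text)
    if not match:
        return None
    g = match.groups()
    return _mixed_to_float(*g[:3]), _mixed_to_float(*g[3:])


print(extract_target_range(
    "maintain the target range for the federal funds rate at 5-1/4 to 5-1/2 percent"
))
```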

## Stage 4: Chunking (LLM Optimization)

Split document into chunks for vector embedding.
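
Because the splitter below is given `length_function=count_tokens`, `chunk_size=800` and `chunk_overlap=100` are measured in tokens rather than characters. `RecursiveCharacterTextSplitter` tries separators in order (paragraphs, lines, sentences, words); the simpler fixed-window scheme sketched here is shown only to make the size/overlap arithmetic concrete and is not what LangChain does internally. `sliding_window_chunks` is a hypothetical helper.

```python
from typing import List


def sliding_window_chunks(tokens: List[str], size: int, overlap: int) -> List[List[str]]:
    """Split a token list into windows of `size` tokens sharing `overlap` tokens."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be >= 0 and smaller than size")
    step = size - overlap  # each new window starts this many tokens later
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break  # the last window already reaches the end
    return chunks


# Whitespace "tokens" stand in for real tokenizer output here
tokens = "a b c d e f g h i j".split()
for window in sliding_window_chunks(tokens, size=4, overlap=1):
    print(window)
```

Each window repeats the last `overlap` tokens of the previous one, so a sentence cut at a boundary still appears whole in at least one chunk when it is shorter than the overlap.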

In [None]:
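# Chunking strategy
try:
    from langchain_text_splitters import RecursiveCharacterTextSplitter
except ImportError:  # older LangChain releases bundle the splitter in `langchain`
    from langchain.text_splitter import RecursiveCharacterTextSplitter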
import tiktoken

def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
    """Count tokens in text."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def chunk_document(doc: dict, chunk_size: int = 800, chunk_overlap: int = 100) -> list:
    """Chunk document for LLM retrieval."""
    
    # Prepend metadata for context
    metadata_prefix = f"""Source: {doc['source_name']} | Date: {doc['published_at']} | Type: {doc['content_type']}
Themes: {', '.join(doc['macro_themes'])}

"""
    
    # Initialize splitter
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", ". ", " ", ""],
        length_function=count_tokens
    )
    
    # Split content
    content_chunks = splitter.split_text(doc['content'])
    
    # Create chunk objects
    chunks = []
    for i, chunk_text in enumerate(content_chunks):
        # Prepend metadata
        full_chunk = metadata_prefix + chunk_text
        
        chunks.append({
            "chunk_id": f"{doc.get('id', 'doc')}_chunk_{i}",
            "chunk_index": i,
            "text": full_chunk,
            "token_count": count_tokens(full_chunk),
            # Inherit metadata from parent
            "published_at": doc['published_at'],
            "source_name": doc['source_name'],
            "content_type": doc['content_type'],
            "macro_themes": doc['macro_themes'],
            "geography": doc['geography'],
            "importance": doc['importance']
        })
    
    return chunks

# Chunk the FOMC statement
chunks = chunk_document(cleaned_statement)

print(f"✂️ Created {len(chunks)} chunks\n")
for i, chunk in enumerate(chunks):
    print(f"Chunk {i}:")
    print(f"  Tokens: {chunk['token_count']}")
    print(f"  Preview: {chunk['text'][:150]}...\n")

## Stage 5: Embedding & Indexing (Vector Database)

Generate embeddings and prepare for vector search.
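
The cell below makes one API request per chunk. The OpenAI embeddings endpoint also accepts a list of inputs, returning one item per input with an `index` field. A minimal batching sketch, assuming a client exposing the same `client.embeddings.create(input=..., model=...)` call used below; `embed_in_batches` and the batch size of 100 are illustrative choices, and the stub client only mimics the response shape so the example runs offline.

```python
from types import SimpleNamespace
from typing import List


def embed_in_batches(client, texts: List[str], model: str,
                     batch_size: int = 100) -> List[List[float]]:
    """Embed texts with one request per batch instead of one per text.

    Items are re-ordered by their `index` so results line up with `texts`.
    """
    vectors: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        response = client.embeddings.create(input=batch, model=model)
        ordered = sorted(response.data, key=lambda item: item.index)
        vectors.extend(item.embedding for item in ordered)
    return vectors


# Offline stub mimicking the response shape (illustration only):
# returns items in reverse order to show why sorting by `index` matters
class FakeEmbeddings:
    def create(self, input, model):
        data = [SimpleNamespace(index=i, embedding=[float(len(t))])
                for i, t in enumerate(input)]
        return SimpleNamespace(data=list(reversed(data)))


fake_client = SimpleNamespace(embeddings=FakeEmbeddings())
vectors = embed_in_batches(fake_client, ["a", "bb", "ccc"],
                           "text-embedding-ada-002", batch_size=2)
print(vectors)
```

With the real client, passing `client` from the next cell in place of `fake_client` would send `ceil(len(chunks) / batch_size)` requests instead of `len(chunks)`.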

In [None]:
# Generate embeddings (using OpenAI ada-002)
from openai import OpenAI

client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

def generate_embedding(text: str, model: str = "text-embedding-ada-002") -> list:
    """Generate embedding for text."""
    response = client.embeddings.create(
        input=text,
        model=model
    )
    return response.data[0].embedding

# Generate embeddings for all chunks
print("Generating embeddings...")
for chunk in chunks:
    chunk['embedding'] = generate_embedding(chunk['text'])
    print(f"  ✅ Chunk {chunk['chunk_index']}: {len(chunk['embedding'])} dimensions")

print(f"\n✅ Generated embeddings for {len(chunks)} chunks")

## Unified Document JSON (Final Output)

This is what gets stored in the database.
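
The final document gets a random `uuid.uuid4()` ID, so re-running ingestion on the same release produces a new ID each time. If the source URL identifies a document uniquely (an assumption), a name-based UUID (version 5) gives a stable ID that can be used to detect duplicates. `document_id_for` is a hypothetical helper.

```python
import uuid


def document_id_for(url: str) -> str:
    """Derive a stable document ID from the source URL (UUID version 5)."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, url))


url = "https://www.federalreserve.gov/newsevents/pressreleases/monetary20240320a.htm"
first = document_id_for(url)
again = document_id_for(url)
print(first == again)  # re-ingesting the same URL yields the same ID
```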

In [None]:
# Assemble final document
import uuid

final_document = {
    "id": str(uuid.uuid4()),
    "url": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20240320a.htm",
    "created_at": datetime.now().isoformat(),
    
    # Content
    "title": cleaned_statement['title'],
    "content": cleaned_statement['content'],
    "summary": "Fed maintains rates at 5.25-5.50%, noting inflation has eased but remains elevated. Data-dependent approach continues.",
    "language": "en",
    
    # Source
    "source_name": cleaned_statement['source_name'],
    "source_type": "central_bank",
    "source_credibility": 1.0,
    "is_paywalled": False,
    
    # Temporal
    "published_at": "2024-03-20T14:00:00-04:00",  # 2:00 p.m. US Eastern (EDT, UTC-4)
    "ingested_at": datetime.now().isoformat(),
    "effective_date": "2024-03-20",
    
    # Classification
    "content_type": cleaned_statement['content_type'],
    "macro_themes": cleaned_statement['macro_themes'],
    "geography": cleaned_statement['geography'],
    "asset_classes": ["equities", "fixed_income", "fx"],
    "sentiment": cleaned_statement['sentiment'],
    "importance": cleaned_statement['importance'],
    
    # Entities
    "entities": cleaned_statement['entities'],
    
    # AI Analysis
    "ai_analysis": cleaned_statement['ai_analysis'],
    
    # Chunks (without embeddings for display)
    "chunks": [
        {k: v for k, v in chunk.items() if k != 'embedding'}
        for chunk in chunks
    ],
    
    # Provenance
    "ingestion_method": "web_scraping",
    "processing_version": "v1.0",
    "verification_status": "verified",
    "human_reviewed": False
}

print("📦 Final Document Structure:")
print(json.dumps(final_document, indent=2)[:2000] + "...")

## Storage (Supabase Insert)

Insert into Supabase database.

In [None]:
# Supabase insertion (requires supabase-py and a database set up per DATA_SCHEMA.md)
SUPABASE_URL = os.getenv('SUPABASE_URL')
SUPABASE_KEY = os.getenv('SUPABASE_KEY')

if SUPABASE_URL and SUPABASE_KEY:
    from supabase import create_client

    # Initialize Supabase client
    supabase = create_client(SUPABASE_URL, SUPABASE_KEY)

    # Insert document
    response = supabase.table('documents').insert(final_document).execute()

    # Insert all chunks in one batched request
    chunk_rows = [
        {
            'document_id': final_document['id'],
            'chunk_index': chunk['chunk_index'],
            'text': chunk['text'],
            'embedding': chunk['embedding'],
            'token_count': chunk['token_count'],
            # ... metadata
        }
        for chunk in chunks
    ]
    supabase.table('chunks').insert(chunk_rows).execute()

    print("✅ Document and chunks inserted into database")
else:
    print("⚠️ Skipping Supabase insertion: set SUPABASE_URL and SUPABASE_KEY (requires database setup)")

## Retrieval Test (Semantic Search)

Test vector similarity search.
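
The loop below scores chunks one at a time. With many chunks, a vectorized version can normalize all embeddings once so that a single matrix-vector product yields every cosine similarity. A sketch with toy 2-D vectors; `top_k_cosine` is a hypothetical helper, and a real deployment would likely push this search into the vector database instead.

```python
import numpy as np


def top_k_cosine(query: np.ndarray, matrix: np.ndarray, k: int):
    """Return (indices, scores) of the k rows of `matrix` most similar to `query`."""
    # Normalize so a plain dot product equals cosine similarity
    q = query / np.linalg.norm(query)
    m = matrix / np.linalg.norm(matrix, axis=1, keepdims=True)
    scores = m @ q
    k = min(k, len(scores))
    order = np.argsort(-scores)[:k]  # highest similarity first
    return order, scores[order]


# Toy 2-D "embeddings" (text-embedding-ada-002 vectors have 1,536 dimensions)
chunk_vectors = np.array([[1.0, 0.0], [0.0, 1.0], [0.7, 0.7]])
idx, sims = top_k_cosine(np.array([1.0, 0.1]), chunk_vectors, k=2)
print(idx, np.round(sims, 3))
```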

In [None]:
# Simulate vector search
from numpy.linalg import norm

def cosine_similarity(vec1, vec2):
    """Calculate cosine similarity between two vectors."""
    return np.dot(vec1, vec2) / (norm(vec1) * norm(vec2))

# User query
user_query = "What is the Fed's stance on interest rates?"

# Generate query embedding
query_embedding = generate_embedding(user_query)

# Calculate similarity with each chunk
similarities = []
for chunk in chunks:
    similarity = cosine_similarity(query_embedding, chunk['embedding'])
    similarities.append({
        'chunk_index': chunk['chunk_index'],
        'similarity': similarity,
        'text': chunk['text'][:200]  # Preview
    })

# Sort by similarity
similarities.sort(key=lambda x: x['similarity'], reverse=True)

print(f"🔍 Query: '{user_query}'\n")
print("Top 2 most relevant chunks:\n")
for i, result in enumerate(similarities[:2]):
    print(f"Rank {i+1}:")
    print(f"  Similarity: {result['similarity']:.4f}")
    print(f"  Text: {result['text']}...\n")

## Summary

### What We Demonstrated

1. ✅ **Ingestion**: Fetched FRED data and parsed a sample FOMC statement
2. ✅ **Cleaning**: Structured raw data into a unified format
3. ✅ **Enrichment**: Added sentiment, entities, and AI analysis
4. ✅ **Chunking**: Split into LLM-optimized chunks with metadata
5. ✅ **Embedding**: Generated vector embeddings
6. ✅ **Retrieval**: Tested semantic search

### Next Steps

1. Implement full ingestion modules in `src/ingestion/`
2. Set up Supabase database with schema from `DATA_SCHEMA.md`
3. Build processing pipeline in `src/ingestion/processors.py`
4. Implement vector store in `src/knowledge/vector_store.py`
5. Test with more data sources (RSS, research PDFs)

### Key Insights

- **Metadata is crucial**: Prepending source, date, and theme context to each chunk can improve retrieval
- **Chunking strategy matters**: Chunk size trades off context against specificity
- **Enrichment adds value**: Sentiment and entities help filtering
- **Human-readable + LLM-optimized**: Store both formats

---

**Ready to implement the full pipeline!** 🚀