In [1]:
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath('.')))

# Essential imports
import pandas as pd
import sqlalchemy
from datetime import datetime, date, timedelta
import requests
import json
import time
from tqdm import tqdm

# Local imports
from src.database import get_database_connection, get_api_key

print("🚀 Phase 1E: Historical News Collection - Ready!")
print("📊 Mission: Build real historical news pipeline for strategy validation")


🚀 Phase 1E: Historical News Collection - Ready!
📊 Mission: Build real historical news pipeline for strategy validation


In [2]:
# Create decoupled database schema
def create_decoupled_schema():
    """Create the raw-news and processed-sentiment tables together with their indexes.

    Every statement uses ``IF NOT EXISTS``, so the whole script is sent in a single
    ``execute`` call and can be re-run against an already-initialised database.

    Returns:
        bool: True once the DDL has been executed and committed; False when any
        exception was raised along the way (the error is printed, not re-raised).
    """
    ddl = """
    -- Raw news articles storage
    CREATE TABLE IF NOT EXISTS raw_news_articles (
        id SERIAL PRIMARY KEY,
        symbol_id INTEGER REFERENCES symbols(id),
        article_date DATE NOT NULL,
        title TEXT NOT NULL,
        content TEXT,
        summary TEXT,
        source VARCHAR(100),
        url TEXT,
        published_at TIMESTAMP,
        relevance_score DECIMAL(3,2),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(url, symbol_id)
    );
    
    -- Processed sentiment results
    CREATE TABLE IF NOT EXISTS processed_sentiment (
        id SERIAL PRIMARY KEY,
        symbol_id INTEGER REFERENCES symbols(id),
        analysis_date DATE NOT NULL,
        smo_score DECIMAL(3,2),
        smd_score DECIMAL(3,2),
        smc_score DECIMAL(3,2),
        sms_score DECIMAL(3,2),
        sdc_score DECIMAL(3,2),
        articles_analyzed INTEGER,
        confidence_score DECIMAL(3,2),
        analysis_summary TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(symbol_id, analysis_date)
    );
    
    -- Create indexes
    CREATE INDEX IF NOT EXISTS idx_raw_news_symbol_date ON raw_news_articles(symbol_id, article_date);
    CREATE INDEX IF NOT EXISTS idx_processed_sentiment_date ON processed_sentiment(symbol_id, analysis_date);
    """

    try:
        # One connection for the whole script; commit explicitly before it closes.
        with get_database_connection().connect() as connection:
            connection.execute(sqlalchemy.text(ddl))
            connection.commit()
        print("✅ Decoupled database schema created successfully")
    except Exception as exc:
        print(f"❌ Error creating schema: {exc}")
        return False
    return True

# Create the new schema
create_decoupled_schema()


✅ Decoupled database schema created successfully


True

In [3]:
class GDELTNewsCollector:
    """Collector for the GDELT doc API (``api/v2/doc/doc``) with retries and back-off.

    ``fetch_historical_news`` is the public entry point. It tries progressively
    broader queries for a ticker and returns a list of article dicts with the keys
    ``symbol``, ``title``, ``url``, ``published_at``, ``source`` and
    ``relevance_score``.
    """

    def __init__(self, max_retries=3, base_delay=2):
        """Configure retry behaviour.

        Args:
            max_retries: Number of HTTP attempts made per query.
            base_delay: Base of the exponential back-off in seconds
                (``base_delay * 2 ** retry``).
        """
        self.base_url = "https://api.gdeltproject.org/api/v2/doc/doc"
        self.symbols = ['INTC', 'AMD', 'NVDA']
        self.max_retries = max_retries
        self.base_delay = base_delay

        # Company name mappings for better queries
        self.company_names = {
            'INTC': 'Intel Corporation',
            'AMD': 'Advanced Micro Devices',
            'NVDA': 'NVIDIA Corporation'
        }

    def fetch_historical_news(self, symbol, start_date, end_date, max_records=50):
        """Fetch articles for ``symbol`` between two datetimes (whole days, inclusive).

        The bare ticker is queried first; if that yields nothing, a second query
        combining ticker and company name is tried.

        Args:
            symbol: Ticker symbol, e.g. ``'INTC'``.
            start_date: ``datetime`` of the first day to cover.
            end_date: ``datetime`` of the last day to cover.
            max_records: Value sent as GDELT's ``maxrecords`` parameter.

        Returns:
            list[dict]: Processed articles, or ``[]`` when every query came back empty
            or failed.
        """
        print(f"🔍 Collecting news for {symbol} from {start_date.date()} to {end_date.date()}")

        # Try simplified query first, then fallback to complex query
        queries = [
            symbol,  # Simple symbol query
            f'{symbol} OR "{self.company_names.get(symbol, symbol)}"',  # Symbol + company name
        ]
        last_attempt = len(queries)

        for attempt, query in enumerate(queries, 1):
            print(f"📡 Attempt {attempt}: Using query '{query}'")

            # Pass the ticker explicitly instead of re-deriving it from the query text.
            articles = self._fetch_with_retries(
                query, start_date, end_date, max_records, symbol=symbol
            )

            if articles:
                print(f"✅ Successfully collected {len(articles)} articles for {symbol}")
                return articles

            if attempt < last_attempt:
                print(f"⚠️  Query attempt {attempt} failed, trying next approach...")
                time.sleep(2)  # Brief pause between different query attempts
            else:
                # Nothing left to try, so do not sleep before returning.
                print(f"⚠️  Query attempt {attempt} failed")

        print(f"❌ All query attempts failed for {symbol}")
        return []

    def _fetch_with_retries(self, query, start_date, end_date, max_records, symbol=None):
        """Run one GDELT query, retrying with exponential back-off on failure.

        Args:
            query: Query string sent to GDELT.
            start_date: ``datetime``; only its date is used (window starts at 00:00:00).
            end_date: ``datetime``; only its date is used (window ends at 23:59:59).
            max_records: Value sent as ``maxrecords``.
            symbol: Ticker to tag the returned articles with. When omitted, the first
                whitespace-separated token of ``query`` is used (legacy behaviour).

        Returns:
            list[dict]: Processed articles. ``[]`` when GDELT answered with an empty
            article list (no retry in that case) or when all attempts failed.
        """
        if symbol is None:
            tokens = query.split()
            symbol = tokens[0] if tokens else query

        # The request parameters do not change between attempts.
        params = {
            'query': query,
            'mode': 'artlist',
            'maxrecords': max_records,
            'startdatetime': start_date.strftime('%Y%m%d000000'),  # Simplified time format
            'enddatetime': end_date.strftime('%Y%m%d235959'),
            'sort': 'datedesc',
            'format': 'json'
        }

        for retry in range(self.max_retries):
            # Progressive timeout increase: 30s, 45s, 60s, ...
            timeout = 30 + (retry * 15)
            print(f"🌐 Making GDELT request (retry {retry + 1}/{self.max_retries}, timeout: {timeout}s)")

            try:
                response = requests.get(
                    self.base_url,
                    params=params,
                    timeout=timeout,
                    headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'}
                )

                if response.status_code == 200:
                    try:
                        articles = response.json().get('articles', [])
                    except ValueError as e:
                        # JSON decode errors subclass ValueError; fall through to the
                        # shared back-off below instead of retrying immediately.
                        print(f"⚠️  JSON decode error: {e}")
                    else:
                        if articles:
                            print(f"📰 GDELT returned {len(articles)} articles")
                            return self._process_gdelt_articles(articles, symbol)
                        print("⚠️  GDELT returned empty articles list")
                        return []

                elif response.status_code == 429:
                    print("⚠️  Rate limited by GDELT")

                else:
                    print(f"⚠️  GDELT API error: {response.status_code} - {response.text[:100]}")

            except requests.exceptions.Timeout:
                print(f"⚠️  Request timeout (attempt {retry + 1})")

            except requests.exceptions.ConnectionError as e:
                print(f"⚠️  Connection error: {str(e)[:100]}")

            except Exception as e:
                print(f"⚠️  Unexpected error: {str(e)[:100]}")

            # Single exponential back-off for every failure mode; skipped once no
            # attempts remain so the caller is not delayed for nothing.
            if retry < self.max_retries - 1:
                delay = self.base_delay * (2 ** retry)
                print(f"⏳ Backing off for {delay} seconds...")
                time.sleep(delay)

        return []  # All retries failed

    def _process_gdelt_articles(self, articles, symbol):
        """Convert raw GDELT article dicts into this notebook's article format.

        Articles whose ``seendate`` is missing or not in ``%Y%m%dT%H%M%SZ`` form (or
        that are not dict-like) are skipped; the number skipped is printed.

        Returns:
            list[dict]: One dict per usable article.
        """
        processed = []
        skipped = 0

        for article in articles:
            try:
                title = article.get('title') or ''  # tolerate an explicit null title
                processed.append({
                    'symbol': symbol,
                    'title': title,
                    'url': article.get('url', ''),
                    'published_at': datetime.strptime(article.get('seendate', ''), '%Y%m%dT%H%M%SZ'),
                    'source': article.get('domain', ''),
                    'relevance_score': self._calculate_relevance(title, symbol)
                })
            except (AttributeError, TypeError, ValueError):
                skipped += 1  # Skip malformed articles, but keep count

        if skipped:
            print(f"⚠️  Skipped {skipped} malformed article(s)")

        return processed

    def _calculate_relevance(self, title, symbol):
        """Score a headline between 0.5 and 1.0 using simple keyword boosts.

        Base score 0.5; +0.3 if the ticker appears in the title, +0.2 for
        'semiconductor'/'chip', +0.3 for 'earnings'/'revenue'; capped at 1.0.
        """
        title_lower = (title or '').lower()
        score = 0.5  # Base score

        # Boost for direct mentions
        if symbol.lower() in title_lower:
            score += 0.3
        if 'semiconductor' in title_lower or 'chip' in title_lower:
            score += 0.2
        if 'earnings' in title_lower or 'revenue' in title_lower:
            score += 0.3

        return min(1.0, score)

# Initialize collector
gdelt_collector = GDELTNewsCollector()
print("✅ GDELT News Collector ready!")


✅ GDELT News Collector ready!


In [4]:
def store_raw_news_articles(articles):
    """Insert collected articles into ``raw_news_articles``.

    Each article dict must provide ``symbol``, ``url``, ``published_at`` (a datetime),
    ``source`` and ``relevance_score``; ``title`` may be missing or None. Articles
    whose symbol is not present in the ``symbols`` table are skipped. Rows that
    already exist (same url + symbol) are left untouched by
    ``ON CONFLICT ... DO NOTHING`` and are not counted.

    Args:
        articles: Iterable of article dicts as produced by the GDELT collector.

    Returns:
        int: Number of rows actually inserted; 0 for empty input or on any error
        (the error is printed and nothing is committed).
    """
    if not articles:
        return 0

    try:
        engine = get_database_connection()
        stored_count = 0
        symbol_ids = {}  # ticker -> symbols.id (None when unknown); one lookup per ticker

        symbol_query = sqlalchemy.text("SELECT id FROM symbols WHERE symbol = :symbol")
        insert_query = sqlalchemy.text("""
                INSERT INTO raw_news_articles 
                (symbol_id, article_date, title, url, published_at, source, relevance_score)
                VALUES (:symbol_id, :article_date, :title, :url, :published_at, :source, :relevance_score)
                ON CONFLICT (url, symbol_id) DO NOTHING
                """)

        with engine.connect() as conn:
            for article in articles:
                symbol = article['symbol']
                if symbol not in symbol_ids:
                    symbol_row = conn.execute(symbol_query, {'symbol': symbol}).fetchone()
                    symbol_ids[symbol] = symbol_row[0] if symbol_row else None

                symbol_id = symbol_ids[symbol]
                if symbol_id is None:
                    continue

                result = conn.execute(insert_query, {
                    'symbol_id': symbol_id,
                    'article_date': article['published_at'].date(),
                    'title': (article.get('title') or '')[:500],  # Truncate if too long
                    'url': article['url'],
                    'published_at': article['published_at'],
                    'source': article['source'],
                    'relevance_score': article['relevance_score']
                })
                # rowcount is 0 when the conflict clause skipped the row; a driver
                # that cannot report a count (-1) is treated as an insert.
                if getattr(result, 'rowcount', -1) != 0:
                    stored_count += 1

            conn.commit()

        print(f"✅ Stored {stored_count} new articles")
        return stored_count

    except Exception as e:
        print(f"❌ Error storing articles: {e}")
        return 0

def collect_historical_news_batch(symbol, days_back=30, chunk_size=7):
    """Collect and store news for ``symbol`` in consecutive, non-overlapping day chunks.

    The collector queries whole days (start day 00:00:00 through end day 23:59:59),
    so each chunk covers ``chunk_size`` calendar days inclusive and the next chunk
    starts on the following day. This avoids requesting the boundary day twice.

    Args:
        symbol: Ticker symbol to collect.
        days_back: How many days before now the window starts.
        chunk_size: Number of calendar days per request window (must be >= 1).

    Returns:
        int: Sum of the counts returned by ``store_raw_news_articles`` for all chunks.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1 (the loop could never advance).
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    end_date = datetime.now()
    start_date = end_date - timedelta(days=days_back)

    print(f"🔍 Collecting {days_back} days of news for {symbol} in {chunk_size}-day chunks...")

    total_stored = 0
    current_date = start_date
    one_day = timedelta(days=1)

    while current_date.date() <= end_date.date():
        # Inclusive window of `chunk_size` days, clipped to the overall end.
        chunk_end = min(current_date + timedelta(days=chunk_size - 1), end_date)

        print(f"📅 Processing chunk: {current_date.date()} to {chunk_end.date()}")

        # Collect news for this chunk
        articles = gdelt_collector.fetch_historical_news(
            symbol, current_date, chunk_end, max_records=100
        )

        # Store articles
        if articles:
            stored = store_raw_news_articles(articles)
            total_stored += stored
            print(f"📦 Chunk result: {stored} articles stored")
        else:
            print("📦 Chunk result: No articles found")

        # Move to the day after this chunk so no day is requested twice
        current_date = chunk_end + one_day

        # Rate limiting between chunks (be respectful to GDELT)
        if current_date.date() <= end_date.date():
            print("⏳ Pausing 3 seconds between chunks...")
            time.sleep(3)

    print(f"🎉 Total collection complete: {total_stored} articles stored for {symbol}")
    return total_stored

def collect_historical_news_robust(symbols=None, days_back=7, chunk_size=3, pause_seconds=5):
    """Collect news for several symbols, isolating failures per symbol.

    Args:
        symbols: Iterable of ticker symbols; defaults to ``['INTC', 'AMD', 'NVDA']``.
        days_back: Days of history requested per symbol (default: small 7-day test run).
        chunk_size: Days per request window passed to ``collect_historical_news_batch``.
        pause_seconds: Sleep between two symbols (not applied after the last one).

    Returns:
        dict: Mapping of symbol to the count reported for it; 0 when that symbol's
        collection raised.
    """
    if symbols is None:
        symbols = ['INTC', 'AMD', 'NVDA']
    symbols = list(symbols)

    print("🚀 Starting robust GDELT historical news collection...")
    print("=" * 60)

    collection_results = {}

    for position, symbol in enumerate(symbols, 1):
        print(f"\n🎯 Processing {symbol}...")

        try:
            count = collect_historical_news_batch(symbol, days_back=days_back, chunk_size=chunk_size)
            collection_results[symbol] = count

            print(f"✅ {symbol} collection complete: {count} articles")

        except Exception as e:
            print(f"❌ {symbol} collection failed: {e}")
            collection_results[symbol] = 0

        # Rate limiting between symbols; nothing follows the last one, so skip it there
        if position < len(symbols):
            print(f"⏳ Pausing {pause_seconds} seconds between symbols...")
            time.sleep(pause_seconds)

    print(f"\n🎉 GDELT collection summary: {collection_results}")
    print(f"📊 Total articles collected: {sum(collection_results.values())}")

    return collection_results

print("✅ News collection and storage functions ready!")


✅ News collection and storage functions ready!


In [5]:
from openai import OpenAI

class AggressiveSentimentProcessor:
    """Batch sentiment scoring of stored headlines through the OpenAI chat API.

    For one symbol and date the processor loads the stored headlines, builds a single
    prompt, and asks the model for a JSON object of scores. Requests are retried with
    exponential back-off.
    """

    def __init__(self, base_delay=15, max_delay=300, max_failures=5):
        """Create the OpenAI client (if a key is configured) and set retry limits.

        Args:
            base_delay: First back-off delay in seconds; doubles per failure.
            max_delay: Upper bound for a single back-off delay in seconds.
            max_failures: A request gives up once it has failed more than this many
                times (i.e. after ``max_failures + 1`` attempts).
        """
        self.openai_key = get_api_key('openai')
        self.client = OpenAI(api_key=self.openai_key) if self.openai_key else None
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.max_failures = max_failures
        # Failures since the last successful request (informational; reset on success).
        self.consecutive_failures = 0

    def process_batch_sentiment(self, symbol, process_date, max_articles=10):
        """Score all stored articles of ``symbol`` on ``process_date`` in one request.

        Returns:
            dict | None: The parsed sentiment dict with metadata added, or None when
            no client is configured, no articles exist, or the request gave up.
        """
        if not self.client:
            print("❌ OpenAI client not configured")
            return None

        # Get articles for the date
        articles = self._get_articles_for_date(symbol, process_date, max_articles)

        if not articles:
            print(f"⚠️  No articles found for {symbol} on {process_date}")
            return None

        # Create batch prompt
        prompt = self._create_batch_prompt(symbol, articles, process_date)

        # Process with aggressive rate limiting
        return self._request_with_backoff(prompt, symbol, process_date, len(articles))

    def _get_articles_for_date(self, symbol, process_date, max_articles):
        """Load up to ``max_articles`` headlines for a symbol/date, most relevant first.

        Returns:
            list[dict]: Dicts with ``title``, ``source`` and ``relevance``; ``[]`` on
            any database error (the error is printed).
        """
        try:
            engine = get_database_connection()

            query = """
            SELECT rna.title, rna.source, rna.relevance_score
            FROM raw_news_articles rna
            JOIN symbols s ON rna.symbol_id = s.id
            WHERE s.symbol = :symbol AND rna.article_date = :process_date
            ORDER BY rna.relevance_score DESC
            LIMIT :max_articles
            """

            # Execute on a Connection (as the rest of this notebook does); Engine has
            # no execute() in SQLAlchemy 2.0. Rows are materialised before it closes.
            with engine.connect() as conn:
                result = conn.execute(sqlalchemy.text(query), {
                    'symbol': symbol,
                    'process_date': process_date,
                    'max_articles': max_articles
                })
                return [{'title': row[0], 'source': row[1], 'relevance': row[2]} for row in result]

        except Exception as e:
            print(f"❌ Error getting articles: {e}")
            return []

    def _create_batch_prompt(self, symbol, articles, date):
        """Build the prompt: one bullet line per article, then the JSON template."""
        # Real newline between bullets so each headline is on its own line.
        articles_text = "\n".join([f"- {art['title']} (Source: {art['source']})" for art in articles])

        return f"""
You are analyzing financial news sentiment for {symbol} on {date}.

NEWS ARTICLES:
{articles_text}

Provide sentiment analysis as JSON with scores from -1.0 to 1.0:
{{
    "smo": 0.0,  // Market open impact
    "smd": 0.0,  // Mid-day impact  
    "smc": 0.0,  // Market close impact
    "sms": 0.0,  // Semiconductor sector impact
    "sdc": 0.0,  // Direct competitor impact
    "confidence": 0.8,  // Analysis confidence
    "summary": "Brief analysis summary"
}}

Respond with only the JSON object (no markdown fences, no comments).
"""

    @staticmethod
    def _parse_sentiment_json(raw_text):
        """Extract and parse the JSON object contained in a model reply.

        Tolerates surrounding text such as markdown code fences by parsing only the
        span from the first ``{`` to the last ``}``.

        Raises:
            ValueError: If no JSON object is present or it cannot be decoded.
        """
        text = (raw_text or "").strip()
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            raise ValueError("no JSON object found in model response")
        return json.loads(text[start:end + 1])

    def _request_with_backoff(self, prompt, symbol, process_date, article_count):
        """Send the prompt to OpenAI, retrying with exponential back-off.

        Failures are counted per call, so an earlier request that gave up does not
        shorten the retry budget of later ones. No sleep happens once the budget is
        exhausted.

        Returns:
            dict | None: Parsed sentiment plus ``symbol``, ``process_date``,
            ``articles_analyzed`` and ``tokens_used``; None after too many failures.
        """
        failures = 0

        while True:
            try:
                response = self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "You are a financial sentiment analysis expert."},
                        {"role": "user", "content": prompt}
                    ],
                    max_tokens=300,
                    temperature=0.3
                )

                # Parse response
                sentiment_data = self._parse_sentiment_json(response.choices[0].message.content)

                # Add metadata
                usage = getattr(response, 'usage', None)
                sentiment_data['symbol'] = symbol
                sentiment_data['process_date'] = process_date
                sentiment_data['articles_analyzed'] = article_count
                sentiment_data['tokens_used'] = usage.total_tokens if usage else None

                self.consecutive_failures = 0  # Reset on success
                return sentiment_data

            except Exception as e:
                print(f"⚠️  OpenAI request failed: {e}")
                failures += 1
                self.consecutive_failures += 1

                if failures > self.max_failures:
                    print("❌ Too many failures, giving up")
                    return None

                delay = min(self.base_delay * (2 ** (failures - 1)), self.max_delay)
                print(f"⏳ Backing off for {delay} seconds...")
                time.sleep(delay)

# Initialize the module-level processor used by the sentiment-processing cell below.
# base_delay=12 overrides the constructor default of 15 seconds for the back-off.
sentiment_processor = AggressiveSentimentProcessor(base_delay=12)  # 12-second base interval
print("✅ Aggressive Sentiment Processor ready!")


✅ Aggressive Sentiment Processor ready!


In [6]:
# Execute robust GDELT collection, then summarize what is stored per symbol
print("🚀 TESTING IMPROVED GDELT IMPLEMENTATION")
print("=" * 50)

collection_results = collect_historical_news_robust()

# Verify what we collected: per-symbol article count, date range and mean relevance
verification_query = """
SELECT s.symbol, COUNT(*) as article_count, 
       MIN(rna.article_date) as earliest_date,
       MAX(rna.article_date) as latest_date,
       AVG(rna.relevance_score) as avg_relevance
FROM raw_news_articles rna
JOIN symbols s ON rna.symbol_id = s.id
GROUP BY s.symbol
ORDER BY s.symbol
"""

try:
    # Obtain the engine inside the try so a connection problem is reported by the
    # same handler as a failing query instead of aborting the cell with a traceback.
    engine = get_database_connection()
    verification_df = pd.read_sql(verification_query, engine)
    print("\n📊 FINAL COLLECTION SUMMARY:")
    print("=" * 40)

    total_articles = 0
    for _, row in verification_df.iterrows():
        total_articles += row['article_count']
        print(f"📈 {row['symbol']}: {row['article_count']} articles")
        print(f"   📅 Date range: {row['earliest_date']} to {row['latest_date']}")
        print(f"   🎯 Avg relevance: {row['avg_relevance']:.2f}")
        print()

    print(f"🎉 TOTAL SUCCESS: {total_articles} articles collected and stored!")

    if total_articles > 0:
        print("\n✅ GDELT INTEGRATION SUCCESSFUL - Proceeding with sentiment processing")
    else:
        print("\n⚠️  GDELT COLLECTION FAILED - Consider switching to EOD Historical Data")
        print("    See docs/HISTORICAL_NEWS_STRATEGY.md for backup plan")

except Exception as e:
    print(f"\n❌ Database verification failed: {e}")
    print("⚠️  Consider switching to EOD Historical Data backup plan")


🔍 Collecting 7 days of news for INTC...
❌ Error fetching GDELT data: HTTPSConnectionPool(host='api.gdeltproject.org', port=443): Max retries exceeded with url: /api/v2/doc/doc?query=INTC+OR+%22Intel+Corporation%22+OR+%22Advanced+Micro+Devices%22+OR+%22NVIDIA+Corporation%22&mode=artlist&maxrecords=200&startdatetime=20250621212943&enddatetime=20250628212943&sort=datedesc&format=json&theme=ECON_STOCKMARKET (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x16dcbf290>, 'Connection to api.gdeltproject.org timed out. (connect timeout=30)'))
🔍 Collecting 7 days of news for AMD...
❌ Error fetching GDELT data: HTTPSConnectionPool(host='api.gdeltproject.org', port=443): Max retries exceeded with url: /api/v2/doc/doc?query=AMD+OR+%22Intel+Corporation%22+OR+%22Advanced+Micro+Devices%22+OR+%22NVIDIA+Corporation%22&mode=artlist&maxrecords=200&startdatetime=20250621213015&enddatetime=20250628213015&sort=datedesc&format=json&theme=ECON_STOCKMARKET (Caused by ConnectTimeoutE

In [None]:
def store_processed_sentiment(sentiment_data):
    """Upsert one symbol/date sentiment result into ``processed_sentiment``.

    The values originate from a model reply, so they are sanitised before being
    written: the five scores are coerced to float and clamped to [-1.0, 1.0] (the
    range requested in the prompt), confidence is clamped to [0.0, 1.0], and a missing
    or null summary becomes an empty string. Values that cannot be converted fall back
    to their defaults (0.0 for scores, 0.5 for confidence).

    Args:
        sentiment_data: Dict with at least ``symbol`` and ``process_date``; optional
            ``smo``, ``smd``, ``smc``, ``sms``, ``sdc``, ``confidence``, ``summary``
            and ``articles_analyzed``.

    Returns:
        bool: True when the row was written and committed; False for empty input, an
        unknown symbol, or any error (printed).
    """
    if not sentiment_data:
        return False

    def _bounded(key, low, high, default):
        """Read ``key`` as a float limited to [low, high]; use ``default`` if unusable."""
        try:
            value = float(sentiment_data.get(key, default))
        except (TypeError, ValueError):
            return default
        if value != value:  # NaN compares unequal to itself
            return default
        return max(low, min(high, value))

    try:
        engine = get_database_connection()

        with engine.connect() as conn:
            # Get symbol_id
            symbol_query = "SELECT id FROM symbols WHERE symbol = :symbol"
            symbol_result = conn.execute(sqlalchemy.text(symbol_query), {'symbol': sentiment_data['symbol']})
            symbol_row = symbol_result.fetchone()

            if not symbol_row:
                return False

            symbol_id = symbol_row[0]

            # Store sentiment (insert, or overwrite the existing row for that day)
            insert_query = """
            INSERT INTO processed_sentiment 
            (symbol_id, analysis_date, smo_score, smd_score, smc_score, sms_score, sdc_score,
             articles_analyzed, confidence_score, analysis_summary)
            VALUES (:symbol_id, :analysis_date, :smo, :smd, :smc, :sms, :sdc, :articles, :confidence, :summary)
            ON CONFLICT (symbol_id, analysis_date) DO UPDATE SET
                smo_score = EXCLUDED.smo_score,
                smd_score = EXCLUDED.smd_score,
                smc_score = EXCLUDED.smc_score,
                sms_score = EXCLUDED.sms_score,
                sdc_score = EXCLUDED.sdc_score,
                articles_analyzed = EXCLUDED.articles_analyzed,
                confidence_score = EXCLUDED.confidence_score,
                analysis_summary = EXCLUDED.analysis_summary
            """

            conn.execute(sqlalchemy.text(insert_query), {
                'symbol_id': symbol_id,
                'analysis_date': sentiment_data['process_date'],
                'smo': _bounded('smo', -1.0, 1.0, 0.0),
                'smd': _bounded('smd', -1.0, 1.0, 0.0),
                'smc': _bounded('smc', -1.0, 1.0, 0.0),
                'sms': _bounded('sms', -1.0, 1.0, 0.0),
                'sdc': _bounded('sdc', -1.0, 1.0, 0.0),
                'articles': sentiment_data.get('articles_analyzed') or 0,
                'confidence': _bounded('confidence', 0.0, 1.0, 0.5),
                'summary': str(sentiment_data.get('summary') or '')[:500]
            })

            conn.commit()

        return True

    except Exception as e:
        print(f"❌ Error storing sentiment: {e}")
        return False

def process_recent_sentiment(delay_seconds=12):
    """Run sentiment analysis for every recent symbol/date that has enough articles.

    Selects symbol/date pairs from the last 7 days with at least two stored articles
    for INTC, AMD and NVDA, scores each through ``sentiment_processor`` and stores the
    result with ``store_processed_sentiment``.

    Args:
        delay_seconds: Pause between two consecutive items (rate limiting). No pause
            follows the final item.

    Returns:
        int: Number of symbol/date pairs that were processed and stored successfully.
    """
    # Get dates with news data
    engine = get_database_connection()

    query = """
    SELECT DISTINCT s.symbol, rna.article_date, COUNT(*) as article_count
    FROM raw_news_articles rna
    JOIN symbols s ON rna.symbol_id = s.id
    WHERE s.symbol IN ('INTC', 'AMD', 'NVDA')
    AND rna.article_date >= CURRENT_DATE - INTERVAL '7 days'
    GROUP BY s.symbol, rna.article_date
    HAVING COUNT(*) >= 2
    ORDER BY rna.article_date DESC, s.symbol
    """

    # Execute on a Connection (Engine has no execute() in SQLAlchemy 2.0) and read
    # all rows before the connection is closed.
    with engine.connect() as conn:
        results = conn.execute(sqlalchemy.text(query))
        processing_queue = [(row[0], row[1], row[2]) for row in results]

    print(f"📊 Found {len(processing_queue)} symbol-date combinations to process")

    successful_processes = 0
    last_index = len(processing_queue) - 1

    for index, (symbol, process_date, article_count) in enumerate(
        tqdm(processing_queue, desc="Processing sentiment")
    ):
        print(f"\n🔄 Processing {symbol} on {process_date} ({article_count} articles)")

        # Process sentiment
        sentiment_data = sentiment_processor.process_batch_sentiment(symbol, process_date)

        if sentiment_data:
            if store_processed_sentiment(sentiment_data):
                successful_processes += 1
                print(f"✅ Completed {symbol} on {process_date}")
                print(f"📈 Sentiment scores: SMO={sentiment_data.get('smo', 0):.2f}, SMS={sentiment_data.get('sms', 0):.2f}")
            else:
                print(f"❌ Failed to store {symbol} on {process_date}")
        else:
            print(f"❌ Failed to process {symbol} on {process_date}")

        # Rate limiting between items only; waiting after the last one gains nothing
        if index < last_index:
            time.sleep(delay_seconds)

    print(f"\n🎉 Successfully processed {successful_processes}/{len(processing_queue)} sentiment analyses")
    return successful_processes

# Run the sentiment pass only when an OpenAI client could be configured
if not sentiment_processor.client:
    print("⚠️  OpenAI API key not configured - skipping sentiment processing")
    processed_count = 0
else:
    processed_count = process_recent_sentiment()


In [None]:
def validate_historical_pipeline():
    """Print a three-part report on the stored news and sentiment data.

    Sections: per-symbol raw article counts and date range, per-symbol processed
    sentiment statistics, and the five most recent sentiment rows. Each section is
    guarded separately: a failing query prints the actual error and the report
    continues; an empty result prints a "no data yet" note.

    Returns:
        None. All output goes to stdout.
    """
    engine = get_database_connection()

    print("📊 HISTORICAL PIPELINE VALIDATION")
    print("=" * 50)

    # Raw news articles count
    news_query = """
    SELECT s.symbol, COUNT(*) as article_count, 
           MIN(rna.article_date) as earliest_date,
           MAX(rna.article_date) as latest_date
    FROM raw_news_articles rna
    JOIN symbols s ON rna.symbol_id = s.id
    GROUP BY s.symbol
    ORDER BY s.symbol
    """

    try:
        news_df = pd.read_sql(news_query, engine)
        print("\n📰 RAW NEWS COLLECTION:")
        if news_df.empty:
            print("⚠️  No raw news articles stored yet")
        for _, row in news_df.iterrows():
            print(f"📈 {row['symbol']}: {row['article_count']} articles ({row['earliest_date']} to {row['latest_date']})")
    except Exception as e:
        print(f"\n⚠️  Could not summarize raw news articles: {e}")

    # Processed sentiment count
    sentiment_query = """
    SELECT s.symbol, COUNT(*) as sentiment_count,
           AVG(ps.confidence_score) as avg_confidence,
           AVG(ps.articles_analyzed) as avg_articles_per_day
    FROM processed_sentiment ps
    JOIN symbols s ON ps.symbol_id = s.id
    GROUP BY s.symbol
    ORDER BY s.symbol
    """

    try:
        sentiment_df = pd.read_sql(sentiment_query, engine)
        print("\n🧠 PROCESSED SENTIMENT:")
        if sentiment_df.empty:
            print("⚠️  No processed sentiment data yet")
        for _, row in sentiment_df.iterrows():
            print(f"📊 {row['symbol']}: {row['sentiment_count']} days processed, "
                  f"Avg confidence: {row['avg_confidence']:.2f}, "
                  f"Avg articles/day: {row['avg_articles_per_day']:.1f}")
    except Exception as e:
        # Report the real cause; an error is not the same as "no data yet".
        print(f"\n⚠️  Could not summarize processed sentiment: {e}")

    # Sample sentiment scores
    try:
        sample_query = """
        SELECT s.symbol, ps.analysis_date, ps.smo_score, ps.sms_score, ps.confidence_score
        FROM processed_sentiment ps
        JOIN symbols s ON ps.symbol_id = s.id
        ORDER BY ps.analysis_date DESC
        LIMIT 5
        """

        sample_df = pd.read_sql(sample_query, engine)
        print("\n📈 RECENT SENTIMENT SAMPLES:")
        if sample_df.empty:
            print("⚠️  No sentiment samples available yet")
        for _, row in sample_df.iterrows():
            print(f"📊 {row['symbol']} on {row['analysis_date']}: "
                  f"SMO={row['smo_score']:.2f}, SMS={row['sms_score']:.2f}, "
                  f"Confidence={row['confidence_score']:.2f}")
    except Exception as e:
        print(f"\n⚠️  Could not load sentiment samples: {e}")

    print("\n✅ Historical pipeline validation complete!")

# Run validation: prints the raw-news and processed-sentiment report for the database
validate_historical_pipeline()
