# Stock Price Service Testing

This notebook tests the stock price collection functionality, including:
- `get_multiple_prices()` for batch fetching
- `get_or_refresh_price()` for on-demand fetching
- Data validation
- Top-N ticker selection, the full top-50 refresh, and a database staleness check

## Setup

In [1]:
import sys
import time

# The project root must be on sys.path before the `app` imports below.
sys.path.append('..')

from datetime import UTC, datetime

import pandas as pd
from app.services.stock_price_service import stock_price_service

from app.db.session import SessionLocal
from app.services.stock_data import StockDataService

## Test 1: Single Price Fetch

Test fetching a single stock price.

In [2]:
stock_service = StockDataService()

# Test single price fetch
symbol = "AAPL"
print(f"Fetching price for {symbol}...\n")

price_data = await stock_service.get_stock_price(symbol)

if price_data:
    print("✓ Success!")
    print(f"  Symbol: {price_data['symbol']}")
    print(f"  Price: ${price_data['price']}")
    print(f"  Change: ${price_data['change']} ({price_data['change_percent']}%)")
    print(f"  Market State: {price_data['market_state']}")
    print(f"  Exchange: {price_data['exchange']}")
    print(f"  Currency: {price_data['currency']}")
else:
    print(f"✗ Failed to fetch price for {symbol}")

Fetching price for AAPL...



Yahoo Finance API error for AAPL: Too Many Requests. Rate limited. Try after a while.
Attempt 1 failed for AAPL, retrying in 1.0s
Yahoo Finance API error for AAPL: Too Many Requests. Rate limited. Try after a while.
Attempt 2 failed for AAPL, retrying in 2.0s
Yahoo Finance API error for AAPL: Too Many Requests. Rate limited. Try after a while.
Failed to fetch data for AAPL after 3 attempts


✗ Failed to fetch price for AAPL


## Test 2: Multiple Prices (Batch Fetch)

Test the `get_multiple_prices()` method with a small batch of tickers.

In [None]:
# Test batch fetching
test_symbols = ["AAPL", "TSLA", "MSFT", "NVDA", "AMZN"]

print(f"Fetching prices for {len(test_symbols)} symbols...\n")
print(f"Symbols: {', '.join(test_symbols)}\n")

import time

start_time = time.time()

results = await stock_service.get_multiple_prices(test_symbols)

elapsed = time.time() - start_time

print(f"\nCompleted in {elapsed:.2f} seconds\n")
print("=" * 80)

# Display results in a table
data = []
for symbol, price_data in results.items():
    if price_data:
        data.append({
            'Symbol': symbol,
            'Price': f"${price_data['price']:.2f}",
            'Change': f"${price_data['change']:.2f}",
            'Change %': f"{price_data['change_percent']:.2f}%",
            'Market': price_data['market_state'],
            'Exchange': price_data['exchange']
        })
    else:
        data.append({
            'Symbol': symbol,
            'Price': 'FAILED',
            'Change': '-',
            'Change %': '-',
            'Market': '-',
            'Exchange': '-'
        })

df = pd.DataFrame(data)
print(df.to_string(index=False))

# Summary
success_count = sum(1 for v in results.values() if v is not None)
print("\n" + "=" * 80)
print(f"Success Rate: {success_count}/{len(test_symbols)} ({success_count/len(test_symbols)*100:.1f}%)")
print(f"Average time per ticker: {elapsed/len(test_symbols):.2f}s")

## Test 3: Data Validation

Test the validation logic with various edge cases.

In [None]:
import math

# Each case is (payload, expected validity, human-readable description).
test_cases = [
    ({"symbol": "TEST", "price": 150.50}, True, "Valid price"),
    ({"symbol": "TEST", "price": None}, False, "None price"),
    ({"symbol": "TEST", "price": math.nan}, False, "NaN price"),
    ({"symbol": "TEST", "price": 0.0}, False, "Zero price"),
    ({"symbol": "TEST", "price": -10.0}, False, "Negative price"),
    ({"symbol": "TEST", "price": 2000000.0}, False, "Unrealistically high price"),
    ({"symbol": "TEST", "price": 0.01}, True, "Penny stock"),
    ({"symbol": "BRK.A", "price": 500000.0}, True, "High but valid price"),
    (None, False, "None data"),
]

print("Testing validation logic:\n")
print("=" * 80)

# Collect one report row per case. Separate names are used here so this cell
# does not reuse `results`/`data` for something unrelated to earlier cells.
validation_rows = []
for payload, expected, description in test_cases:
    # Normalise to bool so the Status column always agrees with the Result
    # column, even if the validator returns a truthy/falsy non-bool value.
    is_valid = bool(stock_price_service.validate_price_data(payload))
    validation_rows.append({
        'Test': description,
        'Expected': "Valid" if expected else "Invalid",
        'Result': "Valid" if is_valid else "Invalid",
        'Status': "✓" if is_valid == expected else "✗"
    })

df = pd.DataFrame(validation_rows)
print(df.to_string(index=False))

# Summary
passed = sum(1 for row in validation_rows if row['Status'] == '✓')
print("\n" + "=" * 80)
print(f"Validation Tests: {passed}/{len(test_cases)} passed ({passed/len(test_cases)*100:.1f}%)")

## Test 4: Top N Ticker Selection

Test the logic for selecting top N most active tickers.

In [None]:
# Get top N tickers from database
db = SessionLocal()

try:
    print("Testing top N ticker selection...\n")

    # Get top 10 for testing
    top_tickers = stock_price_service.get_top_n_tickers(db, n=10, hours=24)

    if top_tickers:
        print("Top 10 most active tickers (last 24h):\n")
        for i, symbol in enumerate(top_tickers, 1):
            print(f"  {i:2d}. {symbol}")

        print(f"\nTotal: {len(top_tickers)} tickers")
    else:
        print("No tickers found (database might be empty)")

finally:
    db.close()

## Test 5: Cache and Refresh Logic

Test the on-demand refresh with caching.

In [None]:
db = SessionLocal()

try:
    symbol = "AAPL"

    print(f"Testing cache and refresh logic for {symbol}...\n")

    # First call - might hit cache or fetch fresh
    print("1. First call (cache or fetch):")
    start = time.time()
    price_data_1 = await stock_price_service.get_or_refresh_price(db, symbol)
    elapsed_1 = time.time() - start

    if price_data_1:
        print(f"   Price: ${price_data_1['price']}")
        print(f"   Last Updated: {price_data_1['last_updated']}")
        print(f"   Time: {elapsed_1:.2f}s\n")
    else:
        print("   Failed to get price\n")

    # Second call - should hit cache (fast)
    print("2. Second call (should use cache):")
    start = time.time()
    price_data_2 = await stock_price_service.get_or_refresh_price(db, symbol)
    elapsed_2 = time.time() - start

    if price_data_2:
        print(f"   Price: ${price_data_2['price']}")
        print(f"   Last Updated: {price_data_2['last_updated']}")
        print(f"   Time: {elapsed_2:.2f}s\n")
    else:
        print("   Failed to get price\n")

    # Force refresh
    print("3. Force refresh (should fetch new):")
    start = time.time()
    price_data_3 = await stock_price_service.get_or_refresh_price(db, symbol, force_refresh=True)
    elapsed_3 = time.time() - start

    if price_data_3:
        print(f"   Price: ${price_data_3['price']}")
        print(f"   Last Updated: {price_data_3['last_updated']}")
        print(f"   Time: {elapsed_3:.2f}s\n")
    else:
        print("   Failed to get price\n")

    # Compare times
    print("=" * 80)
    print("Performance Comparison:")
    print(f"  First call:  {elapsed_1:.2f}s")
    print(f"  Cache hit:   {elapsed_2:.2f}s (should be much faster)")
    print(f"  Force fetch: {elapsed_3:.2f}s")

    if elapsed_2 < elapsed_1 / 2:
        print("\n✓ Cache is working! Second call was significantly faster.")
    else:
        print("\n⚠️  Cache might not be working as expected.")

finally:
    db.close()

## Test 6: Full Top 50 Refresh (Performance Test)

Test refreshing top 50 tickers (simulates the cron job).

In [None]:
db = SessionLocal()

try:
    print("Testing full top 50 refresh...\n")
    print("This may take 30-60 seconds due to rate limiting.\n")
    print("=" * 80)

    start = time.time()
    result = await stock_price_service.refresh_top_n_prices(db, n=50)
    elapsed = time.time() - start

    print("\n" + "=" * 80)
    print("Results:")
    print(f"  Requested: {result['requested']}")
    print(f"  Successful: {result['success']}")
    print(f"  Failed: {result['failed']}")
    print(f"  Duration: {elapsed:.2f}s")
    print(f"  Avg per ticker: {elapsed/result['requested']:.2f}s")

    success_rate = result['success'] / result['requested'] * 100 if result['requested'] > 0 else 0
    print(f"  Success Rate: {success_rate:.1f}%")

    if result['errors']:
        print("\n  Errors (first 5):")
        for error in result['errors'][:5]:
            print(f"    - {error}")

    # Performance check
    print("\n" + "=" * 80)
    if elapsed <= 60:
        print("✓ Performance: Excellent (≤60s)")
    elif elapsed <= 120:
        print("✓ Performance: Good (≤2min)")
    else:
        print("⚠️  Performance: Slow (>2min) - consider optimizing")

    if success_rate >= 95:
        print("✓ Success Rate: Excellent (≥95%)")
    elif success_rate >= 85:
        print("✓ Success Rate: Good (≥85%)")
    else:
        print("⚠️  Success Rate: Low (<85%) - investigate errors")

finally:
    db.close()

## Test 7: Check Staleness in Database

Check how many prices in the database are stale.

In [None]:
from sqlalchemy import func

from app.db.models import StockPrice

db = SessionLocal()

try:
    print("Checking database staleness...\n")

    # Total number of stored price rows.
    total_count = db.query(func.count(StockPrice.symbol)).scalar()

    # Stale count as reported by the service.
    # NOTE(review): the "30 min" wording in the labels below presumably mirrors
    # the service's staleness threshold - confirm against the service.
    stale_count = stock_price_service.get_stale_price_count(db)

    # Whatever is not stale is counted as fresh.
    fresh_count = total_count - stale_count if total_count else 0

    print(f"Total prices in database: {total_count}")
    print(f"Fresh prices (<30 min): {fresh_count}")
    print(f"Stale prices (>30 min): {stale_count}")

    if total_count > 0:
        fresh_pct = fresh_count / total_count * 100
        print(f"\nFreshness: {fresh_pct:.1f}%")

        if fresh_pct >= 90:
            print("✓ Excellent freshness")
        elif fresh_pct >= 70:
            print("✓ Good freshness")
        else:
            print("⚠️  Many stale prices - consider running collection job")

    # Show most recent updates
    print("\n" + "=" * 80)
    print("Most recently updated prices:\n")

    recent_prices = db.query(StockPrice).order_by(StockPrice.updated_at.desc()).limit(10).all()

    # Read the clock once so every row's age uses the same reference instant.
    now_utc = datetime.now(UTC)

    rows = []
    for sp in recent_prices:
        updated_at = sp.updated_at
        # Some database backends return naive datetimes even for timezone-aware
        # columns; subtracting those from an aware "now" raises TypeError.
        # NOTE(review): assumes naive timestamps are stored in UTC - confirm.
        if updated_at.utcoffset() is None:
            updated_at = updated_at.replace(tzinfo=UTC)

        age = now_utc - updated_at
        minutes = int(age.total_seconds() / 60)
        rows.append({
            'Symbol': sp.symbol,
            'Price': f"${sp.price:.2f}",
            'Age': f"{minutes} min",
            'Status': '✓ Fresh' if minutes < 30 else '⚠️  Stale'
        })

    df = pd.DataFrame(rows)
    print(df.to_string(index=False))

finally:
    db.close()

## Summary

This notebook tested:

1. ✓ Single price fetch
2. ✓ Batch price fetch (`get_multiple_prices`)
3. ✓ Data validation logic
4. ✓ Top N ticker selection
5. ✓ Cache and refresh logic
6. ✓ Full top 50 refresh performance
7. ✓ Database staleness check

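Review each cell's output above before treating a test as passed: external failures, such as Yahoo Finance rate limiting ("Too Many Requests"), are reported inline and do not stop the notebook.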