## Section 1: Data Scheduler Setup

In [None]:
import os
import sys

# Make the project's `core` package importable. Set the MODELS_ROOT environment
# variable to point at a different checkout; the fallback is the original
# author's local path.
MODELS_ROOT = os.environ.get('MODELS_ROOT', '/Users/ajaiupadhyaya/Documents/Models')
# Move (rather than duplicate) the entry to the front: re-running this cell no
# longer keeps growing sys.path, and the project root still takes priority.
sys.path[:] = [p for p in sys.path if p != MODELS_ROOT]
sys.path.insert(0, MODELS_ROOT)

from core.pipeline import DataScheduler, UpdateFrequency, UpdateJobBuilder
from core.data_fetcher import DataFetcher
import pandas as pd
import yfinance as yf
from datetime import datetime
import time

# Initialize the scheduler that the jobs in the following sections are added to
scheduler = DataScheduler()
print("DataScheduler initialized")

## Section 2: Create Stock Data Update Job

In [None]:
# Define function to update stock data
def update_stock_data(tickers=None, period='1mo'):
    """
    Fetch recent price history for the portfolio.

    Args:
        tickers: Iterable of ticker symbols. Defaults to the demo portfolio
            ['AAPL', 'MSFT', 'GOOGL', 'TSLA', 'NVDA'].
        period: History window passed to ``yf.download`` (default '1mo').

    Returns:
        dict mapping ticker -> DataFrame returned by ``yf.download``.
        Tickers whose download comes back empty are reported and omitted,
        so every returned frame has at least one row.

    Raises:
        Any exception raised while downloading is printed and re-raised.
    """
    if tickers is None:
        tickers = ['AAPL', 'MSFT', 'GOOGL', 'TSLA', 'NVDA']

    try:
        data = {}

        for ticker in tickers:
            df = yf.download(ticker, period=period, progress=False)
            # yfinance may signal a failed download by returning an empty frame
            # instead of raising; drop those so callers can safely read the
            # last row and the "updated" count below is accurate.
            if df is None or df.empty:
                print(f"  No data returned for {ticker}; skipping")
                continue
            data[ticker] = df

        print(f"Updated stock data for {len(data)} securities at {datetime.now()}")
        return data

    except Exception as e:
        print(f"Error updating stock data: {str(e)}")
        raise

# Register the stock refresh with the scheduler as a daily job
stock_job = UpdateJobBuilder.stock_data_update(
    'STOCK_UPDATE_DAILY', update_stock_data, UpdateFrequency.DAILY
)
scheduler.add_job(stock_job)

# Confirm registration and show the job's initial status
print(f"Added job: {stock_job.job_id}")
print(f"Status: {stock_job.get_status()}")

## Section 3: Create Economic Data Update Job

In [None]:
# Define function to update economic data
def update_economic_data(indicators=None, observation_start='2023-01-01'):
    """
    Fetch economic indicator series from FRED.

    Args:
        indicators: Optional mapping of display name -> FRED series id.
            Defaults to the GDP / Unemployment / Inflation / Fed Funds Rate
            series listed below.
        observation_start: Earliest observation date passed to
            ``Fred.get_series`` (default '2023-01-01').

    Returns:
        dict mapping display name -> series returned by ``Fred.get_series``.
        Series that fail to download are reported and omitted.

    Raises:
        Anything raised while importing ``fredapi`` or constructing the client
        (presumably including a missing FRED_API_KEY — confirm) is printed
        and re-raised.
    """
    if indicators is None:
        # Key indicators
        indicators = {
            'GDP': 'A191RL1Q225SBEA',
            'Unemployment': 'UNRATE',
            # NOTE(review): CPIAUCSL appears to be the CPI index level, not an
            # inflation rate — confirm whether a YoY transform is intended.
            'Inflation': 'CPIAUCSL',
            'Fed Funds Rate': 'FEDFUNDS',
        }

    try:
        from fredapi import Fred
        import os

        fred = Fred(api_key=os.getenv('FRED_API_KEY'))

        data = {}
        for name, series_id in indicators.items():
            try:
                data[name] = fred.get_series(series_id, observation_start=observation_start)
            except Exception as exc:
                # Best-effort per series: one unavailable series must not abort
                # the whole update, but the failure must not be silent either.
                print(f"  Could not fetch {name} ({series_id}): {exc}")

        print(f"Updated {len(data)} economic indicators at {datetime.now()}")
        return data

    except Exception as e:
        print(f"Error updating economic data: {str(e)}")
        raise

# Register the economic-indicator refresh on a weekly schedule — less often
# than the daily stock job.
econ_job = UpdateJobBuilder.economic_data_update(
    'ECON_UPDATE_WEEKLY', update_economic_data, UpdateFrequency.WEEKLY
)
scheduler.add_job(econ_job)

# Confirm registration
print(f"Added job: {econ_job.job_id}")

## Section 4: Create Portfolio Rebalance Job

In [None]:
# Define portfolio rebalance function
def _latest_close(df):
    """Return the most recent non-NaN 'Close' value in ``df`` as a float, or None."""
    close = df['Close']
    # Newer yfinance versions return MultiIndex columns even for one ticker,
    # in which case df['Close'] is a one-column DataFrame rather than a Series.
    if isinstance(close, pd.DataFrame):
        close = close.iloc[:, 0]
    close = close.dropna()
    if close.empty:
        return None
    return float(close.iloc[-1])


def _allocation_drift(prices, holdings, target):
    """Return ticker -> (current weight - target weight) for priced target tickers.

    Current weights are ``holdings[t] * prices[t]`` divided by the total value
    of the target tickers that have a price; returns {} if that total is not
    positive.
    """
    values = {t: holdings.get(t, 0) * prices[t] for t in target if t in prices}
    total = sum(values.values())
    if total <= 0:
        return {}
    return {t: value / total - target[t] for t, value in values.items()}


def rebalance_portfolio(holdings=None, drift_threshold=0.05):
    """
    Compare the portfolio against its target allocation and flag drift.

    This only reports; it does not place any trades.

    Args:
        holdings: Optional mapping ticker -> number of shares held. Current
            weights cannot be derived from prices alone, so without holdings
            only prices and targets are reported.
        drift_threshold: Absolute weight deviation (0.05 == 5 percentage
            points) above which a ticker is flagged for rebalancing.

    Returns:
        dict with keys:
            'current_prices': ticker -> latest close (float),
            'target': the target allocation,
            'drift': ticker -> current minus target weight ({} without holdings),
            'needs_rebalance': sorted tickers whose |drift| exceeds the threshold.

    Raises:
        Anything raised while fetching prices is printed and re-raised.
    """
    try:
        target_allocation = {
            'AAPL': 0.25,
            'MSFT': 0.25,
            'GOOGL': 0.20,
            'TSLA': 0.15,
            'NVDA': 0.15
        }

        # Get current prices
        portfolio_data = update_stock_data()

        current_prices = {}
        for ticker, df in portfolio_data.items():
            price = _latest_close(df)
            if price is not None:  # skip tickers with no usable close
                current_prices[ticker] = price

        # Check if rebalancing needed (drift beyond drift_threshold)
        drift = _allocation_drift(current_prices, holdings, target_allocation) if holdings else {}
        needs_rebalance = sorted(t for t, d in drift.items() if abs(d) > drift_threshold)

        print(f"Portfolio rebalance check at {datetime.now()}")
        print(f"Current prices: {current_prices}")
        print(f"Target allocation: {target_allocation}")
        if holdings:
            print(f"Allocation drift: {drift}")
            print(f"Needs rebalance (|drift| > {drift_threshold:.1%}): {needs_rebalance}")

        return {
            'current_prices': current_prices,
            'target': target_allocation,
            'drift': drift,
            'needs_rebalance': needs_rebalance,
        }

    except Exception as e:
        print(f"Error rebalancing portfolio: {str(e)}")
        raise

# Register the rebalance check to run once a month
rebalance_job = UpdateJobBuilder.portfolio_rebalance(
    'PORTFOLIO_REBALANCE_MONTHLY', rebalance_portfolio, UpdateFrequency.MONTHLY
)
scheduler.add_job(rebalance_job)

# Confirm registration
print(f"Added job: {rebalance_job.job_id}")

## Section 5: Schedule Management

In [None]:
# Snapshot of the scheduler and every registered job
status = scheduler.get_status()

header_lines = (
    "\nScheduler Status:",
    f"Running: {status['is_running']}",
    f"Active jobs: {status['total_jobs']}",
    "\nJobs:",
)
print(*header_lines, sep="\n")

for job_id, info in status['jobs'].items():
    print(f"  {job_id}:")
    print(f"    Frequency: {info['frequency']}")
    # NOTE(review): "Executions" shows success_count only; failed runs are
    # counted separately under "Errors".
    print(f"    Executions: {info['success_count']}")
    print(f"    Errors: {info['error_count']}")
    last_run = info['last_run']
    if last_run:  # omitted for jobs that have never run
        print(f"    Last run: {last_run}")

## Section 6: Alert System Setup

In [None]:
from core.pipeline import AlertSystem, AlertSeverity, AlertCondition

# Alert system that holds every rule defined below
alert_system = AlertSystem()

# Price-threshold rules as (asset, direction, level, severity):
#   AAPL dropping below $150 -> WARNING; MSFT rising above $400 -> INFO
price_alert_specs = (
    ('AAPL', 'below', 150, AlertSeverity.WARNING),
    ('MSFT', 'above', 400, AlertSeverity.INFO),
)
for spec_asset, spec_direction, spec_level, spec_severity in price_alert_specs:
    alert_system.create_price_alert(
        asset=spec_asset,
        alert_type=spec_direction,
        threshold=spec_level,
        severity=spec_severity,
    )

# Technical-signal rules as (asset, signal, severity)
technical_alert_specs = (
    ('TSLA', 'rsi_overbought', AlertSeverity.WARNING),
    ('NVDA', 'rsi_oversold', AlertSeverity.WARNING),
)
for spec_asset, spec_signal, spec_severity in technical_alert_specs:
    alert_system.create_technical_alert(
        asset=spec_asset,
        technical_signal=spec_signal,
        severity=spec_severity,
    )

# List the rules that are now registered
print(f"Created {len(alert_system.rules)} alert rules")
for rule in alert_system.rules.values():
    print(f"  {rule.name}")

## Section 7: Manual Alert Evaluation

In [None]:
# Fetch current prices for the alerted tickers and evaluate every rule against
# them; fall back to simulated prices when nothing could be fetched.
import yfinance as yf

# Fetch real data
tickers = ['AAPL', 'MSFT', 'TSLA', 'NVDA']
data = {}

for ticker in tickers:
    try:
        price = yf.Ticker(ticker).info.get('currentPrice')
    except Exception as exc:
        # Best-effort: a failing ticker is reported and skipped, not fatal.
        print(f"  Could not fetch {ticker}: {exc}")
        continue
    if price:
        data[ticker] = price

if data:
    print("Current prices:")
    prices_to_evaluate = data
else:
    print("Could not fetch real-time data. Using simulated data...")

    # Simulated data for demonstration
    simulated_data = {
        'AAPL': 145.50,  # Below 150 - should trigger alert
        'MSFT': 385.20,
        'TSLA': 245.30,
        'NVDA': 875.50
    }
    print("Simulated prices:")
    prices_to_evaluate = simulated_data

for ticker, price in prices_to_evaluate.items():
    print(f"  {ticker}: ${price:.2f}")

# Evaluate alerts (same reporting for live and simulated prices)
triggered = alert_system.evaluate_all(prices_to_evaluate)

if triggered:
    print(f"\nTriggered {len(triggered)} alerts:")
    for alert in triggered:
        print(f"  [{alert.severity.value}] {alert.message}")
else:
    print("\nNo alerts triggered")

## Section 8: Create Custom Alert Rule

In [None]:
from core.pipeline import AlertRule

# Custom check used by the CUSTOM_GAP_ALERT rule below
def price_gap_check(current_value, threshold):
    """
    Return True when the magnitude of ``current_value`` exceeds ``threshold``.

    NOTE(review): the rule below uses threshold=5.0 meaning a 5% gap; this only
    works if the alert system passes a percentage gap here rather than a raw
    price — confirm against AlertSystem's CUSTOM_FUNCTION handling.
    """
    magnitude = abs(current_value)
    return magnitude > threshold

# Configuration for a critical alert driven by price_gap_check
gap_rule_config = dict(
    rule_id='CUSTOM_GAP_ALERT',
    name='Large price gap detector',
    asset='SPY',
    condition=AlertCondition.CUSTOM_FUNCTION,
    threshold=5.0,  # 5% gap
    severity=AlertSeverity.CRITICAL,
    check_function=price_gap_check,
)
custom_rule = AlertRule(**gap_rule_config)

# Register it alongside the built-in rules
alert_system.add_rule(custom_rule)
print("Added custom alert rule: Large price gap detector")

## Section 9: Alert History and Management

In [None]:
# Report alerts that are currently active
active_alerts = alert_system.get_active_alerts()
n_active = len(active_alerts)
print(f"Active alerts: {n_active}")
if n_active:
    for alert_data in active_alerts:
        print(f"  - [{alert_data['severity']}] {alert_data['message']}")

# Count alerts raised within the last day
history = alert_system.get_alert_history(hours=24)
print(f"\nAlerts in last 24 hours: {len(history)}")

# Acknowledge the first stored alert, if there is one
stored_alerts = alert_system.alerts
if stored_alerts:
    first_alert = stored_alerts[0]
    alert_system.acknowledge_alert(first_alert.alert_id)
    print(f"\nAcknowledged alert: {first_alert.alert_id}")

## Section 10: Data Quality Monitoring

In [None]:
from core.pipeline import DataQualityMonitor, DataValidator
import numpy as np

# quality_monitor scores datasets (evaluate_quality / get_quality_report /
# data_profile below); validator runs the OHLC structural checks.
quality_monitor, validator = DataQualityMonitor(), DataValidator()

# Create sample OHLC data. A seeded generator keeps the validation and quality
# results below reproducible across runs.
rng = np.random.default_rng(42)
n_days = 100
dates = pd.date_range('2024-01-01', periods=n_days, freq='D')
sample_ohlc = pd.DataFrame({
    'Date': dates,
    'Open': rng.uniform(150, 160, n_days),
    'High': rng.uniform(160, 170, n_days),
    'Low': rng.uniform(140, 150, n_days),
    'Close': rng.uniform(150, 160, n_days),
    'Volume': rng.uniform(1_000_000, 10_000_000, n_days),
})

# Ensure OHLC constraints: High is the row-wise max of {Open, Close, High} and
# Low the row-wise min of {Open, Close, Low}. Vectorized instead of a per-row
# .loc loop.
sample_ohlc['High'] = sample_ohlc[['Open', 'Close', 'High']].max(axis=1)
sample_ohlc['Low'] = sample_ohlc[['Open', 'Close', 'Low']].min(axis=1)

sample_ohlc.set_index('Date', inplace=True)

# Run the OHLC structural checks and list any reported issues
ohlc_validation = validator.validate_ohlc(sample_ohlc)
print("OHLC Validation:")
print(f"  Valid: {ohlc_validation['valid']}")
for issue in ohlc_validation['issues'] or []:
    print(f"  Issue: {issue}")

## Section 11: Data Quality Metrics

In [None]:
# Score the sample frame under the name 'AAPL_OHLC' (the report below is
# looked up by the same name)
metrics = quality_monitor.evaluate_quality(sample_ohlc, 'AAPL_OHLC')

print("Data Quality Metrics:")
metric_rows = (
    ("Completeness", metrics.completeness),
    ("Validity", metrics.validity),
    ("Consistency", metrics.consistency),
    ("Timeliness", metrics.timeliness),
    ("Overall Accuracy", metrics.accuracy),
)
for label, value in metric_rows:
    print(f"  {label}: {value:.2f}%")

# Summary report for the same dataset
report = quality_monitor.get_quality_report('AAPL_OHLC')
print("\nQuality Report:")
print(f"  Dataset: {report['dataset']}")
overall = report['latest_metrics']['overall_accuracy']
print(f"  Overall Accuracy: {overall:.2f}%")
report_alerts = report['alerts']
if report_alerts:
    print("  Alerts:")
    for note in report_alerts:
        print(f"    - {note}")

## Section 12: Data Profile Analysis

In [None]:
# Summary profile of the sample frame: overall shape/size, then per column
profile = quality_monitor.data_profile(sample_ohlc)

print("Data Profile:")
print(f"  Shape: {profile['shape']}")
print(f"  Memory usage: {profile['memory_usage']:.4f} MB")
print(f"  Duplicates: {profile['duplicates']}")

print("\nColumn Statistics:")
for column_name, column_stats in profile['columns'].items():
    print(f"  {column_name}:")
    print(f"    Type: {column_stats['dtype']}")
    print(f"    Non-null: {column_stats['non_null']}")
    if 'mean' not in column_stats:
        continue  # no numeric summary for this column
    print(f"    Mean: {column_stats['mean']:.4f}")
    print(f"    Std: {column_stats['std']:.4f}")

## Section 13: Integrated Workflow Example

In [None]:
# End-to-end demo: run a job, score data quality, evaluate alerts, report status.
banner = "=" * 60

print("\n" + banner)
print("INTEGRATED PIPELINE WORKFLOW")
print(banner)

# 1. Run stock update job manually
print("\n1. Running stock data update...")
try:
    scheduler.manual_run('STOCK_UPDATE_DAILY')
except Exception as exc:
    # Best-effort demo step: report why it failed and carry on with the rest.
    # NOTE(review): if manual_run records failures in job status instead of
    # raising, the success line below prints regardless — confirm.
    print(f"   ✗ Could not update stock data: {exc}")
else:
    print("   ✓ Stock data updated")

# 2. Validate data quality.
# NOTE: this scores the synthetic `sample_ohlc` frame from Section 10 under the
# dataset name 'Live_Data'; the data fetched in step 1 is not passed in here.
print("\n2. Validating data quality...")
metrics = quality_monitor.evaluate_quality(sample_ohlc, 'Live_Data')
print(f"   Completeness: {metrics.completeness:.1f}%")
print(f"   Validity: {metrics.validity:.1f}%")

# 3. Check price alerts against fixed example prices
print("\n3. Evaluating price alerts...")
test_data = {'AAPL': 148.50, 'MSFT': 410.20}
alerts = alert_system.evaluate_all(test_data)
print(f"   Triggered {len(alerts)} alerts")
for alert in alerts:
    print(f"   [{alert.severity.value}] {alert.message}")

# 4. Generate status report
print("\n4. Pipeline Status Report:")
status = scheduler.get_status()
print(f"   Active jobs: {status['total_jobs']}")
for job_id, job_status in status['jobs'].items():
    print(f"   - {job_id}: {job_status['success_count']} runs, {job_status['error_count']} errors")

print("\n" + banner)
print("PIPELINE WORKFLOW COMPLETE")
print(banner)