In [3]:
# Cell 1: Core imports and configuration
import os
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
import json
import time
from dataclasses import dataclass
import numpy as np
import pandas as pd

# Core libraries
import yfinance as yf
import requests
from alpha_vantage.timeseries import TimeSeries
from alpha_vantage.fundamentaldata import FundamentalData
import torch
import torch.nn as nn
from transformers import pipeline
import sendgrid
from sendgrid.helpers.mail import Mail
from supabase import create_client, Client
import google.generativeai as genai

# CrewAI imports
from crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool
from langchain.tools import tool

# Configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Market Coverage
TICKERS = [
    "AAPL", "MSFT", "NVDA", "AMZN", "META", "GOOGL", "GOOG", "TSLA", "AVGO", "TSM",
    "COST", "ADBE", "PEP", "CSCO", "ACN", "TMUS", "CMCSA", "AMD", "INTC", "INTU",
    "AMGN", "REGN", "ADP", "QCOM", "VRTX", "AMAT", "ISRG", "MU", "ATVI", "MDLZ",
    "PYPL", "ADSK", "MELI", "KLAC", "SNPS", "CDNS", "ASML", "CHTR", "MNST", "LRCX",
    "ORLY", "KDP", "DXCM", "MAR", "IDXX", "CTAS", "ROST", "WDAY", "PCAR", "AZN",
    "CPRT", "XEL", "DLTR", "FAST", "VRSK", "ANSS", "SGEN", "BIIB", "ALGN", "SIRI",
    "EBAY", "EXC", "NTES", "JD", "BIDU", "SWKS", "INCY", "WBA", "ULTA", "TTWO",
    "VRSN", "LULU", "MTCH", "ZM", "DOCU", "OKTA", "DDOG", "CRWD", "NET", "FTNT",
    "ZS", "PANW", "TEAM", "PLTR", "DBX", "AFRM", "COIN", "HOOD", "DASH", "RIVN",
    "LCID", "PTON", "ABNB", "UBER", "LYFT", "SNOW", "DDOG", "MDB", "TWLO", "SPLK",
    "BRK.B", "JPM", "V", "MA", "BAC", "WFC", "GS", "AXP", "MS", "BLK", "C",
    "XOM", "CVX", "LLY", "JNJ", "UNH", "ABBV", "MRK", "ABT", "PFE", "MDT",
    "WMT", "PG", "KO", "HD", "MCD", "PEP", "TJX", "NKE", "SBUX", "MDLZ", "CL",
    "GE", "UNP", "CAT", "HON", "RTX", "LMT", "UPS", "NOC", "BA", "CRM"
]

# Environment variables (set these in your environment)
ALPHA_VANTAGE_API_KEY = os.getenv("ALPHA_VANTAGE_API_KEY")
NEWS_API_KEY = os.getenv("NEWS_API_KEY")
SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# Initialize services
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) if SUPABASE_URL else None
genai.configure(api_key=GEMINI_API_KEY) if GEMINI_API_KEY else None

@dataclass
class MarketEvent:
    timestamp: datetime
    event_type: str
    description: str
    affected_tickers: List[str]
    severity_score: float
    source_url: str

@dataclass
class RiskAssessment:
    event: MarketEvent
    impact_analysis: str
    probability_score: float
    time_horizon: str
    recommended_actions: List[str]

In [4]:
# Cell 2: Core components implementation
class MarketDataProvider:
    """Unified market data provider with yfinance primary, Alpha Vantage backup"""
    
    def __init__(self):
        self.alpha_vantage_ts = TimeSeries(key=ALPHA_VANTAGE_API_KEY) if ALPHA_VANTAGE_API_KEY else None
        self.alpha_vantage_fd = FundamentalData(key=ALPHA_VANTAGE_API_KEY) if ALPHA_VANTAGE_API_KEY else None
        self.rate_limit_delay = 12  # Alpha Vantage rate limit
        
    async def get_stock_data(self, ticker: str, period: str = "1y") -> pd.DataFrame:
        """Get stock data with yfinance first, Alpha Vantage as backup"""
        try:
            # Primary: yfinance
            stock = yf.Ticker(ticker)
            data = stock.history(period=period)
            
            if not data.empty:
                logger.info(f"Retrieved {ticker} data from yfinance")
                return self._standardize_data(data, ticker)
                
        except Exception as e:
            logger.warning(f"yfinance failed for {ticker}: {e}")
            
        # Backup: Alpha Vantage
        return await self._get_alpha_vantage_data(ticker)
    
    async def _get_alpha_vantage_data(self, ticker: str) -> pd.DataFrame:
        """Fallback to Alpha Vantage API"""
        if not self.alpha_vantage_ts:
            logger.error("Alpha Vantage not configured")
            return pd.DataFrame()
            
        try:
            await asyncio.sleep(self.rate_limit_delay)  # Rate limiting
            data, _ = self.alpha_vantage_ts.get_daily(symbol=ticker, outputsize='full')
            
            df = pd.DataFrame(data).T
            df.index = pd.to_datetime(df.index)
            df = df.astype(float)
            df.columns = ['Open', 'High', 'Low', 'Close', 'Volume']
            
            logger.info(f"Retrieved {ticker} data from Alpha Vantage")
            return self._standardize_data(df, ticker)
            
        except Exception as e:
            logger.error(f"Alpha Vantage failed for {ticker}: {e}")
            return pd.DataFrame()
    
    def _standardize_data(self, data: pd.DataFrame, ticker: str) -> pd.DataFrame:
        """Standardize data format and add technical indicators"""
        if data.empty:
            return data
            
        # Add technical indicators
        data['SMA_20'] = data['Close'].rolling(window=20).mean()
        data['SMA_50'] = data['Close'].rolling(window=50).mean()
        data['RSI'] = self._calculate_rsi(data['Close'])
        data['Volatility'] = data['Close'].pct_change().rolling(window=20).std()
        data['Ticker'] = ticker
        
        return data
    
    def _calculate_rsi(self, prices: pd.Series, window: int = 14) -> pd.Series:
        """Calculate Relative Strength Index"""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))

class NewsMonitor:
    """Global news monitoring for market events"""
    
    def __init__(self):
        self.news_api_key = NEWS_API_KEY
        self.base_url = "https://newsapi.org/v2"
        self.risk_keywords = [
            "recession", "inflation", "interest rates", "fed", "central bank",
            "geopolitical", "trade war", "sanctions", "supply chain", "earnings",
            "merger", "acquisition", "bankruptcy", "regulation", "crypto", "china"
        ]
    
    async def scan_financial_news(self) -> List[MarketEvent]:
        """Scan for relevant financial news events"""
        events = []
        
        try:
            for keyword in self.risk_keywords:
                url = f"{self.base_url}/everything"
                params = {
                    "q": keyword,
                    "language": "en",
                    "sortBy": "publishedAt",
                    "from": (datetime.now() - timedelta(hours=24)).isoformat(),
                    "apiKey": self.news_api_key
                }
                
                response = requests.get(url, params=params)
                response.raise_for_status()
                data = response.json()
                
                for article in data.get("articles", [])[:5]:  # Limit per keyword
                    event = self._create_market_event(article, keyword)
                    if event:
                        events.append(event)
                
                await asyncio.sleep(0.1)  # Rate limiting
                
        except Exception as e:
            logger.error(f"News scanning failed: {e}")
            
        return events
    
    def _create_market_event(self, article: Dict, keyword: str) -> Optional[MarketEvent]:
        """Create market event from news article"""
        try:
            # Simple ticker extraction (you could enhance with NLP)
            affected_tickers = []
            title_content = f"{article['title']} {article.get('description', '')}"
            
            for ticker in TICKERS[:20]:  # Check subset for performance
                if ticker.lower() in title_content.lower():
                    affected_tickers.append(ticker)
            
            # Calculate severity score based on keyword and source
            severity_score = self._calculate_severity(article, keyword)
            
            return MarketEvent(
                timestamp=datetime.fromisoformat(article["publishedAt"].replace("Z", "+00:00")),
                event_type=keyword,
                description=article["title"],
                affected_tickers=affected_tickers,
                severity_score=severity_score,
                source_url=article["url"]
            )
            
        except Exception as e:
            logger.error(f"Failed to create event from article: {e}")
            return None
    
    def _calculate_severity(self, article: Dict, keyword: str) -> float:
        """Calculate event severity score (0-1)"""
        score = 0.5  # Base score
        
        # Keyword-based scoring
        severity_weights = {
            "recession": 0.9, "inflation": 0.8, "fed": 0.7,
            "bankruptcy": 0.9, "regulation": 0.6, "earnings": 0.4
        }
        score *= severity_weights.get(keyword, 0.5)
        
        # Source reliability (simplified)
        reliable_sources = ["reuters", "bloomberg", "wsj", "ft"]
        if any(source in article.get("source", {}).get("name", "").lower() for source in reliable_sources):
            score *= 1.2
        
        return min(score, 1.0)

class TransformerPredictor(nn.Module):
    """PyTorch Transformer model for market prediction"""
    
    def __init__(self, input_size: int = 64, hidden_size: int = 256, num_layers: int = 4):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        
        self.input_projection = nn.Linear(input_size, hidden_size)
        self.position_encoding = nn.Parameter(torch.randn(1000, hidden_size))
        
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_size,
            nhead=8,
            dim_feedforward=hidden_size * 4,
            dropout=0.1,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        
        self.output_projection = nn.Linear(hidden_size, 1)
        self.dropout = nn.Dropout(0.1)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        batch_size, seq_len, _ = x.shape
        
        # Project input and add position encoding
        x = self.input_projection(x)
        x = x + self.position_encoding[:seq_len].unsqueeze(0)
        x = self.dropout(x)
        
        # Transformer encoding
        x = self.transformer(x)
        
        # Output projection (predict next day return)
        return self.output_projection(x[:, -1, :])

class MarketSimulator:
    """Market simulation and prediction engine"""
    
    def __init__(self):
        self.model = TransformerPredictor()
        self.scaler = None
        self.feature_columns = ['Open', 'High', 'Low', 'Close', 'Volume', 'SMA_20', 'SMA_50', 'RSI', 'Volatility']
    
    def prepare_features(self, data: pd.DataFrame) -> np.ndarray:
        """Prepare features for model input"""
        features = data[self.feature_columns].fillna(method='ffill').fillna(0)
        
        # Calculate returns and additional features
        features['Return'] = data['Close'].pct_change()
        features['Volume_MA'] = data['Volume'].rolling(window=20).mean()
        features['Price_MA_Ratio'] = data['Close'] / data['SMA_20']
        
        return features.fillna(0).values
    
    async def simulate_scenario(self, ticker_data: Dict[str, pd.DataFrame], 
                               risk_events: List[MarketEvent]) -> Dict[str, Any]:
        """Simulate market scenarios under risk conditions"""
        results = {}
        
        try:
            for ticker, data in ticker_data.items():
                if data.empty:
                    continue
                    
                # Prepare features
                features = self.prepare_features(data)
                
                if len(features) < 60:  # Need sufficient history
                    continue
                
                # Create sequences for transformer
                sequences = self._create_sequences(features[-60:])  # Last 60 days
                
                # Simulate baseline scenario
                baseline_pred = self._predict_price_series(sequences)
                
                # Simulate risk-adjusted scenarios
                risk_adjusted_preds = []
                for event in risk_events:
                    if ticker in event.affected_tickers or not event.affected_tickers:
                        risk_pred = self._apply_risk_adjustment(baseline_pred, event)
                        risk_adjusted_preds.append(risk_pred)
                
                results[ticker] = {
                    "baseline_prediction": baseline_pred.tolist(),
                    "risk_scenarios": risk_adjusted_preds,
                    "current_price": float(data['Close'].iloc[-1]),
                    "volatility": float(data['Volatility'].iloc[-1])
                }
                
        except Exception as e:
            logger.error(f"Simulation failed: {e}")
            
        return results
    
    def _create_sequences(self, features: np.ndarray, seq_length: int = 30) -> torch.Tensor:
        """Create sequences for transformer input"""
        sequences = []
        for i in range(len(features) - seq_length + 1):
            sequences.append(features[i:i + seq_length])
        return torch.FloatTensor(sequences)
    
    def _predict_price_series(self, sequences: torch.Tensor, 
                             forecast_days: int = 30) -> np.ndarray:
        """Predict future price series"""
        self.model.eval()
        predictions = []
        
        with torch.no_grad():
            current_seq = sequences[-1:]  # Last sequence
            
            for _ in range(forecast_days):
                pred = self.model(current_seq)
                predictions.append(pred.item())
                
                # Update sequence (simplified - in practice, you'd update all features)
                new_step = current_seq[:, -1:, :].clone()
                new_step[:, :, 0] = pred  # Update price feature
                current_seq = torch.cat([current_seq[:, 1:, :], new_step], dim=1)
        
        return np.array(predictions)
    
    def _apply_risk_adjustment(self, baseline_pred: np.ndarray, 
                              event: MarketEvent) -> List[float]:
        """Apply risk event adjustments to baseline predictions"""
        # Risk adjustment factors based on event severity and type
        adjustment_factors = {
            "recession": -0.15, "inflation": -0.08, "earnings": 0.05,
            "regulation": -0.12, "fed": -0.10, "geopolitical": -0.18
        }
        
        factor = adjustment_factors.get(event.event_type, 0.0)
        risk_multiplier = factor * event.severity_score
        
        # Apply time-decaying impact
        decay_rates = np.exp(-np.arange(len(baseline_pred)) * 0.1)
        adjustments = risk_multiplier * decay_rates
        
        return (baseline_pred * (1 + adjustments)).tolist()

In [5]:
# Cell 3: CrewAI Tools and Agents
class MarketDataTool(BaseTool):
    name: str = "market_data_fetcher"
    description: str = "Fetch real-time market data for specified tickers"
    
    def __init__(self):
        super().__init__()
        self.provider = MarketDataProvider()
    
    def _run(self, tickers: str) -> str:
        """Fetch market data for given tickers"""
        ticker_list = [t.strip() for t in tickers.split(",")]
        results = {}
        
        loop = asyncio.get_event_loop()
        for ticker in ticker_list[:5]:  # Limit for performance
            data = loop.run_until_complete(self.provider.get_stock_data(ticker))
            if not data.empty:
                latest = data.iloc[-1]
                results[ticker] = {
                    "price": float(latest['Close']),
                    "volume": float(latest['Volume']),
                    "volatility": float(latest['Volatility']),
                    "rsi": float(latest['RSI'])
                }
        
        return json.dumps(results)

class NewsAnalysisTool(BaseTool):
    name: str = "news_analyzer"
    description: str = "Analyze recent financial news for market risks"
    
    def __init__(self):
        super().__init__()
        self.monitor = NewsMonitor()
    
    def _run(self, query: str = "") -> str:
        """Scan and analyze recent financial news"""
        loop = asyncio.get_event_loop()
        events = loop.run_until_complete(self.monitor.scan_financial_news())
        
        event_summaries = []
        for event in events[:10]:  # Top 10 events
            event_summaries.append({
                "timestamp": event.timestamp.isoformat(),
                "type": event.event_type,
                "description": event.description,
                "severity": event.severity_score,
                "affected_tickers": event.affected_tickers
            })
        
        return json.dumps(event_summaries)

class SimulationTool(BaseTool):
    name: str = "market_simulator"
    description: str = "Run market simulations under different risk scenarios"
    
    def __init__(self):
        super().__init__()
        self.simulator = MarketSimulator()
        self.provider = MarketDataProvider()
    
    def _run(self, tickers_and_events: str) -> str:
        """Run market simulations"""
        try:
            data = json.loads(tickers_and_events)
            tickers = data.get("tickers", [])
            events_data = data.get("events", [])
            
            # Convert events data back to MarketEvent objects
            events = []
            for event_data in events_data:
                events.append(MarketEvent(
                    timestamp=datetime.fromisoformat(event_data["timestamp"]),
                    event_type=event_data["type"],
                    description=event_data["description"],
                    affected_tickers=event_data["affected_tickers"],
                    severity_score=event_data["severity"],
                    source_url=""
                ))
            
            # Get market data
            loop = asyncio.get_event_loop()
            ticker_data = {}
            for ticker in tickers[:5]:  # Limit for performance
                data = loop.run_until_complete(self.provider.get_stock_data(ticker))
                if not data.empty:
                    ticker_data[ticker] = data
            
            # Run simulations
            results = loop.run_until_complete(
                self.simulator.simulate_scenario(ticker_data, events)
            )
            
            return json.dumps(results)
            
        except Exception as e:
            return f"Simulation failed: {str(e)}"

# CrewAI Agents
def create_horizon_scanner_agent():
    """Create the Horizon Scanner Agent"""
    return Agent(
        role="Market Horizon Scanner",
        goal="Continuously monitor global markets and news for emerging risk signals",
        backstory="""You are an expert market surveillance analyst with deep experience 
        in identifying early warning signals across global financial markets. You excel 
        at connecting seemingly unrelated events to potential market impacts.""",
        tools=[MarketDataTool(), NewsAnalysisTool()],
        verbose=True,
        allow_delegation=False
    )

def create_economic_analyst_agent():
    """Create the Economic Analyst Agent"""
    return Agent(
        role="Senior Economic Analyst",
        goal="Analyze market events and assess their potential economic impact",
        backstory="""You are a senior economist with 15+ years of experience analyzing 
        market events and their cascading effects across different sectors and geographies. 
        You specialize in second-order effect analysis and risk quantification.""",
        tools=[],
        verbose=True,
        allow_delegation=False,
        llm_config={
            "model": "gemini-pro",
            "api_key": GEMINI_API_KEY
        } if GEMINI_API_KEY else None
    )

def create_simulation_agent():
    """Create the Simulation Agent"""
    return Agent(
        role="Quantitative Risk Modeler",
        goal="Run predictive simulations and model potential market scenarios",
        backstory="""You are a quantitative analyst specializing in market risk modeling 
        and scenario analysis. You use advanced mathematical models to predict market 
        behavior under different risk conditions.""",
        tools=[SimulationTool()],
        verbose=True,
        allow_delegation=False
    )

def create_briefing_agent():
    """Create the Briefing Agent"""
    return Agent(
        role="Risk Intelligence Briefer",
        goal="Synthesize analysis into actionable client briefings",
        backstory="""You are a senior risk communication specialist who translates 
        complex market analysis into clear, actionable intelligence for institutional 
        clients. You excel at prioritizing risks and recommending specific actions.""",
        tools=[],
        verbose=True,
        allow_delegation=False
    )

In [6]:
# Cell 4: Pipeline orchestration and main execution
class MarketRiskPipeline:
    """Main pipeline orchestrator"""
    
    def __init__(self):
        self.agents = {
            "scanner": create_horizon_scanner_agent(),
            "analyst": create_economic_analyst_agent(),
            "simulator": create_simulation_agent(),
            "briefer": create_briefing_agent()
        }
        
        self.sendgrid_client = sendgrid.SendGridAPIClient(api_key=SENDGRID_API_KEY) if SENDGRID_API_KEY else None
    
    async def run_pipeline(self) -> Dict[str, Any]:
        """Execute the complete risk analysis pipeline"""
        logger.info("Starting Predictive Market Risk Pipeline")
        
        # Task 1: Horizon Scanning
        scanning_task = Task(
            description=f"""Scan global markets and news for emerging risks affecting these tickers: 
            {', '.join(TICKERS[:20])}. Focus on events from the last 24 hours that could impact 
            market sentiment or specific sectors. Provide market data and news analysis.""",
            agent=self.agents["scanner"],
            expected_output="JSON formatted market data and news events with risk indicators"
        )
        
        # Task 2: Economic Analysis
        analysis_task = Task(
            description="""Analyze the market events identified by the scanner. Assess the potential 
            economic impact, identify second-order effects, and quantify risk probabilities. 
            Consider sector correlations and macroeconomic implications.""",
            agent=self.agents["analyst"],
            expected_output="Detailed risk assessment with impact analysis and probability scores"
        )
        
        # Task 3: Market Simulation
        simulation_task = Task(
            description="""Run predictive simulations based on the identified risks and market data. 
            Model different scenarios showing potential price impacts over 1-week, 1-month, and 
            3-month horizons. Include confidence intervals and risk-adjusted predictions.""",
            agent=self.agents["simulator"],
            expected_output="Simulation results with predicted price paths and scenario analysis"
        )
        
        # Task 4: Client Briefing
        briefing_task = Task(
            description="""Synthesize all analysis into a comprehensive risk briefing for Fitch 
            clients. Prioritize the most critical risks, provide clear impact assessments, and 
            recommend specific portfolio actions. Format for executive consumption.""",
            agent=self.agents["briefer"],
            expected_output="Executive risk briefing with prioritized risks and action items"
        )
        
        # Create and execute crew
        crew = Crew(
            agents=list(self.agents.values()),
            tasks=[scanning_task, analysis_task, simulation_task, briefing_task],
            process=Process.sequential,
            verbose=True
        )
        
        try:
            result = crew.kickoff()
            
            # Store results in Supabase
            if supabase:
                await self._store_results(result)
            
            # Send briefing to clients
            if self.sendgrid_client:
                await self._send_briefing(result.raw)
            
            logger.info("Pipeline execution completed successfully")
            return {"status": "success", "results": result.raw}
            
        except Exception as e:
            logger.error(f"Pipeline execution failed: {e}")
            return {"status": "error", "error": str(e)}
    
    async def _store_results(self, results: Any):
        """Store pipeline results in Supabase"""
        try:
            data = {
                "timestamp": datetime.now().isoformat(),
                "pipeline_results": str(results),
                "status": "completed"
            }
            
            supabase.table("risk_pipeline_runs").insert(data).execute()
            logger.info("Results stored in Supabase")
            
        except Exception as e:
            logger.error(f"Failed to store results: {e}")
    
    async def _send_briefing(self, briefing_content: str):
        """Send risk briefing via SendGrid"""
        try:
            message = Mail(
                from_email='risk-pipeline@fitch.com',
                to_emails='clients@fitch.com',
                subject=f'Market Risk Briefing - {datetime.now().strftime("%Y-%m-%d")}',
                html_content=f"""
                <h2>Fitch Market Risk Intelligence</h2>
                <p><strong>Generated:</strong> {datetime.now().strftime("%Y-%m-%d %H:%M UTC")}</p>
                <div style="white-space: pre-wrap; font-family: monospace;">
                {briefing_content}
                </div>
                <hr>
                <p><em>This briefing was generated by Fitch's Predictive Market Risk Pipeline</em></p>
                """
            )
            
            response = self.sendgrid_client.send(message)
            logger.info(f"Briefing sent successfully: {response.status_code}")
            
        except Exception as e:
            logger.error(f"Failed to send briefing: {e}")

# Background worker for continuous monitoring
class ContinuousMonitor:
    """Background worker for 24/7 market monitoring"""
    
    def __init__(self, pipeline: MarketRiskPipeline):
        self.pipeline = pipeline
        self.running = False
        self.check_interval = 3600  # 1 hour
    
    async def start_monitoring(self):
        """Start continuous monitoring loop"""
        self.running = True
        logger.info("Starting continuous market monitoring")
        
        while self.running:
            try:
                # Check if markets are open (simplified)
                current_hour = datetime.now().hour
                if 9 <= current_hour <= 16:  # Market hours (EST)
                    await self.pipeline.run_pipeline()
                else:
                    logger.info("Markets closed, monitoring news only")
                    # Run reduced scan for major news events
                    
                await asyncio.sleep(self.check_interval)
                
            except Exception as e:
                logger.error(f"Monitoring error: {e}")
                await asyncio.sleep(300)  # 5 minute error recovery
    
    def stop_monitoring(self):
        """Stop the monitoring loop"""
        self.running = False
        logger.info("Stopping continuous monitoring")

# Main execution
async def main():
    """Main execution function"""
    
    # Verify environment setup
    required_env_vars = ["ALPHA_VANTAGE_API_KEY", "NEWS_API_KEY", "GEMINI_API_KEY"]
    missing_vars = [var for var in required_env_vars if not os.getenv(var)]
    
    if missing_vars:
        logger.warning(f"Missing environment variables: {missing_vars}")
        logger.warning("Some features may be limited")
    
    # Initialize pipeline
    pipeline = MarketRiskPipeline()
    
    # Option 1: Run once
    print("Running Fitch Predictive Market Risk Pipeline...")
    results = await pipeline.run_pipeline()
    print(f"Pipeline completed with status: {results['status']}")
    
    # Option 2: Continuous monitoring (uncomment to enable)
    # monitor = ContinuousMonitor(pipeline)
    # try:
    #     await monitor.start_monitoring()
    # except KeyboardInterrupt:
    #     monitor.stop_monitoring()
    #     logger.info("Pipeline stopped by user")

if __name__ == "__main__":
    asyncio.run(main())

print("✅ Fitch Market Risk Pipeline implementation complete!")
print("\nTo use this pipeline:")
print("1. Set required environment variables")
print("2. Run each cell sequentially")
print("3. The pipeline will execute and provide risk analysis results")

RuntimeError: asyncio.run() cannot be called from a running event loop