In [None]:
pip install pandas numpy yfinance anthropic newsapi-python scipy scikit-learn ta

Collecting anthropic
  Downloading anthropic-0.39.0-py3-none-any.whl.metadata (22 kB)
Collecting newsapi-python
  Downloading newsapi_python-0.2.7-py2.py3-none-any.whl.metadata (1.2 kB)
Collecting ta
  Downloading ta-0.11.0.tar.gz (25 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading anthropic-0.39.0-py3-none-any.whl (198 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.4/198.4 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading newsapi_python-0.2.7-py2.py3-none-any.whl (7.9 kB)
Building wheels for collected packages: ta
  Building wheel for ta (setup.py) ... [?25l[?25hdone
  Created wheel for ta: filename=ta-0.11.0-py3-none-any.whl size=29412 sha256=5902e4afa4b6aacae876eed24a086f2617df3afbbbaa7b94a76f22648aee5b98
  Stored in directory: /root/.cache/pip/wheels/5f/67/4f/8a9f252836e053e532c6587a3230bc72a4deb16b03a829610b
Successfully built ta
Installing collected packages: newsapi-python, ta, anthropic
Successfully installed anthropic

In [None]:
# Setup and Dependencies
"""
Core dependencies and configuration for the geopolitical market analysis system
"""
import asyncio
import hashlib
import json
import logging
import os
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple

import anthropic
import numpy as np
import pandas as pd
import ta
import yfinance as yf
from newsapi import NewsApiClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# API Configuration
# Keys are read from the environment so real credentials never have to be
# written into the notebook. The original placeholder strings remain as the
# fallback, so existing behavior is unchanged when the variables are unset.
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "your_anthropic_key_here")
NEWS_API_KEY = os.environ.get("NEWS_API_KEY", "your_news_api_key_here")


In [None]:
# Data Models
"""
Data classes for structured analysis
"""
@dataclass
class GeopoliticalEvent:
    """A news article normalized into a single event record.

    Instances are built by ``EventCollector.collect_events`` from the article
    dictionaries returned by the news client.
    """
    id: str  # identifier the collector derives from the article title
    timestamp: datetime  # publication time parsed from the article's 'publishedAt' field
    title: str  # article headline
    description: str  # article description; the collector substitutes '' when it is missing
    source: str  # name of the publishing source
    regions: List[str]  # region labels found in the description, e.g. 'EUROPE', 'ASIA'
    categories: List[str]  # event category labels such as 'GEOPOLITICAL' or 'ECONOMIC'
    impact_score: float  # the collector fills this from the category's 'impact_weight'
    sentiment: float  # the collector initializes this to 0.0
    market_implications: List[str]  # the collector initializes this to an empty list

@dataclass
class MarketMetrics:
    """Snapshot of market conditions built by ``MarketDataCollector.get_market_metrics``."""
    timestamp: datetime  # when the snapshot was assembled
    spy_price: float  # most recent SPY close
    vix_level: float  # most recent ^VIX close
    sector_performance: Dict[str, float]  # sector name -> percent change over the fetched window
    global_risk_score: float  # VIX-derived score, capped at 1.0
    volatility: float  # std of SPY close-to-close returns scaled by sqrt(252)
    regime: str  # 'HIGH_VOLATILITY', 'MODERATE_VOLATILITY' or 'LOW_VOLATILITY'

@dataclass
class MarketImpact:
    """Assessment of one event, parsed from the LLM's JSON reply by ``GeopoliticalAnalyzer``."""
    event_id: str  # id of the GeopoliticalEvent that was analyzed
    affected_sectors: List[str]  # sectors named by the model
    price_impact: float  # the prompt requests a value from -1.0 to 1.0
    volatility_impact: float  # the prompt requests a value from 0.0 to 1.0
    confidence_score: float  # the prompt requests a value from 0.0 to 1.0
    duration_estimate: str  # 'SHORT_TERM', 'MEDIUM_TERM' or 'LONG_TERM' per the prompt
    recommended_actions: List[str]  # free-text recommendations from the model


In [None]:
# News and Event Collection
class EventCollector:
    """Collect recent news articles from NewsAPI and convert them to events.

    One search is issued per entry in ``event_categories``. An article that is
    returned by more than one search is merged into a single event carrying
    every matching category, so downstream analysis runs once per article.
    """

    # 'publishedAt' layouts accepted when parsing article timestamps.
    _TIMESTAMP_FORMATS = ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ')

    # Region label -> place names searched for in article text.
    _REGION_KEYWORDS = {
        'NORTH_AMERICA': ['US', 'Canada', 'Mexico'],
        'EUROPE': ['EU', 'UK', 'Germany', 'France'],
        'ASIA': ['China', 'Japan', 'Korea', 'India'],
        'MIDDLE_EAST': ['Iran', 'Saudi', 'UAE', 'Israel']
    }

    def __init__(self, news_api_key: str):
        """Create the NewsAPI client and the per-category search configuration."""
        self.news_client = NewsApiClient(api_key=news_api_key)
        self.logger = logging.getLogger(__name__)

        self.event_categories = {
            'GEOPOLITICAL': {
                'keywords': ['war', 'conflict', 'sanctions', 'treaty', 'diplomacy'],
                'impact_weight': 0.9
            },
            'ECONOMIC': {
                'keywords': ['inflation', 'recession', 'gdp', 'central bank', 'monetary policy'],
                'impact_weight': 0.8
            },
            'REGULATORY': {
                'keywords': ['regulation', 'policy', 'compliance', 'legislation'],
                'impact_weight': 0.7
            }
        }

    async def collect_events(self, lookback_days: int = 3) -> "List[GeopoliticalEvent]":
        """Collect and categorize recent geopolitical events.

        Args:
            lookback_days: How many days back from now the searches should cover.

        Returns:
            One event per distinct article title, in first-seen order. A failed
            search or a malformed article is logged and skipped; it does not
            discard the results gathered from the other searches.
        """
        events_by_id: Dict[str, Any] = {}
        now = datetime.now()
        from_date = (now - timedelta(days=lookback_days)).strftime('%Y-%m-%d')
        to_date = now.strftime('%Y-%m-%d')

        for category, config in self.event_categories.items():
            try:
                # The client call is blocking, so run it off the event loop.
                response = await asyncio.to_thread(
                    self.news_client.get_everything,
                    q=self._build_query(config['keywords']),
                    language='en',
                    from_param=from_date,
                    to=to_date,
                    sort_by='relevancy'
                )
            except Exception as e:
                self.logger.error(f"Error collecting events: {str(e)}")
                continue

            if response.get('status') != 'ok':
                continue

            for article in response.get('articles') or []:
                try:
                    event = self._build_event(article, category, config['impact_weight'])
                except Exception as e:
                    self.logger.warning(f"Skipping malformed article: {str(e)}")
                    continue

                existing = events_by_id.get(event.id)
                if existing is None:
                    events_by_id[event.id] = event
                    continue

                # Same article matched another category: merge instead of duplicating.
                if category not in existing.categories:
                    existing.categories.append(category)
                existing.impact_score = max(existing.impact_score, event.impact_score)

        return list(events_by_id.values())

    @staticmethod
    def _build_query(keywords: List[str]) -> str:
        """Join keywords with OR, quoting multi-word phrases so they match as phrases."""
        return ' OR '.join(f'"{kw}"' if ' ' in kw else kw for kw in keywords)

    @staticmethod
    def _make_event_id(title: str) -> str:
        """Return an id for a title that is stable across interpreter runs.

        The built-in ``hash`` is salted per process for strings, so a digest of
        the normalized title is used instead.
        """
        normalized = title.strip().lower().encode('utf-8')
        return hashlib.sha256(normalized).hexdigest()[:16]

    @classmethod
    def _parse_timestamp(cls, raw: Optional[str]) -> datetime:
        """Parse a 'publishedAt' string, with or without fractional seconds.

        Raises:
            ValueError: If ``raw`` matches none of the accepted layouts.
        """
        for layout in cls._TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(raw, layout)
            except (TypeError, ValueError):
                continue
        raise ValueError(f"Unrecognized publishedAt value: {raw!r}")

    def _build_event(self, article: Dict[str, Any], category: str,
                     impact_weight: float) -> "GeopoliticalEvent":
        """Convert one article dictionary into a GeopoliticalEvent.

        Raises:
            ValueError: If the article has no title or an unparseable timestamp.
        """
        title = article.get('title') or ''
        if not title:
            raise ValueError("article has no title")

        description = article.get('description') or ''
        source = (article.get('source') or {}).get('name') or ''

        return GeopoliticalEvent(
            id=self._make_event_id(title),
            timestamp=self._parse_timestamp(article.get('publishedAt')),
            title=title,
            description=description,
            source=source,
            regions=self._extract_regions(description),
            categories=[category],
            impact_score=impact_weight,
            sentiment=0.0,
            market_implications=[]
        )

    @staticmethod
    def _mentions(keyword: str, text: str) -> bool:
        """Report whether ``text`` mentions ``keyword`` as a word.

        All-caps keywords (US, EU, UK, UAE) must match case-sensitively as whole
        words; otherwise 'us' would match 'Russia' or 'business'. Other names
        match case-insensitively at a word start, so 'Korean' still counts for
        'Korea'.
        """
        if keyword.isupper():
            return re.search(r'\b' + re.escape(keyword) + r'\b', text) is not None
        return re.search(r'\b' + re.escape(keyword), text, re.IGNORECASE) is not None

    def _extract_regions(self, text: str) -> List[str]:
        """Extract mentioned regions from text.

        Returns:
            Region labels in the declaration order of ``_REGION_KEYWORDS``;
            an empty list for empty or missing text.
        """
        if not text:
            return []

        return [
            region
            for region, keywords in self._REGION_KEYWORDS.items()
            if any(self._mentions(keyword, text) for keyword in keywords)
        ]

In [None]:
# Market Data Collection
class MarketDataCollector:
    """Build a market snapshot (SPY, VIX, sector ETFs) from yfinance history."""

    # Window requested from yfinance for every ticker.
    HISTORY_PERIOD = '5d'
    # Factor used to annualize the daily-return standard deviation.
    TRADING_DAYS_PER_YEAR = 252
    # VIX level that maps to the maximum risk score of 1.0.
    VIX_RISK_CEILING = 50.0
    # VIX levels above these values select the corresponding regime label.
    HIGH_VOLATILITY_VIX = 30
    MODERATE_VOLATILITY_VIX = 20

    # Sector ETF ticker -> sector label used in the report.
    SECTOR_ETFS = {
        'XLK': 'TECHNOLOGY',
        'XLF': 'FINANCIALS',
        'XLE': 'ENERGY',
        'XLV': 'HEALTHCARE',
        'XLI': 'INDUSTRIALS'
    }

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _fetch_closes(ticker: str, period: str) -> "pd.Series":
        """Download closing prices for ``ticker`` with missing rows removed.

        Raises:
            ValueError: If no usable closing price came back.
        """
        closes = yf.Ticker(ticker).history(period=period)['Close'].dropna()
        if closes.empty:
            raise ValueError(f"No closing prices returned for {ticker}")
        return closes

    async def get_market_metrics(self) -> "MarketMetrics":
        """Collect current market metrics.

        Returns:
            A MarketMetrics snapshot whose numeric fields are plain floats.

        Raises:
            Exception: Any failure fetching SPY or VIX data is logged and re-raised.
        """
        try:
            # yfinance calls block, so each download runs in a worker thread.
            spy_closes = await asyncio.to_thread(self._fetch_closes, 'SPY', self.HISTORY_PERIOD)
            vix_closes = await asyncio.to_thread(self._fetch_closes, '^VIX', self.HISTORY_PERIOD)

            vix_level = float(vix_closes.iloc[-1])
            daily_return_std = spy_closes.pct_change().std()

            return MarketMetrics(
                timestamp=datetime.now(),
                spy_price=float(spy_closes.iloc[-1]),
                vix_level=vix_level,
                sector_performance=await self._get_sector_performance(),
                global_risk_score=self._calculate_risk_score(vix_level),
                volatility=float(daily_return_std * np.sqrt(self.TRADING_DAYS_PER_YEAR)),
                regime=self._determine_regime(vix_level)
            )

        except Exception as e:
            self.logger.error(f"Error collecting market metrics: {str(e)}")
            raise

    async def _get_sector_performance(self) -> Dict[str, float]:
        """Get sector ETF performance.

        Returns:
            Sector label -> percent change between the first and last close of
            the fetched window. A sector whose data cannot be fetched is logged
            and reported as 0.0.
        """
        performance = {}
        for ticker, sector in self.SECTOR_ETFS.items():
            try:
                closes = await asyncio.to_thread(self._fetch_closes, ticker, self.HISTORY_PERIOD)
                performance[sector] = float((closes.iloc[-1] / closes.iloc[0] - 1) * 100)
            except Exception as e:
                self.logger.warning(f"Error getting {sector} performance: {str(e)}")
                performance[sector] = 0.0

        return performance

    def _calculate_risk_score(self, vix_level: float) -> float:
        """Calculate global risk score (0-1) by scaling VIX against the ceiling."""
        return float(max(0.0, min(vix_level / self.VIX_RISK_CEILING, 1.0)))

    def _determine_regime(self, vix_level: float) -> str:
        """Determine market regime from the VIX level (thresholds are exclusive)."""
        if vix_level > self.HIGH_VOLATILITY_VIX:
            return "HIGH_VOLATILITY"
        if vix_level > self.MODERATE_VOLATILITY_VIX:
            return "MODERATE_VOLATILITY"
        return "LOW_VOLATILITY"

In [None]:
# LLM Analysis
class GeopoliticalAnalyzer:
    """Ask an Anthropic model to assess an event and parse its JSON reply."""

    # Duration labels the prompt allows; the first is the fallback.
    _VALID_DURATIONS = ('SHORT_TERM', 'MEDIUM_TERM', 'LONG_TERM')

    def __init__(self, anthropic_api_key: str,
                 model: str = "claude-3-sonnet-20240229",
                 max_tokens: int = 1000):
        """Create the API client.

        Args:
            anthropic_api_key: Key passed to the Anthropic client.
            model: Model identifier sent with every request.
            max_tokens: Reply length limit sent with every request.
        """
        self.client = anthropic.Client(api_key=anthropic_api_key)
        self.logger = logging.getLogger(__name__)
        self.model = model
        self.max_tokens = max_tokens

    def _format_sector_performance(self, sector_perf: Dict[str, float]) -> str:
        """Format sector performance for prompt, one indented line per sector."""
        return "\n".join([f"  {sector}: {perf:.2f}%" for sector, perf in sector_perf.items()])

    def _construct_analysis_prompt(self, event: "GeopoliticalEvent", metrics: "MarketMetrics") -> str:
        """Construct the analysis prompt for one event.

        The reply format is described with placeholders rather than ``//``
        comments, and the model is told to answer with JSON only, so the reply
        can be parsed as strict JSON.
        """
        prompt = f"""As a geopolitical market analyst, analyze the following event's market impact:

Event Details:
Title: {event.title}
Description: {event.description}
Regions Affected: {', '.join(event.regions)}
Categories: {', '.join(event.categories)}

Current Market Context:
- SPY Price: ${metrics.spy_price:.2f}
- VIX Level: {metrics.vix_level:.2f} (Volatility)
- Market Regime: {metrics.regime}
- Global Risk Score: {metrics.global_risk_score:.2f}
- Sector Performance:
{self._format_sector_performance(metrics.sector_performance)}

Respond with exactly one JSON object and nothing else: no text before or after it, no code fences, and no comments inside it. Use these keys:
{{
    "affected_sectors": [<sector names ordered by significance, e.g. "TECHNOLOGY", "ENERGY", "FINANCIALS">],
    "price_impact": <number from -1.0 (very negative) to 1.0 (very positive)>,
    "volatility_impact": <number from 0.0 to 1.0, where 1.0 is the largest volatility increase>,
    "confidence_score": <number from 0.0 to 1.0>,
    "duration_estimate": <"SHORT_TERM" (1-5 days), "MEDIUM_TERM" (1-3 months), or "LONG_TERM" (3+ months)>,
    "recommended_actions": [<specific trading or hedging recommendations, each a string>]
}}
Replace every <...> placeholder with a real value so the reply parses as strict JSON.

Focus on actionable insights and quantifiable market impacts; put any rationale inside the recommended_actions strings."""

        return prompt

    async def analyze_event(self, event: "GeopoliticalEvent", market_metrics: "MarketMetrics") -> "MarketImpact":
        """Analyze market impact of geopolitical event.

        Raises:
            Exception: Any API or parsing failure is logged once and re-raised.
        """
        try:
            prompt = self._construct_analysis_prompt(event, market_metrics)

            # The SDK call blocks, so it runs in a worker thread.
            response = await asyncio.to_thread(
                self.client.messages.create,
                model=self.model,
                max_tokens=self.max_tokens,
                temperature=0.2,
                messages=[{"role": "user", "content": prompt}]
            )

            return self._parse_llm_response(self._response_text(response), event.id)

        except Exception as e:
            self.logger.error(f"Error analyzing event: {str(e)}")
            self.logger.debug(f"Event: {event}")
            raise

    @staticmethod
    def _response_text(response: Any) -> str:
        """Return the reply text, concatenating every text block in the response."""
        content = response.content
        if isinstance(content, str):
            return content
        return "".join(getattr(block, 'text', '') or '' for block in (content or []))

    @staticmethod
    def _extract_json_object(text: str) -> Dict[str, Any]:
        """Return the first JSON object embedded in ``text``.

        Each '{' is tried as a starting point, so braces in surrounding prose
        or a code fence around the object do not break parsing.

        Raises:
            ValueError: If no position in ``text`` starts a valid JSON object.
        """
        text = text or ''
        decoder = json.JSONDecoder()
        start = text.find('{')
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict):
                return candidate
            start = text.find('{', start + 1)
        raise ValueError("Could not find JSON in response")

    @staticmethod
    def _bounded_float(value: Any, default: float, lower: float, upper: float) -> float:
        """Convert ``value`` to a float clamped to [lower, upper]; fall back to ``default``."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        if number != number:  # NaN compares unequal to itself
            return default
        return max(lower, min(upper, number))

    @staticmethod
    def _as_str_list(value: Any) -> List[str]:
        """Coerce a JSON value to a list of strings (None -> [], scalar -> one item)."""
        if value is None:
            return []
        if isinstance(value, str):
            return [value] if value.strip() else []
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value if item is not None]
        return [str(value)]

    @classmethod
    def _normalize_duration(cls, value: Any) -> str:
        """Map the model's duration text onto one of the allowed labels."""
        text = str(value or '').upper()
        for label in cls._VALID_DURATIONS:
            if label in text:
                return label
        return cls._VALID_DURATIONS[0]

    def _parse_llm_response(self, response_text: str, event_id: str) -> "MarketImpact":
        """Parse LLM response into MarketImpact object.

        Missing, null, or out-of-range values are replaced by defaults or
        clamped to the ranges the prompt asks for.

        Raises:
            ValueError: If the reply contains no JSON object.
        """
        try:
            analysis = self._extract_json_object(response_text)

            return MarketImpact(
                event_id=event_id,
                affected_sectors=self._as_str_list(analysis.get('affected_sectors')),
                price_impact=self._bounded_float(analysis.get('price_impact'), 0.0, -1.0, 1.0),
                volatility_impact=self._bounded_float(analysis.get('volatility_impact'), 0.0, 0.0, 1.0),
                confidence_score=self._bounded_float(analysis.get('confidence_score'), 0.5, 0.0, 1.0),
                duration_estimate=self._normalize_duration(analysis.get('duration_estimate')),
                recommended_actions=self._as_str_list(analysis.get('recommended_actions'))
            )

        except Exception:
            # The caller logs the error itself; only the raw reply is recorded
            # here so one failure does not produce duplicate error lines.
            self.logger.debug(f"Raw response: {response_text}")
            raise

In [None]:
# Main Analysis Pipeline
async def run_analysis():
    """Run the pipeline end to end: gather events, snapshot the market, analyze, report.

    Returns None. A failure on a single event is logged and that event is
    skipped; any other failure is logged and re-raised.

    NOTE(review): a later cell defines ``run_analysis`` again, so when the
    notebook runs top to bottom that later definition is the one in effect.
    """
    try:
        # Pipeline stages
        collector = EventCollector(NEWS_API_KEY)
        market = MarketDataCollector()
        llm = GeopoliticalAnalyzer(ANTHROPIC_API_KEY)

        # Gather inputs
        collected = await collector.collect_events(lookback_days=3)
        if not collected:
            logger.warning("No events collected")
            return

        snapshot = await market.get_market_metrics()

        # Pair every successfully analyzed event with its impact
        results = []
        for item in collected:
            try:
                verdict = await llm.analyze_event(item, snapshot)
                results.append((item, verdict))
            except Exception as exc:
                logger.error(f"Error analyzing event {item.id}: {str(exc)}")

        # Summary report
        print("\nGeopolitical Market Impact Analysis")
        print("=" * 50)

        for item, verdict in results:
            print(f"\nEvent: {item.title}")
            print(f"Regions: {', '.join(item.regions)}")
            print(f"Impact Score: {verdict.price_impact:.2f}")
            print(f"Affected Sectors: {', '.join(verdict.affected_sectors)}")
            print(f"Recommended Actions:")
            for step in verdict.recommended_actions:
                print(f"- {step}")
            print("-" * 50)

    except Exception as exc:
        logger.error(f"Error in analysis pipeline: {str(exc)}")
        raise


In [None]:
# Run Analysis
async def run_analysis(lookback_days: int = 3, request_delay: float = 1.0) -> List[Dict[str, Any]]:
    """Main analysis pipeline.

    Collects events and a market snapshot, analyzes each event, prints a
    summary report, and returns the successful analyses.

    Args:
        lookback_days: How many days of news to collect.
        request_delay: Seconds to wait between consecutive analysis calls, to
            stay under API rate limits. Use 0 to disable.

    Returns:
        A list of ``{'event': ..., 'impact': ...}`` dictionaries, one per event
        that was analyzed successfully; empty when no events were collected.

    Raises:
        Exception: Failures outside the per-event analysis are logged and re-raised.
    """
    try:
        # Initialize components
        event_collector = EventCollector(NEWS_API_KEY)
        market_collector = MarketDataCollector()
        analyzer = GeopoliticalAnalyzer(ANTHROPIC_API_KEY)

        # Collect data
        events = await event_collector.collect_events(lookback_days=lookback_days)
        if not events:
            print("No events collected")
            return []

        market_metrics = await market_collector.get_market_metrics()

        # Analyze each event
        analyses = []
        for index, event in enumerate(events):
            # Wait before every call except the first. The pause therefore also
            # applies after a failed call (when backing off matters most) and
            # no time is spent waiting after the final event.
            if index > 0 and request_delay > 0:
                await asyncio.sleep(request_delay)
            try:
                impact = await analyzer.analyze_event(event, market_metrics)
            except Exception as e:
                logger.error(f"Error analyzing event {event.id}: {str(e)}")
                continue
            analyses.append({
                'event': event,
                'impact': impact
            })

        # Generate summary report
        print("\nGeopolitical Market Impact Analysis")
        print("=" * 50)

        for analysis in analyses:
            event = analysis['event']
            impact = analysis['impact']

            print(f"\nEvent: {event.title}")
            print(f"Regions: {', '.join(event.regions)}")
            print(f"Impact Score: {impact.price_impact:.2f}")
            print(f"Affected Sectors: {', '.join(impact.affected_sectors)}")
            print(f"Recommended Actions:")
            for action in impact.recommended_actions:
                print(f"- {action}")
            print("-" * 50)

        return analyses

    except Exception as e:
        logger.error(f"Error in analysis pipeline: {str(e)}")
        raise

In [None]:
# Cell 8 (Fixed): Execute Analysis
async def execute_analysis():
    """Run the pipeline with progress messages.

    Any exception raised by the pipeline is reported on stdout and not
    re-raised; the closing message is printed either way.
    """
    try:
        for banner in (
            "Starting geopolitical market analysis...",
            "Collecting events and market data...",
        ):
            print(banner)
        await run_analysis()
        print("Analysis complete!")
    except Exception as exc:
        print(f"Analysis failed: {str(exc)}")
    finally:
        print("Cleanup complete.")

# Run the analysis
# NOTE(review): this uses ``await`` at the top level of the cell, which
# presumably works only because the cell is executed by a notebook kernel —
# confirm before reusing this code in a plain .py script.
if __name__ == "__main__":
    await execute_analysis()

Starting geopolitical market analysis...
Collecting events and market data...


ERROR:__main__:Error parsing LLM response: Could not find JSON in response
ERROR:__main__:Error analyzing event: Could not find JSON in response
ERROR:__main__:Error analyzing event -3255827810853303274: Could not find JSON in response
ERROR:__main__:Error parsing LLM response: Could not find JSON in response
ERROR:__main__:Error analyzing event: Could not find JSON in response
ERROR:__main__:Error analyzing event -3255827810853303274: Could not find JSON in response
ERROR:__main__:Error parsing LLM response: Could not find JSON in response
ERROR:__main__:Error analyzing event: Could not find JSON in response
ERROR:__main__:Error analyzing event -3255827810853303274: Could not find JSON in response



Geopolitical Market Impact Analysis

Event: Satellite images show Russia defying sanctions to give North Korea 1 million barrels of oil: report
Regions: NORTH_AMERICA, ASIA
Impact Score: -0.20
Affected Sectors: ENERGY, FINANCIALS, INDUSTRIALS
Recommended Actions:
- Consider reducing exposure to energy companies with significant business ties to Russia
- Monitor financial institutions with exposure to Russian sanctions for potential compliance risks
- Pay close attention to industrial companies reliant on Russian oil/gas imports for potential supply chain disruptions
--------------------------------------------------

Event: Sweden’s Government-Issued Pamphlet for Surviving War Has a Long History
Regions: NORTH_AMERICA, EUROPE
Impact Score: -0.20
Affected Sectors: INDUSTRIALS, ENERGY, FINANCIALS
Recommended Actions:
- Monitor geopolitical tensions and potential escalation in the region
- Consider hedging strategies for industrial and energy stocks with European exposure
- Look for oppo