# Trading Agent MCP Server and Client

This notebook contains the AI agent-based trading decision system that replaces procedural logic with LLM intelligence. It runs as a persistent MCP server for low-latency decisions in 0DTE options trading.

## Architecture:
- **MCP Server**: Persistent agent that maintains context
- **Client**: Fast interface for the orchestrator to query
- **Fallback**: Returns None to trigger procedural logic if needed

Author: AI Trading Systems  
Version: 1.0  
Date: 2025

## 1. Imports and Setup

In [2]:
# Standard imports
import asyncio
import json
import logging
from typing import Dict, Optional, Any, List
from dataclasses import dataclass, asdict
from datetime import datetime
import nest_asyncio
nest_asyncio.apply()

# MCP and Agent imports
import mcp
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp import Tool, Resource
from agents import Agent, FunctionTool
from agents.mcp import MCPServerStdio

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('trading_agent')

## 2. Data Structures for Communication

These dataclasses define the structure of information passed between the orchestrator and agent.

In [3]:
@dataclass
class TradingContext:
    """Complete trading context for agent decision-making"""
    
    # Market Data
    current_price: float
    current_vix: float
    trend_strength: float  # -100 to +100
    trend_classification: str
    market_regime: str  # trending, ranging, volatile, squeeze
    
    # Technical Indicators
    rsi: float
    momentum_score: float
    support_level: float
    resistance_level: float
    atr_points: float
    atr_percent: float
    
    # Account Status
    account_size: float
    daily_pnl: float
    trades_today: int
    active_positions: List[Dict]
    total_contracts_today: int
    
    # Session Info
    minutes_since_open: float
    minutes_to_close: float
    day_of_week: str
    madrid_time: str
    
    # Option Chain (top candidates)
    put_options: List[Dict]  # [{strike, delta, credit, distance}]
    call_options: List[Dict]
    
    # Historical Stats
    avg_daily_range: float
    avg_up_move: float
    avg_down_move: float
    win_rate_last_20: float
    
    # Risk Parameters (current)
    max_daily_trades: int = 5
    max_concurrent: int = 4
    daily_loss_limit: float = 1500
    min_distance_points: int = 100


@dataclass
class TradingDecision:
    """Structured decision from the agent"""
    
    strategy: str  # 'iron_condor', 'put_spread', 'call_spread', 'skip'
    confidence: float  # 0-100
    
    # Strike selections
    put_strike: Optional[float] = None
    call_strike: Optional[float] = None
    put_wing_width: int = 25
    call_wing_width: int = 25
    
    # Position sizing
    contracts: int = 1
    
    # Risk management
    stop_loss_buffer: int = 50
    take_profit_target: float = 30.0
    
    # Execution style
    style: str = 'balanced'  # 'safe', 'balanced', 'aggressive'
    
    # Reasoning
    rationale: str = ""
    risk_factors: List[str] = None
    
    # Timing
    execute_now: bool = True
    wait_minutes: int = 0
    
    def to_dict(self):
        return asdict(self)

## 3. Trading Rules and Constraints

These are the critical rules the agent must follow for safe trading.

In [4]:
TRADING_RULES = """
TRADING RULES AND CONSTRAINTS:

1. SAFETY RULES:
- Minimum 100 points from current price (absolute safety)
- Stop loss at 50% cushion loss
- Max 4 concurrent positions
- Max 5 trades per day
- Daily loss limit: $1500
- No new trades after 21:00 Madrid (last hour)

2. VIX-ADJUSTED PARAMETERS:
- VIX <16: Min credit $5.50, strikes 0.25-0.4% away
- VIX 16-25: Min credit $15, strikes 0.35-0.5% away
- VIX >25: Min credit $25, strikes 0.45-0.6% away
- Wing width: 25-35 points based on VIX

3. POSITION SIZING:
- Base risk: 1.5% of account per trade
- If P&L > $300: Reduce size to 70%
- If P&L < -$500: Reduce size to 50%
- Monday: 1.2x size, Friday: 0.8x size
- Confidence >80%: 3 contracts max

4. TRADING STYLES:
- Safe: 0.45% strikes, 25pt wings, 0.30 delta
- Balanced: 0.35% strikes, 20pt wings, 0.35 delta
- Aggressive: 0.25% strikes, 20pt wings, 0.40 delta

5. EXIT RULES:
- Exit if strike threatened (50% cushion lost)
- Take profit at 30% after 30 minutes
- Close all positions by 15:00 ET
"""

print("Trading rules loaded successfully!")

Trading rules loaded successfully!


## 4. MCP Server Implementation

The server maintains persistent context and provides tools for decision-making.

In [5]:
class TradingAgentServer:
    """
    MCP Server that runs the trading agent.
    Maintains persistent context and provides tools for decision-making.
    """
    
    def __init__(self):
        self.server = Server("trading_agent")
        self.context_history = []
        self.decision_history = []
        self.last_decision_time = None
        
        # Setup MCP tools
        self._setup_tools()
        
        # Trading rules and constraints (cached for speed)
        self.rules = TRADING_RULES
        
    def _setup_tools(self):
        """Setup MCP tools for the agent"""
        
        @self.server.tool()
        async def analyze_market(context_json: str) -> str:
            """
            Analyze market conditions and make trading decision.
            Input: JSON string of TradingContext
            Output: JSON string of TradingDecision
            """
            context = json.loads(context_json)
            decision = await self._make_decision(context)
            return json.dumps(decision.to_dict())
        
        @self.server.tool()
        async def get_quick_signal(price: float, vix: float, trend: float) -> str:
            """
            Get quick trading signal without full analysis.
            Returns: 'BULLISH', 'BEARISH', 'NEUTRAL', 'SKIP'
            """
            if vix > 30:
                return 'SKIP'
            elif trend > 30:
                return 'BULLISH'
            elif trend < -30:
                return 'BEARISH'
            else:
                return 'NEUTRAL'
        
        @self.server.tool()
        async def validate_strikes(put_strike: float, call_strike: float, 
                                  current_price: float) -> str:
            """
            Validate that strikes meet safety requirements.
            Returns: 'VALID' or error message
            """
            put_distance = current_price - put_strike
            call_distance = call_strike - current_price
            
            if put_distance < 100:
                return f"PUT_TOO_CLOSE: Only {put_distance:.0f} points"
            if call_distance < 100:
                return f"CALL_TOO_CLOSE: Only {call_distance:.0f} points"
            
            return "VALID"
    
    async def _make_decision(self, context: Dict) -> TradingDecision:
        """
        Core decision-making using LLM with optimized prompting.
        """
        # Cache check for rapid repeated calls
        if self.last_decision_time:
            elapsed = (datetime.now() - self.last_decision_time).seconds
            if elapsed < 5 and self.decision_history:
                return self.decision_history[-1]
        
        # Format context for LLM (minimal for speed)
        prompt = self._format_fast_prompt(context)
        
        # Make decision via agent (would integrate with OpenAI here)
        decision = await self._query_llm(prompt, context)
        
        # Store in history
        self.context_history.append(context)
        self.decision_history.append(decision)
        self.last_decision_time = datetime.now()
        
        return decision
    
    def _format_fast_prompt(self, context: Dict) -> str:
        """Format minimal prompt for speed."""
        return f"""
        DECIDE NOW:
        
        Price: {context['current_price']:.0f}
        VIX: {context['current_vix']:.1f}
        Trend: {context['trend_classification']} ({context['trend_strength']:+.0f})
        Regime: {context['market_regime']}
        
        Account: ${context['account_size']:.0f}
        P&L: ${context['daily_pnl']:.0f}
        Trades: {context['trades_today']}/{context['max_daily_trades']}
        Active: {len(context['active_positions'])}
        
        Best Puts: {self._format_options(context['put_options'][:3])}
        Best Calls: {self._format_options(context['call_options'][:3])}
        
        Time: {context['madrid_time']} Madrid ({context['minutes_to_close']:.0f} min to close)
        
        {self.rules}
        
        RETURN JSON: strategy, put_strike, call_strike, contracts, confidence, rationale
        """
    
    def _format_options(self, options: List[Dict]) -> str:
        """Format option chain concisely"""
        if not options:
            return "None available"
        return " | ".join([f"{o['strike']:.0f}(Δ{o['delta']:.2f},${o['credit']:.2f})" 
                          for o in options])
    
    async def _query_llm(self, prompt: str, context: Dict) -> TradingDecision:
        """
        Query LLM and parse response.
        In production, this would call OpenAI with gpt-4.1-mini.
        """
        # For now, return intelligent defaults based on context
        decision = self._generate_intelligent_default(context)
        return decision
    
    def _generate_intelligent_default(self, context: Dict) -> TradingDecision:
        """Generate intelligent default decision when LLM unavailable."""
        
        vix = context['current_vix']
        trend = context['trend_strength']
        regime = context['market_regime']
        pnl = context['daily_pnl']
        trades = context['trades_today']
        
        # Skip conditions
        if trades >= context['max_daily_trades']:
            return TradingDecision(
                strategy='skip',
                confidence=0,
                rationale="Daily trade limit reached"
            )
        
        if pnl <= -context['daily_loss_limit']:
            return TradingDecision(
                strategy='skip',
                confidence=0,
                rationale="Daily loss limit reached"
            )
        
        # Strategy selection based on regime
        if regime == 'volatile' and vix > 25:
            strategy = 'skip'
            rationale = f"VIX too high at {vix:.1f}"
            confidence = 10
            
        elif regime == 'trending' and abs(trend) > 40:
            if trend > 0:
                strategy = 'put_spread'
                rationale = f"Strong uptrend {trend:.0f}, bullish put spread"
            else:
                strategy = 'call_spread'
                rationale = f"Strong downtrend {trend:.0f}, bearish call spread"
            confidence = 70
            
        else:  # ranging or squeeze
            strategy = 'iron_condor'
            rationale = f"{regime.title()} market ideal for iron condor"
            confidence = 75
        
        # Position sizing based on confidence and P&L
        if pnl > 300:
            contracts = 1  # Protect profits
        elif pnl < -500:
            contracts = 1  # Limit losses
        elif confidence > 80:
            contracts = 3
        elif confidence > 60:
            contracts = 2
        else:
            contracts = 1
        
        # Strike selection
        current_price = context['current_price']
        atr = context['atr_points']
        
        if strategy != 'skip':
            put_strike = current_price - max(100, atr * 0.8)
            call_strike = current_price + max(100, atr * 0.8)
        else:
            put_strike = None
            call_strike = None
        
        return TradingDecision(
            strategy=strategy,
            confidence=confidence,
            put_strike=put_strike,
            call_strike=call_strike,
            contracts=contracts,
            rationale=rationale,
            style='balanced' if vix < 20 else 'safe'
        )
    
    async def run(self):
        """Run the MCP server"""
        async with stdio_server() as streams:
            await self.server.run(
                streams[0],
                streams[1],
                mcp.ServerOptions(
                    server_name="trading_agent",
                    server_version="1.0.0"
                )
            )

print("TradingAgentServer class defined successfully!")

TradingAgentServer class defined successfully!


## 5. Client Implementation

Fast client for the orchestrator to query the trading agent.

In [6]:
class TradingAgentClient:
    """
    Fast client for querying the trading agent MCP server.
    Used by the orchestrator to get trading decisions.
    """
    
    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or logging.getLogger(__name__)
        self.agent = None
        self.mcp_server = None
        self._initialized = False
        
    async def initialize(self):
        """Initialize the agent and MCP connection"""
        if self._initialized:
            return
            
        try:
            # Setup MCP server connection
            params = {"command": "python", "args": ["trading_agent_server.py"]}
            self.mcp_server = MCPServerStdio(params=params, client_session_timeout_seconds=300)
            
            # Initialize agent with trading instructions
            instructions = """
            You are an expert 0DTE options trader. Make fast, safe trading decisions.
            Always prioritize capital preservation over profit.
            Return structured JSON decisions based on market conditions.
            Consider VIX levels, trend strength, and account status.
            Never risk more than 1.5% per trade.
            """
            
            self.agent = Agent(
                name="trading_agent",
                instructions=instructions,
                model="gpt-4.1-mini",  # Fast model for low latency
                mcp_servers=[self.mcp_server]
            )
            
            self._initialized = True
            self.logger.info("✅ Trading agent initialized")
            
        except Exception as e:
            self.logger.error(f"Failed to initialize agent: {e}")
            raise
    
    async def get_decision(self, 
                          market_data: Dict,
                          account_data: Dict,
                          option_chain: Dict,
                          historical_stats: Dict) -> Optional[TradingDecision]:
        """
        Get trading decision from agent.
        Returns None to trigger fallback to procedural logic.
        """
        if not self._initialized:
            await self.initialize()
        
        try:
            # Build context
            context = self._build_context(
                market_data, 
                account_data, 
                option_chain, 
                historical_stats
            )
            
            # Query agent for decision
            context_json = json.dumps(asdict(context))
            
            # Call MCP tool
            response = await self.agent.run_tools([
                {
                    "type": "function",
                    "function": {
                        "name": "analyze_market",
                        "arguments": {"context_json": context_json}
                    }
                }
            ])
            
            # Parse response
            if response and response.get('success'):
                decision_dict = json.loads(response['result'])
                decision = TradingDecision(**decision_dict)
                
                self.logger.info(f"📊 Agent decision: {decision.strategy}")
                self.logger.info(f"   Confidence: {decision.confidence:.0f}%")
                self.logger.info(f"   Rationale: {decision.rationale}")
                
                return decision
            else:
                self.logger.warning("Agent returned no decision, using fallback")
                return None
                
        except Exception as e:
            self.logger.error(f"Agent decision failed: {e}")
            return None  # Trigger fallback
    
    def _build_context(self, market_data: Dict, account_data: Dict, 
                      option_chain: Dict, historical_stats: Dict) -> TradingContext:
        """Build complete trading context for agent"""
        
        # Extract put and call options (top 5 each)
        put_options = []
        call_options = []
        
        if 'puts' in option_chain:
            for opt in option_chain['puts'][:5]:
                put_options.append({
                    'strike': opt['strike'],
                    'delta': abs(opt['delta']),
                    'credit': opt['mid'],
                    'distance': market_data['current_price'] - opt['strike']
                })
        
        if 'calls' in option_chain:
            for opt in option_chain['calls'][:5]:
                call_options.append({
                    'strike': opt['strike'],
                    'delta': opt['delta'],
                    'credit': opt['mid'],
                    'distance': opt['strike'] - market_data['current_price']
                })
        
        # Get Madrid time
        import pytz
        madrid_tz = pytz.timezone('Europe/Madrid')
        madrid_time = datetime.now(madrid_tz).strftime('%H:%M')
        
        return TradingContext(
            # Market data
            current_price=market_data.get('current_price', 0),
            current_vix=market_data.get('vix', 20),
            trend_strength=market_data.get('trend_strength', 0),
            trend_classification=market_data.get('trend_classification', 'neutral'),
            market_regime=market_data.get('regime', 'ranging'),
            
            # Technical indicators
            rsi=market_data.get('rsi', 50),
            momentum_score=market_data.get('momentum_score', 0),
            support_level=market_data.get('support', 0),
            resistance_level=market_data.get('resistance', 0),
            atr_points=market_data.get('atr_points', 100),
            atr_percent=market_data.get('atr_percent', 0.5),
            
            # Account status
            account_size=account_data.get('account_size', 15000),
            daily_pnl=account_data.get('daily_pnl', 0),
            trades_today=account_data.get('trades_today', 0),
            active_positions=account_data.get('active_positions', []),
            total_contracts_today=account_data.get('contracts_today', 0),
            
            # Session info
            minutes_since_open=market_data.get('minutes_since_open', 0),
            minutes_to_close=market_data.get('minutes_to_close', 390),
            day_of_week=datetime.now().strftime('%A'),
            madrid_time=madrid_time,
            
            # Option chain
            put_options=put_options,
            call_options=call_options,
            
            # Historical stats
            avg_daily_range=historical_stats.get('avg_range', 300),
            avg_up_move=historical_stats.get('avg_up', 150),
            avg_down_move=historical_stats.get('avg_down', 150),
            win_rate_last_20=historical_stats.get('win_rate', 70)
        )
    
    async def cleanup(self):
        """Clean up connections"""
        if self.mcp_server:
            await self.mcp_server.cleanup()
        self._initialized = False

print("TradingAgentClient class defined successfully!")

TradingAgentClient class defined successfully!


## 6. Integration Helper Functions

These functions help integrate the agent with your existing orchestrator.

In [7]:
async def create_trading_agent(logger: Optional[logging.Logger] = None) -> TradingAgentClient:
    """
    Factory function to create and initialize trading agent.
    Used by orchestrator at startup.
    """
    client = TradingAgentClient(logger)
    await client.initialize()
    return client


def convert_decision_to_orchestrator_format(decision: TradingDecision) -> Dict:
    """
    Convert agent decision to format expected by existing orchestrator.
    """
    if decision.strategy == 'skip':
        return {
            'action': 'skip',
            'reason': decision.rationale
        }
    
    return {
        'action': 'trade',
        'strategy': decision.strategy,
        'strikes': {
            'put': decision.put_strike,
            'call': decision.call_strike,
            'put_wing': decision.put_wing_width,
            'call_wing': decision.call_wing_width
        },
        'contracts': decision.contracts,
        'style': decision.style,
        'confidence': decision.confidence,
        'risk_management': {
            'stop_buffer': decision.stop_loss_buffer,
            'take_profit': decision.take_profit_target
        },
        'timing': {
            'execute_now': decision.execute_now,
            'wait_minutes': decision.wait_minutes
        },
        'rationale': decision.rationale
    }

print("Integration helper functions defined!")

Integration helper functions defined!


## 7. Test the System

Let's test the agent with sample market data.

In [8]:
async def test_agent_decision():
    """Test the agent with sample data"""
    
    # Create test server (using intelligent defaults since we don't have real LLM here)
    server = TradingAgentServer()
    
    # Sample context data
    test_context = {
        'current_price': 24000,
        'current_vix': 14.9,
        'trend_strength': 15,
        'trend_classification': 'mild_bullish',
        'market_regime': 'ranging',
        'rsi': 55,
        'momentum_score': 10,
        'support_level': 23850,
        'resistance_level': 24150,
        'atr_points': 120,
        'atr_percent': 0.5,
        'account_size': 15000,
        'daily_pnl': -200,
        'trades_today': 2,
        'active_positions': [],
        'total_contracts_today': 6,
        'minutes_since_open': 90,
        'minutes_to_close': 300,
        'day_of_week': 'Wednesday',
        'madrid_time': '16:30',
        'put_options': [
            {'strike': 23850, 'delta': 0.25, 'credit': 6.50, 'distance': 150},
            {'strike': 23825, 'delta': 0.20, 'credit': 5.00, 'distance': 175}
        ],
        'call_options': [
            {'strike': 24150, 'delta': 0.22, 'credit': 5.75, 'distance': 150},
            {'strike': 24175, 'delta': 0.18, 'credit': 4.50, 'distance': 175}
        ],
        'avg_daily_range': 297,
        'avg_up_move': 150,
        'avg_down_move': 147,
        'win_rate_last_20': 70,
        'max_daily_trades': 5,
        'max_concurrent': 4,
        'daily_loss_limit': 1500,
        'min_distance_points': 100
    }
    
    # Get decision
    decision = await server._make_decision(test_context)
    
    # Display results
    print("\n" + "="*60)
    print("AGENT DECISION TEST")
    print("="*60)
    
    print("\n📊 Market Conditions:")
    print(f"   Price: ${test_context['current_price']:,.0f}")
    print(f"   VIX: {test_context['current_vix']:.1f}")
    print(f"   Trend: {test_context['trend_classification']} ({test_context['trend_strength']:+.0f})")
    print(f"   Regime: {test_context['market_regime']}")
    
    print("\n💰 Account Status:")
    print(f"   Size: ${test_context['account_size']:,.0f}")
    print(f"   P&L: ${test_context['daily_pnl']:.0f}")
    print(f"   Trades: {test_context['trades_today']}/{test_context['max_daily_trades']}")
    
    print("\n🎯 Agent Decision:")
    print(f"   Strategy: {decision.strategy.upper()}")
    print(f"   Confidence: {decision.confidence:.0f}%")
    print(f"   Contracts: {decision.contracts}")
    print(f"   Style: {decision.style}")
    
    if decision.put_strike:
        print(f"   Put Strike: {decision.put_strike:.0f}")
    if decision.call_strike:
        print(f"   Call Strike: {decision.call_strike:.0f}")
    
    print(f"\n📝 Rationale: {decision.rationale}")
    
    # Convert to orchestrator format
    orchestrator_format = convert_decision_to_orchestrator_format(decision)
    print("\n🔄 Orchestrator Format:")
    print(json.dumps(orchestrator_format, indent=2))

# Run the test
await test_agent_decision()

AttributeError: 'Server' object has no attribute 'tool'

## 8. Run as Standalone MCP Server

This section shows how to run the agent as a standalone MCP server.

In [None]:
async def run_server():
    """Run the MCP server standalone"""
    print("Starting Trading Agent MCP Server...")
    print("This would normally run as a separate process.")
    print("The orchestrator connects to it via MCP protocol.")
    
    server = TradingAgentServer()
    # In production, this would run:
    # await server.run()
    
    print("\nServer configured with:")
    print(f"- {len(server.server._tools)} MCP tools")
    print("- Decision caching (5 second window)")
    print("- Intelligent fallback system")
    print("- Complete trading rules embedded")

# Show server configuration
await run_server()

## 9. Integration Instructions

To integrate this agent with your existing orchestrator:

### Step 1: Save the Server Script
Export this notebook's server code as `trading_agent_server.py`

### Step 2: Modify Your Orchestrator
```python
# In adaptive_strategy_orchestrator.py
from trading_agent_mcp import TradingAgentClient, create_trading_agent

# In __init__:
self.trading_agent = None

# In initialize_components:
self.trading_agent = await create_trading_agent(self.logger)

# In run_analysis_cycle:
# Replace strategy selection with agent decision
```

### Step 3: Run Both Components
1. Start the MCP server: `python trading_agent_server.py`
2. Run your orchestrator normally

The agent will make intelligent decisions while keeping procedural logic as fallback!