# Real-Time Market Monitoring with Fluid Dynamics

## Table of Contents
1. [Real-Time Data Streaming](#streaming)
2. [Online Pattern Recognition](#pattern-recognition)
3. [Adaptive Filtering and Noise Reduction](#filtering)
4. [Live Regime Detection](#regime-detection)
5. [Alert Systems and Notifications](#alerts)
6. [Performance Monitoring](#performance)
7. [Dashboard Visualization](#dashboard)
8. [Latency Optimization](#optimization)

## Introduction

Real-time market monitoring applies fluid dynamics principles to live market data streams, enabling:

- **Instantaneous pattern detection**: Identify fluid dynamics patterns as they form
- **Low-latency signal generation**: Generate trading signals with sub-millisecond latency
- **Adaptive regime tracking**: Monitor market regime changes in real-time
- **Risk monitoring**: Continuous assessment of market risk conditions
- **Alert systems**: Automated notifications for critical market events

### Key Requirements
- **Latency**: Sub-millisecond processing for HFT applications
- **Scalability**: Handle thousands of instruments simultaneously
- **Reliability**: 99.99% uptime with fault tolerance
- **Accuracy**: Maintain analysis quality under time constraints
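A latency requirement is easier to check against a distribution than against a mean, because a few slow ticks can hide behind a good average. The sketch below shows one way to collect per-call timings and report percentiles using only the standard library. `measure_latency` and `dummy_handler` are hypothetical names introduced for illustration and are not part of the analyzer defined later.

```python
import statistics
import time


def measure_latency(handler, payloads):
    """Time each handler call and return latency statistics in microseconds."""
    samples_us = []
    for payload in payloads:
        start = time.perf_counter()
        handler(payload)
        samples_us.append((time.perf_counter() - start) * 1e6)
    # quantiles(n=100) returns the 99 cut points p1..p99
    cuts = statistics.quantiles(samples_us, n=100)
    return {
        "count": len(samples_us),
        "mean_us": statistics.fmean(samples_us),
        "p50_us": cuts[49],
        "p99_us": cuts[98],
        "max_us": max(samples_us),
    }


def dummy_handler(price):
    """Stand-in for per-tick analysis work (hypothetical)."""
    return price * 1.0001


stats = measure_latency(dummy_handler, [100.0 + i * 0.01 for i in range(1000)])
print(f"p50={stats['p50_us']:.2f} us, p99={stats['p99_us']:.2f} us")
```

Comparing `p99_us` and `max_us` against the target, rather than `mean_us` alone, gives a more honest picture of whether a sub-millisecond budget is being met.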

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.patches import Circle, Rectangle
import seaborn as sns
from scipy import signal
from scipy.ndimage import gaussian_filter1d
from collections import deque
import threading
import time
import queue
import warnings
from datetime import datetime, timedelta
import json
warnings.filterwarnings('ignore')

# Set visualization parameters
plt.style.use('seaborn-v0_8-dark')
sns.set_palette("bright")

print("Real-Time Market Monitoring Environment Initialized")
print("Ready for live data streaming and analysis")

## Real-Time Data Streaming Architecture

### Streaming Data Pipeline

```
Market Data → Buffer → Preprocessing → Fluid Analysis → Signal Generation → Alerts
     ↓           ↓           ↓             ↓              ↓             ↓
   Feed API   Ring Buffer  Filters    Pattern Detect   Trading Sys   Dashboard
```

### Data Structures for High-Frequency Processing

- **Ring buffers**: Efficient circular buffers for streaming data
- **Sliding windows**: Moving analysis windows for pattern detection
- **State machines**: Track market regime transitions
- **Event queues**: Handle asynchronous processing
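As a minimal sketch of the first two structures, a `deque` with `maxlen` behaves as a ring buffer, and a running sum on top of it gives a sliding-window mean in constant time per tick. The `SlidingWindow` class is a hypothetical helper for illustration; the analyzer in this notebook uses plain deques and recomputes statistics with NumPy instead.

```python
from collections import deque


class SlidingWindow:
    """Fixed-size window that keeps a running sum for O(1) mean updates."""

    def __init__(self, size):
        self.values = deque(maxlen=size)
        self.total = 0.0

    def push(self, value):
        # When the deque is full, append() evicts the oldest value,
        # so subtract it from the running sum first.
        if len(self.values) == self.values.maxlen:
            self.total -= self.values[0]
        self.values.append(value)
        self.total += value

    def mean(self):
        return self.total / len(self.values) if self.values else 0.0


window = SlidingWindow(size=3)
for price in [100.0, 101.0, 102.0, 103.0]:
    window.push(price)

print(list(window.values))  # [101.0, 102.0, 103.0]
print(window.mean())        # 102.0
```

A running sum can accumulate floating-point drift over very long streams, so a production version would likely recompute the sum from the buffer periodically.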

In [None]:
class RealTimeFluidAnalyzer:
    """
    Real-time fluid dynamics analyzer for live market data
    """
    
    def __init__(self, buffer_size=1000, analysis_window=100):
        self.buffer_size = buffer_size
        self.analysis_window = analysis_window
        
        # Data buffers (using deque for efficient operations)
        self.price_buffer = deque(maxlen=buffer_size)
        self.volume_buffer = deque(maxlen=buffer_size)
        self.timestamp_buffer = deque(maxlen=buffer_size)
        
        # Analysis buffers
        self.velocity_buffer = deque(maxlen=buffer_size)
        self.vorticity_buffer = deque(maxlen=buffer_size)
        self.pressure_buffer = deque(maxlen=buffer_size)
        
        # Pattern detection state
        self.current_regime = 'unknown'
        self.regime_confidence = 0.0
        self.pattern_history = deque(maxlen=50)
        
        # Performance metrics
        self.processing_times = deque(maxlen=1000)
        self.update_count = 0
        
        # Alert system
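        # Bounded queue so the drop-oldest path in _generate_alert can trigger
        # (the limit of 1000 is an illustrative choice)
        self.alert_queue = queue.Queue(maxsize=1000)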
        self.alert_thresholds = {
            'high_vorticity': 2.0,
            'regime_change': 0.8,
            'shock_wave': 3.0,
            'turbulence': 1.5
        }
    
    def add_data_point(self, price, volume, timestamp=None):
        """
        Add new data point and trigger analysis
        """
        start_time = time.perf_counter()
        
        if timestamp is None:
            timestamp = datetime.now()
        
        # Add to buffers
        self.price_buffer.append(price)
        self.volume_buffer.append(volume)
        self.timestamp_buffer.append(timestamp)
        
        # Perform analysis if we have enough data
        if len(self.price_buffer) >= self.analysis_window:
            self._update_fluid_fields()
            self._detect_patterns()
            self._check_alerts()
        
        # Track performance
        processing_time = time.perf_counter() - start_time
        self.processing_times.append(processing_time)
        self.update_count += 1
    
    def _update_fluid_fields(self):
        """
        Update fluid dynamics fields from recent data
        """
        # Get recent data
        recent_prices = np.array(list(self.price_buffer)[-self.analysis_window:])
        recent_volumes = np.array(list(self.volume_buffer)[-self.analysis_window:])
        
        # Compute velocity (price momentum weighted by volume)
        price_changes = np.diff(recent_prices)
        volume_weights = recent_volumes[1:] / (np.mean(recent_volumes) + 1e-8)
        velocity = price_changes * volume_weights
        current_velocity = np.mean(velocity[-10:])  # Recent average
        self.velocity_buffer.append(current_velocity)
        
        # Compute vorticity (second derivative of velocity)
        if len(self.velocity_buffer) >= 3:
            recent_velocities = np.array(list(self.velocity_buffer)[-10:])
            velocity_gradient = np.gradient(recent_velocities)
            vorticity = np.gradient(velocity_gradient)[-1]  # Latest vorticity
            self.vorticity_buffer.append(vorticity)
        
        # Compute pressure (volume-weighted price pressure)
        price_std = np.std(recent_prices)
        volume_pressure = np.sum(recent_volumes[-5:]) / np.sum(recent_volumes)
        pressure = price_std * volume_pressure
        self.pressure_buffer.append(pressure)
    
    def _detect_patterns(self):
        """
        Real-time pattern detection
        """
        if len(self.vorticity_buffer) < 10:
            return
        
        recent_vorticity = np.array(list(self.vorticity_buffer)[-10:])
        recent_velocity = np.array(list(self.velocity_buffer)[-10:])
        recent_pressure = np.array(list(self.pressure_buffer)[-10:])
        
        # Pattern detection logic
        patterns_detected = []
        
        # Vortex detection
        if np.abs(recent_vorticity[-1]) > self.alert_thresholds['high_vorticity']:
            direction = 'bullish' if recent_vorticity[-1] > 0 else 'bearish'
            patterns_detected.append(f'vortex_{direction}')
        
        # Shock wave detection
        velocity_spike = np.abs(recent_velocity[-1]) / (np.std(recent_velocity[:-1]) + 1e-8)
        if velocity_spike > self.alert_thresholds['shock_wave']:
            patterns_detected.append('shock_wave')
        
        # Turbulence detection
        velocity_volatility = np.std(recent_velocity)
        if velocity_volatility > self.alert_thresholds['turbulence']:
            patterns_detected.append('turbulence')
        
        # Laminar flow detection
        if velocity_volatility < 0.1 and np.abs(np.mean(recent_velocity)) > 0.1:
            patterns_detected.append('laminar_flow')
        
        # Update pattern history
        if patterns_detected:
            self.pattern_history.extend(patterns_detected)
        
        # Update regime
        self._update_regime(patterns_detected)
    
    def _update_regime(self, current_patterns):
        """
        Update market regime based on detected patterns
        """
        # Simple regime classification
        pattern_counts = {}
        recent_patterns = list(self.pattern_history)[-20:]  # Last 20 patterns
        
        for pattern in recent_patterns:
            pattern_counts[pattern] = pattern_counts.get(pattern, 0) + 1
        
        if not pattern_counts:
            new_regime = 'neutral'
            confidence = 0.1
        else:
            # Determine dominant pattern
            dominant_pattern = max(pattern_counts, key=pattern_counts.get)
            confidence = pattern_counts[dominant_pattern] / len(recent_patterns)
            
            # Map patterns to regimes
            if 'laminar_flow' in dominant_pattern:
                new_regime = 'trending'
            elif 'turbulence' in dominant_pattern:
                new_regime = 'volatile'
            elif 'vortex' in dominant_pattern:
                new_regime = 'reversal'
            elif 'shock_wave' in dominant_pattern:
                new_regime = 'shock'
            else:
                new_regime = 'mixed'
        
        # Check for regime change
        if new_regime != self.current_regime and confidence > self.alert_thresholds['regime_change']:
            self._generate_alert('regime_change', {
                'old_regime': self.current_regime,
                'new_regime': new_regime,
                'confidence': confidence
            })
        
        self.current_regime = new_regime
        self.regime_confidence = confidence
    
    def _check_alerts(self):
        """
        Check for alert conditions
        """
        if len(self.vorticity_buffer) == 0:
            return
        
        latest_vorticity = self.vorticity_buffer[-1]
        latest_velocity = self.velocity_buffer[-1] if self.velocity_buffer else 0
        latest_pressure = self.pressure_buffer[-1] if self.pressure_buffer else 0
        
        # High vorticity alert
        if np.abs(latest_vorticity) > self.alert_thresholds['high_vorticity']:
            self._generate_alert('high_vorticity', {
                'vorticity': latest_vorticity,
                'timestamp': self.timestamp_buffer[-1]
            })
        
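        # Shock wave alert (baseline excludes the latest sample, as in _detect_patterns,
        # so an isolated spike is not diluted by its own contribution to the std)
        if len(self.velocity_buffer) >= 5:
            baseline = list(self.velocity_buffer)[-5:-1]
            velocity_spike = np.abs(latest_velocity) / (np.std(baseline) + 1e-8)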
            if velocity_spike > self.alert_thresholds['shock_wave']:
                self._generate_alert('shock_wave', {
                    'velocity_spike': velocity_spike,
                    'velocity': latest_velocity
                })
    
    def _generate_alert(self, alert_type, data):
        """
        Generate alert and add to queue
        """
        alert = {
            'type': alert_type,
            'timestamp': datetime.now(),
            'data': data,
            'regime': self.current_regime,
            'regime_confidence': self.regime_confidence
        }
        
        try:
            self.alert_queue.put_nowait(alert)
        except queue.Full:
            # Queue is full, remove oldest alert
            try:
                self.alert_queue.get_nowait()
                self.alert_queue.put_nowait(alert)
            except queue.Empty:
                pass
    
    def get_current_state(self):
        """
        Get current analyzer state
        """
        return {
            'regime': self.current_regime,
            'regime_confidence': self.regime_confidence,
            'latest_vorticity': self.vorticity_buffer[-1] if self.vorticity_buffer else 0,
            'latest_velocity': self.velocity_buffer[-1] if self.velocity_buffer else 0,
            'latest_pressure': self.pressure_buffer[-1] if self.pressure_buffer else 0,
            'buffer_fill': len(self.price_buffer) / self.buffer_size,
            'avg_processing_time': np.mean(list(self.processing_times)) if self.processing_times else 0,
            'update_count': self.update_count,
            'alerts_pending': self.alert_queue.qsize()
        }
    
    def get_alerts(self, max_alerts=10):
        """
        Retrieve pending alerts
        """
        alerts = []
        for _ in range(min(max_alerts, self.alert_queue.qsize())):
            try:
                alerts.append(self.alert_queue.get_nowait())
            except queue.Empty:
                break
        return alerts

# Initialize real-time analyzer
rt_analyzer = RealTimeFluidAnalyzer(buffer_size=1000, analysis_window=50)

print("Real-Time Fluid Analyzer initialized")
print(f"Buffer size: {rt_analyzer.buffer_size}")
print(f"Analysis window: {rt_analyzer.analysis_window}")
print(f"Alert thresholds: {rt_analyzer.alert_thresholds}")
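To make the field definitions in `_update_fluid_fields` easier to inspect, the sketch below applies the same formulas to a small hand-made window. `window_fields` and `vorticity_proxy` are hypothetical standalone helpers. Unlike the analyzer, `window_fields` returns the full per-tick velocity array rather than the mean of the last ten values.

```python
import numpy as np


def window_fields(prices, volumes):
    """Velocity and pressure for one analysis window."""
    prices = np.asarray(prices, dtype=float)
    volumes = np.asarray(volumes, dtype=float)

    # Velocity: price change scaled by volume relative to the window mean
    weights = volumes[1:] / (volumes.mean() + 1e-8)
    velocity = np.diff(prices) * weights

    # Pressure: price dispersion times the share of volume in the last 5 ticks
    pressure = prices.std() * volumes[-5:].sum() / volumes.sum()
    return velocity, pressure


def vorticity_proxy(velocities):
    """Second difference of a velocity series, taken at the latest point."""
    return np.gradient(np.gradient(np.asarray(velocities, dtype=float)))[-1]


# Steady 0.1 rise on flat volume: each per-tick velocity is close to 0.1
velocity, pressure = window_fields(
    prices=[100.0, 100.1, 100.2, 100.3, 100.4, 100.5],
    volumes=[1000] * 6,
)
print(np.round(velocity, 3))
print(round(float(pressure), 4))

# A constant velocity series has no curvature, so the proxy is zero
print(vorticity_proxy([0.1, 0.1, 0.1, 0.1]))
```

Note that the "vorticity" here is a second difference of a one-dimensional series, used as a proxy. It is not the curl of a vector field.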

## Market Data Simulator

### Realistic High-Frequency Data Generation

For demonstration purposes, we use a market data simulator that generates synthetic tick data by switching between five regimes:

- **Normal trading**: Zero-drift price changes at baseline volatility
- **Trend phases**: Directional price movements with a nonzero drift
- **Volatile phases**: Price changes at three times the regime's volatility, with higher volume
- **Market shocks**: A single large jump or drop, after which the simulator returns to normal
- **Quiet periods**: Low volatility and low volume

In [None]:
class MarketDataSimulator:
    """
    Simulate realistic high-frequency market data
    """
    
    def __init__(self, initial_price=100.0, tick_size=0.01):
        self.current_price = initial_price
        self.tick_size = tick_size
        self.time_counter = 0
        
        # Market regime parameters
        self.volatility = 0.02
        self.trend_strength = 0.0
        self.regime_change_prob = 0.001
        
        # Simulation state
        self.current_regime = 'normal'
        self.regime_duration = 0
        
    def generate_tick(self):
        """
        Generate next market tick
        """
        self.time_counter += 1
        self.regime_duration += 1
        
        # Check for regime change
        if np.random.random() < self.regime_change_prob or self.regime_duration > 200:
            self._change_regime()
        
        # Generate price movement based on current regime
        if self.current_regime == 'normal':
            price_change = np.random.normal(0, self.volatility)
            volume = np.random.exponential(1000)
        
        elif self.current_regime == 'trending':
            price_change = np.random.normal(self.trend_strength, self.volatility * 0.7)
            volume = np.random.exponential(1500)
        
        elif self.current_regime == 'volatile':
            price_change = np.random.normal(0, self.volatility * 3)
            volume = np.random.exponential(2000)
        
        elif self.current_regime == 'shock':
            # Generate shock wave
            shock_magnitude = np.random.choice([-1, 1]) * np.random.uniform(0.5, 2.0)
            price_change = shock_magnitude
            volume = np.random.exponential(5000)
            # Reset to normal after shock
            self.current_regime = 'normal'
            self.regime_duration = 0
        
        else:  # quiet
            price_change = np.random.normal(0, self.volatility * 0.3)
            volume = np.random.exponential(500)
        
        # Update price
        self.current_price += price_change
        self.current_price = max(0.01, self.current_price)  # Prevent negative prices
        
        # Round to tick size
        self.current_price = round(self.current_price / self.tick_size) * self.tick_size
        
        return {
            'price': self.current_price,
            'volume': max(1, int(volume)),
            # Wall-clock time, consistent with the alert timestamps from the analyzer
            'timestamp': datetime.now(),
            'regime': self.current_regime
        }
    
    def _change_regime(self):
        """
        Change market regime
        """
        regimes = ['normal', 'trending', 'volatile', 'shock', 'quiet']
        probabilities = [0.4, 0.25, 0.15, 0.05, 0.15]  # Weighted probabilities
        
        new_regime = np.random.choice(regimes, p=probabilities)
        self.current_regime = new_regime
        self.regime_duration = 0
        
        # Update parameters for new regime
        if new_regime == 'trending':
            self.trend_strength = np.random.choice([-1, 1]) * np.random.uniform(0.01, 0.05)
            self.volatility = np.random.uniform(0.015, 0.025)
        elif new_regime == 'volatile':
            self.volatility = np.random.uniform(0.04, 0.08)
            self.trend_strength = 0
        elif new_regime == 'quiet':
            self.volatility = np.random.uniform(0.005, 0.015)
            self.trend_strength = 0
        else:
            self.volatility = np.random.uniform(0.015, 0.025)
            self.trend_strength = 0

# Initialize market simulator
market_sim = MarketDataSimulator(initial_price=100.0)

# Demonstrate real-time analysis with simulated data
def run_realtime_simulation(duration_seconds=30, tick_rate=100):
    """
    Run real-time simulation for specified duration
    """
    print(f"Starting real-time simulation for {duration_seconds} seconds...")
    print(f"Tick rate: {tick_rate} ticks/second")
    
    start_time = time.time()
    tick_interval = 1.0 / tick_rate
    
    # Storage for visualization
    simulation_data = {
        'timestamps': [],
        'prices': [],
        'volumes': [],
        'regimes': [],
        'analyzer_states': [],
        'alerts': []
    }
    
    while time.time() - start_time < duration_seconds:
        tick_start = time.time()
        
        # Generate market tick
        tick_data = market_sim.generate_tick()
        
        # Add to analyzer
        rt_analyzer.add_data_point(
            price=tick_data['price'],
            volume=tick_data['volume'],
            timestamp=tick_data['timestamp']
        )
        
        # Store data for visualization
        simulation_data['timestamps'].append(tick_data['timestamp'])
        simulation_data['prices'].append(tick_data['price'])
        simulation_data['volumes'].append(tick_data['volume'])
        simulation_data['regimes'].append(tick_data['regime'])
        simulation_data['analyzer_states'].append(rt_analyzer.get_current_state())
        
        # Check for alerts
        alerts = rt_analyzer.get_alerts()
        if alerts:
            simulation_data['alerts'].extend(alerts)
            for alert in alerts:
                print(f"ALERT: {alert['type']} at {alert['timestamp'].strftime('%H:%M:%S.%f')[:-3]}")
        
        # Maintain tick rate
        elapsed = time.time() - tick_start
        if elapsed < tick_interval:
            time.sleep(tick_interval - elapsed)
    
    print(f"Simulation complete. Processed {len(simulation_data['prices'])} ticks")
    print(f"Alerts generated: {len(simulation_data['alerts'])}")
    
    return simulation_data

# Run short simulation
print("Running real-time simulation...")
sim_data = run_realtime_simulation(duration_seconds=10, tick_rate=50)

print(f"\nSimulation Summary:")
print(f"Total ticks: {len(sim_data['prices'])}")
print(f"Price range: {min(sim_data['prices']):.2f} - {max(sim_data['prices']):.2f}")
print(f"Total alerts: {len(sim_data['alerts'])}")
if sim_data['analyzer_states']:
    final_state = sim_data['analyzer_states'][-1]
    print(f"Final regime: {final_state['regime']} (confidence: {final_state['regime_confidence']:.2f})")
    print(f"Average processing time: {final_state['avg_processing_time']*1e6:.2f} μs")
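The simulation loop paces itself by sleeping for `tick_interval - elapsed` on each iteration, so any oversleep carries over into every later tick. One alternative, sketched below under the assumption of a fixed tick rate, is to schedule each tick against an absolute deadline measured from the start. `paced_ticks` is a hypothetical helper, not part of the notebook's code.

```python
import time


def paced_ticks(tick_rate, n_ticks):
    """Yield tick indices at a fixed rate using absolute deadlines."""
    interval = 1.0 / tick_rate
    start = time.perf_counter()
    for i in range(n_ticks):
        # The deadline is computed from the start time, not from the previous
        # tick, so a late tick does not push every later tick back.
        deadline = start + i * interval
        remaining = deadline - time.perf_counter()
        if remaining > 0:
            time.sleep(remaining)
        yield i


t0 = time.perf_counter()
ticks = list(paced_ticks(tick_rate=200, n_ticks=20))
elapsed = time.perf_counter() - t0
print(f"{len(ticks)} ticks in {elapsed:.3f} s")
```

With this scheme the total run time stays close to `n_ticks / tick_rate` even when individual iterations run long, at the cost of occasionally emitting ticks back to back while catching up.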

## Real-Time Dashboard and Visualization

### Dashboard Components

The dashboard below is a static 3×3 snapshot rendered after the simulation finishes. Its panels show:

1. **Price and volume**: Price path with traded volume on a secondary axis
2. **Vorticity**: Vorticity series with the ±2.0 alert threshold
3. **Velocity**: Velocity series with a Gaussian-smoothed trend
4. **Regime detection**: Regime assigned by the analyzer at each tick
5. **Processing performance**: Running average of per-tick processing time
6. **Pressure**: Pressure series
7. **Alert timeline**: Alerts by type over time
8. **Volume-price dynamics**: Volume against price, colored by time
9. **Summary statistics**: Throughput, latency, regime, and alert totals
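The figure in this notebook is drawn once from stored results. For a chart that refreshes while data arrives, one option is matplotlib's `FuncAnimation`, which this notebook already imports. The sketch below is a minimal example of that approach with a synthetic random-walk feed. The `update` callback, the 200-point window, and the 50 ms interval are illustrative assumptions.

```python
import random
from collections import deque

import matplotlib
matplotlib.use("Agg")  # headless backend so the sketch runs without a display
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

window = deque(maxlen=200)  # most recent prices shown in the chart
price = 100.0

fig, ax = plt.subplots()
(line,) = ax.plot([], [], "b-", linewidth=1.5)
ax.set_title("Live Price")
ax.set_xlabel("Tick")
ax.set_ylabel("Price")


def update(frame):
    """Append one synthetic tick and refresh the line data."""
    global price
    price += random.gauss(0, 0.02)  # placeholder for a real feed
    window.append(price)
    line.set_data(list(range(len(window))), list(window))
    ax.relim()
    ax.autoscale_view()
    return (line,)


# interval is in milliseconds; 50 ms gives roughly 20 redraws per second
anim = FuncAnimation(fig, update, frames=100, interval=50, blit=False)
```

To watch the chart update inside a notebook, the `matplotlib.use("Agg")` line would be dropped in favor of an interactive backend. Keep a reference to `anim` for as long as the animation should run, since the animation stops if the object is garbage collected.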

In [None]:
# Create comprehensive real-time analysis visualization
fig, axes = plt.subplots(3, 3, figsize=(18, 12))
fig.suptitle('Real-Time Market Monitoring Dashboard', fontsize=16, fontweight='bold')

# Extract data for plotting
times = [t.timestamp() for t in sim_data['timestamps']]
times_rel = [(t - times[0]) for t in times]  # Relative times in seconds
prices = sim_data['prices']
volumes = sim_data['volumes']
states = sim_data['analyzer_states']

# Extract analyzer data
if states:
    vorticities = [s['latest_vorticity'] for s in states]
    velocities = [s['latest_velocity'] for s in states]
    pressures = [s['latest_pressure'] for s in states]
    regimes = [s['regime'] for s in states]
    processing_times = [s['avg_processing_time'] * 1000000 for s in states]  # Convert to microseconds
else:
    # Separate empty lists (chained assignment would alias one shared list)
    vorticities, velocities, pressures, regimes, processing_times = [], [], [], [], []

# 1. Price and volume (top left)
ax1 = axes[0, 0]
ax1_twin = ax1.twinx()
line1 = ax1.plot(times_rel, prices, 'b-', linewidth=1.5, label='Price')[0]
line2 = ax1_twin.plot(times_rel, volumes, 'r-', alpha=0.6, linewidth=1, label='Volume')[0]
ax1.set_title('Live Price & Volume')
ax1.set_xlabel('Time (seconds)')
ax1.set_ylabel('Price', color='blue')
ax1_twin.set_ylabel('Volume', color='red')
ax1.grid(True, alpha=0.3)

# 2. Vorticity analysis (top center)
ax2 = axes[0, 1]
if vorticities:
    ax2.plot(times_rel, vorticities, 'g-', linewidth=2, label='Vorticity')
    ax2.axhline(y=2.0, color='red', linestyle='--', alpha=0.7, label='Alert Threshold')
    ax2.axhline(y=-2.0, color='red', linestyle='--', alpha=0.7)
ax2.set_title('Vorticity Field')
ax2.set_xlabel('Time (seconds)')
ax2.set_ylabel('Vorticity')
ax2.legend()
ax2.grid(True, alpha=0.3)

# 3. Velocity field (top right)
ax3 = axes[0, 2]
if velocities:
    ax3.plot(times_rel, velocities, 'm-', linewidth=2, label='Velocity')
    # Add smoothed trend
    if len(velocities) > 5:
        smoothed = gaussian_filter1d(velocities, sigma=2)
        ax3.plot(times_rel, smoothed, 'k--', alpha=0.7, label='Trend')
ax3.set_title('Market Velocity')
ax3.set_xlabel('Time (seconds)')
ax3.set_ylabel('Velocity')
ax3.legend()
ax3.grid(True, alpha=0.3)

# 4. Regime detection (middle left)
ax4 = axes[1, 0]
if regimes:
    # Create regime color map
    regime_colors = {'normal': 0, 'trending': 1, 'volatile': 2, 'reversal': 3, 'shock': 4, 'mixed': 5, 'neutral': 6, 'unknown': 7}
    regime_nums = [regime_colors.get(r, 7) for r in regimes]
    
    scatter = ax4.scatter(times_rel, regime_nums, c=regime_nums, cmap='tab10', s=20, alpha=0.8)
    ax4.set_title('Market Regime Detection')
    ax4.set_xlabel('Time (seconds)')
    ax4.set_ylabel('Regime')
    ax4.set_yticks(list(regime_colors.values()))
    ax4.set_yticklabels(list(regime_colors.keys()), rotation=45)
    ax4.grid(True, alpha=0.3)

# 5. Processing performance (middle center)
ax5 = axes[1, 1]
if processing_times:
    ax5.plot(times_rel, processing_times, 'orange', linewidth=1.5, label='Processing Time')
    ax5.axhline(y=np.mean(processing_times), color='red', linestyle='--', alpha=0.7, 
               label=f'Mean: {np.mean(processing_times):.1f}μs')
ax5.set_title('Processing Performance')
ax5.set_xlabel('Time (seconds)')
ax5.set_ylabel('Processing Time (μs)')
ax5.legend()
ax5.grid(True, alpha=0.3)

# 6. Pressure field (middle right)
ax6 = axes[1, 2]
if pressures:
    ax6.plot(times_rel, pressures, 'brown', linewidth=2, label='Pressure')
    ax6.fill_between(times_rel, 0, pressures, alpha=0.3, color='brown')
ax6.set_title('Market Pressure')
ax6.set_xlabel('Time (seconds)')
ax6.set_ylabel('Pressure')
ax6.legend()
ax6.grid(True, alpha=0.3)

# 7. Alert timeline (bottom left)
ax7 = axes[2, 0]
if sim_data['alerts']:
    alert_times = [(a['timestamp'].timestamp() - times[0]) for a in sim_data['alerts']]
    alert_types = [a['type'] for a in sim_data['alerts']]
    
    # Create alert type mapping
    alert_type_map = {'high_vorticity': 0, 'regime_change': 1, 'shock_wave': 2, 'turbulence': 3}
    alert_nums = [alert_type_map.get(at, 4) for at in alert_types]
    
    ax7.scatter(alert_times, alert_nums, c=alert_nums, cmap='Reds', s=100, alpha=0.8, marker='x')
    ax7.set_title(f'Alerts Timeline ({len(sim_data["alerts"])} total)')
    ax7.set_xlabel('Time (seconds)')
    ax7.set_ylabel('Alert Type')
    ax7.set_yticks(list(alert_type_map.values()))
    ax7.set_yticklabels(list(alert_type_map.keys()), rotation=45)
else:
    ax7.text(0.5, 0.5, 'No Alerts Generated', ha='center', va='center', transform=ax7.transAxes)
    ax7.set_title('Alerts Timeline')
ax7.grid(True, alpha=0.3)

# 8. Volume-Price relationship (bottom center)
ax8 = axes[2, 1]
if len(prices) > 1 and len(volumes) > 1:
    # Color points by time
    scatter = ax8.scatter(volumes, prices, c=times_rel, cmap='viridis', alpha=0.6, s=15)
    ax8.set_title('Volume-Price Dynamics')
    ax8.set_xlabel('Volume')
    ax8.set_ylabel('Price')
    plt.colorbar(scatter, ax=ax8, label='Time (s)')
    ax8.grid(True, alpha=0.3)

# 9. System statistics (bottom right)
ax9 = axes[2, 2]
ax9.axis('off')

# Generate summary statistics
final_state = states[-1] if states else {}
stats_text = f"""
REAL-TIME MONITORING SUMMARY

Data Processing:
• Total ticks processed: {len(prices)}
• Processing rate: {len(prices)/10:.1f} ticks/sec
• Avg latency: {final_state.get('avg_processing_time', 0)*1000000:.1f} μs
• Buffer utilization: {final_state.get('buffer_fill', 0):.1%}

Market Analysis:
• Current regime: {final_state.get('regime', 'unknown')}
• Regime confidence: {final_state.get('regime_confidence', 0):.1%}
• Latest vorticity: {final_state.get('latest_vorticity', 0):.3f}
• Latest velocity: {final_state.get('latest_velocity', 0):.3f}

Alert System:
• Total alerts: {len(sim_data['alerts'])}
• Alert rate: {len(sim_data['alerts'])/10:.2f} alerts/sec
• Pending alerts: {final_state.get('alerts_pending', 0)}

Performance Metrics:
• Min price: ${min(prices):.2f}
• Max price: ${max(prices):.2f}
• Price volatility: {np.std(prices):.3f}
• Volume range: {min(volumes)}-{max(volumes)}

System Status: {'🟢 OPERATIONAL' if final_state.get('avg_processing_time', 0) < 0.001 else '🟡 DEGRADED' if final_state.get('avg_processing_time', 0) < 0.01 else '🔴 CRITICAL'}
"""

ax9.text(0.05, 0.95, stats_text, transform=ax9.transAxes, 
         fontsize=10, verticalalignment='top', fontfamily='monospace',
         bbox=dict(boxstyle='round,pad=0.5', facecolor='lightgreen', alpha=0.8))

plt.tight_layout()
plt.show()

print("\n" + "="*70)
print("REAL-TIME MARKET MONITORING COMPLETE")
print("="*70)
print(f"Successfully processed {len(prices)} market ticks in real-time")
print(f"Generated {len(sim_data['alerts'])} alerts based on fluid dynamics patterns")
if final_state:
    print(f"System performance: {final_state['avg_processing_time']*1000000:.1f} μs average latency")
    print(f"Final market regime: {final_state['regime']} (confidence: {final_state['regime_confidence']:.1%})")
print("\nReal-time capabilities demonstrated:")
print("• Live pattern recognition and regime detection")
print("• Per-tick processing latency measurement")
print("• Automated alert generation")
print("• Comprehensive monitoring dashboard")
print("="*70)