# Real-time Data Visualization for PeriodicTaskExecutor

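This notebook demonstrates four approaches for visualizing streaming data from `PeriodicTaskExecutor`, using mock fetchers that simulate market probabilities from Polymarket, Betfair, and Pinnacle. The runner calls are commented out; uncomment one to try that approach.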

## Setup and Imports

In [ ]:
import asyncio
import pandas as pd
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from IPython.display import display, clear_output, HTML
import ipywidgets as widgets
from collections import deque
import time
import nest_asyncio

# Allow nested asyncio in Jupyter
nest_asyncio.apply()

# Import from the installed package
from src.utils.periodic_task_executor import PeriodicTaskExecutor
from src.models.market_probability import MarketProbability

%matplotlib widget

## Create Mock Data Fetchers

In [None]:
# Mock data fetchers that simulate getting market probabilities
request_counter = 0

async def fetch_polymarket_data():
    """Simulate fetching Polymarket data"""
    global request_counter
    request_counter += 1
    
    # Simulate random walk for probability
    base_prob = 0.65
    noise = np.random.normal(0, 0.02)
    probability = max(0.1, min(0.9, base_prob + noise))
    
    return MarketProbability(
        request_id=request_counter,
        fetched_at=datetime.utcnow(),
        probability=probability,
        source="polymarket",
        team="Team A",
        meta={"volume": np.random.randint(10000, 50000)}
    )

async def fetch_betfair_data():
    """Simulate fetching Betfair data"""
    global request_counter
    request_counter += 1
    
    # Simulate different probability with correlation
    base_prob = 0.63
    noise = np.random.normal(0, 0.015)
    probability = max(0.1, min(0.9, base_prob + noise))
    
    return MarketProbability(
        request_id=request_counter,
        fetched_at=datetime.utcnow(),
        probability=probability,
        source="betfair",
        team="Team A",
        meta={"liquidity": np.random.randint(50000, 200000)}
    )

async def fetch_pinnacle_data():
    """Simulate fetching Pinnacle data"""
    global request_counter
    request_counter += 1
    
    # Simulate sharp book with less noise
    base_prob = 0.64
    noise = np.random.normal(0, 0.01)
    probability = max(0.1, min(0.9, base_prob + noise))
    
    return MarketProbability(
        request_id=request_counter,
        fetched_at=datetime.utcnow(),
        probability=probability,
        source="pinnacle",
        team="Team A",
        meta={"limit": 10000}
    )

## Approach 1: Matplotlib Real-time Animation

In [None]:
class LivePlotter:
    """Rolling matplotlib view of per-source probabilities and their max spread.

    Call ``update_data`` with each new batch of results and pass ``animate`` to
    ``FuncAnimation`` as the frame callback.
    """

    # Sources plotted, in legend order.
    SOURCES = ('polymarket', 'betfair', 'pinnacle')

    def __init__(self, window_size=50):
        """Create the figure and empty rolling buffers of ``window_size`` points."""
        self.window_size = window_size
        # One bounded buffer per source plus one for timestamps. update_data
        # appends to every buffer on every call, so index i in each buffer
        # refers to the same update.
        self.data = {
            key: deque(maxlen=window_size)
            for key in (*self.SOURCES, 'timestamps')
        }

        # Setup plot
        self.fig, (self.ax1, self.ax2) = plt.subplots(2, 1, figsize=(10, 8))
        self.lines = {}
        for source in self.SOURCES:
            self.lines[source], = self.ax1.plot([], [], label=source, marker='o', markersize=4)

        self.ax1.set_xlabel('Time (seconds ago)')
        self.ax1.set_ylabel('Probability')
        self.ax1.set_title('Market Probabilities Over Time')
        self.ax1.legend()
        self.ax1.grid(True)

        # Setup spread plot
        self.spread_line, = self.ax2.plot([], [], 'r-', label='Max Spread')
        self.ax2.set_xlabel('Time (seconds ago)')
        self.ax2.set_ylabel('Spread')
        self.ax2.set_title('Maximum Probability Spread')
        self.ax2.grid(True)

        plt.tight_layout()

    def update_data(self, results):
        """Append one sample per source from a batch of executor results.

        Args:
            results: Mapping of source name -> object with a ``probability`` attribute.
                Missing sources carry forward their last value; a source with no
                value yet is padded with NaN so every buffer stays aligned with
                ``self.data['timestamps']``.
        """
        self.data['timestamps'].append(datetime.utcnow())

        for source in self.SOURCES:
            series = self.data[source]
            if source in results:
                series.append(results[source].probability)
            elif series:
                # Carry forward last value if missing
                series.append(series[-1])
            else:
                # No value yet: NaN keeps the buffers aligned and plots as a gap.
                series.append(float('nan'))

    def animate(self, frame):
        """FuncAnimation callback: redraw the lines from the buffered samples.

        Always returns the list of animated artists, even before there is enough
        data to draw, because FuncAnimation with ``blit=True`` raises if the
        callback returns None.
        """
        artists = list(self.lines.values()) + [self.spread_line]
        timestamps = self.data['timestamps']
        if len(timestamps) < 2:
            return artists

        # x = seconds relative to the newest sample (<= 0), newest on the right.
        current_time = timestamps[-1]
        x_data = [-(current_time - ts).total_seconds() for ts in timestamps]

        for source in self.SOURCES:
            self.lines[source].set_data(x_data, list(self.data[source]))

        # Buffers are aligned, so zip yields one row of probabilities per update.
        # np.ptp (max - min) propagates NaN, so rows with padding become gaps.
        spreads = [
            float(np.ptp(row))
            for row in zip(*(self.data[source] for source in self.SOURCES))
        ]
        self.spread_line.set_data(x_data, spreads)

        # Adjust axes
        x_lo, x_hi = min(x_data) - 1, max(x_data) + 1
        self.ax1.set_xlim(x_lo, x_hi)
        self.ax1.set_ylim(0.5, 0.8)
        self.ax2.set_xlim(x_lo, x_hi)
        self.ax2.set_ylim(0, 0.1)

        return artists

In [None]:
# Run live plotting with PeriodicTaskExecutor
async def run_live_plot(duration=30, interval=2.0,
                        csv_path='../data/live_market_probabilities.csv'):
    """Stream the mock fetchers into a live matplotlib plot and save them to CSV.

    Args:
        duration: Approximate run time in seconds; the executor runs
            ``int(duration / interval)`` batches (at least one).
        interval: Seconds between executor batches.
        csv_path: Where the collected records are written; the parent
            directory is created if needed.

    Returns:
        DataFrame of every collected record (empty if nothing succeeded).
    """
    import os

    plotter = LivePlotter()
    executor = PeriodicTaskExecutor(interval=interval)

    # Add our mock fetchers
    executor.add_task(fetch_polymarket_data, "polymarket")
    executor.add_task(fetch_betfair_data, "betfair")
    executor.add_task(fetch_pinnacle_data, "pinnacle")

    all_data = []      # rows destined for the CSV
    last_batch = None  # most recent batch already consumed

    def consume_latest_batch():
        """Feed the newest executor batch to the plot and CSV buffers exactly once."""
        nonlocal last_batch
        if not executor.results_storage:
            return
        latest = executor.results_storage[-1]
        # The poller runs much faster than the executor, so it sees the same
        # batch many times; only act on a batch we have not consumed yet.
        if latest is last_batch:
            return
        last_batch = latest
        results = latest['successful_results']
        if not results:
            return
        plotter.update_data(results)
        for data in results.values():
            all_data.append({
                'timestamp': data.fetched_at,
                'source': data.source,
                'team': data.team,
                'probability': data.probability,
                'request_id': data.request_id
            })

    async def update_plot_task():
        while True:
            consume_latest_batch()
            await asyncio.sleep(0.1)

    # Keep a reference for the duration of the run: an animation that is
    # garbage-collected stops updating. blit=False so the axis tick labels are
    # redrawn when animate() changes the limits; cache_frame_data=False because
    # frames are unbounded.
    anim = FuncAnimation(plotter.fig, plotter.animate, interval=100,
                         blit=False, cache_frame_data=False)

    # Run tasks
    plot_task = asyncio.create_task(update_plot_task())
    executor_task = asyncio.create_task(
        executor.start(max_batches=max(1, int(duration / interval)))
    )

    try:
        await executor_task
    finally:
        plot_task.cancel()
    # Pick up a batch that completed after the poller's last check.
    consume_latest_batch()

    # Save to CSV
    df = pd.DataFrame(all_data)
    if not df.empty:
        os.makedirs(os.path.dirname(csv_path) or '.', exist_ok=True)
        df.to_csv(csv_path, index=False)
        print(f"\nSaved {len(df)} records to CSV")

    return df

# Uncomment to run:
# df = await run_live_plot(duration=30)

## Approach 2: Plotly Interactive Dashboard

In [None]:
class PlotlyDashboard:
    """Plotly FigureWidget dashboard: probabilities, spread, and a recent-value histogram.

    Trace layout in ``self.widget.data``: indices 0-2 are the per-source
    probability lines (in ``SOURCES`` order), 3 is the max spread, and 4 is the
    histogram of recent probabilities.
    """

    SOURCES = ('polymarket', 'betfair', 'pinnacle')

    def __init__(self):
        # subplot_titles are assigned in row-major order only to cells that hold
        # a subplot (the None cell is skipped): (1,1), (2,1), (2,2).
        self.fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('Market Probabilities', 'Spread Over Time',
                            'Probability Distribution'),
            specs=[[{"colspan": 2}, None],
                   [{}, {}]]
        )

        self.data_storage = {
            'timestamps': [],
            'polymarket': [],
            'betfair': [],
            'pinnacle': []
        }

        # Traces 0-2: one probability line per source
        for source in self.SOURCES:
            self.fig.add_trace(
                go.Scatter(x=[], y=[], name=source, mode='lines+markers'),
                row=1, col=1
            )

        # Trace 3: spread
        self.fig.add_trace(
            go.Scatter(x=[], y=[], name='Max Spread', mode='lines',
                       line=dict(color='red')),
            row=2, col=1
        )

        # Trace 4: histogram, created once and updated in place by update()
        self.fig.add_trace(
            go.Histogram(x=[], nbinsx=20, name='Distribution'),
            row=2, col=2
        )

        # Layout
        self.fig.update_layout(
            height=800,
            showlegend=True,
            title_text="Real-time Market Probability Dashboard"
        )

        self.fig.update_xaxes(title_text="Time", row=1, col=1)
        self.fig.update_yaxes(title_text="Probability", row=1, col=1)
        self.fig.update_xaxes(title_text="Time", row=2, col=1)
        self.fig.update_yaxes(title_text="Spread", row=2, col=1)
        self.fig.update_xaxes(title_text="Probability", row=2, col=2)
        self.fig.update_yaxes(title_text="Count", row=2, col=2)

        self.widget = go.FigureWidget(self.fig)

    def update(self, results, hist_window=20):
        """Append one batch of results and refresh every trace.

        Args:
            results: Mapping of source name -> object with a ``probability`` attribute.
            hist_window: Number of most recent values per source fed to the histogram.
        """
        self.data_storage['timestamps'].append(datetime.utcnow())

        # Update source data. Every series gets exactly one entry per call so it
        # stays aligned with 'timestamps'; None (drawn as a gap) pads a source
        # that has not reported yet.
        for source in self.SOURCES:
            series = self.data_storage[source]
            if source in results:
                series.append(results[source].probability)
            else:
                series.append(series[-1] if series else None)

        timestamps = list(self.data_storage['timestamps'])
        spreads = [
            None if None in row else max(row) - min(row)
            for row in zip(*(self.data_storage[s] for s in self.SOURCES))
        ]

        with self.widget.batch_update():
            for idx, source in enumerate(self.SOURCES):
                self.widget.data[idx].x = timestamps
                self.widget.data[idx].y = list(self.data_storage[source])

            self.widget.data[3].x = timestamps
            self.widget.data[3].y = spreads

            # Refresh the existing histogram trace instead of adding a new one.
            if len(timestamps) > 5:
                self.widget.data[4].x = [
                    prob
                    for source in self.SOURCES
                    for prob in self.data_storage[source][-hist_window:]
                    if prob is not None
                ]

In [None]:
# Run Plotly dashboard
async def run_plotly_dashboard(duration=30, interval=2.0):
    """Display a PlotlyDashboard and feed it live results from the mock fetchers.

    Args:
        duration: Approximate run time in seconds; the executor runs
            ``int(duration / interval)`` batches (at least one).
        interval: Seconds between executor batches.
    """
    dashboard = PlotlyDashboard()
    display(dashboard.widget)

    executor = PeriodicTaskExecutor(interval=interval)
    executor.add_task(fetch_polymarket_data, "polymarket")
    executor.add_task(fetch_betfair_data, "betfair")
    executor.add_task(fetch_pinnacle_data, "pinnacle")

    last_batch = None  # most recent batch already pushed to the dashboard

    def push_latest_batch():
        """Send the newest executor batch to the dashboard exactly once."""
        nonlocal last_batch
        if not executor.results_storage:
            return
        latest = executor.results_storage[-1]
        # Polling is faster than the executor; skip a batch already shown so
        # it is not appended to the dashboard again.
        if latest is last_batch:
            return
        last_batch = latest
        if latest['successful_results']:
            dashboard.update(latest['successful_results'])

    async def update_dashboard():
        while True:
            push_latest_batch()
            await asyncio.sleep(0.5)

    update_task = asyncio.create_task(update_dashboard())
    executor_task = asyncio.create_task(
        executor.start(max_batches=max(1, int(duration / interval)))
    )

    try:
        await executor_task
    finally:
        update_task.cancel()
    # Show a batch that completed after the poller's last check.
    push_latest_batch()

# Uncomment to run:
# await run_plotly_dashboard(duration=30)

## Approach 3: Simple Text-based Live Updates

In [None]:
async def run_text_monitor(duration=20, interval=2.0):
    """Simple text-based monitoring with IPython display.

    Redraws an ipywidgets ``Output`` area each time the executor finishes a new
    batch: current values, spread/average, the last 10 batches, and counters.

    Args:
        duration: Approximate run time in seconds; the executor runs
            ``int(duration / interval)`` batches (at least one).
        interval: Seconds between executor batches.
    """
    executor = PeriodicTaskExecutor(interval=interval)
    executor.add_task(fetch_polymarket_data, "polymarket")
    executor.add_task(fetch_betfair_data, "betfair")
    executor.add_task(fetch_pinnacle_data, "pinnacle")

    # Create output widget
    output = widgets.Output()
    display(output)

    recent_data = deque(maxlen=10)  # one entry per distinct batch
    last_batch = None               # most recent batch already displayed

    def render(results):
        """Redraw the output area for the newest batch ``results``."""
        with output:
            clear_output(wait=True)

            print("=" * 60)
            print(f"LIVE MARKET PROBABILITIES - {datetime.utcnow().strftime('%H:%M:%S')}")
            print("=" * 60)

            # Current values
            print("\nCurrent Values:")
            for source, data in results.items():
                print(f"  {source:12} : {data.probability:.4f} ({data.team})")

            # Calculate spread
            probs = [d.probability for d in results.values()]
            if probs:
                spread = max(probs) - min(probs)
                avg_prob = sum(probs) / len(probs)
                print(f"\n  Spread       : {spread:.4f}")
                print(f"  Average      : {avg_prob:.4f}")

            # Recent history
            print("\n\nRecent History:")
            print("Time     | Polymarket | Betfair   | Pinnacle  | Spread")
            print("-" * 60)

            for entry in recent_data:
                time_str = entry['time'].strftime('%H:%M:%S')
                values = {source: data.probability for source, data in entry['data'].items()}
                if values:
                    spread = max(values.values()) - min(values.values())
                    print(f"{time_str} | "
                          f"{values.get('polymarket', 0):.4f}    | "
                          f"{values.get('betfair', 0):.4f}    | "
                          f"{values.get('pinnacle', 0):.4f}   | "
                          f"{spread:.4f}")

            # Stats
            print("\n\nStatistics:")
            print(f"  Total fetches: {len(executor.results_storage)}")
            print(f"  Errors: {executor.error_count}")

    def refresh():
        """Record and render the newest batch, once per batch."""
        nonlocal last_batch
        if not executor.results_storage:
            return
        latest = executor.results_storage[-1]
        # Polling is faster than the executor; without this check the same
        # batch was appended to the history table several times.
        if latest is last_batch:
            return
        last_batch = latest
        results = latest['successful_results']
        if results:
            recent_data.append({'time': datetime.utcnow(), 'data': results})
            render(results)

    async def update_display():
        while True:
            refresh()
            await asyncio.sleep(0.5)

    update_task = asyncio.create_task(update_display())
    executor_task = asyncio.create_task(
        executor.start(max_batches=max(1, int(duration / interval)))
    )

    try:
        await executor_task
    finally:
        update_task.cancel()
    # Show a batch that completed after the poller's last check.
    refresh()

# Uncomment to run:
# await run_text_monitor(duration=20)

## Approach 4: Save to CSV and Monitor File

In [None]:
class CSVMonitor:
    """Accumulates executor results in a DataFrame and mirrors it to a CSV file."""

    def __init__(self, filename='../data/live_probabilities.csv'):
        self.filename = filename
        self.df = pd.DataFrame()

    def append_results(self, results):
        """Append one batch of results to ``self.df`` and rewrite the CSV file.

        Each result becomes one row stamped with the same timestamp; entries of
        ``data.meta`` become ``meta_<key>`` columns. The whole file is rewritten
        rather than appended to, because different sources carry different
        meta keys, so the column set can grow between batches. The parent
        directory of ``self.filename`` is created if it does not exist.
        """
        import os

        timestamp = datetime.utcnow()
        rows = [
            {
                'timestamp': timestamp,
                'source': data.source,
                'team': data.team,
                'probability': data.probability,
                'request_id': data.request_id,
                **{f'meta_{k}': v for k, v in (data.meta or {}).items()},
            }
            for data in results.values()
        ]

        if rows:
            new_df = pd.DataFrame(rows)
            self.df = new_df if self.df.empty else pd.concat([self.df, new_df], ignore_index=True)

        directory = os.path.dirname(self.filename)
        if directory:
            os.makedirs(directory, exist_ok=True)
        self.df.to_csv(self.filename, index=False)

    def plot_from_csv(self):
        """Plot per-source probabilities and their spread from the collected data.

        Plots the in-memory ``self.df`` (the same data written to
        ``self.filename``); the file itself is not re-read. Does nothing when
        no data has been collected.
        """
        if self.df.empty:
            return

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))

        # Plot probabilities by source (in order of first appearance)
        for source, source_data in self.df.groupby('source', sort=False):
            ax1.plot(source_data['timestamp'], source_data['probability'],
                     label=source, marker='o', markersize=4)

        ax1.set_xlabel('Time')
        ax1.set_ylabel('Probability')
        ax1.set_title('Market Probabilities from CSV')
        ax1.legend()
        ax1.grid(True)

        # pivot_table (mean) instead of pivot: pivot raises ValueError when a
        # (timestamp, source) pair occurs more than once.
        probs_by_source = self.df.pivot_table(
            index='timestamp', columns='source', values='probability', aggfunc='mean'
        )
        spread = probs_by_source.max(axis=1) - probs_by_source.min(axis=1)

        ax2.plot(spread.index, spread, 'r-', linewidth=2)
        ax2.set_xlabel('Time')
        ax2.set_ylabel('Spread')
        ax2.set_title('Probability Spread Over Time')
        ax2.grid(True)

        plt.tight_layout()
        plt.show()

In [None]:
# Run with CSV monitoring
async def run_with_csv_monitor(duration=20, interval=2.0):
    """Run the mock fetchers, persisting each new batch via CSVMonitor, then plot.

    Args:
        duration: Approximate run time in seconds; the executor runs
            ``int(duration / interval)`` batches (at least one).
        interval: Seconds between executor batches.

    Returns:
        The CSVMonitor holding the collected data.
    """
    monitor = CSVMonitor()
    executor = PeriodicTaskExecutor(interval=interval)

    executor.add_task(fetch_polymarket_data, "polymarket")
    executor.add_task(fetch_betfair_data, "betfair")
    executor.add_task(fetch_pinnacle_data, "pinnacle")

    last_batch = None  # most recent batch already written

    def save_latest_batch():
        """Write the newest executor batch to the monitor exactly once."""
        nonlocal last_batch
        if not executor.results_storage:
            return
        latest = executor.results_storage[-1]
        # Polling is faster than the executor; without this check the same
        # batch was written several times (with different timestamps).
        if latest is last_batch:
            return
        last_batch = latest
        if latest['successful_results']:
            monitor.append_results(latest['successful_results'])

    async def save_to_csv():
        while True:
            save_latest_batch()
            await asyncio.sleep(0.5)

    save_task = asyncio.create_task(save_to_csv())
    executor_task = asyncio.create_task(
        executor.start(max_batches=max(1, int(duration / interval)))
    )

    try:
        await executor_task
    finally:
        save_task.cancel()
    # Persist a batch that completed after the poller's last check.
    save_latest_batch()

    print(f"\nData saved to {monitor.filename}")
    print(f"Total records: {len(monitor.df)}")

    # Display final plot
    monitor.plot_from_csv()

    return monitor

# Uncomment to run:
# monitor = await run_with_csv_monitor(duration=20)

## Quick Start Example

In [1]:
# Quick example combining everything
async def quick_demo(n_batches=10, interval=1.0):
    """Run the mock fetchers briefly and print each new batch of probabilities.

    Args:
        n_batches: Number of executor batches to run (and display polls to make).
        interval: Seconds between executor batches and between display polls.
    """
    print("Starting real-time data collection demo...\n")

    # Create executor
    executor = PeriodicTaskExecutor(interval=interval)
    executor.add_task(fetch_polymarket_data, "polymarket")
    executor.add_task(fetch_betfair_data, "betfair")
    executor.add_task(fetch_pinnacle_data, "pinnacle")

    # Simple live display: print a batch only the first time it is seen.
    async def display_latest():
        last_batch = None
        for _ in range(n_batches):
            await asyncio.sleep(interval)
            if not executor.results_storage:
                continue
            latest = executor.results_storage[-1]
            if latest is last_batch or not latest['successful_results']:
                continue
            last_batch = latest
            print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Latest probabilities:")
            for source, data in latest['successful_results'].items():
                print(f"  {source}: {data.probability:.4f}")

    # Run both concurrently
    await asyncio.gather(
        executor.start(max_batches=n_batches),
        display_latest()
    )

    print("\nDemo complete!")

# Run the demo. Top-level `await` works here because IPython/Jupyter runs
# cells with autoawait; it is not valid in a plain Python script. The setup
# cell (imports) must be run first, otherwise names such as
# PeriodicTaskExecutor are undefined (NameError).
await quick_demo()

Starting real-time data collection demo...



NameError: name 'PeriodicTaskExecutor' is not defined

## Summary

This notebook demonstrates several approaches for visualizing streaming data:

1. **Matplotlib Animation**: Best for scientific/publication-quality plots with real-time updates
2. **Plotly Dashboard**: Best for interactive exploration and multiple views
3. **Text Monitor**: Best for quick debugging and terminal-style monitoring
4. **CSV Monitor**: Best for data persistence and post-processing

Choose the approach that best fits your needs. For production use, consider:
- Using a proper time-series database instead of CSV
- Implementing websocket connections for lower latency
- Adding error handling and reconnection logic
- Using a dedicated dashboard solution like Grafana for monitoring