# Academic VPIN Implementation Using Rust Module

Volume-Synchronized Probability of Informed Trading (VPIN) analysis following the methodology of Easley, López de Prado, and O'Hara (2012).

In [None]:
import numpy as np
import pandas as pd
from scipy.stats import norm
import rust_indicators
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from pycoingecko import CoinGeckoAPI
import warnings
warnings.filterwarnings('ignore')

# Analysis parameters: CoinGecko asset id and days of history to request.
coin = 'ethereum'
days = 180

# Rust-backed indicator object (its `vpin` method is used below) and the
# CoinGecko REST client.
ta = rust_indicators.RustTA()
cg = CoinGeckoAPI()

# Print the device/backend string reported by the Rust module.
print(f"✓ Backend: {ta.device()}")

## Core Academic VPIN Functions

In [None]:
def create_volume_buckets(df, bucket_size_pct=0.01):
    """
    Create volume-synchronized buckets as per academic VPIN.

    The target bucket size ``V`` is ``bucket_size_pct`` times the average
    volume per calendar day in ``df``. Each row is assigned to bucket
    ``floor(cumulative_volume / V)``. Rows are never split across buckets,
    so a bucket holds whole rows and its volume only approximates ``V``;
    when one row carries more than ``V`` volume, the intervening bucket ids
    are skipped (ids are therefore not necessarily consecutive).

    Parameters
    ----------
    df : pandas.DataFrame
        Time-ordered rows with 'datetime' (datetime64), 'price' and
        'volume' columns. Modified in place: 'cum_volume' and 'bucket_id'
        columns are added.
    bucket_size_pct : float, default 0.01
        Bucket size as a fraction of average daily volume; must be > 0.

    Returns
    -------
    buckets : pandas.DataFrame
        One row per non-empty bucket with columns 'bucket_id', 'datetime'
        (first row's timestamp), 'open'/'close' (first/last price),
        'volume' (sum) and 'price_change' (close - open).
    bucket_size : float
        ``V``, in the same units as ``df['volume']``.

    Raises
    ------
    ValueError
        If ``bucket_size_pct`` is not positive, 'volume' has missing
        values, or the resulting bucket size is not positive (e.g. total
        volume is zero), since bucket ids would then be undefined.
    """
    if not bucket_size_pct > 0:
        raise ValueError(f"bucket_size_pct must be > 0, got {bucket_size_pct!r}")
    if df['volume'].isna().any():
        raise ValueError(
            "volume column contains missing values; cannot assign volume buckets"
        )

    # Bucket size as a fraction of the average volume per calendar day
    n_days = df['datetime'].dt.normalize().nunique()
    avg_daily_volume = df['volume'].sum() / max(1, n_days)
    bucket_size = avg_daily_volume * bucket_size_pct
    if not bucket_size > 0:
        # cum_volume / 0 would yield inf/NaN ids that cannot be cast to int.
        raise ValueError(
            f"bucket size must be positive, got {bucket_size!r}; "
            "total volume is zero or negative"
        )

    # Assign each row to a volume bucket
    df['cum_volume'] = df['volume'].cumsum()
    df['bucket_id'] = (df['cum_volume'] / bucket_size).astype(int)

    # Aggregate to bucket level
    buckets = df.groupby('bucket_id').agg({
        'datetime': 'first',
        'price': ['first', 'last'],
        'volume': 'sum',
    })

    # Flatten the (column, aggfunc) MultiIndex produced by .agg
    buckets.columns = ['datetime', 'open', 'close', 'volume']
    buckets['price_change'] = buckets['close'] - buckets['open']
    buckets = buckets.reset_index()

    return buckets, bucket_size

In [None]:
def bulk_volume_classification(buckets, window=50):
    """
    Implement Bulk Volume Classification (BVC) algorithm
    from Easley, López de Prado, O'Hara (2012).

    Each bucket's log return is standardised by a trailing rolling standard
    deviation, and the buy fraction is the standard-normal CDF of that
    z-score: ``buy = V * Phi(r / sigma)`` and ``sell = V * (1 - Phi(r / sigma))``.

    Parameters
    ----------
    buckets : pandas.DataFrame
        One row per bucket in time order, with 'close' (positive prices)
        and 'volume' columns. Modified in place and also returned.
    window : int, default 50
        Rolling window (in buckets) for the volatility estimate. A value is
        produced once ``min(10, window)`` observations are available.

    Returns
    -------
    pandas.DataFrame
        ``buckets`` with added columns 'ret', 'sigma', 'z_score',
        'buy_prob', 'buy_volume' and 'sell_volume'.
    """
    sigma_floor = 1e-6

    buckets['ret'] = np.log(buckets['close']).diff().fillna(0.0)

    # pandas rejects min_periods > window, so never ask for more than window.
    min_periods = min(10, window)
    sigma = buckets['ret'].rolling(window, min_periods=min_periods).std()
    # Leading buckets without enough history borrow the first available estimate.
    sigma = sigma.bfill()
    # Zero/undefined volatility falls back to the full-sample std; if that is
    # also undefined (e.g. a single bucket) fall back to the floor.
    sigma = sigma.replace(0, np.nan).fillna(buckets['ret'].std()).fillna(sigma_floor)
    buckets['sigma'] = sigma

    buckets['z_score'] = buckets['ret'] / buckets['sigma'].clip(lower=sigma_floor)
    buckets['buy_prob'] = norm.cdf(buckets['z_score'])
    buckets['buy_volume'] = buckets['volume'] * buckets['buy_prob']
    buckets['sell_volume'] = buckets['volume'] * (1 - buckets['buy_prob'])

    return buckets

In [None]:
def calculate_academic_vpin(df, bucket_size_pct=0.01, vpin_window=50):
    """
    Calculate VPIN on volume-synchronized buckets using the Rust module.

    Pipeline: volume buckets -> Bulk Volume Classification (BVC) ->
    rolling VPIN via ``ta.vpin`` -> map each bucket's VPIN back onto the
    rows of ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Time-ordered rows with 'datetime', 'price' and 'volume' columns.
        Modified in place ('cum_volume', 'bucket_id' and 'vpin' are added).
    bucket_size_pct : float, default 0.01
        Bucket size as a fraction of average daily volume.
    vpin_window : int, default 50
        Window, in buckets, passed to ``ta.vpin``; the same value is used
        as the BVC volatility window. When it is not smaller than the
        number of buckets, the ``ta.vpin`` window is reduced to
        ``max(2, int(0.2 * n_buckets))`` (the BVC window is not changed).

    Returns
    -------
    df : pandas.DataFrame
        Input frame with a per-row 'vpin' column.
    buckets : pandas.DataFrame
        Bucket table from BVC with an added 'vpin' column.
    vpin_values : numpy.ndarray
        Raw VPIN values (float), one per bucket row.

    Raises
    ------
    ValueError
        If two or fewer buckets are produced, or if ``create_volume_buckets``
        rejects the input.
    """
    # Step 1: Create volume buckets
    buckets, bucket_size = create_volume_buckets(df, bucket_size_pct)

    # Reject degenerate input before running BVC, so the caller gets this
    # message instead of an unrelated error from the volatility step.
    if len(buckets) <= 2:
        raise ValueError("Too few buckets. Reduce bucket_size_pct.")

    # Step 2: Apply Bulk Volume Classification (with the caller's window)
    buckets = bulk_volume_classification(buckets, window=vpin_window)

    # Shrink the VPIN window when it is not shorter than the bucket series.
    if vpin_window >= len(buckets):
        vpin_window = max(2, int(0.2 * len(buckets)))  # e.g., 20% of buckets

    # Step 3: Calculate VPIN using Rust module on bucketed data
    vpin_values = np.asarray(
        ta.vpin(
            buckets['buy_volume'].values,
            buckets['sell_volume'].values,
            vpin_window,
        ),
        dtype=float,
    )
    buckets['vpin'] = vpin_values

    # Step 4: Map back to original timeframe (same bucket ids as Step 1)
    df['bucket_id'] = (df['volume'].cumsum() / bucket_size).astype(int)
    vpin_map = dict(zip(buckets['bucket_id'], vpin_values))
    df['vpin'] = df['bucket_id'].map(vpin_map)

    return df, buckets, vpin_values

In [None]:
from scipy.stats import rankdata

def calculate_vpin_cdf(vpin_values):
    """
    Map raw VPIN values to their empirical CDF (percentile rank) in (0, 1).

    Each non-NaN value gets ``rank / (N + 1)``, where ranks run 1..N over
    the N non-NaN inputs and ties share their average rank. NaN inputs stay
    NaN. The ranking uses the whole sample, including later observations.

    Parameters
    ----------
    vpin_values : array-like of float
        Raw VPIN series; anything ``np.asarray`` accepts (ndarray, list,
        pandas Series).

    Returns
    -------
    numpy.ndarray
        Float array with the same shape as ``vpin_values``.
    """
    values = np.asarray(vpin_values, dtype=float)
    out = np.full(values.shape, np.nan)
    mask = ~np.isnan(values)
    if not mask.any():
        return out
    ranks = rankdata(values[mask], method='average')  # 1..N
    out[mask] = ranks / (len(ranks) + 1.0)            # strictly inside (0, 1)
    return out


## Data Collection and VPIN Calculation

In [None]:
# Fetch real data
print(f"Fetching {coin} data...")
data = cg.get_coin_market_chart_by_id(coin, vs_currency='usd', days=days)

prices = pd.DataFrame(data['prices'], columns=['timestamp', 'price'])
# NOTE(review): CoinGecko's total_volumes appears to be a rolling 24h volume
# at each timestamp rather than volume traded within the sampling interval;
# confirm against the API docs before treating it as per-bar volume.
volumes = pd.DataFrame(data['total_volumes'], columns=['timestamp', 'volume'])
# Join on timestamp instead of row position, so a missing/extra point in
# either series cannot shift every later volume onto the wrong price.
df = prices.merge(volumes, on='timestamp', how='inner')
df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
df = df.sort_values('datetime').reset_index(drop=True)

print(f"Data: {len(df)} points from {df['datetime'].min()} to {df['datetime'].max()}")

# Calculate academic VPIN
print("\nCalculating academic VPIN...")
df, buckets, vpin_raw = calculate_academic_vpin(df, bucket_size_pct=0.01)

# Convert to CDF (toxicity metric)
vpin_cdf = calculate_vpin_cdf(vpin_raw)
buckets['vpin_cdf'] = vpin_cdf

# Identify toxicity events (NaN CDF compares False, so it is never flagged)
buckets['toxic_event'] = vpin_cdf > 0.9

print(f"Buckets: {len(buckets)} volume-synchronized buckets")
print(f"Toxic events: {buckets['toxic_event'].sum()} buckets with CDF > 0.9")

## Visualization: Academic VPIN Analysis

In [None]:
def plot_academic_vpin(df, buckets, vpin_raw):
    """
    Build a four-panel Plotly figure of the VPIN analysis.

    Panels, top to bottom:
      1. price, with a dotted guide at the start of every 10th bucket;
      2. per-bucket buy-minus-sell volume (green if positive, else red);
      3. raw VPIN with a dashed reference line at 0.2;
      4. VPIN CDF with buckets flagged by ``toxic_event`` marked and a
         dashed reference line at 0.9.

    Parameters
    ----------
    df : pandas.DataFrame
        Row-level data with 'datetime', 'price' and 'bucket_id' columns.
    buckets : pandas.DataFrame
        Bucket table with 'datetime', 'buy_volume', 'sell_volume',
        'vpin_cdf' and boolean 'toxic_event' columns.
    vpin_raw : array-like
        Raw VPIN values aligned row-for-row with ``buckets``.

    Returns
    -------
    plotly.graph_objects.Figure
        The assembled figure (not shown).
    """
    figure = make_subplots(
        rows=4,
        cols=1,
        row_heights=[0.25] * 4,
        subplot_titles=[
            'Price with Volume Buckets',
            'Order Flow Imbalance (Buy vs Sell Volume)',
            'VPIN (Raw) - Order Flow Toxicity',
            'VPIN CDF - Toxicity Probability',
        ],
        vertical_spacing=0.05,
    )

    # --- Panel 1: price, plus a guide line at every 10th bucket start ---
    figure.add_trace(
        go.Scatter(x=df['datetime'], y=df['price'], name='Price',
                   line=dict(color='lightblue', width=1)),
        row=1, col=1,
    )
    bucket_first_seen = df.groupby('bucket_id')['datetime'].first()
    for boundary in bucket_first_seen.iloc[::10]:
        figure.add_vline(x=boundary, line_dash="dot", line_color="gray",
                         opacity=0.3, row=1, col=1)

    # --- Panel 2: net order flow per bucket ---
    net_flow = buckets['buy_volume'] - buckets['sell_volume']
    figure.add_trace(
        go.Bar(
            x=buckets['datetime'],
            y=net_flow,
            marker_color=['green' if delta > 0 else 'red' for delta in net_flow],
            opacity=0.6,
            name='Volume Imbalance',
            showlegend=False,
        ),
        row=2, col=1,
    )

    # --- Panel 3: raw VPIN with the 0.2 reference level ---
    figure.add_trace(
        go.Scatter(x=buckets['datetime'], y=vpin_raw, name='VPIN',
                   line=dict(color='orange', width=2)),
        row=3, col=1,
    )
    figure.add_hline(y=0.2, line_dash="dash", line_color="red",
                     annotation_text="Flash Crash Level (0.2)",
                     row=3, col=1)

    # --- Panel 4: VPIN CDF, flagged buckets, and the 0.9 reference level ---
    figure.add_trace(
        go.Scatter(x=buckets['datetime'], y=buckets['vpin_cdf'],
                   name='Toxicity Probability', fill='tozeroy',
                   line=dict(color='purple', width=2)),
        row=4, col=1,
    )
    flagged = buckets.loc[buckets['toxic_event']]
    if not flagged.empty:
        figure.add_trace(
            go.Scatter(x=flagged['datetime'], y=flagged['vpin_cdf'],
                       mode='markers',
                       marker=dict(color='red', size=10, symbol='x'),
                       name='Toxic Events', showlegend=False),
            row=4, col=1,
        )
    figure.add_hline(y=0.9, line_dash="dash", line_color="red",
                     annotation_text="Extreme Toxicity (90%)",
                     row=4, col=1)

    figure.update_layout(
        height=900,
        title="Academic VPIN: Volume-Synchronized Order Flow Toxicity",
        template="plotly_white",
        showlegend=True,
        hovermode='x unified',
    )
    for row, label in enumerate(["Price ($)", "Volume Δ", "VPIN"], start=1):
        figure.update_yaxes(title=label, row=row, col=1)
    figure.update_yaxes(title="CDF", range=[0, 1], row=4, col=1)

    return figure

# Build the four-panel VPIN figure and render it.
fig = plot_academic_vpin(df=df, buckets=buckets, vpin_raw=vpin_raw)
fig.show()

## Key Metrics Dashboard

In [None]:
def calculate_toxicity_metrics(buckets, vpin_raw):
    """
    Calculate key metrics from academic VPIN papers and print a report.

    Parameters
    ----------
    buckets : pandas.DataFrame
        Bucket table with 'vpin_cdf', 'buy_volume', 'sell_volume' and
        'volume' columns.
    vpin_raw : array-like of float
        Raw VPIN values (ndarray, list or Series); NaNs are ignored.

    Returns
    -------
    dict or None
        Keys: 'mean_vpin', 'std_vpin', 'max_vpin' (over non-NaN VPIN),
        'toxic_periods' (buckets with CDF > 0.9), 'total_buckets' and
        'avg_bucket_imbalance' (mean of |buy - sell| / volume per bucket).
        ``None`` when ``vpin_raw`` has no non-NaN value.
    """
    # Clean data (np.asarray so list/Series inputs support boolean masking)
    vpin = np.asarray(vpin_raw, dtype=float)
    clean_vpin = vpin[~np.isnan(vpin)]

    if clean_vpin.size == 0:
        print("No valid VPIN values to analyze")
        return None

    # Metrics
    metrics = {
        'mean_vpin': np.mean(clean_vpin),
        'std_vpin': np.std(clean_vpin),
        'max_vpin': np.max(clean_vpin),
        'toxic_periods': (buckets['vpin_cdf'] > 0.9).sum(),
        'total_buckets': len(buckets),
        # 1e-10 keeps zero-volume buckets from dividing by zero
        'avg_bucket_imbalance': np.mean(np.abs(
            buckets['buy_volume'] - buckets['sell_volume']
        ) / (buckets['volume'] + 1e-10))
    }

    # Print Flash Crash-style analysis
    print("="*60)
    print("ACADEMIC VPIN ANALYSIS (Easley et al. 2012 Methodology)")
    print("="*60)
    print(f"📊 Mean VPIN: {metrics['mean_vpin']:.4f}")
    print(f"📈 Max VPIN: {metrics['max_vpin']:.4f}")
    print(f"⚠️  Toxic Events (CDF>0.9): {metrics['toxic_periods']}/{metrics['total_buckets']} buckets")
    print(f"📉 Avg Order Imbalance: {metrics['avg_bucket_imbalance']:.2%}")

    # Warning levels
    if metrics['max_vpin'] > 0.2:
        print("🚨 WARNING: VPIN exceeds Flash Crash threshold (0.2)")

    # Skip the percentage for an empty bucket table (0/0 otherwise).
    if metrics['total_buckets'] > 0:
        toxicity_pct = metrics['toxic_periods'] / metrics['total_buckets'] * 100
        if toxicity_pct > 5:
            print(f"🚨 HIGH TOXICITY: {toxicity_pct:.1f}% of time in toxic state")

    print("="*60)

    return metrics

metrics = calculate_toxicity_metrics(buckets, vpin_raw)

## Comparison: Time-based vs Volume-based VPIN

In [None]:
# Time-based VPIN for comparison. Each original time bar is classified with
# the same BVC rule used for the volume buckets, so the two series differ
# only in their sampling clock (time bars vs volume buckets). A fixed 50/50
# split would set buy == sell on every bar, i.e. zero order-flow imbalance
# by construction, which makes the comparison meaningless.
print("\nCalculating time-based VPIN for comparison...")
time_window = 50
time_bars = bulk_volume_classification(
    pd.DataFrame({'close': df['price'].to_numpy(), 'volume': df['volume'].to_numpy()}),
    window=time_window,
)
# Same window rule as calculate_academic_vpin: shrink if not shorter than the series.
if time_window >= len(time_bars):
    time_window = max(2, int(0.2 * len(time_bars)))
time_vpin = np.asarray(
    ta.vpin(
        time_bars['buy_volume'].values,
        time_bars['sell_volume'].values,
        time_window,
    ),
    dtype=float,
)

print("\n" + "="*60)
print("Time-based vs Volume-bucket VPIN Comparison:")
print("="*60)
if metrics is None:
    # calculate_toxicity_metrics returns None when no valid bucket VPIN exists.
    print("Volume-bucket VPIN unavailable; nothing to compare.")
else:
    time_max = np.nanmax(time_vpin)
    bucket_max = metrics['max_vpin']
    print(f"Time-based max: {time_max:.4f}")
    print(f"Volume-bucket max: {bucket_max:.4f}")
    print(f"Difference: {abs(bucket_max - time_max):.4f}")
    print(f"\nVolume-bucket approach is {'higher' if bucket_max > time_max else 'lower'}")
    print("This difference reflects the impact of volume synchronization")
print("="*60)

## Summary

### Key Academic Features Implemented:

1. **Volume Bucket Synchronization**: Each bucket targets V volume units (1% of average daily volume); input rows are not split across buckets, so a bucket's actual volume only approximates V
2. **Bulk Volume Classification (BVC)**: Uses CDF of normalized price changes to classify volume
3. **CDF Transformation**: Converts VPIN to toxicity probability (0-1 scale)
4. **Flash Crash Thresholds**: Uses 0.2 raw VPIN and 0.9 CDF as critical levels
5. **Order Flow Imbalance**: Direct visualization of buy-sell pressure per bucket

### References:
- Easley, López de Prado, and O'Hara (2012) "Flow Toxicity and Liquidity in a High-Frequency World", *The Review of Financial Studies* 25(5)
- López de Prado (2018) "Advances in Financial Machine Learning"