![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [None]:
from AlgorithmImports import *
import numpy as np
import pandas as pd
from scipy import stats
import matplotlib.pyplot as plt

qb = QuantBook()

In [3]:
spy = qb.add_equity("SPY", Resolution.DAILY).symbol
history = qb.history(spy, datetime(2019, 1, 1), datetime(2024, 1, 1))

history.head()

In [5]:
closes = history['close'].droplevel(0)
returns = np.log(closes / closes.shift(1)).dropna()

print(f"\nReturns statistics:")
print(f"  Mean:     {returns.mean():.6f}")
print(f"  Std:      {returns.std():.6f}")
print(f"  Skew:     {returns.skew():.4f}")
print(f"  Kurtosis: {returns.kurtosis():.4f}")  # Should be > 3

In [6]:
def estimate_hurst(series, min_window=10, max_window=None):
    if isinstance(series, pd.Series):
        series = series.values
    series = np.asarray(series, dtype=np.float64)
    
    n = len(series)
    if n < 4 * min_window:
        return 0.5
    
    if max_window is None:
        max_window = n // 4
    
    mean = np.mean(series)
    y = np.cumsum(series - mean)
    
    window_sizes = []
    fluctuations = []
    
    for window in range(min_window, max_window + 1):
        n_segments = n // window
        if n_segments < 2:
            continue
            
        f2 = [] 
        for i in range(n_segments):
            start = i * window
            end = start + window
            segment = y[start:end]
            
            # Fit linear trend
            x = np.arange(window)
            coeffs = np.polyfit(x, segment, 1)
            trend = np.polyval(coeffs, x)
            
            # Compute variance of detrended segment
            detrended = segment - trend
            f2.append(np.mean(detrended ** 2))
        
        # RMS fluctuation
        fluctuation = np.sqrt(np.mean(f2))
        
        window_sizes.append(window)
        fluctuations.append(fluctuation)
    
    if len(window_sizes) < 5:
        return 0.5
    
    log_windows = np.log(window_sizes)
    log_fluct = np.log(fluctuations)
    
    slope, _, r_value, _, _ = stats.linregress(log_windows, log_fluct)
    return float(np.clip(slope, 0.0, 1.0))

In [7]:
print("HURST EXPONENT VALIDATION")
print("=" * 60)
print(f"{'Method':<25} {'Random Walk':<15} {'Trending':<15} {'Mean-Rev':<15}")
print("-" * 60)

np.random.seed(42)
n_samples = 2000

random_walk = np.random.randn(n_samples)


persistent = []
x = 0
for i in range(n_samples):
    x = 0.3 * x + np.random.randn()
    persistent.append(x)
persistent = np.cumsum(persistent)  # Integrate to get FBM-like series
persistent_returns = np.diff(persistent)

mean_rev = []
x = 0
for i in range(n_samples):
    x = -0.3 * x + np.random.randn()  # Anti-persistence
    mean_rev.append(x)
mean_rev_returns = np.array(mean_rev)

h_rs_random = estimate_hurst(random_walk)
h_rs_trend = estimate_hurst(persistent_returns)
h_rs_meanrev = estimate_hurst(mean_rev_returns)

print(f"{'R/S (DFA)':<25} {h_rs_random:<15.3f} {h_rs_trend:<15.3f} {h_rs_meanrev:<15.3f}")

print("-" * 60)
print(f"{'Expected':<25} {'~0.50':<15} {'>0.50':<15} {'<0.50':<15}")

In [None]:
print("\nREAL MARKET DATA (SPY)")
print("=" * 40)

h_dfa_spy = estimate_hurst(returns.values)
print(f"DFA:              H = {h_dfa_spy:.3f}")

In [10]:
# =============================================================================
# CELL 1: Hill Estimator Implementation
# =============================================================================

def estimate_tail_index(returns, threshold_pct=10, min_exceedances=20):
    """Estimate tail index α using the Hill estimator.
    
    The tail index α characterizes the heaviness of distribution tails:
    - α = 2: Gaussian (thin tails)
    - α < 2: Fat tails (Mandelbrot regime)
    - α ≈ 1.7: Typical for financial returns (Mandelbrot's finding)
    - α < 1: Extremely fat tails (infinite mean)
    
    Lower α = fatter tails = more extreme events than Gaussian predicts.
    
    Args:
        returns: Array of returns (will use absolute values)
        threshold_pct: Percentage of largest observations to use (default 10%)
        min_exceedances: Minimum number of tail observations required
        
    Returns:
        Tuple of (alpha, standard_error, n_exceedances)
    """
    if isinstance(returns, pd.Series):
        returns = returns.values
    returns = np.asarray(returns, dtype=np.float64)
    
    # Use absolute returns (we care about tail magnitude, not direction)
    abs_returns = np.abs(returns)
    abs_returns = abs_returns[~np.isnan(abs_returns)]
    abs_returns = abs_returns[abs_returns > 1e-10]  # Remove zeros
    
    n = len(abs_returns)
    if n < min_exceedances * 2:
        return 2.0, np.nan, 0  # Default to Gaussian if insufficient data
    
    # Number of tail observations to use
    k = max(min_exceedances, int(n * threshold_pct / 100))
    k = min(k, n - 1)  # Can't use all observations
    
    # Sort in descending order and get the k largest
    sorted_returns = np.sort(abs_returns)[::-1]
    tail_observations = sorted_returns[:k]
    threshold = sorted_returns[k]  # The k-th largest value
    
    if threshold <= 0:
        return 2.0, np.nan, 0
    
    # Hill estimator: α = k / Σ log(X_i / X_k)
    log_ratios = np.log(tail_observations / threshold)
    
    # Remove any infinite or nan values
    log_ratios = log_ratios[np.isfinite(log_ratios)]
    log_ratios = log_ratios[log_ratios > 0]  # Must be positive
    
    if len(log_ratios) < min_exceedances:
        return 2.0, np.nan, len(log_ratios)
    
    # Hill estimate
    alpha = len(log_ratios) / np.sum(log_ratios)
    
    # Standard error (asymptotic)
    se = alpha / np.sqrt(len(log_ratios))
    
    # Clip to reasonable range
    alpha = float(np.clip(alpha, 0.5, 5.0))
    
    return alpha, se, len(log_ratios)


def estimate_tail_index_both_tails(returns, threshold_pct=10, min_exceedances=20):
    """Estimate tail index separately for left and right tails.
    
    Returns:
        Dictionary with 'left' (negative returns), 'right' (positive returns), 
        and 'combined' tail indices.
    """
    if isinstance(returns, pd.Series):
        returns = returns.values
    returns = np.asarray(returns, dtype=np.float64)
    returns = returns[~np.isnan(returns)]
    
    # Split into positive and negative
    positive_returns = returns[returns > 0]
    negative_returns = -returns[returns < 0]  # Make positive for Hill estimator
    
    alpha_right, se_right, n_right = estimate_tail_index(
        positive_returns, threshold_pct, min_exceedances
    )
    alpha_left, se_left, n_left = estimate_tail_index(
        negative_returns, threshold_pct, min_exceedances
    )
    alpha_combined, se_combined, n_combined = estimate_tail_index(
        returns, threshold_pct, min_exceedances
    )
    
    return {
        'left': {'alpha': alpha_left, 'se': se_left, 'n': n_left},
        'right': {'alpha': alpha_right, 'se': se_right, 'n': n_right},
        'combined': {'alpha': alpha_combined, 'se': se_combined, 'n': n_combined}
    }

In [None]:
# =============================================================================
# CELL 2: Generate Synthetic Test Distributions
# =============================================================================
np.random.seed(42)
n_samples = 10000  # Large sample for accurate estimation

# Distribution 1: Gaussian (α should be high, ~3-4 or higher)
# Note: Gaussian technically has α = ∞, but Hill estimator gives finite values
gaussian_samples = np.random.randn(n_samples) * 0.01

# Distribution 2: Student's t with df=3 (α should be ≈ 3)
from scipy.stats import t as t_dist
t3_samples = t_dist.rvs(df=3, size=n_samples) * 0.01

# Distribution 3: Student's t with df=2 (α should be ≈ 2)
t2_samples = t_dist.rvs(df=2, size=n_samples) * 0.01

# Distribution 4: Pareto-like / Power law with α = 1.5 (very fat tails)
# Using inverse transform: if U ~ Uniform(0,1), then X = u^(-1/α) follows Pareto
def generate_pareto_tails(n, alpha_true):
    """Generate samples with Pareto tails (power law)."""
    u = np.random.uniform(0.01, 1, n)  # Avoid zero
    signs = np.random.choice([-1, 1], n)
    return signs * (u ** (-1 / alpha_true)) * 0.01

pareto_15_samples = generate_pareto_tails(n_samples, alpha_true=1.5)
pareto_20_samples = generate_pareto_tails(n_samples, alpha_true=2.0)
pareto_25_samples = generate_pareto_tails(n_samples, alpha_true=2.5)

print("Synthetic distributions generated:")
print(f"  Gaussian:    n={n_samples}, expected α > 3")
print(f"  Student t3:  n={n_samples}, expected α ≈ 3.0")
print(f"  Student t2:  n={n_samples}, expected α ≈ 2.0")
print(f"  Pareto 1.5:  n={n_samples}, expected α ≈ 1.5")
print(f"  Pareto 2.0:  n={n_samples}, expected α ≈ 2.0")
print(f"  Pareto 2.5:  n={n_samples}, expected α ≈ 2.5")

In [12]:
# =============================================================================
# CELL 3: Validate Hill Estimator on Synthetic Data
# =============================================================================
print("HILL ESTIMATOR VALIDATION")
print("=" * 70)
print(f"{'Distribution':<20} {'Expected α':<12} {'Estimated α':<12} {'Std Err':<10} {'Status'}")
print("-" * 70)

test_cases = [
    ("Gaussian", gaussian_samples, ">3.0"),
    ("Student t (df=3)", t3_samples, "≈3.0"),
    ("Student t (df=2)", t2_samples, "≈2.0"),
    ("Pareto α=1.5", pareto_15_samples, "≈1.5"),
    ("Pareto α=2.0", pareto_20_samples, "≈2.0"),
    ("Pareto α=2.5", pareto_25_samples, "≈2.5"),
]

for name, samples, expected in test_cases:
    alpha, se, n_exc = estimate_tail_index(samples, threshold_pct=10)
    
    # Determine if estimate is reasonable
    if ">" in expected:
        status = "✓" if alpha > 3.0 else "✗"
    elif "1.5" in expected:
        status = "✓" if 1.2 < alpha < 1.8 else "✗"
    elif "2.0" in expected:
        status = "✓" if 1.7 < alpha < 2.3 else "✗"
    elif "2.5" in expected:
        status = "✓" if 2.2 < alpha < 2.8 else "✗"
    elif "3.0" in expected:
        status = "✓" if 2.7 < alpha < 3.5 else "✗"
    else:
        status = "?"
    
    print(f"{name:<20} {expected:<12} {alpha:<12.3f} {se:<10.3f} {status}")

print("-" * 70)

In [None]:
# =============================================================================
# CELL 4: Sensitivity Analysis - Effect of threshold_pct
# =============================================================================
print("\nSENSITIVITY TO THRESHOLD PERCENTAGE")
print("=" * 70)
print("Testing Pareto α=2.0 distribution with different threshold percentages:")
print("-" * 70)
print(f"{'Threshold %':<15} {'Estimated α':<15} {'Std Error':<15} {'N exceedances'}")
print("-" * 70)

for thresh in [5, 10, 15, 20, 25, 30]:
    alpha, se, n_exc = estimate_tail_index(pareto_20_samples, threshold_pct=thresh)
    print(f"{thresh:<15} {alpha:<15.3f} {se:<15.3f} {n_exc}")

print("-" * 70)
print("Note: Lower threshold % uses fewer extreme observations (less bias, more variance)")
print("      Higher threshold % uses more observations (more bias, less variance)")