In [1]:
import polars as pl
import glob
import os

stocks_df = pl.read_parquet(glob.glob(os.path.join("test_data/stock_agg_month", "**", "*.parquet"), recursive=True))
stocks_df.head()

ticker,volume,open,close,high,low,window_start,transactions
str,i64,f64,f64,f64,f64,i64,i64
"""A""",2310830,135.0,134.69,136.01,134.29,1719288000000000000,32890
"""AA""",3066249,40.1,39.94,40.19,39.17,1719288000000000000,33059
"""AAA""",3046,25.1554,25.15,25.17,25.12,1719288000000000000,47
"""AAAU""",1266888,23.0,22.95,23.03,22.91,1719288000000000000,2190
"""AACG""",6136,0.824,0.8345,0.8345,0.824,1719288000000000000,77


In [2]:
# Calculate realized volatility using Garman-Klass estimator
import numpy as np

# Set the lookback window for RV rank calculation (in days)
rv_rank_window = 20  # You can change this to any number of days

stocks_df_with_rv = stocks_df.with_columns([
    # Garman-Klass realized volatility estimator (daily)
    (
        0.5 * (pl.col("high").log() - pl.col("low").log()) ** 2 -
        (2 * np.log(2) - 1) * (pl.col("close").log() - pl.col("open").log()) ** 2
    ).alias("daily_realized_volatility"),
    
    # Annualized realized volatility (multiply by sqrt(252) for trading days)
    (
        (0.5 * (pl.col("high").log() - pl.col("low").log()) ** 2 -
         (2 * np.log(2) - 1) * (pl.col("close").log() - pl.col("open").log()) ** 2) * 252
    ).sqrt().alias("annualized_realized_volatility")
]).sort(['ticker', 'window_start']).with_columns([
    # Calculate rolling RV rank over specified window
    # Shows percentile rank of current RV vs last N days for each ticker
    (pl.col("annualized_realized_volatility")
     .rank(method="average")
     .over(pl.col("ticker"), order_by=pl.col("window_start"), mapping_strategy="sliding_window")
     .rolling_map(
         lambda s: ((s.tail(1).item() - 1) / max(len(s) - 1, 1) * 100) if len(s) > 1 else 50.0,
         window_size=rv_rank_window,
         min_periods=2
     )
    ).alias(f"rv_rank_{rv_rank_window}d_percentile")
])

# Alternative simpler approach using rolling quantile
stocks_df_with_rv = stocks_df_with_rv.with_columns([
    # Rolling RV rank using quantile method
    (pl.col("annualized_realized_volatility")
     .rolling_quantile(0.5, window_size=rv_rank_window, min_periods=2)
     .over(pl.col("ticker"), order_by=pl.col("window_start"))
    ).alias(f"rv_median_{rv_rank_window}d"),
    
    # Calculate percentile rank manually
    (pl.when(pl.col("annualized_realized_volatility") > 
             pl.col("annualized_realized_volatility")
             .rolling_quantile(0.5, window_size=rv_rank_window, min_periods=2)
             .over(pl.col("ticker"), order_by=pl.col("window_start")))
     .then(75.0)  # Above median
     .when(pl.col("annualized_realized_volatility") < 
           pl.col("annualized_realized_volatility")
           .rolling_quantile(0.5, window_size=rv_rank_window, min_periods=2)
           .over(pl.col("ticker"), order_by=pl.col("window_start")))
     .then(25.0)  # Below median
     .otherwise(50.0)  # At median
    ).alias(f"rv_rank_{rv_rank_window}d_simple")
])

# Check SPY volatility and rank
spy_vol = stocks_df_with_rv.filter(pl.col('ticker') == 'SPY').select([
    'ticker', 'window_start', 'annualized_realized_volatility', 
    f'rv_rank_{rv_rank_window}d_percentile', f'rv_rank_{rv_rank_window}d_simple'
]).sort('window_start')

print(f"SPY volatility and {rv_rank_window}-day rolling RV rank:")
print(spy_vol)

print(f"\nRV Rank ({rv_rank_window}-day window) distribution for all tickers:")
print(stocks_df_with_rv[f'rv_rank_{rv_rank_window}d_simple'].describe())

stocks_df_with_rv.head(10)

ValueError: `mapping_strategy` must be one of {'group_to_rows', 'join', 'explode'}, got sliding_window