In [82]:
import polars as pl

import pandas as pd
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import polars.exceptions as pl_exc

from typing import Dict, List, Tuple, Optional, Union, Callable
import pyarrow
import altair


In [83]:
data = pl.read_csv("../data/binance_perp_daily.csv")

data = data.rename({"ticker": "symbol"})


In [84]:
data = data.drop_nulls()

In [None]:


class VolumeFilter:
    """
    Filters assets based on rolling dollar volume to focus on the most liquid assets.
    """
    
    def __init__(self,
                 window_size: int = 30,
                 min_volume_decile: int = 3,  
                 date_col: str = "date",
                 symbol_col: str = "ticker"):
        """
        Initialize the volume filter.
        
        Parameters:
        -----------
        window_size: Rolling window size for volume calculations (in days)
        min_volume_decile: Minimum volume decile to include in universe (1-10, with 10 being highest volume)
        date_col: Name of the date column
        symbol_col: Name of the symbol/ticker column
        """
        self.window_size = window_size
        self.min_volume_decile = min_volume_decile
        self.date_col = date_col
        self.symbol_col = symbol_col
    
    def filter_by_volume(self, 
                        df: pl.DataFrame,
                        dollar_volume_col: Optional[str] = None,
                        volume_col: Optional[str] = "volume",
                        price_col: Optional[str] = "close") -> pl.DataFrame:
        """
        Filter assets by rolling dollar volume, keeping only those above minimum decile.
        
        Parameters:
        -----------
        df: DataFrame containing market data
        dollar_volume_col: Name of the column containing dollar volume (if already provided)
        volume_col: Name of the column containing volume (used if dollar_volume_col not provided)
        price_col: Name of the column containing price (used if dollar_volume_col not provided)
        
        Returns:
        --------
        DataFrame with all assets, containing 'is_universe' flag column marking high-volume assets
        """
        if df[self.date_col].dtype != pl.Date:
            df = df.with_columns(
                pl.col(self.date_col).str.to_date().alias(self.date_col)
            )
        
        df = df.sort([self.date_col, self.symbol_col])
        
        if dollar_volume_col and dollar_volume_col in df.columns:
            df = df.with_columns(
                pl.col(dollar_volume_col).alias("dollar_volume")
            )
        else:
            df = df.with_columns(
                (pl.col(price_col) * pl.col(volume_col)).alias("dollar_volume")
            )
        
        df = df.with_columns(
            pl.col("dollar_volume")
            .rolling_mean(window_size=self.window_size, min_periods=1)
            .over(self.symbol_col)
            .alias("trail_volume")
        )
        
        def calc_decile(group_df):
            n_assets = group_df.height
            
            if n_assets == 0:
                return group_df
            

            group_df = group_df.with_columns(
                pl.col("trail_volume").rank(descending=True).alias("rank")
            )
            
            group_df = group_df.with_columns(
                (10 - (pl.col("rank") - 1) * 10 // n_assets).clip(lower_bound=1).alias("volume_decile")
            )
            
            group_df = group_df.with_columns(
                (pl.col("volume_decile") >= self.min_volume_decile).alias("is_universe")
            )
            
            return group_df

        result = df.group_by(self.date_col).map_groups(calc_decile)
        
        return result
    
    def get_universe(self, df: pl.DataFrame) -> pl.DataFrame:
        """
        Return only the assets that are in the universe.
        
        Parameters:
        -----------
        df: DataFrame with 'is_universe' column
        
        Returns:
        --------
        Filtered DataFrame with only universe assets
        """
        if "is_universe" not in df.columns:
            raise ValueError("DataFrame does not have 'is_universe' column. Run filter_by_volume first.")
        
        return df.filter(pl.col("is_universe"))
    
    def plot_universe_size(self, filtered_df: pl.DataFrame):
        """
        Plot the number of securities in the universe over time.
        
        Parameters:
        -----------
        filtered_df: DataFrame with 'is_universe' column
        """
        universe_counts = (
            filtered_df.group_by([self.date_col, "is_universe"])
            .agg(pl.count().alias("count"))
            .sort(self.date_col)
        )
        
        universe_counts_pd = universe_counts.to_pandas()
        
        plt.figure(figsize=(12, 6))
        
        for universe_flag, color in [(True, 'blue'), (False, 'gray')]:
            subset = universe_counts_pd[universe_counts_pd['is_universe'] == universe_flag]
            if not subset.empty:
                plt.plot(
                    subset[self.date_col], 
                    subset['count'], 
                    label=f"In Universe: {universe_flag}", 
                    color=color
                )
        
        plt.title('Universe Size Over Time')
        plt.xlabel('Date')
        plt.ylabel('Number of Securities')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        
        return plt, universe_counts
    

    

In [132]:



volume_filter = VolumeFilter(
    window_size=30,     
    min_volume_decile=9,  
    date_col="date",
    symbol_col="symbol"
)

filtered_df = volume_filter.filter_by_volume(
    df=data,
    dollar_volume_col="dollar_volume"  
)

# plt, universe_counts = volume_filter.plot_universe_size(filtered_df)
# plt.show()

universe_only = volume_filter.get_universe(filtered_df)

  .rolling_mean(window_size=self.window_size, min_periods=1)


In [None]:



class CryptoCarryFactor:
    """
    Calculates cross-sectional carry factor scores based on cryptocurrency funding rates.
    """
    
    def __init__(self,
                date_col: str = "date",
                symbol_col: str = "ticker",
                winsor_factor: float = 0.05,
                standardize: bool = True):
        """
        Initialize the crypto carry factor calculator.
        
        Parameters:
        -----------
        date_col: Name of the date column
        symbol_col: Name of the symbol column
        winsor_factor: Factor for winsorizing values (caps/floors at specified percentiles)
        standardize: Whether to standardize the factor scores cross-sectionally
        """
        self.date_col = date_col
        self.symbol_col = symbol_col
        self.winsor_factor = winsor_factor
        self.standardize = standardize
    
    def _winsorize(self, df: pl.DataFrame, col: str) -> pl.DataFrame:
        """Winsorize values at specified percentiles."""
        bounds = (
            df.group_by(self.date_col)
            .agg([
                pl.col(col).quantile(self.winsor_factor).alias("lower_bound"),
                pl.col(col).quantile(1 - self.winsor_factor).alias("upper_bound")
            ])
        )
        
        df_with_bounds = df.join(
            bounds, 
            on=self.date_col, 
            how="left"
        )
        
        df_winsorized = df_with_bounds.with_columns(
            pl.col(col).clip(
                pl.col("lower_bound"), 
                pl.col("upper_bound")
            ).alias(f"{col}_winsorized")
        )
        
        return df_winsorized.drop(["lower_bound", "upper_bound"])
    
    def _standardize(self, df: pl.DataFrame, col: str) -> pl.DataFrame:
        """Standardize values cross-sectionally by date."""
        stats = (
            df.group_by(self.date_col)
            .agg([
                pl.col(col).mean().alias("mean"),
                pl.col(col).std().alias("std")
            ])
        )
        
        df_with_stats = df.join(
            stats, 
            on=self.date_col, 
            how="left"
        )
        
        df_standardized = df_with_stats.with_columns(
            ((pl.col(col) - pl.col("mean")) / (pl.col("std") + 1e-8))
            .alias(f"{col}_standardized")
        )
        
        return df_standardized.drop(["mean", "std"])
    
    def compute_from_funding_rates(self,
                                 df: pl.DataFrame,
                                 funding_rate_col: str = "funding_rate",
                                 annualize: bool = True,
                                 periods_per_year: int = 365,  # For daily funding rates
                                 output_col: str = "carry_score") -> pl.DataFrame:
        """
        Compute carry factor from daily funding rates for cryptocurrencies.
        
        Parameters:
        -----------
        df: DataFrame containing funding rate data
        funding_rate_col: Name of the column containing funding rates
        annualize: Whether to annualize the funding rates
        periods_per_year: Number of funding periods per year
        output_col: Name for the output carry score column
        
        Returns:
        --------
        DataFrame with carry scores
        """
        # Validate input
        if funding_rate_col not in df.columns:
            raise ValueError(f"Funding rate column '{funding_rate_col}' not found")
        
        if df[self.date_col].dtype != pl.Date:
            df = df.with_columns(
                pl.col(self.date_col).str.to_date().alias(self.date_col)
            )
        
        result_df = df.clone()
        
        if annualize:
            result_df = result_df.with_columns(
                (pl.col(funding_rate_col) * periods_per_year).alias("funding_annual")
            )
            funding_col = "funding_annual"
        else:
            funding_col = funding_rate_col
        
        # Negate funding rate (negative funding = positive carry)
        # In crypto, when funding rate is negative, longs pay shorts, so shorts have positive carry
        result_df = result_df.with_columns(
            (-pl.col(funding_col)).alias("carry_raw")
        )
        
        result_df = self._winsorize(result_df, "carry_raw")
        
        if self.standardize:
            result_df = self._standardize(result_df, "carry_raw_winsorized")
            final_col = "carry_raw_winsorized_standardized"
        else:
            final_col = "carry_raw_winsorized"
        
        result_df = result_df.with_columns(
            pl.col(final_col).alias(output_col)
        )
        
        cols_to_keep = [self.date_col, self.symbol_col, funding_rate_col, output_col]
        if annualize:
            cols_to_keep.append("funding_annual")
            
        return result_df.select(cols_to_keep)



def assign_deciles(df: pl.DataFrame, 
                  score_col: str = "carry_score", 
                  date_col: str = "date",
                  output_col: str = "decile_carry") -> pl.DataFrame:
    """
    Assign deciles (1-10) to standardized scores within each date.
    
    Parameters:
    -----------
    df: DataFrame with scores
    score_col: Name of the column containing scores
    date_col: Name of the date column
    output_col: Name for the output decile column
    
    Returns:
    --------
    DataFrame with deciles added
    """
    
    def calc_deciles(group: pl.DataFrame) -> pl.DataFrame:
        n_assets = group.height
        if n_assets < 2:
            return group.with_columns(pl.lit(None).cast(pl.Int32).alias(output_col))
        
        group = group.with_columns(
            pl.col(score_col).rank(descending=True).alias("temp_rank")
        )
        
        group = group.with_columns(
            ((pl.col("temp_rank") - 1) * 10 // n_assets + 1).cast(pl.Int32).alias(output_col)
        )
        
        return group.drop("temp_rank")
    
    result = df.group_by(date_col).map_groups(calc_deciles)
    
    return result



In [89]:


carry_factor = CryptoCarryFactor(
    date_col="date",
    symbol_col="symbol",
    winsor_factor=0.05,
    standardize=True
)

carry_scores = carry_factor.compute_from_funding_rates(
    df=filtered_df,
    funding_rate_col="funding_rate", 
    annualize=False,
    output_col="carry_score"
)

carry_scores = assign_deciles(
    df=carry_scores,
    score_col="carry_score",
    date_col="date",
    output_col="decile_carry"
)


In [90]:
carry_scores = carry_scores.drop_nulls()

In [91]:
print(carry_scores)

shape: (187_174, 5)
┌────────────┬──────────┬──────────────┬─────────────┬──────────────┐
│ date       ┆ symbol   ┆ funding_rate ┆ carry_score ┆ decile_carry │
│ ---        ┆ ---      ┆ ---          ┆ ---         ┆ ---          │
│ date       ┆ str      ┆ f64          ┆ f64         ┆ i32          │
╞════════════╪══════════╪══════════════╪═════════════╪══════════════╡
│ 2019-11-27 ┆ BTCUSDT  ┆ 0.000238     ┆ -0.707084   ┆ 6            │
│ 2019-11-27 ┆ ETHUSDT  ┆ -0.0002      ┆ 0.707084    ┆ 1            │
│ 2019-11-28 ┆ BTCUSDT  ┆ -0.0003      ┆ 0.707073    ┆ 1            │
│ 2019-11-28 ┆ ETHUSDT  ┆ -0.000008    ┆ -0.707073   ┆ 6            │
│ 2019-11-29 ┆ BTCUSDT  ┆ -0.0003      ┆ 0.707094    ┆ 1            │
│ …          ┆ …        ┆ …            ┆ …           ┆ …            │
│ 2024-02-13 ┆ ZECUSDT  ┆ -0.000775    ┆ 0.524675    ┆ 4            │
│ 2024-02-13 ┆ ZENUSDT  ┆ -0.000846    ┆ 0.735833    ┆ 3            │
│ 2024-02-13 ┆ ZETAUSDT ┆ -0.000784    ┆ 0.551827    ┆ 3            │


In [None]:
import polars as pl
from typing import Optional, Sequence, Union

def carry_decile_to_target_sigmoid(
    df:            pl.DataFrame,
    decile_col:    str,
    keep_deciles:  Optional[Sequence[int]] = None,
    k:             float = 4.0,
    x_offset:      float = 0.0,
    y_min:         float = -0.85,
    y_max:         float =  0.85,
    date_col:      str = "date",
    symbol_col:    str = "symbol",
    return_lazy:   bool = False,
) -> Union[pl.DataFrame, pl.LazyFrame]:
    """Filter optional deciles, linearly map them into alpha∈[-1,1],
       then logistic→target_position∈[y_min,y_max]."""

    lf = df.lazy()

    # 1) Filter / pull lo/hi
    if keep_deciles is not None:
        lf = lf.filter(pl.col(decile_col).is_in(keep_deciles))
        lo, hi = float(min(keep_deciles)), float(max(keep_deciles))
    else:
        lohi = (
          lf
          .select([
            pl.col(decile_col).min().alias("lo"),
            pl.col(decile_col).max().alias("hi"),
          ])
          .collect()
        )
        lo, hi = float(lohi["lo"][0]), float(lohi["hi"][0])

    if hi == lo:
        raise ValueError(f"decile range zero: lo=hi={lo}")

    # 2) Build the alpha expression
    #    α = 2*(decile-lo)/(hi-lo) - 1   ⇒  decile=lo→-1, decile=hi→+1
    alpha_expr = (
      (pl.col(decile_col).cast(pl.Float64) - lo)
      / (hi - lo) * 2.0
      - 1.0
    ).alias("alpha")

    # 3) Build the target_position expression using the previously
    #    defined "alpha" column .
    target_expr = (
      pl.lit(y_min)
      + (y_max - y_min)
        * (1.0 / (1.0 + ( -k * (pl.col("alpha") - x_offset) ).exp()))
    ).alias("target_position")

    out = (
      lf
      .with_columns([alpha_expr])       
      .with_columns([target_expr])      
      .select([date_col, symbol_col, decile_col, "alpha", "target_position"])
    )

    return out if return_lazy else out.collect()

In [93]:

sig = carry_decile_to_target_sigmoid(
    carry_scores,
    k=1.0,
    y_min=-0.90,
    y_max=0.90,
    decile_col="decile_carry"
)
print(sig)

shape: (187_174, 5)
┌────────────┬──────────┬──────────────┬───────────┬─────────────────┐
│ date       ┆ symbol   ┆ decile_carry ┆ alpha     ┆ target_position │
│ ---        ┆ ---      ┆ ---          ┆ ---       ┆ ---             │
│ date       ┆ str      ┆ i32          ┆ f64       ┆ f64             │
╞════════════╪══════════╪══════════════╪═══════════╪═════════════════╡
│ 2019-11-27 ┆ BTCUSDT  ┆ 6            ┆ 0.111111  ┆ 0.049949        │
│ 2019-11-27 ┆ ETHUSDT  ┆ 1            ┆ -1.0      ┆ -0.415905       │
│ 2019-11-28 ┆ BTCUSDT  ┆ 1            ┆ -1.0      ┆ -0.415905       │
│ 2019-11-28 ┆ ETHUSDT  ┆ 6            ┆ 0.111111  ┆ 0.049949        │
│ 2019-11-29 ┆ BTCUSDT  ┆ 1            ┆ -1.0      ┆ -0.415905       │
│ …          ┆ …        ┆ …            ┆ …         ┆ …               │
│ 2024-02-13 ┆ ZECUSDT  ┆ 4            ┆ -0.333333 ┆ -0.148626       │
│ 2024-02-13 ┆ ZENUSDT  ┆ 3            ┆ -0.555556 ┆ -0.243762       │
│ 2024-02-13 ┆ ZETAUSDT ┆ 3            ┆ -0.555556 ┆ -0.2

In [94]:
sig = sig.drop_nans()
print(sig)

shape: (187_174, 5)
┌────────────┬──────────┬──────────────┬───────────┬─────────────────┐
│ date       ┆ symbol   ┆ decile_carry ┆ alpha     ┆ target_position │
│ ---        ┆ ---      ┆ ---          ┆ ---       ┆ ---             │
│ date       ┆ str      ┆ i32          ┆ f64       ┆ f64             │
╞════════════╪══════════╪══════════════╪═══════════╪═════════════════╡
│ 2019-11-27 ┆ BTCUSDT  ┆ 6            ┆ 0.111111  ┆ 0.049949        │
│ 2019-11-27 ┆ ETHUSDT  ┆ 1            ┆ -1.0      ┆ -0.415905       │
│ 2019-11-28 ┆ BTCUSDT  ┆ 1            ┆ -1.0      ┆ -0.415905       │
│ 2019-11-28 ┆ ETHUSDT  ┆ 6            ┆ 0.111111  ┆ 0.049949        │
│ 2019-11-29 ┆ BTCUSDT  ┆ 1            ┆ -1.0      ┆ -0.415905       │
│ …          ┆ …        ┆ …            ┆ …         ┆ …               │
│ 2024-02-13 ┆ ZECUSDT  ┆ 4            ┆ -0.333333 ┆ -0.148626       │
│ 2024-02-13 ┆ ZENUSDT  ┆ 3            ┆ -0.555556 ┆ -0.243762       │
│ 2024-02-13 ┆ ZETAUSDT ┆ 3            ┆ -0.555556 ┆ -0.2

In [None]:
import polars as pl
import numpy as np

class EMARealizedVol:
    """
    σ̂_t = sqrt( w·v_s[t−1] + (1−w)·v_l[t−1] )
    
    where
      v_s = EMA(span=S) of r²,
      v_l = EMA(span=L) of r²,
      w   = blend_weight in [0,1].

    Uses Polars’ ewm_mean under the hood.
    """

    def __init__(self,
                 short_span: int     = 30,
                 long_span: int      = 120,
                 blend_weight: float = 0.5,
                 annualise: bool     = False,
                 freq: int           = 365):
        assert 0 < short_span < long_span
        assert 0.0 <= blend_weight <= 1.0
        self.S = short_span
        self.L = long_span
        self.w = blend_weight
        self.ann = annualise
        self.freq = freq

    def add_vol(self,
                df: pl.DataFrame,
                ret_col: str = "log_returns",
                by: list[str] = ["symbol"]
               ) -> pl.DataFrame:
        """
        Input:
          df with at least [date, symbol, <ret_col>]
        Output:
          same DataFrame + a column 'vol_fcst'
        """
        return (
            df
            .sort(by + ["date"])
            .with_columns([
                # squared returns
                (pl.col(ret_col)**2).alias("r2")
            ])
            .with_columns([
                pl.col("r2")
                  .ewm_mean(span=self.S)
                  .over("symbol")
                  .alias("v_s_raw"),
                pl.col("r2")
                  .ewm_mean(span=self.L)
                  .over("symbol")
                  .alias("v_l_raw"),
            ])
            .with_columns([
                pl.col("v_s_raw").shift(1).over("symbol").alias("v_s"),
                pl.col("v_l_raw").shift(1).over("symbol").alias("v_l"),
            ])
            .with_columns([
                ( (self.w * pl.col("v_s") + (1-self.w) * pl.col("v_l"))
                  .sqrt()
                ).alias("vol_fcst_raw")
            ])
            .with_columns([
                (pl.col("vol_fcst_raw") * np.sqrt(self.freq)
                 if self.ann else
                 pl.col("vol_fcst_raw")
                ).alias("vol_fcst")
            ])
            .drop(["r2","v_s_raw","v_l_raw","v_s","v_l","vol_fcst_raw"])
        )



In [None]:

universe_only = universe_only.with_columns(
    (pl.col("close").log() - pl.col("close").shift(1).over("symbol").log()).alias("log_returns")
)

returns = universe_only.select([
    'date','symbol','log_returns','close'
])

In [133]:

volf = EMARealizedVol(
    short_span    = 21,
    long_span     = 90,
    blend_weight  = 0.5,
    annualise     = False
)

rets2 = volf.add_vol(returns, ret_col="log_returns")

rets2 = rets2.with_columns(
    pl.col("log_returns").abs().alias("vol_real")
)

# global_r2 = rets2.select(
#     pl.corr("vol_real","vol_fcst")
# ).item()**2   # corr() gives ρ, so ρ² is R²
# print(f"Global R² = {global_r2:.4f}")

# daily = (
#     rets2
#     .group_by("date")
#     .agg(pl.corr("vol_real","vol_fcst").alias("rho"))
#     .with_columns((pl.col("rho")**2).alias("r2"))
#     .filter(pl.col("r2").is_not_null())
#     .to_pandas()
# )
# print("Mean cross-sec daily R² =", round(daily["r2"].mean(),4))



In [134]:
print(rets2)

shape: (38_135, 6)
┌────────────┬──────────────┬─────────────┬──────────┬──────────┬──────────┐
│ date       ┆ symbol       ┆ log_returns ┆ close    ┆ vol_fcst ┆ vol_real │
│ ---        ┆ ---          ┆ ---         ┆ ---      ┆ ---      ┆ ---      │
│ date       ┆ str          ┆ f64         ┆ f64      ┆ f64      ┆ f64      │
╞════════════╪══════════════╪═════════════╪══════════╪══════════╪══════════╡
│ 2023-12-08 ┆ 1000BONKUSDT ┆ null        ┆ 0.012287 ┆ null     ┆ null     │
│ 2023-12-09 ┆ 1000BONKUSDT ┆ 0.079147    ┆ 0.013299 ┆ null     ┆ 0.079147 │
│ 2023-12-10 ┆ 1000BONKUSDT ┆ -0.14317    ┆ 0.011525 ┆ 0.079147 ┆ 0.14317  │
│ 2023-12-11 ┆ 1000BONKUSDT ┆ -0.003651   ┆ 0.011483 ┆ 0.116576 ┆ 0.003651 │
│ 2023-12-12 ┆ 1000BONKUSDT ┆ 0.071005    ┆ 0.012328 ┆ 0.093774 ┆ 0.071005 │
│ …          ┆ …            ┆ …           ┆ …        ┆ …        ┆ …        │
│ 2023-12-07 ┆ ZRXUSDT      ┆ -0.024267   ┆ 0.4071   ┆ 0.041897 ┆ 0.024267 │
│ 2023-12-08 ┆ ZRXUSDT      ┆ 0.041853    ┆ 0.4245   ┆ 0.

In [99]:
rets2 = rets2.drop_nulls()

In [100]:
universe_only = universe_only.drop_nulls()

In [101]:
print(carry_scores)

shape: (187_174, 5)
┌────────────┬──────────┬──────────────┬─────────────┬──────────────┐
│ date       ┆ symbol   ┆ funding_rate ┆ carry_score ┆ decile_carry │
│ ---        ┆ ---      ┆ ---          ┆ ---         ┆ ---          │
│ date       ┆ str      ┆ f64          ┆ f64         ┆ i32          │
╞════════════╪══════════╪══════════════╪═════════════╪══════════════╡
│ 2019-11-27 ┆ BTCUSDT  ┆ 0.000238     ┆ -0.707084   ┆ 6            │
│ 2019-11-27 ┆ ETHUSDT  ┆ -0.0002      ┆ 0.707084    ┆ 1            │
│ 2019-11-28 ┆ BTCUSDT  ┆ -0.0003      ┆ 0.707073    ┆ 1            │
│ 2019-11-28 ┆ ETHUSDT  ┆ -0.000008    ┆ -0.707073   ┆ 6            │
│ 2019-11-29 ┆ BTCUSDT  ┆ -0.0003      ┆ 0.707094    ┆ 1            │
│ …          ┆ …        ┆ …            ┆ …           ┆ …            │
│ 2024-02-13 ┆ ZECUSDT  ┆ -0.000775    ┆ 0.524675    ┆ 4            │
│ 2024-02-13 ┆ ZENUSDT  ┆ -0.000846    ┆ 0.735833    ┆ 3            │
│ 2024-02-13 ┆ ZETAUSDT ┆ -0.000784    ┆ 0.551827    ┆ 3            │


In [None]:

# we’ll compute mid_price = (high + low)/2, then
# spread = half‐spread fraction = 0.5 * (high − low) / mid_price
returns_with_cost = (
    universe_only
    .with_columns([
        ((pl.col("high") - pl.col("low")) 
         / ((pl.col("high") + pl.col("low")) / 2)  # full-range / mid
        * 0.5                                   # half‐spread
        ).alias("spread_est")
    ])
)

print(returns_with_cost.select(["date","symbol","high","low","spread_est"]).head())

spread_df = returns_with_cost.select([
    "date","symbol","spread_est"
]).rename({"spread_est":"spread"})


shape: (5, 5)
┌────────────┬─────────┬──────────┬──────────┬────────────┐
│ date       ┆ symbol  ┆ high     ┆ low      ┆ spread_est │
│ ---        ┆ ---     ┆ ---      ┆ ---      ┆ ---        │
│ date       ┆ str     ┆ f64      ┆ f64      ┆ f64        │
╞════════════╪═════════╪══════════╪══════════╪════════════╡
│ 2019-09-12 ┆ BTCUSDT ┆ 10365.15 ┆ 9934.11  ┆ 0.021234   │
│ 2019-09-13 ┆ BTCUSDT ┆ 10450.13 ┆ 10239.42 ┆ 0.010184   │
│ 2019-09-14 ┆ BTCUSDT ┆ 10396.4  ┆ 10153.51 ┆ 0.01182    │
│ 2019-09-15 ┆ BTCUSDT ┆ 10419.97 ┆ 10024.81 ┆ 0.019328   │
│ 2019-09-16 ┆ BTCUSDT ┆ 10353.81 ┆ 10115.0  ┆ 0.011667   │
└────────────┴─────────┴──────────┴──────────┴────────────┘


In [109]:
spread_df = spread_df.drop_nulls()
sig = sig.drop_nulls()
rets2 = rets2.drop_nulls()

In [None]:
import numpy as np
import polars as pl

class PortfolioAllocator:
    def __init__(
        self,
        alpha_df:      pl.DataFrame,
        spread_df:     pl.DataFrame,
        rets_df:       pl.DataFrame,
        lambda_risk:   float = 10.0,
        gamma_cost:    float = 1.0,
        max_pos:       float = 0.05,
        gross_cap:     float = 1.0,
        turnover_cap:  float | None = None,
        ignore_open:   bool = True,
        vol_target:    float | None = None   # annual vol target, e.g. 0.10 = 10%
    ):
        """
        alpha_df:    [date,symbol,target_position]
        spread_df:   [date,symbol,spread]
        rets_df:     [date,symbol,vol_fcst,close]
        lambda_risk: risk‐aversion (ℓ)
        gamma_cost:  cost multiplier (γ)
        max_pos:     max abs weight per coin
        gross_cap:   max gross leverage
        turnover_cap: max daily turnover (sum|Δw|) as fraction of NAV
        ignore_open: if True, newly‐seen symbols open to full w_desired
        vol_target:  optional annual volatility target (e.g. 0.10)
        """
        self.lmbd        = lambda_risk
        self.gm          = gamma_cost
        self.pmax        = max_pos
        self.gcap        = gross_cap
        self.tcap        = turnover_cap
        self.ignore_open = ignore_open
        self.vol_target  = vol_target

        self.positions: dict[str, float] = {}

        self.data = (
            alpha_df
            .join(spread_df, on=["date","symbol"], how="inner")
            .join(
                rets_df.select(["date","symbol","vol_fcst","close"]),
                on=["date","symbol"],
                how="inner"
            )
            .sort(["date","symbol"])
        )

    def allocate_day(self, date: str, nav: float = 1.0) -> pl.DataFrame:
        """
        1) w_raw     = target_position / vol_fcst
        2) w_desired = w_raw / sum(|w_raw|)
        3) optional vol‐target scaling
        4) dollar_size = w_desired * nav
           coin_qty    = dollar_size / close

        Returns: [symbol, w_desired, dollar_size, coin_qty]
        """
        df = self.data.filter(pl.col("date") == date)
        if df.is_empty():
            return pl.DataFrame({
                "symbol":      pl.Series([], dtype=pl.Utf8),
                "w_desired":   pl.Series([], dtype=pl.Float64),
                "dollar_size": pl.Series([], dtype=pl.Float64),
                "coin_qty":    pl.Series([], dtype=pl.Float64),
            })

        df = df.with_columns((pl.col("target_position") / pl.col("vol_fcst")).alias("w_raw"))

        df = df.with_columns(pl.col("w_raw").abs().sum().over("date").alias("abs_sum"))
        df = df.with_columns((pl.col("w_raw") / pl.col("abs_sum")).alias("w_desired"))

        if self.vol_target is not None:
            df = df.with_columns((pl.col("w_desired")**2 * pl.col("vol_fcst")**2).alias("var_i"))
            daily_var = df.select(pl.col("var_i").sum()).item()
            daily_vol = np.sqrt(daily_var)
            scale = (self.vol_target / np.sqrt(365)) / daily_vol
            df = df.with_columns((pl.col("w_desired") * scale).alias("w_desired"))

        df = df.with_columns([
            (pl.col("w_desired") * nav).alias("dollar_size"),
            (pl.col("w_desired") * nav / pl.col("close")).alias("coin_qty"),
        ])

        return df.select(["symbol","w_desired","dollar_size","coin_qty"])

    def rebalance_day(self, date: str, nav: float = 1.0) -> pl.DataFrame:
        """
        1) Fetch today's universe + yesterday's x_prev
        2) Attach w_desired from allocate_day()
        3) buffer = γ·spread / (ℓ·vol_fcst²), delta = w_desired - x_prev
        4) trade   = sign(delta)·( |delta| - buffer ) except
               • zero if |delta| ≤ buffer
               • open full if is_new & ignore_open
        5) x_new_pre = x_prev + trade
        6) clip per‐asset |x_new_pre| ≤ max_pos
        7) clip gross leverage ≤ gross_cap
        8) trade_final = x_new - x_prev
        9) enforce turnover cap: if Σ|trade_final| > tcap, scale all trades
       10) dollar_pos, coin_qty
       11) update self.positions → x_new

       Returns a DataFrame with:
         [symbol, x_prev, w_desired,
          buffer, delta, trade,
          x_new, trade_final,
          dollar_pos, coin_qty]
        """
        df0 = self.data.filter(pl.col("date") == date)

        if not self.positions:
            prev_df = pl.DataFrame(schema={"symbol": pl.Utf8, "x_prev": pl.Float64})
        else:
            prev_df = (
                pl.DataFrame({
                    "symbol": list(self.positions.keys()),
                    "x_prev": list(self.positions.values())
                })
                .with_columns([
                    pl.col("symbol").cast(pl.Utf8),
                    pl.col("x_prev").cast(pl.Float64)
                ])
            )

        df = (
            df0
            .join(prev_df, on="symbol", how="left")
            .with_columns([
                pl.col("x_prev").fill_null(0.0),
                pl.col("x_prev").is_null().alias("is_new")
            ])
        )
        if df.is_empty():
            return pl.DataFrame({
                "symbol":        pl.Series([], dtype=pl.Utf8),
                "x_prev":        pl.Series([], dtype=pl.Float64),
                "w_desired":     pl.Series([], dtype=pl.Float64),
                "buffer":        pl.Series([], dtype=pl.Float64),
                "delta":         pl.Series([], dtype=pl.Float64),
                "trade":         pl.Series([], dtype=pl.Float64),
                "x_new":         pl.Series([], dtype=pl.Float64),
                "trade_final":   pl.Series([], dtype=pl.Float64),
                "dollar_pos":    pl.Series([], dtype=pl.Float64),
                "coin_qty":      pl.Series([], dtype=pl.Float64),
                "vol_fcst":         pl.Series([], dtype=pl.Float64),

            })

        alloc = self.allocate_day(date, nav).select(["symbol","w_desired"])
        df = df.join(alloc, on="symbol", how="left").with_columns(
            pl.col("w_desired").fill_null(0.0)
        )

        df = df.with_columns([
            (
                self.gm * pl.col("spread")
                / (self.lmbd * pl.col("vol_fcst")**2)
            ).alias("buffer"),
            (pl.col("w_desired") - pl.col("x_prev")).alias("delta")
        ])

        df = df.with_columns(
            pl.when(pl.col("is_new") & pl.lit(self.ignore_open))
              .then(pl.col("w_desired"))
            .when((pl.col("delta").abs() <= pl.col("buffer")))
              .then(0.0)
            .otherwise(
              pl.col("delta").sign() * (pl.col("delta").abs() - pl.col("buffer"))
            )
            .alias("trade")
        )

        df = df.with_columns((pl.col("x_prev") + pl.col("trade")).alias("x_new_pre"))

        df = df.with_columns(pl.col("x_new_pre").clip(-self.pmax, self.pmax).alias("x_cap"))

        gross = df.select(pl.col("x_cap").abs().sum()).item()
        if gross > self.gcap and gross > 0:
            scale = self.gcap / gross
            df = df.with_columns((pl.col("x_cap") * scale).alias("x_new"))
        else:
            df = df.with_columns(pl.col("x_cap").alias("x_new"))

        df = df.with_columns((pl.col("x_new") - pl.col("x_prev")).alias("trade_final"))
        if self.tcap is not None:
            tot_turn = df.select(pl.col("trade_final").abs().sum()).item()
            if tot_turn > self.tcap and tot_turn > 0:
                s = self.tcap / tot_turn
                df = df.with_columns([
                    (pl.col("trade_final") * s).alias("trade_final"),
                    (pl.col("x_prev") + pl.col("trade_final")).alias("x_new")
                ])

        df = df.with_columns([
            (pl.col("x_new") * nav).alias("dollar_pos"),
            ((pl.col("x_new") * nav) / pl.col("close")).alias("coin_qty")
        ])

        self.positions = dict(zip(df["symbol"], df["x_new"]))

        return df.select([
            "symbol",
            "x_prev",
            "w_desired",
            "buffer",
            "delta",
            "trade",
            "x_new",
            "trade_final",
            "dollar_pos",
            "coin_qty",
            "vol_fcst"        

        ])

In [None]:
import numpy as np
import polars as pl

class Backtester:
    def __init__(
        self,
        allocator,              
        price_df: pl.DataFrame, # [date, symbol, close]
        trading_dates: list[str],
        cost_per_dollar: float = 0.0  # e.g. 0.0005 for 5bps round‐trip
    ):
        self.alloc    = allocator
        self.price_df = price_df
        self.dates    = trading_dates
        self.cost_per_dollar = cost_per_dollar

    def run(self, init_nav: float = 1.0, verbose: bool = False):
        nav = init_nav
        metrics   = []
        holdings  = []

        for today, tomorrow in zip(self.dates, self.dates[1:]):
            prev_nav = nav

            book = self.alloc.rebalance_day(today, nav)

            turnover_pct    = book.select(pl.col("trade_final").abs().sum()).item()
            turnover_dollar = turnover_pct * nav
            nav -= turnover_dollar * self.cost_per_dollar

            p0 = (
                self.price_df
                    .filter(pl.col("date") == today)
                    .select(["symbol", "close"])
                    .rename({"close":"p0"})
            )
            p1 = (
                self.price_df
                    .filter(pl.col("date") == tomorrow)
                    .select(["symbol", "close"])
                    .rename({"close":"p1"})
            )
            pnl_df = (
                book
                .join(p0, on="symbol")
                .join(p1, on="symbol")
                .with_columns(
                    (pl.col("coin_qty") * (pl.col("p1") - pl.col("p0")))
                        .alias("pnl")
                )
            )
            total_pnl = pnl_df.select(pl.col("pnl").sum()).item()
            nav += total_pnl

            gross_exp = book.select(pl.col("x_new").abs().sum()).item()
            net_exp   = book.select(pl.col("x_new").sum()).item()

            daily_var = book.select(
                (pl.col("x_new")**2 * pl.col("vol_fcst")**2).sum()
            ).item()
            daily_vol = np.sqrt(daily_var)
            ann_vol   = daily_vol * np.sqrt(365)

            metrics.append({
                "date":            today,
                "nav":             nav,
                "daily_return":    (nav - prev_nav) / prev_nav,
                "pnl":             total_pnl,
                "turnover_pct":    turnover_pct,
                "turnover_$":      turnover_dollar,
                "gross_exposure":  gross_exp,
                "net_exposure":    net_exp,
                "vol_forecast_d":  daily_vol,
                "vol_forecast_a":  ann_vol
            })

            for row in book.select(["symbol","x_new","dollar_pos","coin_qty"]).iter_rows(named=True):
                holdings.append({
                    "date":     today,
                    "symbol":   row["symbol"],
                    "weight":   row["x_new"],
                    "dollar":   row["dollar_pos"],
                    "coin_qty": row["coin_qty"]
                })

            if verbose:
                dr = metrics[-1]["daily_return"]
                print(f"{today}: ret={dr:.2%}, nav={nav:.2f}")

        metrics_df  = pl.DataFrame(metrics).sort("date")
        holdings_df = pl.DataFrame(holdings).sort(["date","symbol"])

        metrics_df = metrics_df.with_columns([
            pl.col("nav").cum_max().alias("nav_high"),
            pl.col("daily_return").rolling_std(21).alias("vol_realized_d")
        ]).with_columns([
            (pl.col("vol_realized_d") * np.sqrt(365)).alias("vol_realized_a")
        ])

        metrics_df = metrics_df.with_columns([
            (pl.col("nav") / pl.col("nav_high") - 1).alias("drawdown")
        ])



        return metrics_df, holdings_df
    

In [None]:


pa = PortfolioAllocator(
    sig, spread_df, rets2,
    lambda_risk=2, gamma_cost=0.3,
    max_pos=0.20, gross_cap=2.0,
    turnover_cap=0.30, vol_target=0.40
)


dates = sig['date'].unique()
bt = Backtester(pa, universe_only, trading_dates=dates, cost_per_dollar=0.0005)

metrics, holdings = bt.run(init_nav=1, verbose=True)

print(metrics.head())
print(holdings.filter(pl.col("date")==dates[1000]))



2019-11-27: ret=0.84%, nav=1.01
2019-11-28: ret=0.51%, nav=1.01
2019-11-29: ret=-0.69%, nav=1.01
2019-11-30: ret=-0.62%, nav=1.00
2019-12-01: ret=0.00%, nav=1.00
2019-12-02: ret=0.07%, nav=1.00
2019-12-03: ret=0.36%, nav=1.00
2019-12-04: ret=-0.35%, nav=1.00
2019-12-05: ret=0.10%, nav=1.00
2019-12-06: ret=0.38%, nav=1.01
2019-12-07: ret=0.10%, nav=1.01
2019-12-08: ret=-0.32%, nav=1.00
2019-12-09: ret=-0.51%, nav=1.00
2019-12-10: ret=-0.17%, nav=1.00
2019-12-11: ret=0.04%, nav=1.00
2019-12-12: ret=0.17%, nav=1.00
2019-12-13: ret=-0.52%, nav=0.99
2019-12-14: ret=0.16%, nav=1.00
2019-12-15: ret=-0.08%, nav=0.99
2019-12-16: ret=-1.12%, nav=0.98
2019-12-17: ret=0.39%, nav=0.99
2019-12-18: ret=0.95%, nav=1.00
2019-12-19: ret=0.02%, nav=1.00
2019-12-20: ret=-0.04%, nav=1.00
2019-12-21: ret=0.13%, nav=1.00
2019-12-22: ret=1.13%, nav=1.01
2019-12-23: ret=-0.82%, nav=1.00
2019-12-24: ret=-0.24%, nav=1.00
2019-12-25: ret=0.12%, nav=1.00
2019-12-26: ret=0.01%, nav=1.00
2019-12-27: ret=0.19%, nav=1

In [128]:
print(metrics)

shape: (1_539, 14)
┌────────────┬──────────┬───────────┬───────────┬───┬──────────┬───────────┬───────────┬───────────┐
│ date       ┆ nav      ┆ daily_ret ┆ pnl       ┆ … ┆ nav_high ┆ vol_reali ┆ vol_reali ┆ drawdown  │
│ ---        ┆ ---      ┆ urn       ┆ ---       ┆   ┆ ---      ┆ zed_d     ┆ zed_a     ┆ ---       │
│ date       ┆ f64      ┆ ---       ┆ f64       ┆   ┆ f64      ┆ ---       ┆ ---       ┆ f64       │
│            ┆          ┆ f64       ┆           ┆   ┆          ┆ f64       ┆ f64       ┆           │
╞════════════╪══════════╪═══════════╪═══════════╪═══╪══════════╪═══════════╪═══════════╪═══════════╡
│ 2019-11-27 ┆ 1.008393 ┆ 0.008393  ┆ 0.008493  ┆ … ┆ 1.008393 ┆ null      ┆ null      ┆ 0.0       │
│ 2019-11-28 ┆ 1.013533 ┆ 0.005098  ┆ 0.00514   ┆ … ┆ 1.013533 ┆ null      ┆ null      ┆ 0.0       │
│ 2019-11-29 ┆ 1.006573 ┆ -0.006867 ┆ -0.00696  ┆ … ┆ 1.013533 ┆ null      ┆ null      ┆ -0.006867 │
│ 2019-11-30 ┆ 1.000319 ┆ -0.006213 ┆ -0.006254 ┆ … ┆ 1.013533 ┆ null   

In [129]:
metrics['pnl'].cum_sum().plot.line()

In [130]:
metrics['nav'].plot.line()

In [131]:
metrics['drawdown'].plot.line()