In [1]:
import os
from pathlib import Path
import options_wizard as ow
from dotenv import load_dotenv

load_dotenv()

# TICK_PATH may list several directories separated by os.pathsep; only the first is used.
tick_path = os.getenv("TICK_PATH", "").split(os.pathsep)[0]
p = Path(tick_path)

# Tick symbols are the stems of the .parquet files in TICK_PATH.
# - Always defined, so later cells do not hit a NameError when the directory is missing.
# - An empty TICK_PATH is rejected explicitly: Path("") means "." (the working directory).
# - Only .parquet files count, so stray files in the folder are not taken as ticks.
available_ticks = []
if tick_path and p.is_dir():
    available_ticks = [
        f.stem for f in p.iterdir()
        if f.is_file() and f.suffix == ".parquet"
    ]

n_const = 50  # passed to universe.top_constituents below

# NOTE(review): the leading "50" presumably denotes the option delta rather than
# n_const — confirm before deriving it from n_const.
suffix = "50_delta_7_1_cal_spread"
start_date = ow.DateObj(2010, 1, 1)
end_date = ow.DateObj(2020, 12, 31)
save_type = ow.SaveType.PICKLE

universe = ow.Universe()
universe.top_constituents(n_const)




In [None]:
from dotenv import load_dotenv
import os
import pandas as pd

# --------------------------------------------------
# 1. Setup
# --------------------------------------------------
load_dotenv()

# Both variables may list several directories separated by os.pathsep; use the first,
# the same way CMDTY_PATH is read in the later cells.
cmdty_path = os.getenv("CMDTY_ROLL_PATH", "").split(os.pathsep)[0]
out_dir = os.getenv("CMDTY_PATH", "").split(os.pathsep)[0]
if not cmdty_path or not out_dir:
    raise RuntimeError("CMDTY_ROLL_PATH and CMDTY_PATH must both be set (e.g. in .env)")

# One sub-folder per commodity; each holds one parquet file per contract.
cmdty_folders = [f.path for f in os.scandir(cmdty_path) if f.is_dir()]


for samp in cmdty_folders:
    files = sorted(
        f.path for f in os.scandir(samp)
        if f.is_file() and f.name.endswith(".parquet")
    )
    if not files:
        # pd.concat([]) raises ValueError; a folder without contracts has nothing to merge.
        continue

    # --------------------------------------------------
    # 2. Load all parquet files and stack with Contract
    # --------------------------------------------------
    dfs = []

    for f in files:

        df = pd.read_parquet(f)

        df["Date"] = pd.to_datetime(df["Date"])
        # presumably one contract per file, so Date alone identifies a row — TODO confirm
        df = df.drop_duplicates(subset=["Date"])

        dfs.append(df)

    merged = (
        pd.concat(dfs, ignore_index=True)
        .set_index(["Date", "Contract"])
        .sort_index()
    )
    merged = merged.dropna(subset=["PX_SETTLE", "LAST_TRADEABLE_DT"])

    merged = (
        merged
        .reset_index()          # Date, Contract → columns
        .set_index("Date")      # Date → index
    )

    # --------------------------------------------------
    # 3. Save merged DataFrame
    # --------------------------------------------------
    # basename is portable; splitting on '\\' only works for Windows paths.
    folder_name = os.path.basename(os.path.normpath(samp))
    save_path = os.path.join(out_dir, folder_name + ".parquet")

    # Normalise dtypes of mixed-type columns before writing.
    merged["FUT_CONT_SIZE"] = pd.to_numeric(merged["FUT_CONT_SIZE"], errors='coerce')
    merged["CRNCY"] = merged["CRNCY"].astype("string")
    merged["EXCH_CODE"] = merged["EXCH_CODE"].astype("string")

    merged.to_parquet(save_path)



In [114]:
from dotenv import load_dotenv
import os
import options_wizard as ow
import polars as pl

load_dotenv()
cmdty_path = os.getenv("CMDTY_PATH", "").split(os.pathsep)[0]
# Sorted so the previewed file is the same on every run (os.scandir order is arbitrary).
files = sorted(
    f.path for f in os.scandir(cmdty_path)
    if f.is_file() and f.name.endswith(".parquet")
)
if not files:
    raise FileNotFoundError(f"No .parquet files found in CMDTY_PATH={cmdty_path!r}")

# Lazy scan of the first merged commodity file; only the 5-row preview is materialised.
df = pl.scan_parquet(files[0])
# print(): a bare expression in the middle of a cell is never displayed by Jupyter.
print(df.head(5).collect())


def load_data(**kwargs) -> ow.DataType:
    """Lazily load one commodity's futures panel from CMDTY_PATH.

    Keyword Args:
        tick: commodity root symbol; the file ``<tick>_FUT.parquet`` in the first
            directory listed in CMDTY_PATH is loaded.

    Returns:
        ow.DataType wrapping a polars LazyFrame in which LAST_TRADEABLE_DT and
        FUT_NOTICE_FIRST are parsed from ``dd/mm/YYYY`` strings and Date is cast
        to ``pl.Date``.

    Raises:
        ValueError: if no ``<tick>_FUT.parquet`` file exists in CMDTY_PATH.
    """
    import os
    from dotenv import load_dotenv
    import polars as pl

    tick = kwargs.get("tick", None)

    load_dotenv()
    cmdty_path = os.getenv("CMDTY_PATH", "").split(os.pathsep)[0]
    target_name = f"{tick}_FUT.parquet"

    # Compare bare file names: splitting the full path on '\\' only works on Windows,
    # so on other platforms no tick would ever match.
    matches = [
        entry.path for entry in os.scandir(cmdty_path)
        if entry.is_file() and entry.name == target_name
    ]
    if tick is None or not matches:
        raise ValueError(f"Tick {tick} not found in CMDTY_PATH")

    # strict=False: values that do not match dd/mm/YYYY become null instead of raising.
    df = (
        pl.scan_parquet(matches[0])
        .with_columns(
            pl.col("LAST_TRADEABLE_DT").str.strptime(pl.Date, format="%d/%m/%Y", strict=False),
            pl.col("FUT_NOTICE_FIRST").str.strptime(pl.Date, format="%d/%m/%Y", strict=False),
            pl.col("Date").cast(pl.Date),
        )
    )

    return ow.DataType(data=df, tick=tick)

def days_to_anchor(data: ow.DataType, **kwargs) -> ow.DataType:
    """Add trading-day distances from each observation to its contract's last tradeable date.

    Date and LAST_TRADEABLE_DT are both mapped (as-of, backward) onto the trading
    calendar returned by ``ow.market_dates`` for the data's exchange. The difference
    of their calendar positions is the number of trading days to the anchor.

    Added columns:
        DateIdx, LtdIdx: positions of Date / LAST_TRADEABLE_DT in the trading calendar.
        DAYS_TO_ANCHOR: LtdIdx - DateIdx as Int64 (negative for rows dated after LTD).
        DAYS_TO_FRONT_ANCHOR: per Date, the smallest non-negative DAYS_TO_ANCHOR
            (null if every contract on that date has already expired).

    Rows whose Date or LAST_TRADEABLE_DT cannot be matched to the calendar are dropped.

    Keyword Args:
        tick: symbol stored on the returned DataType.

    Raises:
        ValueError: if the data does not carry exactly one EXCH_CODE.
    """
    import pandas as pd
    import polars as pl
    import options_wizard as ow

    df = data._data
    tick = kwargs.get("tick", None)

    # One scan for both calendar bounds instead of two separate collects.
    lo, hi = (
        df.select(
            pl.col("Date").min().alias("lo"),
            pl.col("LAST_TRADEABLE_DT").max().alias("hi"),
        )
        .collect()
        .row(0)
    )
    min_day = pd.Timestamp(lo)
    max_day = pd.Timestamp(hi)

    exchanges = df.select(pl.col("EXCH_CODE").unique()).collect().to_series().to_list()
    if len(exchanges) != 1:
        raise ValueError(f"Expected exactly one EXCH_CODE for tick {tick}, found {exchanges}")
    exchange = exchanges[0]

    dates = ow.market_dates(
        exchange=exchange,
        lower=min_day,
        upper=max_day
    )

    # Dedupe and sort BEFORE numbering, so TradeDateIdx increases with Date no matter
    # what order market_dates returns. A repeated date would otherwise get two indices.
    calendar = (
        pl.LazyFrame({"Date": dates})
        .with_columns(pl.col("Date").cast(pl.Date))
        .unique("Date")
        .sort("Date")
        .with_row_index("TradeDateIdx")
    )

    df = (
        df.sort("Date")
        .join_asof(calendar, on="Date", strategy="backward")
        .filter(pl.col("TradeDateIdx").is_not_null())
        .rename({"TradeDateIdx": "DateIdx"})
    )

    calendar_ltd = (
        calendar
        .rename({
            "Date": "LAST_TRADEABLE_DT",
            "TradeDateIdx": "LtdIdx"
        })
        .sort("LAST_TRADEABLE_DT")
    )

    df = (
        df.sort("LAST_TRADEABLE_DT")
        .join_asof(calendar_ltd, on="LAST_TRADEABLE_DT", strategy="backward")
        .filter(pl.col("LtdIdx").is_not_null())
    )

    # with_row_index produces unsigned integers. Subtracting them directly can wrap to a
    # huge value when Date is after LAST_TRADEABLE_DT, so subtract in a signed type.
    df = df.with_columns(
        (pl.col("LtdIdx").cast(pl.Int64) - pl.col("DateIdx").cast(pl.Int64))
        .alias("DAYS_TO_ANCHOR")
    )
    # Front anchor = nearest contract that has not expired yet. Expired rows are nulled
    # out first, and min() ignores nulls.
    df = df.with_columns(
        pl.when(pl.col("DAYS_TO_ANCHOR") >= 0)
        .then(pl.col("DAYS_TO_ANCHOR"))
        .otherwise(None)
        .min()
        .over("Date")
        .alias("DAYS_TO_FRONT_ANCHOR")
    )
    return ow.DataType(data=df, tick=tick)

def curve_structure(data: ow.DataType, **kwargs) -> ow.DataType:
    """Attach cross-sectional futures-curve shape to every row.

    For each Date, settlement prices are collapsed to one price per non-negative
    DAYS_TO_ANCHOR tenor and ordered by tenor. Each tenor point then gets:
        CURVE_POS: "front" (shortest tenor), "back" (longest) or "int" (interior).
        REL_CURVATURE: non-uniform-grid second difference of price w.r.t. tenor
            (tenor in trading days, result scaled by 252**2), divided by that tenor's
            price. Null at the curve ends and where the price is zero.
    Both columns are left-joined back onto the input on (Date, DAYS_TO_ANCHOR).
    Expired rows (DAYS_TO_ANCHOR < 0) receive nulls.

    Expects the DAYS_TO_ANCHOR column produced by days_to_anchor.

    Keyword Args:
        tick: symbol stored on the returned DataType.
    """
    import polars as pl
    import options_wizard as ow

    df = data._data
    tick = kwargs.get("tick", None)

    # --- build clean cross-sectional curve (one price per tenor per day) ---
    # Expired contracts are not part of the live curve. If kept, they would become a
    # spurious first tenor and distort CURVE_POS and neighbour curvature.
    curve = (
        df
        .select(["Date", "DAYS_TO_ANCHOR", "PX_SETTLE"])
        .filter(pl.col("DAYS_TO_ANCHOR") >= 0)
        .group_by(["Date", "DAYS_TO_ANCHOR"])
        .agg(pl.col("PX_SETTLE").last())
        .sort(["Date", "DAYS_TO_ANCHOR"])
        .with_columns(
            # Per-Date counter 0..n-1 in tenor order (portable across polars versions).
            pl.int_range(0, pl.len(), dtype=pl.UInt32).over("Date").alias("TenorIdx")
        )
    )

    # --- identify front / back / interior ---
    curve = curve.with_columns(
        pl.when(pl.col("TenorIdx") == 0)
        .then(pl.lit("front"))
        .when(pl.col("TenorIdx") == pl.max("TenorIdx").over("Date"))
        .then(pl.lit("back"))
        .otherwise(pl.lit("int"))
        .alias("CURVE_POS")
    )

    # --- neighbour tenors in tenor space ---
    curve = curve.with_columns(
        pl.col("PX_SETTLE").shift(-1).over("Date").alias("f_p"),
        pl.col("PX_SETTLE").shift(1).over("Date").alias("f_m"),
        pl.col("DAYS_TO_ANCHOR").shift(-1).over("Date").alias("T_p"),
        pl.col("DAYS_TO_ANCHOR").shift(1).over("Date").alias("T_m"),
    )

    # --- weighted second derivative (cross-sectional curvature) ---
    # Three-point second difference on a non-uniform grid:
    #   2 / (T_p - T_m) * ((f_p - f) / (T_p - T) - (f - f_m) / (T - T_m))
    curve = curve.with_columns(
        (pl.col("T_p") - pl.col("DAYS_TO_ANCHOR")).alias("dt_fwd"),
        (pl.col("DAYS_TO_ANCHOR") - pl.col("T_m")).alias("dt_bwd"),
        (pl.col("T_p") - pl.col("T_m")).alias("dt_span"),
    ).with_columns(
        pl.when(
            pl.all_horizontal([
                pl.col("f_p").is_not_null(),
                pl.col("f_m").is_not_null(),
                pl.col("dt_fwd") > 0,
                pl.col("dt_bwd") > 0,
            ])
        )
        .then(
            2.0 / pl.col("dt_span") * (
                (pl.col("f_p") - pl.col("PX_SETTLE")) / pl.col("dt_fwd")
                -
                (pl.col("PX_SETTLE") - pl.col("f_m")) / pl.col("dt_bwd")
            ) * (252.0 ** 2)
        )
        .otherwise(None)
        .alias("CURVATURE")
    )

    # --- relative curvature (scale-free) ---
    # A zero price would give inf, so it yields null instead.
    # NOTE(review): a negative price (possible for some commodities) flips the sign here.
    curve = curve.with_columns(
        pl.when(pl.col("PX_SETTLE") != 0)
        .then(pl.col("CURVATURE") / pl.col("PX_SETTLE"))
        .otherwise(None)
        .alias("REL_CURVATURE")
    )

    # --- keep only what we want to merge back ---
    curve = curve.select([
        "Date",
        "DAYS_TO_ANCHOR",
        "CURVE_POS",
        "REL_CURVATURE",
    ])

    # --- merge back to original dataframe ---
    out = df.join(
        curve,
        on=["Date", "DAYS_TO_ANCHOR"],
        how="left"
    )

    return ow.DataType(data=out, tick=tick)


# Single source of truth for the commodity processed below, so the stages cannot disagree.
TICK = "CC"

data = load_data(tick=TICK)
data = days_to_anchor(data, tick=TICK)
data = curve_structure(data, tick=TICK)

In [117]:
# Summary statistics of relative curvature over interior tenors only
# (front and back points have no curvature by construction).
out = data._data.filter(pl.col("CURVE_POS") == "int").collect()
out.get_column("REL_CURVATURE").describe()

statistic,value
str,f64
"""count""",17897.0
"""null_count""",0.0
"""mean""",0.01201
"""std""",0.309006
"""min""",-2.93395
"""25%""",-0.077484
"""50%""",-0.017291
"""75%""",0.038237
"""max""",4.588169


In [80]:
import polars as pl

# Per Date, the sorted DAYS_TO_ANCHOR values of every contract observed that day.
# In group_by().agg a plain column expression already gathers each group into a list.
lf = (
    data._data
    .group_by("Date")
    .agg(pl.col("DAYS_TO_ANCHOR").sort().alias("days_to_anchor"))
    .sort("Date")
)

df = lf.collect()

# Print each date followed by the DAYS_TO_ANCHOR values for its contracts
for date, values in df.iter_rows():
    print(date, values)

ColumnNotFoundError: unable to find column "DAYS_TO_ANCHOR"; valid columns: ["Date", "CURV_TENOR_CURV_TENOR_1", "CURV_TENOR_CURV_TENOR_2", "CURV_TENOR_CURV_TENOR_3", "CURV_TENOR_CURV_TENOR_4"]

Resolved plan until failure:

	---> FAILED HERE RESOLVING 'sink' <---
SELECT [col("Date"), col("CURV_TENOR_1").alias("CURV_TENOR_CURV_TENOR_1"), col("CURV_TENOR_2").alias("CURV_TENOR_CURV_TENOR_2"), col("CURV_TENOR_3").alias("CURV_TENOR_CURV_TENOR_3"), col("CURV_TENOR_4").alias("CURV_TENOR_CURV_TENOR_4")]
  SORT BY [col("Date")]
    AGGREGATE[maintain_order: false]
      [when([(col("TenorIdx")) == (1)]).then(col("CURVATURE")).otherwise(null.cast(Float64)).first().alias("CURV_TENOR_1"), when([(col("TenorIdx")) == (2)]).then(col("CURVATURE")).otherwise(null.cast(Float64)).first().alias("CURV_TENOR_2"), when([(col("TenorIdx")) == (3)]).then(col("CURVATURE")).otherwise(null.cast(Float64)).first().alias("CURV_TENOR_3"), when([(col("TenorIdx")) == (4)]).then(col("CURVATURE")).otherwise(null.cast(Float64)).first().alias("CURV_TENOR_4")] BY [col("Date")]
      FROM
      SELECT [col("Date"), col("TenorIdx"), col("CURVATURE")]
        FILTER col("CURVATURE").is_not_null()
        FROM
           WITH_COLUMNS:
           [[([(dyn float: 2) / ([(col("T_p")) - (col("T_m"))].cast(Unknown(Float)))]) * ([([([(col("F_p")) - (col("MID_PX"))]) / ([(col("T_p")) - (col("DAYS_TO_ANCHOR"))].cast(Float64))]) - ([([(col("MID_PX")) - (col("F_m"))]) / ([(col("DAYS_TO_ANCHOR")) - (col("T_m"))].cast(Float64))])])].alias("CURVATURE")] 
             WITH_COLUMNS:
             [col("MID_PX").shift([dyn int: -1]).over([col("Date")]).alias("F_p"), col("MID_PX").shift([dyn int: 1]).over([col("Date")]).alias("F_m"), col("DAYS_TO_ANCHOR").shift([dyn int: -1]).over([col("Date")]).alias("T_p"), col("DAYS_TO_ANCHOR").shift([dyn int: 1]).over([col("Date")]).alias("T_m")] 
               WITH_COLUMNS:
               [0.int_range([len()]).over([col("Date")]).alias("TenorIdx")] 
                SORT BY [col("Date"), col("DAYS_TO_ANCHOR")]
                   WITH_COLUMNS:
                   [[([(col("PX_BID")) + (col("PX_ASK"))]) / (2.0)].alias("MID_PX")] 
                     WITH_COLUMNS:
                     [col("DAYS_TO_ANCHOR").min().over([col("Date")]).alias("DAYS_TO_FRONT_ANCHOR")] 
                       WITH_COLUMNS:
                       [[(col("LtdIdx")) - (col("DateIdx"))].alias("DAYS_TO_ANCHOR")] 
                        FILTER col("LtdIdx").is_not_null()
                        FROM
                          ASOF JOIN:
                          LEFT PLAN ON: [col("LAST_TRADEABLE_DT")]
                            SORT BY [col("LAST_TRADEABLE_DT")]
                              SELECT [col("Contract"), col("VOLUME"), col("OPEN_INT"), col("PX_BID"), col("PX_ASK"), col("PX_SETTLE"), col("LAST_TRADEABLE_DT"), col("FUT_NOTICE_FIRST"), col("FUT_CONT_SIZE"), col("CRNCY"), col("EXCH_CODE"), col("Date"), col("TradeDateIdx").alias("DateIdx")]
                                FILTER col("TradeDateIdx").is_not_null()
                                FROM
                                  ASOF JOIN:
                                  LEFT PLAN ON: [col("Date")]
                                    SORT BY [col("Date")]
                                       WITH_COLUMNS:
                                       [col("Date").strict_cast(Date)] 
                                         WITH_COLUMNS:
                                         [col("FUT_NOTICE_FIRST").str.strptime(["raise"])] 
                                           WITH_COLUMNS:
                                           [col("LAST_TRADEABLE_DT").str.strptime(["raise"])] 
                                            Parquet SCAN [S:\\Trading\\NSA\\Christoph\\Commodity Futures\\updated\CC_FUT.parquet]
                                            PROJECT */12 COLUMNS
                                            ESTIMATED ROWS: 26881
                                  RIGHT PLAN ON: [col("Date")]
                                    SORT BY [col("Date")]
                                      ROW INDEX name: TradeDateIdx, offset: 0
                                         WITH_COLUMNS:
                                         [col("Date").strict_cast(Date)] 
                                          DF ["Date"]; PROJECT */1 COLUMNS
                                  END ASOF JOIN
                          RIGHT PLAN ON: [col("LAST_TRADEABLE_DT")]
                            SORT BY [col("LAST_TRADEABLE_DT")]
                              SELECT [col("TradeDateIdx").alias("LtdIdx"), col("Date").alias("LAST_TRADEABLE_DT")]
                                SORT BY [col("Date")]
                                  ROW INDEX name: TradeDateIdx, offset: 0
                                     WITH_COLUMNS:
                                     [col("Date").strict_cast(Date)] 
                                      DF ["Date"]; PROJECT */1 COLUMNS
                          END ASOF JOIN

In [None]:
for day = in 