# ORB Analysis of 15-minute markets for BTCUSDT, secondly data from Binance

## Imports

In [1]:
import datetime as dt
import polars as pl
import plotnine as p9

## Load CSV Files

In [None]:
cols = [
    "open_timestamp",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "close_timestamp",
    "quote_asset_volume",
    "number_of_trades",
    "taker_buy_base_asset_volume",
    "taker_buy_quote_asset_volume",
    "ignore",
]

TS_THRESHOLD = 10**14  # <1e14 => ms, >=1e14 => us for 2020+ data

lf = (
    pl.scan_csv(
        r"C:\Users\ccodi\Downloads\btcdata\BTCUSDT-1s-*.csv",
        # r"C:\Users\ccodi\Downloads\btcdata\BTCUSDT-1s-2024-12.csv",
        has_header=False,
        new_columns=cols,
        schema_overrides={
            "open_timestamp": pl.Int64,
            "close_timestamp": pl.Int64,
            "number_of_trades": pl.Int64,
            "ignore": pl.Int64,
        },
    )
    .with_columns(
        # normalize to microseconds
        open_timestamp_us=pl.when(pl.col("open_timestamp") < TS_THRESHOLD)
                           .then(pl.col("open_timestamp") * 1000)
                           .otherwise(pl.col("open_timestamp")),
        close_timestamp_us=pl.when(pl.col("close_timestamp") < TS_THRESHOLD)
                            .then(pl.col("close_timestamp") * 1000)
                            .otherwise(pl.col("close_timestamp")),
    )
    .drop(["open_timestamp", "close_timestamp"])
    .rename({"open_timestamp_us": "open_timestamp", "close_timestamp_us": "close_timestamp"})
    .sort("open_timestamp")
)

df = lf.collect()
# df.columns = cols
# df = df.sort('open_timestamp')

## Store as parquet

In [None]:
df.write_parquet('btc_second_2019_2026.parquet')

## Load parquet

#### To speed up calculations intially, just retain dec 2025 onwards

In [20]:
df = (
    pl.scan_parquet('btc_second_2019_2026.parquet')
    # .filter(pl.col('open_timestamp') >= dt.datetime(2025, 12, 1))
)

## Do some quick sanity checks on Data

In [21]:
df.select(
    pl.col("open_timestamp").min().alias("min_ts"),
    pl.col("open_timestamp").max().alias("max_ts"),
).collect()

min_ts,max_ts
i64,i64
1546300800000000,1769903999000000


In [22]:
df.select(
    (pl.col("open_timestamp").diff().drop_nulls().quantile(0.5) / 1_000_000).alias("median_diff_seconds")
).collect()

median_diff_seconds
f64
1.0


In [23]:
# should be continuous across month boundary
df.select(
    pl.col("open_timestamp").min().alias("min_open_time"),
    pl.col("open_timestamp").max().alias("max_open_time"),
    pl.len().alias('length')
).collect()

min_open_time,max_open_time,length
i64,i64,u32
1546300800000000,1769903999000000,176157050


In [24]:

# check for duplicates (should be zero)
df.select(
    pl.col("open_timestamp").n_unique().alias("unique_ts"),
    pl.len().alias("rows"),
).collect()

unique_ts,rows
u32,u32
176157050,176157050


## Define ORB Feature Function

#### Basic feature definitions

In [25]:
def orb_features(df: pl.DataFrame | pl.LazyFrame, *, cal_minutes: int = 4, bucket_minutes: int = 15, ts_col: str = "close_timestamp", price_col: str = "close", high_col: str = "high", low_col: str = "low", open_col: str = "open") -> pl.LazyFrame:
    """

    :param df:
    :param cal_minutes:
    :param bucket_minutes:
    :param ts_col:           epoch microseconds
    :param price_col:
    :param high_col:
    :param low_col:
    :param open_col:
    :return:
    """
    cal_s = cal_minutes * 60

    lf = df.lazy() if isinstance(df, pl.DataFrame) else df
    # Make sure we are sorted on time column
    lf = lf.sort(ts_col)
    lf = (
        lf.with_columns(ts=pl.from_epoch(pl.col(ts_col), time_unit="us"))
        .with_columns(bucket_start=pl.col("ts").dt.truncate(f"{bucket_minutes}m"))
        .with_columns(sec_in_bucket=((pl.col("ts") - pl.col("bucket_start")).dt.total_seconds()).cast(pl.Int32))
        .with_columns(
            in_cal=pl.col("sec_in_bucket") < cal_s,  # determine if we are IN ORB range
            after_cal=pl.col("sec_in_bucket") >= cal_s,  # determine if we are OUTSIDE ORB range
        )
    )

    # ----
    # 1) Bucket-level base features (what you already compute)
    # ----
    base = (
        lf.group_by("bucket_start")
        .agg(
            start_price=pl.col(open_col).first(),                                                                                               # This is the STRIKE on polymarket
            end_price=pl.col(price_col).last(),                                                                                                 # Last price before start of new window
            upper_band=pl.col(high_col).filter(pl.col("in_cal")).max(),                                                                         # The upper ORB band
            lower_band=pl.col(low_col).filter(pl.col("in_cal")).min(),                                                                          # The lower ORB band
            hit_upper=(pl.col(high_col).filter(pl.col("after_cal")).max() > pl.col(high_col).filter(pl.col("in_cal")).max()).fill_null(False),  # Did we hit upper band?
            hit_lower=(pl.col(low_col).filter(pl.col("after_cal")).min() < pl.col(low_col).filter(pl.col("in_cal")).min()).fill_null(False),    # Did we hit lower band?
            t_first_upper=pl.when(pl.col("after_cal") & (pl.col(high_col) > pl.col(high_col).filter(pl.col("in_cal")).max())).then(pl.col("sec_in_bucket")).min(),   #  time to first UPPER hit
            t_first_lower=pl.when(pl.col("after_cal") & (pl.col(low_col)  < pl.col(low_col).filter(pl.col("in_cal")).min())).then(pl.col("sec_in_bucket")).min(),    #  time to first LOWER hit
            ret = pl.col(price_col).last() - pl.col(open_col).first(),
            ret_bps = (pl.col(price_col).last() / pl.col(open_col).first() - 1) * 1e4,
        )
    )

    out = (
        base
        .with_columns(
            t_first=pl.min_horizontal(["t_first_upper", "t_first_lower"]),
            first_break=pl
                .when(pl.col("t_first_upper").is_not_null() & pl.col("t_first_lower").is_null())
                .then(pl.lit("UP"))
                .when(pl.col("t_first_lower").is_not_null() & pl.col("t_first_upper").is_null())
                .then(pl.lit("DOWN"))
                .when(pl.col("t_first_upper").is_not_null() & pl.col("t_first_lower").is_not_null() & (pl.col("t_first_upper") < pl.col("t_first_lower")))
                .then(pl.lit("UP"))
                .when(pl.col("t_first_upper").is_not_null() & pl.col("t_first_lower").is_not_null() & (pl.col("t_first_lower") < pl.col("t_first_upper")))
                .then(pl.lit("DOWN"))
                .when(pl.col("t_first_upper").is_not_null() & pl.col("t_first_lower").is_not_null() & (pl.col("t_first_upper") == pl.col("t_first_lower")))
                .then(pl.lit("BOTH_SAME"))
                .otherwise(pl.lit("NONE")),
            both_hit = pl.col("hit_upper") & pl.col("hit_lower"),
            path_type = pl
                        .when(pl.col("hit_upper") & ~pl.col("hit_lower"))
                        .then(pl.lit("UP_ONLY"))
                        .when(pl.col("hit_lower") & ~pl.col("hit_upper"))
                        .then(pl.lit("DOWN_ONLY"))
                        .when(pl.col("hit_upper") & pl.col("hit_lower") & (pl.col("t_first_upper") < pl.col("t_first_lower")))
                        .then(pl.lit("UP_THEN_DOWN"))
                        .when(pl.col("hit_upper") & pl.col("hit_lower") & (pl.col("t_first_lower") < pl.col("t_first_upper")))
                        .then(pl.lit("DOWN_THEN_UP"))
                        .otherwise(pl.lit("NONE"))
        )

        .with_columns(
            t_bin=pl.when(pl.col("t_first").is_null()).then(pl.lit("NONE"))
                .when(pl.col("t_first") < 300).then(pl.lit("240-299"))
                .when(pl.col("t_first") < 420).then(pl.lit("300-419"))
                .when(pl.col("t_first") < 600).then(pl.lit("420-599"))
                .otherwise(pl.lit("600-899")),
            range_width_bps = (pl.col("upper_band") - pl.col("lower_band")) / pl.col("start_price") * 1e4,
            year=pl.col("bucket_start").dt.year()
        )

        .sort("bucket_start")
    )

    return out

## Run Features

In [26]:
feature = orb_features(df, cal_minutes=4).collect()

In [None]:
feature.tail(20)

## Deduce win-rates for the various Path - Types

In [27]:
feature.group_by("path_type").agg(
    n=pl.len(),
    mean_ret_bps=pl.col("ret_bps").mean(),
    winrate=(pl.col("ret_bps") > 0).mean(),
)

path_type,n,mean_ret_bps,winrate
str,u32,f64,f64
"""NONE""",9422,-0.184583,0.496391
"""DOWN_ONLY""",69945,-19.969504,0.089442
"""UP_THEN_DOWN""",22855,-8.024915,0.261037
"""DOWN_THEN_UP""",23543,8.287422,0.743108
"""UP_ONLY""",69977,20.275372,0.91603


In [28]:
prob_by_year = (
    feature
    .group_by(["year", "path_type"])
    .agg(
        n = pl.len(),
        winrate = (pl.col("ret_bps") > 0).mean(),
        mean_ret_bps = pl.col("ret_bps").mean(),
    )
    .sort(["year", "path_type"])
)

winrate_matrix = (
    prob_by_year
    .select("year", "path_type", "winrate")
    .pivot(
        values="winrate",
        index="year",
        on="path_type",
    )
    .sort("year")
)

In [29]:
winrate_matrix

year,DOWN_ONLY,DOWN_THEN_UP,NONE,UP_ONLY,UP_THEN_DOWN
i32,f64,f64,f64,f64,f64
2019,0.102213,0.721708,0.486198,0.901811,0.297822
2020,0.10085,0.727815,0.482675,0.916496,0.275774
2021,0.096658,0.735255,0.526777,0.912077,0.269898
2022,0.094901,0.740512,0.533632,0.913864,0.255242
2023,0.085813,0.756417,0.492472,0.922662,0.254727
2024,0.079848,0.769067,0.503076,0.92122,0.233375
2025,0.078237,0.741234,0.482663,0.918728,0.25275
2026,0.081055,0.698276,0.408284,0.925573,0.258398


## Now look at effect of first time to hit boundaries

In [None]:
tbl = (
    feature
    .filter(pl.col("path_type") != "NONE")  # optional; NONE handled separately
    .group_by(["path_type", "t_bin"])
    .agg(
        n=pl.len(),
        winrate=(pl.col("ret_bps") > 0).mean(),
        mean_ret_bps=pl.col("ret_bps").mean(),
    )
    .sort(["path_type", "t_bin"])
)

In [None]:
winrate_by_time = (
    tbl.select("path_type", "t_bin", "winrate")
       .pivot(values="winrate", index="t_bin", on="path_type")
)

In [None]:
tbl_year = (
    feature.filter(pl.col("path_type") != "NONE")
         .group_by(["year", "path_type", "t_bin"])
         .agg(n=pl.len(), winrate=(pl.col("ret_bps") > 0).mean())
)

In [None]:
winrate_year_time = (
    tbl_year
    .select("year", "t_bin", "path_type", "winrate")
    .pivot(
        values="winrate",
        index=["year", "t_bin"],
        on="path_type",
    )
    .sort(["year", "t_bin"])
)

In [None]:
winrate_by_time

In [None]:
winrate_year_time

## Similar analysis for size of ORB range

In [None]:
width_bins = (
    feature
    .filter(pl.col("path_type") != "NONE")
    .group_by("path_type")
    .agg(
        q1=pl.col("range_width_bps").quantile(0.33),
        q2=pl.col("range_width_bps").quantile(0.66),
    )
)

feat3 = (
    feature
    .join(width_bins, on="path_type", how="left")
    .with_columns(
         width_bin = pl.when(pl.col("range_width_bps") <= pl.col("q1")).then(pl.lit("NARROW"))
                      .when(pl.col("range_width_bps") <= pl.col("q2")).then(pl.lit("MEDIUM"))
                      .otherwise(pl.lit("WIDE"))
    )
)

In [None]:
width_tbl = (
    feat3
    .filter(pl.col("path_type") != "NONE")
    .group_by(["path_type", "width_bin"])
    .agg(
        n=pl.len(),
        winrate=(pl.col("ret_bps") > 0).mean(),
        mean_ret_bps=pl.col("ret_bps").mean(),
    )
    .sort(["path_type", "width_bin"])
)

In [None]:
width_winrate_matrix = (
    width_tbl
    .select("path_type", "width_bin", "winrate")
    .pivot(values="winrate", index="width_bin", on="path_type")
)

In [None]:
width_winrate_matrix

## Some vol properties on Returns

In [None]:
df2 = (
    df
    .filter(pl.col("open_timestamp") > dt.datetime(2023, 1, 1))
    .with_columns(
        dt=pl.from_epoch(pl.col("open_timestamp"), time_unit="us").alias("dt"),
    )
    .with_columns(
        bucket15=pl.col("dt").dt.truncate("15m"),
        minute_in_bucket=((pl.col("dt") - pl.col("dt").dt.truncate("15m")).dt.total_minutes()).cast(pl.Int8),
    )
    .sort("dt")
    .collect()
)

In [None]:
minute_metrics = (
    df2
    .group_by(["bucket15", "minute_in_bucket"], maintain_order=True)
    .agg(
        r = pl.col("close").log().diff(),
        hl_abs = pl.col("high").max() - pl.col("low").min(),
        hl_log = (pl.col("high").max() / pl.col("low").min()).log(),
    )
    .with_columns(
        avg_return = pl.col("r").list.mean(),
        avg_return_size = pl.col("r").list.eval(pl.element().abs()).list.mean(),
        rv_sec = (pl.col("r").list.eval(pl.element() ** 2).list.sum()).sqrt(),
    )
    .select([
        "bucket15",
        "minute_in_bucket",
        "avg_return",
        "avg_return_size",
        "rv_sec",
        "hl_abs",
        "hl_log",
    ])
)

In [None]:
minute_profile = (
    minute_metrics
    .group_by("minute_in_bucket")
    .agg(
        avg_return=pl.col("avg_return").mean(),
        avg_return_size=pl.col("avg_return_size").mean(),
        avg_rv_sec=pl.col("rv_sec").mean(),
        avg_hl_abs=pl.col("hl_abs").mean(),
        avg_hl_log=pl.col("hl_log").mean(),
        n_buckets=pl.len(),
    )
    .sort("minute_in_bucket")
)

In [None]:
(
    minute_profile
    .pipe(p9.ggplot)
    + p9.aes(x='minute_in_bucket', y='avg_hl_abs')
    + p9.geom_bar(stat='identity')
)

In [None]:
bucket15_metrics = (
    df2
    .group_by("bucket15", maintain_order=True)
    .agg(
        r=pl.col("close").log().diff(),
        hl_abs=(pl.col("high").max() - pl.col("low").min()),
        hl_log=(pl.col("high").max() / pl.col("low").min()).log(),
    )
    .with_columns(
        rv_sec= (pl.col("r").list.eval(pl.element() ** 2).list.sum()).sqrt(),  # 15m realized vol from 1s returns (not annualized)
    )
    .select(["bucket15", "rv_sec", "hl_abs", "hl_log"])
    .sort("bucket15")
)

In [None]:
bucket15_lagged = (
    bucket15_metrics
    .with_columns(
        rv_lag=pl.col("rv_sec").shift(1),
        hl_abs_lag=pl.col("hl_abs").shift(1),
        hl_log_lag=pl.col("hl_log").shift(1),
    )
    .drop_nulls(["rv_lag", "hl_abs_lag", "hl_log_lag"])
)

In [None]:
import numpy as np
import matplotlib.pyplot as plt

pdf = bucket15_lagged.select(["rv_sec", "rv_lag", "hl_abs", "hl_abs_lag", "hl_log", "hl_log_lag"]).to_pandas()

def regplot(x, y, xlabel, ylabel, title):
    x = np.asarray(x)
    y = np.asarray(y)
    b1, b0 = np.polyfit(x, y, 1)  # y ≈ b1*x + b0

    plt.figure()
    plt.scatter(x, y, s=8)
    xs = np.linspace(x.min(), x.max(), 200)
    plt.plot(xs, b1*xs + b0)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(f"{title}  (slope={b1:.3g}, intercept={b0:.3g})")
    plt.show()

regplot(pdf["rv_lag"],    pdf["rv_sec"], "RV(t-1)",    "RV(t)",    "15m RV persistence")
regplot(pdf["hl_abs_lag"],pdf["hl_abs"], "HL_abs(t-1)","HL_abs(t)","15m HL_abs persistence")
regplot(pdf["hl_log_lag"],pdf["hl_log"], "HL_log(t-1)","HL_log(t)","15m HL_log persistence")

In [None]:
seasonality = (
    bucket15_metrics
    .with_columns(
        # minutes since midnight UTC
        tod_min = (
            pl.col("bucket15").dt.hour() * 60
            + pl.col("bucket15").dt.minute()
        ),

        # 15-minute index: 0..95
        tod_15 = (pl.col("bucket15").dt.hour() * 4
                  + (pl.col("bucket15").dt.minute() // 15))
    )
    .group_by("tod_15")
    .agg(
        avg_rv = pl.col("rv_sec").mean(),
        median_rv = pl.col("rv_sec").median(),
        avg_hl = pl.col("hl_log").mean(),
        n = pl.len(),
    )
    .sort("tod_15")
)

In [None]:
import matplotlib.pyplot as plt

pdf = seasonality.to_pandas()

plt.figure(figsize=(10,4))
plt.plot(pdf["tod_15"], pdf["avg_rv"], label="Avg RV (15m)")
plt.plot(pdf["tod_15"], pdf["median_rv"], linestyle="--", label="Median RV")
plt.xlabel("15-minute bucket in day (UTC)")
plt.ylabel("Realized volatility (15m)")
plt.title("Intraday 15-Minute Volatility Seasonality")
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt

pdf = seasonality.to_pandas()

plt.figure(figsize=(10,4))
plt.plot(pdf["tod_15"], pdf["avg_rv"], label="Avg RV (15m)")
plt.plot(pdf["tod_15"], pdf["median_rv"], linestyle="--", label="Median RV")
plt.xlabel("15-minute bucket in day (UTC)")
plt.ylabel("Realized volatility (15m)")
plt.title("Intraday 15-Minute Volatility Seasonality")
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()