# ASX Short Interest Signal

## Import Libraries

In [46]:
import pandas as pd
import datetime as dt
import tabula
import sys, subprocess
from pandas.tseries.offsets import BDay
import yfinance as yf
import time
import numpy as np

## Import Data
### Extracting and cleaning 52 weeks of ASX short interest data from ASIC. 

In [47]:
# --------- CONFIG ----------
WEEKS_TO_LOAD = 104
BASE = "https://download.asic.gov.au/short-selling/RR{datestr}-001-SSDailyAggShortPos.pdf"

# --------- Helpers to clean ASIC tables ----------
def _norm_cols(cols):
    """Normalize header strings so we can fuzzy-map column names reliably."""
    return [c.strip().lower().replace("\n", " ").replace("  ", " ") for c in cols]

def _num(s):
    """Convert strings like '1,234,567' to numbers; keep NaN on failure."""
    return pd.to_numeric(pd.Series(s).astype(str).str.replace(",", "").str.strip(), errors="coerce")

def _is_ticker(x):
    """ASX tickers are 1–6 alphanumeric chars (allow '.'), no spaces."""
    if pd.isna(x): return False
    s = str(x).strip()
    if s.lower().startswith("product code"): return False
    if " " in s: return False
    s2 = s.replace(".", "")
    return 1 <= len(s2) <= 6 and s2.isalnum()

def _rename_clean_one(df):
    """
    Take one raw page DataFrame from the PDF and:
      - fuzzy-rename to: Company, Ticker, Short Positions, Total Volume, Shorts (%)
      - drop header repeats / non-ticker rows
      - coerce numerics
    Return a clean subset with those 5 columns (missing ones added as NA).
    """
    raw = list(df.columns)
    norm = _norm_cols(raw)

    # map raw headers -> canonical headers
    cmap = {}
    for i, c in enumerate(norm):
        if c.startswith("product code"):
            cmap[raw[i]] = "Ticker"
        elif c.startswith("product"):
            cmap[raw[i]] = "Company"
        elif "reported short position" in c and "reported as short" not in c:
            cmap[raw[i]] = "Short Positions"
        elif "total product in issue" in c and "reported as short" not in c:
            cmap[raw[i]] = "Total Volume"
        elif "% of total product in issue reported as short positions" in c or c.endswith("short positions"):
            cmap[raw[i]] = "Shorts (%)"

    df = df.rename(columns=cmap)

    keep = ["Company", "Ticker", "Short Positions", "Total Volume", "Shorts (%)"]
    df = df[[c for c in keep if c in df.columns]].copy()

    if "Ticker" in df.columns:
        df = df[df["Ticker"].apply(_is_ticker)]

    if "Short Positions" in df.columns:
        df["Short Positions"] = _num(df["Short Positions"]).values
    if "Total Volume" in df.columns:
        df["Total Volume"] = _num(df["Total Volume"]).values
    if "Shorts (%)" in df.columns:
        df["Shorts (%)"] = pd.to_numeric(df["Shorts (%)"], errors="coerce")

    # compute % if missing and counts exist
    if "Shorts (%)" not in df.columns and {"Short Positions", "Total Volume"}.issubset(df.columns):
        df["Shorts (%)"] = (df["Short Positions"] / df["Total Volume"]) * 100

    for col in keep:
        if col not in df.columns:
            df[col] = pd.NA

    return df[keep]

def _read_all_tables(url):
    """Try both Tabula extraction modes; return list of tables for the PDF."""
    for kw in (dict(lattice=True), dict(stream=True)):
        try:
            t = tabula.read_pdf(url, pages="all", multiple_tables=True, **kw)
            if t and len(t) > 0:
                return t
        except Exception:
            pass
    return []

def most_recent_friday(ref_date=None):
    """Return the most recent Friday (including today if Friday)."""
    if ref_date is None:
        ref_date = dt.date.today()
    offset = (ref_date.weekday() - 4) % 7  # Mon=0 ... Fri=4
    return ref_date - dt.timedelta(days=offset)

def fetch_eow_for_friday(friday_date: dt.date, max_back_days: int = 6) -> pd.DataFrame:
    """
    Given a nominal Friday, try Fri, Thu, ... back to Sat (<= max_back_days)
    to find the ASIC short-interest PDF for that week.
    Returns a tidy DataFrame with:
      - ReleaseDate = actual PDF file date used (URL date)
      - Date = as-of trading date = ReleaseDate - 4 business days (T+4)
    """
    for delta in range(0, max_back_days + 1):
        day = friday_date - dt.timedelta(days=delta)
        ds = day.strftime("%Y%m%d")
        url = BASE.format(datestr=ds)

        tables = _read_all_tables(url)
        if not tables:
            continue

        cleaned = []
        for t in tables:
            if isinstance(t, pd.DataFrame) and len(t.columns) >= 4 and len(t) > 0:
                cleaned.append(_rename_clean_one(t.dropna(how="all")))
        if not cleaned:
            continue

        out = pd.concat(cleaned, ignore_index=True).drop_duplicates()
        if not out.empty:
            release_dt = pd.to_datetime(ds, format="%Y%m%d")     # PDF file date (publication)
            asof_dt    = (release_dt - BDay(4)).normalize()      # trading as-of (T+4 rule)

            out.insert(0, "Date", asof_dt)                       # <-- AS-OF DATE (what you asked for)
            out.insert(1, "ReleaseDate", release_dt)             # <-- RELEASE DATE (file date)
            print(f"✓ Week {friday_date} — using file {ds} | as-of {asof_dt.date()} — rows: {len(out)}")
            return out

    print(f"⚠️ No report found for week {friday_date} (tried Fri..Sat).")
    return pd.DataFrame()

# --------- MAIN: load latest N snapshots into one tidy table ----------
frames = []
anchor_friday = most_recent_friday()
for k in range(WEEKS_TO_LOAD):
    target_friday = anchor_friday - dt.timedelta(weeks=k)
    df_k = fetch_eow_for_friday(target_friday, max_back_days=6)
    if not df_k.empty:
        frames.append(df_k)

if not frames:
    raise RuntimeError("No ASIC EOW reports could be fetched. Check internet/Java (tabula).")

hist_df = (
    pd.concat(frames, ignore_index=True)
      .drop_duplicates(subset=["Date", "Ticker"])   # dedupe if same as-of/ticker appears twice
      .sort_values(["Date", "Ticker"])
      .reset_index(drop=True)
)

# Optional: MultiIndex for easy slicing by date/ticker
hist_idx = hist_df.set_index(["Date", "Ticker"]).sort_index()

# --------- Summary + examples ----------
print(
    f"\nLoaded {hist_df['Date'].dt.date.nunique()} as-of dates, "
    f"{hist_df['Ticker'].nunique()} unique tickers, "
    f"{len(hist_df)} total rows."
)

✓ Week 2025-08-29 — using file 20250826 | as-of 2025-08-20 — rows: 627
✓ Week 2025-08-22 — using file 20250822 | as-of 2025-08-18 — rows: 630
✓ Week 2025-08-15 — using file 20250815 | as-of 2025-08-11 — rows: 626
✓ Week 2025-08-08 — using file 20250808 | as-of 2025-08-04 — rows: 633
✓ Week 2025-08-01 — using file 20250801 | as-of 2025-07-28 — rows: 627
✓ Week 2025-07-25 — using file 20250725 | as-of 2025-07-21 — rows: 622
✓ Week 2025-07-18 — using file 20250718 | as-of 2025-07-14 — rows: 634
✓ Week 2025-07-11 — using file 20250711 | as-of 2025-07-07 — rows: 633
✓ Week 2025-07-04 — using file 20250704 | as-of 2025-06-30 — rows: 656
✓ Week 2025-06-27 — using file 20250627 | as-of 2025-06-23 — rows: 643
✓ Week 2025-06-20 — using file 20250620 | as-of 2025-06-16 — rows: 643
✓ Week 2025-06-13 — using file 20250613 | as-of 2025-06-09 — rows: 652
✓ Week 2025-06-06 — using file 20250606 | as-of 2025-06-02 — rows: 647
✓ Week 2025-05-30 — using file 20250530 | as-of 2025-05-26 — rows: 638
✓ Week

### Adding share price for both the date of data and the release date (delay), free float, and ADV

In [48]:
import numpy as np
import pandas as pd
import yfinance as yf

# ---------- helpers ----------
def _to_close_matrix(prices: pd.DataFrame) -> pd.DataFrame:
    """Return a (date x symbol) Close matrix from yfinance output."""
    if prices is None or prices.empty:
        return pd.DataFrame()

    if not isinstance(prices.columns, pd.MultiIndex):
        if "Close" in prices.columns:
            sym = getattr(prices, "_yfsym", None) or "SYMBOL"
            return pd.DataFrame({sym: prices["Close"]})

    lvl0 = [str(x).lower() for x in prices.columns.get_level_values(0)]
    lvl1 = [str(x).lower() for x in prices.columns.get_level_values(1)]

    if "close" in lvl0:
        try: return prices.xs("Close", axis=1, level=0)
        except Exception: pass
    if "close" in lvl1:
        try: return prices.xs("Close", axis=1, level=1)
        except Exception: pass
    for lvl in (0, 1):
        try: return prices.xs("Close", axis=1, level=lvl)
        except Exception: continue
    raise KeyError("Couldn't find 'Close' in yfinance result.")

def _to_field_matrix(prices: pd.DataFrame, field: str) -> pd.DataFrame:
    """Generic extractor for a field (e.g., 'Volume') as (date x symbol) matrix."""
    if prices is None or prices.empty:
        return pd.DataFrame()

    if not isinstance(prices.columns, pd.MultiIndex):
        if field in prices.columns:
            sym = getattr(prices, "_yfsym", None) or "SYMBOL"
            return pd.DataFrame({sym: prices[field]})

    lvl0 = [str(x).lower() for x in prices.columns.get_level_values(0)]
    lvl1 = [str(x).lower() for x in prices.columns.get_level_values(1)]
    f = field.lower()

    if f in lvl0:
        try: return prices.xs(field, axis=1, level=0)
        except Exception: pass
    if f in lvl1:
        try: return prices.xs(field, axis=1, level=1)
        except Exception: pass
    for lvl in (0, 1):
        try: return prices.xs(field, axis=1, level=lvl)
        except Exception: continue
    raise KeyError(f"Couldn't find '{field}' in yfinance result.")

# ---------- start from your existing hist_df ----------
work = hist_df.copy()
work["Symbol"] = work["Ticker"].astype(str).str.upper().str.strip() + ".AX"
work["Date_norm"] = pd.to_datetime(work["Date"]).dt.normalize()
work["ReleaseDate_norm"] = pd.to_datetime(work["ReleaseDate"]).dt.normalize()

syms = sorted(work["Symbol"].unique())
start = min(work["Date_norm"].min(), work["ReleaseDate_norm"].min()) - pd.Timedelta(days=2)
end   = max(work["Date_norm"].max(), work["ReleaseDate_norm"].max()) + pd.Timedelta(days=5)

# ---------- Yahoo snapshots: float + ADV(3M) ----------
def _safe_float_info(sym: str) -> dict:
    out = {"Symbol": sym, "floatShares": np.nan, "sharesOutstanding": np.nan, "heldPercentInsiders": np.nan}
    try:
        tkr = yf.Ticker(sym)
        try:
            info = tkr.get_info()
        except Exception:
            info = getattr(tkr, "info", {}) or {}
        fs = info.get("floatShares")
        so = info.get("sharesOutstanding") or info.get("impliedSharesOutstanding")
        hpi = info.get("heldPercentInsiders")
        out["floatShares"] = float(fs) if fs not in (None, "") else np.nan
        out["sharesOutstanding"] = float(so) if so not in (None, "") else np.nan
        out["heldPercentInsiders"] = float(hpi) if isinstance(hpi, (int, float)) else np.nan
    except Exception:
        pass
    return out

def _get_adv3m(sym: str) -> dict:
    adv = np.nan
    try:
        t = yf.Ticker(sym)
        try:
            fi = t.fast_info
            adv = (fi.get("threeMonthAverageVolume", np.nan)
                   if hasattr(fi, "get") else getattr(fi, "threeMonthAverageVolume", np.nan))
        except Exception:
            pass
        if pd.isna(adv) or adv is None:
            try:
                info = t.get_info()
            except Exception:
                info = getattr(t, "info", {}) or {}
            for k in ("averageDailyVolume3Month", "threeMonthAverageVolume", "averageVolume"):
                v = info.get(k)
                if v not in (None, ""):
                    adv = float(v); break
    except Exception:
        pass
    return {"Symbol": sym, "ADV 3M": adv}

float_df = pd.DataFrame(_safe_float_info(s) for s in syms)
adv3m_df = pd.DataFrame(_get_adv3m(s) for s in syms)

work = (work
        .merge(float_df, on="Symbol", how="left")
        .merge(adv3m_df, on="Symbol", how="left"))

# Free-float estimate & Shorts (Free Float %)
ff_from_insiders = work["sharesOutstanding"] * (1 - work["heldPercentInsiders"].fillna(0))
ff_candidates = pd.concat([work["floatShares"], ff_from_insiders], axis=1)
ff_est = ff_candidates.max(axis=1)
work["Date Free Float"] = np.minimum(ff_est, work["Total Volume"])

if "Shorts (%)" not in work.columns or work["Shorts (%)"].isna().any():
    m = work["Short Positions"].notna() & work["Total Volume"].gt(0)
    work.loc[m, "Shorts (%)"] = (work.loc[m, "Short Positions"] / work.loc[m, "Total Volume"]) * 100

m_ff = work["Short Positions"].notna() & work["Date Free Float"].gt(0)
work.loc[m_ff, "Shorts (Free Float %)"] = 100 * work.loc[m_ff, "Short Positions"] / work.loc[m_ff, "Date Free Float"]

suspect = (
    (work["Shorts (Free Float %)"] > 100) |
    (work["Date Free Float"] < 0.10 * work["Total Volume"]) |
    (work["Date Free Float"] <= work["Short Positions"])
)
fallback_ff = np.minimum(work["sharesOutstanding"], work["Total Volume"])
use_fb = suspect & work["sharesOutstanding"].notna()
work.loc[use_fb, "Date Free Float"] = fallback_ff[use_fb]
m_ff2 = work["Short Positions"].notna() & work["Date Free Float"].gt(0)
work.loc[m_ff2, "Shorts (Free Float %)"] = 100 * work.loc[m_ff2, "Short Positions"] / work.loc[m_ff2, "Date Free Float"]

# ---------- Prices for Date & ReleaseDate ----------
prices = yf.download(
    syms,
    start=start.strftime("%Y-%m-%d"),
    end=end.strftime("%Y-%m-%d"),
    group_by="column",
    auto_adjust=False,
    progress=False,
    threads=True,
)
if not isinstance(prices.columns, pd.MultiIndex) and len(syms) == 1:
    prices._yfsym = syms[0]

close_df = _to_close_matrix(prices)
close_df.index = pd.to_datetime(close_df.index).tz_localize(None).normalize()
close_df = close_df.loc[:, ~close_df.columns.duplicated()]

asof_idx = pd.DatetimeIndex(sorted(work["Date_norm"].unique()))
asof_long = close_df.reindex(asof_idx, method="bfill").stack(dropna=False).reset_index()
asof_long.columns = ["Date_norm", "Symbol", "Date Share Price"]

rel_idx = pd.DatetimeIndex(sorted(work["ReleaseDate_norm"].unique()))
rel_long = close_df.reindex(rel_idx, method="bfill").stack(dropna=False).reset_index()
rel_long.columns = ["ReleaseDate_norm", "Symbol", "Release Date Share Price"]

# ---------- MERGE BACK ----------
hist_df = (work
    .merge(asof_long, how="left", on=["Date_norm", "Symbol"])
    .merge(rel_long,  how="left", on=["ReleaseDate_norm", "Symbol"])
    .drop(columns=[
        "Symbol","Date_norm","ReleaseDate_norm",
        "floatShares","sharesOutstanding","heldPercentInsiders"
    ])
    .sort_values(["Date", "Ticker"])
    .reset_index(drop=True)
)

# ---------- coverage + preview ----------
print("Prices on Date:",         hist_df["Date Share Price"].notna().sum(), "/", len(hist_df))
print("Prices on ReleaseDate:",  hist_df["Release Date Share Price"].notna().sum(), "/", len(hist_df))
print("Free float present:",     hist_df["Date Free Float"].notna().sum(), "/", len(hist_df))
print(">100% after fix:",        (hist_df["Shorts (Free Float %)"] > 100).sum())
print("ADV 3M snapshot coverage:", hist_df["ADV 3M"].notna().sum(), "/", len(hist_df))

display(hist_df.head(12)[[
    "Date","ReleaseDate","Ticker","Company",
    "Total Volume","Shorts (%)","Date Free Float","Shorts (Free Float %)",
    "Date Share Price","Release Date Share Price","ADV 3M"
]])


HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
HTTP Error 404: 
$5EADA.AX: possibly delisted; no price data found  (period=1y) (Y

Prices on Date: 59560 / 68328
Prices on ReleaseDate: 63282 / 68328
Free float present: 62334 / 68328
>100% after fix: 93
ADV 3M snapshot coverage: 63929 / 68328


  asof_long = close_df.reindex(asof_idx, method="bfill").stack(dropna=False).reset_index()
  rel_long = close_df.reindex(rel_idx, method="bfill").stack(dropna=False).reset_index()


Unnamed: 0,Date,ReleaseDate,Ticker,Company,Total Volume,Shorts (%),Date Free Float,Shorts (Free Float %),Date Share Price,Release Date Share Price,ADV 3M
0,2023-09-04,2023-09-08,1MC,MORELLA CORPORATION ORDINARY,6138655704,0.0,368429000.0,0.001147,,,223948.0
1,2023-09-04,2023-09-08,29M,29METALSLIMITED ORDINARY,659518854,4.81,659518900.0,4.805173,0.77,0.705,3771277.0
2,2023-09-04,2023-09-08,360,LIFE360 INC. CDI3:1 USPROHEXCLQIB,200494311,1.02,200494300.0,1.02445,9.12,8.54,773644.0
3,2023-09-04,2023-09-08,3DP,POINTERRA LIMITED ORDINARY,711800597,0.04,538145700.0,0.049868,0.094,0.096,727221.0
4,2023-09-04,2023-09-08,4DX,4DMEDICAL LIMITED ORDINARY,345611338,0.05,345611300.0,0.05192,0.61,0.605,1721948.0
5,2023-09-04,2023-09-08,5EA,5EADVANCED CDI 10:1,303996780,0.74,176936200.0,1.269906,8.969999,8.624999,32466.0
6,2023-09-04,2023-09-08,5GG,PENTANET ORDINARY,373727213,0.0,326845300.0,0.00051,0.082,0.077,647652.0
7,2023-09-04,2023-09-08,88E,88 ENERGY LTD ORDINARY,21451988376,0.05,1157350000.0,0.841984,0.15,0.175,1125407.0
8,2023-09-04,2023-09-08,92E,92ENERGY ORDINARY,106375102,0.45,,,,,
9,2023-09-04,2023-09-08,A1M,AIC MINES LTD ORDINARY,462470632,0.0,462470600.0,9.3e-05,0.335,0.35,1784116.0


### Generate specific ticker

In [49]:
ticker_to_display = "GYG"

display(hist_df[hist_df["Ticker"].str.strip().str.upper() == ticker_to_display])

Unnamed: 0,Date,ReleaseDate,Company,Ticker,Short Positions,Total Volume,Shorts (%),ADV 3M,Date Free Float,Shorts (Free Float %),Date Share Price,Release Date Share Price
27788,2024-06-17,2024-06-21,GUZMAN Y GOMEZ LTD ORDINARY,GYG,72981,101352914,0.07,344525.0,66023390.07,0.110538,,29.000000
28462,2024-06-24,2024-06-28,GUZMAN Y GOMEZ LTD ORDINARY,GYG,1870984,101352914,1.85,344525.0,66023390.07,2.833820,28.799999,27.420000
29121,2024-07-01,2024-07-05,GUZMAN Y GOMEZ LTD ORDINARY,GYG,456846,101402414,0.45,344525.0,66023390.07,0.691946,24.830000,27.750000
29772,2024-07-08,2024-07-12,GUZMAN Y GOMEZ LTD ORDINARY,GYG,570461,101402414,0.56,344525.0,66023390.07,0.864029,28.879999,26.969999
30430,2024-07-15,2024-07-19,GUZMAN Y GOMEZ LTD ORDINARY,GYG,629971,101402414,0.62,344525.0,66023390.07,0.954163,26.889999,27.200001
...,...,...,...,...,...,...,...,...,...,...,...,...
65461,2025-07-28,2025-08-01,GUZMAN Y GOMEZ LTD ORDINARY,GYG,6794210,102860548,6.61,344525.0,66023390.07,10.290611,27.950001,27.530001
66086,2025-08-04,2025-08-08,GUZMAN Y GOMEZ LTD ORDINARY,GYG,7392793,102860548,7.19,344525.0,66023390.07,11.197233,27.670000,28.430000
66719,2025-08-11,2025-08-15,GUZMAN Y GOMEZ LTD ORDINARY,GYG,8196830,102860548,7.97,344525.0,66023390.07,12.415040,28.650000,27.879999
67345,2025-08-18,2025-08-22,GUZMAN Y GOMEZ LTD ORDINARY,GYG,8533044,102860548,8.30,344525.0,66023390.07,12.924274,27.940001,23.700001


### Generating the largest short (% of free float) companies

In [50]:
N_WEEKS = 52  # <-- change window length here

df = hist_df.copy()

# Normalize
df["Ticker"] = df["Ticker"].astype(str).str.upper().str.strip()
df["Date"] = pd.to_datetime(df["Date"], errors="coerce")
df["ReleaseDate"] = pd.to_datetime(df["ReleaseDate"], errors="coerce")

# Compute Shorts (Free Float %) if missing; avoid div-by-zero
if "Shorts (Free Float %)" not in df.columns:
    m = df["Short Positions"].notna() & df["Date Free Float"].gt(0)
    df.loc[m, "Shorts (Free Float %)"] = (
        100.0 * df.loc[m, "Short Positions"] / df.loc[m, "Date Free Float"]
    )

# Window: use the as-of Date (preferred); if all NaN, you can fall back to ReleaseDate
cutoff = df["Date"].max() - pd.Timedelta(weeks=N_WEEKS)
window = df[df["Date"] >= cutoff].dropna(subset=["Shorts (Free Float %)"]).copy()

# For each ticker, pick the row where Shorts (Free Float %) is maximal in the window
idx = window.groupby("Ticker")["Shorts (Free Float %)"].idxmax()

peaks = (
    window.loc[idx, [
        "Ticker","Company","Date","ReleaseDate",
        "Shorts (Free Float %)","Shorts (%)","Short Positions",
        "Date Free Float","Total Volume",
        "Date Share Price","Release Date Share Price"
    ]]
    .sort_values("Shorts (Free Float %)", ascending=False)
    .reset_index(drop=True)
)

print(
    f"Peaks over last {N_WEEKS} weeks "
    f"(window start {cutoff.date()}): {len(peaks)} tickers"
)
display(peaks.head(25))  # top 25; remove .head(...) to see all

Peaks over last 52 weeks (window start 2024-08-21): 1040 tickers


Unnamed: 0,Ticker,Company,Date,ReleaseDate,Shorts (Free Float %),Shorts (%),Short Positions,Date Free Float,Total Volume,Date Share Price,Release Date Share Price
0,IMU,IMUGENE LIMITED ORDINARY,2025-02-17,2025-02-21,164.168019,6.46,481718243,293430000.0,7457433979,1.394,1.292
1,GOLD,GBLX GOLD GBLX GOLD,2025-04-18,2025-04-24,68.70052,0.16,171820,250100.0,109178150,,48.099998
2,BCB,BOWEN COAL LIMITED ORDINARY,2024-10-28,2024-11-01,34.493471,1.27,37169475,107758000.0,2930489703,0.8,0.8
3,PEN,PENINSULA ENERGY LTD ORDINARY,2024-09-02,2024-09-06,28.152873,1.9,60710260,215645000.0,3186968010,1.56,1.48
4,BOE,BOSS ENERGY LTD ORDINARY,2025-04-11,2025-04-17,26.810297,25.72,106707600,398009800.0,414921511,2.63,2.58
5,PLS,PILBARA MIN LTD ORDINARY,2024-09-09,2024-09-13,20.385863,20.39,613868286,3011245000.0,3011245080,2.41,2.9
6,PBH,POINTSBET HOLDINGS ORDINARY,2024-09-02,2024-09-06,19.535372,5.46,18083092,92565900.0,331338695,0.495,0.51
7,PDN,PALADIN ENERGY LTD ORDINARY,2025-08-18,2025-08-22,19.10022,18.71,74661038,390891000.0,399063809,6.6,6.61
8,DMP,DOMINO PIZZA ENTERPR ORDINARY,2025-01-13,2025-01-17,18.989525,13.55,12532636,65997630.0,92496790,28.75,29.18
9,MIN,MINERAL RESOURCES. ORDINARY,2025-06-09,2025-06-13,18.017817,15.37,30204653,167637700.0,196518604,,23.9


### Adding variables which we can use to determine short signals

In [51]:
def decide_short_entries(
    df: pd.DataFrame,
    release_date: pd.Timestamp | None = None,
    current_positions: set[str] | None = None,
    max_names: int = 15,
    method: str = "hybrid",        # "hybrid" | "rules" | "rank" | "quantile"
    score_quantile: float = 0.80,  # fallback if rules sparse
    # relaxed rule thresholds
    si_min: float = 6.0,
    dsi4_min: float = 0.25,        # pp over 4w
    dsi1_min: float = 0.50,        # pp WoW
    dtc_min: float = 0.50,
    price_min: float = 0.50,
    notional_adv_min: float = 5e5,
    dtc_cap_for_size: float = 8.0,
    debug: bool = True,
):
    """
    Decide ENTER short or DO NOTHING for the chosen ReleaseDate.
    Returns: (entries_df, entries_array, snap)
    """
    # Full, scored snapshot (already has liquidity + high guards)
    _top, snap = rank_shorts_at_release(
        df=df,
        release_date=release_date,
        N=10_000,
        price_min=price_min,
        notional_adv_min=notional_adv_min,
        dtc_cap_for_size=dtc_cap_for_size,
    )

    if snap.empty:
        return pd.DataFrame(), np.empty((0, 2), dtype=object), pd.DataFrame(columns=["enter_short"])

    # Eligible = not already held
    cur = {t.strip().upper() for t in (current_positions or set())}
    snap["eligible"] = ~snap["Ticker"].str.upper().isin(cur)

    # Strict mask (your originals)
    strict_mask = (snap["rule_high_rising"] | snap["rule_spike"] | snap["rule_persist_10pc_2w"]) & (~snap["flag_extreme_DTC"])

    # Relaxed mask (looser, and no mandatory ret_4w <= 0)
    prev_si = snap.groupby("Ticker")["SI"].shift(1)
    loose_mask = (
        (
            (snap["SI"] >= si_min) &
            ((snap["dSI_4w"] >= dsi4_min) | (snap["dSI_1w"] >= dsi1_min)) &
            (snap["DTC"] >= dtc_min)
        ) |
        (
            (snap["SI"] >= si_min) & (prev_si >= si_min)  # persistence at lower SI
        )
    ) & (~snap["flag_extreme_DTC"])

    base_elig = snap["eligible"]
    candidates = pd.DataFrame()

    if method == "rules":
        candidates = snap[base_elig & (strict_mask | loose_mask)].sort_values("Score", ascending=False)
    elif method == "quantile":
        cut = snap["Score"].quantile(score_quantile)
        candidates = snap[base_elig & (snap["Score"] >= cut)].sort_values("Score", ascending=False)
    elif method == "rank":
        candidates = snap[base_elig].sort_values("Score", ascending=False)
    elif method == "hybrid":
        # 1) relaxed/strict rules first
        candidates = snap[base_elig & (strict_mask | loose_mask)].sort_values("Score", ascending=False)
        # 2) fallback to percentile
        if candidates.empty:
            cut = snap["Score"].quantile(score_quantile)
            candidates = snap[base_elig & (snap["Score"] >= cut)].sort_values("Score", ascending=False)
        # 3) final fallback: top by Score
        if candidates.empty:
            candidates = snap[base_elig].sort_values("Score", ascending=False)
    else:
        raise ValueError("method must be 'hybrid', 'rules', 'rank', or 'quantile'")

    need = max(0, max_names - len(cur))
    chosen = candidates.head(need).copy()

    snap["enter_short"] = snap["Ticker"].isin(chosen["Ticker"])

    entries_df = (
        chosen[["Ticker","Company","ReleaseDate","Date","Date Share Price","Score",
                "rule_high_rising","rule_spike","rule_persist_10pc_2w"]]
        .rename(columns={"Date Share Price": "enter_price"})
        .reset_index(drop=True)
    )
    entries_array = entries_df[["Ticker","enter_price"]].to_numpy(dtype=object)

    if debug:
        print("---- entry debug ----")
        print("snapshot rows:", len(snap))
        print("eligible:", int(base_elig.sum()))
        print("strict hits:", int((base_elig & strict_mask).sum()))
        print("loose  hits:", int((base_elig & loose_mask).sum()))
        if "cut" in locals():
            print(f"score >= {score_quantile:.0%} quantile:", int((base_elig & (snap['Score'] >= cut)).sum()))
        print("chosen:", len(entries_df))

    return entries_df, entries_array, snap

### Create array that enters short positions

In [52]:
def decide_short_entries(
    df: pd.DataFrame,
    release_date: pd.Timestamp | None = None,
    current_positions: set[str] | None = None,
    max_names: int = 15,
    method: str = "hybrid",        # "hybrid" | "rules" | "rank" | "quantile"
    score_quantile: float = 0.80,  # fallback if rules sparse
    # relaxed rule thresholds
    si_min: float = 6.0,
    dsi4_min: float = 0.25,        # pp over 4w
    dsi1_min: float = 0.50,        # pp WoW
    dtc_min: float = 0.50,
    price_min: float = 0.50,
    notional_adv_min: float = 5e5,
    dtc_cap_for_size: float = 8.0,
    debug: bool = True,
):
    """
    Decide ENTER short or DO NOTHING for the chosen ReleaseDate.
    Returns: (entries_df, entries_array, snap)
    """
    # Full, scored snapshot (already has liquidity + high guards)
    _top, snap = rank_shorts_at_release(
        df=df,
        release_date=release_date,
        N=10_000,
        price_min=price_min,
        notional_adv_min=notional_adv_min,
        dtc_cap_for_size=dtc_cap_for_size,
    )

    if snap.empty:
        return pd.DataFrame(), np.empty((0, 2), dtype=object), pd.DataFrame(columns=["enter_short"])

    # Eligible = not already held
    cur = {t.strip().upper() for t in (current_positions or set())}
    snap["eligible"] = ~snap["Ticker"].str.upper().isin(cur)

    # Strict mask (your originals)
    strict_mask = (snap["rule_high_rising"] | snap["rule_spike"] | snap["rule_persist_10pc_2w"]) & (~snap["flag_extreme_DTC"])

    # Relaxed mask (looser, and no mandatory ret_4w <= 0)
    prev_si = snap.groupby("Ticker")["SI"].shift(1)
    loose_mask = (
        (
            (snap["SI"] >= si_min) &
            ((snap["dSI_4w"] >= dsi4_min) | (snap["dSI_1w"] >= dsi1_min)) &
            (snap["DTC"] >= dtc_min)
        ) |
        (
            (snap["SI"] >= si_min) & (prev_si >= si_min)  # persistence at lower SI
        )
    ) & (~snap["flag_extreme_DTC"])

    base_elig = snap["eligible"]
    candidates = pd.DataFrame()

    if method == "rules":
        candidates = snap[base_elig & (strict_mask | loose_mask)].sort_values("Score", ascending=False)
    elif method == "quantile":
        cut = snap["Score"].quantile(score_quantile)
        candidates = snap[base_elig & (snap["Score"] >= cut)].sort_values("Score", ascending=False)
    elif method == "rank":
        candidates = snap[base_elig].sort_values("Score", ascending=False)
    elif method == "hybrid":
        # 1) relaxed/strict rules first
        candidates = snap[base_elig & (strict_mask | loose_mask)].sort_values("Score", ascending=False)
        # 2) fallback to percentile
        if candidates.empty:
            cut = snap["Score"].quantile(score_quantile)
            candidates = snap[base_elig & (snap["Score"] >= cut)].sort_values("Score", ascending=False)
        # 3) final fallback: top by Score
        if candidates.empty:
            candidates = snap[base_elig].sort_values("Score", ascending=False)
    else:
        raise ValueError("method must be 'hybrid', 'rules', 'rank', or 'quantile'")

    need = max(0, max_names - len(cur))
    chosen = candidates.head(need).copy()

    snap["enter_short"] = snap["Ticker"].isin(chosen["Ticker"])

    entries_df = (
        chosen[["Ticker","Company","ReleaseDate","Date","Date Share Price","Score",
                "rule_high_rising","rule_spike","rule_persist_10pc_2w"]]
        .rename(columns={"Date Share Price": "enter_price"})
        .reset_index(drop=True)
    )
    entries_array = entries_df[["Ticker","enter_price"]].to_numpy(dtype=object)

    if debug:
        print("---- entry debug ----")
        print("snapshot rows:", len(snap))
        print("eligible:", int(base_elig.sum()))
        print("strict hits:", int((base_elig & strict_mask).sum()))
        print("loose  hits:", int((base_elig & loose_mask).sum()))
        if "cut" in locals():
            print(f"score >= {score_quantile:.0%} quantile:", int((base_elig & (snap['Score'] >= cut)).sum()))
        print("chosen:", len(entries_df))

    return entries_df, entries_array, snap


## Create the rolling loop

### Params 

In [53]:
# --- global knobs ---
PRICE_MIN          = 0.50
NOTIONAL_ADV_MIN   = 5e5     # A$0.5m
DTC_CAP_FOR_SIZE   = 8.0
MAX_NAMES          = 15
TC_BPS             = 5       # trans. cost per side (set 0 if you want)

# exit rules
EXIT_IF_DSI_NONPOS = True                  # ΔSI_4w <= 0 and ret_4w > 0
EXIT_IF_DTC_COMPRESS = 0.30                # 30% compression vs entry
MAX_HOLD_WEEKS     = 4                     # time stop
STOP_LOSS          = 0.15                  # 15% adverse move on short

# entry style
ENTRY_METHOD       = "hybrid"              # "hybrid" | "rules" | "quantile" | "rank"
ENTRY_Q            = 0.80                  # fallback quantile for hybrid/quantile
SI_MIN             = 6.0                   # relaxed rule thresholds
DSI4_MIN           = 0.25
DSI1_MIN           = 0.50
DTC_MIN            = 0.50


### Snapshot scorer

In [54]:
def snapshot_with_scores(df_feat: pd.DataFrame, release_date):
    """
    Cross-sectional scoring for ONE ReleaseDate using your guards.
    """
    snap = df_feat.loc[df_feat["ReleaseDate"] == pd.to_datetime(release_date)].copy()
    if snap.empty:
        return snap

    snap["ADV_notional"] = snap["ADV_20"] * snap["Date Share Price"]
    snap = snap[(snap["Date Share Price"] >= PRICE_MIN) & (snap["ADV_notional"] >= NOTIONAL_ADV_MIN)].copy()

    # treat 'is_3m_high' as False when not enough history
    snap = snap[~snap["is_3m_high"].fillna(False)].copy()

    def z_cs(s):
        std = s.std(ddof=0)
        z = (s - s.mean()) / (std if std > 0 else 1.0)
        return z.clip(-4, 4)

    snap["z_SI"]   = z_cs(snap["SI"])
    snap["z_dSI4"] = z_cs(snap["dSI_4w"])
    snap["z_DTC"]  = z_cs(snap["DTC"])
    snap["z_r4"]   = z_cs(snap["ret_4w"])

    snap["Score"] = snap["z_SI"] + snap["z_dSI4"] + snap["z_DTC"] - 0.5*snap["z_r4"]
    snap["flag_extreme_DTC"] = snap["DTC"] > DTC_CAP_FOR_SIZE

    # original rule flags (kept for visibility)
    snap["rule_high_rising"]     = (snap["SI"] >= 10.0) & (snap["dSI_4w"] >= 1.0) & (snap["DTC"] >= 3) & (snap["ret_4w"] <= 0)
    snap["rule_persist_10pc_2w"] = (snap["SI"] >= 10.0) & (snap.groupby("Ticker")["SI"].shift(1) >= 10.0)
    snap["rule_spike"]           = (snap["dSI_1w"] >= 1.5) & (snap["ret_4w"] <= 0)

    return snap


### Entry decision (relaxed + hybrid)

In [55]:
def decide_entries_for_date(df_feat: pd.DataFrame, release_date, current_positions: set[str]):
    """
    Decide which new shorts to ENTER at 'release_date', given current open positions.
    Returns entries_df (with enter_price), and the full snapshot with a 'enter_short' flag.
    """
    snap = snapshot_with_scores(df_feat, release_date)
    if snap.empty:
        return pd.DataFrame(), snap.assign(enter_short=False)

    cur = {t.strip().upper() for t in (current_positions or set())}
    snap["eligible"] = ~snap["Ticker"].str.upper().isin(cur)

    strict_mask = (snap["rule_high_rising"] | snap["rule_spike"] | snap["rule_persist_10pc_2w"]) & (~snap["flag_extreme_DTC"])

    prev_si = snap.groupby("Ticker")["SI"].shift(1)
    loose_mask = (
        (
            (snap["SI"] >= SI_MIN) &
            ((snap["dSI_4w"] >= DSI4_MIN) | (snap["dSI_1w"] >= DSI1_MIN)) &
            (snap["DTC"] >= DTC_MIN)
        ) |
        (
            (snap["SI"] >= SI_MIN) & (prev_si >= SI_MIN)
        )
    ) & (~snap["flag_extreme_DTC"])

    base_elig = snap["eligible"]

    # selection: hybrid -> rules → percentile → top by Score
    if ENTRY_METHOD in ("hybrid", "rules"):
        candidates = snap[base_elig & (strict_mask | loose_mask)].sort_values("Score", ascending=False)
    else:
        candidates = pd.DataFrame()

    if (ENTRY_METHOD in ("hybrid", "quantile")) and candidates.empty:
        cut = snap["Score"].quantile(ENTRY_Q)
        candidates = snap[base_elig & (snap["Score"] >= cut)].sort_values("Score", ascending=False)

    if (ENTRY_METHOD in ("hybrid", "rank")) and candidates.empty:
        candidates = snap[base_elig].sort_values("Score", ascending=False)

    need = max(0, MAX_NAMES - len(cur))
    chosen = candidates.head(need).copy()

    snap["enter_short"] = snap["Ticker"].isin(chosen["Ticker"])

    entries = (
        chosen[["Ticker","Company","ReleaseDate","Date","Date Share Price","Score"]]
        .rename(columns={"Date Share Price": "enter_price"})
        .reset_index(drop=True)
    )
    return entries, snap


### Exit rules on the new week’s snapshot

In [56]:
def decide_exits_for_date(df_feat: pd.DataFrame, release_date, positions: dict):
    """
    Evaluate exits using ONLY info known at 'release_date'.
    'positions' is a dict: ticker -> {'entry_date','entry_price','last_price','entry_DTC','weeks_held'}
    Returns: (to_exit: list[(ticker, reason)], pricedict: ticker->price_at_t)
    """
    snap = snapshot_with_scores(df_feat, release_date)
    price_map = snap.set_index("Ticker")["Date Share Price"].to_dict()
    dsi4_map  = snap.set_index("Ticker")["dSI_4w"].to_dict()
    r4_map    = snap.set_index("Ticker")["ret_4w"].to_dict()
    dtc_map   = snap.set_index("Ticker")["DTC"].to_dict()

    to_exit = []
    for tik, st in positions.items():
        px_t = price_map.get(tik, np.nan)

        # 1) primary “trend deteriorates” exit
        cond_trend = EXIT_IF_DSI_NONPOS and (dsi4_map.get(tik, 0) <= 0) and (r4_map.get(tik, 0) > 0)

        # 2) DTC compression exit
        entry_dtc = st.get("entry_DTC", np.nan)
        now_dtc   = dtc_map.get(tik, np.nan)
        cond_dtc  = (EXIT_IF_DTC_COMPRESS is not None) and np.isfinite(entry_dtc) and np.isfinite(now_dtc) and (now_dtc <= (1-EXIT_IF_DTC_COMPRESS)*entry_dtc)

        # 3) time stop
        cond_time = (MAX_HOLD_WEEKS is not None) and (st.get("weeks_held", 0) >= MAX_HOLD_WEEKS)

        # 4) stop loss (mark vs entry)
        cond_sl = False
        if (STOP_LOSS is not None) and np.isfinite(px_t) and np.isfinite(st.get("entry_price", np.nan)):
            cond_sl = (px_t / st["entry_price"] - 1.0) >= STOP_LOSS

        if cond_trend or cond_dtc or cond_time or cond_sl or (not np.isfinite(px_t)):
            reason = ",".join([k for k, v in {
                "trend": cond_trend, "dtc": cond_dtc, "time": cond_time, "sl": cond_sl, "na": (not np.isfinite(px_t))
            }.items() if v])
            to_exit.append((tik, reason or "exit"))

    return to_exit, price_map

### One “week step”: mark-to-market, exit, enter

In [57]:
def step_one_week(df_feat: pd.DataFrame, release_date, positions: dict):
    """
    Do one weekly step at 'release_date':
      - mark PnL from last_price -> price_t on open shorts
      - decide exits at t (close at price_t)
      - decide entries at t (enter at price_t) to refill up to MAX_NAMES
    Returns: (new_positions, period_ret, trades_df)
    """
    trades = []
    period_ret = 0.0

    # --- mark-to-market open positions to price at t ---
    snap = snapshot_with_scores(df_feat, release_date)
    price_map = snap.set_index("Ticker")["Date Share Price"].to_dict()

    if positions:
        w = 1.0 / len(positions)
        for tik, st in list(positions.items()):
            px_t = price_map.get(tik, np.nan)
            if np.isfinite(px_t) and np.isfinite(st.get("last_price", np.nan)):
                leg_ret = (st["last_price"] - px_t) / st["last_price"]  # short return
                period_ret += w * leg_ret
                st["last_price"] = float(px_t)
                st["weeks_held"] = st.get("weeks_held", 0) + 1

    # --- exits at t (close at price_t) ---
    to_exit, pxmap_for_exit = decide_exits_for_date(df_feat, release_date, positions)
    for tik, reason in to_exit:
        st = positions.pop(tik, None)
        if st is None: 
            continue
        exit_px = float(pxmap_for_exit.get(tik, st["last_price"]))
        trades.append({"date": release_date, "ticker": tik, "action": "EXIT", "price": exit_px, "reason": reason})
        # exit transaction cost
        period_ret -= (TC_BPS/1e4) * (1.0 / max(1, MAX_NAMES))

    # --- entries at t (fill up to MAX_NAMES) ---
    current = set(positions.keys())
    entries_df, _snap = decide_entries_for_date(df_feat, release_date, current_positions=current)
    for _, r in entries_df.iterrows():
        tik = r["Ticker"]
        px  = float(r["enter_price"])
        positions[tik] = {
            "entry_date": release_date,
            "entry_price": px,
            "last_price":  px,
            "entry_DTC":   float(_snap.loc[_snap["Ticker"]==tik, "DTC"].iloc[0]) if "DTC" in _snap.columns else np.nan,
            "weeks_held":  0,
            "entry_score": float(r["Score"])
        }
        trades.append({"date": release_date, "ticker": tik, "action": "ENTER", "price": px, "score": float(r["Score"])})
        # entry transaction cost
        period_ret -= (TC_BPS/1e4) * (1.0 / MAX_NAMES)

    return positions, period_ret, pd.DataFrame(trades)


### Parameters

In [58]:
# ==== USER PARAMS ====
TRAIN_WEEKS   = 40   # X: warm-up weeks (no trading)
TEST_WEEKS    = 52   # Y: weeks to trade after warm-up

MAX_NAMES     = 15   # max concurrent shorts
PRICE_MIN     = 0.50
NOTIONAL_ADV_MIN = 5e5   # A$0.5m
TC_BPS        = 5        # per side, set 0 to ignore

# Simple exits
MAX_HOLD_WEEKS = 4
EXIT_TREND     = True     # exit if dSI_4w <= 0 AND ret_4w > 0
EXIT_DTC_COMP  = 0.30     # exit if DTC compresses >=30% from entry (set None to disable)

# Optional guard against shorting fresh highs
USE_HIGH_GUARD = False    # keep False for simplicity


### 1) Features

In [59]:
def build_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    # Types
    for c in ["Date", "ReleaseDate"]:
        df[c] = pd.to_datetime(df[c], errors="coerce")
    num_cols = [
        "Shorts (Free Float %)", "Shorts (%)", "Short Positions",
        "Total Volume", "Date Share Price", "Release Date Share Price",
        "Date Free Float"
    ]
    for c in num_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    # SI source (prefer free-float)
    df["SI"] = df["Shorts (Free Float %)"] if "Shorts (Free Float %)" in df.columns else df["Shorts (%)"]

    df = df.sort_values(["Ticker", "Date"])
    g  = df.groupby("Ticker", group_keys=False)

    # Returns from weekly close
    df["ret_1w"] = g["Date Share Price"].pct_change(periods=1, fill_method=None)
    df["ret_4w"] = g["Date Share Price"].pct_change(periods=4, fill_method=None)

    # Short-interest changes
    df["dSI_1w"] = g["SI"].diff(1)
    df["dSI_4w"] = g["SI"].diff(4)

    # ADV and DTC
    roll_vol_4w = g["Total Volume"].rolling(4, min_periods=2).sum().reset_index(level=0, drop=True)
    df["ADV_20"] = roll_vol_4w / 20.0
    df["DTC"]    = df["Short Positions"] / df["ADV_20"]

    # 12-week high guard (optional)
    roll_max_12w = g["Date Share Price"].rolling(12, min_periods=12).max().reset_index(level=0, drop=True)
    df["is_12w_high"] = (df["Date Share Price"] >= roll_max_12w * (1 - 1e-9))

    return df


### 2) One-week cross-section scoring

In [60]:
def score_snapshot(feat: pd.DataFrame, date) -> pd.DataFrame:
    """Return a scored cross-section for a single ReleaseDate."""
    snap = feat[feat["ReleaseDate"] == pd.to_datetime(date)].copy()
    if snap.empty:
        return snap

    # Liquidity
    snap["ADV_notional"] = snap["ADV_20"] * snap["Date Share Price"]
    snap = snap[
        (snap["Date Share Price"] >= PRICE_MIN) &
        (snap["ADV_notional"] >= NOTIONAL_ADV_MIN)
    ].copy()

    # Optional: drop fresh highs (treat NaN as False)
    if USE_HIGH_GUARD:
        snap = snap[~snap["is_12w_high"].fillna(False)].copy()

    # Cross-sectional z-scores (clipped)
    def z(s):
        std = s.std(ddof=0)
        out = (s - s.mean()) / (std if std > 0 else 1.0)
        return out.clip(-4, 4)

    snap["z_SI"]   = z(snap["SI"])
    snap["z_dSI4"] = z(snap["dSI_4w"])
    snap["z_DTC"]  = z(snap["DTC"])
    snap["z_r4"]   = z(snap["ret_4w"])

    # Composite: high, rising SI & tight liquidity; penalize positive 4w return
    snap["Score"] = snap["z_SI"] + snap["z_dSI4"] + snap["z_DTC"] - 0.5*snap["z_r4"]

    return snap.sort_values("Score", ascending=False)


### Simple entry & exit logic

In [61]:
def pick_entries(snap: pd.DataFrame, current_positions: set) -> pd.DataFrame:
    """Pick top names by Score not already held, up to MAX_NAMES."""
    if snap.empty: 
        return snap
    eligible = ~snap["Ticker"].str.upper().isin({t.upper() for t in current_positions})
    need = max(0, MAX_NAMES - len(current_positions))
    return snap[eligible].head(need).copy()

def decide_exits(feat: pd.DataFrame, date, positions: dict) -> list[tuple[str, str]]:
    """
    Decide which open shorts to exit at 'date'.
    positions: {ticker: {'entry_price','last_price','entry_DTC','weeks_held'}}
    Returns list of (ticker, reason)
    """
    snap = score_snapshot(feat, date)
    px  = snap.set_index("Ticker")["Date Share Price"].to_dict()
    dsi = snap.set_index("Ticker")["dSI_4w"].to_dict()
    r4  = snap.set_index("Ticker")["ret_4w"].to_dict()
    dtc = snap.set_index("Ticker")["DTC"].to_dict()

    to_exit = []
    for tkr, st in positions.items():
        price_t = px.get(tkr, np.nan)

        conds = []

        # Trend deterioration
        if EXIT_TREND:
            conds.append((dsi.get(tkr, 0) <= 0) and (r4.get(tkr, 0) > 0))

        # DTC compression
        if EXIT_DTC_COMP is not None and np.isfinite(st.get("entry_DTC", np.nan)) and np.isfinite(dtc.get(tkr, np.nan)):
            conds.append(dtc[tkr] <= (1 - EXIT_DTC_COMP) * st["entry_DTC"])

        # Max hold
        if MAX_HOLD_WEEKS is not None:
            conds.append(st.get("weeks_held", 0) >= MAX_HOLD_WEEKS)

        # Missing price → exit defensively
        if not np.isfinite(price_t):
            conds.append(True)

        if any(conds):
            reason = ",".join([k for k, v in {
                "trend": EXIT_TREND and (dsi.get(tkr, 0) <= 0) and (r4.get(tkr, 0) > 0),
                "dtc": (EXIT_DTC_COMP is not None) and np.isfinite(st.get("entry_DTC", np.nan)) and np.isfinite(dtc.get(tkr, np.nan)) and (dtc[tkr] <= (1 - EXIT_DTC_COMP)*st["entry_DTC"]),
                "time": (MAX_HOLD_WEEKS is not None) and (st.get("weeks_held", 0) >= MAX_HOLD_WEEKS),
                "na": not np.isfinite(price_t)
            }.items() if v])
            to_exit.append((tkr, reason or "exit"))

    return to_exit


### 4) One step per week (mark -> exit -> enter)

In [62]:
def step_week(feat: pd.DataFrame, date, positions: dict):
    """
    At week 'date':
      - mark PnL on open shorts (close-to-close)
      - exit per rules
      - enter new names up to MAX_NAMES
    Returns: (new_positions, period_ret, trades_df)
    """
    trades = []
    snap   = score_snapshot(feat, date)
    price_map = snap.set_index("Ticker")["Date Share Price"].to_dict()

    # Mark-to-market open shorts
    period_ret = 0.0
    if positions:
        w = 1.0 / len(positions)
        for tkr, st in list(positions.items()):
            p_new = price_map.get(tkr, np.nan)
            if np.isfinite(p_new) and np.isfinite(st["last_price"]):
                leg_ret = (st["last_price"] - p_new) / st["last_price"]  # short
                period_ret += w * leg_ret
                st["last_price"] = float(p_new)
                st["weeks_held"] = st.get("weeks_held", 0) + 1

    # Exits
    for tkr, reason in decide_exits(feat, date, positions):
        exit_px = price_map.get(tkr, positions[tkr]["last_price"])
        trades.append({"date": date, "ticker": tkr, "action": "EXIT", "price": float(exit_px), "reason": reason})
        positions.pop(tkr, None)
        period_ret -= (TC_BPS/1e4) * (1.0 / max(1, MAX_NAMES))

    # Entries (fill up to MAX_NAMES)
    entries = pick_entries(snap, set(positions.keys()))
    for _, r in entries.iterrows():
        tkr = r["Ticker"]
        px  = float(r["Date Share Price"])
        positions[tkr] = {
            "entry_date": date,
            "entry_price": px,
            "last_price":  px,
            "entry_DTC":   float(r["DTC"]) if "DTC" in r else np.nan,
            "weeks_held":  0,
            "entry_score": float(r["Score"])
        }
        trades.append({"date": date, "ticker": tkr, "action": "ENTER", "price": px, "score": float(r["Score"])})
        period_ret -= (TC_BPS/1e4) * (1.0 / MAX_NAMES)

    return positions, period_ret, pd.DataFrame(trades)

### 5) Backtest

In [63]:
def run_backtest(hist_df: pd.DataFrame, train_weeks: int, test_weeks: int):
    feat = build_features(hist_df)
    dates = sorted(feat["ReleaseDate"].dropna().unique())

    if len(dates) < train_weeks + test_weeks:
        raise ValueError(f"Need at least {train_weeks + test_weeks} weeks; you have {len(dates)}.")

    # Define windows
    train_dates = dates[:train_weeks]                          # warm-up only (no trading)
    test_dates  = dates[train_weeks : train_weeks + test_weeks]# rolling trading window

    print(f"Train (warm-up): {len(train_dates)} weeks — {train_dates[0].date()} → {train_dates[-1].date()}")
    print(f"Test (trading):  {len(test_dates)} weeks — {test_dates[0].date()} → {test_dates[-1].date()}")

    positions = {}
    equity_rows = []
    all_trades  = []

    # Walk test window only
    for d in test_dates:
        positions, period_ret, trades = step_week(feat, d, positions)
        equity_rows.append({"date": d, "period_ret": period_ret, "num_pos": len(positions)})
        all_trades.append(trades)

    eq = pd.DataFrame(equity_rows).sort_values("date")
    eq["cum_ret"] = (1 + eq["period_ret"].fillna(0)).cumprod() - 1
    trades_df = pd.concat(all_trades, ignore_index=True) if all_trades else pd.DataFrame(columns=["date","ticker","action","price"])

    return feat, eq, trades_df, positions


### 6) Run it

In [64]:
feat, eq, trades, live_pos = run_backtest(hist_df, TRAIN_WEEKS, TEST_WEEKS)

display(eq.head(3)); display(eq.tail(3))
print("Final cumulative return:", f"{eq['cum_ret'].iloc[-1]*100:.2f}%")
print("Total trades:", len(trades))
display(trades.head(10))
print("Open positions at end:", list(live_pos.keys()))


Train (warm-up): 40 weeks — 2023-09-08 → 2024-06-07
Test (trading):  52 weeks — 2024-06-14 → 2025-06-06


Unnamed: 0,date,period_ret,num_pos,cum_ret
0,2024-06-14,0.0,0,0.0
1,2024-06-21,-0.0005,15,-0.0005
2,2024-06-28,0.030417,15,0.029901


Unnamed: 0,date,period_ret,num_pos,cum_ret
49,2025-05-23,0.010562,15,0.026642
50,2025-05-30,0.002492,15,0.029201
51,2025-06-06,0.055264,15,0.086079


Final cumulative return: 8.61%
Total trades: 449


Unnamed: 0,date,ticker,action,price,score,reason
0,2024-06-21,PLS,ENTER,3.14,7.54785,
1,2024-06-21,LTR,ENTER,1.0,7.18251,
2,2024-06-21,CHN,ENTER,1.27,6.523287,
3,2024-06-21,FLT,ENTER,19.540001,5.345567,
4,2024-06-21,WGX,ENTER,2.26,4.972388,
5,2024-06-21,LYC,ENTER,6.12,4.96381,
6,2024-06-21,IEL,ENTER,15.46,4.750128,
7,2024-06-21,BOE,ENTER,4.01,4.652303,
8,2024-06-21,CTT,ENTER,2.28,4.623425,
9,2024-06-21,IMU,ENTER,1.972,4.166781,


Open positions at end: ['BRG', 'LTR', 'IMU', 'PLS', 'CTD', 'NUF', 'MIN', 'IGO', 'MSB', 'EVN', 'PNV', 'LIC', 'RMS', 'NAN', 'PDN']
