# In [None]:
# pip install pandas numpy yfinance pandas_market_calendars pyarrow
import os, math, json, time, warnings
from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np
import pandas as pd
import yfinance as yf

# --------- CONFIG ---------
INPUT_CSV   = "oip_mega_wretvol.csv"    # CSV read by main()
OUTPUT_CSV  = "oip_mega_boost1.csv"     # enriched CSV written by main()
DATE_COL_TRADE  = "trade_date"
DATE_COL_FILE   = "filing_date"
DATE_COL_MEBUY  = "mebuydate"       # optional; falls back to filing_date if missing
TICKER_COL      = "ticker"

# Optional: drop rows whose trade->filing delay (in trading days) exceeds this.
# Set to None to keep all rows.
MAX_TD_DELAY = None  # e.g., 10

# Value/price columns (first available is used)
VALUE_COL_CANDIDATES = ["value", "transaction_value", "amount_usd", "amount"]
PRICE_COL_CANDIDATES = ["price", "avg_price", "price_per_share", "transaction_price", "px"]

# --------- CACHE ---------
# Close prices are cached with pandas pickle I/O (load/save_prices_cache use
# read_pickle/to_pickle), so the path carries a .pkl extension rather than the
# former .parquet one.
CACHE_DIR     = "cache"
PRICES_CACHE  = os.path.join(CACHE_DIR, "prices.pkl")
SHARES_CACHE  = os.path.join(CACHE_DIR, "shares.json")
os.makedirs(CACHE_DIR, exist_ok=True)

# --------- UTILS ---------
def to_dt(s):
    """Parse scalar *s* into a Timestamp truncated to midnight.

    Missing input (None/NaN/NaT) short-circuits to ``pd.NaT``; otherwise the
    value is parsed with ``errors="coerce"`` and normalized.
    """
    return pd.NaT if pd.isna(s) else pd.to_datetime(s, errors="coerce").normalize()

def first_existing_col(df, candidates):
    """Return the earliest name in *candidates* that is a column of *df*.

    Returns None when none of the candidates is present.
    """
    present = df.columns
    return next((name for name in candidates if name in present), None)

def ensure_cols(df, needed):
    """Validate that every name in *needed* is a column of *df*.

    Raises:
        ValueError: listing all absent column names, in *needed* order.
    """
    have = df.columns
    absent = [name for name in needed if name not in have]
    if not absent:
        return
    raise ValueError(f"Missing required columns: {absent}")

def normalize_to_yahoo_symbol(x: str) -> str:
    """Convert a ticker to Yahoo Finance form: trimmed, upper-case, '.' -> '-'.

    Yahoo writes share classes with a dash, e.g. ``BRK.B`` -> ``BRK-B``.
    """
    symbol = str(x).strip().upper()
    return "-".join(symbol.split("."))

def _nyse_trading_days(start, end):
    """Return NYSE trading days (tz-naive, midnight) over [start-10d, end+10d], or None.

    None means pandas_market_calendars is unavailable or the lookup failed,
    and the caller should fall back to a weekday-only calendar.
    """
    try:
        import pandas_market_calendars as pmc
        nyse = pmc.get_calendar("XNYS")
        # valid_days takes the date range directly (it has no `schedule` argument).
        days = pd.DatetimeIndex(nyse.valid_days(
            start_date=start - pd.Timedelta(days=10),
            end_date=end + pd.Timedelta(days=10),
        ))
        if days.tz is not None:
            days = days.tz_localize(None)
        return days.normalize().sort_values()
    except Exception:
        return None


def get_trading_days_delay(trade_dates, file_dates):
    """Count trading days from each trade date up to (not including) its filing date.

    Each result is the number of trading days in the half-open interval
    ``[trade_date, filing_date)``: 0 when both are the same trading day and
    negative when the filing precedes the trade. A date that is not itself a
    trading day counts from the next trading day. The NYSE calendar from
    ``pandas_market_calendars`` is used when it can be loaded; otherwise a
    weekday-only calendar (``np.busday_count``) is used.

    Parameters
    ----------
    trade_dates, file_dates : pd.Series
        Datetime-like Series of equal length (paired positionally).

    Returns
    -------
    pd.Series
        dtype "Int64", indexed like *trade_dates*; <NA> where either date is missing.
    """
    t = pd.DatetimeIndex(pd.to_datetime(trade_dates, errors="coerce")).normalize()
    f = pd.DatetimeIndex(pd.to_datetime(file_dates, errors="coerce")).normalize()
    valid = ~(t.isna() | f.isna())
    counts = np.zeros(len(t), dtype="int64")

    if valid.any():
        tv, fv = t[valid], f[valid]
        cal = _nyse_trading_days(min(tv.min(), fv.min()), max(tv.max(), fv.max()))
        if cal is not None:
            # searchsorted(side="left") = index of first trading day >= date, so the
            # difference counts trading days in [trade, file), like busday_count.
            counts[valid] = cal.searchsorted(fv) - cal.searchsorted(tv)
        else:
            # Weekday-only fallback; NaT rows are excluded because busday_count
            # raises on NaT.
            counts[valid] = np.busday_count(
                tv.to_numpy().astype("datetime64[D]"),
                fv.to_numpy().astype("datetime64[D]"),
            )

    return pd.Series(counts, index=trade_dates.index, dtype="Int64").where(valid)

# ---- replace parquet cache with pickle cache ----
# Binds PRICES_CACHE to a .pkl file under CACHE_DIR; the cache helpers below
# read and write it with pd.read_pickle / DataFrame.to_pickle, not parquet.
# NOTE(review): presumably switched to sidestep the pyarrow "ArrowKeyError"
# recorded at the end of this cell — confirm.
PRICES_CACHE = os.path.join(CACHE_DIR, "prices.pkl")  # change extension

def load_prices_cache(path=None):
    """Load the cached close-price DataFrame, or None if there is no usable cache.

    Parameters
    ----------
    path : str, optional
        Pickle file to read; defaults to ``PRICES_CACHE``.

    Returns
    -------
    pd.DataFrame or None
        None when the file does not exist or cannot be unpickled. A corrupt or
        incompatible cache is treated as absent (with a warning) so it is rebuilt.
    """
    path = PRICES_CACHE if path is None else path
    if not os.path.exists(path):
        return None
    try:
        # NOTE: unpickling can execute code; only load caches this script wrote.
        return pd.read_pickle(path)
    except Exception as exc:
        warnings.warn(f"Ignoring unreadable price cache {path!r}: {exc!r}")
        return None

def save_prices_cache(df_prices, path=None):
    """Write *df_prices* to the price cache atomically.

    The frame is pickled to a sibling temp file (same extension, so pandas
    infers the same compression) and then moved over the target with
    ``os.replace``. An interrupted run therefore cannot leave a truncated
    cache, which load_prices_cache would discard along with every cached download.

    Parameters
    ----------
    df_prices : pd.DataFrame
        Frame to store.
    path : str, optional
        Destination file; defaults to ``PRICES_CACHE``.
    """
    path = PRICES_CACHE if path is None else path
    root, ext = os.path.splitext(path)
    tmp = f"{root}.tmp{ext}"
    df_prices.to_pickle(tmp)
    os.replace(tmp, path)

def append_prices_cache(new_df):
    """Merge *new_df* into the on-disk price cache, save it, and return the result.

    The merged frame has the union of both frames' dates (index) and tickers
    (columns). Where both hold a non-null value for the same date/ticker, the
    value from *new_df* wins; gaps in *new_df* are filled from the old cache.

    Parameters
    ----------
    new_df : pd.DataFrame
        Freshly downloaded closes (date index, one column per ticker).

    Returns
    -------
    pd.DataFrame
        The frame now stored in the cache (sorted by date when merged).
    """
    old = load_prices_cache()
    if old is None or old.empty:
        save_prices_cache(new_df)
        return new_df
    # combine_first keeps new_df's non-nulls and fills its gaps from `old`,
    # i.e. "prefer new non-nulls" over the union of index and columns.
    df = new_df.combine_first(old).sort_index()
    save_prices_cache(df)
    return df


def load_shares_cache(path=None):
    """Return the cached ``{ticker: shares_outstanding}`` dict, or {} if unavailable.

    The file is opened in a ``with`` block so the handle is always closed.
    An unreadable or malformed file is treated as an empty cache (with a
    warning) instead of aborting the run.

    Parameters
    ----------
    path : str, optional
        JSON file to read; defaults to ``SHARES_CACHE``.
    """
    path = SHARES_CACHE if path is None else path
    if not os.path.exists(path):
        return {}
    try:
        with open(path, encoding="utf-8") as fh:
            return json.load(fh)
    except (OSError, ValueError) as exc:  # JSONDecodeError is a ValueError
        warnings.warn(f"Ignoring unreadable shares cache {path!r}: {exc!r}")
        return {}

def save_shares_cache(d, path=None):
    """Write the ``{ticker: shares_outstanding}`` dict *d* to JSON atomically.

    The JSON is written to a sibling temp file and then moved over the target
    with ``os.replace``, so an interrupted write cannot leave a truncated file.
    NaN values are emitted as ``NaN``, which ``json.load`` reads back as float NaN.

    Parameters
    ----------
    d : dict
        Mapping to store.
    path : str, optional
        Destination file; defaults to ``SHARES_CACHE``.
    """
    path = SHARES_CACHE if path is None else path
    root, ext = os.path.splitext(path)
    tmp = f"{root}.tmp{ext}"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(d, f)
    os.replace(tmp, path)

def _download_chunk(chunk, start, end, retries=3):
    """yf.download *chunk* with exponential-backoff retries; None if every attempt fails."""
    last_exc = None
    for attempt in range(retries):
        try:
            return yf.download(
                tickers=chunk, start=start, end=end + pd.Timedelta(days=2),
                auto_adjust=True, progress=False, group_by="ticker", threads=True
            )
        except Exception as exc:
            last_exc = exc
            if attempt + 1 < retries:
                time.sleep(2 ** attempt)
    warnings.warn(f"yf.download failed for {len(chunk)} tickers "
                  f"(first: {chunk[:3]}): {last_exc!r}")
    return None


def _extract_closes(data, chunk):
    """Return a date-indexed close-price frame (one column per ticker) from a yf.download result.

    Returns an empty DataFrame when *data* is missing/empty or has no "Close" field.
    """
    if data is None or data.empty:
        return pd.DataFrame()
    cols = data.columns
    if isinstance(cols, pd.MultiIndex):
        # group_by="ticker" gives (ticker, field) columns; accept either level
        # order in case the installed yfinance lays them out differently.
        if "Close" in cols.get_level_values(1):
            closes = data.xs("Close", axis=1, level=1)
        elif "Close" in cols.get_level_values(0):
            closes = data.xs("Close", axis=1, level=0)
        else:
            return pd.DataFrame()
    elif "Close" in cols:
        closes = pd.DataFrame({chunk[0]: data["Close"]})
    else:
        return pd.DataFrame()
    closes.index = pd.to_datetime(closes.index).normalize()
    return closes.dropna(how="all").sort_index()


def batch_yf_download(tickers, start, end, batch_size=80, pause=1.5):
    """Download daily adjusted closes for *tickers*, reusing and extending the price cache.

    Only tickers that are not already columns of the cache are requested, in
    batches of *batch_size* with *pause* seconds between batches. Each batch is
    merged into the on-disk cache as soon as it arrives, so an interrupted run
    resumes where it stopped. A batch that fails or returns no rows is skipped
    (and not cached, so it is retried on the next run). Cached tickers are
    reused as-is even if the cache does not span [start, end].

    Returns
    -------
    pd.DataFrame
        Date-indexed, sorted frame with one close column per ticker: every
        cached ticker plus the newly downloaded ones.
    """
    want = sorted(set(tickers))
    cached = load_prices_cache()
    need = want if cached is None else [t for t in want if t not in cached.columns]
    out = cached if cached is not None else pd.DataFrame()

    for i in range(0, len(need), batch_size):
        chunk = need[i:i + batch_size]
        # A fresh result per batch: never fall through to the previous batch's data.
        closes = _extract_closes(_download_chunk(chunk, start, end), chunk)
        if closes.empty:
            print(f"No prices returned for tickers {i + 1}-{i + len(chunk)}; skipped.")
            time.sleep(pause)
            continue

        out = closes if out.empty else out.join(closes, how="outer")
        out = out.sort_index()
        append_prices_cache(closes)
        print(f"Downloaded {i+len(chunk)}/{len(need)} tickers…")
        time.sleep(pause)
    return out

def fetch_one_shares(t):
    """Return ``(t, shares_outstanding)`` for Yahoo symbol *t*; NaN when unavailable.

    Tries ``Ticker.fast_info`` first and falls back to the slower ``Ticker.info``
    dict when fast_info yields nothing usable (None, 0, NaN, non-numeric).
    Any error yields ``(t, nan)`` so that one bad symbol cannot make
    ``future.result()`` raise in get_shares_outstanding_cached.
    """
    def usable(v):
        # A positive, float-convertible share count.
        try:
            return v is not None and float(v) > 0
        except (TypeError, ValueError):
            return False

    try:
        tk = yf.Ticker(t)
        so = None
        try:
            fi = tk.fast_info
            # NOTE(review): recent yfinance FastInfo appears to expose the count
            # as `shares`; `shares_outstanding` is tried first — verify against
            # the installed yfinance version.
            so = getattr(fi, "shares_outstanding", None)
            if not usable(so):
                so = getattr(fi, "shares", None)
        except Exception:
            so = None
        if not usable(so):
            # NaN is truthy, so it must be tested explicitly to reach this fallback.
            info = tk.info or {}
            so = next((info[k] for k in ("sharesOutstanding", "impliedSharesOutstanding")
                       if usable(info.get(k))), None)
        return t, float(so) if usable(so) else np.nan
    except Exception:
        return t, np.nan

def _shares_missing(v):
    """True if a cached shares value should be re-fetched (None, 0, "nan", or float NaN)."""
    if v is None or v == 0 or v == "nan":
        return True
    # JSON round-trips NaN as a fresh float('nan'), which `in (None, 0, "nan")`
    # never matches (NaN != NaN and it is not the np.nan object).
    return isinstance(v, float) and math.isnan(v)


def get_shares_outstanding_cached(tickers, max_workers=8):
    """Return a ``{ticker: shares_outstanding}`` dict, fetching only what the cache lacks.

    A ticker is fetched (via fetch_one_shares on a thread pool of *max_workers*)
    when it is absent from the JSON cache or its cached value is None, 0,
    "nan", or NaN. Failed fetches are stored as NaN, so they are retried on
    later runs. The cache is saved every 100 results and again at the end,
    so an interrupted run keeps its progress.

    Returns
    -------
    dict
        The full cache, including entries for tickers not requested this call.
    """
    cache = load_shares_cache()
    todo = [t for t in tickers if t not in cache or _shares_missing(cache[t])]
    if not todo:
        return cache
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = [ex.submit(fetch_one_shares, t) for t in todo]
        for n, fut in enumerate(as_completed(futures), 1):
            t, so = fut.result()
            cache[t] = so
            if n % 100 == 0:
                print(f"Shares fetched: {n}/{len(todo)}")
                save_shares_cache(cache)  # checkpoint
    save_shares_cache(cache)
    return cache

def nearest_previous_close(closes_df, date, ticker):
    """Return the latest non-null close of *ticker* on or before *date*.

    Parameters
    ----------
    closes_df : pd.DataFrame or None
        Close prices with a date index sorted ascending and one column per ticker.
    date : datetime-like
        Lookup date; NaT/None gives NaN.
    ticker : str
        Column to read.

    Returns
    -------
    float
        The close on *date* if present and non-null, otherwise the most recent
        earlier non-null close. NaN when there is none, or when the frame,
        date or ticker is missing.
    """
    if closes_df is None or closes_df.empty or pd.isna(date) or ticker not in closes_df.columns:
        return np.nan
    col = closes_df[ticker]
    if isinstance(col, pd.DataFrame):  # duplicate column labels: use the first
        col = col.iloc[:, 0]
    # Rows with index <= date (side="right" includes an exact match). Only this
    # one column is scanned, instead of building a full cross-ticker row per step.
    end = closes_df.index.searchsorted(date, side="right")
    vals = col.iloc[:end].to_numpy(dtype="float64", na_value=np.nan)
    hits = np.flatnonzero(~np.isnan(vals))
    return float(vals[hits[-1]]) if hits.size else np.nan

# --------- MAIN PIPELINE ---------
def main(input_csv=INPUT_CSV, output_csv=OUTPUT_CSV):
    """Read *input_csv*, add delay/return/size features, and write *output_csv*.

    Columns added:

    * ``trade2file_td``: trading days from trade date to filing date, with
      negative values clipped to 0. When MAX_TD_DELAY is a non-negative
      number, rows above it (or with unknown delay) are dropped.
    * ``px_trade_close``, ``px_mebuy_close``, ``ret_trade_to_mebuy``: last
      close on/before the trade date and on/before the mebuy date (filing date
      when mebuy is missing), and the simple return between them.
    * ``shares_outstanding_curr``, ``mcap_trade_approx``, ``purchase_pct_mcap``:
      purchase value as a percent of an approximate market cap, built from the
      trade-date close times *current* shares outstanding (not historical).
    """
    df = pd.read_csv(input_csv)
    ensure_cols(df, [TICKER_COL, DATE_COL_TRADE, DATE_COL_FILE])

    # Dates normalized to midnight; unparseable values become NaT.
    for col in (DATE_COL_TRADE, DATE_COL_FILE):
        df[col] = pd.to_datetime(df[col], errors="coerce").dt.normalize()
    if DATE_COL_MEBUY in df.columns:
        df[DATE_COL_MEBUY] = pd.to_datetime(df[DATE_COL_MEBUY], errors="coerce").dt.normalize()
    else:
        df[DATE_COL_MEBUY] = pd.NaT

    # Tickers -> Yahoo format. Missing/blank tickers stay NaN: astype(str) would
    # turn NaN into "nan" -> "NAN", a symbol Yahoo may resolve to an unrelated
    # listing whose prices would then be attached to these rows.
    raw_tkr = df[TICKER_COL]
    has_tkr = raw_tkr.notna() & raw_tkr.astype(str).str.strip().ne("")
    df[TICKER_COL] = raw_tkr.where(has_tkr).map(normalize_to_yahoo_symbol, na_action="ignore")

    # Feature 1: trading-day delay
    df["trade2file_td"] = get_trading_days_delay(df[DATE_COL_TRADE], df[DATE_COL_FILE])
    # Clip negative delays (filing before trade = bad data); unknown (<NA>) delays are left as-is.
    bad = (df["trade2file_td"] < 0).fillna(False).astype(bool)
    if bad.any():
        warnings.warn(f"{bad.sum()} rows have filing before trade (negative delay). Clipping to 0.")
        df.loc[bad, "trade2file_td"] = 0

    # Optional filter by max delay
    if isinstance(MAX_TD_DELAY, (int, float)) and MAX_TD_DELAY >= 0:
        before = len(df)
        df = df[df["trade2file_td"].astype("float64") <= MAX_TD_DELAY].copy()
        print(f"Filtered by MAX_TD_DELAY={MAX_TD_DELAY}: {before} -> {len(df)} rows")

    # Collected after filtering so dropped rows do not trigger downloads.
    tickers = sorted(set(df[TICKER_COL].dropna()))

    # Price window: trade date .. mebuy date (filing date when mebuy is missing)
    end_dates = df[DATE_COL_MEBUY].fillna(df[DATE_COL_FILE])
    min_d = df[DATE_COL_TRADE].min()
    max_d = end_dates.max()

    # Batch download closes (resumable cache); skipped when there is no usable window.
    closes = None
    if tickers and pd.notna(min_d) and pd.notna(max_d):
        closes = batch_yf_download(tickers, min_d - pd.Timedelta(days=5), max_d,
                                   batch_size=80, pause=1.2)
    if closes is None or closes.empty:
        warnings.warn("No prices downloaded; ret_trade_to_mebuy will be NaN.")
    else:
        closes.columns = [normalize_to_yahoo_symbol(c) for c in closes.columns]

    # Feature 2: ret_trade_to_mebuy
    n_rows = len(df)
    trade_px = np.full(n_rows, np.nan)
    mebuy_px = np.full(n_rows, np.nan)
    rows = zip(df[TICKER_COL].to_numpy(), df[DATE_COL_TRADE].to_numpy(), end_dates.to_numpy())
    for i, (tkr, d_trade, d_target) in enumerate(rows):
        trade_px[i] = nearest_previous_close(closes, d_trade, tkr)
        mebuy_px[i] = nearest_previous_close(closes, d_target, tkr)
        if (i + 1) % 5000 == 0:
            print(f"Prices looked up for {i+1} rows…")

    df["px_trade_close"] = trade_px
    df["px_mebuy_close"] = mebuy_px
    df["ret_trade_to_mebuy"] = np.where(
        (df["px_trade_close"] > 0) & (df["px_mebuy_close"] > 0),
        df["px_mebuy_close"] / df["px_trade_close"] - 1.0,
        np.nan
    )

    # Feature 3: purchase_pct_mcap
    def num(s):
        return pd.to_numeric(s, errors="coerce")

    value_col = first_existing_col(df, VALUE_COL_CANDIDATES)
    price_col = first_existing_col(df, PRICE_COL_CANDIDATES)
    if value_col is not None:
        purchase_value = num(df[value_col])
    elif "qty" in df.columns and price_col is not None:
        purchase_value = num(df["qty"]) * num(df[price_col])
    else:
        purchase_value = pd.Series(np.nan, index=df.index, dtype="float64")

    # Shares outstanding (cached, non-historical approximation). Coerced to
    # numeric so stray None/str cache entries cannot break the multiplication.
    shares_map = get_shares_outstanding_cached(tickers, max_workers=8)
    df["shares_outstanding_curr"] = num(df[TICKER_COL].map(shares_map))
    df["mcap_trade_approx"] = df["px_trade_close"] * df["shares_outstanding_curr"]

    df["purchase_pct_mcap"] = np.where(
        (df["mcap_trade_approx"] > 0) & (purchase_value > 0),
        100.0 * purchase_value / df["mcap_trade_approx"],
        np.nan
    )

    nan_rate = df["purchase_pct_mcap"].isna().mean()
    if nan_rate > 0.25:
        warnings.warn(f"'purchase_pct_mcap' NaN ratio = {nan_rate:.0%}. "
                      f"Likely missing prices or shares for some tickers.")

    # Save
    df.to_csv(output_csv, index=False)
    print(f"Enriched {len(df):,} rows -> {output_csv}")
    print(df[[TICKER_COL, DATE_COL_TRADE, DATE_COL_FILE, "trade2file_td",
              "ret_trade_to_mebuy", "purchase_pct_mcap"]].head(8))

if __name__ == "__main__":
    # Script / notebook-cell entry point: run the enrichment on the configured paths.
    main(input_csv=INPUT_CSV, output_csv=OUTPUT_CSV)


# Recorded cell output: ArrowKeyError: No type extension with name arrow.py_extension_type found