## Preflight & Bootstrap (added)
These cells load cached data and prevent accidental re-downloads on rerun.

In [None]:
# === PREFLIGHT (expanded cache status + no-scrape flags) ===
from pathlib import Path
import pandas as pd

OUT = Path("outputs")

paths = {
    "prices_wide.parquet"        : OUT / "prices_wide.parquet",
    "meta_yq.parquet"            : OUT / "meta_yq.parquet",
    "meta_yq_progress.parquet"   : OUT / "meta_yq_progress.parquet",
    "sec_recent.parquet"         : OUT / "sec_recent.parquet",
    "sec_cache/"                 : OUT / "sec_cache",
    "sec_facts/"                 : OUT / "sec_facts",
    "yq_symbol_cache/"           : OUT / "yq_symbol_cache",
    "split_events.parquet"       : OUT / "split_events.parquet",
    "shares_events.parquet"      : OUT / "shares_events.parquet",
    "yf_snapshot_cache/"         : OUT / "yf_snapshot_cache",
    "yq_float_snapshot.parquet"  : OUT / "yq_float_snapshot.parquet",
    "analysis_enriched.parquet"  : OUT / "analysis_enriched.parquet",
}

def _size(p: Path):
    try:
        if p.is_dir():
            return f"{len(list(p.glob('*')))} files"
        return f"{p.stat().st_size/1_048_576:.1f} MB"
    except Exception:
        return "-"

for name, p in paths.items():
    status = "FOUND" if p.exists() else "—"
    print(f"{name:<28} {status:>6}    ({_size(p)})")

# ===== Run-mode flags (keep these False to avoid re-scraping) =====
REBUILD_PRICES        = False
REBUILD_META          = False
REBUILD_SEC           = False
REBUILD_YQ_FLOAT      = False     # do not rebuild Yahoo float snapshot
YQ_FLOAT_CACHE_ONLY   = True      # use snapshot/cache only, no network for float backfill

print("\nMode:",
      "prices:", "LOAD" if not REBUILD_PRICES else "REBUILD", "|",
      "meta:",   "LOAD" if not REBUILD_META   else "REBUILD", "|",
      "sec:",    "LOAD" if not REBUILD_SEC    else "REBUILD", "|",
      "yq_float:", "LOAD" if not REBUILD_YQ_FLOAT else "REBUILD",
      "| cache_only:", YQ_FLOAT_CACHE_ONLY)

# Optional master kill-switch if you like:
NO_NETWORK = True  # leave True for normal reruns; set False only when you explicitly want to fetch new data

print("\nTip: leave REBUILD_PRICES=REBUILD_META=REBUILD_SEC=False so the bootstrap loads from disk only.")

prices_wide.parquet           FOUND    (102.0 MB)
meta_yq.parquet               FOUND    (0.1 MB)
meta_yq_progress.parquet      FOUND    (0.0 MB)
sec_recent.parquet            FOUND    (0.1 MB)
sec_cache/                    FOUND    (3357 files)
sec_facts/                    FOUND    (3269 files)
yq_symbol_cache/              FOUND    (3180 files)
split_events.parquet          FOUND    (0.2 MB)
shares_events.parquet         FOUND    (1.7 MB)
yf_snapshot_cache/            FOUND    (2653 files)
yq_float_snapshot.parquet     FOUND    (0.0 MB)
analysis_enriched.parquet     FOUND    (80.9 MB)

Mode: prices: LOAD | meta: LOAD | sec: LOAD | yq_float: LOAD | cache_only: True

Tip: leave REBUILD_PRICES=REBUILD_META=REBUILD_SEC=False so the bootstrap loads from disk only.


In [17]:
# ---- run-safe defaults (no scraping) ----
REBUILD_PRICES = False
REBUILD_META   = False
REBUILD_SEC    = False

# yq float snapshot / cache behavior
REBUILD_YQ_FLOAT    = False      # don't rebuild snapshot
YQ_FLOAT_CACHE_ONLY = True       # use cache/snapshot only; no network
NO_NETWORK          = False      # leave False unless you want strict "airplane mode"

# yahooquery metadata pass
RUN_YQ_BACKFILL = False          # skip yahooquery scraping pass

In [18]:
# --- Global run toggles (fast defaults) ---
FAST_MODE        = True        # prefer cached artifacts, avoid recomputation
NO_NETWORK       = True        # hard no network: skip all scraping calls
REBUILD_META     = False
REBUILD_SEC      = False
REBUILD_PRICES   = False

# YahooQuery/YF controls
RUN_YQ_BACKFILL  = False       # metadata backfill off by default
YQ_FLOAT_CACHE_ONLY = True     # read cached float snapshots only

OUT = Path("outputs"); OUT.mkdir(exist_ok=True)

# Snapshots we’ll reuse to keep re-runs fast
P_META_YQ          = OUT / "meta_yq.parquet"
P_META_YQ_MORE     = OUT / "meta_yq_more.parquet"
P_YQ_SYM_DIR       = OUT / "yq_symbol_cache"
P_YQ_FLOAT_SNAP    = OUT / "yq_float_snapshot.parquet"
P_SEC_FACTS_DIR    = OUT / "sec_facts"
P_SEC_ENRICHED_PX  = OUT / "sec_enriched_px.parquet"  # SEC public-float→shares (as-of joined to price)

In [19]:
# === BOOTSTRAP / RESUME GUARD ===
from pathlib import Path
import pandas as pd, numpy as np, json

# Paths & flags
OUT = Path("outputs"); OUT.mkdir(exist_ok=True)
P_CHUNKS   = OUT / "chunks"
P_WIDE_PQ  = OUT / "prices_wide.parquet"
P_WIDE_PKL = OUT / "prices_wide.pkl"
P_SEC_CACHE = OUT / "sec_cache"
P_META_YQ   = OUT / "meta_yq.parquet"
P_META_PROG = OUT / "meta_yq_progress.parquet"
P_YQ_SYM    = OUT / "yq_symbol_cache"

# --- Yahoo float snapshot (consolidated) ---
P_YQ_FLOAT_SNAP = OUT / "yq_float_snapshot.parquet"

# Controls for the Yahoo float backfill step
REBUILD_YQ_FLOAT   = False   # False = never re-scrape if a snapshot exists
YQ_FLOAT_CACHE_ONLY = True   # True = read per-ticker cache files only; no network

REBUILD_PRICES = False
REBUILD_META   = False
REBUILD_SEC    = False

# --- Prices: load-or-resume ---
def _load_prices_raw():
    if P_WIDE_PQ.exists():  return pd.read_parquet(P_WIDE_PQ)
    if P_WIDE_PKL.exists(): return pd.read_pickle(P_WIDE_PKL)
    parts = sorted(P_CHUNKS.glob("prices_wide_*.parquet"))
    if parts: return pd.concat([pd.read_parquet(p) for p in parts], axis=1).sort_index()
    return None

prices_raw = None if REBUILD_PRICES else _load_prices_raw()

# --- SEC: load recent + SIC from cache (prefer saved file if present) ---
def _load_sec_recent():
    sec_parq = OUT / "sec_recent.parquet"
    sec_csv  = OUT / "sec_recent.csv"
    if sec_parq.exists():
        return pd.read_parquet(sec_parq).assign(ticker=lambda d: d['ticker'].str.upper())
    if sec_csv.exists():
        return pd.read_csv(sec_csv).assign(ticker=lambda d: d['ticker'].str.upper())
    try:
        return sec_recent_df.assign(ticker=lambda d: d['ticker'].str.upper())
    except NameError:
        return pd.DataFrame(columns=['ticker','cik','recent_form','recent_filing_date'])

def _load_sic_from_cache(sec_df):
    rows = []
    for _, r in sec_df.dropna(subset=['cik']).iterrows():
        fp = P_SEC_CACHE / f"{r['cik']}.json"
        if not fp.exists(): 
            continue
        try:
            j = json.loads(fp.read_text())
            rows.append({
                'ticker': r['ticker'].upper(),
                'cik': r['cik'],
                'sic': j.get('sic'),
                'sic_desc': j.get('sicDescription')
            })
        except Exception:
            pass
    return pd.DataFrame(rows).drop_duplicates('ticker')

sec_recent_df = _load_sec_recent()
sic_df = _load_sic_from_cache(sec_recent_df)

def _sic_to_sector(s):
    try: s = int(s)
    except Exception: return None
    if 1 <= s <= 9:    return 'Agriculture'
    if 10 <= s <= 14:  return 'Mining'
    if 15 <= s <= 17:  return 'Construction'
    if 20 <= s <= 39:  return 'Manufacturing'
    if 40 <= s <= 49:  return 'Transportation & Utilities'
    if 50 <= s <= 51:  return 'Wholesale'
    if 52 <= s <= 59:  return 'Retail'
    if 60 <= s <= 67:  return 'Finance'
    if 70 <= s <= 89:  return 'Services'
    if 91 <= s <= 99:  return 'Public Administration'
    return None

if not sic_df.empty:
    sic_df['sector_sic'] = sic_df['sic'].apply(_sic_to_sector)

# --- Yahoo meta: load consolidated (parquet + progress + per-symbol cache) ---
def _load_meta_yq():
    frames = []
    if P_META_YQ.exists():   frames.append(pd.read_parquet(P_META_YQ))
    if P_META_PROG.exists(): frames.append(pd.read_parquet(P_META_PROG))
    if P_YQ_SYM.exists():
        rows = []
        for fp in P_YQ_SYM.glob("*.json"):
            try:
                j = json.loads(fp.read_text())
                rows.append({
                    'ticker': str(j.get('symbol','')).upper(),
                    'market_cap': j.get('market_cap'),
                    'sector': j.get('sector'),
                    'industry': j.get('industry'),
                })
            except Exception:
                pass
        if rows: frames.append(pd.DataFrame(rows))
    if not frames:
        return pd.DataFrame(columns=['ticker','market_cap','sector','industry'])
    out = (pd.concat(frames, ignore_index=True)
             .dropna(subset=['ticker'])
             .assign(ticker=lambda d: d['ticker'].str.upper())
             .drop_duplicates('ticker', keep='first'))
    return out

meta_yq = None if REBUILD_META else _load_meta_yq()
if meta_yq is None:
    meta_yq = pd.DataFrame(columns=['ticker','market_cap','sector','industry'])

# --- Fundamentals (idempotent): prefer Yahoo, backfill with SEC SIC sector ---
fundamentals = (
    meta_yq
    .merge(sic_df[['ticker','sic','sic_desc','sector_sic']] if not sic_df.empty else pd.DataFrame(columns=['ticker','sic','sic_desc','sector_sic']),
           on='ticker', how='left')
    .merge(sec_recent_df[['ticker','cik','recent_form','recent_filing_date']] if not sec_recent_df.empty else pd.DataFrame(columns=['ticker','cik','recent_form','recent_filing_date']),
           on='ticker', how='left')
)
if 'sector' not in fundamentals.columns:
    fundamentals['sector'] = pd.NA
fundamentals['sector'] = fundamentals['sector'].where(fundamentals['sector'].notna(), fundamentals.get('sector_sic'))

_keep = ['ticker','market_cap','sector','industry','cik','sic','sic_desc','recent_form','recent_filing_date']
for c in _keep:
    if c not in fundamentals.columns: fundamentals[c] = pd.NA
fundamentals = fundamentals[_keep].drop_duplicates('ticker')

# --- Prices → tidy + returns (only if prices_raw already loaded) ---
def tidy_from_yf(wide: pd.DataFrame) -> pd.DataFrame:
    if wide is None or wide.empty:
        return pd.DataFrame(columns=['date','ticker','open','high','low','close','adj close','volume'])
    df = wide.copy(); df.index.name = 'date'
    tidy = (df.stack(level=1, future_stack=True)
              .rename_axis(index=['date','ticker'])
              .reset_index()
              .sort_values(['date','ticker'])
              .reset_index(drop=True))
    tidy.columns = [c.lower() if isinstance(c, str) else c for c in tidy.columns]
    return tidy

prices_tidy = tidy_from_yf(prices_raw)
if not prices_tidy.empty:
    adj = (prices_tidy[['date','ticker','adj close']].dropna()
             .sort_values(['ticker','date']))
    adj['ret']    = adj.groupby('ticker')['adj close'].pct_change()
    adj['logret'] = np.log(adj['adj close'] / adj.groupby('ticker')['adj close'].shift(1))
    returns = adj.dropna(subset=['ret','logret'])
    analysis_df = (prices_tidy[['date','ticker','open','high','low','close','adj close','volume']]
                   .merge(returns[['date','ticker','ret','logret']], on=['date','ticker'], how='left'))
else:
    analysis_df = pd.DataFrame(columns=['date','ticker','open','high','low','close','adj close','volume','ret','logret'])

# --- Final merge + save (safe to re-run) ---
analysis_enriched = (
    analysis_df
    .merge(fundamentals, on='ticker', how='left')
    .sort_values(['ticker','date'])
    .reset_index(drop=True)
)

analysis_enriched.to_parquet(OUT / "analysis_enriched.parquet")
analysis_enriched.to_csv(OUT / "analysis_enriched.csv", index=False)

print("prices_raw:", None if prices_raw is None else prices_raw.shape)
print("fundamentals tickers:", fundamentals['ticker'].nunique())
print("analysis_enriched rows:", len(analysis_enriched))

prices_raw: (751, 30726)
fundamentals tickers: 5118
analysis_enriched rows: 3845871


In [20]:
# Safe defaults for reruns
if 'YQ_FLOAT_CACHE_ONLY' not in globals(): YQ_FLOAT_CACHE_ONLY = False
if 'NO_NETWORK' not in globals():          NO_NETWORK = False

In [21]:
# Derive active_tickers from prices_raw (upper-cased), for later harvesters
if 'prices_raw' in globals() and prices_raw is not None:
    if isinstance(prices_raw.columns, pd.MultiIndex) and 'Adj Close' in prices_raw.columns.get_level_values(0):
        active_tickers = [t for t in prices_raw['Adj Close'].columns if prices_raw['Adj Close'][t].notna().any()]
    else:
        active_tickers = list(prices_raw.columns)
    active_tickers = [str(t).upper() for t in active_tickers]
else:
    active_tickers = []
print("active_tickers:", len(active_tickers))

active_tickers: 5118


## Added: Split flags + Time‑varying shares/free float
The next cells were inserted to compute:

- `performed_split` and `performed_reverse_split` (per-date flags)
- `shares_outstanding` (time‑varying, SEC)
- `float_shares` / `free_float` (time‑varying, SEC public float ÷ price)


In [22]:
# === S1 (final): Split events (cached) + per-date flags WITHOUT merge_asof ===
from pathlib import Path
import pandas as pd, numpy as np, json, time, re, logging
import yfinance as yf

# silence yfinance info prints
logging.getLogger("yfinance").setLevel(logging.ERROR)

OUT = Path("outputs")
SPLITS_DIR = OUT / "split_events"; SPLITS_DIR.mkdir(parents=True, exist_ok=True)
ALL_SPLITS_PQ = OUT / "split_events.parquet"
SPLIT_FLAGS_PQ = OUT / "split_flags.parquet"

# Warrants/rights/units often trigger 'period max' warnings & never split anyway
_SKIP_SUFFIX_RE = re.compile(r".*(?:W|WS|WT|W[A-D]?|U|R)$")  # tweak if you want to try them

def _split_df_from_actions(acts: pd.DataFrame) -> pd.DataFrame:
    """Extract a tidy split DF from yfinance get_actions() result."""
    if acts is None or acts.empty or ("Stock Splits" not in acts.columns):
        return pd.DataFrame(columns=['date','ratio'])
    s = acts["Stock Splits"].dropna()
    if s.empty:
        return pd.DataFrame(columns=['date','ratio'])
    idx = pd.to_datetime(s.index).tz_localize(None).normalize()
    return pd.DataFrame({"date": idx, "ratio": pd.to_numeric(s.values, errors="coerce")}).dropna(subset=["ratio"])

def fetch_split_events_one(ticker: str) -> pd.DataFrame:
    """
    Returns rows: date, ticker, ratio, performed_split, performed_reverse_split
    Caches to outputs/split_events/<TICKER>.parquet for fast reruns.
    """
    t = str(ticker).upper()
    fp = SPLITS_DIR / f"{t}.parquet"
    if fp.exists():
        try:
            return pd.read_parquet(fp)
        except Exception:
            pass

    # optional skip for non-common-stock suffixes
    if _SKIP_SUFFIX_RE.match(t):
        df = pd.DataFrame(columns=['date','ticker','ratio','performed_split','performed_reverse_split'])
        try: df.to_parquet(fp, index=False)
        except Exception: pass
        return df

    acts = None
    tk = yf.Ticker(t)
    try:
        acts = tk.get_actions(period="max")
    except Exception:
        try:
            acts = tk.get_actions(period="5d")  # allowed even for odd instruments
        except Exception:
            acts = None

    base = _split_df_from_actions(acts)
    if base.empty:
        df = pd.DataFrame(columns=['date','ticker','ratio','performed_split','performed_reverse_split'])
    else:
        df = base.copy()
        df['ticker'] = t
        df['performed_split'] = (df['ratio'] > 1).astype('int8')
        df['performed_reverse_split'] = ((df['ratio'] > 0) & (df['ratio'] < 1)).astype('int8')
        df = df[['date','ticker','ratio','performed_split','performed_reverse_split']]

    # cache
    try: df.to_parquet(fp, index=False)
    except Exception: pass
    time.sleep(0.08)  # be polite
    return df

def build_split_events(symbols) -> pd.DataFrame:
    parts = [fetch_split_events_one(t) for t in symbols]
    if not parts:
        return pd.DataFrame(columns=['date','ticker','ratio','performed_split','performed_reverse_split'])
    out = pd.concat(parts, ignore_index=True)
    return out.drop_duplicates(['ticker','date'])

# Load/build full split event table once
if ALL_SPLITS_PQ.exists():
    split_events = pd.read_parquet(ALL_SPLITS_PQ)
else:
    split_events = build_split_events(active_tickers)
    split_events.to_parquet(ALL_SPLITS_PQ, index=False)

# ---------- Map to first trading day ON/AFTER event date (no merge_asof) ----------
# Prepare trading calendar from your price table
trading = (
    analysis_df[['date','ticker']]
    .drop_duplicates()
    .assign(
        date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize(),
        ticker=lambda d: d['ticker'].astype(str),
    )
    .sort_values(['ticker','date'])
)

# Prepare events with normalized types
events = (
    split_events[['date','ticker','performed_split','performed_reverse_split']]
    .rename(columns={'date':'event_date'})
    .assign(
        event_date=lambda d: pd.to_datetime(d['event_date']).dt.tz_localize(None).dt.normalize(),
        ticker=lambda d: d['ticker'].astype(str),
    )
    .dropna(subset=['event_date'])
    .sort_values(['ticker','event_date'])
)

# fast per-ticker trading date arrays
dates_by_ticker = (
    trading.groupby('ticker')['date']
           .apply(lambda s: s.to_numpy())   # numpy datetime64[ns] array
           .to_dict()
)

def next_trading_date(tkr: str, dt) -> pd.Timestamp:
    arr = dates_by_ticker.get(tkr)
    if arr is None or arr.size == 0:
        return pd.NaT
    i = arr.searchsorted(dt)  # first trading date >= event_date
    return pd.NaT if i >= arr.size else arr[i]

# map each event to its effective trading day
events = events.copy()
events['effective_date'] = [
    next_trading_date(t, d) for t, d in zip(events['ticker'].to_numpy(), events['event_date'].to_numpy())
]
events_eff = events.dropna(subset=['effective_date'])

split_flags = (
    events_eff
    .rename(columns={'effective_date':'date'})
    [['ticker','date','performed_split','performed_reverse_split']]
    .drop_duplicates(['ticker','date'])
    .sort_values(['ticker','date'])
    .reset_index(drop=True)
)

try: split_flags.to_parquet(SPLIT_FLAGS_PQ, index=False)
except Exception: pass

print("split_flags rows:", len(split_flags), "| unique tickers:", split_flags['ticker'].nunique())

split_flags rows: 19301 | unique tickers: 2505


In [23]:
# === T1 (full, no-merge_asof): SEC companyfacts -> shares_outstanding & float_shares (time-varying) ===
from pathlib import Path
import pandas as pd, numpy as np, json, time

OUT = Path("outputs")
SEC_FACTS_DIR = OUT / "sec_facts"; SEC_FACTS_DIR.mkdir(parents=True, exist_ok=True)
ALL_SHARES_EVENTS_PQ = OUT / "shares_events.parquet"

# ---- Reuse existing SEC session/headers if available; else create them ----
try:
    SESSION, SEC_HEADERS
except NameError:
    import requests
    from requests.adapters import HTTPAdapter, Retry
    SEC_HEADERS = {"User-Agent": "Ozkan Gelincik <ozkangelincik@gmail.com>", "Accept": "application/json"}
    def make_session():
        sess = requests.Session()
        retries = Retry(total=5, connect=3, read=3, backoff_factor=0.5,
                        status_forcelist=[429,500,502,503,504], allowed_methods=["GET"])
        adapter = HTTPAdapter(max_retries=retries, pool_connections=30, pool_maxsize=30)
        sess.mount("https://", adapter)
        sess.headers.update(SEC_HEADERS)
        return sess
    SESSION = make_session()

# ---- Cache-first fetch for companyfacts ----
def fetch_company_facts(cik: str):
    cik = str(cik).zfill(10)
    fp = SEC_FACTS_DIR / f"{cik}.json"
    if fp.exists():
        try:
            return json.loads(fp.read_text())
        except Exception:
            pass
    url = f"https://data.sec.gov/api/xbrl/companyfacts/CIK{cik}.json"
    try:
        r = SESSION.get(url, timeout=30)
        if r.status_code != 200:
            return None
        j = r.json()
        try: fp.write_text(json.dumps(j))
        except Exception: pass
        time.sleep(0.08)  # be polite
        return j
    except Exception:
        return None

def _collect_values(facts, tax, name, unit=None):
    try:
        units = facts['facts'][tax][name]['units']
    except Exception:
        return []
    out = []
    for u, arr in (units or {}).items():
        if unit and u != unit:
            continue
        for d in arr or []:
            end = d.get('end'); val = d.get('val')
            if end and val is not None:
                out.append({"date": pd.to_datetime(end).normalize(), "val": float(val), "unit": u, "form": d.get("form")})
    return out

def extract_shares_series(facts_json: dict) -> pd.DataFrame:
    keys = [
        ('us-gaap',  'CommonStockSharesOutstanding'),
        ('dei',      'EntityCommonStockSharesOutstanding'),
        ('ifrs-full','NumberOfSharesOutstanding'),
        ('ifrs-full','IssuedCapitalNumberOfShares'),
    ]
    rows = []
    for tax, name in keys:
        rows += _collect_values(facts_json, tax, name, unit='shares')
    if not rows:
        return pd.DataFrame(columns=['date','shares_outstanding'])
    df = (pd.DataFrame(rows).sort_values('date').drop_duplicates('date', keep='last')
            .rename(columns={'val':'shares_outstanding'})[['date','shares_outstanding']])
    return df

def extract_public_float_usd(facts_json: dict) -> pd.DataFrame:
    rows = _collect_values(facts_json, 'dei', 'EntityPublicFloat', unit='USD')
    if not rows:
        return pd.DataFrame(columns=['date','entity_public_float_usd'])
    df = (pd.DataFrame(rows).sort_values('date').drop_duplicates('date', keep='last')
            .rename(columns={'val':'entity_public_float_usd'})[['date','entity_public_float_usd']])
    return df

def build_shares_events_for_universe(fundamentals_df: pd.DataFrame) -> pd.DataFrame:
    parts = []
    for _, r in fundamentals_df.dropna(subset=['cik']).iterrows():
        cik = str(r['cik']).zfill(10); tkr = str(r['ticker']).upper()
        facts = fetch_company_facts(cik)
        if not isinstance(facts, dict):
            continue
        sh = extract_shares_series(facts)
        pf = extract_public_float_usd(facts)
        if sh.empty and pf.empty:
            continue
        ev = sh.merge(pf, on='date', how='outer').sort_values('date')
        ev['ticker'] = tkr
        parts.append(ev[['date','ticker','shares_outstanding','entity_public_float_usd']])
    if not parts:
        return pd.DataFrame(columns=['date','ticker','shares_outstanding','entity_public_float_usd'])
    return (pd.concat(parts, ignore_index=True)
              .sort_values(['ticker','date'])
              .drop_duplicates(['ticker','date'], keep='last'))

# ---- Load or build the SEC-derived events ----
if ALL_SHARES_EVENTS_PQ.exists():
    shares_events = pd.read_parquet(ALL_SHARES_EVENTS_PQ)
else:
    shares_events = build_shares_events_for_universe(fundamentals)
    shares_events.to_parquet(ALL_SHARES_EVENTS_PQ, index=False)

# ---- Normalize & de-dup both sides (dates tz-naive midnight; tickers as str) ----
shares_events = (
    shares_events
    .assign(
        date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize(),
        ticker=lambda d: d['ticker'].astype(str)
    )
    .dropna(subset=['date','ticker'])
    .drop_duplicates(['ticker','date'], keep='last')
    .sort_values(['ticker','date'])
)

price_ev = (
    analysis_df[['date','ticker','adj close']]
    .rename(columns={'adj close':'adj_close'})
    .assign(
        date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize(),
        ticker=lambda d: d['ticker'].astype(str)
    )
    .dropna(subset=['date','ticker','adj_close'])
    .drop_duplicates(['ticker','date'], keep='last')
    .sort_values(['ticker','date'])
)

# ---- Compute price on/as-of filing date per ticker (no merge_asof, no searchsorted) ----
# For each ticker: union the price index and filing dates, forward-fill, and then pick exact filing dates.
events_enriched_list = []
g_prices = dict(tuple(price_ev.groupby('ticker')))  # ticker -> df(date, adj_close)

for tkr, g in shares_events.groupby('ticker', sort=False):
    # filing dates for this ticker
    filing = g[['date']].drop_duplicates().set_index('date')
    # price series for this ticker (may be empty)
    p = g_prices.get(tkr)
    if p is None or p.empty:
        # no prices: join will yield NaN adj_close
        g2 = g.copy()
        g2['adj_close'] = np.nan
        events_enriched_list.append(g2)
        continue
    p = p.set_index('date')['adj_close'].sort_index()
    # union index and forward-fill
    union_idx = p.index.union(filing.index).sort_values()
    p_ffill = p.reindex(union_idx).ffill()
    # take values at filing dates
    adj_on_filing = p_ffill.reindex(filing.index)
    g2 = g.copy()
    g2['adj_close'] = adj_on_filing.values
    events_enriched_list.append(g2)

events_enriched = pd.concat(events_enriched_list, ignore_index=True) if events_enriched_list else shares_events.copy()
events_enriched['float_shares'] = np.where(
    events_enriched.get('entity_public_float_usd').notna() & events_enriched['adj_close'].notna(),
    events_enriched['entity_public_float_usd'] / events_enriched['adj_close'],
    np.nan
)

events_enriched = events_enriched[['date','ticker','shares_outstanding','float_shares']].sort_values(['ticker','date'])
print("events_enriched rows:", len(events_enriched), "| tickers:", events_enriched['ticker'].nunique())

events_enriched rows: 197010 | tickers: 3762


# All‑US Listings EDA with `yfinance`

This notebook pulls **NASDAQ + NYSE** tickers, cleans the list, downloads prices with **`yfinance`**, and gives you a clean starting point for EDA.

### What you'll get
- Robust ticker collection via `yahoo_fin` (with a fallback to NasdaqTrader FTP files)
- Exclusions for ETFs, warrants, preferreds, rights, and odd tickers
- Batched downloads with `yf.download()` for speed, retry/backoff logic
- A tidy daily returns dataset ready for analysis
- Optional market‑cap join via `yahoo_fin` (when available)

In [24]:
# === PREFLIGHT (what will be used on rerun) ===
from pathlib import Path
import pandas as pd

OUT = Path("outputs")
paths = {
    "prices_wide.parquet": OUT / "prices_wide.parquet",
    "meta_yq.parquet":     OUT / "meta_yq.parquet",
    "meta_yq_progress.parquet": OUT / "meta_yq_progress.parquet",
    "sec_recent.parquet":  OUT / "sec_recent.parquet",
    "sec_cache/":          OUT / "sec_cache",
    "yq_symbol_cache/":    OUT / "yq_symbol_cache",
}

def _size(p: Path):
    try:
        return f"{p.stat().st_size/1_048_576:.1f} MB"
    except Exception:
        return "-"

for name, p in paths.items():
    if p.suffix:  # file
        print(f"{name:<28} {'FOUND' if p.exists() else 'missing':<8} {('('+_size(p)+')') if p.exists() else ''}")
    else:         # directory
        exists = p.exists()
        n = sum(1 for _ in p.glob('*')) if exists else 0
        print(f"{name:<28} {'FOUND' if exists else 'missing':<8} {f'({n} files)' if exists else ''}")

print("\nTip: leave REBUILD_PRICES=REBUILD_META=REBUILD_SEC=False so the bootstrap loads from disk only.")

prices_wide.parquet          FOUND    (102.0 MB)
meta_yq.parquet              FOUND    (0.1 MB)
meta_yq_progress.parquet     FOUND    (0.0 MB)
sec_recent.parquet           FOUND    (0.1 MB)
sec_cache/                   FOUND    (3357 files)
yq_symbol_cache/             FOUND    (3180 files)

Tip: leave REBUILD_PRICES=REBUILD_META=REBUILD_SEC=False so the bootstrap loads from disk only.


In [25]:
import pandas as _pd
if 'prices_raw' not in globals() or prices_raw is None or (isinstance(prices_raw, _pd.DataFrame) and prices_raw.empty):
    # === BOOTSTRAP / RESUME GUARD ===
    from pathlib import Path
    import pandas as pd, numpy as np, json

    # Paths & flags
    OUT = Path("outputs"); OUT.mkdir(exist_ok=True)
    P_CHUNKS   = OUT / "chunks"
    P_WIDE_PQ  = OUT / "prices_wide.parquet"
    P_WIDE_PKL = OUT / "prices_wide.pkl"
    P_SEC_CACHE = OUT / "sec_cache"
    P_META_YQ   = OUT / "meta_yq.parquet"
    P_META_PROG = OUT / "meta_yq_progress.parquet"
    P_YQ_SYM    = OUT / "yq_symbol_cache"

    REBUILD_PRICES = False
    REBUILD_META   = False
    REBUILD_SEC    = False

    # --- Prices: load-or-resume ---
    def _load_prices_raw():
        if P_WIDE_PQ.exists():  return pd.read_parquet(P_WIDE_PQ)
        if P_WIDE_PKL.exists(): return pd.read_pickle(P_WIDE_PKL)
        parts = sorted(P_CHUNKS.glob("prices_wide_*.parquet"))
        if parts: return pd.concat([pd.read_parquet(p) for p in parts], axis=1).sort_index()
        return None

    prices_raw = None if REBUILD_PRICES else _load_prices_raw()

    # --- SEC: load recent + SIC from cache (if available) ---
    def _load_sec_recent():
        # If you saved sec_recent_df previously, load it here; otherwise start empty.
        try:
            return sec_recent_df.assign(ticker=lambda d: d['ticker'].str.upper())
        except NameError:
            return pd.DataFrame(columns=['ticker','cik','recent_form','recent_filing_date'])

    def _load_sic_from_cache(sec_df):
        rows = []
        for _, r in sec_df.dropna(subset=['cik']).iterrows():
            fp = P_SEC_CACHE / f"{r['cik']}.json"
            if not fp.exists(): 
                continue
            try:
                j = json.loads(fp.read_text())
                rows.append({
                    'ticker': r['ticker'].upper(),
                    'cik': r['cik'],
                    'sic': j.get('sic'),
                    'sic_desc': j.get('sicDescription')
                })
            except Exception:
                pass
        return pd.DataFrame(rows).drop_duplicates('ticker')

    sec_recent_df = _load_sec_recent()
    sic_df = _load_sic_from_cache(sec_recent_df)

    def _sic_to_sector(s):
        try: s = int(s)
        except Exception: return None
        if 1 <= s <= 9:    return 'Agriculture'
        if 10 <= s <= 14:  return 'Mining'
        if 15 <= s <= 17:  return 'Construction'
        if 20 <= s <= 39:  return 'Manufacturing'
        if 40 <= s <= 49:  return 'Transportation & Utilities'
        if 50 <= s <= 51:  return 'Wholesale'
        if 52 <= s <= 59:  return 'Retail'
        if 60 <= s <= 67:  return 'Finance'
        if 70 <= s <= 89:  return 'Services'
        if 91 <= s <= 99:  return 'Public Administration'
        return None

    if not sic_df.empty:
        sic_df['sector_sic'] = sic_df['sic'].apply(_sic_to_sector)

    # --- Yahoo meta: load consolidated (parquet + progress + per-symbol cache) ---
    def _load_meta_yq():
        frames = []
        if P_META_YQ.exists():   frames.append(pd.read_parquet(P_META_YQ))
        if P_META_PROG.exists(): frames.append(pd.read_parquet(P_META_PROG))
        if P_YQ_SYM.exists():
            rows = []
            for fp in P_YQ_SYM.glob("*.json"):
                try:
                    j = json.loads(fp.read_text())
                    rows.append({
                        'ticker': str(j.get('symbol','')).upper(),
                        'market_cap': j.get('market_cap'),
                        'sector': j.get('sector'),
                        'industry': j.get('industry'),
                    })
                except Exception:
                    pass
            if rows: frames.append(pd.DataFrame(rows))
        if not frames:
            return pd.DataFrame(columns=['ticker','market_cap','sector','industry'])
        out = (pd.concat(frames, ignore_index=True)
                 .dropna(subset=['ticker'])
                 .assign(ticker=lambda d: d['ticker'].str.upper())
                 .drop_duplicates('ticker', keep='first'))
        return out

    meta_yq = None if REBUILD_META else _load_meta_yq()
    if meta_yq is None:
        meta_yq = pd.DataFrame(columns=['ticker','market_cap','sector','industry'])

    # --- Fundamentals (idempotent): prefer Yahoo, backfill with SEC SIC sector ---
    fundamentals = (
        meta_yq
        .merge(sic_df[['ticker','sic','sic_desc','sector_sic']] if not sic_df.empty else pd.DataFrame(columns=['ticker','sic','sic_desc','sector_sic']),
               on='ticker', how='left')
        .merge(sec_recent_df[['ticker','cik','recent_form','recent_filing_date']] if not sec_recent_df.empty else pd.DataFrame(columns=['ticker','cik','recent_form','recent_filing_date']),
               on='ticker', how='left')
    )
    if 'sector' not in fundamentals.columns:
        fundamentals['sector'] = pd.NA
    fundamentals['sector'] = fundamentals['sector'].where(fundamentals['sector'].notna(), fundamentals.get('sector_sic'))

    _keep = ['ticker','market_cap','sector','industry','cik','sic','sic_desc','recent_form','recent_filing_date']
    for c in _keep:
        if c not in fundamentals.columns: fundamentals[c] = pd.NA
    fundamentals = fundamentals[_keep].drop_duplicates('ticker')

    # --- Prices → tidy + returns (only if prices_raw already loaded) ---
    def tidy_from_yf(wide: pd.DataFrame) -> pd.DataFrame:
        if wide is None or wide.empty:
            return pd.DataFrame(columns=['date','ticker','open','high','low','close','adj close','volume'])
        df = wide.copy(); df.index.name = 'date'
        tidy = (df.stack(level=1, future_stack=True)
                  .rename_axis(index=['date','ticker'])
                  .reset_index()
                  .sort_values(['date','ticker'])
                  .reset_index(drop=True))
        tidy.columns = [c.lower() if isinstance(c, str) else c for c in tidy.columns]
        return tidy

    prices_tidy = tidy_from_yf(prices_raw)
    if not prices_tidy.empty:
        adj = (prices_tidy[['date','ticker','adj close']].dropna()
                 .sort_values(['ticker','date']))
        adj['ret']    = adj.groupby('ticker')['adj close'].pct_change()
        adj['logret'] = np.log(adj['adj close'] / adj.groupby('ticker')['adj close'].shift(1))
        returns = adj.dropna(subset=['ret','logret'])
        analysis_df = (prices_tidy[['date','ticker','open','high','low','close','adj close','volume']]
                       .merge(returns[['date','ticker','ret','logret']], on=['date','ticker'], how='left'))
    else:
        analysis_df = pd.DataFrame(columns=['date','ticker','open','high','low','close','adj close','volume','ret','logret'])

    # --- Final merge + save (safe to re-run) ---
    analysis_enriched = (
        analysis_df
        .merge(fundamentals, on='ticker', how='left')
        .sort_values(['ticker','date'])
        .reset_index(drop=True)
    )

    analysis_enriched.to_parquet(OUT / "analysis_enriched.parquet")
    analysis_enriched.to_csv(OUT / "analysis_enriched.csv", index=False)

    print("prices_raw:", None if prices_raw is None else prices_raw.shape)
    print("fundamentals tickers:", fundamentals['ticker'].nunique())
    print("analysis_enriched rows:", len(analysis_enriched))
else:
    print('Skipping: prices_raw already loaded from cache (Bootstrap).')


Skipping: prices_raw already loaded from cache (Bootstrap).


In [26]:
# !pip3 install yfinance yahoo_fin pandas numpy pyarrow tqdm requests

In [27]:
# !pip3 install requests_html

In [28]:
# %%capture
# If needed, install dependencies in your environment (uncomment if running locally):
# !pip install yfinance yahoo_fin pandas numpy pyarrow tqdm requests

import re
import time
from typing import List
import numpy as np
import pandas as pd
from tqdm import tqdm # type: ignore

import yfinance as yf # type: ignore
try:
    from yahoo_fin import stock_info as si # type: ignore
    YAHOO_FIN_OK = True
except Exception:
    YAHOO_FIN_OK = False
import requests

pd.set_option('display.max_rows', 10)
pd.set_option('display.width', 120)

             requires requests_html, which is not installed.
             
             Install using: 
             pip install requests_html
             
             After installation, you may have to restart your Python session.


## 1) Get NASDAQ & NYSE tickers

Primary method uses `yahoo_fin`. If unavailable, we fall back to NasdaqTrader FTP symbol directories.

In [29]:
def get_exchange_tickers() -> pd.DataFrame:
    """Return a DataFrame with columns [ticker, exchange] for NASDAQ and NYSE.
    Uses yahoo_fin first; falls back to NasdaqTrader FTP text files.
    """
    tickers = []
    if YAHOO_FIN_OK:
        try:
            nasdaq = si.tickers_nasdaq()
            tickers.extend([(t, 'NASDAQ') for t in nasdaq])
        except Exception:
            pass
        try:
            nyse = si.tickers_nyse()
            tickers.extend([(t, 'NYSE') for t in nyse])
        except Exception:
            pass
    if not tickers:
        # Fallback: NasdaqTrader
        base = "https://ftp.nasdaqtrader.com/SymbolDirectory/"
        urls = {
            'NASDAQ': base + 'nasdaqlisted.txt',
            'OTHER': base + 'otherlisted.txt'
        }
        for ex, url in urls.items():
            r = requests.get(url, timeout=30)
            r.raise_for_status()
            df = pd.read_csv(pd.compat.StringIO(r.text), sep='|')
            if ex == 'NASDAQ':
                syms = df['Symbol'].tolist()
                tickers.extend([(t, 'NASDAQ') for t in syms])
            else:
                syms = df['ACT Symbol'].tolist()
                # OTHER includes NYSE + NYSE MKT + BATS etc., we’ll label as NYSE-ish
                tickers.extend([(t, 'NYSE') for t in syms])
    out = pd.DataFrame(tickers, columns=['ticker', 'exchange']).drop_duplicates('ticker')
    return out

tickers_df = get_exchange_tickers()
print(tickers_df.head())
print(f"Total raw tickers: {len(tickers_df)}")

  ticker exchange
0   AACB   NASDAQ
1  AACBR   NASDAQ
2  AACBU   NASDAQ
3   AACG   NASDAQ
4   AACI   NASDAQ
Total raw tickers: 5136


## 2) Clean the tickers

Remove ETFs, warrants, preferreds, rights, units, and other non‑common equities. Feel free to tweak these rules.

In [30]:
EXCLUDE_PATTERNS = [
    r"-\w+$",         # suffixes like -W, -U, -R, -P etc.
    r"\^",            # symbols with carets
    r"\.\w+$",       # BRK.B style dots (optional, keep if you want)
]

ETF_HINTS = [
    ' ETF', ' Trust', ' Fund', ' ETN'
]

def looks_like_etf(name: str) -> bool:
    if not isinstance(name, str):
        return False
    up = name.upper()
    return any(h in up for h in ETF_HINTS)

def clean_tickers(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    # If yahoo_fin exists, we can try fetching company names; otherwise, keep as is
    # Here we only filter by symbol patterns to keep it fast and general.
    pat = re.compile("|".join(EXCLUDE_PATTERNS), re.IGNORECASE)
    df['exclude'] = df['ticker'].astype(str).str.contains(pat)
    cleaned = df[~df['exclude']].drop(columns=['exclude'])
    cleaned = cleaned[~cleaned['ticker'].str.contains(r"[^A-Z\.]", regex=True, na=False)]
    cleaned = cleaned[cleaned['ticker'].str.len() <= 5]  # optional: US equities typically <=5 chars
    cleaned = cleaned.drop_duplicates('ticker').reset_index(drop=True)
    print(f"After cleaning: {len(cleaned)} tickers remaining")
    return cleaned

tickers_clean = clean_tickers(tickers_df)
tickers_clean.head()

After cleaning: 5135 tickers remaining


Unnamed: 0,ticker,exchange
0,AACB,NASDAQ
1,AACBR,NASDAQ
2,AACBU,NASDAQ
3,AACG,NASDAQ
4,AACI,NASDAQ


## 3) Batch download price history

- We use `yf.download()` with *lists* of tickers for speed
- Chunking avoids URL length limits and rate‑limit errors
- Simple retry/backoff to be resilient

In [31]:
import pandas as _pd
if 'prices_raw' not in globals() or prices_raw is None or (isinstance(prices_raw, _pd.DataFrame) and prices_raw.empty):
    def chunk(lst: List[str], n: int):
        for i in range(0, len(lst), n):
            yield lst[i:i+n]

    def download_prices(tickers: List[str], period='5y', interval='1d', chunk_size=100, max_retries=3, pause=1.0):
        frames = []
        for ch in tqdm(list(chunk(tickers, chunk_size))):
            for attempt in range(max_retries):
                try:
                    df = yf.download(ch, period=period, interval=interval, auto_adjust=False, threads=True)
                    frames.append(df)
                    break
                except Exception as e:
                    if attempt == max_retries - 1:
                        print(f"Failed on chunk starting {ch[0]}: {e}")
                    else:
                        time.sleep(pause * (attempt + 1))
        if not frames:
            return pd.DataFrame()
        out = pd.concat(frames, axis=1)
        # Ensure expected multi-index columns (PriceField, Ticker)
        if not isinstance(out.columns, pd.MultiIndex):
            # Single ticker case will produce flat columns; promote to MultiIndex
            out.columns = pd.MultiIndex.from_product([out.columns, [tickers[0]]])
        return out.sort_index()

    # Use a manageable subset for first pass (e.g., first 300 tickers)
    subset = tickers_clean['ticker'].tolist()[:300]
    prices_raw = download_prices(subset, period='3y', interval='1d', chunk_size=100)
    prices_raw.tail()
else:
    print('Skipping: prices_raw already loaded from cache (Bootstrap).')


Skipping: prices_raw already loaded from cache (Bootstrap).


In [32]:
import pandas as _pd
if 'prices_raw' not in globals() or prices_raw is None or (isinstance(prices_raw, _pd.DataFrame) and prices_raw.empty):
    tickers_all = tickers_clean['ticker'].tolist()

    prices_raw = download_prices(
        tickers_all,
        period='3y',          # or '5y' / 'max'
        interval='1d',
        chunk_size=100,       # 50–150 is a good range
        max_retries=3,
        pause=1.5             # increase if you see throttling
    )
else:
    print('Skipping: prices_raw already loaded from cache (Bootstrap).')


Skipping: prices_raw already loaded from cache (Bootstrap).


## 4) Tidy format & returns

We pivot the wide OHLCV MultiIndex to a tidy DataFrame: one row per date‑ticker, with columns for price fields.

In [33]:
def tidy_from_yf(wide: pd.DataFrame) -> pd.DataFrame:
    """Return a tidy frame with columns: date, ticker, [Adj Close, Close, High, Low, Open, Volume]."""
    if wide.empty:
        return pd.DataFrame(columns=['date','ticker','adj close','close','high','low','open','volume'])

    df = wide.copy()

    # Make sure the row index has a stable name we control
    df.index.name = 'date'

    # Stack the ticker level; keep price fields as columns
    tidy = (
        df
        .stack(level=1, future_stack=True)
        .rename_axis(index=['date', 'ticker'])   # <-- avoids the fragile 'level_0' rename
        .reset_index()
        .sort_values(['date', 'ticker'])
        .reset_index(drop=True)
    )

    # normalize column names to lower case
    tidy.columns = [c.lower() if isinstance(c, str) else c for c in tidy.columns]
    return tidy

In [34]:
print("index name:", prices_raw.index.name)            # should be None or 'Date'
print("column names:", prices_raw.columns.names)       # expected ['Attributes','Symbols'] or similar

index name: Date
column names: ['Price', 'Ticker']


In [35]:
# Build tidy from the wide MultiIndex frame
prices_tidy = tidy_from_yf(prices_raw)
display(prices_tidy.head())

# (Optional) ultra-tidy “long” version with one value column
prices_long = prices_tidy.melt(
    id_vars=['date', 'ticker'],
    var_name='field',
    value_name='value'
)
display(prices_long.head())

prices_long = (
    prices_tidy.melt(id_vars=['date','ticker'], var_name='field', value_name='value')
)

Unnamed: 0,date,ticker,adj close,close,high,low,open,volume
0,2022-10-03,AACB,,,,,,
1,2022-10-03,AACBR,,,,,,
2,2022-10-03,AACBU,,,,,,
3,2022-10-03,AACG,1.75,1.75,1.75,1.7,1.7,1600.0
4,2022-10-03,AACI,,,,,,


Unnamed: 0,date,ticker,field,value
0,2022-10-03,AACB,adj close,
1,2022-10-03,AACBR,adj close,
2,2022-10-03,AACBU,adj close,
3,2022-10-03,AACG,adj close,1.75
4,2022-10-03,AACI,adj close,


In [36]:
# Daily returns using adj close from the tidy table
adj = (
    prices_tidy[['date', 'ticker', 'adj close']]
    .dropna()
    .sort_values(['ticker', 'date'])
)

adj['ret']    = adj.groupby('ticker')['adj close'].pct_change()
adj['logret'] = np.log(adj['adj close'] / adj.groupby('ticker')['adj close'].shift(1))

returns = adj.dropna(subset=['ret', 'logret'])
display(returns.head())

Unnamed: 0,date,ticker,adj close,ret,logret
3226230,2025-04-08,AACB,9.95,0.007085,0.00706
3231351,2025-04-09,AACB,9.9,-0.005025,-0.005038
3236472,2025-04-10,AACB,9.89,-0.00101,-0.001011
3241593,2025-04-11,AACB,9.885,-0.000506,-0.000506
3246714,2025-04-14,AACB,9.9,0.001517,0.001516


In [37]:
print("index name:", prices_raw.index.name)          # should be 'Date' (now handled)
print("column names:", prices_raw.columns.names)     # ['Price','Ticker'] is perfect
print(prices_tidy.shape, prices_long.shape, returns.shape)

index name: Date
column names: ['Price', 'Ticker']
(3845871, 8) (23075226, 4) (2875035, 5)


### Daily returns (Adj Close)
Compute simple and log returns, dropping thin/empty tickers.

In [38]:
def compute_returns(wide: pd.DataFrame) -> pd.DataFrame:
    if wide.empty:
        return pd.DataFrame()
    adj = wide['Adj Close'].copy()
    # Drop columns with too many NaNs
    keep = adj.columns[adj.notna().sum() > 50]
    adj = adj[keep]
    rets = adj.pct_change().rename_axis(index='date')
    logrets = np.log(adj/adj.shift(1))
    out = rets.stack().reset_index().rename(columns={'Adj Close':'ret','level_1':'ticker'})
    out['logret'] = logrets.stack().reset_index(drop=True)[0]
    return out

returns = compute_returns(prices_raw)
returns.head()

  rets = adj.pct_change().rename_axis(index='date')


Unnamed: 0,date,Ticker,0,logret
0,2022-10-04,AACG,0.04,0.039221
1,2022-10-04,AADR,0.02572,0.039221
2,2022-10-04,AAL,0.086409,0.039221
3,2022-10-04,AAME,0.0,0.039221
4,2022-10-04,AAOI,0.069091,0.039221


## 5) Coverage report & quick EDA

A few quick summaries to sanity‑check the dataset.

In [39]:
import pandas as pd

# figure out fields & tickers present in prices_raw
if isinstance(prices_raw.columns, pd.MultiIndex):
    lvl0 = prices_raw.columns.get_level_values(0)
    lvl1 = prices_raw.columns.get_level_values(1)
    n_fields = int(pd.unique(lvl0).size)
    n_tickers_downloaded = int(pd.unique(lvl1).size)
else:
    n_fields = 1
    n_tickers_downloaded = int(prices_raw.shape[1])

# how many tickers did we intend to cover?
n_requested = (
    len(active_tickers) if 'active_tickers' in globals() and active_tickers
    else (len(tickers_clean) if 'tickers_clean' in globals() else n_tickers_downloaded)
)

summary = pd.DataFrame([{
    'n_tickers_requested': int(n_requested),
    'n_rows_prices_wide': int(prices_raw.shape[0]),
    'n_fields': n_fields,
    'n_tickers_downloaded': n_tickers_downloaded,
    'coverage_pct': round(100 * n_tickers_downloaded / max(1, n_requested), 2),
}])
summary

Unnamed: 0,n_tickers_requested,n_rows_prices_wide,n_fields,n_tickers_downloaded,coverage_pct
0,5118,751,6,5121,100.06


In [40]:
fund_cov = pd.DataFrame([{
    'tickers_with_fundamentals': int(fundamentals['ticker'].nunique()) if 'fundamentals' in globals() else 0,
    'pct_meta_vs_prices': (
        (fundamentals['ticker'].nunique() / n_tickers_downloaded)
        if 'fundamentals' in globals() and n_tickers_downloaded else None
    ),
}])
fund_cov

Unnamed: 0,tickers_with_fundamentals,pct_meta_vs_prices
0,5118,0.999414


### Top/bottom average returns (last 1 year)
This is a quick sanity check, not investment advice.

In [41]:
# --- make sure returns has the columns we expect ---
def ensure_returns_schema(df):
    out = df.copy()
    # bring index levels out if needed
    if 'ticker' not in out.columns and (out.index.names and 'ticker' in [str(n).lower() for n in out.index.names]):
        out = out.reset_index()
    # normalize/rename common variants
    ren = {}
    for c in out.columns:
        cl = str(c).lower()
        if cl in ('symbol','symbols','level_1'): ren[c] = 'ticker'
        if cl in ('date','datetime'):            ren[c] = 'date'
    if ren:
        out = out.rename(columns=ren)
    out.columns = [str(c).lower() for c in out.columns]
    out['date'] = pd.to_datetime(out['date'])
    # if logret missing, compute from adj close
    if 'logret' not in out.columns and {'adj close','ticker','date'}.issubset(out.columns):
        out = out.sort_values(['ticker','date'])
        out['logret'] = np.log(out.groupby('ticker')['adj close'].apply(lambda s: s / s.shift(1))).reset_index(level=0, drop=True)
    return out

returns_ = ensure_returns_schema(returns)

# --- 1-year perf (top/bottom) ---
cutoff = returns_['date'].max() - pd.Timedelta(days=365)
lastyr = returns_[returns_['date'] >= cutoff]

perf = (lastyr
        .groupby('ticker', as_index=False)['logret']
        .sum()
        .sort_values('logret'))

top10    = perf.tail(10).set_index('ticker').rename(columns={'logret':'logret_1y'})
bottom10 = perf.head(10).set_index('ticker').rename(columns={'logret':'logret_1y'})

display(top10)
display(bottom10)

Unnamed: 0_level_0,logret_1y
ticker,Unnamed: 1_level_1
FWONA,9.844406
FWONK,9.844406
FWRD,9.844406
FWRG,9.844406
FXNC,9.844406
FYBR,9.844406
FYC,9.844406
FYT,9.844406
FUND,9.844406
ZYXI,9.844406


Unnamed: 0_level_0,logret_1y
ticker,Unnamed: 1_level_1
WTG,1.961037
KCHV,1.961037
MJID,2.000258
LAWR,2.039479
KMRK,2.078699
SOCAU,2.078699
AUGO,2.078699
MGRT,2.078699
PDDL,2.11792
OTGL,2.11792


In [42]:
print(returns.columns)       # columns present
print(returns.index.names)   # index levels (is 'ticker' hiding here?)

Index(['date', 'Ticker', 0, 'logret'], dtype='object')
[None]


## 6) (Optional) Add market caps
You can pull market caps for many tickers via `yahoo_fin.get_quote_table` or `si.get_market_cap`. This is slower and may miss some tickers.

In [43]:
def get_market_caps(tickers: List[str]) -> pd.DataFrame:
    if not YAHOO_FIN_OK:
        print("yahoo_fin not available; skipping market caps.")
        return pd.DataFrame(columns=['ticker','market_cap'])
    caps = []
    for t in tqdm(tickers):
        try:
            cap = si.get_market_cap(t)
            caps.append((t, cap))
        except Exception:
            caps.append((t, np.nan))
    return pd.DataFrame(caps, columns=['ticker','market_cap'])

# Example (small subset to keep it quick):
# caps = get_market_caps(subset[:50])
# caps.head()

## 7) Save outputs
Persist your data for future analysis.

In [44]:
# utils for saving
import os
from pathlib import Path

OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# 1) tickers
tickers_clean.to_csv(OUTPUT_DIR / "tickers_clean.csv", index=False)

# 2) prices (wide)
if "prices_raw" in globals() and not prices_raw.empty:
    # always succeeds
    prices_raw.to_pickle(OUTPUT_DIR / "prices_wide.pkl")
    # parquet if available
    try:
        prices_raw.to_parquet(OUTPUT_DIR / "prices_wide.parquet")
    except Exception as e:
        print("Skipped prices_wide.parquet:", e)

# 3) prices (tidy)
if "prices_tidy" in globals() and not prices_tidy.empty:
    prices_tidy.to_pickle(OUTPUT_DIR / "prices_tidy.pkl")
    try:
        prices_tidy.to_parquet(OUTPUT_DIR / "prices_tidy.parquet")
    except Exception as e:
        print("Skipped prices_tidy.parquet:", e)

# 4) returns
if "returns" in globals() and not returns.empty:
    returns.to_pickle(OUTPUT_DIR / "returns.pkl")
    try:
        returns.to_parquet(OUTPUT_DIR / "returns.parquet")
    except Exception as e:
        print("Skipped returns.parquet:", e)

print("Saved available outputs in", OUTPUT_DIR.resolve())

Saved available outputs in /Users/ozkangelincik/Downloads/EDA yfinance in Python/outputs


  table = self.api.Table.from_pandas(df, **from_pandas_kwargs)


## Next steps
- Join market caps/industries and slice performance by size/sector
- Compute rolling volatility, drawdowns, and momentum signals
- Detect outliers (suspicious zero volumes, stale prices)
- Visualize top sectors and factor tilts over time

In [45]:
prices_tidy 

Unnamed: 0,date,ticker,adj close,close,high,low,open,volume
0,2022-10-03,AACB,,,,,,
1,2022-10-03,AACBR,,,,,,
2,2022-10-03,AACBU,,,,,,
3,2022-10-03,AACG,1.750,1.750,1.750000,1.700,1.700000,1600.0
4,2022-10-03,AACI,,,,,,
...,...,...,...,...,...,...,...,...
3845866,2025-09-30,ZXZZT,12.175,12.175,12.175000,12.110,12.110000,5222.0
3845867,2025-09-30,ZYBT,2.620,2.620,4.380000,2.133,2.319000,11202500.0
3845868,2025-09-30,ZYME,17.080,17.080,17.389999,16.875,17.190001,653100.0
3845869,2025-09-30,ZYN,19.517,19.517,19.719000,19.517,19.719000,200.0


In [46]:
import time, requests, pandas as pd, numpy as np
import yfinance as yf

try:
    from yahoo_fin import stock_info as si
    YAHOO_FIN_OK = True
except Exception:
    YAHOO_FIN_OK = False

# <-- Put your email here; the SEC requires a descriptive User-Agent with contact info
SEC_HEADERS = {"User-Agent": "Ozkan Gelincik ozkangelincik@example.com"}

In [47]:
def fetch_yahoo_meta(ticker: str):
    mcap = sector = industry = None

    # 1) yahoo_fin (fast & simple when available)
    if YAHOO_FIN_OK:
        try:
            qd = si.get_quote_data(ticker)             # dict
            mcap = qd.get("marketCap", mcap)
        except Exception:
            pass
        try:
            info_df = si.get_company_info(ticker)      # DataFrame with index 'sector','industry'
            if isinstance(info_df, pd.DataFrame) and "Value" in info_df.columns:
                if "sector" in info_df.index:   sector   = info_df.loc["sector", "Value"]
                if "industry" in info_df.index: industry = info_df.loc["industry", "Value"]
        except Exception:
            pass

    # 2) yfinance fallback
    try:
        t = yf.Ticker(ticker)
        if mcap is None:
            fi = getattr(t, "fast_info", None)
            if fi is not None:
                mcap = getattr(fi, "market_cap", mcap)
        if sector is None or industry is None:
            # .info can be slower but fills sector/industry for many names
            info = t.info
            sector   = sector   or info.get("sector")
            industry = industry or info.get("industry")
    except Exception:
        pass

    return {"ticker": ticker, "market_cap": mcap, "sector": sector, "industry": industry}

def get_yahoo_meta_batch(tickers, pause=0.05):
    rows = []
    for t in tickers:
        rows.append(fetch_yahoo_meta(t))
        time.sleep(pause)     # be polite; avoid throttling
    return pd.DataFrame(rows)

In [48]:
# !pip3 install yahooquery

In [49]:
import pandas as _pd
if 'prices_raw' not in globals() or prices_raw is None or (isinstance(prices_raw, _pd.DataFrame) and prices_raw.empty):
    from pathlib import Path
    import pandas as pd

    OUTPUT_DIR = Path("outputs")

    prices_raw = None

    # Try single-file saves first
    for p in [OUTPUT_DIR/"prices_wide.parquet", OUTPUT_DIR/"prices_wide.pkl"]:
        if p.exists():
            prices_raw = (pd.read_parquet(p) if p.suffix==".parquet" else pd.read_pickle(p))
            break

    # If not found, try reassembling from chunked downloads
    if prices_raw is None:
        chunk_dir = OUTPUT_DIR / "chunks"
        parts = sorted(chunk_dir.glob("prices_wide_*.parquet"))
        if parts:
            prices_raw = pd.concat([pd.read_parquet(p) for p in parts], axis=1).sort_index()

    if prices_raw is None:
        raise RuntimeError("Couldn't find saved prices. If you didn't save before, re-run the chunked downloader.")
    
    # sanity checks
    print("prices_raw shape:", getattr(prices_raw, "shape", None))
    print("column names:", getattr(prices_raw, "columns", None).names if hasattr(prices_raw, "columns") else None)
else:
    print('Skipping: prices_raw already loaded from cache (Bootstrap).')


Skipping: prices_raw already loaded from cache (Bootstrap).


In [50]:
print("len(active_tickers) =", len(active_tickers))
print("sample:", active_tickers[:5])
from yahooquery import Ticker
tq = Ticker(active_tickers[:3], asynchronous=False)
print("types:", type(tq.price), type(tq.summary_profile))

len(active_tickers) = 5118
sample: ['AACB', 'AACBR', 'AACBU', 'AACG', 'AACI']
types: <class 'dict'> <class 'dict'>


In [51]:
import time, json
from pathlib import Path
import pandas as pd
from yahooquery import Ticker
from tqdm import tqdm

SYMS_UP = [s.upper() for s in active_tickers]
CACHE = Path("outputs/yq_cache"); CACHE.mkdir(parents=True, exist_ok=True)

def _mods_from_entry(entry):
    # normalize yahooquery entry → modules dict
    if isinstance(entry, dict):
        qs = entry.get('quoteSummary')
        if isinstance(qs, dict):
            res = qs.get('result')
            if isinstance(res, list) and res and isinstance(res[0], dict):
                return res[0]
        return entry
    return {}

def _fetch_batch(symbols, asynchronous=True, max_workers=8, formatted=False):
    tq = Ticker(symbols, asynchronous=asynchronous, max_workers=max_workers, formatted=formatted)
    data = tq.get_modules('price,assetProfile,summaryProfile')  # dict keyed by symbol; may contain 'error'
    if not isinstance(data, dict):
        return {}
    return data

def _rows_from_data(data):
    rows = []
    for sym, entry in (data or {}).items():
        if not sym or str(sym).lower() == 'error':   # drop junk
            continue
        mods = _mods_from_entry(entry)
        if not isinstance(mods, dict) or not mods:
            continue
        price   = mods.get('price') or {}
        profile = mods.get('assetProfile') or mods.get('summaryProfile') or {}
        ticker  = (price.get('symbol') if isinstance(price, dict) else None) or str(sym)
        rows.append({
            'ticker'    : str(ticker).upper(),
            'market_cap': price.get('marketCap'),
            'sector'    : profile.get('sector'),
            'industry'  : profile.get('industry'),
        })
    return rows

def fetch_meta_multi_pass(symbols, passes=None):
    """Try larger → smaller chunks with growing pause; cache raw responses per pass/chunk; resume on rerun."""
    if passes is None:
        passes = [
            dict(chunk_size=200, pause=0.6, retries=1, async_=True,  workers=8),
            dict(chunk_size=80,  pause=0.8, retries=2, async_=True,  workers=6),
            dict(chunk_size=20,  pause=1.0, retries=2, async_=False, workers=4),  # conservative final pass
        ]

    remaining = [s for s in symbols]
    all_rows  = []

    for pi, p in enumerate(passes, start=1):
        if not remaining:
            break
        chunk_size = p['chunk_size']; pause = p['pause']; retries = p['retries']
        async_ = p['async_']; workers = p['workers']

        print(f"\nPass {pi}: chunks={chunk_size}, pause={pause}s, retries={retries}, async={async_}, workers={workers}")
        got_this_pass = set()

        for i in tqdm(range(0, len(remaining), chunk_size), desc=f"pass {pi}"):
            batch = remaining[i:i+chunk_size]
            key = f"p{pi}_i{i:05d}_{len(batch)}.json"
            fp  = CACHE / key

            if fp.exists():
                data = json.loads(fp.read_text())
            else:
                last_e = None; data = {}
                for a in range(retries + 1):
                    try:
                        data = _fetch_batch(batch, asynchronous=async_, max_workers=workers, formatted=False)
                        break
                    except Exception as e:
                        last_e = e; time.sleep(pause*(a+1))
                if not isinstance(data, dict):
                    data = {}
                # cache raw
                try:
                    fp.write_text(json.dumps(data))
                except Exception:
                    pass

                time.sleep(pause)  # be polite between batches

            rows = _rows_from_data(data)
            all_rows.extend(rows)
            got_this_pass.update(r['ticker'] for r in rows if r.get('ticker'))

        # prepare next pass on the ones still missing
        got = got_this_pass
        remaining = [s for s in remaining if s.upper() not in got]
        print(f"  collected this pass: {len(got_this_pass)} | remaining: {len(remaining)}")

    df = pd.DataFrame(all_rows)
    if not df.empty:
        df = (df[df['ticker'].isin([s.upper() for s in symbols])]
                .drop_duplicates('ticker', keep='first'))
    return df

meta_yq = fetch_meta_multi_pass(SYMS_UP)

print("meta_yq rows:", len(meta_yq), "| unique tickers:", meta_yq['ticker'].nunique())
display(meta_yq.head())


Pass 1: chunks=200, pause=0.6s, retries=1, async=True, workers=8


pass 1: 100%|██████████| 26/26 [00:00<00:00, 4311.20it/s]


  collected this pass: 0 | remaining: 5118

Pass 2: chunks=80, pause=0.8s, retries=2, async=True, workers=6


pass 2: 100%|██████████| 64/64 [00:00<00:00, 6719.29it/s]


  collected this pass: 0 | remaining: 5118

Pass 3: chunks=20, pause=1.0s, retries=2, async=False, workers=4


pass 3: 100%|██████████| 256/256 [00:00<00:00, 7157.23it/s]

  collected this pass: 338 | remaining: 4780
meta_yq rows: 338 | unique tickers: 338





Unnamed: 0,ticker,market_cap,sector,industry
0,ASTL,414686700.0,Basic Materials,Steel
1,ASTLW,,Basic Materials,Steel
2,ASTS,17736480000.0,Technology,Communication Equipment
3,ASUR,224885100.0,Technology,Software - Application
4,ASYS,135123800.0,Technology,Semiconductor Equipment & Materials


In [52]:
import pandas as pd

# map UPPER -> original for stable requests
upper_to_orig = {s.upper(): s for s in active_tickers}

have_up = set(meta_yq['ticker'].str.upper())
remaining_up = [u for u in upper_to_orig if u not in have_up]
remaining_orig = [upper_to_orig[u] for u in remaining_up]

print("already have:", len(have_up), "| remaining:", len(remaining_orig))
print(remaining_orig[:5])

already have: 338 | remaining: 4780
['AACB', 'AACBR', 'AACBU', 'AACG', 'AACI']


In [53]:
import time
import pandas as pd
from yahooquery import Ticker

def _rows_from_obj(obj, fields, prefer_col_symbol=True):
    """
    Robustly convert yahooquery output (dict | DataFrame | other) into a DataFrame
    with columns: ['ticker'] + fields (always present, even if empty).
    Skips entries that aren't dict-like.
    """
    cols = ['ticker'] + list(fields)
    rows = []

    if isinstance(obj, pd.DataFrame):
        df = obj.copy()
        # Use a 'symbol' column if present; otherwise use the index
        if prefer_col_symbol and 'symbol' in df.columns:
            take = ['symbol'] + [c for c in fields if c in df.columns]
            for r in df[take].to_dict('records'):
                sym = str(r.pop('symbol')).upper()
                row = {'ticker': sym}
                for f in fields: row[f] = r.get(f)
                rows.append(row)
        else:
            have = [c for c in fields if c in df.columns]
            for sym, r in df[have].iterrows():
                row = {'ticker': str(sym).upper()}
                for f in fields: row[f] = r.get(f) if f in r else None
                rows.append(row)
        return pd.DataFrame(rows, columns=cols)

    if isinstance(obj, dict):
        for sym, val in obj.items():
            if not isinstance(val, dict):
                # sometimes 'error' or a string -> skip
                continue
            row = {'ticker': str(sym).upper()}
            for f in fields: row[f] = val.get(f)
            rows.append(row)
        return pd.DataFrame(rows, columns=cols)

    # anything else -> empty with expected columns
    return pd.DataFrame(columns=cols)

def fetch_meta_simple_safe(symbols, chunk_size=120, pause=0.8, async_=True, workers=6):
    """Chunked yahooquery pull that tolerates mixed shapes and guarantees 'ticker' exists."""
    frames = []
    for i in range(0, len(symbols), chunk_size):
        batch = symbols[i:i+chunk_size]  # ORIGINAL CASE for requests
        tq = Ticker(batch, asynchronous=async_, max_workers=workers)

        # price -> marketCap
        price_df = _rows_from_obj(tq.price, ['marketCap'])
        if 'marketCap' in price_df.columns:
            price_df = price_df.rename(columns={'marketCap': 'market_cap'})
        if 'market_cap' not in price_df.columns:
            price_df['market_cap'] = pd.NA
        price_df = price_df[['ticker','market_cap']]

        # summary_profile -> sector, industry
        prof_df = _rows_from_obj(tq.summary_profile, ['sector', 'industry'])
        for c in ('sector','industry'):
            if c not in prof_df.columns:
                prof_df[c] = pd.NA
        prof_df = prof_df[['ticker','sector','industry']]

        # safe outer-join; both sides now guaranteed to have 'ticker'
        chunk_df = (price_df.merge(prof_df, on='ticker', how='outer')
                               .drop_duplicates('ticker'))

        if not chunk_df.empty:
            want = {s.upper() for s in batch}
            chunk_df = chunk_df[chunk_df['ticker'].isin(want)]
            frames.append(chunk_df)

        time.sleep(pause)  # be polite

    if not frames:
        return pd.DataFrame(columns=['ticker','market_cap','sector','industry'])

    out = (pd.concat(frames, ignore_index=True)
             .drop_duplicates('ticker', keep='first'))
    return out

In [54]:
import time
import pandas as pd
from pathlib import Path

# empty-frame factory for convenience
def _empty_meta():
    return pd.DataFrame(columns=['ticker','market_cap','sector','industry'])

def _safe_concat(a, b):
    if a is None or len(a)==0: return b if b is not None else _empty_meta()
    if b is None or len(b)==0: return a
    return (pd.concat([a, b], ignore_index=True)
              .drop_duplicates('ticker', keep='first'))

def try_batch_resilient(batch, async_fast=True, workers_fast=6, subsize=10, workers_slow=2):
    """
    Try one batch fast; on timeout, fall back to smaller, synchronous chunks.
    Returns a meta DataFrame (may be empty).
    """
    try:
        # fast path (your tolerant fetcher)
        got = fetch_meta_simple_safe(batch, chunk_size=len(batch), pause=0.0, async_=async_fast, workers=workers_fast)
        return got
    except Exception as e:
        msg = str(e)
        print(f"[warn] batch timeout/err ({len(batch)}): {msg[:120]} ... -> splitting to {subsize}")
        # slow, reliable path: split into tiny sub-batches synchronously
        out = _empty_meta()
        for j in range(0, len(batch), subsize):
            sub = batch[j:j+subsize]
            try:
                got = fetch_meta_simple_safe(sub, chunk_size=len(sub), pause=0.0, async_=False, workers=workers_slow)
                out = _safe_concat(out, got)
            except Exception as e2:
                print(f"[skip] sub-batch failed ({sub[:1]}…): {e2}")
        return out

def fill_remaining_with_retries(remain_symbols,
                                chunk_size=120,
                                pause=0.8,
                                async_fast=True,
                                workers_fast=6,
                                subsize_on_timeout=10,
                                workers_slow=2,
                                save_every=5,
                                progress_path=Path("outputs/meta_yq_progress.parquet")):
    """
    Processes remain_symbols in chunks, resilient to timeouts.
    Saves progress periodically to progress_path (parquet).
    """
    progress_path.parent.mkdir(parents=True, exist_ok=True)
    collected = _empty_meta()
    batches_done = 0
    for i in range(0, len(remain_symbols), chunk_size):
        batch = remain_symbols[i:i+chunk_size]
        got = try_batch_resilient(batch,
                                  async_fast=async_fast, workers_fast=workers_fast,
                                  subsize=subsize_on_timeout, workers_slow=workers_slow)
        if not got.empty:
            collected = _safe_concat(collected, got)
        batches_done += 1
        if batches_done % save_every == 0:
            try:
                collected.to_parquet(progress_path)
                print(f"[saved] {len(collected)} rows to {progress_path}")
            except Exception as e:
                print("[warn] save failed:", e)
        time.sleep(pause)
    # final save
    if len(collected):
        try:
            collected.to_parquet(progress_path)
            print(f"[saved-final] {len(collected)} rows to {progress_path}")
        except Exception as e:
            print("[warn] final save failed:", e)
    return collected

In [55]:
# # Only backfill yahooquery metadata if anything is actually missing
# from pathlib import Path
# import pandas as pd

# # make sure meta_yq exists (load from disk if needed)
# if 'meta_yq' not in globals() or meta_yq is None or meta_yq.empty:
#     p = Path("outputs/meta_yq.parquet")
#     meta_yq = pd.read_parquet(p) if p.exists() else pd.DataFrame(columns=['ticker','market_cap','sector','industry'])

# filled = meta_yq.copy()

# # compute remaining in a case-robust way
# upper_to_orig = {s.upper(): s for s in active_tickers}
# have_up = set(filled['ticker'].astype(str).str.upper())
# remaining_up = [u for u in upper_to_orig if u not in have_up]
# remaining_orig = [upper_to_orig[u] for u in remaining_up]

# print("have:", len(have_up), "| remaining:", len(remaining_orig))

# # hard stop if nothing to do
# if not remaining_orig:
#     print("All covered — skipping yahooquery backfill.")
# else:
#     # OPTIONAL toggle to avoid accidental long runs
#     RUN_YQ_BACKFILL = True

#     if not RUN_YQ_BACKFILL:
#         print("Backfill disabled (set RUN_YQ_BACKFILL=True to run).")
#     else:
#         gotA = fill_remaining_with_retries(
#             remaining_orig,
#             chunk_size=100, pause=1.0, async_fast=True, workers_fast=6,
#             subsize_on_timeout=12, workers_slow=2,
#             save_every=4, progress_path=Path("outputs/meta_yq_progress.parquet")
#         )
#         filled = (pd.concat([filled, gotA], ignore_index=True)
#                     .drop_duplicates('ticker', keep='first'))

#         # recompute remaining
#         have_up = set(filled['ticker'].astype(str).str.upper())
#         remaining_up   = [u for u in upper_to_orig if u not in have_up]
#         remaining_orig = [upper_to_orig[u] for u in remaining_up]
#         print("after pass A, still remaining:", len(remaining_orig))

#         if remaining_orig:
#             gotB = fill_remaining_with_retries(
#                 remaining_orig,
#                 chunk_size=30, pause=1.2, async_fast=False, workers_fast=4,
#                 subsize_on_timeout=8, workers_slow=2,
#                 save_every=8, progress_path=Path("outputs/meta_yq_progress.parquet")
#             )
#             filled = (pd.concat([filled, gotB], ignore_index=True)
#                         .drop_duplicates('ticker', keep='first'))

#         meta_yq = filled.assign(ticker=lambda d: d['ticker'].str.upper())
#         print("TOTAL meta_yq:", len(meta_yq), "| unique:", meta_yq['ticker'].nunique())
#         meta_yq.to_parquet("outputs/meta_yq.parquet")
#         meta_yq.to_csv("outputs/meta_yq.csv", index=False)

In [56]:
from pathlib import Path
import pandas as pd

# Load existing meta; do not scrape if we have it
if 'meta_yq' not in globals() or meta_yq is None or meta_yq.empty:
    if P_META_YQ.exists():
        meta_yq = pd.read_parquet(P_META_YQ)
    else:
        meta_yq = pd.DataFrame(columns=['ticker','market_cap','sector','industry'])

filled = meta_yq.copy()

# compute remaining robustly
upper_to_orig = {s.upper(): s for s in active_tickers}
have_up = set(filled['ticker'].astype(str).str.upper())
remaining_up   = [u for u in upper_to_orig if u not in have_up]
remaining_orig = [upper_to_orig[u] for u in remaining_up]

print("have:", len(have_up), "| remaining:", len(remaining_orig))

# FAST EXIT: skip if nothing missing OR network disabled
if not remaining_orig or NO_NETWORK or not RUN_YQ_BACKFILL:
    reason = ("nothing missing" if not remaining_orig else 
              "NO_NETWORK=True" if NO_NETWORK else 
              "RUN_YQ_BACKFILL=False")
    print(f"Skipping yahooquery backfill ({reason}).")
else:
    gotA = fill_remaining_with_retries(
        remaining_orig,
        chunk_size=100, pause=1.0, async_fast=True, workers_fast=6,
        subsize_on_timeout=12, workers_slow=2,
        save_every=4, progress_path=OUT/"meta_yq_progress.parquet"
    )
    filled = (pd.concat([filled, gotA], ignore_index=True)
                .drop_duplicates('ticker', keep='first'))

    # recompute once
    have_up = set(filled['ticker'].astype(str).str.upper())
    remaining_up   = [u for u in upper_to_orig if u not in have_up]
    remaining_orig = [upper_to_orig[u] for u in remaining_up]
    print("after pass A, still remaining:", len(remaining_orig))

    if remaining_orig:
        gotB = fill_remaining_with_retries(
            remaining_orig,
            chunk_size=30, pause=1.2, async_fast=False, workers_fast=4,
            subsize_on_timeout=8, workers_slow=2,
            save_every=8, progress_path=OUT/"meta_yq_progress.parquet"
        )
        filled = (pd.concat([filled, gotB], ignore_index=True)
                    .drop_duplicates('ticker', keep='first'))

    meta_yq = filled.assign(ticker=lambda d: d['ticker'].str.upper())
    print("TOTAL meta_yq:", len(meta_yq), "| unique:", meta_yq['ticker'].nunique())
    meta_yq.to_parquet(P_META_YQ)
    meta_yq.to_csv(OUT/"meta_yq.csv", index=False)

have: 338 | remaining: 4780
Skipping yahooquery backfill (NO_NETWORK=True).


In [57]:
import pandas as pd
from pathlib import Path

# start from what's in memory
filled = meta_yq.copy()
filled['ticker'] = filled['ticker'].str.upper()

# union with progress parquet (incremental batches you saved)
prog_path = Path("outputs/meta_yq_progress.parquet")
if prog_path.exists():
    prog = pd.read_parquet(prog_path)
    prog['ticker'] = prog['ticker'].str.upper()
    filled = (pd.concat([filled, prog], ignore_index=True)
                .drop_duplicates('ticker', keep='first'))

print("metadata rows:", len(filled))

metadata rows: 368


In [58]:
# from pathlib import Path
# import pandas as pd, json, time, random
# from yahooquery import Ticker
# from tqdm import tqdm

# # Ensure meta_yq is loaded
# if 'meta_yq' not in globals() or meta_yq is None or meta_yq.empty:
#     meta_yq = pd.read_parquet("outputs/meta_yq.parquet") if Path("outputs/meta_yq.parquet").exists() \
#               else pd.DataFrame(columns=['ticker','market_cap','sector','industry'])

# # Build missing list robustly (upper-case on both sides)
# upper_to_orig = {s.upper(): s for s in active_tickers}
# have_up = set(meta_yq['ticker'].astype(str).str.upper())
# missing_up   = sorted(set(upper_to_orig) - have_up)
# missing_orig = [upper_to_orig[u] for u in missing_up]
# print("Missing:", len(missing_orig))

# # Hard stop if nothing to do
# if not missing_orig:
#     print("All tickers already cached — skipping yahooquery harvest.")
# else:
#     CACHE = Path("outputs/yq_symbol_cache"); CACHE.mkdir(parents=True, exist_ok=True)

#     def fetch_one_yq(sym):
#         """Fetch market_cap, sector, industry for one symbol; cache to disk."""
#         fp = CACHE / f"{sym.upper()}.json"
#         if fp.exists():
#             try: return json.loads(fp.read_text())
#             except Exception: pass

#         data = {'symbol': sym.upper(), 'market_cap': None, 'sector': None, 'industry': None}
#         try:
#             t = Ticker(sym, asynchronous=False, formatted=False)
#             pr = t.price
#             if isinstance(pr, dict):
#                 rec = pr.get(sym) or (next(iter(pr.values())) if pr else {})
#                 if isinstance(rec, dict):
#                     data['market_cap'] = rec.get('marketCap')
#                     data['symbol'] = (rec.get('symbol') or sym).upper()
#             # Try both profiles; either may be present
#             for attr in ('summary_profile', 'asset_profile'):
#                 try:
#                     prof = getattr(t, attr)
#                     if isinstance(prof, dict):
#                         rec = prof.get(sym) or (next(iter(prof.values())) if prof else {})
#                         if isinstance(rec, dict):
#                             data['sector']   = data['sector']   or rec.get('sector')
#                             data['industry'] = data['industry'] or rec.get('industry')
#                 except Exception:
#                     pass
#         except Exception:
#             pass

#         try: fp.write_text(json.dumps(data))
#         except Exception: pass
#         time.sleep(0.25 + random.random()*0.2)  # polite jitter
#         return data

#     # Sequential harvest (reliable; you can stop and resume anytime)
#     rows = []
#     for i, sym in enumerate(tqdm(missing_orig, desc="harvesting")):
#         rows.append(fetch_one_yq(sym))
#         if (i+1) % 200 == 0:
#             tmp = (pd.DataFrame(rows)
#                      .rename(columns={'symbol':'ticker'})
#                      .assign(ticker=lambda d: d['ticker'].str.upper()))
#             tmp.to_parquet("outputs/meta_yq_more.parquet")
#             print("saved partial:", len(tmp))

#     meta_more = (pd.DataFrame(rows)
#                    .rename(columns={'symbol':'ticker'})
#                    .assign(ticker=lambda d: d['ticker'].str.upper())
#                    [['ticker','market_cap','sector','industry']]
#                    .drop_duplicates('ticker'))

#     meta_yq = (pd.concat([meta_yq, meta_more], ignore_index=True)
#                  .drop_duplicates('ticker', keep='first'))
#     print("New total meta_yq:", len(meta_yq))
#     meta_yq.to_parquet("outputs/meta_yq.parquet")
#     meta_yq.to_csv("outputs/meta_yq.csv", index=False)

In [59]:
from pathlib import Path
import pandas as pd, json, time, random
from yahooquery import Ticker
from tqdm import tqdm
import re

if 'meta_yq' not in globals() or meta_yq is None or meta_yq.empty:
    meta_yq = pd.read_parquet(P_META_YQ) if P_META_YQ.exists() else pd.DataFrame(columns=['ticker','market_cap','sector','industry'])

upper_to_orig = {s.upper(): s for s in active_tickers}
have_up = set(meta_yq['ticker'].astype(str).str.upper())
missing_up   = sorted(set(upper_to_orig) - have_up)
missing_orig = [upper_to_orig[u] for u in missing_up]
print("Missing:", len(missing_orig))

# FAST EXIT: skip if no work or network disabled
if not missing_orig or NO_NETWORK:
    print("Skipping per-symbol harvest (no work or NO_NETWORK=True).")
else:
    CACHE = P_YQ_SYM_DIR; CACHE.mkdir(parents=True, exist_ok=True)

    # optional: skip warrants/units/rights to save time
    _SKIP_SUFFIX_RE = re.compile(r".*(?:W|WS|WT|W[A-D]?|U|R)$")

    def fetch_one_yq(sym):
        """Fetch market_cap/sector/industry for one symbol; cache."""
        t = sym.upper()
        if _SKIP_SUFFIX_RE.match(t):
            fp = CACHE / f"{t}.json"
            if not fp.exists():
                fp.write_text(json.dumps({'symbol': t, 'market_cap': None, 'sector': None, 'industry': None}))
            return {'symbol': t, 'market_cap': None, 'sector': None, 'industry': None}

        fp = CACHE / f"{t}.json"
        if fp.exists():
            try: return json.loads(fp.read_text())
            except Exception: pass

        data = {'symbol': t, 'market_cap': None, 'sector': None, 'industry': None}
        try:
            tq = Ticker(sym, asynchronous=False, formatted=False)
            pr = tq.price
            if isinstance(pr, dict):
                rec = pr.get(sym) or (next(iter(pr.values())) if pr else {})
                if isinstance(rec, dict):
                    data['market_cap'] = rec.get('marketCap')
                    data['symbol'] = (rec.get('symbol') or t).upper()
            for attr in ('summary_profile', 'asset_profile'):
                try:
                    prof = getattr(tq, attr)
                    if isinstance(prof, dict):
                        rec = prof.get(sym) or (next(iter(prof.values())) if prof else {})
                        if isinstance(rec, dict):
                            data['sector']   = data['sector']   or rec.get('sector')
                            data['industry'] = data['industry'] or rec.get('industry')
                except Exception:
                    pass
        except Exception:
            pass

        try: fp.write_text(json.dumps(data))
        except Exception: pass
        time.sleep(0.25 + random.random()*0.2)
        return data

    rows = []
    for i, sym in enumerate(tqdm(missing_orig, desc="harvesting")):
        rows.append(fetch_one_yq(sym))
        if (i+1) % 200 == 0:
            tmp = (pd.DataFrame(rows).rename(columns={'symbol':'ticker'})
                                     .assign(ticker=lambda d: d['ticker'].str.upper()))
            tmp.to_parquet(OUT/"meta_yq_more.parquet")
            print("saved partial:", len(tmp))

    meta_more = (pd.DataFrame(rows)
                   .rename(columns={'symbol':'ticker'})
                   .assign(ticker=lambda d: d['ticker'].str.upper())
                   [['ticker','market_cap','sector','industry']]
                   .drop_duplicates('ticker'))

    meta_yq = (pd.concat([meta_yq, meta_more], ignore_index=True)
                 .drop_duplicates('ticker', keep='first'))
    meta_yq.to_parquet(P_META_YQ)
    meta_yq.to_csv(OUT/"meta_yq.csv", index=False)
    print("New total meta_yq:", len(meta_yq))

Missing: 4780
Skipping per-symbol harvest (no work or NO_NETWORK=True).


In [60]:
meta_yq.head()

Unnamed: 0,ticker,market_cap,sector,industry
0,ASTL,414686700.0,Basic Materials,Steel
1,ASTLW,,Basic Materials,Steel
2,ASTS,17736480000.0,Technology,Communication Equipment
3,ASUR,224885100.0,Technology,Software - Application
4,ASYS,135123800.0,Technology,Semiconductor Equipment & Materials


In [61]:
prices_raw.head()

Price,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,Adj Close,...,Volume,Volume,Volume,Volume,Volume,Volume,Volume,Volume,Volume,Volume
Ticker,AACB,AACBR,AACBU,AACG,AACI,AACIU,AACIW,AADR,AAL,AALG,...,ZUMZ,ZURA,ZVRA,ZVZZT,ZWZZT,ZXZZT,ZYBT,ZYME,ZYN,ZYXI
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2,Unnamed: 14_level_2,Unnamed: 15_level_2,Unnamed: 16_level_2,Unnamed: 17_level_2,Unnamed: 18_level_2,Unnamed: 19_level_2,Unnamed: 20_level_2,Unnamed: 21_level_2
2022-10-03,,,,1.75,,,,43.794346,11.92,,...,373000,,214700,,,,,958200,,167500
2022-10-04,,,,1.82,,,,44.920731,12.95,,...,390300,,317300,,,,,1142000,,114300
2022-10-05,,,,1.83,,,,44.814838,12.87,,...,319600,,190100,,,,,890900,,132600
2022-10-06,,,,1.96,,,,44.246822,12.73,,...,387700,,125300,,,,,555400,,94600
2022-10-07,,,,1.89,,,,43.649937,12.18,,...,201500,,269600,,,,,1057400,,141300


RUN THIS BELOW NEXT AFTER THE 

In [62]:
import json, time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter, Retry
import requests
import pandas as pd
from tqdm import tqdm

SEC_HEADERS = {"User-Agent": "Ozkan Gelincik <ozkangelincik@gmail.com>", "Accept": "application/json"}
CACHE = Path("outputs/sec_cache"); CACHE.mkdir(parents=True, exist_ok=True)

# Retry-capable session
def make_session():
    sess = requests.Session()
    retries = Retry(
        total=5, connect=3, read=3, backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"]
    )
    adapter = HTTPAdapter(max_retries=retries, pool_connections=30, pool_maxsize=30)
    sess.mount("https://", adapter)
    sess.headers.update(SEC_HEADERS)
    return sess

SESSION = make_session()

# Ticker -> CIK (single download)
def load_cik_map():
    r = SESSION.get("https://www.sec.gov/files/company_tickers.json", timeout=30)
    r.raise_for_status()
    m = r.json()
    return {v["ticker"].upper(): str(v["cik_str"]).zfill(10) for v in m.values()}

CIK_MAP = load_cik_map()

def fetch_cik_json(cik: str):
    """Return dict or None; never raise on JSON decode."""
    url = f"https://data.sec.gov/submissions/CIK{cik}.json"
    try:
        r = SESSION.get(url, timeout=30)
        if r.status_code != 200:
            return None
        try:
            return r.json()
        except Exception:
            return None
    except Exception:
        return None

def sec_recent_one(ticker: str):
    """Robust per-ticker; caches; never raises."""
    t_up = ticker.upper()
    cik = CIK_MAP.get(t_up)
    base = {"ticker": t_up, "cik": cik, "recent_form": None, "recent_filing_date": None}
    if not cik:
        return base

    fp = CACHE / f"{cik}.json"
    if fp.exists():
        try:
            j = json.loads(fp.read_text())
        except Exception:
            j = None
    else:
        j = fetch_cik_json(cik)
        if j:
            try:
                fp.write_text(json.dumps(j))
            except Exception:
                pass
        time.sleep(0.08)  # be polite

    if not isinstance(j, dict):
        return base

    recent = j.get("filings", {}).get("recent", {})
    forms = recent.get("form", []) or []
    dates = recent.get("filingDate", []) or []
    if forms and dates:
        base["recent_form"] = forms[0]
        base["recent_filing_date"] = dates[0]
    return base

def sec_recent_parallel(tickers, max_workers=3):
    """Parallel but tolerant: exceptions are captured, progress continues."""
    rows = []
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futs = [ex.submit(sec_recent_one, t) for t in tickers]
        for f in tqdm(as_completed(futs), total=len(futs), desc="SEC"):
            try:
                rows.append(f.result())
            except Exception as e:
                rows.append({"ticker": None, "cik": None, "recent_form": None, "recent_filing_date": None, "error": str(e)[:120]})
    return pd.DataFrame(rows)

In [63]:
# 1) Figure out which tickers still need a SEC JSON file
upper_to_orig = {t.upper(): t for t in active_tickers}
to_fetch = []
for u, orig in upper_to_orig.items():
    cik = CIK_MAP.get(u)
    if cik and not (CACHE / f"{cik}.json").exists():
        to_fetch.append(orig)

print("cache-missing:", len(to_fetch), "of", len(active_tickers))

cache-missing: 0 of 5118


In [64]:
# 2) Fetch the missing ones (polite, retrying, parallel)
sec_new = sec_recent_parallel(to_fetch, max_workers=3)
print("fetched rows:", len(sec_new))

SEC: 0it [00:00, ?it/s]

fetched rows: 0





In [65]:

# === M1: Build analysis_enriched with splits + time-varying shares/float ===
from pathlib import Path
import pandas as pd

# Start from daily prices + fundamentals
analysis_enriched = (
    analysis_df
      .merge(fundamentals, on='ticker', how='left')
      .sort_values(['ticker','date'])
      .reset_index(drop=True)
)

# Add split flags (1 on event day, else 0)
analysis_enriched = analysis_enriched.merge(split_flags, on=['ticker','date'], how='left')
for c in ['performed_split','performed_reverse_split']:
    if c not in analysis_enriched.columns:
        analysis_enriched[c] = 0
    analysis_enriched[c] = analysis_enriched[c].fillna(0).astype('int8')

# Add time-varying shares & float_shares (SEC), forward-fill between filings
analysis_enriched = analysis_enriched.merge(events_enriched, on=['ticker','date'], how='left')

# Optional fallbacks from snapshots if present (avoid _x/_y residue)
if 'shares_outstanding_x' in analysis_enriched.columns and 'shares_outstanding_y' in analysis_enriched.columns:
    analysis_enriched['shares_outstanding'] = analysis_enriched['shares_outstanding_x'].where(
        analysis_enriched['shares_outstanding_x'].notna(),
        analysis_enriched['shares_outstanding_y']
    )
    analysis_enriched.drop(columns=['shares_outstanding_x','shares_outstanding_y'], inplace=True)

if 'float_shares_x' in analysis_enriched.columns and 'float_shares_y' in analysis_enriched.columns:
    analysis_enriched['float_shares'] = analysis_enriched['float_shares_x'].where(
        analysis_enriched['float_shares_x'].notna(),
        analysis_enriched['float_shares_y']
    )
    analysis_enriched.drop(columns=['float_shares_x','float_shares_y'], inplace=True)

# Forward-fill time series within each ticker
for col in ['shares_outstanding','float_shares']:
    if col in analysis_enriched.columns:
        analysis_enriched[col] = analysis_enriched.groupby('ticker')[col].ffill()

# Alias
analysis_enriched['free_float'] = analysis_enriched.get('float_shares')

# Save
OUT = Path("outputs")
analysis_enriched.to_parquet(OUT / "analysis_enriched.parquet")
analysis_enriched.to_csv(OUT / "analysis_enriched.csv", index=False)

print("rows:", len(analysis_enriched),
      "| tickers:", analysis_enriched['ticker'].nunique(),
      "| has cols:", {'shares_outstanding','float_shares','free_float','performed_split','performed_reverse_split'}.issubset(analysis_enriched.columns))


rows: 3845871 | tickers: 5121 | has cols: True


In [66]:
sec_recent_df.to_parquet("outputs/sec_recent.parquet", index=False)
sec_recent_df.to_csv("outputs/sec_recent.csv", index=False)

def sic_to_sector(s):
    try: s = int(s)
    except: return None
    if 1<=s<=9:   return 'Agriculture'
    if 10<=s<=14: return 'Mining'
    if 15<=s<=17: return 'Construction'
    if 20<=s<=39: return 'Manufacturing'
    if 40<=s<=49: return 'Transportation & Utilities'
    if 50<=s<=51: return 'Wholesale'
    if 52<=s<=59: return 'Retail'
    if 60<=s<=67: return 'Finance'
    if 70<=s<=89: return 'Services'
    if 91<=s<=99: return 'Public Administration'
    return None

sec_recent_df["sector_sic"] = sec_recent_df["sic"].apply(sic_to_sector)

In [67]:
import pandas as _pd
if 'prices_raw' not in globals() or prices_raw is None or (isinstance(prices_raw, _pd.DataFrame) and prices_raw.empty):
    # Rebuild analysis_df from prices_raw
    import pandas as pd, numpy as np
    from pathlib import Path

    # If prices_raw isn't in memory, try to load it
    if 'prices_raw' not in globals() or prices_raw is None:
        p = Path("outputs/prices_wide.parquet")
        if p.exists():
            prices_raw = pd.read_parquet(p)
        else:
            # try assembling from chunks
            parts = sorted((Path("outputs")/"chunks").glob("prices_wide_*.parquet"))
            if parts:
                prices_raw = pd.concat([pd.read_parquet(x) for x in parts], axis=1).sort_index()
            else:
                raise RuntimeError("prices_raw not loaded and no saved files found.")

    def tidy_from_yf(wide: pd.DataFrame) -> pd.DataFrame:
        df = wide.copy(); df.index.name = 'date'
        tidy = (df.stack(level=1, future_stack=True)
                  .rename_axis(['date','ticker'])
                  .reset_index()
                  .sort_values(['ticker','date'])
                  .reset_index(drop=True))
        # lower-case column names to be consistent
        tidy.columns = [c.lower() if isinstance(c, str) else c for c in tidy.columns]
        return tidy

    prices_tidy = tidy_from_yf(prices_raw)

    # daily returns
    adj = prices_tidy[['date','ticker','adj close']].dropna().sort_values(['ticker','date']).copy()
    adj['ret']    = adj.groupby('ticker')['adj close'].pct_change()
    adj['logret'] = np.log(adj['adj close'] / adj.groupby('ticker')['adj close'].shift(1))

    analysis_df = (
        prices_tidy[['date','ticker','open','high','low','close','adj close','volume']]
        .merge(adj[['date','ticker','ret','logret']], on=['date','ticker'], how='left')
    )
else:
    print('Skipping: prices_raw already loaded from cache (Bootstrap).')


Skipping: prices_raw already loaded from cache (Bootstrap).


In [68]:
import pandas as pd

# normalize tickers
meta_yq['ticker'] = meta_yq['ticker'].str.upper()
sec_recent_df['ticker'] = sec_recent_df['ticker'].str.upper()
analysis_df['ticker'] = analysis_df['ticker'].str.upper()

# build fundamentals (prefer Yahoo, backfill with SIC sector if present)
keep_sec_cols = ['ticker','cik','recent_form','recent_filing_date','sic','sic_desc']
if 'sector_sic' in sec_recent_df.columns:
    keep_sec_cols.append('sector_sic')

fundamentals = meta_yq.merge(sec_recent_df[keep_sec_cols], on="ticker", how="left")

if 'sector_sic' in fundamentals.columns:
    fundamentals['sector'] = fundamentals['sector'].where(fundamentals['sector'].notna(), fundamentals['sector_sic'])

# tidy columns & save
cols = ['ticker','market_cap','sector','industry','cik','sic','sic_desc','recent_form','recent_filing_date']
for c in cols:
    if c not in fundamentals.columns:
        fundamentals[c] = pd.NA
fundamentals = fundamentals[cols].drop_duplicates('ticker')

fundamentals.to_parquet("outputs/fundamentals.parquet")
fundamentals.to_csv("outputs/fundamentals.csv", index=False)
print("fundamentals tickers:", fundamentals['ticker'].nunique())

# merge into analysis & save
analysis_enriched = (
    analysis_df.merge(fundamentals, on="ticker", how="left")
               .sort_values(["ticker","date"])
               .reset_index(drop=True)
)
analysis_enriched.to_parquet("outputs/analysis_enriched.parquet")
analysis_enriched.to_csv("outputs/analysis_enriched.csv", index=False)
print("analysis_enriched rows:", len(analysis_enriched))
display(analysis_enriched.head())

fundamentals tickers: 338
analysis_enriched rows: 3845871


Unnamed: 0,date,ticker,open,high,low,close,adj close,volume,ret,logret,market_cap,sector,industry,cik,sic,sic_desc,recent_form,recent_filing_date
0,2022-10-03,AACB,,,,,,,,,,,,,,,,
1,2022-10-04,AACB,,,,,,,,,,,,,,,,
2,2022-10-05,AACB,,,,,,,,,,,,,,,,
3,2022-10-06,AACB,,,,,,,,,,,,,,,,
4,2022-10-07,AACB,,,,,,,,,,,,,,,,


In [69]:
# === FORCE FINALIZE (safe, idempotent) ===
from pathlib import Path
import pandas as pd, numpy as np

OUT = Path("outputs"); OUT.mkdir(exist_ok=True)

# ---- 1) Base tables must exist in memory (from earlier cells)
assert 'analysis_df' in globals(), "analysis_df is missing – run the bootstrap cells first"
assert 'fundamentals' in globals(), "fundamentals is missing – run the bootstrap cells first"

# ---- 2) Load split_flags from memory or disk
SPLIT_FLAGS_PQ = OUT / "split_flags.parquet"
if 'split_flags' not in globals():
    if SPLIT_FLAGS_PQ.exists():
        split_flags = pd.read_parquet(SPLIT_FLAGS_PQ)
    else:
        split_flags = pd.DataFrame(columns=['ticker','date','performed_split','performed_reverse_split'])

# Normalize types
split_flags = (
    split_flags.assign(
        ticker=lambda d: d['ticker'].astype(str),
        date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize()
    )
    .drop_duplicates(['ticker','date'])
)

# ---- 3) Build events_enriched from SEC shares_events + price table (robust, no merge_asof)
SHARES_EVENTS_PQ = OUT / "shares_events.parquet"
if 'events_enriched' in globals():
    ee = events_enriched.copy()
else:
    if SHARES_EVENTS_PQ.exists():
        shares_events = pd.read_parquet(SHARES_EVENTS_PQ)
    else:
        shares_events = pd.DataFrame(columns=['date','ticker','shares_outstanding','entity_public_float_usd'])

    shares_events = (
        shares_events.assign(
            ticker=lambda d: d['ticker'].astype(str),
            date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize()
        )
        .dropna(subset=['ticker','date'])
        .drop_duplicates(['ticker','date'])
        .sort_values(['ticker','date'])
    )

    price_ev = (
        analysis_df[['date','ticker','adj close']]
        .rename(columns={'adj close':'adj_close'})
        .assign(
            ticker=lambda d: d['ticker'].astype(str),
            date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize()
        )
        .dropna(subset=['ticker','date','adj_close'])
        .drop_duplicates(['ticker','date'])
        .sort_values(['ticker','date'])
    )

    # map last price <= filing date per ticker via index union + ffill
    events_enriched_list = []
    g_prices = dict(tuple(price_ev.groupby('ticker')))
    for tkr, g in shares_events.groupby('ticker', sort=False):
        filing_idx = g[['date']].drop_duplicates().set_index('date')
        p = g_prices.get(tkr)
        if p is None or p.empty:
            g2 = g.copy()
            g2['adj_close'] = np.nan
            events_enriched_list.append(g2)
            continue
        p = p.set_index('date')['adj_close'].sort_index()
        union_idx = p.index.union(filing_idx.index).sort_values()
        p_ffill = p.reindex(union_idx).ffill()
        adj_on_filing = p_ffill.reindex(filing_idx.index)
        g2 = g.copy()
        g2['adj_close'] = adj_on_filing.values
        events_enriched_list.append(g2)

    ee = (pd.concat(events_enriched_list, ignore_index=True) if events_enriched_list else shares_events.copy())
    ee['float_shares'] = np.where(
        ee.get('entity_public_float_usd').notna() & ee['adj_close'].notna(),
        ee['entity_public_float_usd'] / ee['adj_close'],
        np.nan
    )
    ee = ee[['date','ticker','shares_outstanding','float_shares']].sort_values(['ticker','date'])

# ---- 4) Rebuild analysis_enriched with new columns (and don’t overwrite older file)
ae = (
    analysis_df
      .merge(fundamentals, on='ticker', how='left')
      .merge(split_flags, on=['ticker','date'], how='left')
      .merge(ee, on=['ticker','date'], how='left')
      .sort_values(['ticker','date'])
      .reset_index(drop=True)
)

# Fill/convert flags; forward-fill shares/float within each ticker
for c in ['performed_split','performed_reverse_split']:
    if c not in ae.columns:
        ae[c] = 0
    ae[c] = ae[c].fillna(0).astype('int8')

for c in ['shares_outstanding','float_shares']:
    if c in ae.columns:
        ae[c] = ae.groupby('ticker')[c].ffill()

ae['free_float'] = ae.get('float_shares')

# ---- 5) Save as new version to avoid clobbering old outputs
ae.to_parquet(OUT / "analysis_enriched_v2.parquet")
ae.to_csv(OUT / "analysis_enriched_v2.csv", index=False)

# Put back into the working var name if you want to continue with it
analysis_enriched = ae

need = {'shares_outstanding','float_shares','free_float','performed_split','performed_reverse_split'}
print("OK:", need.issubset(analysis_enriched.columns),
      "| rows:", len(analysis_enriched),
      "| tickers:", analysis_enriched['ticker'].nunique())
print("Saved:", (OUT/'analysis_enriched_v2.parquet'), (OUT/'analysis_enriched_v2.csv'))

OK: True | rows: 3845871 | tickers: 5121
Saved: outputs/analysis_enriched_v2.parquet outputs/analysis_enriched_v2.csv


In [70]:
# === Backfill v6 (yfinance snapshots): shares_outstanding / float_shares ===
import time, json, re
from pathlib import Path
import numpy as np
import pandas as pd
import yfinance as yf

assert 'analysis_enriched' in globals(), "Run the finalize cell so analysis_enriched exists."

OUT = Path("outputs"); OUT.mkdir(exist_ok=True)
YF_CACHE = OUT / "yf_snapshot_cache"; YF_CACHE.mkdir(parents=True, exist_ok=True)

# Skip obvious non-common-stock symbols (warrants/units/rights/when-issued)
_SKIP_SUFFIX_RE = re.compile(r".*(?:W|WS|WT|W[A-D]?|U|R)$")

def tickers_missing_both(ae: pd.DataFrame) -> list:
    g = (ae.groupby('ticker')[['shares_outstanding','float_shares']]
           .apply(lambda df: pd.Series({
               'has_shares': df['shares_outstanding'].notna().any(),
               'has_float' : df['float_shares'].notna().any()
           })))
    return sorted(g.index[(~g['has_shares']) & (~g['has_float'])].tolist())

def _safe_float(x):
    try:
        return float(x)
    except Exception:
        return None

def _read_cache(sym):
    p = YF_CACHE / f"{sym}.json"
    if p.exists():
        try:
            return json.loads(p.read_text())
        except Exception:
            return None
    return None

def _write_cache(sym, d):
    p = YF_CACHE / f"{sym}.json"
    try:
        p.write_text(json.dumps(d))
    except Exception:
        pass

def fetch_yf_snapshot_one(sym: str):
    """Return {'ticker', 'shares_outstanding_yf', 'float_shares_yf'} using yfinance with fallbacks."""
    t = str(sym).upper()
    if _SKIP_SUFFIX_RE.match(t):
        return {'ticker': t, 'shares_outstanding_yf': None, 'float_shares_yf': None}

    # cache first
    cached = _read_cache(t)
    if isinstance(cached, dict) and 'shares_outstanding_yf' in cached and 'float_shares_yf' in cached:
        return {'ticker': t, **cached}

    out = {'ticker': t, 'shares_outstanding_yf': None, 'float_shares_yf': None}
    try:
        tk = yf.Ticker(t)

        # --- 1) Fast path
        fast = {}
        try:
            fast = tk.fast_info or {}
        except Exception:
            fast = {}

        last_price = _safe_float(fast.get('last_price')) or _safe_float(fast.get('lastPrice'))  # yfinance names vary
        if last_price is None:
            # short history fallback for price
            try:
                h = tk.history(period='5d', interval='1d')
                if not h.empty:
                    last_price = _safe_float(h['Close'].iloc[-1])
            except Exception:
                pass

        market_cap = _safe_float(fast.get('market_cap')) or _safe_float(fast.get('marketCap'))
        shares_fast = _safe_float(fast.get('shares'))

        # --- 2) Slower info fallback
        info = {}
        if shares_fast is None or market_cap is None:
            try:
                info = tk.info or {}
            except Exception:
                info = {}

        shares_info = _safe_float(info.get('sharesOutstanding'))
        float_info  = _safe_float(info.get('floatShares'))
        insiders    = _safe_float(info.get('heldPercentInsiders'))  # 0..1

        # choose shares_outstanding
        shares = shares_fast or shares_info
        if shares is None and market_cap is not None and last_price not in (None, 0.0):
            shares = market_cap / last_price

        # choose float_shares
        flt = float_info
        if flt is None and insiders is not None and shares is not None:
            flt = (1.0 - insiders) * shares

        out['shares_outstanding_yf'] = shares
        out['float_shares_yf'] = flt

    except Exception:
        # keep Nones
        pass

    _write_cache(t, {'shares_outstanding_yf': out['shares_outstanding_yf'], 'float_shares_yf': out['float_shares_yf']})
    time.sleep(0.12)  # polite pause
    return out

# 1) who still needs backfill?
need_yf = tickers_missing_both(analysis_enriched)
print(f"Tickers still missing both (pre-yf): {len(need_yf)}")

# 2) harvest with caching (sequential = most reliable for yfinance)
rows = []
for i, tkr in enumerate(need_yf, 1):
    rows.append(fetch_yf_snapshot_one(tkr))
    if i % 200 == 0:
        print(f"…processed {i}/{len(need_yf)}")

yf_snap = (pd.DataFrame(rows)
             .dropna(how='all', subset=['shares_outstanding_yf','float_shares_yf'])
             .drop_duplicates('ticker'))

print("yfinance snapshot rows usable:", len(yf_snap))

# 3) apply only where still missing; then forward-fill within ticker
if not yf_snap.empty:
    yf_snap['ticker'] = yf_snap['ticker'].astype(str).str.upper()
    map_sh = yf_snap.set_index('ticker')['shares_outstanding_yf'].to_dict()
    map_ff = yf_snap.set_index('ticker')['float_shares_yf'].to_dict()

    mask_sh = analysis_enriched['shares_outstanding'].isna()
    analysis_enriched.loc[mask_sh, 'shares_outstanding'] = analysis_enriched.loc[mask_sh, 'ticker'].map(map_sh)

    mask_ff = analysis_enriched['float_shares'].isna()
    analysis_enriched.loc[mask_ff, 'float_shares'] = analysis_enriched.loc[mask_ff, 'ticker'].map(map_ff)

    analysis_enriched['free_float'] = analysis_enriched['float_shares']

    for c in ['shares_outstanding','float_shares','free_float']:
        if c in analysis_enriched.columns:
            analysis_enriched[c] = analysis_enriched.groupby('ticker')[c].ffill()

    # Save new version
    analysis_enriched.to_parquet(OUT / "analysis_enriched_v6.parquet")
    analysis_enriched.to_csv(OUT / "analysis_enriched_v6.csv", index=False)
    print("Backfill applied. Saved analysis_enriched_v6.*")

# Coverage report
cov_sh = analysis_enriched.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum()
cov_ff = analysis_enriched.groupby('ticker')['float_shares'].apply(lambda s: s.notna().any()).sum()
print("Coverage now -> shares_outstanding:", cov_sh, "| float_shares:", cov_ff)

Tickers still missing both (pre-yf): 1544
…processed 200/1544
…processed 400/1544
…processed 600/1544
…processed 800/1544
…processed 1000/1544
…processed 1200/1544
…processed 1400/1544
yfinance snapshot rows usable: 327
Backfill applied. Saved analysis_enriched_v6.*
Coverage now -> shares_outstanding: 3658 | float_shares: 2752


In [71]:
# === SEC enrichment from cache (per-ticker join; robust) ===
from pathlib import Path
import json, numpy as np, pandas as pd

OUT = Path("outputs")
SEC_FACTS_DIR = OUT / "sec_facts"

assert 'analysis_enriched' in globals(), "Run your finalize/backfill cells first."
assert SEC_FACTS_DIR.exists(), "No sec_facts cache found. Run the SEC companyfacts step at least once."

# 1) Helpers to read cached companyfacts and extract expanded shares/public float
def _collect_any_units(facts, tax, name):
    try:
        units = facts['facts'][tax][name]['units']
    except Exception:
        return []
    out = []
    for unit, arr in (units or {}).items():
        for d in arr or []:
            end = d.get('end'); val = d.get('val')
            if end and val is not None:
                out.append({
                    'date': pd.to_datetime(end).tz_localize(None).normalize(),
                    'val' : float(val),
                    'unit': unit,
                    'form': d.get('form')
                })
    return out

def extract_shares_expanded(facts_json: dict) -> pd.DataFrame:
    keys = [
        ('us-gaap',  'CommonStockSharesOutstanding'),
        ('dei',      'EntityCommonStockSharesOutstanding'),
        ('us-gaap',  'CommonStockSharesIssued'),
        ('ifrs-full','NumberOfSharesOutstanding'),
        ('ifrs-full','IssuedCapitalNumberOfShares'),
        # weighted-average fallbacks (approximate):
        ('us-gaap',  'WeightedAverageNumberOfSharesOutstandingBasic'),
        ('us-gaap',  'WeightedAverageNumberOfDilutedSharesOutstanding'),
        ('ifrs-full','WeightedAverageNumberOfOrdinarySharesOutstandingBasic'),
        ('ifrs-full','WeightedAverageNumberOfOrdinarySharesOutstandingDiluted'),
    ]
    rows = []
    for tax, name in keys:
        rows += [r for r in _collect_any_units(facts_json, tax, name) if r['unit'] == 'shares']
    if not rows:
        return pd.DataFrame(columns=['date','shares_outstanding'])
    df = (pd.DataFrame(rows)
            .sort_values('date')
            .drop_duplicates('date', keep='last')
            .rename(columns={'val':'shares_outstanding'})[['date','shares_outstanding']])
    return df

def extract_public_float_any_currency(facts_json: dict) -> pd.DataFrame:
    rows = _collect_any_units(facts_json, 'dei', 'EntityPublicFloat')
    if not rows:
        return pd.DataFrame(columns=['date','entity_public_float_val','entity_public_float_unit'])
    df = (pd.DataFrame(rows)
            .sort_values('date')
            .drop_duplicates('date', keep='last')
            .rename(columns={'val':'entity_public_float_val','unit':'entity_public_float_unit'})
            [['date','entity_public_float_val','entity_public_float_unit']])
    return df

# 2) Build events from cached files only
parts = []
tk_to_cik = dict(zip(fundamentals['ticker'].astype(str), fundamentals['cik']))
for tkr, cik in tk_to_cik.items():
    if not isinstance(cik, str) or pd.isna(cik):
        continue
    fp = SEC_FACTS_DIR / f"{str(cik).zfill(10)}.json"
    if not fp.exists():
        continue
    try:
        facts = json.loads(fp.read_text())
    except Exception:
        continue
    sh = extract_shares_expanded(facts)
    pf = extract_public_float_any_currency(facts)
    if sh.empty and pf.empty:
        continue
    ev = sh.merge(pf, on='date', how='outer').sort_values('date')
    ev['ticker'] = str(tkr)
    parts.append(ev[['date','ticker','shares_outstanding','entity_public_float_val','entity_public_float_unit']])

sec_enriched = (pd.concat([p for p in parts if not p.empty], ignore_index=True)
                if parts else pd.DataFrame(columns=['date','ticker','shares_outstanding','entity_public_float_val','entity_public_float_unit']))

print("SEC-enriched rows:", len(sec_enriched), "| tickers:", (sec_enriched['ticker'].nunique() if not sec_enriched.empty else 0))

# 3) Find your adjusted-close column (supports 'adj close' or 'adj_close', etc.)
adj_candidates = ['adj close','adj_close','Adj Close','adjClose']
adj_col = next((c for c in adj_candidates if c in analysis_enriched.columns), None)
if adj_col is None:
    raise KeyError("Couldn't find an adjusted-close column in analysis_enriched. "
                   "Looked for: " + ", ".join(adj_candidates))

price_ev = (analysis_enriched[['date','ticker',adj_col]]
            .dropna()
            .rename(columns={adj_col:'adj_close'})
            .astype({'ticker':str})
            .assign(date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize())
            .sort_values(['ticker','date']))

# 4) Per-ticker asof join (robust to any sorting/categorical issues)
sec_enriched = sec_enriched.assign(
    ticker=lambda d: d['ticker'].astype(str),
    date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize()
).dropna(subset=['date'])

events_list = []
for t, left_t in sec_enriched.groupby('ticker', sort=True):
    left_t = left_t.sort_values('date')
    right_t = price_ev[price_ev['ticker'] == t][['date','adj_close']]
    if right_t.empty:
        tmp = left_t.copy()
        tmp['adj_close'] = np.nan
    else:
        tmp = pd.merge_asof(left_t, right_t.sort_values('date'),
                            on='date', direction='backward', allow_exact_matches=True)
    tmp['ticker'] = t
    events_list.append(tmp)

sec_enriched_px = pd.concat(events_list, ignore_index=True) if events_list else pd.DataFrame(columns=['ticker','date','adj_close'])

# 5) Convert USD public float to shares, then merge into analysis_enriched (fill only gaps)
sec_enriched_px['float_shares_from_usd'] = np.where(
    (sec_enriched_px.get('entity_public_float_val').notna()) &
    (sec_enriched_px.get('entity_public_float_unit') == 'USD') &
    (sec_enriched_px['adj_close'].notna()) &
    (sec_enriched_px['adj_close'] != 0),
    sec_enriched_px['entity_public_float_val'] / sec_enriched_px['adj_close'],
    np.nan
)

merged = analysis_enriched.merge(
    sec_enriched_px[['ticker','date','shares_outstanding','float_shares_from_usd']],
    on=['ticker','date'], how='left', suffixes=('','_sec')
)

# Fill where missing
if 'shares_outstanding_sec' in merged.columns:
    mask_sh = merged['shares_outstanding'].isna() & merged['shares_outstanding_sec'].notna()
    merged.loc[mask_sh, 'shares_outstanding'] = merged.loc[mask_sh, 'shares_outstanding_sec']
    merged = merged.drop(columns=['shares_outstanding_sec'])

if 'float_shares' not in merged.columns:
    merged['float_shares'] = np.nan
if 'float_shares_from_usd' in merged.columns:
    mask_ff = merged['float_shares'].isna() & merged['float_shares_from_usd'].notna()
    merged.loc[mask_ff, 'float_shares'] = merged.loc[mask_ff, 'float_shares_from_usd']
    merged = merged.drop(columns=['float_shares_from_usd'])

# Alias + forward fill within ticker
merged['free_float'] = merged['float_shares']
for c in ['shares_outstanding','float_shares','free_float']:
    merged[c] = merged.groupby('ticker')[c].ffill()

analysis_enriched = merged

# 6) Save new artifact
analysis_enriched.to_parquet(OUT / "analysis_enriched_v8.parquet")
analysis_enriched.to_csv(OUT / "analysis_enriched_v8.csv", index=False)

cov_sh = analysis_enriched.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum()
cov_ff = analysis_enriched.groupby('ticker')['float_shares'].apply(lambda s: s.notna().any()).sum()
print("Coverage after SEC enrichment -> shares_outstanding:", cov_sh, "| float_shares:", cov_ff)

  sec_enriched = (pd.concat([p for p in parts if not p.empty], ignore_index=True)


SEC-enriched rows: 14067 | tickers: 256
Coverage after SEC enrichment -> shares_outstanding: 3668 | float_shares: 2752


In [72]:
# === YF float-only: cache/NO_NETWORK apply (standalone safe) ===
from pathlib import Path
import pandas as pd, numpy as np, json

OUT = Path("outputs")
YF_CACHE = OUT / "yf_snapshot_cache"
P_YQ_FLOAT_SNAP = OUT / "yq_float_snapshot.parquet"

# Ensure flags exist (safe defaults)
YQ_FLOAT_CACHE_ONLY = globals().get('YQ_FLOAT_CACHE_ONLY', False)
NO_NETWORK          = globals().get('NO_NETWORK', False)

# Recompute who is missing float_shares
need_float = (analysis_enriched.groupby('ticker')['float_shares']
                .apply(lambda s: s.notna().any())
                .pipe(lambda s: sorted(s.index[~s].tolist())))
print("Tickers missing float_shares:", len(need_float))

if not need_float:
    print("Skipping YF float-only backfill (nothing missing).")

elif YQ_FLOAT_CACHE_ONLY or NO_NETWORK:
    # --- helpers ---
    def _read_snap():
        """Load consolidated snapshot if present; return DataFrame or empty."""
        if not P_YQ_FLOAT_SNAP.exists():
            return pd.DataFrame()
        try:
            df = pd.read_parquet(P_YQ_FLOAT_SNAP)
        except Exception:
            return pd.DataFrame()
        if 'ticker' not in df.columns:  # ticker stored as index? move to column
            df = df.reset_index().rename(columns={'index': 'ticker', 'symbol': 'ticker'})
        df['ticker'] = df['ticker'].astype(str).str.upper()
        return df

    def _rebuild_snap_from_cache():
        """Build snapshot from local per-ticker JSON files only (no network)."""
        rows = []
        if YF_CACHE.exists():
            for p in YF_CACHE.glob("*.json"):
                try:
                    j = json.loads(p.read_text())
                    tkr = p.stem.upper()
                    # Accept any plausible key name and normalize to float_shares_yf
                    v = j.get('float_shares_yf')
                    if v is None: v = j.get('float_shares')
                    if v is None: v = j.get('floatShares')
                    if v is not None:
                        rows.append({'ticker': tkr, 'float_shares_yf': float(v)})
                except Exception:
                    pass
        snap = pd.DataFrame(rows)
        if not snap.empty:
            snap = snap.drop_duplicates('ticker', keep='last')
            try: snap.to_parquet(P_YQ_FLOAT_SNAP)
            except Exception: pass
            print(f"[YF float] Rebuilt snapshot from cache: {len(snap)} rows.")
        else:
            print("[YF float] No snapshot and no cache contents — nothing to apply.")
        return snap

    # Try to read snapshot; if empty, rebuild from local cache
    snap = _read_snap()
    if snap.empty:
        snap = _rebuild_snap_from_cache()

    if not snap.empty:
        # Find usable float column
        float_col = next((c for c in ['float_shares_yf', 'float_shares', 'free_float', 'floatShares']
                          if c in snap.columns), None)
        if float_col is None:
            print("[YF float] Snapshot lacks a usable float column; skipping apply.")
        else:
            mapper = (snap.dropna(subset=[float_col])
                        .drop_duplicates('ticker', keep='last')
                        .set_index('ticker')[float_col].astype(float).to_dict())

            m = analysis_enriched['float_shares'].isna()
            analysis_enriched.loc[m, 'float_shares'] = analysis_enriched.loc[m, 'ticker'].map(mapper)

            if 'free_float' not in analysis_enriched.columns:
                analysis_enriched['free_float'] = np.nan
            analysis_enriched['free_float'] = analysis_enriched['free_float'].fillna(analysis_enriched['float_shares'])

            for c in ['float_shares', 'free_float']:
                analysis_enriched[c] = pd.to_numeric(analysis_enriched[c], errors='coerce')
                analysis_enriched[c] = analysis_enriched.groupby('ticker')[c].ffill()

            print("[YF float] Applied cached snapshot (no network).")
    # else: nothing to do (message already printed)

else:
    print("Cache-only flags are off; run your networked YF backfill cell instead.")

Tickers missing float_shares: 2369
[YF float] Applied cached snapshot (no network).


In [73]:
# === T2b (robust): FX-aware SEC public float -> float_shares (per-currency joins) ===
from pathlib import Path
import pandas as pd, numpy as np, json, re
import yfinance as yf

OUT = Path("outputs")
SEC_FACTS_DIR = OUT / "sec_facts"
assert SEC_FACTS_DIR.exists(), "Missing outputs/sec_facts; run the SEC companyfacts step first."
assert 'analysis_enriched' in globals(), "analysis_enriched not found. Run finalize/backfill first."

# ---------- helpers ----------
def _collect_any_units(facts, tax, name):
    try:
        units = facts['facts'][tax][name]['units']
    except Exception:
        return []
    out = []
    for unit, arr in (units or {}).items():
        for d in arr or []:
            end = d.get('end'); val = d.get('val')
            if end and val is not None:
                out.append({
                    'date': pd.to_datetime(end).tz_localize(None).normalize(),
                    'val' : float(val),
                    'unit': str(unit).upper()
                })
    return out

def extract_public_float_any_currency(facts_json: dict) -> pd.DataFrame:
    rows = _collect_any_units(facts_json, 'dei', 'EntityPublicFloat')
    if not rows:
        return pd.DataFrame(columns=['date','entity_public_float_val','entity_public_float_unit'])
    df = (pd.DataFrame(rows)
            .sort_values('date')
            .drop_duplicates('date', keep='last')
            .rename(columns={'val':'entity_public_float_val','unit':'entity_public_float_unit'})
            [['date','entity_public_float_val','entity_public_float_unit']])
    return df

def get_fx_usd_series(cur: str, start: pd.Timestamp, end: pd.Timestamp) -> pd.DataFrame | None:
    """
    Return a daily series with columns ['date','fx_usd'] representing USD per 1 CUR.
    Tries 'CURUSD=X' first; if missing, tries 'USDCUR=X' and inverts.
    """
    cur = cur.upper()
    pairs = [(f"{cur}USD=X", False), (f"USD{cur}=X", True)]  # (symbol, invert)
    for sym, invert in pairs:
        try:
            h = yf.download(
                tickers=sym,
                start=start.date().isoformat(),
                end=(end + pd.Timedelta(days=1)).date().isoformat(),
                interval="1d",
                auto_adjust=True,
                progress=False
            )
        except Exception:
            h = None
        if h is None or h.empty:
            continue

        if isinstance(h.columns, pd.MultiIndex):
            # multi-index (Close under sym)
            if 'Close' in h.columns.get_level_values(-1):
                sub = h.xs('Close', axis=1, level=-1).iloc[:, 0].to_frame('fx')
            else:
                continue
        else:
            # single-index
            if 'Close' not in h.columns:
                continue
            sub = h[['Close']].rename(columns={'Close':'fx'})

        if sub.empty:
            continue
        sub = sub.reset_index().rename(columns={'Date':'date'})
        sub['date'] = pd.to_datetime(sub['date']).dt.tz_localize(None).dt.normalize()
        fx = sub['fx'].astype(float)
        if invert:
            # sym is USD/CUR -> convert to USD per 1 CUR
            fx = 1.0 / fx.replace(0, np.nan)
        out = pd.DataFrame({'date': sub['date'], 'fx_usd': fx})
        out = out.dropna(subset=['fx_usd']).sort_values('date').drop_duplicates('date', keep='last')
        if not out.empty:
            return out
    return None

# ---------- rebuild SEC public-float table from cache (fast, no network) ----------
parts = []
tk_to_cik = dict(zip(fundamentals['ticker'].astype(str), fundamentals['cik']))
for tkr, cik in tk_to_cik.items():
    if not isinstance(cik, str) or pd.isna(cik):
        continue
    fp = SEC_FACTS_DIR / f"{str(cik).zfill(10)}.json"
    if not fp.exists():
        continue
    try:
        facts = json.loads(fp.read_text())
    except Exception:
        continue
    pf = extract_public_float_any_currency(facts)
    if pf.empty:
        continue
    pf['ticker'] = str(tkr)
    parts.append(pf)

sec_float = (pd.concat(parts, ignore_index=True)
             if parts else pd.DataFrame(columns=['date','entity_public_float_val','entity_public_float_unit','ticker']))

if sec_float.empty:
    print("No SEC public-float facts found; nothing to do.")
else:
    # keep only non-USD floats with sane 3-letter ISO unit codes
    sec_float = sec_float.assign(
        ticker=lambda d: d['ticker'].astype(str),
        date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize(),
        unit=lambda d: d['entity_public_float_unit'].astype(str).str.upper()
    )
    non_usd = sec_float[(sec_float['unit'] != 'USD') & sec_float['unit'].str.fullmatch(r'[A-Z]{3}')].copy()

    if non_usd.empty:
        print("All SEC floats are USD; nothing to add.")
    else:
        # date window for FX
        start = non_usd['date'].min() - pd.Timedelta(days=5)
        end   = non_usd['date'].max() + pd.Timedelta(days=2)

        # build FX joined table per-currency (avoids asof sorting errors with 'by=')
        fx_joined = []
        for cur, grp in non_usd.groupby('unit', sort=True):
            fx = get_fx_usd_series(cur, start, end)
            g  = grp[['date','ticker','entity_public_float_val','unit']].sort_values('date').copy()
            if fx is None or fx.empty:
                g['fx_usd'] = np.nan
            else:
                g = pd.merge_asof(g, fx, on='date', direction='backward', allow_exact_matches=True)
            g['unit'] = cur
            fx_joined.append(g)
        non_usd_px = (pd.concat(fx_joined, ignore_index=True)
                      if fx_joined else pd.DataFrame(columns=['date','ticker','entity_public_float_val','unit','fx_usd']))

        # adjusted-close column name
        adj_candidates = ['adj close','adj_close','Adj Close','adjClose']
        adj_col = next((c for c in adj_candidates if c in analysis_enriched.columns), None)
        if adj_col is None:
            raise KeyError("Couldn't find an adjusted-close column in analysis_enriched.")

        price_ev = (analysis_enriched[['date','ticker',adj_col]]
                    .dropna()
                    .rename(columns={adj_col:'adj_close'})
                    .astype({'ticker':str})
                    .assign(date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize())
                    .sort_values(['ticker','date']))

        # per-ticker asof join to price
        rows = []
        for t, grp in non_usd_px.groupby('ticker', sort=True):
            left_t = grp.sort_values('date')
            right_t = price_ev[price_ev['ticker'] == t][['date','adj_close']].sort_values('date')
            if right_t.empty:
                tmp = left_t.copy(); tmp['adj_close'] = np.nan
            else:
                tmp = pd.merge_asof(left_t, right_t, on='date', direction='backward', allow_exact_matches=True)
            tmp['ticker'] = t
            rows.append(tmp)
        non_usd_full = (pd.concat(rows, ignore_index=True)
                        if rows else pd.DataFrame(columns=['ticker','date','entity_public_float_val','fx_usd','adj_close']))

        # convert: (float_value_in_unit * USD_per_unit) / price
        non_usd_full['float_shares_from_fx'] = np.where(
            non_usd_full['entity_public_float_val'].notna() &
            non_usd_full['fx_usd'].notna() &
            non_usd_full['adj_close'].notna() &
            (non_usd_full['adj_close'] != 0),
            (non_usd_full['entity_public_float_val'] * non_usd_full['fx_usd']) / non_usd_full['adj_close'],
            np.nan
        )

        # merge into analysis_enriched (fill only gaps) and ffill by ticker
        merged = analysis_enriched.merge(
            non_usd_full[['ticker','date','float_shares_from_fx']],
            on=['ticker','date'], how='left', suffixes=('','_fx')
        )
        if 'float_shares' not in merged.columns:
            merged['float_shares'] = np.nan
        mask_ff = merged['float_shares'].isna() & merged['float_shares_from_fx'].notna()
        merged.loc[mask_ff, 'float_shares'] = merged.loc[mask_ff, 'float_shares_from_fx']
        merged = merged.drop(columns=['float_shares_from_fx'])

        merged['free_float'] = merged['float_shares']
        for c in ['shares_outstanding','float_shares','free_float']:
            merged[c] = merged.groupby('ticker')[c].ffill()

        analysis_enriched = merged

        # save as v9
        analysis_enriched.to_parquet(OUT / "analysis_enriched_v9.parquet")
        analysis_enriched.to_csv(OUT / "analysis_enriched_v9.csv", index=False)

        cov_sh = analysis_enriched.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum()
        cov_ff = analysis_enriched.groupby('ticker')['float_shares'].apply(lambda s: s.notna().any()).sum()
        print("Coverage after FX float enrichment -> shares_outstanding:", cov_sh, "| float_shares:", cov_ff)

Coverage after FX float enrichment -> shares_outstanding: 3668 | float_shares: 3966


In [74]:
# === T2c: SEC us-gaap:PublicFloat -> float_shares (fills only where missing) ===
from pathlib import Path
import pandas as pd, numpy as np, json

OUT = Path("outputs")
SEC_FACTS_DIR = OUT / "sec_facts"
assert SEC_FACTS_DIR.exists(), "Missing outputs/sec_facts; run the SEC companyfacts step first."
assert 'analysis_enriched' in globals(), "analysis_enriched not found."

def _collect_any_units(facts, tax, name):
    try:
        units = facts['facts'][tax][name]['units']
    except Exception:
        return []
    out = []
    for unit, arr in (units or {}).items():
        for d in arr or []:
            end = d.get('end'); val = d.get('val')
            if end and val is not None:
                out.append({
                    'date': pd.to_datetime(end).tz_localize(None).normalize(),
                    'val' : float(val),
                    'unit': str(unit).upper()
                })
    return out

def extract_public_float_usgaap_any_currency(facts_json: dict) -> pd.DataFrame:
    rows = _collect_any_units(facts_json, 'us-gaap', 'PublicFloat')
    if not rows:
        return pd.DataFrame(columns=['date','public_float_val','public_float_unit'])
    df = (pd.DataFrame(rows)
            .sort_values('date')
            .drop_duplicates('date', keep='last')
            .rename(columns={'val':'public_float_val','unit':'public_float_unit'})
            [['date','public_float_val','public_float_unit']])
    return df

# Build from cache only (fast)
parts = []
tk_to_cik = dict(zip(fundamentals['ticker'].astype(str), fundamentals['cik']))
for tkr, cik in tk_to_cik.items():
    if not isinstance(cik, str) or pd.isna(cik): 
        continue
    fp = SEC_FACTS_DIR / f"{str(cik).zfill(10)}.json"
    if not fp.exists(): 
        continue
    try:
        facts = json.loads(fp.read_text())
    except Exception:
        continue
    pf = extract_public_float_usgaap_any_currency(facts)
    if pf.empty:
        continue
    pf['ticker'] = str(tkr)
    parts.append(pf)

usgaap_float = (pd.concat(parts, ignore_index=True)
                if parts else pd.DataFrame(columns=['date','public_float_val','public_float_unit','ticker']))

if usgaap_float.empty:
    print("No us-gaap:PublicFloat found; nothing to add.")
else:
    # We already handled USD & FX for dei:EntityPublicFloat.
    # Use the SAME conversion logic: value in USD / adj_close => shares.
    # First identify the adjusted-close column
    adj_candidates = ['adj close','adj_close','Adj Close','adjClose']
    adj_col = next((c for c in adj_candidates if c in analysis_enriched.columns), None)
    if adj_col is None:
        raise KeyError("Couldn't find an adjusted-close column in analysis_enriched.")

    usgaap_float = (usgaap_float
        .assign(
            ticker=lambda d: d['ticker'].astype(str),
            date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize(),
            unit=lambda d: d['public_float_unit'].astype(str).upper()
        )
        .sort_values(['ticker','date'])
    )

    # USD-only conversion here; (optional) you could port the FX step to us-gaap too.
    usd_only = usgaap_float[usgaap_float['unit'] == 'USD'].copy()
    price_ev = (analysis_enriched[['date','ticker',adj_col]]
                .dropna()
                .rename(columns={adj_col:'adj_close'})
                .astype({'ticker':str})
                .assign(date=lambda d: pd.to_datetime(d['date']).dt.tz_localize(None).dt.normalize())
                .sort_values(['ticker','date']))

    # per-ticker asof join for robustness
    rows = []
    for t, grp in usd_only.groupby('ticker', sort=True):
        left_t = grp.sort_values('date')
        right_t = price_ev[price_ev['ticker']==t][['date','adj_close']].sort_values('date')
        if right_t.empty:
            tmp = left_t.copy(); tmp['adj_close'] = np.nan
        else:
            tmp = pd.merge_asof(left_t, right_t, on='date', direction='backward', allow_exact_matches=True)
        tmp['ticker'] = t
        rows.append(tmp)
    usd_join = pd.concat(rows, ignore_index=True) if rows else pd.DataFrame(columns=['ticker','date','public_float_val','adj_close'])

    usd_join['float_shares_from_usgaap'] = np.where(
        usd_join['public_float_val'].notna() &
        usd_join['adj_close'].notna() &
        (usd_join['adj_close'] != 0),
        usd_join['public_float_val'] / usd_join['adj_close'],
        np.nan
    )

    merged = analysis_enriched.merge(
        usd_join[['ticker','date','float_shares_from_usgaap']],
        on=['ticker','date'], how='left', suffixes=('','_usgaap')
    )
    if 'float_shares' not in merged.columns:
        merged['float_shares'] = np.nan
    mask_ff = merged['float_shares'].isna() & merged['float_shares_from_usgaap'].notna()
    merged.loc[mask_ff, 'float_shares'] = merged.loc[mask_ff, 'float_shares_from_usgaap']
    merged = merged.drop(columns=['float_shares_from_usgaap'])

    merged['free_float'] = merged['float_shares']
    for c in ['shares_outstanding','float_shares','free_float']:
        merged[c] = merged.groupby('ticker')[c].ffill()

    analysis_enriched = merged
    analysis_enriched.to_parquet(OUT / "analysis_enriched_v10.parquet")
    analysis_enriched.to_csv(OUT / "analysis_enriched_v10.csv", index=False)

    cov_sh = analysis_enriched.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum()
    cov_ff = analysis_enriched.groupby('ticker')['float_shares'].apply(lambda s: s.notna().any()).sum()
    print("Coverage after us-gaap PublicFloat -> shares_outstanding:", cov_sh, "| float_shares:", cov_ff)

No us-gaap:PublicFloat found; nothing to add.


In [75]:
# === T3r (robust + cached-first): normalize & apply Yahoo float snapshot ===
import time, json
from pathlib import Path
import numpy as np
import pandas as pd

# Optional: yahooquery import (not used for network here, but kept for completeness)
try:
    from yahooquery import Ticker as YQ
except Exception:
    YQ = None

assert 'analysis_enriched' in globals(), "analysis_enriched not found. Run the finalize/backfill first."

# Paths & flags
OUT = Path("outputs"); OUT.mkdir(parents=True, exist_ok=True)
P_YQ_FLOAT_SNAP = OUT / "yq_float_snapshot.parquet"
YQ_FLOAT_CACHE_ONLY = globals().get('YQ_FLOAT_CACHE_ONLY', False)
REBUILD_YQ_FLOAT   = globals().get('REBUILD_YQ_FLOAT', False)

# Ensure columns exist
for col in ['shares_outstanding', 'float_shares', 'free_float']:
    if col not in analysis_enriched.columns:
        analysis_enriched[col] = pd.NA

# Helper: normalize any snapshot schema to ['ticker','float_shares_yq']
def _normalize_snapshot(df: pd.DataFrame, ae: pd.DataFrame) -> pd.DataFrame:
    if df is None or df.empty:
        return pd.DataFrame(columns=['ticker','float_shares_yq'])
    # If ticker is index, move it to a column
    if 'ticker' not in df.columns:
        df = df.reset_index().rename(columns={'index':'ticker', 'symbol':'ticker'})
    df['ticker'] = df['ticker'].astype(str).str.upper()

    # 1) If it already has 'float_shares_yq', use it
    if 'float_shares_yq' in df.columns:
        out = df[['ticker','float_shares_yq']].copy()
        out['float_shares_yq'] = pd.to_numeric(out['float_shares_yq'], errors='coerce')
        return out.dropna(subset=['float_shares_yq']).drop_duplicates('ticker', keep='last')

    # 2) If it has another usable shares column, adopt it
    for alt in ['float_shares', 'free_float', 'floatShares', 'float_shares_yf']:
        if alt in df.columns:
            out = df[['ticker', alt]].copy().rename(columns={alt:'float_shares_yq'})
            out['float_shares_yq'] = pd.to_numeric(out['float_shares_yq'], errors='coerce')
            return out.dropna(subset=['float_shares_yq']).drop_duplicates('ticker', keep='last')

    # 3) If it only has float_percent, derive shares = float_percent * latest shares_outstanding
    if 'float_percent' in df.columns:
        tmp = df[['ticker','float_percent']].copy()
        tmp['float_percent'] = pd.to_numeric(tmp['float_percent'], errors='coerce')
        tmp = tmp.dropna(subset=['float_percent']).drop_duplicates('ticker', keep='last')

        # latest non-null shares_outstanding per ticker from analysis_enriched
        sh_last = (
            ae.dropna(subset=['shares_outstanding'])
              .sort_values(['ticker','date'])
              .groupby('ticker', as_index=False)
              .tail(1)[['ticker','shares_outstanding']]
              .assign(shares_outstanding=lambda d: pd.to_numeric(d['shares_outstanding'], errors='coerce'))
        )
        joined = tmp.merge(sh_last, on='ticker', how='left')
        joined['float_shares_yq'] = joined['float_percent'] * joined['shares_outstanding']
        out = joined[['ticker','float_shares_yq']]
        return out.dropna(subset=['float_shares_yq']).drop_duplicates('ticker', keep='last')

    # Nothing usable
    return pd.DataFrame(columns=['ticker','float_shares_yq'])

# ----------------- LOAD OR BUILD SNAPSHOT (cache-first, no network here) -----------------
if (not REBUILD_YQ_FLOAT) and P_YQ_FLOAT_SNAP.exists():
    raw_snap = pd.read_parquet(P_YQ_FLOAT_SNAP)
    print(f"[yq-float] loaded snapshot: {len(raw_snap)} rows from {P_YQ_FLOAT_SNAP.name}")
else:
    # If you previously saved a snapshot under a different schema or it's missing,
    # we just start with an empty frame; other cells can rebuild from local JSON cache.
    raw_snap = pd.DataFrame()
    print("[yq-float] no usable snapshot found (or REBUILD_YQ_FLOAT=True). Proceeding with empty snapshot.")

# Normalize whatever we have
snap = _normalize_snapshot(raw_snap, analysis_enriched)

# ----------------- APPLY TO ANALYSIS -----------------
if not snap.empty:
    snap_map = snap.set_index('ticker')['float_shares_yq'].to_dict()
    mask_ff = analysis_enriched['float_shares'].isna()
    analysis_enriched.loc[mask_ff, 'float_shares'] = analysis_enriched.loc[mask_ff, 'ticker'].map(snap_map)

# Keep free_float in sync and forward-fill within ticker
if 'free_float' not in analysis_enriched.columns:
    analysis_enriched['free_float'] = np.nan
analysis_enriched['free_float'] = analysis_enriched['free_float'].fillna(analysis_enriched['float_shares'])

for c in ['shares_outstanding','float_shares','free_float']:
    analysis_enriched[c] = pd.to_numeric(analysis_enriched[c], errors='coerce')
    analysis_enriched[c] = analysis_enriched.groupby('ticker')[c].ffill()

# Coverage + save
cov_sh = analysis_enriched.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum()
cov_ff = analysis_enriched.groupby('ticker')['float_shares'].apply(lambda s: s.notna().any()).sum()
print("Coverage after T3r -> shares_outstanding:", cov_sh, "| float_shares:", cov_ff)

analysis_enriched.to_parquet(OUT / "analysis_enriched_v11.parquet")
analysis_enriched.to_csv(OUT / "analysis_enriched_v11.csv", index=False)

[yq-float] loaded snapshot: 1498 rows from yq_float_snapshot.parquet
Coverage after T3r -> shares_outstanding: 3668 | float_shares: 3966


In [76]:
# === Float resolver (vectorized + snapshot-first; robust sort & fallback) ===
from pathlib import Path
import pandas as pd, numpy as np, json

OUT = Path("outputs")
YF_CACHE = OUT / "yf_snapshot_cache"
P_YQ_FLOAT_SNAP = OUT / "yq_float_snapshot.parquet"

def _prep_events(df, colname):
    if df is None or df.empty or colname not in df.columns:
        return pd.DataFrame(columns=['ticker','date','value'])
    out = (
        df[['ticker','date',colname]]
        .rename(columns={colname:'value'})
        .dropna(subset=['value'])
        .assign(
            ticker=lambda d: d['ticker'].astype(str).str.upper(),
            date=lambda d: pd.to_datetime(d['date'], errors='coerce')
                              .dt.tz_localize(None).dt.normalize(),
        )
        .dropna(subset=['date'])
        .sort_values(['ticker','date'], kind='mergesort')
        .drop_duplicates(['ticker','date'], keep='last')
    )
    # avoid categorical dtypes
    if pd.api.types.is_categorical_dtype(out['ticker']):
        out['ticker'] = out['ticker'].astype(str)
    return out

# 1) Build a single time-varying “float events” table from everything we already computed
sources = []
if 'events_enriched' in globals():  sources.append(_prep_events(events_enriched, 'float_shares'))
if 'sec_enriched_px' in globals():  sources.append(_prep_events(sec_enriched_px, 'float_shares_from_usd'))
if 'fx_float_enriched' in globals(): sources.append(_prep_events(fx_float_enriched, 'float_shares_fx'))

float_events = (
    pd.concat([s for s in sources if not s.empty], ignore_index=True)
    if any(not s.empty for s in sources) else
    pd.DataFrame(columns=['ticker','date','value'])
)

# 2) Prep analysis_enriched (only rows that actually need filling)
ae = analysis_enriched.copy()
ae['ticker'] = ae['ticker'].astype(str).str.upper()
ae['date']   = pd.to_datetime(ae['date'], errors='coerce').dt.tz_localize(None).dt.normalize()
ae = ae.dropna(subset=['date'])
if 'float_shares' not in ae.columns: ae['float_shares'] = np.nan

need_mask = ae['float_shares'].isna()
left = (ae.loc[need_mask, ['ticker','date']]
          .astype({'ticker':'string'})
          .sort_values(['ticker','date'], kind='mergesort'))

# 3) Vectorized as-of merge across ALL tickers, with defensive sorting and dtype alignment
if not float_events.empty and not left.empty:
    right = (float_events.astype({'ticker':'string'})
                        .sort_values(['ticker','date'], kind='mergesort'))
    try:
        tmp = pd.merge_asof(
            left=left, right=right,
            by='ticker', on='date',
            direction='backward', allow_exact_matches=True
        )
        ae.loc[left.index, '__float_ev'] = tmp['value'].to_numpy()
    except ValueError:
        # Fallback: only for the few tickers that need it (keeps it fast)
        ae['__float_ev'] = np.nan
        for t in left['ticker'].unique():
            L = left[left['ticker'] == t]
            R = right[right['ticker'] == t]
            if R.empty:
                continue
            # both sides already sorted; do a vectorized searchsorted
            ldates = L['date'].to_numpy()
            rdates = R['date'].to_numpy()
            rval   = R['value'].to_numpy()
            idx = np.searchsorted(rdates, ldates, side='right') - 1
            vals = np.where(idx >= 0, rval[idx], np.nan)
            ae.loc[L.index, '__float_ev'] = vals
else:
    ae['__float_ev'] = np.nan

# Apply event values where missing
mask_ev = ae['float_shares'].isna() & ae['__float_ev'].notna()
ae.loc[mask_ev, 'float_shares'] = ae.loc[mask_ev, '__float_ev']
ae.drop(columns=['__float_ev'], inplace=True, errors='ignore')

# 4) If still missing, try a consolidated snapshot first; rebuild from local cache only if needed
def _load_snapshot():
    if not P_YQ_FLOAT_SNAP.exists():
        return pd.DataFrame()
    try:
        df = pd.read_parquet(P_YQ_FLOAT_SNAP)
        if 'ticker' not in df.columns:  # in case saved with ticker as index
            df = df.reset_index().rename(columns={'index':'ticker'})
        return df.assign(ticker=lambda d: d['ticker'].astype(str).str.upper())
    except Exception:
        return pd.DataFrame()

def _rebuild_snapshot_from_cache():
    rows = []
    if YF_CACHE.exists():
        for p in YF_CACHE.glob("*.json"):
            try:
                j = json.loads(p.read_text())
                tkr = p.stem.upper()
                v = j.get('float_shares_yf') or j.get('float_shares') or j.get('floatShares')
                if v is not None:
                    rows.append({'ticker': tkr, 'float_shares_yf': float(v)})
            except Exception:
                pass
    snap = pd.DataFrame(rows).drop_duplicates('ticker', keep='last') if rows else pd.DataFrame()
    if not snap.empty:
        try: snap.to_parquet(P_YQ_FLOAT_SNAP, index=False)
        except Exception: pass
    return snap

if ae['float_shares'].isna().any():
    snap = _load_snapshot()
    if snap.empty:
        snap = _rebuild_snapshot_from_cache()
    if not snap.empty:
        float_col = next((c for c in ['float_shares_yf','float_shares','free_float','floatShares'] if c in snap.columns), None)
        if float_col:
            mapper = (snap.dropna(subset=[float_col])
                        .drop_duplicates('ticker', keep='last')
                        .set_index('ticker')[float_col].astype(float).to_dict())
            m = ae['float_shares'].isna()
            ae.loc[m, 'float_shares'] = ae.loc[m, 'ticker'].map(mapper)

# 5) free_float alias + forward fill
if 'free_float' not in ae.columns:
    ae['free_float'] = np.nan
ae['free_float'] = ae['free_float'].fillna(ae['float_shares'])

for c in ['float_shares','free_float','shares_outstanding']:
    ae[c] = pd.to_numeric(ae[c], errors='coerce')
    ae[c] = ae.groupby('ticker')[c].ffill()

analysis_enriched = ae

# quick peek and coverage
print("AAPL after float resolver:")
display(analysis_enriched.loc[analysis_enriched['ticker']=='AAPL',
                              ['date','shares_outstanding','float_shares','free_float']].head(12))
cov_sh = analysis_enriched.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum()
cov_ff = analysis_enriched.groupby('ticker')['float_shares'].apply(lambda s: s.notna().any()).sum()
print("Coverage -> shares_outstanding:", cov_sh, "| float_shares:", cov_ff)

  if pd.api.types.is_categorical_dtype(out['ticker']):
  if pd.api.types.is_categorical_dtype(out['ticker']):


AAPL after float resolver:


Unnamed: 0,date,shares_outstanding,float_shares,free_float
12016,2022-10-03,,1.481427e+10,1.481427e+10
12017,2022-10-04,,1.481427e+10,1.481427e+10
12018,2022-10-05,,1.481427e+10,1.481427e+10
12019,2022-10-06,,1.481427e+10,1.481427e+10
12020,2022-10-07,,1.481427e+10,1.481427e+10
...,...,...,...,...
12023,2022-10-12,,1.481427e+10,1.481427e+10
12024,2022-10-13,,1.481427e+10,1.481427e+10
12025,2022-10-14,1.590812e+10,1.481427e+10,1.481427e+10
12026,2022-10-17,1.590812e+10,1.481427e+10,1.481427e+10


Coverage -> shares_outstanding: 3668 | float_shares: 3966


In [77]:
# === YF float-only backfill (with snapshot+cache guard; idempotent) ===
from pathlib import Path
import pandas as pd, numpy as np, json, time
import yfinance as yf

assert 'analysis_enriched' in globals(), "Run the finalize/SEC steps first."

OUT = Path("outputs"); OUT.mkdir(parents=True, exist_ok=True)
YF_CACHE = OUT / "yf_snapshot_cache"; YF_CACHE.mkdir(parents=True, exist_ok=True)
P_YQ_FLOAT_SNAP = OUT / "yq_float_snapshot.parquet"

# Defaults if not already defined in your bootstrap
if 'YQ_FLOAT_CACHE_ONLY' not in globals(): YQ_FLOAT_CACHE_ONLY = False
if 'NO_NETWORK' not in globals():          NO_NETWORK = False

def _safe_float(x):
    try: return float(x)
    except Exception: return None

def _read_cache(sym):
    p = YF_CACHE / f"{sym}.json"
    if p.exists():
        try: return json.loads(p.read_text())
        except Exception: return {}
    return {}

def _write_cache(sym, d):
    p = YF_CACHE / f"{sym}.json"
    try: p.write_text(json.dumps(d))
    except Exception: pass

def fetch_yf_float_only(sym: str):
    """Return {'ticker','float_shares_yf'} and update per-ticker cache."""
    t = str(sym).upper()

    # cache first
    cached = _read_cache(t)
    if 'float_shares_yf' in cached and cached['float_shares_yf'] is not None:
        return {'ticker': t, 'float_shares_yf': cached['float_shares_yf']}

    out = {'ticker': t, 'float_shares_yf': None}
    try:
        tk = yf.Ticker(t)
        info = {}
        try:
            info = tk.info or {}
        except Exception:
            info = {}

        flt = _safe_float(info.get('floatShares'))
        # fallback from insiders %
        if flt is None:
            shares = _safe_float(info.get('sharesOutstanding'))
            insiders = _safe_float(info.get('heldPercentInsiders'))
            if shares is not None and insiders is not None:
                flt = (1.0 - insiders) * shares

        # last resort: if your SEC event pipeline produced float_shares, use latest
        if flt is None and 'events_enriched' in globals():
            ee = events_enriched.query("ticker == @t")
            if not ee.empty and 'float_shares' in ee and ee['float_shares'].notna().any():
                flt = _safe_float(ee['float_shares'].dropna().iloc[-1])

        out['float_shares_yf'] = flt
    except Exception:
        pass

    merged = _read_cache(t)
    merged.update(out)
    _write_cache(t, merged)
    time.sleep(0.10)
    return out

# 1) who is missing float_shares?
need_float = (analysis_enriched.groupby('ticker')['float_shares']
                .apply(lambda s: s.notna().any())
                .pipe(lambda s: sorted(s.index[~s].tolist())))
print("Tickers missing float_shares:", len(need_float))

# 2) Fast exits / cache-only mode
if not need_float:
    print("[YF float] Nothing missing — skipping.")
elif YQ_FLOAT_CACHE_ONLY or NO_NETWORK:
    if P_YQ_FLOAT_SNAP.exists():
        snap = pd.read_parquet(P_YQ_FLOAT_SNAP)
        if not snap.empty:
            mapper = snap.set_index('ticker')['float_shares_yf'].to_dict()
            m = analysis_enriched['float_shares'].isna()
            analysis_enriched.loc[m, 'float_shares'] = analysis_enriched.loc[m, 'ticker'].map(mapper)

            if 'free_float' not in analysis_enriched.columns:
                analysis_enriched['free_float'] = np.nan
            analysis_enriched['free_float'] = analysis_enriched['free_float'].fillna(analysis_enriched['float_shares'])

            for c in ['float_shares','free_float']:
                analysis_enriched[c] = pd.to_numeric(analysis_enriched[c], errors='coerce')
                analysis_enriched[c] = analysis_enriched.groupby('ticker')[c].ffill()

            print("[YF float] Applied cached snapshot; no network.")
    else:
        print("[YF float] Cache-only but no snapshot found — skipped by design.")
else:
    # 3) Networked harvest (only for those still missing)
    rows = []
    for i, sym in enumerate(need_float, 1):
        rows.append(fetch_yf_float_only(sym))
        if i % 200 == 0:
            print(f"... processed {i}/{len(need_float)}")

    yf_float_df = (pd.DataFrame(rows)
                     .dropna(subset=['float_shares_yf'])
                     .drop_duplicates('ticker'))
    print("New float entries:", len(yf_float_df))

    # Save/merge consolidated snapshot for future cache-only runs
    prev = pd.read_parquet(P_YQ_FLOAT_SNAP) if P_YQ_FLOAT_SNAP.exists() else pd.DataFrame(columns=['ticker','float_shares_yf'])
    snap = (pd.concat([prev, yf_float_df], ignore_index=True)
              .dropna(subset=['ticker'])
              .drop_duplicates('ticker', keep='last'))
    snap.to_parquet(P_YQ_FLOAT_SNAP)
    print(f"[YF float] Snapshot saved -> {P_YQ_FLOAT_SNAP.name} ({len(snap)} tickers).")

    # Apply to analysis_enriched + free_float + ffill
    if not yf_float_df.empty:
        mapper = snap.set_index('ticker')['float_shares_yf'].to_dict()
        mask = analysis_enriched['float_shares'].isna()
        analysis_enriched.loc[mask, 'float_shares'] = analysis_enriched.loc[mask, 'ticker'].map(mapper)

        if 'free_float' not in analysis_enriched.columns:
            analysis_enriched['free_float'] = np.nan
        analysis_enriched['free_float'] = analysis_enriched['free_float'].fillna(analysis_enriched['float_shares'])

        for c in ['float_shares','free_float']:
            analysis_enriched[c] = pd.to_numeric(analysis_enriched[c], errors='coerce')
            analysis_enriched[c] = analysis_enriched.groupby('ticker')[c].ffill()

# Coverage summary
cov_sh = analysis_enriched.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum()
cov_ff = analysis_enriched.groupby('ticker')['float_shares'].apply(lambda s: s.notna().any()).sum()
print("Coverage -> shares_outstanding:", cov_sh, "| float_shares:", cov_ff)

Tickers missing float_shares: 1155
[YF float] Applied cached snapshot; no network.
Coverage -> shares_outstanding: 3668 | float_shares: 3966


In [78]:
# === Targeted float fix for a ticker (e.g., AAPL) ===
import json, time
from pathlib import Path
import numpy as np
import pandas as pd
import yfinance as yf

assert 'analysis_enriched' in globals()

OUT = Path("outputs"); OUT.mkdir(exist_ok=True)
YF_CACHE = OUT / "yf_snapshot_cache"; YF_CACHE.mkdir(parents=True, exist_ok=True)

def _safe_float(x):
    try: return float(x)
    except Exception: return None

def _read_cache(sym):
    p = YF_CACHE / f"{sym}.json"
    if p.exists():
        try: return json.loads(p.read_text())
        except Exception: return {}
    return {}

def _write_cache(sym, d):
    p = YF_CACHE / f"{sym}.json"
    try: p.write_text(json.dumps(d))
    except Exception: pass

def ensure_float_for(sym: str):
    t = str(sym).upper()

    # 1) check cache
    cache = _read_cache(t)
    float_cached = cache.get('float_shares_yf')

    # 2) fetch if missing
    if float_cached is None:
        flt = None
        try:
            tk = yf.Ticker(t)

            # direct floatShares from info
            info = {}
            try: info = tk.info or {}
            except Exception: info = {}

            flt = _safe_float(info.get('floatShares'))

            # fallback: free float ≈ (1 - insiders%) * sharesOutstanding
            if flt is None:
                shares   = _safe_float(info.get('sharesOutstanding'))
                insiders = _safe_float(info.get('heldPercentInsiders'))
                if shares is not None and insiders is not None:
                    flt = (1.0 - insiders) * shares
        except Exception:
            flt = None

        # update cache (preserve any other keys)
        cache.update({'float_shares_yf': flt})
        _write_cache(t, cache)
        float_cached = flt

    # 3) apply to analysis_enriched
    if float_cached is not None:
        mask = (analysis_enriched['ticker'].str.upper()==t) & (analysis_enriched['float_shares'].isna())
        analysis_enriched.loc[mask, 'float_shares'] = float(float_cached)

        if 'free_float' not in analysis_enriched.columns:
            analysis_enriched['free_float'] = np.nan
        analysis_enriched['free_float'] = analysis_enriched['free_float'].fillna(analysis_enriched['float_shares'])

        for c in ['float_shares','free_float']:
            analysis_enriched[c] = pd.to_numeric(analysis_enriched[c], errors='coerce')
            analysis_enriched[c] = analysis_enriched.groupby('ticker')[c].ffill()

    return float_cached

# --- run it for AAPL (change symbol if needed) ---
val = ensure_float_for("AAPL")
print("AAPL float from yfinance (or fallback):", val)

display(analysis_enriched.loc[analysis_enriched['ticker']=='AAPL',
                              ['date','shares_outstanding','float_shares','free_float']].head(12))

# coverage snapshot after fix
cov_sh = analysis_enriched.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum()
cov_ff = analysis_enriched.groupby('ticker')['float_shares'].apply(lambda s: s.notna().any()).sum()
print("Coverage -> shares_outstanding:", cov_sh, "| float_shares:", cov_ff)

AAPL float from yfinance (or fallback): 14814270914.0


Unnamed: 0,date,shares_outstanding,float_shares,free_float
12016,2022-10-03,,1.481427e+10,1.481427e+10
12017,2022-10-04,,1.481427e+10,1.481427e+10
12018,2022-10-05,,1.481427e+10,1.481427e+10
12019,2022-10-06,,1.481427e+10,1.481427e+10
12020,2022-10-07,,1.481427e+10,1.481427e+10
...,...,...,...,...
12023,2022-10-12,,1.481427e+10,1.481427e+10
12024,2022-10-13,,1.481427e+10,1.481427e+10
12025,2022-10-14,1.590812e+10,1.481427e+10,1.481427e+10
12026,2022-10-17,1.590812e+10,1.481427e+10,1.481427e+10


Coverage -> shares_outstanding: 3668 | float_shares: 3966


In [79]:
need = {'shares_outstanding','float_shares','free_float','performed_split','performed_reverse_split'}
print("present:", need & set(analysis_enriched.columns))
print("tickers with any shares_outstanding:", 
      analysis_enriched.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum())
print("tickers with any float_shares:", 
      analysis_enriched.groupby('ticker')['float_shares'].apply(lambda s: s.notna().any()).sum())

present: {'float_shares', 'free_float', 'performed_reverse_split', 'performed_split', 'shares_outstanding'}
tickers with any shares_outstanding: 3668
tickers with any float_shares: 3966


In [80]:
import re
import pandas as pd

# which tickers have neither series anywhere in analysis_enriched?
missing = (analysis_enriched.groupby('ticker')[['shares_outstanding','float_shares']]
           .apply(lambda df: pd.Series({'has_sh': df['shares_outstanding'].notna().any(),
                                        'has_ff': df['float_shares'].notna().any()}))
           .reset_index())
still_none = missing[(~missing['has_sh']) & (~missing['has_ff'])]['ticker'].astype(str)

warrant_like = still_none[still_none.str.match(r'.*(?:W|WS|WT|W[A-D]?|U|R)$')]
dollar_pref  = still_none[still_none.str.startswith('$')]
no_cik       = set(still_none) - set(fundamentals.dropna(subset=['cik'])['ticker'])

print("Still missing both:", len(still_none))
print("… warrant/unit-ish suffix:", len(warrant_like))
print("… dollar-prefixed/test-ish:", len(dollar_pref))
print("… with no CIK in fundamentals:", len(no_cik))

# peek a few examples
print("\nExamples (warrant/unit-ish):", list(warrant_like[:10]))
print("Examples (no CIK):", list(sorted(no_cik))[:10])

Still missing both: 1053
… warrant/unit-ish suffix: 157
… dollar-prefixed/test-ish: 0
… with no CIK in fundamentals: 1050

Examples (warrant/unit-ish): ['AADR', 'AAPU', 'AIRR', 'ALLW', 'AMDU', 'AMUU', 'AMZU', 'APACU', 'AQWA', 'ARVR']
Examples (no CIK): ['AADR', 'AALG', 'AAPB', 'AAPD', 'AAPU', 'AAUS', 'AAVM', 'AAXJ', 'ABCS', 'ABI']


In [81]:
analysis_enriched.sample(20)

Unnamed: 0,date,ticker,open,high,low,close,adj close,volume,ret,logret,...,cik,sic,sic_desc,recent_form,recent_filing_date,performed_split,performed_reverse_split,shares_outstanding,float_shares,free_float
3670610,2024-08-21,WABF,26.320000,26.320000,26.309999,26.309999,24.530266,200.0,0.002286,0.002283,...,,,,,,0,0,,,
2771432,2023-09-20,PULM,2.110000,2.190000,2.110000,2.170000,2.170000,14300.0,-0.013636,-0.013730,...,,,,,,0,0,3652285.0,3.652200e+06,3.652200e+06
73940,2024-02-13,ADTX,35700.000000,41500.000000,35700.000000,39000.000000,39000.000000,35.0,0.051213,0.049945,...,,,,,,0,0,441851.0,1.671270e+01,1.671270e+01
797437,2025-04-02,COLB,24.450001,25.139999,24.450001,25.120001,24.412327,1029100.0,0.014950,0.014839,...,,,,,,0,0,210112415.0,1.919472e+08,1.919472e+08
740991,2024-10-07,CLAR,4.370000,4.390000,4.290000,4.390000,4.274121,233500.0,0.002283,0.002281,...,,,,,,0,0,38362000.0,3.217769e+07,3.217769e+07
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
295227,2023-02-02,ATHR,,,,,,,,,...,0002026353,7372,Services-Prepackaged Software,10-Q,2025-08-19,0,0,,4.014718e+06,4.014718e+06
1149933,2023-05-11,EUDA,1.480000,1.480000,1.230000,1.290000,1.290000,103300.0,-0.116438,-0.123794,...,,,,,,0,0,20191770.0,1.840318e+07,1.840318e+07
1224182,2022-12-15,FDT,48.570000,48.590000,47.820000,48.020000,43.102501,45100.0,-0.022195,-0.022445,...,,,,,,0,0,,,
96852,2025-08-22,AFRIW,,,,,,,,,...,,,,,,0,0,26901592.0,3.281994e+06,3.281994e+06


In [82]:
need = {'shares_outstanding','float_shares','free_float','performed_split','performed_reverse_split'}
print("present:", need & set(analysis_enriched.columns))
print("missing:", need - set(analysis_enriched.columns))
print("rows:", len(analysis_enriched), "| tickers:", analysis_enriched['ticker'].nunique())

present: {'float_shares', 'free_float', 'performed_reverse_split', 'performed_split', 'shares_outstanding'}
missing: set()
rows: 3845871 | tickers: 5121


In [83]:
analysis_enriched.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3845871 entries, 0 to 3845870
Data columns (total 23 columns):
 #   Column                   Dtype         
---  ------                   -----         
 0   date                     datetime64[ns]
 1   ticker                   object        
 2   open                     float64       
 3   high                     float64       
 4   low                      float64       
 5   close                    float64       
 6   adj close                float64       
 7   volume                   float64       
 8   ret                      float64       
 9   logret                   float64       
 10  market_cap               float64       
 11  sector                   object        
 12  industry                 object        
 13  cik                      object        
 14  sic                      object        
 15  sic_desc                 object        
 16  recent_form              object        
 17  recent_filing_date       ob

Here’s a compact data dictionary for your current analysis_enriched (21 columns shown in your screenshot) 👇
* `date (datetime64[ns])` — Trading date (tz-naive).
* `ticker (object)` — Upper-cased symbol (e.g., AAPL).

Prices (Yahoo)
* `open (float64)` — Session open (split-adjusted).
* `high (float64)` — Session high (split-adjusted).
* `low (float64)` — Session low (split-adjusted).
* `close (float64)` — Session close (split-adjusted, not dividend-adjusted).
* `adj close (float64)` — Adjusted close (split and dividend adjusted). Use for return calcs.
* `volume (float64)` — Shares traded that day (float due to missing values).

Returns
* `ret (float64)` — Simple daily return from adj close: (\text{AdjClose}t / \text{AdjClose}{t-1}) - 1.
* `logret (float64)` — Log daily return: \ln(\text{AdjClose}t / \text{AdjClose}{t-1}).

Firm metadata
* `market_cap (float64)` — Latest market cap from Yahoo (generally USD). Snapshot value repeated across dates (not historical).
* `sector (object)` — Sector (Yahoo; backfilled from SEC SIC when missing).
* `industry (object)` — Industry (Yahoo; may be missing).
* `cik (object)` — 10-digit SEC Central Index Key.
* `sic (object)` — 4-digit SEC Standard Industrial Classification code (string).
* `sic_desc (object)` — Text description of the SIC code.
* `recent_form (object)` — Most recent SEC filing form (e.g., 10-K, 10-Q, 8-K) from cached SEC data.
* `recent_filing_date (object)` — Filing date as string; convert to datetime for comparisons.

Share supply
* `shares_outstanding (object → numeric)` — Outstanding shares (point-in-time). Built from SEC companyfacts when available, then backfilled via Yahoo snapshots; forward-filled within ticker.
* `float_shares (object → numeric)` — Tradable “free float” shares. From SEC public float (USD) converted to shares using price on/as-of filing date, else from Yahoo; forward-filled within ticker.
* `free_float (object → numeric)` — Alias of float_shares for convenience.

In [84]:
# Convert recent_filing_date to datetime
analysis_enriched['recent_filing_date'] = pd.to_datetime(
    analysis_enriched['recent_filing_date'], errors='coerce'
)

# Volume as nullable integer (if you prefer)
if 'volume' in analysis_enriched:
    analysis_enriched['volume'] = analysis_enriched['volume'].round().astype('Int64')

# Sector/industry as categories (saves memory, faster groupbys)
for col in ['sector', 'industry', 'recent_form', 'sic_desc']:
    if col in analysis_enriched:
        analysis_enriched[col] = analysis_enriched[col].astype('category')

for c in ['shares_outstanding', 'float_shares', 'free_float']:
    analysis_enriched[c] = pd.to_numeric(analysis_enriched[c], errors='coerce')

In [85]:
analysis_enriched.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3845871 entries, 0 to 3845870
Data columns (total 23 columns):
 #   Column                   Dtype         
---  ------                   -----         
 0   date                     datetime64[ns]
 1   ticker                   object        
 2   open                     float64       
 3   high                     float64       
 4   low                      float64       
 5   close                    float64       
 6   adj close                float64       
 7   volume                   Int64         
 8   ret                      float64       
 9   logret                   float64       
 10  market_cap               float64       
 11  sector                   category      
 12  industry                 category      
 13  cik                      object        
 14  sic                      object        
 15  sic_desc                 category      
 16  recent_form              category      
 17  recent_filing_date       da

In [86]:
# Quick summary after full run
try:
    print({
        "prices_shape": None if prices_raw is None else prices_raw.shape,
        "fundamentals_tickers": fundamentals['ticker'].nunique(),
        "analysis_rows": len(analysis_enriched),
    })
except Exception as e:
    print("Summary skipped:", e)

{'prices_shape': (751, 30726), 'fundamentals_tickers': 338, 'analysis_rows': 3845871}


In [87]:
aapl = analysis_enriched[analysis_enriched["ticker"] == "AAPL"]
aapl

Unnamed: 0,date,ticker,open,high,low,close,adj close,volume,ret,logret,...,cik,sic,sic_desc,recent_form,recent_filing_date,performed_split,performed_reverse_split,shares_outstanding,float_shares,free_float
12016,2022-10-03,AAPL,138.210007,143.070007,137.690002,142.449997,140.236282,114311700,,,...,,,,,NaT,1,0,,1.481427e+10,1.481427e+10
12017,2022-10-04,AAPL,145.029999,146.220001,144.259995,146.100006,143.829605,87830100,0.025623,0.025301,...,,,,,NaT,0,0,,1.481427e+10,1.481427e+10
12018,2022-10-05,AAPL,144.070007,147.380005,143.009995,146.399994,144.124924,79471000,0.002053,0.002051,...,,,,,NaT,0,0,,1.481427e+10,1.481427e+10
12019,2022-10-06,AAPL,145.809998,147.539993,145.220001,145.429993,143.169983,68402200,-0.006626,-0.006648,...,,,,,NaT,0,0,,1.481427e+10,1.481427e+10
12020,2022-10-07,AAPL,142.539993,143.100006,139.449997,140.089996,137.912994,85925600,-0.036719,-0.037410,...,,,,,NaT,0,0,,1.481427e+10,1.481427e+10
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
12762,2025-09-24,AAPL,255.220001,255.740005,251.039993,252.309998,252.309998,42303700,-0.008332,-0.008367,...,,,,,NaT,0,0,1.484039e+10,1.591087e+10,1.591087e+10
12763,2025-09-25,AAPL,253.210007,257.170013,251.710007,256.869995,256.869995,55202100,0.018073,0.017912,...,,,,,NaT,0,0,1.484039e+10,1.591087e+10,1.591087e+10
12764,2025-09-26,AAPL,254.100006,257.600006,253.779999,255.460007,255.460007,46076300,-0.005489,-0.005504,...,,,,,NaT,0,0,1.484039e+10,1.591087e+10,1.591087e+10
12765,2025-09-29,AAPL,254.559998,255.000000,253.009995,254.429993,254.429993,40127700,-0.004032,-0.004040,...,,,,,NaT,0,0,1.484039e+10,1.591087e+10,1.591087e+10


In [88]:
# AAPL presence in the SEC-derived events
print("events_enriched rows for AAPL:",
      0 if 'events_enriched' not in globals() else
      len(events_enriched.query("ticker == 'AAPL'")))

# First/last dates we have for AAPL in analysis_enriched
ae_aapl = analysis_enriched.query("ticker == 'AAPL'")[['date']].agg(['min','max'])
print(ae_aapl)

events_enriched rows for AAPL: 145
          date
min 2022-10-03
max 2025-09-30


In [89]:
# === S_final (robust, per-ticker, cache-only): fill missing shares_outstanding ===
import pandas as pd, numpy as np, json
from pathlib import Path

assert 'analysis_enriched' in globals(), "Run your finalize cell so analysis_enriched exists."

OUT = Path("outputs")
YF_CACHE = OUT / "yf_snapshot_cache"

ae = analysis_enriched.copy()
ae['ticker'] = ae['ticker'].astype(str).str.upper()
ae['date']   = pd.to_datetime(ae['date'], errors='coerce').dt.tz_localize(None)
ae = ae.dropna(subset=['date']).sort_values(['ticker','date'])

before = ae.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum()

# 1) Fill from SEC events (time-varying) per ticker if available
if 'events_enriched' in globals() and not events_enriched.empty:
    ev = events_enriched[['ticker','date','shares_outstanding']].copy()
    ev['ticker'] = ev['ticker'].astype(str).str.upper()
    ev['date']   = pd.to_datetime(ev['date'], errors='coerce').dt.tz_localize(None)
    ev = ev.dropna(subset=['date']).sort_values(['ticker','date'])

    parts = []
    g_ev = ev.groupby('ticker', sort=False)
    for tkr, left in ae.groupby('ticker', sort=False):
        left = left.sort_values('date').copy()
        right = g_ev.get_group(tkr).sort_values('date') if tkr in g_ev.groups else None

        if right is None or right.empty:
            parts.append(left); continue

        j = pd.merge_asof(
            left=left,
            right=right[['date','shares_outstanding']].rename(
                columns={'shares_outstanding':'shares_outstanding_ev'}
            ),
            on='date',
            direction='backward',
            allow_exact_matches=True
        )
        m = j['shares_outstanding'].isna() & j['shares_outstanding_ev'].notna()
        j.loc[m, 'shares_outstanding'] = j.loc[m, 'shares_outstanding_ev']
        j.drop(columns=['shares_outstanding_ev'], inplace=True, errors='ignore')
        parts.append(j)

    ae = pd.concat(parts, ignore_index=True)

# 2) Static snapshots from yfinance cache (no network)
if YF_CACHE.exists():
    rows = []
    for p in YF_CACHE.glob("*.json"):
        try:
            j = json.loads(p.read_text())
            v = j.get('shares_outstanding_yf')
            if v is not None:
                rows.append((p.stem.upper(), float(v)))
        except Exception:
            pass
    if rows:
        map_sh = dict(rows)
        mask = ae['shares_outstanding'].isna()
        ae.loc[mask, 'shares_outstanding'] = ae.loc[mask, 'ticker'].map(map_sh)

# 3) Last-resort estimate = market_cap / latest adj close (per ticker)
adj_col = next((c for c in ['adj close','adj_close','Adj Close'] if c in ae.columns), None)
if adj_col is not None and 'market_cap' in ae.columns:
    # find the index of latest row (max date) with a non-null adj close per ticker
    valid = ae.dropna(subset=[adj_col])
    if not valid.empty:
        idx_last = (valid.sort_values('date')
                         .groupby('ticker')['date']
                         .idxmax())
        last_px = (ae.loc[idx_last, ['ticker', adj_col]]
                     .rename(columns={adj_col: '_ref_px'}))
        map_px = last_px.set_index('ticker')['_ref_px'].to_dict()
        map_mc = (ae.drop_duplicates('ticker')
                    .set_index('ticker')['market_cap']
                    .to_dict())

        est = {}
        for t in ae['ticker'].unique():
            px = map_px.get(t); mc = map_mc.get(t)
            if px not in (None, 0, np.nan) and mc not in (None, 0, np.nan):
                est[t] = float(mc) / float(px)

        if est:
            mask = ae['shares_outstanding'].isna()
            ae.loc[mask, 'shares_outstanding'] = ae.loc[mask, 'ticker'].map(est)

# Final tidy: numeric + ffill within ticker
ae['shares_outstanding'] = pd.to_numeric(ae['shares_outstanding'], errors='coerce')
ae['shares_outstanding'] = ae.groupby('ticker')['shares_outstanding'].ffill()

after = ae.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum()
print(f"shares_outstanding coverage: {before} -> {after} tickers")

analysis_enriched = ae

# quick peek on first few rows (AACB/AAPL if present)
for t in ['AACB','AAPL']:
    if t in analysis_enriched['ticker'].unique():
        print(f"\n{t} sample:")
        display(analysis_enriched.loc[analysis_enriched['ticker'].eq(t),
                  ['date', adj_col if adj_col else 'adj close', 'market_cap',
                   'shares_outstanding','float_shares','free_float']].head(7))

shares_outstanding coverage: 3668 -> 3705 tickers

AACB sample:


Unnamed: 0,date,adj close,market_cap,shares_outstanding,float_shares,free_float
0,2022-10-03,,,27675000.0,22147902.0,22147902.0
1,2022-10-04,,,27675000.0,22147902.0,22147902.0
2,2022-10-05,,,27675000.0,22147902.0,22147902.0
3,2022-10-06,,,27675000.0,22147902.0,22147902.0
4,2022-10-07,,,27675000.0,22147902.0,22147902.0
5,2022-10-10,,,27675000.0,22147902.0,22147902.0
6,2022-10-11,,,27675000.0,22147902.0,22147902.0



AAPL sample:


Unnamed: 0,date,adj close,market_cap,shares_outstanding,float_shares,free_float
12016,2022-10-03,140.236282,,15943420000.0,14814270000.0,14814270000.0
12017,2022-10-04,143.829605,,15943420000.0,14814270000.0,14814270000.0
12018,2022-10-05,144.124924,,15943420000.0,14814270000.0,14814270000.0
12019,2022-10-06,143.169983,,15943420000.0,14814270000.0,14814270000.0
12020,2022-10-07,137.912994,,15943420000.0,14814270000.0,14814270000.0
12021,2022-10-10,138.237854,,15943420000.0,14814270000.0,14814270000.0
12022,2022-10-11,136.820251,,15943420000.0,14814270000.0,14814270000.0


In [90]:
rename_map = {}
if 'Adj Close' in analysis_enriched.columns: rename_map['Adj Close'] = 'adj_close'
if 'adj close' in analysis_enriched.columns: rename_map['adj close'] = 'adj_close'
analysis_enriched = analysis_enriched.rename(columns=rename_map)

In [91]:
analysis_enriched.head()

Unnamed: 0,date,ticker,open,high,low,close,adj_close,volume,ret,logret,...,cik,sic,sic_desc,recent_form,recent_filing_date,performed_split,performed_reverse_split,shares_outstanding,float_shares,free_float
0,2022-10-03,AACB,,,,,,,,,...,,,,,NaT,0,0,27675000.0,22147902.0,22147902.0
1,2022-10-04,AACB,,,,,,,,,...,,,,,NaT,0,0,27675000.0,22147902.0,22147902.0
2,2022-10-05,AACB,,,,,,,,,...,,,,,NaT,0,0,27675000.0,22147902.0,22147902.0
3,2022-10-06,AACB,,,,,,,,,...,,,,,NaT,0,0,27675000.0,22147902.0,22147902.0
4,2022-10-07,AACB,,,,,,,,,...,,,,,NaT,0,0,27675000.0,22147902.0,22147902.0


In [92]:
import pandas as pd

t = "AACB"

ae = analysis_enriched.copy()
ae['ticker'] = ae['ticker'].astype(str).str.upper()

print("Any adj_close for", t, "?:", ae.loc[ae['ticker'].eq(t), 'adj_close'].notna().any())
print("Any close for", t, "?:",     ae.loc[ae['ticker'].eq(t), 'close'].notna().any())
print("SIC / SIC desc:", (ae.loc[ae['ticker'].eq(t), ['sic','sic_desc']]
                            .drop_duplicates()
                            .head(3)))

Any adj_close for AACB ?: True
Any close for AACB ?: True
SIC / SIC desc:    sic sic_desc
0  NaN      NaN


In [93]:
t = "AACB"
ae = analysis_enriched.copy()
ae['ticker'] = ae['ticker'].astype(str).str.upper()

col = 'adj_close'
print("Any adj_close for", t, "?:", ae.loc[ae['ticker'].eq(t), col].notna().any())
print("Any close for", t, "?:",     ae.loc[ae['ticker'].eq(t), 'close'].notna().any())

# How many non-null points and their date span?
s = (ae.loc[ae['ticker'].eq(t), ['date', col]]
       .dropna()
       .sort_values('date'))
print("Non-null adj_close rows:", len(s))
if not s.empty:
    print("First/last non-null dates:", s['date'].iloc[0].date(), "→", s['date'].iloc[-1].date())
    display(s.head(3))
    display(s.tail(3))

Any adj_close for AACB ?: True
Any close for AACB ?: True
Non-null adj_close rows: 122
First/last non-null dates: 2025-04-07 → 2025-09-30


Unnamed: 0,date,adj_close
629,2025-04-07,9.88
630,2025-04-08,9.95
631,2025-04-09,9.9


Unnamed: 0,date,adj_close
748,2025-09-26,10.18
749,2025-09-29,10.18
750,2025-09-30,10.18


In [94]:
analysis_enriched[analysis_enriched["market_cap"] < 300000000]

Unnamed: 0,date,ticker,open,high,low,close,adj_close,volume,ret,logret,...,cik,sic,sic_desc,recent_form,recent_filing_date,performed_split,performed_reverse_split,shares_outstanding,float_shares,free_float
287633,2022-10-03,ASUR,5.63,5.71,5.21,5.37,5.37,50100,,,...,0000884144,7373,Services-Computer Integrated Systems Design,SCHEDULE 13G,2025-09-17,0,1,20160000.0,,
287634,2022-10-04,ASUR,5.31,5.49,5.10,5.20,5.20,58600,-0.031657,-0.032169,...,0000884144,7373,Services-Computer Integrated Systems Design,SCHEDULE 13G,2025-09-17,0,0,20160000.0,,
287635,2022-10-05,ASUR,5.20,5.31,5.14,5.25,5.25,16000,0.009615,0.009569,...,0000884144,7373,Services-Computer Integrated Systems Design,SCHEDULE 13G,2025-09-17,0,0,20160000.0,,
287636,2022-10-06,ASUR,5.34,5.52,5.16,5.51,5.51,22200,0.049524,0.048337,...,0000884144,7373,Services-Computer Integrated Systems Design,SCHEDULE 13G,2025-09-17,0,0,20160000.0,,
287637,2022-10-07,ASUR,5.38,5.40,5.15,5.15,5.15,6300,-0.065336,-0.067568,...,0000884144,7373,Services-Computer Integrated Systems Design,SCHEDULE 13G,2025-09-17,0,0,20160000.0,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3845866,2025-09-24,ZYXI,1.55,1.59,1.50,1.50,1.50,80600,-0.019608,-0.019803,...,0000846475,3845,Electromedical & Electrotherapeutic Apparatus,8-K,2025-09-22,0,0,30297442.0,2.139729e+07,2.139729e+07
3845867,2025-09-25,ZYXI,1.51,1.54,1.45,1.47,1.47,109300,-0.020000,-0.020203,...,0000846475,3845,Electromedical & Electrotherapeutic Apparatus,8-K,2025-09-22,0,0,30297442.0,2.139729e+07,2.139729e+07
3845868,2025-09-26,ZYXI,1.47,1.50,1.44,1.45,1.45,54700,-0.013605,-0.013699,...,0000846475,3845,Electromedical & Electrotherapeutic Apparatus,8-K,2025-09-22,0,0,30297442.0,2.139729e+07,2.139729e+07
3845869,2025-09-29,ZYXI,1.45,1.49,1.42,1.47,1.47,115800,0.013793,0.013699,...,0000846475,3845,Electromedical & Electrotherapeutic Apparatus,8-K,2025-09-22,0,0,30297442.0,2.139729e+07,2.139729e+07


1) Setup & column detection (handles adj_close vs adj close)

In [95]:
import numpy as np, pandas as pd

ae = analysis_enriched.copy()
ae['ticker'] = ae['ticker'].astype(str).str.upper()
ae['date']   = pd.to_datetime(ae['date']).dt.tz_localize(None)

# Find the adjusted close column robustly
ADJ_COLS = ['adj_close','adj close','Adj Close','adjClose']
adj_col = next((c for c in ADJ_COLS if c in ae.columns), None)
print("Using adjusted close column:", adj_col)

Using adjusted close column: adj_close


2) Schema, key uniqueness, date window

In [96]:
print("Rows:", len(ae), "| Columns:", len(ae.columns))
print("Columns:", list(ae.columns))

dups = ae.duplicated(['ticker','date']).sum()
print("Duplicate (ticker,date) rows:", dups)

per_tkr = (
    ae.groupby('ticker')
      .agg(first_date=('date','min'), last_date=('date','max'), n_rows=('date','size'))
      .reset_index()
)
display(per_tkr.head())
print("Tickers:", len(per_tkr))

Rows: 3845871 | Columns: 23
Columns: ['date', 'ticker', 'open', 'high', 'low', 'close', 'adj_close', 'volume', 'ret', 'logret', 'market_cap', 'sector', 'industry', 'cik', 'sic', 'sic_desc', 'recent_form', 'recent_filing_date', 'performed_split', 'performed_reverse_split', 'shares_outstanding', 'float_shares', 'free_float']
Duplicate (ticker,date) rows: 0


Unnamed: 0,ticker,first_date,last_date,n_rows
0,AACB,2022-10-03,2025-09-30,751
1,AACBR,2022-10-03,2025-09-30,751
2,AACBU,2022-10-03,2025-09-30,751
3,AACG,2022-10-03,2025-09-30,751
4,AACI,2022-10-03,2025-09-30,751


Tickers: 5121


3) Missingness & coverage snapshot

In [97]:
# Overall nulls
nulls = ae.isna().mean().sort_values(ascending=False).round(3)
display(nulls.to_frame('null_rate').head(12))

# Coverage by ticker for core fields
core = ['open','high','low','close', adj_col, 'volume',
        'ret','logret','market_cap','shares_outstanding','float_shares','free_float']
core = [c for c in core if c in ae.columns]

cov = (ae.groupby('ticker')[core].apply(lambda df: df.notna().any()).sum().sort_values(ascending=False))
print("Tickers with any data by field:")
display(cov.to_frame('tickers_with_any'))

Unnamed: 0,null_rate
market_cap,0.954
sector,0.948
industry,0.948
recent_filing_date,0.947
recent_form,0.947
...,...
cik,0.947
float_shares,0.350
free_float,0.350
shares_outstanding,0.324


Tickers with any data by field:


Unnamed: 0,tickers_with_any
open,5118
high,5118
low,5118
close,5118
adj_close,5118
...,...
logret,4730
float_shares,3966
free_float,3966
shares_outstanding,3705


4) Price integrity rules

In [98]:
issues = {}

if all(c in ae.columns for c in ['high','open','close','low']):
    issues['high_below'] = (ae['high'] < ae[['open','close','low']].max(axis=1)).sum()
    issues['low_above']  = (ae['low']  > ae[['open','close','low']].min(axis=1)).sum()

if 'volume' in ae.columns:
    issues['neg_volume'] = (ae['volume'] < 0).sum()

if 'close' in ae.columns and adj_col:
    issues['nonpos_prices'] = ((ae['close'] <= 0) | (ae[adj_col] <= 0)).sum()

print("Price integrity issues:", issues)

Price integrity issues: {'high_below': np.int64(104), 'low_above': np.int64(111), 'neg_volume': np.int64(0), 'nonpos_prices': np.int64(0)}


5) Return sanity: recompute and compare

In [99]:
# Recompute exactly how the dataset was built, but keep index alignment
tmp = ae.sort_values(['ticker','date'])[['ticker', adj_col, 'ret', 'logret']].copy()

# Index-aligned recomputes (no .apply)
ref_ret = tmp.groupby('ticker')[adj_col].pct_change(fill_method=None)
ref_log = np.log(tmp[adj_col] / tmp.groupby('ticker')[adj_col].shift(1))

# Compare only where both sides are present
m_ret = tmp['ret'].notna() & ref_ret.notna()
m_log = tmp['logret'].notna() & ref_log.notna()

ok_ret = np.isclose(tmp.loc[m_ret, 'ret'].to_numpy(),
                    ref_ret.loc[m_ret].to_numpy(), rtol=1e-10).mean()
ok_log = np.isclose(tmp.loc[m_log, 'logret'].to_numpy(),
                    ref_log.loc[m_log].to_numpy(), rtol=1e-10).mean()

print(f"ret match: {ok_ret:.6f}")
print(f"logret match: {ok_log:.6f}")

# (optional) peek at any mismatches
bad_ret = m_ret & ~np.isclose(tmp['ret'], ref_ret, rtol=1e-10)
if bad_ret.any():
    display(tmp.loc[bad_ret, ['ticker','date','ret']].head())

ret match: 1.000000
logret match: 1.000000


6) Cross-field coherence: market cap vs shares × price

In [100]:
if {'market_cap','shares_outstanding'}.issubset(ae.columns) and adj_col:
    m = ae[['ticker','date','market_cap','shares_outstanding',adj_col]].dropna()
    ratio = (m['market_cap'] / (m['shares_outstanding'] * m[adj_col])).replace([np.inf,-np.inf], np.nan)
    q = ratio.quantile([0.01,0.05,0.5,0.95,0.99]).round(3)
    print("MCAP / (shares * adj_close) quantiles:")
    display(q)
    bad = m.loc[(ratio < 0.2) | (ratio > 5.0)]
    print("Rows with very off ratios:", len(bad))

MCAP / (shares * adj_close) quantiles:


0.01     0.000
0.05     0.010
0.50     1.005
0.95     4.505
0.99    43.962
dtype: float64

Rows with very off ratios: 29297


In [101]:
core = ae[['ticker','date','market_cap','shares_outstanding', adj_col]].dropna()
ratio = core['market_cap'] / (core['shares_outstanding'] * core[adj_col])
bad   = ratio[(ratio < 0.2) | (ratio > 5)]  # adjust bands to taste
print("Extreme mktcap ratios:", len(bad))

Extreme mktcap ratios: 29677


7) Float logic checks

In [102]:
m = ae['float_shares'].notna() & ae['shares_outstanding'].notna()
over = m & (ae['float_shares'] > ae['shares_outstanding'] * 1.2)  # hard cap at 120%
ae.loc[over, 'float_shares'] = ae.loc[over, 'shares_outstanding']
ae['free_float'] = ae['float_shares']

In [103]:
if 'float_shares' in ae.columns and 'shares_outstanding' in ae.columns:
    cond = (ae['float_shares'] <= ae['shares_outstanding']) | ae['shares_outstanding'].isna()
    print("float_shares ≤ shares_outstanding fraction:", round(cond.mean(),4))

if 'free_float' in ae.columns and 'float_shares' in ae.columns:
    eq = (ae['free_float'].fillna(-1) == ae['float_shares'].fillna(-1)).mean()
    print("free_float equals float_shares fraction (after fillna sentinel):", round(float(eq),4))

float_shares ≤ shares_outstanding fraction: 0.8244
free_float equals float_shares fraction (after fillna sentinel): 1.0


8) Split flags sanity (no both flags same day; big price moves around split)

In [104]:
if {'performed_split','performed_reverse_split'}.issubset(ae.columns):
    both = ae[(ae['performed_split'] == 1) & (ae['performed_reverse_split'] == 1)]
    print("Rows with both split & reverse flags ON:", len(both))

    # crude price-ratio check around flagged days
    if adj_col:
        g = ae.sort_values(['ticker','date']).copy()
        prev = g.groupby('ticker')[adj_col].shift(1)
        ratio = prev / g[adj_col]
        around_split = g.loc[g['performed_split']==1, ['ticker','date']].assign(ratio=ratio[g['performed_split']==1])
        around_rev   = g.loc[g['performed_reverse_split']==1, ['ticker','date']].assign(ratio=ratio[g['performed_reverse_split']==1])
        print("Median prev/next price ratio on split days (expect >1):", round(around_split['ratio'].median(skipna=True),3))
        print("Median prev/next price ratio on reverse-split days (expect <1):", round(around_rev['ratio'].median(skipna=True),3))

Rows with both split & reverse flags ON: 0
Median prev/next price ratio on split days (expect >1): 0.999
Median prev/next price ratio on reverse-split days (expect <1): 1.026


9) Ticker spot-check helper (quick look at any symbol)

In [105]:
def spotcheck(tkr, n=8):
    t = str(tkr).upper()
    cols = ['date','open','high','low','close',adj_col,'volume','ret','logret',
            'shares_outstanding','float_shares','free_float','performed_split','performed_reverse_split']
    cols = [c for c in cols if c in ae.columns]
    df = ae.loc[ae['ticker'].eq(t), cols].sort_values('date')
    print(f"{t}: rows={len(df)} | first={df['date'].min().date() if len(df) else None} | last={df['date'].max().date() if len(df) else None}")
    display(df.head(n))
    display(df.tail(n))

# Example:
# spotcheck("AAPL")

10) Save a tiny QC summary table (handy when you rerun)

In [106]:
qc = pd.DataFrame({
    'rows':[len(ae)],
    'tickers':[ae['ticker'].nunique()],
    'dupe_key_rows':[dups],
    'pct_null_adj_close':[ae[adj_col].isna().mean() if adj_col else np.nan],
    'tickers_with_any_shares_outstanding':[ae.groupby('ticker')['shares_outstanding'].apply(lambda s: s.notna().any()).sum() if 'shares_outstanding' in ae else np.nan],
    'tickers_with_any_float':[ae.groupby('ticker')['float_shares'].apply(lambda s: s.notna().any()).sum() if 'float_shares' in ae else np.nan],
})
display(qc)
qc.to_csv("outputs/qc_summary.csv", index=False)

Unnamed: 0,rows,tickers,dupe_key_rows,pct_null_adj_close,tickers_with_any_shares_outstanding,tickers_with_any_float
0,3845871,5121,0,0.251105,3705,3966


In [107]:
analysis_enriched["recent_form"].unique()

[NaN, '6-K', '4', 'SCHEDULE 13G', 'S-3ASR', ..., '424B3', 'F-3', 'F-1', 'S-8', '20-F']
Length: 30
Categories (29, object): ['10-Q', '10-Q/A', '144', '20-F', ..., 'SCHEDULE 13D', 'SCHEDULE 13D/A', 'SCHEDULE 13G', 'SCHEDULE 13G/A']

In [108]:
# ---- run-mode flag (near your other flags) ----
REBUILD_FILINGS = False   # set True only when you want to refresh filings from sec_cache

# --- SKIP GUARD: avoid re-annotating if already present ---
if ('filing_form' in analysis_enriched.columns
    and analysis_enriched['filing_form'].notna().any()
    and not REBUILD_FILINGS):
    print("[filings] Annotation already present; skipping (set REBUILD_FILINGS=True to refresh).")
else:
    # === Annotate using prebuilt `filings` snapshot (fast, no re-parse) ===
    import pandas as pd
    from pathlib import Path

    OUT = Path("outputs")
    P_FILINGS = OUT / "filings_recent.parquet"
    TOL_DAYS = 365

    # Read the snapshot that you’ve already built
    filings = pd.read_parquet(P_FILINGS).rename(columns={'filing_date':'event_date', 'form':'filing_form'})
    filings['ticker'] = filings['ticker'].astype(str).str.upper()
    filings['event_date'] = pd.to_datetime(filings['event_date'], errors='coerce').dt.tz_localize(None).dt.normalize()
    filings = filings[['ticker','event_date','filing_form']].sort_values(['ticker','event_date'])

    # Prep left (prices panel)
    ae = analysis_enriched.copy()
    ae['ticker'] = ae['ticker'].astype(str).str.upper()
    ae['date']   = pd.to_datetime(ae['date'], errors='coerce').dt.tz_localize(None).dt.normalize()
    ae = ae.drop(columns=['filing_form','is_filing_day','last_filing_date','days_since_filing',
                          'event_date','filing_form_x','filing_form_y'], errors='ignore')
    ae = ae.dropna(subset=['ticker','date']).sort_values(['ticker','date'])

    tol = pd.Timedelta(f'{TOL_DAYS}D')
    parts = []
    for tkr, left_t in ae.groupby('ticker', sort=False):
        left_t = left_t.sort_values('date').copy()
        right_t = filings.loc[filings['ticker'].eq(tkr), ['event_date','filing_form']].sort_values('event_date')

        if right_t.empty:
            left_t['filing_form']       = pd.NA
            left_t['last_filing_date']  = pd.NaT
            left_t['is_filing_day']     = False
            left_t['days_since_filing'] = pd.NA
        else:
            j = pd.merge_asof(
                left=left_t[['date']], right=right_t,
                left_on='date', right_on='event_date',
                direction='backward', allow_exact_matches=True, tolerance=tol
            )
            left_t['filing_form']       = j['filing_form'].values
            left_t['last_filing_date']  = j['event_date'].values
            left_t['is_filing_day']     = left_t['date'].eq(left_t['last_filing_date'])
            left_t['days_since_filing'] = (left_t['date'] - left_t['last_filing_date']).dt.days

        parts.append(left_t)

    analysis_enriched = pd.concat(parts, ignore_index=True)

    # Quick checks
    rows_with = int(analysis_enriched['filing_form'].notna().sum())
    tickers_with = int(analysis_enriched.groupby('ticker')['filing_form'].apply(lambda s: s.notna().any()).sum())
    print("rows with filing_form:", rows_with)
    print("tickers with ≥1 filing:", tickers_with)

  analysis_enriched = pd.concat(parts, ignore_index=True)


rows with filing_form: 2338594
tickers with ≥1 filing: 3335


In [109]:
cols_ok = {'filing_form','is_filing_day'}.issubset(analysis_enriched.columns)
non_null_rows = int(analysis_enriched['filing_form'].notna().sum())
tickers_with  = int(analysis_enriched.groupby('ticker')['filing_form'].apply(lambda s: s.notna().any()).sum())
print("filings columns present:", cols_ok, "| rows:", non_null_rows, "| tickers:", tickers_with)

filings columns present: True | rows: 2338594 | tickers: 3335


In [110]:
import pandas as pd
ae = analysis_enriched

# 1) What fraction of all rows now have a filing form?
coverage = ae['filing_form'].notna().mean()
print("share of rows with filing_form:", round(float(coverage), 3))

# 2) How many *actual filing events* (not carry-forward)?
events = ae.loc[ae['is_filing_day'] == True, ['ticker','date','filing_form']].drop_duplicates()
print("unique filing event rows:", len(events))
print("tickers with ≥1 filing event:", events['ticker'].nunique())

# 3) Event count per ticker (median/mean) — should be far smaller than total daily rows
per_tkr = events.groupby('ticker').size()
print("median events/ticker:", int(per_tkr.median()))
print("mean events/ticker:", round(float(per_tkr.mean()), 2))

# 4) Top 10 filing forms by event-day count
print(ae.loc[ae['is_filing_day'] == True, 'filing_form'].value_counts().head(10))

# 5) Spot-check a ticker’s event days only (e.g., AAPL)
print(ae.query("ticker == 'AAPL' and is_filing_day")[['date','filing_form']].head(10))

share of rows with filing_form: 0.608
unique filing event rows: 334067
tickers with ≥1 filing event: 3335
median events/ticker: 97
mean events/ticker: 100.17
filing_form
4           80436
8-K         74139
144         28363
6-K         26642
10-Q        12818
SC 13G/A    12814
UPLOAD       8916
3            6561
CORRESP      6344
DEF 14A      5305
Name: count, dtype: int64
            date filing_form
12017 2022-10-04           4
12027 2022-10-18           4
12034 2022-10-27         8-K
12035 2022-10-28        10-K
12037 2022-11-01           4
12041 2022-11-07         8-K
12043 2022-11-09      25-NSE
12053 2022-11-23           4
12086 2023-01-12     DEF 14A
12092 2023-01-23     PX14A6G


In [111]:
from pathlib import Path
import pandas as pd

P_FILINGS = Path("outputs/filings_recent.parquet")
fil = pd.read_parquet(P_FILINGS)

# Basic stats
print("unique filing events:", len(fil))
print("tickers with ≥1 filing:", fil['ticker'].nunique())
print("\nTop forms by event count:")
print(fil['form'].value_counts().head(10))

# Example: AAPL’s events (if present)
print("\nAAPL events (first 10):")
print(fil.query("ticker=='AAPL'")[['filing_date','form']].sort_values('filing_date').head(10))

unique filing events: 1286534
tickers with ≥1 filing: 3357

Top forms by event count:
form
4           327407
8-K         280107
10-Q         76999
6-K          74339
SC 13G/A     55677
144          33719
3            33514
CORRESP      30493
UPLOAD       30241
SC 13G       25815
Name: count, dtype: int64

AAPL events (first 10):
     filing_date     form
2994  2014-11-13  CERTNYS
2995  2014-11-21        4
2996  2014-11-25        4
2997  2014-12-04   NO ACT
2998  2014-12-11   NO ACT
2999  2014-12-23        4
3000  2014-12-29   NO ACT
3001  2014-12-30   NO ACT
3002  2015-01-02        4
3004  2015-01-22  DEF 14A


In [112]:
from pathlib import Path
import json
import pandas as pd

OUT = Path("outputs"); OUT.mkdir(exist_ok=True)

# Save (compact, fast to reload)
p_with_pq  = OUT / "analysis_enriched_with_filings.parquet"
p_with_csv = OUT / "analysis_enriched_with_filings.csv"
analysis_enriched.to_parquet(p_with_pq, index=False, compression="zstd")
analysis_enriched.to_csv(p_with_csv, index=False)

# Tiny schema manifest
schema = {c: str(t) for c, t in analysis_enriched.dtypes.items()}
(OUT / "analysis_enriched_with_filings.schema.json").write_text(json.dumps(schema, indent=2))

# Mirror to canonical filenames (byte-for-byte)
p_can_pq  = OUT / "analysis_enriched.parquet"
p_can_csv = OUT / "analysis_enriched.csv"
p_can_pq.write_bytes(p_with_pq.read_bytes())
p_can_csv.write_bytes(p_with_csv.read_bytes())

print("Saved with filings columns and mirrored to canonical names.")

Saved with filings columns and mirrored to canonical names.


In [113]:
# How many rows got a filing_form?
total_rows_with_form = analysis_enriched['filing_form'].notna().sum()
tickers_with_form = (analysis_enriched
                     .groupby('ticker')['filing_form']
                     .apply(lambda s: s.notna().any())
                     .sum())
print("rows with filing_form:", int(total_rows_with_form))
print("tickers with ≥1 filing:", int(tickers_with_form))

# Show a few examples anywhere filing_form is present
display(analysis_enriched.loc[analysis_enriched['filing_form'].notna(),
                              ['ticker','date','filing_form','is_filing_day']].head(20))

# Check a specific ticker (AAPL as example)
t = "AAPL"
display(analysis_enriched.loc[analysis_enriched['ticker'].eq(t) &
                              analysis_enriched['filing_form'].notna(),
                              ['date','filing_form','is_filing_day']].head(15))

# Count of forms (top 15)
display(analysis_enriched.loc[analysis_enriched['filing_form'].notna(), 'filing_form']
        .value_counts()
        .head(15)
        .to_frame('rows'))

# What fraction of annotated rows are exact filing days?
if total_rows_with_form:
    exact = analysis_enriched.loc[analysis_enriched['filing_form'].notna(), 'is_filing_day'].mean()
    print("share of annotated rows that fall exactly on filing day:", round(float(exact), 3))

rows with filing_form: 2338594
tickers with ≥1 filing: 3335


Unnamed: 0,ticker,date,filing_form,is_filing_day
472,AACB,2024-08-20,DRS,True
473,AACB,2024-08-21,DRS,False
474,AACB,2024-08-22,DRS,False
475,AACB,2024-08-23,DRS,False
476,AACB,2024-08-26,DRS,False
...,...,...,...,...
487,AACB,2024-09-11,DRS,False
488,AACB,2024-09-12,DRS,False
489,AACB,2024-09-13,DRS,False
490,AACB,2024-09-16,UPLOAD,True


Unnamed: 0,date,filing_form,is_filing_day
12016,2022-10-03,4,False
12017,2022-10-04,4,True
12018,2022-10-05,4,False
12019,2022-10-06,4,False
12020,2022-10-07,4,False
...,...,...,...
12026,2022-10-17,4,False
12027,2022-10-18,4,True
12028,2022-10-19,4,False
12029,2022-10-20,4,False


Unnamed: 0_level_0,rows
filing_form,Unnamed: 1_level_1
4,529741
8-K,528086
6-K,265074
10-Q,131379
SC 13G/A,80908
...,...
SC 13G,36403
10-K,32993
CORRESP,26079
424B3,24179


share of annotated rows that fall exactly on filing day: 0.143


In [114]:
analysis_enriched.filing_form.nunique()

239

In [115]:
analysis_enriched.query("market_cap < 300000000")

Unnamed: 0,date,ticker,open,high,low,close,adj_close,volume,ret,logret,...,recent_filing_date,performed_split,performed_reverse_split,shares_outstanding,float_shares,free_float,filing_form,last_filing_date,is_filing_day,days_since_filing
287633,2022-10-03,ASUR,5.63,5.71,5.21,5.37,5.37,50100,,,...,2025-09-17,0,1,20160000.0,,,4,2022-08-15,False,49.0
287634,2022-10-04,ASUR,5.31,5.49,5.10,5.20,5.20,58600,-0.031657,-0.032169,...,2025-09-17,0,0,20160000.0,,,4,2022-08-15,False,50.0
287635,2022-10-05,ASUR,5.20,5.31,5.14,5.25,5.25,16000,0.009615,0.009569,...,2025-09-17,0,0,20160000.0,,,4,2022-08-15,False,51.0
287636,2022-10-06,ASUR,5.34,5.52,5.16,5.51,5.51,22200,0.049524,0.048337,...,2025-09-17,0,0,20160000.0,,,4,2022-08-15,False,52.0
287637,2022-10-07,ASUR,5.38,5.40,5.15,5.15,5.15,6300,-0.065336,-0.067568,...,2025-09-17,0,0,20160000.0,,,4,2022-08-15,False,53.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3845866,2025-09-24,ZYXI,1.55,1.59,1.50,1.50,1.50,80600,-0.019608,-0.019803,...,2025-09-22,0,0,30297442.0,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,2.0
3845867,2025-09-25,ZYXI,1.51,1.54,1.45,1.47,1.47,109300,-0.020000,-0.020203,...,2025-09-22,0,0,30297442.0,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,3.0
3845868,2025-09-26,ZYXI,1.47,1.50,1.44,1.45,1.45,54700,-0.013605,-0.013699,...,2025-09-22,0,0,30297442.0,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,4.0
3845869,2025-09-29,ZYXI,1.45,1.49,1.42,1.47,1.47,115800,0.013793,0.013699,...,2025-09-22,0,0,30297442.0,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,7.0


In [116]:
# ---- splits: skip-guard so we don't recompute on reruns ----
SPLITS_REBUILD = False  # set True only if you need to rebuild/re-snap

have_cols = {'split_ratio','is_split_day','is_reverse_split_day','split_cum_factor'}.issubset(analysis_enriched.columns)
if have_cols and analysis_enriched['is_split_day'].notna().any() and not SPLITS_REBUILD:
    print("[splits] already annotated; skipping (set SPLITS_REBUILD=True to rebuild).")
else:
    # <-- keep your split-annotation block here -->
    pass

In [120]:
# === Split loader (cache-first, schema-tolerant) ===
from pathlib import Path
import pandas as pd, numpy as np
import re

OUT = Path("outputs")
P_SPLITS = OUT / "split_events.parquet"

def _parse_ratio_series(s: pd.Series) -> pd.Series:
    """Accept numeric, or strings like '3:1', '1:10', returns shares-multiplier (3.0, 0.1, ...)."""
    s = s.astype(str)
    m = s.str.extract(r'^\s*(\d+(?:\.\d+)?)\s*[:/]\s*(\d+(?:\.\d+)?)\s*$')
    ok = m.notna().all(axis=1)
    out = pd.Series(np.nan, index=s.index, dtype='float64')
    out.loc[ok] = m.loc[ok, 0].astype(float) / m.loc[ok, 1].astype(float)
    # try plain numeric fallback where parse failed
    out = out.where(ok, pd.to_numeric(s, errors='coerce'))
    return out

def _standardize_splits(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    # --- choose/compute event_date ---
    if 'event_date' in df.columns:
        ev = df['event_date']
    elif 'date' in df.columns:
        ev = df['date']
    else:
        # last resort: common variants
        for c in ['executionDate','effective_date','ex_date']:
            if c in df.columns:
                ev = df[c]
                break
        else:
            raise AssertionError("No event_date/date column in split_events source.")
    ev = pd.to_datetime(ev, errors='coerce').dt.tz_localize(None).dt.normalize()

    # --- compute split_ratio (shares multiplier; forward>1, reverse<1) ---
    if 'split_ratio' in df.columns:
        ratio = pd.to_numeric(df['split_ratio'], errors='coerce')
    elif {'numerator','denominator'}.issubset(df.columns):
        # e.g., 3-for-1 -> 3/1 = 3.0 ; 1-for-10 -> 1/10 = 0.1
        ratio = pd.to_numeric(df['numerator'], errors='coerce') / pd.to_numeric(df['denominator'], errors='coerce')
    elif 'ratio' in df.columns:
        ratio = _parse_ratio_series(df['ratio'])
    elif 'split' in df.columns:
        ratio = _parse_ratio_series(df['split'])
    else:
        raise AssertionError("No split ratio columns found (expected split_ratio, ratio, numerator/denominator, or split).")

    out = pd.DataFrame({
        'ticker': df.get('ticker'),
        'event_date': ev,
        'split_ratio': ratio
    })
    out['ticker'] = out['ticker'].astype(str).str.upper()
    out = out.dropna(subset=['ticker','event_date','split_ratio'])
    return (out
            .sort_values(['ticker','event_date'])
            .drop_duplicates(['ticker','event_date'], keep='last')
            .reset_index(drop=True))

# --- load/create `splits` ---
if 'splits' in globals() and isinstance(splits, pd.DataFrame) and not splits.empty:
    splits = _standardize_splits(splits)
    print(f"[splits] using existing variable -> {len(splits)} rows, {splits['ticker'].nunique()} tickers")
elif P_SPLITS.exists():
    raw = pd.read_parquet(P_SPLITS)
    splits = _standardize_splits(raw)
    print(f"[splits] loaded from {P_SPLITS.name} -> {len(splits)} rows, {splits['ticker'].nunique()} tickers")
elif {'is_split_day','split_ratio'}.issubset(analysis_enriched.columns):
    # derive from panel (only exact trading-day events)
    tmp = (analysis_enriched.loc[analysis_enriched['is_split_day']]
           .rename(columns={'date':'event_date'})[['ticker','event_date','split_ratio']])
    splits = _standardize_splits(tmp)
    print(f"[splits] derived from analysis_enriched -> {len(splits)} rows, {splits['ticker'].nunique()} tickers")
else:
    raise AssertionError("Run your split collection step (or provide outputs/split_events.parquet) before snapping.")

# quick peek
display(splits.head(10))

[splits] loaded from split_events.parquet -> 75614 rows, 2505 tickers


Unnamed: 0,ticker,event_date,split_ratio
0,AAME,1985-11-04,2.0
1,AAME,1986-11-04,1.25
2,AAON,1993-09-16,0.25
3,AAON,1995-03-07,1.1
4,AAON,2001-10-01,1.5
5,AAON,2002-06-05,1.5
6,AAON,2007-08-22,1.5
7,AAON,2011-06-14,1.5
8,AAON,2013-07-03,1.5
9,AAON,2014-07-17,1.5


In [122]:
# === Split events: snap to nearest trading day (±3D) per-ticker & re-annotate (robust, vectorized) ===
import pandas as pd, numpy as np

# --- early skip for this cell ---
SPLITS_REBUILD = globals().get("SPLITS_REBUILD", False)

already = (
    {'split_ratio','is_split_day','is_reverse_split_day','split_cum_factor'}.issubset(analysis_enriched.columns)
    and analysis_enriched['is_split_day'].notna().any()
)

if already and not SPLITS_REBUILD:
    print("[splits] already annotated; skipping this cell (set SPLITS_REBUILD=True to rebuild).")
else:
    assert 'analysis_enriched' in globals(), "Run your finalize/backfill first."
    assert 'splits' in globals() and not splits.empty, "Run the split loader cell first."

    # Normalize panel
    ae = analysis_enriched.copy()
    ae['ticker'] = ae['ticker'].astype(str).str.upper()
    ae['date']   = pd.to_datetime(ae['date'], errors='coerce').dt.tz_localize(None).dt.normalize()
    ae = ae.dropna(subset=['ticker','date']).sort_values(['ticker','date'])

    # Trading calendar (unique trading dates per ticker)
    cal = (ae[['ticker','date']]
           .drop_duplicates()
           .sort_values(['ticker','date']))

    # Normalize split events
    sp = (splits.assign(
            ticker=lambda d: d['ticker'].astype(str).str.upper(),
            event_date=lambda d: pd.to_datetime(d['event_date'], errors='coerce').dt.tz_localize(None).dt.normalize(),
            split_ratio=lambda d: pd.to_numeric(d['split_ratio'], errors='coerce')
         )
         .dropna(subset=['ticker','event_date','split_ratio'])
         .sort_values(['ticker','event_date'])
         .copy())

    tol = pd.Timedelta('3D')

# ---------- SNAP USING SEARCHSORTED (no merge_asof) ----------
snapped = []  # holds per-ticker frames with event_trading_date
tol_ns = np.timedelta64(3, 'D')

for tkr, sp_t in sp.groupby('ticker', sort=False):
    # numpy datetime arrays (ns)
    r = cal.loc[cal['ticker'].eq(tkr), 'date'].values.astype('datetime64[ns]')
    if r.size == 0:
        continue

    ev = sp_t['event_date'].values.astype('datetime64[ns]')

    # forward index (first trading date >= event_date)
    idx_f = np.searchsorted(r, ev, side='left')
    # backward index (last trading date <= event_date), clipped to [0, size-1]
    idx_b = np.clip(idx_f - 1, 0, r.size - 1)

    # candidate forward dates (safe construction: fill with NaT, then set where in-bounds)
    d_f = np.full(ev.shape, np.datetime64('NaT', 'ns'), dtype='datetime64[ns]')
    mask_f = idx_f < r.size
    d_f[mask_f] = r[idx_f[mask_f]]

    # candidate backward dates (always valid after clipping)
    d_b = r[idx_b]

    # distances (timedelta64[ns])
    df = d_f - ev  # forward distance (>=0 if valid)
    db = ev - d_b  # backward distance (>=0)

    # within tolerance
    ok_f = mask_f & (df >= np.timedelta64(0, 'ns')) & (df <= tol_ns)
    ok_b = (db >= np.timedelta64(0, 'ns')) & (db <= tol_ns)

    # choose closer; prefer forward if tie / both valid
    choose_f = ok_f & (~ok_b | (df <= db))
    chosen = np.where(choose_f, d_f, np.where(ok_b, d_b, np.datetime64('NaT', 'ns')))

    out = sp_t.copy()
    # chosen is numpy datetime64; convert to pandas (naive date)
    out['event_trading_date'] = pd.to_datetime(chosen).tz_localize(None).normalize()
    snapped.append(out)

In [123]:
from pathlib import Path
OUT = Path("outputs"); OUT.mkdir(exist_ok=True)
analysis_enriched.to_parquet(OUT / "analysis_enriched_with_splits.parquet", index=False, compression="zstd")
analysis_enriched.to_csv(OUT / "analysis_enriched_with_splits.csv", index=False)
# optional: mirror to canonical if you want downstream code to keep using the same filename
(OUT / "analysis_enriched.parquet").write_bytes((OUT / "analysis_enriched_with_splits.parquet").read_bytes())
(OUT / "analysis_enriched.csv").write_bytes((OUT / "analysis_enriched_with_splits.csv").read_bytes())
print("Saved analysis_enriched with split columns.")

Saved analysis_enriched with split columns.


In [127]:
{'split_ratio','is_split_day','is_reverse_split_day','split_cum_factor'}.issubset(analysis_enriched.columns)

False

In [130]:
# --- Ensure split annotations exist on `analysis_enriched` (idempotent) ---
import pandas as pd, numpy as np
from pathlib import Path

NEEDED = {'split_ratio','is_split_day','is_reverse_split_day','split_cum_factor'}
have_cols = NEEDED.issubset(analysis_enriched.columns)

if not have_cols:
    # 1) Load/normalize `splits` if missing (try a few on-disk candidates)
    if 'splits' not in globals() or splits is None or getattr(splits, 'empty', True):
        OUT = Path("outputs")
        cand = [
            OUT / "split_events.parquet",
            OUT / "splits_events.parquet",
            OUT / "shares_events.parquet",   # sometimes contains split info
        ]
        splits = pd.DataFrame()
        for p in cand:
            if p.exists():
                try:
                    tmp = pd.read_parquet(p)
                    # normalize to ['ticker','event_date','split_ratio']
                    if {'ticker','event_date','split_ratio'} <= set(tmp.columns):
                        splits = tmp[['ticker','event_date','split_ratio']].copy()
                        break
                    elif {'ticker','date','split_ratio'} <= set(tmp.columns):
                        tmp = tmp.rename(columns={'date':'event_date'})
                        splits = tmp[['ticker','event_date','split_ratio']].copy()
                        break
                    elif {'ticker','date','splitTo','splitFrom'} <= set(tmp.columns):
                        tmp = tmp.rename(columns={'date':'event_date'})
                        tmp['split_ratio'] = pd.to_numeric(tmp['splitTo'], errors='coerce') / pd.to_numeric(tmp['splitFrom'], errors='coerce')
                        splits = tmp[['ticker','event_date','split_ratio']].copy()
                        break
                except Exception:
                    pass

    # If still nothing, add empty columns so downstream cells don’t crash
    if splits is None or splits.empty:
        print("[splits] No split events found on disk; adding empty columns.")
        analysis_enriched = analysis_enriched.copy()
        if 'split_ratio' not in analysis_enriched.columns:
            analysis_enriched['split_ratio'] = np.nan
        if 'is_split_day' not in analysis_enriched.columns:
            analysis_enriched['is_split_day'] = False
        if 'is_reverse_split_day' not in analysis_enriched.columns:
            analysis_enriched['is_reverse_split_day'] = False
        if 'split_cum_factor' not in analysis_enriched.columns:
            # if no splits, cum factor is 1.0 by default
            analysis_enriched['split_cum_factor'] = 1.0
    else:
        # 2) Snap events to nearest trading day (±3D) and annotate panel
        ae = analysis_enriched.copy()
        ae['ticker'] = ae['ticker'].astype(str).str.upper()
        ae['date']   = pd.to_datetime(ae['date'], errors='coerce').dt.tz_localize(None).dt.normalize()
        ae = ae.dropna(subset=['ticker','date']).sort_values(['ticker','date'])

        # trading calendar
        cal = (ae[['ticker','date']].drop_duplicates().sort_values(['ticker','date']))

        # normalize splits table
        sp = (splits.assign(
                ticker=lambda d: d['ticker'].astype(str).str.upper(),
                event_date=lambda d: pd.to_datetime(d['event_date'], errors='coerce').dt.tz_localize(None).dt.normalize(),
                split_ratio=lambda d: pd.to_numeric(d['split_ratio'], errors='coerce')
             )
             .dropna(subset=['ticker','event_date','split_ratio'])
             .sort_values(['ticker','event_date'])
             .copy())

        tol = np.timedelta64(3, 'D')
        snapped = []

        for tkr, sp_t in sp.groupby('ticker', sort=False):
            r = cal.loc[cal['ticker'].eq(tkr), 'date'].to_numpy('datetime64[ns]')
            if r.size == 0:
                continue
            ev = sp_t['event_date'].to_numpy('datetime64[ns]')

            idx_f = np.searchsorted(r, ev, side='left')
            idx_b = np.clip(idx_f - 1, 0, r.size - 1)

            mask_f = idx_f < r.size
            # safe forward candidate (avoid out-of-bounds)
            d_f = np.where(mask_f, r[np.where(mask_f, idx_f, 0)], np.datetime64('NaT'))
            d_b = r[idx_b]

            df = d_f - ev
            db = ev - d_b

            ok_f = mask_f & (df >= np.timedelta64(0,'ns')) & (df <= tol)
            ok_b = (db >= np.timedelta64(0,'ns')) & (db <= tol)

            choose_f = ok_f & (~ok_b | (df <= db))
            chosen = np.where(choose_f, d_f, np.where(ok_b, d_b, np.datetime64('NaT')))

            out = sp_t.copy()
            out['event_trading_date'] = pd.to_datetime(chosen).normalize()
            snapped.append(out)

        if snapped:
            sp2 = (pd.concat(snapped, ignore_index=True)
                     .dropna(subset=['event_trading_date'])
                     .loc[:, ['ticker','event_trading_date','split_ratio']]
                     .drop_duplicates(['ticker','event_trading_date'], keep='last')
                     .rename(columns={'event_trading_date':'date'}))
        else:
            sp2 = pd.DataFrame(columns=['ticker','date','split_ratio'])

        # merge onto panel + flags + cum factor
        ae = ae.drop(columns=['split_ratio','is_split_day','is_reverse_split_day','split_cum_factor'], errors='ignore')
        merged = ae.merge(sp2, on=['ticker','date'], how='left')

        merged['is_split_day'] = merged['split_ratio'].notna() & (merged['split_ratio'] != 1.0)
        merged['is_reverse_split_day'] = merged['is_split_day'] & (merged['split_ratio'] < 1.0)

        base = merged['split_ratio'].astype(float).fillna(1.0)
        merged['split_cum_factor'] = base.groupby(merged['ticker']).cumprod()

        analysis_enriched = merged

# Quick confirmation for your next cell
print("split cols present:", NEEDED.issubset(analysis_enriched.columns))

split cols present: True


In [131]:
# 1) Do we have the columns?
cols_ok = {'split_ratio','is_split_day','is_reverse_split_day','split_cum_factor'} <= set(analysis_enriched.columns)
print("split cols present:", cols_ok)

# 2) Count split & reverse-split event rows (panel days, not unique events)
print("is_split_day rows:", int((analysis_enriched['is_split_day'] == True).sum()))
print("reverse_split rows:", int((analysis_enriched['is_reverse_split_day'] == True).sum()))
print("tickers with ≥1 split day:", analysis_enriched.loc[analysis_enriched['is_split_day'], 'ticker'].nunique())

# 3) Verify split_cum_factor is the groupwise cumprod of split_ratio (NaNs→1.0)
_base = analysis_enriched['split_ratio'].astype(float).fillna(1.0)
check = _base.groupby(analysis_enriched['ticker']).cumprod()
mismatch = (~(analysis_enriched['split_cum_factor'].astype(float).round(12)
              .eq(check.round(12)))).sum()
print("cumprod mismatches:", int(mismatch))

# 4) Peek a few split days
display(analysis_enriched.loc[analysis_enriched['is_split_day'],
                              ['ticker','date','split_ratio','is_reverse_split_day','split_cum_factor']]
        .sort_values(['ticker','date']).head(20))

split cols present: True
is_split_day rows: 17695
reverse_split rows: 17588
tickers with ≥1 split day: 2040
cumprod mismatches: 0


Unnamed: 0,ticker,date,split_ratio,is_reverse_split_day,split_cum_factor
9231,AAON,2023-08-17,1.500000,False,1.500000
16757,ABAT,2023-09-11,0.066667,True,0.066667
27813,ABTC,2022-11-08,0.050000,True,0.050000
28127,ABTC,2024-02-09,0.050000,True,0.002500
28518,ABTC,2025-09-03,0.200000,True,0.000500
...,...,...,...,...,...
73818,ADTX,2023-08-18,0.025000,True,0.025000
74100,ADTX,2024-10-02,0.025000,True,0.000625
74212,ADTX,2025-03-17,0.004000,True,0.000003
77721,ADVM,2024-03-21,0.100000,True,0.100000


In [132]:
import pandas as pd
import numpy as np

# Always bind to the master panel that has the split columns
panel = analysis_enriched.copy()

# Sanity: make sure the columns exist
need = {'split_ratio', 'is_split_day', 'date', 'ticker'}
missing = need - set(panel.columns)
assert not missing, f"Missing columns: {missing}. Re-run the split-annotation cell."

# Normalize date just in case
panel['date'] = pd.to_datetime(panel['date'], errors='coerce').dt.tz_localize(None).dt.normalize()

# 1) Known forward split: TSLA 3-for-1 on 2022-08-25 -> expect split_ratio ~ 3.0
print(
    panel.query("ticker == 'TSLA' and date >= '2022-08-20' and date <= '2022-08-30'")
         [['date','split_ratio','is_split_day']]
         .sort_values('date')
)

# 2) Event-day return sanity (adj prices are already split-adjusted → small |logret|)
evt = panel.loc[panel['is_split_day'] == True, ['ticker','date','logret']]
print("median |logret| on split-days:", float(evt['logret'].abs().median()))

Empty DataFrame
Columns: [date, split_ratio, is_split_day]
Index: []
median |logret| on split-days: 0.006226211191315398


In [133]:
import pandas as pd
panel = analysis_enriched.copy()
panel['date'] = pd.to_datetime(panel['date']).dt.tz_localize(None).dt.normalize()

# Split-day returns split by direction
evt = panel.loc[panel['is_split_day'] == True,
                ['ticker','date','logret','is_reverse_split_day','split_ratio']].copy()
evt['abs_lr'] = evt['logret'].abs()
print("median |logret| by type:")
print(evt.groupby('is_reverse_split_day')['abs_lr'].median())

median |logret| by type:
is_reverse_split_day
False    0.011819
True     0.006179
Name: abs_lr, dtype: float64


In [134]:
import numpy as np
import pandas as pd

ae = analysis_enriched.copy()
ae['date'] = pd.to_datetime(ae['date']).dt.tz_localize(None).dt.normalize()

# Get previous day's close per ticker
ae = ae.sort_values(['ticker','date'])
ae['prev_close'] = ae.groupby('ticker')['close'].shift(1)

spl = ae.loc[ae['is_split_day']].copy()

# For forward splits (ratio > 1): prev_close / close ≈ split_ratio
fwd = spl.loc[~spl['is_reverse_split_day'] & spl['split_ratio'].notna()].copy()
fwd['est_ratio'] = fwd['prev_close'] / fwd['close']
fwd['rel_err']   = (fwd['est_ratio'] - fwd['split_ratio']).abs() / fwd['split_ratio']

# For reverse splits (ratio < 1): close / prev_close ≈ 1/ratio
rev = spl.loc[spl['is_reverse_split_day'] & spl['split_ratio'].notna()].copy()
rev['est_ratio_inv'] = rev['close'] / rev['prev_close']
rev['rel_err']       = (rev['est_ratio_inv'] - (1.0 / rev['split_ratio'])).abs() / (1.0 / rev['split_ratio'])

print("Forward splits — median rel. error:", float(fwd['rel_err'].median()))
print("Reverse splits — median rel. error:", float(rev['rel_err'].median()))
print("Forward splits — share within 10%:", float((fwd['rel_err'] <= 0.10).mean()))
print("Reverse splits — share within 10%:", float((rev['rel_err'] <= 0.10).mean()))

Forward splits — median rel. error: 0.48480164863472436
Reverse splits — median rel. error: 0.9370940162858776
Forward splits — share within 10%: 0.22429906542056074
Reverse splits — share within 10%: 0.00011371389583807141


In [146]:
import numpy as np
import pandas as pd

panel = analysis_enriched.copy()
panel['date'] = pd.to_datetime(panel['date']).dt.tz_localize(None).dt.normalize()
panel = panel.sort_values(['ticker','date'])

# previous day's cumulative factor per ticker (from the full panel)
panel['prev_cum'] = panel.groupby('ticker')['split_cum_factor'].shift(1)

# restrict to actual split days
evt = panel.loc[panel['is_split_day']].copy()

# factor step should equal the announced split_ratio on the event trading day
evt['factor_step'] = evt['split_cum_factor'] / evt['prev_cum']

# evaluate only where we have a previous trading day cum factor
valid = evt['prev_cum'].notna() & evt['split_ratio'].notna()
ok = np.isclose(evt.loc[valid, 'factor_step'],
                evt.loc[valid, 'split_ratio'],
                rtol=1e-6)

print("factor step matches split_ratio (excluding first splits per ticker):",
      float(ok.mean()))

print("first-split rows (no prev_cum, expected NaN):",
      int((evt['prev_cum'].isna()).sum()))

print("mismatch examples among valid rows (if any):")
bad = evt.loc[valid].loc[~ok, ['ticker','date','split_ratio','factor_step']].head(10)
print(bad if not bad.empty else "None")

factor step matches split_ratio (excluding first splits per ticker): 0.1296485260770975
first-split rows (no prev_cum, expected NaN): 55
mismatch examples among valid rows (if any):
       ticker       date  split_ratio  factor_step
593643     CA 2024-02-29          0.0          NaN
593664     CA 2024-04-01          0.0          NaN
593685     CA 2024-04-30          0.0          NaN
593708     CA 2024-06-03          0.0          NaN
593727     CA 2024-07-01          0.0          NaN
593748     CA 2024-07-31          0.0          NaN
593771     CA 2024-09-03          0.0          NaN
593790     CA 2024-09-30          0.0          NaN
593813     CA 2024-10-31          0.0          NaN
593834     CA 2024-12-02          0.0          NaN


In [145]:
analysis_enriched

Unnamed: 0,date,ticker,open,high,low,close,adj_close,volume,ret,logret,...,float_shares,free_float,filing_form,last_filing_date,is_filing_day,days_since_filing,split_ratio,is_split_day,is_reverse_split_day,split_cum_factor
0,2022-10-03,AACB,,,,,,,,,...,2.214790e+07,2.214790e+07,,NaT,False,,,False,False,1.0
1,2022-10-04,AACB,,,,,,,,,...,2.214790e+07,2.214790e+07,,NaT,False,,,False,False,1.0
2,2022-10-05,AACB,,,,,,,,,...,2.214790e+07,2.214790e+07,,NaT,False,,,False,False,1.0
3,2022-10-06,AACB,,,,,,,,,...,2.214790e+07,2.214790e+07,,NaT,False,,,False,False,1.0
4,2022-10-07,AACB,,,,,,,,,...,2.214790e+07,2.214790e+07,,NaT,False,,,False,False,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3845866,2025-09-24,ZYXI,1.55,1.59,1.50,1.50,1.50,80600,-0.019608,-0.019803,...,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,2.0,,False,False,1.0
3845867,2025-09-25,ZYXI,1.51,1.54,1.45,1.47,1.47,109300,-0.020000,-0.020203,...,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,3.0,,False,False,1.0
3845868,2025-09-26,ZYXI,1.47,1.50,1.44,1.45,1.45,54700,-0.013605,-0.013699,...,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,4.0,,False,False,1.0
3845869,2025-09-29,ZYXI,1.45,1.49,1.42,1.47,1.47,115800,0.013793,0.013699,...,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,7.0,,False,False,1.0


In [147]:
NEEDED = {'split_ratio','is_split_day','is_reverse_split_day','split_cum_factor'}
print("cols present:", NEEDED.issubset(analysis_enriched.columns))
print("shape:", analysis_enriched.shape)
analysis_enriched.loc[analysis_enriched['is_split_day'], 
                      ['ticker','date','split_ratio','is_reverse_split_day']].head()

cols present: True
shape: (3845871, 31)


Unnamed: 0,ticker,date,split_ratio,is_reverse_split_day
9231,AAON,2023-08-17,1.5,False
16757,ABAT,2023-09-11,0.066667,True
27813,ABTC,2022-11-08,0.05,True
28127,ABTC,2024-02-09,0.05,True
28518,ABTC,2025-09-03,0.2,True


In [148]:
from pathlib import Path
OUT = Path("outputs"); OUT.mkdir(exist_ok=True)
analysis_enriched.to_parquet(OUT / "analysis_enriched.parquet", index=False, compression="zstd")
analysis_enriched.to_csv(OUT / "analysis_enriched.csv", index=False)

Below two files are now your latest, “master” snapshots:
* outputs/analysis_enriched.parquet (preferred for speed & types)
* outputs/analysis_enriched.csv (useful for Excel/sharing)

In [149]:
import pandas as pd, pathlib as pl

P = pl.Path("outputs")
mem = analysis_enriched
pq  = pd.read_parquet(P / "analysis_enriched.parquet")
csv = pd.read_csv(P / "analysis_enriched.csv")

print("same columns (parquet)?", set(mem.columns) == set(pq.columns))
print("same row count (parquet)?", len(mem) == len(pq))
print("same columns (csv)?", set(mem.columns) == set(csv.columns))
print("same row count (csv)?", len(mem) == len(csv))

  csv = pd.read_csv(P / "analysis_enriched.csv")


same columns (parquet)? True
same row count (parquet)? True
same columns (csv)? True
same row count (csv)? True


In [150]:
from datetime import datetime
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
mem.to_parquet(P / f"analysis_enriched_{ts}.parquet", index=False, compression="zstd")

In [151]:
analysis_enriched

Unnamed: 0,date,ticker,open,high,low,close,adj_close,volume,ret,logret,...,float_shares,free_float,filing_form,last_filing_date,is_filing_day,days_since_filing,split_ratio,is_split_day,is_reverse_split_day,split_cum_factor
0,2022-10-03,AACB,,,,,,,,,...,2.214790e+07,2.214790e+07,,NaT,False,,,False,False,1.0
1,2022-10-04,AACB,,,,,,,,,...,2.214790e+07,2.214790e+07,,NaT,False,,,False,False,1.0
2,2022-10-05,AACB,,,,,,,,,...,2.214790e+07,2.214790e+07,,NaT,False,,,False,False,1.0
3,2022-10-06,AACB,,,,,,,,,...,2.214790e+07,2.214790e+07,,NaT,False,,,False,False,1.0
4,2022-10-07,AACB,,,,,,,,,...,2.214790e+07,2.214790e+07,,NaT,False,,,False,False,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3845866,2025-09-24,ZYXI,1.55,1.59,1.50,1.50,1.50,80600,-0.019608,-0.019803,...,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,2.0,,False,False,1.0
3845867,2025-09-25,ZYXI,1.51,1.54,1.45,1.47,1.47,109300,-0.020000,-0.020203,...,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,3.0,,False,False,1.0
3845868,2025-09-26,ZYXI,1.47,1.50,1.44,1.45,1.45,54700,-0.013605,-0.013699,...,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,4.0,,False,False,1.0
3845869,2025-09-29,ZYXI,1.45,1.49,1.42,1.47,1.47,115800,0.013793,0.013699,...,2.139729e+07,2.139729e+07,8-K,2025-09-22,False,7.0,,False,False,1.0
