In [20]:
import os, re, pandas as pd, numpy as np
from pandas.tseries.holiday import (
    AbstractHolidayCalendar, Holiday, nearest_workday, USFederalHolidayCalendar,
    GoodFriday, EasterMonday
)
from pandas.tseries.offsets import CustomBusinessDay

In [12]:
# builds: company name, symbol (acronym), date of listing, stock offering

dataset_dir = os.path.expanduser("~/csc1171/data/raw/AMEX_NYSE_NASDAQ_stock_histories")
symbols_file = os.path.join(dataset_dir, "all_symbols.txt")
big_file = os.path.join(dataset_dir, "fh_5yrs.csv")
output_file = os.path.expanduser("~/csc1171/notebooks/Dominik/clean_company_listings.csv")
full_dir = os.path.join(dataset_dir, "full_history")


with open(symbols_file, "r", encoding="utf-8") as f:
    symbols = [ln.strip().upper() for ln in f if ln.strip() and not ln.startswith("#")]
symset = set(symbols)
use_full = os.path.isdir(full_dir)

# build lookup maps from nasdaq symbol directory files
nasdaq_file = os.path.join(dataset_dir, "nasdaqlisted.txt")
other_file = os.path.join(dataset_dir, "otherlisted.txt")

name_map, exch_map = {}, {}

if os.path.isfile(nasdaq_file):
    nl = pd.read_csv(nasdaq_file, sep="|")
    if "Symbol" in nl.columns and "Security Name" in nl.columns:
        nl = nl[nl["Symbol"] != "File Creation Time"]
        for _, r in nl.iterrows():
            s = str(r["Symbol"]).upper()
            name_map[s] = str(r["Security Name"]).strip()
            exch_map[s] = "NASDAQ"

if os.path.isfile(other_file):
    ol = pd.read_csv(other_file, sep="|")
    col = {c.lower(): c for c in ol.columns}
    symcol = col.get("act symbol") or col.get("symbol")
    namecol = col.get("security name") or col.get("name")
    excol = col.get("exchange")
    if symcol and namecol and excol:
        ol = ol[ol[symcol] != "File Creation Time"]
        xmap = {"A": "AMEX", "N": "NYSE", "P": "NYSE ARCA", "Q": "NASDAQ", "Z": "BATS"}
        for _, r in ol.iterrows():
            s = str(r[symcol]).upper()
            name_map.setdefault(s, str(r[namecol]).strip())
            exch_raw = str(r[excol]).strip().upper()
            exch_map.setdefault(s, xmap.get(exch_raw, exch_raw))


def detect_cols(df):
    cols = {c.lower(): c for c in df.columns}
    s = cols.get("symbol") or cols.get("ticker")
    d = cols.get("date") or cols.get("timestamp")
    e = cols.get("exchange")
    n = cols.get("name") or cols.get("security name") or cols.get("company name")
    return s, d, e, n

min_date = {}
first_ex = {}
first_name = {}

probe = pd.read_csv(big_file, nrows=5)
scol, dcol, ecol, ncol = detect_cols(probe)

if use_full:
    for s in symbols:
        p = os.path.join(full_dir, f"{s}.csv")
        if os.path.isfile(p):
            hdr = pd.read_csv(p, nrows=1)
            dcol = next((c for c in hdr.columns if c.lower() == "date"), None)
            if dcol:
                d = pd.read_csv(p, usecols=[dcol])[dcol]
                d = pd.to_datetime(d, errors="coerce").dropna()
                if len(d):
                    min_date[s] = d.min()

else:
    probe = pd.read_csv(big_file, nrows=5)
    scol, dcol, ecol, ncol = detect_cols(probe)

    for chunk in pd.read_csv(big_file, usecols=[c for c in [scol, dcol, ecol, ncol] if c], chunksize=500000):
        chunk[scol] = chunk[scol].astype(str).str.upper()
        chunk = chunk[chunk[scol].isin(symset)]
        if not len(chunk):
            continue
        dt = pd.to_datetime(chunk[dcol], errors="coerce")
        chunk = chunk.loc[dt.notna()].copy()
        chunk["_dt"] = dt[dt.notna()]
        mins = chunk.groupby(scol)["_dt"].min()
        for s, d in mins.items():
            if (s not in min_date) or (d < min_date[s]):
                min_date[s] = d
        if ecol:
            for s, ex in chunk[[scol, ecol]].dropna().drop_duplicates(subset=[scol]).itertuples(index=False):
                if s not in first_ex:
                    first_ex[s] = str(ex).upper()
        if ncol:
            for s, nm in chunk[[scol, ncol]].dropna().drop_duplicates(subset=[scol]).itertuples(index=False):
                if s not in first_name:
                    first_name[s] = str(nm).strip()


rows = []
# for s in symbols:
#     rows.append({
#         "company name": first_name.get(s, name_map.get(s, "")),
#         "symbol (acronym)": s,
#         "date of listing": (min_date.get(s).date().isoformat() if s in min_date else ""),
#         "stock offering": first_ex.get(s, exch_map.get(s, "")),
#     })
for s in symbols:
    s_raw = s
    alts = {s_raw, s_raw.replace('.', '-'), s_raw.replace('-', '.')}
    rows.append({
        "company name": (first_name.get(s) or next((name_map[a] for a in alts if a in name_map), "")),
        "symbol (acronym)": s,
        "date of listing": (min_date.get(s).date().isoformat() if s in min_date else ""),
        "stock offering": (first_ex.get(s) or next((exch_map[a] for a in alts if a in exch_map), "")),
    })

pd.DataFrame(rows, columns=["company name","symbol (acronym)","date of listing","stock offering"]).to_csv(output_file, index=False)


In [21]:
# Fix names and dtypes

STANDARD_COLS = {
    "open":"open","o":"open",
    "high":"high","h":"high",
    "low":"low","l":"low",
    "close":"close","c":"close","last":"close","adjclose":"adj_close","adj_close":"adj_close","adjusted_close":"adj_close",
    "volume":"volume","vol":"volume","shares_traded":"volume",
    "date":"date","timestamp":"date","time":"date","trade_date":"date",
    "symbol":"symbol","ticker":"symbol",
    "exchange":"exchange","market":"exchange","stock_offering":"exchange",
    "currency":"currency"
}

STANDARD_COLS.update({
    "adj_close":"adj_close",
    "adj_close_":"adj_close",
    "adj__close":"adj_close",
})

PRICE_COLS = ["open","high","low","close","adj_close"]
INT_COLS = ["volume"]
CAT_COLS = ["exchange","currency"]
REQ_DATE = "date"
REQ_SYMBOL = "symbol"

def _snake(s:str)->str:
    s = re.sub(r"[^\w]+","_", s.strip().lower())
    s = re.sub(r"_+","_", s).strip("_")
    return s

def normalize_columns(df:pd.DataFrame)->pd.DataFrame:
    colmap = {}
    for c in df.columns:
        k = _snake(c)
        k = STANDARD_COLS.get(k, k)
        colmap[c] = k
    df = df.rename(columns=colmap)
    if len(set(df.columns)) != len(df.columns):
        agg = {}
        for c in df.columns:
            if c not in agg: agg[c] = lambda x: x.bfill().ffill().iloc[0] if hasattr(x,"bfill") else x
        df = df.groupby(axis=1, level=0).first()
    return df

# def parse_date_col(df:pd.DataFrame)->pd.DataFrame:
#     if REQ_DATE not in df.columns: return df
#     # coerce strings like 'YYYY-MM-DD', 'MM/DD/YYYY', 'YYYY-MM-DD HH:MM:SS', etc.
#     dt = pd.to_datetime(df[REQ_DATE], errors="coerce", utc=True)
#     # if parsed tz-naive, convert to UTC; if already tz-aware, keep UTC
#     if dt.dtype == "datetime64[ns, UTC]":
#         df[REQ_DATE] = dt
#     else:
#         df[REQ_DATE] = pd.to_datetime(df[REQ_DATE], errors="coerce").dt.tz_localize("UTC")
#     return df

def parse_date_col(df: pd.DataFrame) -> pd.DataFrame:
    if REQ_DATE not in df.columns:
        return df
    dt = pd.to_datetime(df[REQ_DATE], errors="coerce", utc=False)
    # If tz-aware, convert to UTC; if naive, localize to UTC.
    if getattr(dt.dt, "tz", None) is not None:
        dt = dt.dt.tz_convert("UTC")
    else:
        dt = dt.dt.tz_localize("UTC")
    df[REQ_DATE] = dt
    return df


def coerce_numeric(df:pd.DataFrame)->pd.DataFrame:
    for c in PRICE_COLS:
        if c in df.columns:
            df[c] = (df[c].astype(str).str.replace(",","",regex=False)
                              .str.replace(" ","",regex=False)
                              .str.replace("$","",regex=False)
                              .replace(["", "nan","None"], np.nan)
                              .astype(float))
    for c in INT_COLS:
        if c in df.columns:
            ser = df[c].astype(str).str.replace(",","",regex=False).str.replace(" ","",regex=False)
            ser = ser.replace(["", "nan","None"], np.nan)

            ser = pd.to_numeric(ser, errors="coerce")
            if ser.notna().any() and (ser.dropna() % 1 == 0).all():
                df[c] = ser.astype("Int64")
            else:
                df[c] = ser.astype("Int64")
    return df

def tidy_symbol(df:pd.DataFrame, symbol_hint:str|None=None)->pd.DataFrame:
    if REQ_SYMBOL not in df.columns and symbol_hint:
        df[REQ_SYMBOL] = symbol_hint
    if REQ_SYMBOL in df.columns:
        df[REQ_SYMBOL] = (df[REQ_SYMBOL].astype(str).str.upper().str.strip()
                          .str.replace("\u200b","",regex=False))
        df = df[df[REQ_SYMBOL]!=""]
    return df

def cat_types(df:pd.DataFrame)->pd.DataFrame:
    for c in CAT_COLS:
        if c in df.columns:
            df[c] = df[c].astype(str).str.strip()
            df.loc[df[c].isin(["", "nan","None"]), c] = np.nan
            df[c] = df[c].astype("category")
    return df

def finalize_schema(df:pd.DataFrame)->pd.DataFrame:
    # Ensure all price cols exist even if missing in source
    for c in PRICE_COLS:
        if c not in df.columns: df[c] = np.nan
    for c in INT_COLS:
        if c not in df.columns: df[c] = pd.Series(pd.array([pd.NA]*len(df), dtype="Int64"))
    if "adj_close" in df.columns and df["adj_close"].isna().all() and "close" in df.columns:
        df["adj_close"] = df["close"].astype("float64")
    if REQ_DATE in df.columns:
        df = df[df[REQ_DATE].notna()]
    if REQ_SYMBOL in df.columns:
        df = df[df[REQ_SYMBOL].notna() & (df[REQ_SYMBOL]!="")]
    keep_order = [x for x in [REQ_DATE,REQ_SYMBOL,"exchange","currency","open","high","low","close","adj_close","volume"] if x in df.columns]
    others = [c for c in df.columns if c not in keep_order]
    df = df[keep_order+others]
    df = df.drop_duplicates(subset=[c for c in [REQ_DATE, REQ_SYMBOL] if c in df.columns]).sort_values(by=[c for c in [REQ_DATE, REQ_SYMBOL] if c in df.columns])
    # Assert dtypes
    for c in PRICE_COLS:
        if c in df.columns: df[c] = df[c].astype("float64")
    if "volume" in df.columns: df["volume"] = df["volume"].astype("Int64")
    if "symbol" in df.columns: df["symbol"] = df["symbol"].astype("object")
    if "source" in df.columns: df["source"] = df["source"].astype("object")
    if "date" in df.columns and df["date"].dtype.tz is None:
        df["date"] = df["date"].dt.tz_localize("UTC")
    return df.reset_index(drop=True)

def looks_like_returns_matrix(df: pd.DataFrame) -> bool:
    """
    Heuristic: has 'date' but no OHLC columns, and >3 other columns (strategies).
    Good for HF.csv (EDHEC) style monthly returns panel.
    """
    cols = set(df.columns.str.lower())
    has_date = "date" in cols
    has_ohlc = bool(cols & {"open","high","low","close","adj_close","volume","symbol"})
    many_wide_cols = (len(cols) >= 5) # date + many strategy columns
    return has_date and (not has_ohlc) and many_wide_cols

def clean_edhec_returns(df_raw: pd.DataFrame, source: str = "EDHEC") -> pd.DataFrame:
    df = df_raw.rename(columns={str(c): str(c).strip() for c in df_raw.columns})
    # Parse date to UTC
    df = df.rename(columns={"date":"date"})
    df["date"] = pd.to_datetime(df["date"], errors="coerce", utc=True)
    df = df[df["date"].notna()].copy()

    value_cols = [c for c in df.columns if c != "date"]
    df = df.melt(id_vars=["date"], value_vars=value_cols,
                 var_name="symbol", value_name="ret")

    df["symbol"] = (df["symbol"].astype(str)
                    .str.replace(".", "_", regex=False)
                    .str.strip().str.upper())
    df["ret"] = pd.to_numeric(df["ret"], errors="coerce").astype("float64")
    df = df.dropna(subset=["symbol","ret"]).sort_values(["symbol","date"]).reset_index(drop=True)
    df["source"] = source
    df["symbol"] = df["symbol"].astype("object")
    df["source"] = df["source"].astype("object")
    # Ensure tz-aware UTC (pd.to_datetime(utc=True) already does, but belt & braces)
    if getattr(df["date"].dt, "tz", None) is None:
        df["date"] = df["date"].dt.tz_localize("UTC")
    return df[["date","symbol","ret","source"]]

def clean_any(df: pd.DataFrame, source: str, symbol_hint: str | None = None) -> pd.DataFrame:
    cols_lower = set(map(str.lower, df.columns))
    if looks_like_returns_matrix(df.rename(columns=str.lower)):
        return clean_edhec_returns(df, source=source)
    return clean_names_and_dtypes(df, source=source, symbol_hint=symbol_hint)

def clean_names_and_dtypes(df:pd.DataFrame, source:str, symbol_hint:str|None=None)->pd.DataFrame:
    df = normalize_columns(df)
    df = tidy_symbol(df, symbol_hint)
    df = parse_date_col(df)
    df = coerce_numeric(df)
    df = cat_types(df)
    df["source"] = source
    return finalize_schema(df)


In [27]:
# Test names dtypes 

# # 1) Load
# p = os.path.expanduser("~/csc1171/data/raw/SP500_ETF_FX_Crypto_Daily/AAPL.csv")
# df_raw = pd.read_csv(p)

# # 2) Clean
# df_clean = clean_names_and_dtypes(df_raw, source="Yahoo", symbol_hint="AAPL")

# # 3) Inspect schema & a few rows
# print(df_clean.dtypes)
# print(df_clean.head(3))
# print(df_clean[['date','symbol']].isna().sum())
# print(df_clean[['open','high','low','close','adj_close','volume']].describe())

# p = "~/csc1171/data/raw/Global_Stock_Market_2008-2023/2008_Globla_Markets_Data.csv"
# df = pd.read_csv(os.path.expanduser(p))
# df = clean_names_and_dtypes(df, source="Global")
# print(df.dtypes)

# folder = os.path.expanduser("~/csc1171/data/raw/SP500_ETF_FX_Crypto_Daily")
# fn = "JSM.csv"
# df = pd.read_csv(os.path.join(folder, fn))
# df = clean_names_and_dtypes(df, source="Yahoo", symbol_hint=os.path.splitext(fn)[0].upper())
# print(df.dtypes)

# p = "~/csc1171/data/raw/AMEX_NYSE_NASDAQ_stock_histories/fh_5yrs.csv"
# df = pd.read_csv(os.path.expanduser(p))
# df = clean_names_and_dtypes(df, source="FH5")
# print(df.dtypes)

# import os, pandas as pd

# # make wide prints readable
# pd.set_option("display.width", None)
# pd.set_option("display.max_columns", 50)

# def preview(df: pd.DataFrame, name: str):
#     print(f"\n {name}")
#     print("shape:", df.shape)
#     print("dtypes:\n", df.dtypes.to_string())
#     print("\nhead(10):")
#     print(df.head(10))

# # Global (prices)
# p = os.path.expanduser("~/csc1171/data/raw/Global_Stock_Market_2008-2023/2008_Globla_Markets_Data.csv")
# df = pd.read_csv(p)
# df = clean_any(df, source="Global")
# preview(df, "Global")

# # Yahoo single-ticker (prices)
# folder = os.path.expanduser("~/csc1171/data/raw/SP500_ETF_FX_Crypto_Daily")
# fn = "JSM.csv"
# df = pd.read_csv(os.path.join(folder, fn))
# df = clean_any(df, source="Yahoo", symbol_hint=os.path.splitext(fn)[0].upper())
# preview(df, "Yahoo (JSM)")

# # FH panel (prices)
# p = os.path.expanduser("~/csc1171/data/raw/AMEX_NYSE_NASDAQ_stock_histories/fh_5yrs.csv")
# df = pd.read_csv(p)
# df = clean_any(df, source="FH5")
# preview(df, "FH5")

# # EDHEC (returns)
# p = os.path.expanduser("~/csc1171/data/raw/EDHEC_Hedge_Fund_Returns/HF.csv")
# df = pd.read_csv(p)
# df = clean_any(df, source="EDHEC")
# preview(df, "EDHEC (returns)")

# FH panel (prices)
p = os.path.expanduser("~/csc1171/data/raw/AMEX_NYSE_NASDAQ_stock_histories/fh_5yrs.csv")
df = clean_any(pd.read_csv(p), source="FH5")
print("\nFH5", df.shape); print(df.dtypes); print(df.head(5))

# EDHEC (returns)
p = os.path.expanduser("~/csc1171/data/raw/EDHEC_Hedge_Fund_Returns/HF.csv")
df = clean_any(pd.read_csv(p), source="EDHEC")
print("\n EDHEC", df.shape); print(df.dtypes); print(df.head(5))




=== FH5 === (6852038, 9)
date         datetime64[ns, UTC]
symbol                    object
open                     float64
high                     float64
low                      float64
close                    float64
adj_close                float64
volume                     Int64
source                    object
dtype: object
                       date symbol        open        high         low  \
0 2015-01-02 00:00:00+00:00   AADR   37.250000   37.250000   36.639999   
1 2015-01-02 00:00:00+00:00    AAL   54.279999   54.599998   53.070000   
2 2015-01-02 00:00:00+00:00   AAMC  308.000000  348.589996  308.000000   
3 2015-01-02 00:00:00+00:00   AAME    3.990000    4.030000    3.980000   
4 2015-01-02 00:00:00+00:00    AAN   30.809999   30.860001   30.040001   

        close   adj_close    volume source  
0   36.639999   35.399769      2000    FH5  
1   53.910000   51.079918  10748600    FH5  
2  327.179993  327.179993     11500    FH5  
3    4.030000    3.917722     11400   

In [39]:
# align offical trading calanders

# Minimal NYSE holiday calendar (covers core exchange closures)
class NYSEHolidayCalendar(AbstractHolidayCalendar):
    rules = [
        # New Year's Day
        Holiday("NewYearsDay", month=1, day=1, observance=nearest_workday),
        # Martin Luther King Jr. Day (since 1998)
        Holiday("MLK", month=1, day=1, offset=pd.DateOffset(weekday=pd.offsets.WeekOfMonth(week=2, weekday=0)), start_date="1998-01-01"),
        # Washington's Birthday (Presidents' Day) – 3rd Monday in Feb
        Holiday("PresidentsDay", month=2, day=1, offset=pd.DateOffset(weekday=pd.offsets.WeekOfMonth(week=2, weekday=0))),
        # Good Friday (NYSE closed)
        GoodFriday,
        # Memorial Day – last Monday in May
        Holiday("MemorialDay", month=5, day=31, offset=pd.DateOffset(weekday=pd.offsets.Week(weekday=0))),
        # Juneteenth (since 2022), nearest workday
        Holiday("Juneteenth", month=6, day=19, observance=nearest_workday, start_date="2022-06-19"),
        # Independence Day
        Holiday("IndependenceDay", month=7, day=4, observance=nearest_workday),
        # Labor Day – first Monday in September
        Holiday("LaborDay", month=9, day=1, offset=pd.DateOffset(weekday=pd.offsets.WeekOfMonth(week=0, weekday=0))),
        # Thanksgiving – fourth Thursday in November
        Holiday("Thanksgiving", month=11, day=1, offset=pd.DateOffset(weekday=pd.offsets.WeekOfMonth(week=3, weekday=3))),
        # Christmas Day
        Holiday("Christmas", month=12, day=25, observance=nearest_workday),
    ]

# build a sessions index for [start, end] inclusive
# def trading_sessions(start: pd.Timestamp, end: pd.Timestamp, calendar: str = "NYSE", tz: str = "UTC") -> pd.DatetimeIndex:
#     cal = NYSEHolidayCalendar() if calendar.upper() == "NYSE" else NYSEHolidayCalendar()
#     cbd = CustomBusinessDay(calendar=cal)
#     idx = pd.date_range(start=start.normalize(), end=end.normalize(), freq=cbd)
#     # lock tz
#     idx = pd.to_datetime(idx).tz_localize("UTC").tz_convert(tz)
#     return idx
def trading_sessions(start: pd.Timestamp, end: pd.Timestamp, calendar: str = "NYSE", tz: str = "UTC") -> pd.DatetimeIndex:
    cal = NYSEHolidayCalendar() if calendar.upper() == "NYSE" else NYSEHolidayCalendar()
    cbd = CustomBusinessDay(calendar=cal)
    idx = pd.date_range(start=start.normalize(), end=end.normalize(), freq=cbd)
    if getattr(idx, "tz", None) is None:
        idx = idx.tz_localize("UTC")
    else:
        idx = idx.tz_convert("UTC")
    return idx.tz_convert(tz)

_OHLCV_AGG = {
    "open": "first",
    "high": "max",
    "low": "min",
    "close": "last",
    "adj_close": "last",
    "volume": "sum",
}

def _ensure_tz_utc(dt: pd.Series) -> pd.Series:
    if getattr(dt.dt, "tz", None) is not None:
        return dt.dt.tz_convert("UTC")
    else:
        return dt.dt.tz_localize("UTC")

def _dedupe_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    keys = [c for c in ["date", "symbol"] if c in df.columns]
    if not keys:
        return df
    agg = {c: fn for c, fn in _OHLCV_AGG.items() if c in df.columns}
    others = [c for c in df.columns if c not in set(list(agg.keys()) + keys)]
    agg.update({c: "first" for c in others})
    out = (df.groupby(keys, as_index=False).agg(agg)
             .sort_values(keys)
             .reset_index(drop=True))
    return out

def align_to_trading_calendar(
    df: pd.DataFrame,
    calendar: str = "NYSE",
    tz: str = "UTC",
    fill: dict | None = None,
    limit_fill: int | None = None,
    clip_to_observed_span: bool = True
) -> pd.DataFrame:
    if "date" not in df.columns:
        return df.copy()
    work = df.copy()
    work["date"] = _ensure_tz_utc(pd.to_datetime(work["date"], errors="coerce"))
    work = work[work["date"].notna()]
    if "symbol" in work.columns:
        work["symbol"] = work["symbol"].astype(str)

    work["date"] = work["date"].dt.tz_convert("UTC").dt.normalize()

    if work.duplicated(subset=["symbol","date"], keep=False).sum() == 0:
        print("[WARN] No (symbol,date) duplicates pre-dedupe; upstream may have dropped intraday rows already.")

    work = _dedupe_ohlcv(work)
    if clip_to_observed_span and "symbol" in work.columns:
        spans = (work.groupby("symbol")["date"].agg(["min", "max"])
                 .rename(columns={"min":"start","max":"end"}))
        frames = []
        for sym, (start, end) in spans.iterrows():
            sess = trading_sessions(start, end, calendar=calendar, tz="UTC")
            g = work[work["symbol"] == sym].set_index("date").sort_index()
            g = g.reindex(sess)
            g["symbol"] = sym
            frames.append(g)
        aligned = (pd.concat(frames, axis=0)
                     .reset_index()
                     .rename(columns={"index":"date"}))
    else:
        start, end = work["date"].min(), work["date"].max()
        sess = trading_sessions(start, end, calendar=calendar, tz="UTC")
        aligned = (work.set_index("date").sort_index()
                        .groupby("symbol", group_keys=True)
                        .apply(lambda g: g.reindex(sess))
                        .reset_index(level=0))
        aligned = aligned.reset_index().rename(columns={"index":"date"})

    if fill:
        for col, how in fill.items():
            if col not in aligned.columns:
                continue
            if how == "ffill":
                aligned[col] = (aligned.groupby("symbol", dropna=False)[col]
                                .ffill(limit=limit_fill))
            elif how == "bfill":
                aligned[col] = (aligned.groupby("symbol", dropna=False)[col]
                                .bfill(limit=limit_fill))
            elif how == "fillna0":
                aligned[col] = aligned[col].fillna(0)

    keep_order = [c for c in ["date","symbol","exchange","currency","open","high","low","close","adj_close","volume"] if c in aligned.columns]
    other_cols = [c for c in aligned.columns if c not in keep_order]
    aligned = aligned[keep_order + other_cols]

    aligned["date"] = aligned["date"].dt.tz_convert(tz)

    sort_keys = [c for c in ["symbol","date"] if c in aligned.columns]
    aligned = aligned.sort_values(sort_keys).reset_index(drop=True)

    for c in ["open","high","low","close","adj_close"]:
        if c in aligned.columns:
            aligned[c] = aligned[c].astype("float64")
    if "volume" in aligned.columns:
        aligned["volume"] = pd.array(aligned["volume"], dtype="Int64")

    return aligned




In [41]:
# Testing (align offical trading calanders)

# synthetic sample (weekend, holiday, duplicates)
raw = pd.DataFrame({
    "date": [
        "2024-07-03 16:00:00", # ABC dup 1
        "2024-07-03 10:00:00-04:00", # ABC dup 2 (same calendar day)
        "2024-07-04 16:00:00", # NYSE holiday (Independence Day)
        "2024-07-05 16:00:00", # trading day
        "2024-07-06 16:00:00", # Saturday
        "2024-07-05 09:30:00-04:00", # XYZ trading day
    ],
    "symbol": ["ABC","ABC","ABC","ABC","ABC","XYZ"],
    "open": [10, 10.5, 11, 12, 12.5, 20],
    "high": [11, 11.5, 12, 13, 13.5, 21],
    "low": [ 9, 9.5, 10, 11, 11.5, 19],
    "close": [10.8,10.9, 11.8, 12.8,12.9, 20.5],
    "adj_close":[10.8,10.9,11.8,12.8,12.9,20.5],
    "volume": [100, 150, 200, 300, 400, 500],
})

print(raw)

aligned = align_to_trading_calendar(raw, calendar="NYSE", tz="UTC")

print("\n(UTC, NYSE sessions only)")
print(aligned)

print("\nContains holiday 2024-07-04 ->", any("2024-07-04" in s for s in aligned["date"].astype(str)))
print("Contains Saturday 2024-07-06 ->", any("2024-07-06" in s for s in aligned["date"].astype(str)))

row_703 = aligned[(aligned["symbol"]=="ABC") & (aligned["date"].astype(str).str.startswith("2024-07-03"))]
if not row_703.empty:
    r = row_703.iloc[0]
    print("\nABC 2024-07-03 -> open,high,low,close,volume:", r["open"], r["high"], r["low"], r["close"], r["volume"])
else:
    print("\nABC 2024-07-03 row not found")


=== RAW INPUT ===
                        date symbol  open  high   low  close  adj_close  \
0        2024-07-03 16:00:00    ABC  10.0  11.0   9.0   10.8       10.8   
1  2024-07-03 10:00:00-04:00    ABC  10.5  11.5   9.5   10.9       10.9   
2        2024-07-04 16:00:00    ABC  11.0  12.0  10.0   11.8       11.8   
3        2024-07-05 16:00:00    ABC  12.0  13.0  11.0   12.8       12.8   
4        2024-07-06 16:00:00    ABC  12.5  13.5  11.5   12.9       12.9   
5  2024-07-05 09:30:00-04:00    XYZ  20.0  21.0  19.0   20.5       20.5   

   volume  
0     100  
1     150  
2     200  
3     300  
4     400  
5     500  
[WARN] No (symbol,date) duplicates pre-dedupe; upstream may have dropped intraday rows already.

=== AFTER ALIGNMENT (UTC, NYSE sessions only) ===
                       date symbol  open  high   low  close  adj_close  volume
0 2024-07-03 00:00:00+00:00    ABC  10.0  11.0   9.0   10.8       10.8     100
1 2024-07-05 00:00:00+00:00    ABC  12.0  13.0  11.0   12.8       1

In [51]:
# Trading halts (same logic, with step-by-step prints) — FIXED
def annotate_trading_halts(
    df: pd.DataFrame,
    price_cols=("open","high","low","close","adj_close"),
    symbol_col="symbol",
    date_col="date",
    market_open_mask: pd.Series | None = None,
    span_mode: str = "since_first",
    verbose: bool = True,
) -> pd.DataFrame:
    if verbose:
        print("\n annotate_trading_halts(): START")
        print(f"Input rows: {len(df):,} | symbols: {df[symbol_col].nunique() if symbol_col in df else 'NA'}")
        if date_col in df:
            print(f"Date span: {df[date_col].min()} -> {df[date_col].max()}")

    out = df.copy()

    any_price = out[list(price_cols)].notna().any(axis=1)
    if verbose:
        print(f"Rows with ANY price present: {int(any_price.sum()):,}")

    sym_first = out.loc[any_price].groupby(symbol_col)[date_col].min()
    sym_last = out.loc[any_price].groupby(symbol_col)[date_col].max()
    if verbose:
        print(f"Symbols with ≥1 traded row: {len(sym_first):,}")

    out["_first_trade"] = out[symbol_col].map(sym_first)
    out["_last_trade"] = out[symbol_col].map(sym_last)

    if span_mode == "first_to_last":
        if verbose: print('Span mode = "first_to_last"')
        in_active_span = (
            out["_first_trade"].notna()
            & out["_last_trade"].notna()
            & (out[date_col] >= out["_first_trade"])
            & (out[date_col] <= out["_last_trade"])
        )
    else:
        if verbose: print('Span mode = "since_first"')
        in_active_span = out["_first_trade"].notna() & (out[date_col] >= out["_first_trade"])

    if verbose:
        print(f"in_active_span = True rows: {int(in_active_span.sum()):,}")

    no_prices = out[list(price_cols)].isna().all(axis=1)
    if verbose:
        print(f"Rows where ALL price cols are NaN: {int(no_prices.sum()):,}")

    if market_open_mask is not None:
        mok = market_open_mask.copy()
        mok.index = pd.to_datetime(mok.index).tz_convert(out[date_col].dt.tz)
        mapped = mok.reindex(out[date_col].values) # align by date values
        mapped.index = out.index # align row index 1:1
        open_flag = mapped.fillna(True).infer_objects(copy=False).astype(bool)
        if verbose:
            print(f"market_open_mask provided. True rows: {int(open_flag.sum()):,}")
    else:
        open_flag = pd.Series(True, index=out.index)
        if verbose:
            print("market_open_mask not provided assuming all trading-session dates were open")

    out["is_halt"] = in_active_span & no_prices & open_flag
    if verbose:
        print(f"HALT rows flagged (is_halt=True): {int(out['is_halt'].sum()):,}")

    prev = out.groupby(symbol_col)["is_halt"].shift(fill_value=False)
    block_start = out["is_halt"] & ~prev
    out["halt_block_id"] = block_start.groupby(out[symbol_col]).cumsum()
    out.loc[~out["is_halt"], "halt_block_id"] = pd.NA
    out["halt_len"] = (
        out[out["is_halt"]]
        .groupby([symbol_col, "halt_block_id"])[date_col]
        .transform("count")
    ).fillna(0).astype("Int64")
    out = out.drop(columns=["_first_trade","_last_trade"])

    if verbose:
        sym_halts = (out.groupby(symbol_col)["is_halt"].sum()).sort_values(ascending=False)
        syms_with_halts = sym_halts[sym_halts > 0].index.tolist()
        if syms_with_halts:
            print("Halt summary")
            for s in syms_with_halts[:8]:
                sub = out[(out[symbol_col]==s) & (out["is_halt"])]
                blocks = sub.groupby("halt_block_id")[date_col].agg(["min","max","count"])
                total = int(sub["is_halt"].sum())
                print(f" {s}: {total} halt day(s), {len(blocks)} block(s)")
                for _, r in blocks.reset_index(drop=True).iterrows():
                    print(f"{r['min']} -> {r['max']}  (len={int(r['count'])})")
    return out


In [52]:
# Test: annotate_trading_halts on a small panel

# Synthetic intraday-ish data across an NYSE holiday (Jul 4) + weekend,
# with one *halt* day for ABC (all prices missing on a real session) while XYZ trades.

raw = pd.DataFrame({
    "date": [
        "2024-07-02 16:00:00", # both trade
        "2024-07-03 16:00:00", # both trade
        "2024-07-04 16:00:00", # NYSE holiday (should be removed by calendar)
        "2024-07-05 10:00:00", # XYZ trades; ABC halted (no rows for ABC -> will be NaN after reindex)
        "2024-07-06 16:00:00", # Saturday (removed)
        "2024-07-08 16:00:00", # both trade
    ],
    "symbol": ["ABC","ABC","ABC","XYZ","ABC","ABC"],
    "open": [10, 10.5, 11, 20, 12.5, 13.0],
    "high": [11, 11.5, 12, 21, 13.5, 14.0],
    "low": [ 9, 9.5, 10, 19, 11.5, 12.0],
    "close": [10.8,10.9, 11.8, 20.5,12.9, 13.5],
    "adj_close":[10.8,10.9,11.8,20.5,12.9,13.5],
    "volume": [100, 150, 200, 500, 400, 300],
})

aligned = align_to_trading_calendar(raw, calendar="NYSE", tz="UTC")
def build_market_open_mask(panel, date_col="date", price_cols=("open","high","low","close","adj_close")):
    return panel.groupby(date_col)[list(price_cols)].apply(lambda x: x.notna().any().any())

mopen = build_market_open_mask(aligned)
halted = annotate_trading_halts(aligned, market_open_mask=mopen)
view = halted.loc[
    (halted["date"].astype(str) >= "2024-07-02") & (halted["date"].astype(str) <= "2024-07-08"),
    ["date","symbol","open","high","low","close","adj_close","volume","is_halt","halt_block_id","halt_len"]
].sort_values(["date","symbol"]).reset_index(drop=True)

[WARN] No (symbol,date) duplicates pre-dedupe; upstream may have dropped intraday rows already.

---- annotate_trading_halts(): START ----
Input rows: 5 | symbols: 2
Date span: 2024-07-02 00:00:00+00:00  ->  2024-07-08 00:00:00+00:00
Rows with ANY price present: 4
Symbols with ≥1 traded row: 2
Span mode = "since_first"
in_active_span = True rows: 5
Rows where ALL price cols are NaN: 1
market_open_mask provided. True rows: 5
HALT rows flagged (is_halt=True): 1
-- Halt summary (per symbol) --
  ABC: 1 halt day(s), 1 block(s)
    2024-07-05 00:00:00+00:00 -> 2024-07-05 00:00:00+00:00  (len=1)
---- annotate_trading_halts(): END ----

=== Aligned panel with halt annotations (focus window) ===
                     date symbol  open  high  low  close  adj_close  volume  is_halt  halt_block_id  halt_len
2024-07-02 00:00:00+00:00    ABC  10.0  11.0  9.0   10.8       10.8     100    False            NaN      <NA>
2024-07-03 00:00:00+00:00    ABC  10.5  11.5  9.5   10.9       10.9     150    Fals

  open_flag = mapped.fillna(True).infer_objects(copy=False).astype(bool)
